
Collector notebook

fabric/notebooks/FDA_Collector.py is the heart of the pipeline: a scheduled Fabric notebook, authenticated as a service principal, that pulls the three capture surfaces, correlates them into the curated triple, ingests everything into the Eventhouse, and advances per-source watermarks.

Design principle — never fatal

Every source pull is wrapped so that a missing or preview-only surface is skipped and logged rather than aborting the run. A run with only workspace monitoring available still lands DAX rows; a run with only Graph available still lands question/answer text.
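
A minimal sketch of this guard pattern, assuming each source is a zero-argument callable that returns a list of rows. The names `guarded`, `pull_ok`, and `pull_unavailable` are hypothetical and not taken from the notebook, which works with DataFrames.

```python
import logging
from typing import Callable, List

log = logging.getLogger("fda_collector_sketch")


def guarded(name: str, pull: Callable[[], List[dict]]) -> List[dict]:
    """Run one source pull; on any exception, log it and return no rows."""
    try:
        rows = pull()
        log.info("%s: %d rows", name, len(rows))
        return rows
    except Exception as exc:  # deliberately broad: one source must not abort the run
        log.warning("%s skipped: %s", name, exc)
        return []


def pull_ok() -> List[dict]:
    # Stand-in for a source that is available.
    return [{"Dax": "EVALUATE 'Sales'"}]


def pull_unavailable() -> List[dict]:
    # Stand-in for a surface that is not enabled in the tenant.
    raise RuntimeError("surface not enabled")


results = {
    "monitoring": guarded("monitoring", pull_ok),
    "graph": guarded("graph", pull_unavailable),
}
```

The broad `except Exception` is intentional here: the failing source contributes an empty result while the other source's rows are kept.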

Running it

The file is a Fabric notebook saved as .py with cells split on # %%. To use it:

  1. Import the .py into a Fabric notebook.
  2. On first run inside Fabric, uncomment the install line: %pip install -U azure-identity azure-kusto-data azure-kusto-ingest msal requests
  3. Set the # %% [parameters] cell (see Configuration).
  4. Run once interactively to verify, then attach a schedule (every 15–60 min) — via the Fabric UI or Deploy-FdaObservability.ps1 -ScheduleNotebook.

Cell-by-cell

| Cell | Responsibility |
| --- | --- |
| [parameters] | All tunables: identity, endpoints, user set, behaviour. |
| [imports] | Imports; detects the Fabric runtime (notebookutils) and the optional ingest package; computes UTCNOW and the LOOKBACK_FROM window. |
| [auth] | ClientSecretCredential + per-scope token helper; Kusto client factory (SP key auth); lazily-built queued-ingest client. |
| [ingest-helper] | Per-table column order, _prep (serialize dynamic columns, reorder), queued vs inline ingest with one-time fallback, watermark get/set. |
| [source-A-graph] | resolve_users (group expansion) + pull_graph (paged Graph export with 429 back-off). |
| [source-B-audit] | pull_audit: ensure subscription, list content blobs, keep CopilotInteraction records. |
| [source-C-monitoring] | pull_executed_dax: query SemanticModelLogs for executed DAX + perf. |
| [correlate] | corr_key + build_curated: pair prompts/responses, window-join DAX, score confidence, keep orphans. |
| [main] | Orchestration with per-source fatal-guards, de-dup before append, curated ingest, watermark advance, summary log. |
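
The runtime detection and window computation in the [imports] cell might look roughly like the sketch below. `IN_FABRIC`, `HAVE_INGEST`, and `LOOKBACK_HOURS` (with its value of 24) are hypothetical names chosen for illustration; only `notebookutils`, `UTCNOW`, and `LOOKBACK_FROM` come from the description above.

```python
from datetime import datetime, timedelta, timezone

# notebookutils is only importable inside the Fabric runtime.
try:
    import notebookutils  # noqa: F401
    IN_FABRIC = True
except ImportError:
    notebookutils = None
    IN_FABRIC = False

# The queued-ingest package is optional; without it only inline ingest is possible.
try:
    import azure.kusto.ingest  # noqa: F401
    HAVE_INGEST = True
except ImportError:
    HAVE_INGEST = False

LOOKBACK_HOURS = 24  # hypothetical tunable, not a documented parameter

UTCNOW = datetime.now(timezone.utc)
LOOKBACK_FROM = UTCNOW - timedelta(hours=LOOKBACK_HOURS)
```

Using timezone-aware datetimes keeps the window unambiguous when it is later formatted into API filters or KQL.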

Authentication

_CRED = ClientSecretCredential(TENANT_ID, CLIENT_ID, _secret())

_secret() resolves the client secret in priority order: inline (prototype only) → Key Vault via notebookutils.credentials.getSecret (in Fabric) → FDA_CLIENT_SECRET env var (outside Fabric). Tokens are acquired per scope (graph.microsoft.com, manage.office.com), and Kusto connections use the SP's application-key auth against both the destination Eventhouse and the monitoring cluster.
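
The priority order can be sketched as follows. This is an illustration under assumptions, not the notebook's code: `resolve_secret`, `INLINE_SECRET`, `KEY_VAULT_URL`, and `SECRET_NAME` are hypothetical names, and the vault URL is a placeholder.

```python
import os
from typing import Mapping, Optional

INLINE_SECRET = ""  # hypothetical parameter; prototype use only
KEY_VAULT_URL = "https://example-vault.vault.azure.net/"  # placeholder
SECRET_NAME = "fda-client-secret"  # placeholder


def resolve_secret(inline: str = INLINE_SECRET,
                   env: Optional[Mapping[str, str]] = None) -> str:
    """Return the client secret: inline, then Key Vault, then environment."""
    env = os.environ if env is None else env

    # 1. Inline value wins if set (prototype only).
    if inline:
        return inline

    # 2. Inside Fabric, read the secret from Key Vault.
    try:
        import notebookutils
        return notebookutils.credentials.getSecret(KEY_VAULT_URL, SECRET_NAME)
    except ImportError:
        pass  # not running in Fabric; fall through to the environment

    # 3. Outside Fabric, fall back to the environment variable.
    secret = env.get("FDA_CLIENT_SECRET")
    if not secret:
        raise RuntimeError("no client secret available from any source")
    return secret
```

Raising when nothing is found surfaces a misconfiguration at startup rather than as an opaque authentication failure later.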

Ingestion

ingest_df(table, df, dynamic_cols) is the single ingest path:

  • _prep JSON-serializes the dynamic columns, ensures every expected column exists, and reorders to the table's authoritative column order (positional CSV ingest depends on this).
  • Queued ingestion (azure-kusto-ingest, via the https://ingest-<cluster>… DM endpoint) is the default and is forced for batches ≥ INGEST_QUEUED_THRESHOLD (200) — it writes a quoted CSV that survives multiline DAX and embedded JSON, and is retried server-side.
  • Inline management ingestion (.ingest inline into table …) is used for tiny batches or when the ingest package is unavailable.
  • On failure, the other path is tried once before giving up.
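
The steps above can be sketched with the standard library alone. Several simplifications are assumptions: rows are plain dicts rather than a DataFrame, every batch below the threshold is treated as "tiny", and `prep_csv`, `ingest_with_fallback`, and the `queued`/`inline` callables are hypothetical stand-ins for the real Kusto calls.

```python
import csv
import io
import json
from typing import Any, Callable, Dict, List, Sequence, Set

INGEST_QUEUED_THRESHOLD = 200


def prep_csv(rows: List[Dict[str, Any]], columns: Sequence[str],
             dynamic_cols: Set[str]) -> str:
    """Serialize dynamic columns, fill missing ones, emit fully quoted CSV."""
    buf = io.StringIO()
    writer = csv.writer(buf, quoting=csv.QUOTE_ALL, lineterminator="\n")
    for row in rows:
        out = []
        for col in columns:  # authoritative order; positional ingest depends on it
            val = row.get(col)
            if col in dynamic_cols and val is not None:
                val = json.dumps(val)
            out.append("" if val is None else val)
        writer.writerow(out)
    return buf.getvalue()


def ingest_with_fallback(payload: str, n_rows: int,
                         queued: Callable[[str], None],
                         inline: Callable[[str], None],
                         have_queued: bool = True) -> str:
    """Try the preferred path, then the other one once; return the path used."""
    order = [("queued", queued), ("inline", inline)]
    if not have_queued:
        order = [("inline", inline)]
    elif n_rows < INGEST_QUEUED_THRESHOLD:
        order.reverse()
    last_error = None
    for name, send in order:
        try:
            send(payload)
            return name
        except Exception as exc:
            last_error = exc
    raise RuntimeError("all available ingest paths failed") from last_error


# Demo: multiline DAX and embedded JSON round-trip through the quoted CSV.
demo_rows = [{"Dax": "EVALUATE\n'Sales'", "Extra": {"a": 1}}]
csv_text = prep_csv(demo_rows, ["InteractionId", "Dax", "Extra"], {"Extra"})
```

Quoting every field is what lets newlines inside DAX text and quotes inside JSON survive a positional CSV load.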

Source A — Graph (pull_graph)

  1. resolve_users expands FDA_USER_GROUP_ID (transitive members, paged, 429-aware) and merges with FDA_USERS, de-duplicated. No users → the source is skipped.
  2. For each user, pages GET /copilot/users/{user}/interactionHistory/getAllEnterpriseInteractions filtered by the graph watermark window, honouring Retry-After on 429.
  3. Keeps rows whose appClass is in GRAPH_APP_CLASSES; maps to Raw_GraphInteractions; advances the watermark.
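
A hedged sketch of the paging and back-off loop in step 2, plus the filter in step 3. The HTTP call is injected as a `fetch` callable so the logic can be exercised without a network; `paged_get`, `fetch`, `max_retries`, the 5-second default, and the `appClass` values in the demo are all assumptions. The sketch also assumes `Retry-After` carries a number of seconds and that paging follows `@odata.nextLink`.

```python
import time
from typing import Callable, Dict, Iterator, List, Tuple

Response = Tuple[int, Dict[str, str], dict]  # (status, headers, JSON body)


def paged_get(url: str, fetch: Callable[[str], Response],
              sleep: Callable[[float], None] = time.sleep,
              max_retries: int = 5) -> Iterator[dict]:
    """Yield items from every page, waiting as instructed on HTTP 429."""
    retries = 0
    while url:
        status, headers, body = fetch(url)
        if status == 429:
            retries += 1
            if retries > max_retries:
                raise RuntimeError(f"still throttled after {max_retries} retries")
            sleep(float(headers.get("Retry-After", "5")))
            continue  # retry the same URL
        if status != 200:
            raise RuntimeError(f"unexpected status {status} for {url}")
        retries = 0
        yield from body.get("value", [])
        url = body.get("@odata.nextLink")  # absent on the last page


# Demo with a fake transport: one throttle response, then two pages.
GRAPH_APP_CLASSES = {"example.class.keep"}  # placeholder values
_pages = {
    "u1": [(429, {"Retry-After": "2"}, {}),
           (200, {}, {"value": [{"id": 1, "appClass": "example.class.keep"},
                                {"id": 2, "appClass": "example.class.drop"}],
                      "@odata.nextLink": "u2"})],
    "u2": [(200, {}, {"value": [{"id": 3, "appClass": "example.class.keep"}]})],
}
sleeps: List[float] = []
rows = list(paged_get("u1", lambda u: _pages[u].pop(0), sleep=sleeps.append))
kept = [r for r in rows if r.get("appClass") in GRAPH_APP_CLASSES]
```

Capping the retries keeps a persistently throttled user from stalling the whole scheduled run.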

Source B — Audit (pull_audit)

  1. Ensures the Audit.General content subscription exists (idempotent POST …/subscriptions/start).
  2. Lists content blobs for the audit watermark window, fetches each, and keeps records where Operation == "CopilotInteraction".
  3. Extracts agent id from AISystemPlugin, applies the optional AGENT_NAME_FILTER, maps to Raw_AuditInteractions, advances the watermark.
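
Steps 2 and 3 can be illustrated with a small filter. The record shape is an assumption: the sketch supposes `AISystemPlugin` is a list of objects with `Id` and `Name` nested under `CopilotEventData`, and that `AGENT_NAME_FILTER` is a case-insensitive substring match. `keep_copilot` and the sample records are hypothetical.

```python
from typing import Iterable, List, Optional


def keep_copilot(records: Iterable[dict],
                 agent_name_filter: Optional[str] = None) -> List[dict]:
    """Keep CopilotInteraction records and attach the first plugin's id/name."""
    kept = []
    for rec in records:
        if rec.get("Operation") != "CopilotInteraction":
            continue
        # Assumed location of the plugin list; missing keys are tolerated.
        plugins = (rec.get("CopilotEventData") or {}).get("AISystemPlugin") or []
        first = plugins[0] if plugins else {}
        agent_id, agent_name = first.get("Id"), first.get("Name")
        if agent_name_filter and \
                agent_name_filter.lower() not in (agent_name or "").lower():
            continue
        kept.append({**rec, "AgentId": agent_id, "AgentName": agent_name})
    return kept


sample = [
    {"Operation": "CopilotInteraction",
     "CopilotEventData": {"AISystemPlugin": [{"Id": "agent-1",
                                              "Name": "Sales Agent"}]}},
    {"Operation": "FileAccessed"},
    {"Operation": "CopilotInteraction", "CopilotEventData": {}},
]
all_copilot = keep_copilot(sample)
only_sales = keep_copilot(sample, agent_name_filter="sales")
```

Records without plugin data are kept with a null agent id when no filter is set, and dropped when a filter is set.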

Source C — Monitoring (pull_executed_dax)

Queries the monitoring cluster's SemanticModelLogs for the monitoring watermark window where OperationName in ("QueryEnd","ExecuteQueryEnd","CommandEnd") and EventText has "EVALUATE", optionally filtered to SEMANTIC_MODEL_IDS. Projects to the Raw_ExecutedDax shape (Dax = EventText, ExecutingUser = coalesce(ExecutingUser, User), etc.). If the query fails (e.g. monitoring not enabled), the source is skipped with a log line.
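
As an illustration, the query text could be assembled like this. The `Timestamp` and `ItemId` column names, the shortened projection, and the `build_dax_query` helper are assumptions; the operation names, the `EVALUATE` filter, and the `coalesce` projection come from the description above.

```python
import uuid
from datetime import datetime, timezone
from typing import Sequence


def build_dax_query(since: datetime, model_ids: Sequence[str] = ()) -> str:
    """Build KQL selecting executed DAX newer than the watermark."""
    stamp = since.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    lines = [
        "SemanticModelLogs",
        f"| where Timestamp > datetime({stamp})",  # assumed column name
        '| where OperationName in ("QueryEnd","ExecuteQueryEnd","CommandEnd")',
        '| where EventText has "EVALUATE"',
    ]
    if model_ids:
        # Parsing each id as a GUID rejects anything that could alter the query.
        quoted = ", ".join(f'"{uuid.UUID(m)}"' for m in model_ids)
        lines.append(f"| where ItemId in ({quoted})")  # assumed column name
    lines.append(
        "| project Timestamp, Dax = EventText, "
        "ExecutingUser = coalesce(ExecutingUser, User)"
    )
    return "\n".join(lines)


query = build_dax_query(
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    ["11111111-1111-1111-1111-111111111111"],
)
```

Executing the resulting string inside the same kind of try/except used for the other sources gives the skip-with-a-log-line behaviour when monitoring is not enabled.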

Correlation — build_curated

The pairing, window-join, confidence scoring, and orphan handling are documented end-to-end on the Correlation model page. Output is a DataFrame in FdaInteractions shape.

Orchestration — main

graph_df = pull_graph()        # each wrapped in a fatal-guard try/except
audit_df = pull_audit()
dax_df   = pull_executed_dax()
curated  = build_curated(graph_df, audit_df, dax_df)
# de-dup curated InteractionIds already in FdaInteractions, then ingest + advance 'curated' watermark

Each source pull is wrapped so an exception degrades to an empty frame rather than aborting the run. Before appending, the collector drops curated rows whose InteractionId already exists in FdaInteractions. The run ends with a summary line: graph=… audit=… dax=… curated=….
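
A minimal sketch of the de-dup step and the summary line, assuming rows are dicts and the existing ids have already been read from FdaInteractions. `drop_existing` and `summary` are hypothetical helpers; collapsing duplicates within the same batch is an addition of this sketch, not something the description states.

```python
from typing import Iterable, List


def drop_existing(curated: List[dict], existing_ids: Iterable[str]) -> List[dict]:
    """Drop rows whose InteractionId is already stored or repeated in the batch."""
    seen = set(existing_ids)
    fresh = []
    for row in curated:
        iid = row["InteractionId"]
        if iid in seen:
            continue
        seen.add(iid)  # also guards against duplicates inside this batch
        fresh.append(row)
    return fresh


def summary(graph: int, audit: int, dax: int, curated: int) -> str:
    """Format the end-of-run summary line."""
    return f"graph={graph} audit={audit} dax={dax} curated={curated}"


batch = [{"InteractionId": "a"}, {"InteractionId": "b"}, {"InteractionId": "b"}]
fresh = drop_existing(batch, existing_ids=["a"])
line = summary(graph=3, audit=2, dax=5, curated=len(fresh))
```

Filtering before the append means that re-running the collector over an overlapping window does not produce duplicate curated rows.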

First-run order

Provision the schema (01/02) before the first collector run, or ingestion has nowhere to land. Then run the notebook once interactively and confirm the summary log before scheduling.