Collector notebook
fabric/notebooks/FDA_Collector.py is the heart of the pipeline: a scheduled Fabric notebook, authenticated as a service principal, that pulls the
three capture surfaces, correlates them into the curated triple, ingests everything into the Eventhouse, and
advances per-source watermarks.
Design principle — never fatal
Every source is wrapped so a missing or preview surface is skipped and logged, not fatal. A run with only workspace monitoring available still lands DAX rows; a run with only Graph available lands question/answer text.
Running it
The file is a Fabric notebook saved as .py with cells split on # %%. To use it:
- Import the .py into a Fabric notebook (cells split on # %%).
- On first run inside Fabric, uncomment the install line: %pip install -U azure-identity azure-kusto-data azure-kusto-ingest msal requests
- Set the # %% [parameters] cell (see Configuration).
- Run once interactively to verify, then attach a schedule (every 15–60 min), either via the Fabric UI or with Deploy-FdaObservability.ps1 -ScheduleNotebook.
Cell-by-cell
| Cell | Responsibility |
|---|---|
| [parameters] | All tunables: identity, endpoints, user set, behaviour. |
| [imports] | Imports; detects the Fabric runtime (notebookutils) and the optional ingest package; computes UTCNOW and the LOOKBACK_FROM window. |
| [auth] | ClientSecretCredential + per-scope token helper; Kusto client factory (SP key auth); lazily-built queued-ingest client. |
| [ingest-helper] | Per-table column order, _prep (serialize dynamic columns, reorder), queued vs inline ingest with one-time fallback, watermark get/set. |
| [source-A-graph] | resolve_users (group expansion) + pull_graph (paged Graph export with 429 back-off). |
| [source-B-audit] | pull_audit: ensure subscription, list content blobs, keep CopilotInteraction records. |
| [source-C-monitoring] | pull_executed_dax: query SemanticModelLogs for executed DAX + perf. |
| [correlate] | corr_key + build_curated: pair prompts/responses, window-join DAX, score confidence, keep orphans. |
| [main] | Orchestration with per-source fatal-guards, de-dup before append, curated ingest, watermark advance, summary log. |
Authentication
_secret() resolves the client secret in priority order: inline (prototype only) → Key Vault via
notebookutils.credentials.getSecret (in Fabric) → FDA_CLIENT_SECRET env var (outside Fabric). Tokens are
acquired per scope (graph.microsoft.com, manage.office.com), and Kusto connections use the SP's
application-key auth against both the destination Eventhouse and the monitoring cluster.
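The priority order can be sketched roughly as follows. This is a minimal illustration, not the notebook's actual _secret(): the function name resolve_secret and its parameters are hypothetical, and vault_lookup stands in for whatever wrapper calls notebookutils.credentials.getSecret inside Fabric.

```python
import os
from typing import Callable, Optional


def resolve_secret(
    inline: Optional[str] = None,
    vault_lookup: Optional[Callable[[], str]] = None,
    env_var: str = "FDA_CLIENT_SECRET",
) -> str:
    """Return the first secret found: inline, then vault, then environment."""
    if inline:
        # Prototype only: a real secret should never be committed inline.
        return inline
    if vault_lookup is not None:
        # Hypothetical callable wrapping the Key Vault lookup (Fabric only).
        return vault_lookup()
    value = os.environ.get(env_var)
    if value:
        # Outside Fabric, fall back to the environment variable.
        return value
    raise RuntimeError(
        f"No client secret found (checked inline, vault, and {env_var})"
    )
```

Passing the vault lookup as a callable keeps the sketch runnable outside Fabric, where notebookutils is not importable.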
Ingestion
ingest_df(table, df, dynamic_cols) is the single ingest path:
- _prep JSON-serializes the dynamic columns, ensures every expected column exists, and reorders to the table's authoritative column order (positional CSV ingest depends on this).
- Queued ingestion (azure-kusto-ingest, via the https://ingest-<cluster>… DM endpoint) is the default and is forced for batches ≥ INGEST_QUEUED_THRESHOLD (200); it writes a quoted CSV that survives multiline DAX and embedded JSON, and is retried server-side.
- Inline management ingestion (.ingest inline into table …) is used for tiny batches or when the ingest package is unavailable.
- On failure, the other path is tried once before giving up.
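The preparation and fallback steps above can be approximated with the standard library alone. This is a sketch under stated assumptions: it uses plain dicts instead of a DataFrame, and prep_rows, to_quoted_csv, and ingest_with_fallback are hypothetical names rather than the notebook's helpers. The primary and secondary callables stand in for the queued and inline paths.

```python
import csv
import io
import json


def prep_rows(rows, column_order, dynamic_cols):
    """Order each row by column_order, filling gaps and serializing dynamics."""
    prepared = []
    for row in rows:
        cells = []
        for col in column_order:
            value = row.get(col)
            if value is None:
                cells.append("")  # a missing column becomes an empty cell
            elif col in dynamic_cols:
                cells.append(json.dumps(value))  # dynamic column -> JSON text
            else:
                cells.append(value)
        prepared.append(cells)
    return prepared


def to_quoted_csv(prepared):
    """Write fully quoted CSV so newlines and embedded JSON stay in one field."""
    buf = io.StringIO()
    writer = csv.writer(buf, quoting=csv.QUOTE_ALL, lineterminator="\n")
    writer.writerows(prepared)
    return buf.getvalue()


def ingest_with_fallback(payload, primary, secondary):
    """Try the primary path; on failure try the other path exactly once."""
    try:
        primary(payload)
        return "primary"
    except Exception as first_error:
        try:
            secondary(payload)
            return "secondary"
        except Exception as second_error:
            # Both paths failed: surface the second error, chained to the first.
            raise second_error from first_error
```

Quoting every field is what lets a multiline DAX statement round-trip as a single cell, which is why the column order has to be fixed before the rows are written.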
Source A: Graph (pull_graph)
- resolve_users expands FDA_USER_GROUP_ID (transitive members, paged, 429-aware) and merges with FDA_USERS, de-duplicated. No users → the source is skipped.
- For each user, pages GET /copilot/users/{user}/interactionHistory/getAllEnterpriseInteractions filtered by the graph watermark window, honouring Retry-After on 429.
- Keeps rows whose appClass is in GRAPH_APP_CLASSES; maps to Raw_GraphInteractions; advances the watermark.
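A paging loop with 429 back-off, as described above, might look like the following sketch. It assumes a hypothetical fetch(url) callable returning (status, headers, body) so that no HTTP library is needed, treats Retry-After as a number of seconds, and follows the standard @odata.nextLink paging field. The names paged_rows and keep_app_classes are illustrative only.

```python
import time
from typing import Callable, Dict, Iterator, Tuple

Response = Tuple[int, Dict[str, str], dict]


def paged_rows(
    first_url: str,
    fetch: Callable[[str], Response],
    sleep: Callable[[float], None] = time.sleep,
    max_retries: int = 5,
) -> Iterator[dict]:
    """Yield rows from every page, waiting out 429 responses."""
    url = first_url
    retries = 0
    while url:
        status, headers, body = fetch(url)
        if status == 429:
            retries += 1
            if retries > max_retries:
                raise RuntimeError(f"Still throttled after {max_retries} retries")
            # Assumption: Retry-After carries seconds, not an HTTP date.
            sleep(float(headers.get("Retry-After", "1")))
            continue  # retry the same page
        if status != 200:
            raise RuntimeError(f"Unexpected status {status} for {url}")
        retries = 0
        yield from body.get("value", [])
        url = body.get("@odata.nextLink")  # absent on the last page


def keep_app_classes(rows, allowed):
    """Keep only rows whose appClass is in the allowed set."""
    return [row for row in rows if row.get("appClass") in allowed]
```

Injecting fetch and sleep keeps the loop testable without network access or real waiting.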
Source B: Audit (pull_audit)
- Ensures the Audit.General content subscription exists (idempotent POST …/subscriptions/start).
- Lists content blobs for the audit watermark window, fetches each, and keeps records where Operation == "CopilotInteraction".
- Extracts agent id from AISystemPlugin, applies the optional AGENT_NAME_FILTER, maps to Raw_AuditInteractions, advances the watermark.
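The record filter in the steps above can be sketched as below. The layout of an audit record beyond the Operation field is not described here, so the agent name is read through a hypothetical agent_name_of accessor (defaulting to an assumed AgentName key), and the case-insensitive substring match is an assumption about how AGENT_NAME_FILTER behaves. fetch_blob is a stand-in for the HTTP call that downloads one content blob.

```python
from typing import Callable, Iterable, List, Optional


def keep_copilot_records(
    blob_uris: Iterable[str],
    fetch_blob: Callable[[str], List[dict]],
    agent_name_filter: Optional[str] = None,
    agent_name_of: Callable[[dict], str] = lambda rec: str(rec.get("AgentName", "")),
) -> List[dict]:
    """Fetch each content blob and keep only CopilotInteraction records."""
    kept = []
    for uri in blob_uris:
        for record in fetch_blob(uri):
            if record.get("Operation") != "CopilotInteraction":
                continue
            if agent_name_filter:
                # Assumed semantics: case-insensitive substring match.
                if agent_name_filter.lower() not in agent_name_of(record).lower():
                    continue
            kept.append(record)
    return kept
```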
Source C: Monitoring (pull_executed_dax)
Queries the monitoring cluster's SemanticModelLogs for the monitoring watermark window where
OperationName in ("QueryEnd","ExecuteQueryEnd","CommandEnd") and EventText has "EVALUATE", optionally filtered
to SEMANTIC_MODEL_IDS. Projects to the Raw_ExecutedDax shape (Dax = EventText,
ExecutingUser = coalesce(ExecutingUser, User), etc.). If the query fails (e.g. monitoring not enabled), the source
is skipped with a log line.
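As an illustration of the query described above, the following sketch assembles the KQL text from a watermark window. It is not the notebook's actual query: build_dax_query is a hypothetical name, and the Timestamp and ItemId column names are assumptions about the SemanticModelLogs schema. Model ids are validated as GUIDs before being embedded in the query text.

```python
import uuid
from datetime import datetime, timezone

OPERATIONS = ("QueryEnd", "ExecuteQueryEnd", "CommandEnd")


def _kql_datetime(value: datetime) -> str:
    """Format a timezone-aware datetime as a UTC KQL datetime literal."""
    stamp = value.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return f"datetime({stamp})"


def build_dax_query(since: datetime, until: datetime, model_ids=()) -> str:
    """Build the executed-DAX query for the window (since, until]."""
    ops = ", ".join(f'"{op}"' for op in OPERATIONS)
    lines = [
        "SemanticModelLogs",
        # Assumption: the log's time column is named Timestamp.
        f"| where Timestamp > {_kql_datetime(since)}"
        f" and Timestamp <= {_kql_datetime(until)}",
        f"| where OperationName in ({ops})",
        '| where EventText has "EVALUATE"',
    ]
    if model_ids:
        # uuid.UUID raises ValueError on anything that is not a GUID.
        ids = ", ".join(f'"{uuid.UUID(str(m))}"' for m in model_ids)
        # Assumption: the semantic model id lives in a column named ItemId.
        lines.append(f"| where ItemId in ({ids})")
    lines.append(
        "| project Timestamp, Dax = EventText,"
        " ExecutingUser = coalesce(ExecutingUser, User)"
    )
    return "\n".join(lines)
```

The returned string would then be passed to the Kusto client for the monitoring cluster, inside the same guard that skips the source when the query fails.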
Correlation: build_curated
The pairing, window-join, confidence scoring, and orphan handling are documented end-to-end on the
Correlation model page. Output is a DataFrame in FdaInteractions shape.
Orchestration: main
graph_df = pull_graph() # each wrapped in a fatal-guard try/except
audit_df = pull_audit()
dax_df = pull_executed_dax()
curated = build_curated(graph_df, audit_df, dax_df)
# de-dup curated InteractionIds already in FdaInteractions, then ingest + advance 'curated' watermark
Each source pull is wrapped so an exception degrades to an empty frame rather than aborting the run. Before
appending, the collector drops curated rows whose InteractionId already exists in FdaInteractions. The run ends
with a summary line: graph=… audit=… dax=… curated=….
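The guard, de-dup, and summary steps can be sketched with plain lists standing in for DataFrames. The helper names guarded, drop_known, and summary_line are hypothetical, and in the real notebook the set of existing ids would come from a query against FdaInteractions.

```python
from typing import Callable, Iterable, List


def guarded(name: str, pull: Callable[[], List[dict]], log=print) -> List[dict]:
    """Run one source pull; on any exception log it and return no rows."""
    try:
        return pull()
    except Exception as exc:
        log(f"[{name}] skipped: {exc!r}")
        return []


def drop_known(curated: List[dict], existing_ids: Iterable[str]) -> List[dict]:
    """Drop curated rows whose InteractionId is already in the target table."""
    known = set(existing_ids)
    return [row for row in curated if row["InteractionId"] not in known]


def summary_line(graph, audit, dax, curated) -> str:
    """Format the end-of-run summary with one row count per stage."""
    return (
        f"graph={len(graph)} audit={len(audit)} "
        f"dax={len(dax)} curated={len(curated)}"
    )
```

Because a failed pull yields an empty list rather than an exception, the correlation step always receives three inputs and the run continues with whatever sources were available.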
First-run order
Provision the schema (01/02) before the first collector run, or ingestion has nowhere to land. Then run
the notebook once interactively and confirm the summary log before scheduling.