Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to Track Hourly or Daily # of Upsert/Delete Metrics in a DLT Streaming Pipeline

_its_akshaye
New Contributor

We created a Delta Live Tables (DLT) streaming pipeline to ingest data from the Bronze layer to the Silver layer using Change Data Feed (CDF) enabled.

The stream runs continuously and shows # of upserted and deleted rows at an aggregate level from the time it started. Is there a way to capture these metrics at a daily or hourly level to better understand the ingestion load on the stream?

2 REPLIES

Saritha_S
Databricks Employee

Hi @_its_akshaye 

Yes: capture these metrics from the DLT event log, or derive them directly from the target table's CDF, then aggregate by time.

 

Options that work well

  • Use the DLT event log "rows written" metrics
    • Every pipeline writes a structured event log (queryable via the Unity Catalog event_log() TVF, or from pipeline storage) that includes per-table batch stats such as rows written/processed per update. You can query it and bucket by hour/day to get upsert/delete counts over time.
  • Derive metrics from the Silver table's Change Data Feed (CDF)
    • CDF exposes row-level changes with _change_type values like insert, delete, update_preimage, update_postimage, which you can group by a timestamp column (for example, your sequencing column) to produce hourly/daily counts of inserts/updates/deletes that actually landed in Silver.

Example queries

Below are templates you can run as-is after replacing identifiers; they produce hourly metrics. Switch date_trunc('hour', ...) to 'day' for daily.

1) From the DLT event log

  • If your pipeline is in Unity Catalog, use the TVF:

SELECT
  date_trunc('hour', timestamp) AS hour_bucket,
  details:flow_progress.output_dataset AS output_dataset,
  SUM(details:flow_progress.metrics.num_output_rows::bigint) AS rows_written
FROM event_log('<pipeline_id>')
WHERE event_type = 'flow_progress'
  AND details:flow_progress.metrics.num_output_rows IS NOT NULL
  AND details:flow_progress.output_dataset = '<your_silver_table_uc_name>'
GROUP BY 1, 2
ORDER BY 1;

  • If your pipeline uses HMS storage paths, read the Delta table at system/events:

SELECT
  date_trunc('hour', timestamp) AS hour_bucket,
  details:flow_progress.output_dataset AS output_dataset,
  SUM(details:flow_progress.metrics.num_output_rows::bigint) AS rows_written
FROM delta.`dbfs:/pipelines/<pipeline-id>/system/events`
WHERE event_type = 'flow_progress'
  AND details:flow_progress.metrics.num_output_rows IS NOT NULL
  AND details:flow_progress.output_dataset = '<your_silver_table_uc_name>'
GROUP BY 1, 2
ORDER BY 1;

Notes:

  • flow_progress carries structured metrics for each table update; num_output_rows aligns with what the UI shows and is easy to roll up by time buckets.
  • If you need to separate updates vs deletes precisely, the event log provides totals written per output dataset, but not semantically split by operation type. For that breakdown, use CDF (next option).
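If you want to sanity-check the rollup logic outside Databricks, here's a minimal plain-Python sketch of what the event-log query does: truncate each event timestamp to the hour (like date_trunc('hour', ...)) and sum rows written per bucket. The sample events are hypothetical placeholders, not real event-log output.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical flow_progress events as (timestamp, num_output_rows) pairs;
# in practice these values come from the event log query above.
events = [
    (datetime(2024, 12, 1, 9, 5), 120),
    (datetime(2024, 12, 1, 9, 40), 80),
    (datetime(2024, 12, 1, 10, 15), 50),
]

def rows_per_hour(events):
    """Equivalent of date_trunc('hour', timestamp) + SUM(num_output_rows)."""
    buckets = defaultdict(int)
    for ts, rows in events:
        # Truncate to the start of the hour, mirroring date_trunc('hour', ...)
        hour_bucket = ts.replace(minute=0, second=0, microsecond=0)
        buckets[hour_bucket] += rows
    return dict(sorted(buckets.items()))

print(rows_per_hour(events))
```

Switching the truncation to `ts.replace(hour=0, minute=0, second=0, microsecond=0)` gives the daily variant, just like swapping 'hour' for 'day' in the SQL.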

2) From the Silver tableโ€™s CDF (semantic split: insert/update/delete)

SELECT
  date_trunc('hour', COALESCE(_commit_timestamp, <your_event_ts>)) AS hour_bucket,
  SUM(CASE WHEN _change_type = 'insert' THEN 1 ELSE 0 END) AS inserts,
  SUM(CASE WHEN _change_type = 'delete' THEN 1 ELSE 0 END) AS deletes,
  SUM(CASE WHEN _change_type = 'update_postimage' THEN 1 ELSE 0 END) AS updates
FROM table_changes('<catalog>.<schema>.<silver_table>', '2024-12-01')
GROUP BY 1
ORDER BY 1;

 

Tips:

  • _commit_timestamp exists on modern runtimes; otherwise use your sequencing column (e.g., operation_date) as the time reference.
  • For updates, you typically count update_postimage rows; update_preimage rows represent the before-image and usually aren't counted as "applied rows."
  • Retention: CDF is subject to the tableโ€™s retention policy and VACUUM; keep that in mind for long lookbacks.
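The same change-type split can be sketched in plain Python, which is handy for checking the counting rules (count insert, delete, and update_postimage; skip update_preimage) before writing the SQL. The sample rows below are hypothetical stand-ins for CDF output.

```python
from collections import Counter, defaultdict
from datetime import datetime

# Hypothetical CDF rows as (_commit_timestamp, _change_type) pairs;
# update_preimage is deliberately ignored, matching the SQL above.
changes = [
    (datetime(2024, 12, 1, 9, 5), "insert"),
    (datetime(2024, 12, 1, 9, 10), "update_preimage"),
    (datetime(2024, 12, 1, 9, 10), "update_postimage"),
    (datetime(2024, 12, 1, 10, 2), "delete"),
]

# Map each counted _change_type to its output column name.
COUNTED = {"insert": "inserts", "delete": "deletes", "update_postimage": "updates"}

def changes_per_hour(changes):
    """Hourly Counter of inserts/updates/deletes, like the CDF GROUP BY."""
    buckets = defaultdict(Counter)
    for ts, change_type in changes:
        if change_type in COUNTED:
            hour = ts.replace(minute=0, second=0, microsecond=0)
            buckets[hour][COUNTED[change_type]] += 1
    return buckets
```

Note that each update contributes exactly one row to the hourly totals (the post-image), even though CDF emits two rows per updated record.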

When to use which

  • Need exact operation split (insert/update/delete) at Silver: use CDF on the Silver table and aggregate by hour/day.
  • Need a lightweight, "what was written per batch" view that mirrors the DLT UI: query event_log flow_progress and bucket by time (hour/day).

SteveOstrowski
Databricks Employee

Hi @_its_akshaye,

The pipeline event log captures exactly what you need. Every Lakeflow Spark Declarative Pipeline (SDP, formerly known as DLT) records flow_progress events that include per-flow metrics with num_upserted_rows and num_deleted_rows fields. You can query these and bucket them by hour or day.

OPTION 1: QUERY THE EVENT LOG (RECOMMENDED)

The event log's flow_progress events contain a metrics object with these fields:

num_output_rows    - total output rows written
num_upserted_rows  - rows upserted into the target
num_deleted_rows   - rows deleted from the target

If your pipeline is registered in Unity Catalog, use the event_log() table-valued function:

SELECT
date_trunc('hour', timestamp) AS hour_bucket,
details:flow_progress.metrics.num_upserted_rows::bigint AS upserted_rows,
details:flow_progress.metrics.num_deleted_rows::bigint AS deleted_rows,
details:flow_progress.metrics.num_output_rows::bigint AS total_output_rows,
details:flow_progress.flow_name AS flow_name
FROM event_log('<your_pipeline_id>')
WHERE event_type = 'flow_progress'
AND details:flow_progress.status = 'COMPLETED'
ORDER BY 1

Replace '<your_pipeline_id>' with either your pipeline ID or the fully qualified table name if you have published the event log. Switch 'hour' to 'day' for daily granularity.
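To make the `details:flow_progress.metrics...` extraction paths in the query concrete: the `details` column holds a JSON document, and the SQL path syntax drills into it. A plain-Python equivalent using a hypothetical (simplified) payload looks like this; the real event-log schema has more fields.

```python
import json

# Hypothetical, simplified `details` payload of one flow_progress event.
# Field names match those used in the SQL queries above.
details_json = """{
  "flow_progress": {
    "flow_name": "silver_orders",
    "status": "COMPLETED",
    "metrics": {
      "num_output_rows": 200,
      "num_upserted_rows": 180,
      "num_deleted_rows": 20
    }
  }
}"""

details = json.loads(details_json)
# Python dict access mirrors details:flow_progress.metrics.num_upserted_rows
metrics = details["flow_progress"]["metrics"]
print(metrics["num_upserted_rows"], metrics["num_deleted_rows"])
```

The `::bigint` cast in the SQL is needed because JSON path extraction returns strings/variants; in Python, `json.loads` already gives you native numbers.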

For Hive metastore pipelines (not Unity Catalog), you can read the event log directly as a Delta table:

SELECT
date_trunc('hour', timestamp) AS hour_bucket,
details:flow_progress.metrics.num_upserted_rows::bigint AS upserted_rows,
details:flow_progress.metrics.num_deleted_rows::bigint AS deleted_rows,
details:flow_progress.metrics.num_output_rows::bigint AS total_output_rows,
details:flow_progress.flow_name AS flow_name
FROM delta.`dbfs:/pipelines/<pipeline-id>/system/events`
WHERE event_type = 'flow_progress'
AND details:flow_progress.status = 'COMPLETED'
ORDER BY 1

To aggregate into true hourly/daily totals (summing across multiple micro-batches), wrap the query:

SELECT
hour_bucket,
flow_name,
SUM(upserted_rows) AS total_upserted,
SUM(deleted_rows) AS total_deleted,
SUM(total_output_rows) AS total_output
FROM (
SELECT
  date_trunc('hour', timestamp) AS hour_bucket,
  details:flow_progress.metrics.num_upserted_rows::bigint AS upserted_rows,
  details:flow_progress.metrics.num_deleted_rows::bigint AS deleted_rows,
  details:flow_progress.metrics.num_output_rows::bigint AS total_output_rows,
  details:flow_progress.flow_name AS flow_name
FROM event_log('<your_pipeline_id>')
WHERE event_type = 'flow_progress'
  AND details:flow_progress.status = 'COMPLETED'
)
GROUP BY hour_bucket, flow_name
ORDER BY hour_bucket

OPTION 2: QUERY CHANGE DATA FEED ON THE SILVER TABLE

If you also want to see the semantic breakdown of inserts vs. updates vs. deletes from the target table's perspective, you can query the Silver table's Change Data Feed directly using table_changes():

SELECT
date_trunc('hour', _commit_timestamp) AS hour_bucket,
SUM(CASE WHEN _change_type = 'insert' THEN 1 ELSE 0 END) AS inserts,
SUM(CASE WHEN _change_type = 'update_postimage' THEN 1 ELSE 0 END) AS updates,
SUM(CASE WHEN _change_type = 'delete' THEN 1 ELSE 0 END) AS deletes
FROM table_changes('<catalog>.<schema>.<silver_table>', 1)
GROUP BY 1
ORDER BY 1

Notes on CDF:
- Count update_postimage for updates (update_preimage is the before-image).
- The second argument can be a version number or a timestamp string.
- CDF data is subject to the table's data retention and VACUUM settings, so keep that in mind for long lookback periods.

WHICH APPROACH TO USE

- Event log (Option 1): lightweight, gives you batch-level throughput numbers that match what the pipeline UI shows. Best for monitoring ingestion load over time.
- CDF (Option 2): gives you the exact semantic split of insert/update/delete at the row level from the target table. Best when you need precise operation-type breakdowns.

You can combine both for a complete picture.

DOCUMENTATION REFERENCES

Event log schema (includes the full FlowMetrics specification):
https://docs.databricks.com/aws/en/ldp/monitor-event-log-schema

Querying the event log:
https://docs.databricks.com/aws/en/ldp/monitor-event-logs

Change Data Feed:
https://docs.databricks.com/aws/en/delta/delta-change-data-feed.html

* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.

If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.