Hi @_its_akshaye,
The pipeline event log captures exactly what you need. Every Lakeflow Spark Declarative Pipelines (SDP, formerly known as DLT) pipeline records flow_progress events whose per-flow metrics include num_upserted_rows and num_deleted_rows. You can query these events and bucket them by hour or day.
OPTION 1: QUERY THE EVENT LOG (RECOMMENDED)
The event log's flow_progress events contain a metrics object with these fields:
num_output_rows - total output rows written
num_upserted_rows - rows upserted into the target
num_deleted_rows - rows deleted from the target
If your pipeline is registered in Unity Catalog, use the event_log() table-valued function:
SELECT
date_trunc('hour', timestamp) AS hour_bucket,
details:flow_progress.metrics.num_upserted_rows::bigint AS upserted_rows,
details:flow_progress.metrics.num_deleted_rows::bigint AS deleted_rows,
details:flow_progress.metrics.num_output_rows::bigint AS total_output_rows,
details:flow_progress.flow_name AS flow_name
FROM event_log('<your_pipeline_id>')
WHERE event_type = 'flow_progress'
AND details:flow_progress.status = 'COMPLETED'
ORDER BY 1
Replace '<your_pipeline_id>' with your pipeline ID; if you have published the event log as a Unity Catalog table, you can instead pass it to the TVF as event_log(TABLE(<catalog>.<schema>.<event_log_table>)). Switch 'hour' to 'day' for daily granularity.
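To make the JSON path in the SELECT concrete, here is a small local Python sketch of what `details:flow_progress.metrics.num_upserted_rows` is extracting. The payload below is hypothetical and heavily simplified (the real event log rows carry many more fields), but the shape of the traversal matches the SQL path syntax:

```python
import json

# Hypothetical, simplified flow_progress event; the real event log's
# details column is a JSON string with many more fields.
event = {
    "event_type": "flow_progress",
    "timestamp": "2024-06-01T10:15:30Z",
    "details": json.dumps({
        "flow_progress": {
            "status": "COMPLETED",
            "flow_name": "silver_orders",  # hypothetical flow name
            "metrics": {
                "num_output_rows": 120,
                "num_upserted_rows": 100,
                "num_deleted_rows": 20,
            },
        }
    }),
}

# details:flow_progress.metrics.num_upserted_rows in SQL is this
# parse-then-traverse in plain Python.
details = json.loads(event["details"])
metrics = details["flow_progress"]["metrics"]
print(metrics["num_upserted_rows"], metrics["num_deleted_rows"])  # 100 20
```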
For Hive metastore pipelines (not Unity Catalog), you can read the event log directly as a Delta table:
SELECT
date_trunc('hour', timestamp) AS hour_bucket,
details:flow_progress.metrics.num_upserted_rows::bigint AS upserted_rows,
details:flow_progress.metrics.num_deleted_rows::bigint AS deleted_rows,
details:flow_progress.metrics.num_output_rows::bigint AS total_output_rows,
details:flow_progress.flow_name AS flow_name
FROM delta.`dbfs:/pipelines/<pipeline-id>/system/events`
WHERE event_type = 'flow_progress'
AND details:flow_progress.status = 'COMPLETED'
ORDER BY 1
To aggregate into true hourly/daily totals (summing across multiple micro-batches), wrap the query:
SELECT
hour_bucket,
flow_name,
SUM(upserted_rows) AS total_upserted,
SUM(deleted_rows) AS total_deleted,
SUM(total_output_rows) AS total_output
FROM (
SELECT
date_trunc('hour', timestamp) AS hour_bucket,
details:flow_progress.metrics.num_upserted_rows::bigint AS upserted_rows,
details:flow_progress.metrics.num_deleted_rows::bigint AS deleted_rows,
details:flow_progress.metrics.num_output_rows::bigint AS total_output_rows,
details:flow_progress.flow_name AS flow_name
FROM event_log('<your_pipeline_id>')
WHERE event_type = 'flow_progress'
AND details:flow_progress.status = 'COMPLETED'
)
GROUP BY hour_bucket, flow_name
ORDER BY hour_bucket
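If it helps to see the bucketing-plus-summing logic in isolation, here is a minimal Python sketch of the same aggregation over a few made-up per-micro-batch rows (the timestamps, flow name, and counts are illustrative only):

```python
from collections import defaultdict
from datetime import datetime

# Illustrative rows as the inner query would return them:
# (timestamp, flow_name, upserted_rows, deleted_rows)
rows = [
    (datetime(2024, 6, 1, 10, 5), "silver_orders", 100, 5),
    (datetime(2024, 6, 1, 10, 40), "silver_orders", 50, 0),
    (datetime(2024, 6, 1, 11, 10), "silver_orders", 30, 2),
]

# date_trunc('hour', timestamp) + GROUP BY hour_bucket, flow_name + SUM,
# expressed in plain Python.
totals = defaultdict(lambda: [0, 0])  # (hour, flow) -> [upserted, deleted]
for ts, flow, upserted, deleted in rows:
    hour_bucket = ts.replace(minute=0, second=0, microsecond=0)
    totals[(hour_bucket, flow)][0] += upserted
    totals[(hour_bucket, flow)][1] += deleted

for (hour, flow), (up, dl) in sorted(totals.items()):
    print(hour, flow, up, dl)
```

The two 10:00-hour batches collapse into one bucket (150 upserted, 5 deleted), which is exactly what the wrapped SQL query does across micro-batches.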
OPTION 2: QUERY CHANGE DATA FEED ON THE SILVER TABLE
If you also want to see the semantic breakdown of inserts vs. updates vs. deletes from the target table's perspective, you can query the Silver table's Change Data Feed directly using table_changes():
SELECT
date_trunc('hour', _commit_timestamp) AS hour_bucket,
SUM(CASE WHEN _change_type = 'insert' THEN 1 ELSE 0 END) AS inserts,
SUM(CASE WHEN _change_type = 'update_postimage' THEN 1 ELSE 0 END) AS updates,
SUM(CASE WHEN _change_type = 'delete' THEN 1 ELSE 0 END) AS deletes
FROM table_changes('<catalog>.<schema>.<silver_table>', 1)
GROUP BY 1
ORDER BY 1
Notes on CDF:
- Count update_postimage for updates (update_preimage is the before-image).
- The second argument can be a version number or a timestamp string.
- CDF data is subject to the table's data retention and VACUUM settings, so keep that in mind for long lookback periods.
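The update_preimage/update_postimage distinction is the easiest thing to get wrong here. A tiny Python sketch over hypothetical table_changes() rows shows why filtering matters, since every update produces both a pre- and a post-image row:

```python
from collections import Counter

# Hypothetical table_changes() rows: (_change_type, _commit_timestamp).
# Note the single update emits BOTH a preimage and a postimage row.
changes = [
    ("insert", "2024-06-01T10:01:00Z"),
    ("update_preimage", "2024-06-01T10:02:00Z"),
    ("update_postimage", "2024-06-01T10:02:00Z"),
    ("delete", "2024-06-01T10:03:00Z"),
]

# Count only insert / update_postimage / delete; counting update_preimage
# too would double-count every update.
counted = Counter(
    ct for ct, _ in changes if ct in {"insert", "update_postimage", "delete"}
)
print(counted["insert"], counted["update_postimage"], counted["delete"])
```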
WHICH APPROACH TO USE
- Event log (Option 1): lightweight, gives you batch-level throughput numbers that match what the pipeline UI shows. Best for monitoring ingestion load over time.
- CDF (Option 2): gives you the exact semantic split of insert/update/delete at the row level from the target table. Best when you need precise operation-type breakdowns.
You can combine both for a complete picture.
DOCUMENTATION REFERENCES
Event log schema (includes the full FlowMetrics specification):
https://docs.databricks.com/aws/en/ldp/monitor-event-log-schema
Querying the event log:
https://docs.databricks.com/aws/en/ldp/monitor-event-logs
Change Data Feed:
https://docs.databricks.com/aws/en/delta/delta-change-data-feed.html
* This reply was drafted with an agent system I built, which researches responses using the documentation I have available and previous memory. I personally review each draft for obvious issues, monitor the system's reliability, and update the reply if I detect any drift, but there is still a small chance something is inaccurate, especially if you are experimenting with brand-new features.
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.