2 weeks ago
I'm looking to track how much time is being spent running updates for my DLT pipelines.
When querying the list pipeline updates REST API endpoint I can see start and end times being returned; however, these fields are not listed in the documentation. They also don't appear to be available in the Java SDK.
https://docs.databricks.com/api/workspace/pipelines/listupdates
I'm wondering if these values should always be available in the results and/or if this is simply a miss in the API spec?
Alternatively, is there any other way to programmatically get this information?
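For reference, here's roughly how I'm calling the endpoint today (a minimal sketch using the requests library; the DATABRICKS_HOST / DATABRICKS_TOKEN environment variables are just my local setup, not part of the API):

import os
import requests

host = os.environ["DATABRICKS_HOST"]    # e.g. https://<workspace>.cloud.databricks.com
token = os.environ["DATABRICKS_TOKEN"]  # personal access token
pipeline_id = "<pipeline-id>"

# List the updates for a single pipeline.
resp = requests.get(
    f"{host}/api/2.0/pipelines/{pipeline_id}/updates",
    headers={"Authorization": f"Bearer {token}"},
)
resp.raise_for_status()
for update in resp.json().get("updates", []):
    print(update["update_id"], update.get("start_time"), update.get("end_time"))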
Example response seen from `api/2.0/pipelines/<pipeline-id>/updates`:
{
  "updates": [
    {
      "pipeline_id": "ec65fedc-c87a-4b05-9178-f3e8d70c2abc",
      "update_id": "20eab4bf-ecbe-4d1e-8b29-5c0e97f149aa",
      "cause": "USER_ACTION",
      "state": "COMPLETED",
      "creation_time": 1758657962989,
      "start_time": "2025-09-23T20:06:02.989Z",
      "end_time": "2025-09-23T20:06:16.115Z",
      "full_refresh": false,
      "request_id": "20eab4bf-ecbe-4d1e-8b29-5c0e97f149aa",
      "validate_only": false,
      "development": true,
      "explore_only": false
    }
  ]
}
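Assuming those two fields are reliable, this is all I'd really need per update (a quick sketch that assumes start_time/end_time keep the ISO-8601 shape shown above):

from datetime import datetime

# Duration of a single update, assuming start_time/end_time stay ISO-8601 strings.
def update_duration_seconds(update: dict) -> float:
    start = datetime.fromisoformat(update["start_time"].replace("Z", "+00:00"))
    end = datetime.fromisoformat(update["end_time"].replace("Z", "+00:00"))
    return (end - start).total_seconds()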
2 weeks ago
Hi @jakesippy ,
Maybe you have an outdated Java SDK version and that's why you can't see them.
An alternative way to obtain this information programmatically is to use the pipeline's event log. You can read more about this at the following link:
Lakeflow Declarative Pipelines event log | Databricks on AWS
So you can prepare your query first and then use the Statement Execution API to obtain the result:
Statement Execution API | REST API reference | Databricks on AWS
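For example, something along these lines (a rough sketch only; the warehouse ID, pipeline ID and environment variable names are placeholders for your own values, and you can swap in the full event log query from the linked page as the statement):

import os
import requests

host = os.environ["DATABRICKS_HOST"]    # e.g. https://<workspace>.cloud.databricks.com
token = os.environ["DATABRICKS_TOKEN"]  # personal access token

# Run a query against the pipeline's event log through the Statement Execution API.
# <warehouse-id> and <pipeline-id> are placeholders.
payload = {
    "warehouse_id": "<warehouse-id>",
    "statement": """
        SELECT origin.update_id, MIN(timestamp) AS start_time, MAX(timestamp) AS end_time
        FROM event_log('<pipeline-id>')
        WHERE event_type IN ('create_update', 'update_progress')
        GROUP BY origin.update_id
    """,
    "wait_timeout": "30s",
}
resp = requests.post(
    f"{host}/api/2.0/sql/statements/",
    headers={"Authorization": f"Bearer {token}"},
    json=payload,
)
resp.raise_for_status()
result = resp.json()
print(result["status"]["state"])
print(result.get("result", {}).get("data_array", []))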
2 weeks ago - last edited 2 weeks ago
Hello @jakesippy ,
Instead of using any REST APIs, just run the cell below programmatically with spark.sql() and you will get all the info. The query below relies on the pipeline event log and will always give you accurate information.
%sql
DROP VIEW IF EXISTS event_log_raw;
CREATE VIEW event_log_raw AS
SELECT * FROM event_log("3a08341f-1acd-4edd-bf93-691c65907436");
WITH last_status_per_update AS (
SELECT
origin.pipeline_id,
origin.pipeline_name,
origin.update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_state,
timestamp,
ROW_NUMBER() OVER (PARTITION BY origin.update_id ORDER BY timestamp DESC) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id,
origin.pipeline_name,
origin.update_id,
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
COALESCE(
MAX(CASE WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state
IN ('COMPLETED','FAILED','CANCELED')
THEN timestamp END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update','update_progress')
AND origin.update_id IS NOT NULL
GROUP BY origin.pipeline_id, origin.pipeline_name, origin.update_id
HAVING start_time IS NOT NULL
)
SELECT
d.pipeline_id,
d.pipeline_name,
d.update_id,
d.start_time,
d.end_time,
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time)/1000) AS duration_seconds,
s.last_state AS update_state
FROM update_durations d
JOIN last_status_per_update s
ON d.pipeline_id = s.pipeline_id AND d.update_id = s.update_id
WHERE d.update_id IN ('<update_id>')
ORDER BY d.start_time DESC;
from pyspark.sql import Row

def get_dlt_update_duration_seconds(pipeline_id: str, update_id: str) -> Row:
    """
    Returns a Row with: start_time, end_time, duration_seconds, update_state
    for the given pipeline_id & update_id. Raises ValueError if not found.
    """
    def _sq(s: str) -> str:
        return s.replace("'", "''")

    # Step 1: Create the temp view
    spark.sql(
        f"""
        CREATE OR REPLACE TEMP VIEW event_log_raw AS
        SELECT * FROM event_log('{_sq(pipeline_id)}')
        """
    )

    # Step 2: Run the main query
    sql = f"""
    WITH last_status_per_update AS (
      SELECT
        origin.pipeline_id,
        origin.pipeline_name,
        origin.update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_state,
        timestamp,
        ROW_NUMBER() OVER (PARTITION BY origin.update_id ORDER BY timestamp DESC) AS rn
      FROM event_log_raw
      WHERE event_type = 'update_progress'
      QUALIFY rn = 1
    ),
    update_durations AS (
      SELECT
        origin.pipeline_id,
        origin.pipeline_name,
        origin.update_id,
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
        COALESCE(
          MAX(CASE WHEN event_type = 'update_progress'
                AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state
                    IN ('COMPLETED','FAILED','CANCELED')
              THEN timestamp END),
          current_timestamp()
        ) AS end_time
      FROM event_log_raw
      WHERE event_type IN ('create_update','update_progress')
        AND origin.update_id IS NOT NULL
      GROUP BY origin.pipeline_id, origin.pipeline_name, origin.update_id
      HAVING start_time IS NOT NULL
    )
    SELECT
      d.start_time,
      d.end_time,
      ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time)/1000) AS duration_seconds,
      s.last_state AS update_state
    FROM update_durations d
    JOIN last_status_per_update s
      ON d.pipeline_id = s.pipeline_id AND d.update_id = s.update_id
    WHERE d.update_id = '{_sq(update_id)}'
    """
    res = spark.sql(sql)
    rows = res.collect()
    if not rows:
        raise ValueError(f"No update found for pipeline_id={pipeline_id} update_id={update_id}")
    return rows[0]

pipeline_id = "3a08341f-1acd-4edd-bf93-691c65907436"
update_id = "532c3e5f-d757-4a39-94e3-ebd201edfd75"

r = get_dlt_update_duration_seconds(pipeline_id, update_id)
print(f"Pipeline ID: {pipeline_id}, Update ID: {update_id}, Duration(s): {r['duration_seconds']}, state: {r['update_state']}, "
      f"start: {r['start_time']}, end: {r['end_time']}")
Let me know if you have any further questions.
2 weeks ago
Yep, this is exact same SQL query that is reading from event_log that I shared with above link:
Lakeflow Declarative Pipelines event log | Databricks on AWS
2 weeks ago
Thanks all, I'm able to see the event logs for pipelines now; I wasn't aware of that!
My hope though is that there is some way to fetch these details across an account directly, not just for pipelines I've personally created or for pipelines which are explicitly exporting their event logs. Essentially I'm working on a dashboard to track pipeline usage across the entire account, so the pipelines I'm looking at will have been created by multiple different users. I've also tried updating to the latest SDK, but it seems the start/end time fields are still not populated there.
With the event logs, I understand it should be possible to publish them to the catalog, from which they could be queried in a single place to update the dashboard. However, I'm worried about the overhead of ensuring that all the existing and future pipelines we want to monitor have this setting enabled.
Is anyone aware of any other way to get these metrics across an account, similar to how job run stats can be retrieved from the `system.lakeflow.job_run_timeline` system table or via the REST API at `api/2.2/jobs/runs/list`?
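For comparison, this is the kind of account-wide query I can already run for jobs and would love to have for pipeline updates (a sketch; double-check the column names against the job_run_timeline docs):

# Account-wide job run durations from the system table; I'm after the pipeline equivalent.
job_durations = spark.sql("""
    SELECT
      job_id,
      run_id,
      period_start_time,
      period_end_time,
      TIMESTAMPDIFF(SECOND, period_start_time, period_end_time) AS duration_seconds
    FROM system.lakeflow.job_run_timeline
    ORDER BY period_start_time DESC
""")
display(job_durations)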
2 weeks ago
Hi @jakesippy, create a policy or automation (e.g., with Terraform, the REST API, or the Databricks CLI) to ensure all new pipelines have event log publishing enabled to a central Unity Catalog schema. Once the logs are published, you can build your dashboard on top of those tables.
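For example, a rough sketch of flipping that setting on an existing pipeline through the pipelines REST API (the shape of the event_log block follows the "publish event log to Unity Catalog" docs, and the catalog/schema names are placeholders, so verify the exact field names against the current pipeline settings spec):

import os
import requests

host = os.environ["DATABRICKS_HOST"]
token = os.environ["DATABRICKS_TOKEN"]
headers = {"Authorization": f"Bearer {token}"}
pipeline_id = "<pipeline-id>"

# Fetch the current settings, add the event_log publishing block, and push them back.
spec = requests.get(f"{host}/api/2.0/pipelines/{pipeline_id}", headers=headers).json()["spec"]

# Assumed field shape for publishing the event log to Unity Catalog; check the API spec.
spec["event_log"] = {
    "catalog": "monitoring",            # placeholder central catalog
    "schema": "pipeline_event_logs",    # placeholder schema
    "name": f"event_log_{pipeline_id.replace('-', '_')}",
}

resp = requests.put(f"{host}/api/2.0/pipelines/{pipeline_id}", headers=headers, json=spec)
resp.raise_for_status()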
Wednesday
I originally went with the approach of exporting to and reading from the event log table, which has been helpful for getting other metrics as well.
I also found today that there is a new system table in public preview which exposes the durations I was looking for without needing any additional configuration on the pipelines. Wanted to record that here as well:
https://docs.databricks.com/aws/en/admin/system-tables/jobs#pipeline-update-timeline
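In case it's useful to anyone else, this is roughly how I'm reading it for the dashboard (the table name below is my reading of the linked section, so please check the doc for the exact name and schema before relying on it):

# Rough sketch only: table name taken from the linked "pipeline update timeline" section;
# verify the real name and columns in the docs.
updates = spark.sql("""
    SELECT *
    FROM system.lakeflow.pipeline_update_timeline
    LIMIT 100
""")
display(updates)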
Thanks all
Wednesday
Hi @jakesippy ,
Thanks for sharing this with us!