Hello @jakesippy,
Instead of calling the REST APIs, you can get all of this information straight from the pipeline's event log, either in a %sql cell or programmatically via spark.sql(). The query below reads the log through the event_log() table-valued function, so it always reflects exactly what the pipeline recorded.
%sql
CREATE OR REPLACE TEMP VIEW event_log_raw AS
SELECT * FROM event_log('3a08341f-1acd-4edd-bf93-691c65907436');
WITH last_status_per_update AS (
SELECT
origin.pipeline_id,
origin.pipeline_name,
origin.update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_state,
timestamp,
ROW_NUMBER() OVER (PARTITION BY origin.update_id ORDER BY timestamp DESC) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id,
origin.pipeline_name,
origin.update_id,
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
COALESCE(
MAX(CASE WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state
IN ('COMPLETED','FAILED','CANCELED')
THEN timestamp END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update','update_progress')
AND origin.update_id IS NOT NULL
GROUP BY origin.pipeline_id, origin.pipeline_name, origin.update_id
HAVING start_time IS NOT NULL
)
SELECT
d.pipeline_id,
d.pipeline_name,
d.update_id,
d.start_time,
d.end_time,
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time)/1000) AS duration_seconds,
s.last_state AS update_state
FROM update_durations d
JOIN last_status_per_update s
ON d.pipeline_id = s.pipeline_id AND d.update_id = s.update_id
WHERE d.update_id IN ('<update_id>')
ORDER BY d.start_time DESC;
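A quick note on how the query works: start_time is the timestamp of the create_update event, end_time is the timestamp of the last terminal update_progress event (COMPLETED, FAILED, or CANCELED), and the COALESCE falls back to current_timestamp() for an update that is still running, so duration_seconds then shows the elapsed time so far. If you drop the WHERE clause on d.update_id, you get the duration of every update recorded in the event log.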
Programmatic way (Python):
from pyspark.sql import Row
def get_dlt_update_duration_seconds(pipeline_id: str, update_id: str) -> Row:
"""
Returns a Row with: start_time, end_time, duration_seconds, update_state
for the given pipeline_id & update_id. Raises ValueError if not found.
"""
def _sq(s: str) -> str:
return s.replace("'", "''")
# Step 1: Create the temp view
spark.sql(
f"""
CREATE OR REPLACE TEMP VIEW event_log_raw AS
SELECT * FROM event_log('{_sq(pipeline_id)}')
"""
)
# Step 2: Run the main query
sql = f"""
WITH last_status_per_update AS (
SELECT
origin.pipeline_id,
origin.pipeline_name,
origin.update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_state,
timestamp,
ROW_NUMBER() OVER (PARTITION BY origin.update_id ORDER BY timestamp DESC) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id,
origin.pipeline_name,
origin.update_id,
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
COALESCE(
MAX(CASE WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state
IN ('COMPLETED','FAILED','CANCELED')
THEN timestamp END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update','update_progress')
AND origin.update_id IS NOT NULL
GROUP BY origin.pipeline_id, origin.pipeline_name, origin.update_id
HAVING start_time IS NOT NULL
)
SELECT
d.start_time,
d.end_time,
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time)/1000) AS duration_seconds,
s.last_state AS update_state
FROM update_durations d
JOIN last_status_per_update s
ON d.pipeline_id = s.pipeline_id AND d.update_id = s.update_id
WHERE d.update_id = '{_sq(update_id)}'
"""
res = spark.sql(sql)
rows = res.collect()
if not rows:
raise ValueError(f"No update found for pipeline_id={pipeline_id} update_id={update_id}")
return rows[0]
pipeline_id = "3a08341f-1acd-4edd-bf93-691c65907436"
update_id = "532c3e5f-d757-4a39-94e3-ebd201edfd75"
r = get_dlt_update_duration_seconds(pipeline_id, update_id)
print(f"Pipeline ID: {pipeline_id}, Update ID: {update_id},Duration(s): {r['duration_seconds']}, state: {r['update_state']}, "
f"start: {r['start_time']}, end: {r['end_time']}")
Let me know if you have any further questions.