K_Anudeep
Databricks Employee

Hello @jakesippy,

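Instead of calling the REST APIs, run the SQL cell below, or run the same query programmatically through `spark.sql()` as shown further down, to get all of this information. The query reads the pipeline's event log, so it always reflects what was actually recorded for each update. For an update that is still running, `end_time` falls back to the current time.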

%sql
-- Point a session-scoped temp view at the pipeline's event log. A TEMP view
-- (rather than DROP VIEW + CREATE VIEW) neither drops nor leaves behind a
-- permanent view named event_log_raw in the current schema.
-- Replace the ID below with your own pipeline ID.
CREATE OR REPLACE TEMP VIEW event_log_raw AS
SELECT * FROM event_log('3a08341f-1acd-4edd-bf93-691c65907436');

-- last_status_per_update: the state reported by the most recent
-- update_progress event of each update (one row per update_id).
WITH last_status_per_update AS (
  SELECT
    origin.pipeline_id,
    origin.pipeline_name,
    origin.update_id,
    FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_state,
    timestamp,
    ROW_NUMBER() OVER (PARTITION BY origin.update_id ORDER BY timestamp DESC) AS rn
  FROM event_log_raw
  WHERE event_type = 'update_progress'
  QUALIFY rn = 1
),
-- update_durations: per update, start_time is the earliest create_update
-- event; end_time is the latest update_progress event in a terminal state
-- (COMPLETED/FAILED/CANCELED), or the current time if none exists yet.
-- Updates without a create_update event are excluded by the HAVING clause.
update_durations AS (
  SELECT
    origin.pipeline_id,
    origin.pipeline_name,
    origin.update_id,
    MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
    COALESCE(
      MAX(CASE WHEN event_type = 'update_progress'
                AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state
                    IN ('COMPLETED','FAILED','CANCELED')
               THEN timestamp END),
      current_timestamp()
    ) AS end_time
  FROM event_log_raw
  WHERE event_type IN ('create_update','update_progress')
    AND origin.update_id IS NOT NULL
  GROUP BY origin.pipeline_id, origin.pipeline_name, origin.update_id
  HAVING start_time IS NOT NULL
)
SELECT
  d.pipeline_id,
  d.pipeline_name,
  d.update_id,
  d.start_time,
  d.end_time,
  ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time)/1000) AS duration_seconds,
  s.last_state AS update_state
FROM update_durations d
JOIN last_status_per_update s
  ON d.pipeline_id = s.pipeline_id AND d.update_id = s.update_id
-- Replace <update_id> with the update ID(s) to report on. Single quotes keep
-- these string literals even when ANSI double-quoted identifiers are enabled.
WHERE d.update_id IN ('<update_id>')
ORDER BY d.start_time DESC;

 

Programmatic way (Python):

from pyspark.sql import Row

def get_dlt_update_duration_seconds(pipeline_id: str, update_id: str) -> Row:
    """
    Return start/end time, duration and final state of one pipeline update.

    Reads the pipeline's event log through the ``event_log()`` table-valued
    function and returns a Row with the fields ``start_time``, ``end_time``,
    ``duration_seconds`` and ``update_state`` for the given update:

    * ``start_time``: timestamp of the update's earliest ``create_update`` event.
    * ``end_time``: latest ``update_progress`` event whose state is COMPLETED,
      FAILED or CANCELED; the current timestamp if no such event exists yet.
    * ``duration_seconds``: ``end_time - start_time`` rounded to whole seconds.
    * ``update_state``: state reported by the update's most recent
      ``update_progress`` event.

    Side effect: creates or replaces the session temp view ``event_log_raw``
    over this pipeline's event log (uses the notebook's global ``spark``).

    Args:
        pipeline_id: ID of the pipeline whose event log is read.
        update_id: ID of the update to report on.

    Returns:
        The first result Row for the update.

    Raises:
        ValueError: if no matching update (with a ``create_update`` event and
            at least one ``update_progress`` event) is found.
    """
    def _sql_string(value: str) -> str:
        # Spark SQL escapes quotes with a backslash ('it\'s'). Doubling the
        # quote ('') is NOT an escape there: adjacent literals are concatenated,
        # so 'a''b' would read as 'ab'. Escape backslashes first so the escapes
        # added for quotes are not themselves doubled.
        # NOTE(review): assumes spark.sql.parser.escapedStringLiterals is false
        # (the Spark default).
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"

    uid = _sql_string(update_id)

    # Step 1: point a session temp view at this pipeline's event log.
    spark.sql(
        f"""
        CREATE OR REPLACE TEMP VIEW event_log_raw AS
        SELECT * FROM event_log({_sql_string(pipeline_id)})
        """
    )

    # Step 2: compute timing and last state for the requested update only.
    # The update_id filter sits inside each CTE so the window function and the
    # aggregation only see this update's events. pipeline_name is not grouped
    # on: it is not returned, and grouping on it could split one update.
    sql = f"""
    WITH last_status_per_update AS (
      SELECT
        origin.pipeline_id,
        origin.update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_state,
        ROW_NUMBER() OVER (PARTITION BY origin.update_id ORDER BY timestamp DESC) AS rn
      FROM event_log_raw
      WHERE event_type = 'update_progress'
        AND origin.update_id = {uid}
      QUALIFY rn = 1
    ),
    update_durations AS (
      SELECT
        origin.pipeline_id,
        origin.update_id,
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
        COALESCE(
          MAX(CASE WHEN event_type = 'update_progress'
                    AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state
                        IN ('COMPLETED','FAILED','CANCELED')
                   THEN timestamp END),
          current_timestamp()
        ) AS end_time
      FROM event_log_raw
      WHERE event_type IN ('create_update','update_progress')
        AND origin.update_id = {uid}
      GROUP BY origin.pipeline_id, origin.update_id
      HAVING start_time IS NOT NULL
    )
    SELECT
      d.start_time,
      d.end_time,
      ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time)/1000) AS duration_seconds,
      s.last_state AS update_state
    FROM update_durations d
    JOIN last_status_per_update s
      ON d.pipeline_id = s.pipeline_id AND d.update_id = s.update_id
    """
    row = spark.sql(sql).first()

    if row is None:
        raise ValueError(f"No update found for pipeline_id={pipeline_id} update_id={update_id}")
    return row

# Example: print timing and final state for one update of one pipeline.
pipeline_id = "3a08341f-1acd-4edd-bf93-691c65907436"
update_id = "532c3e5f-d757-4a39-94e3-ebd201edfd75"
r = get_dlt_update_duration_seconds(pipeline_id, update_id)
print(
    f"Pipeline ID: {pipeline_id}, Update ID: {update_id}, "
    f"Duration(s): {r['duration_seconds']}, state: {r['update_state']}, "
    f"start: {r['start_time']}, end: {r['end_time']}"
)

 

Let me know if you have any further questions.

Anudeep