Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to get pipeline update duration programmatically

jakesippy
Visitor

I'm looking to track how much time is being spent running updates for my DLT pipelines.

When querying the list pipeline updates REST API endpoint, I can see start and end times being returned; however, these fields are not listed in the documentation. They also don't appear to be available in the Java SDK.

https://docs.databricks.com/api/workspace/pipelines/listupdates

I'm wondering whether these values will always be present in the results, or whether this is simply an omission in the API spec.

Alternatively, is there any other way to programmatically get this information?

 

// Example response seen from `api/2.0/pipelines/<pipeline-id>/updates`:

{
    "updates": [
        {
            "pipeline_id": "ec65fedc-c87a-4b05-9178-f3e8d70c2abc",
            "update_id": "20eab4bf-ecbe-4d1e-8b29-5c0e97f149aa",
            "cause": "USER_ACTION",
            "state": "COMPLETED",
            "creation_time": 1758657962989,
            "start_time": "2025-09-23T20:06:02.989Z",
            "end_time": "2025-09-23T20:06:16.115Z",
            "full_refresh": false,
            "request_id": "20eab4bf-ecbe-4d1e-8b29-5c0e97f149aa",
            "validate_only": false,
            "development": true,
            "explore_only": false
        }
    ],
    ...
}
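For reference, this is a rough sketch of how I was hoping to compute durations from those undocumented fields (Python with requests; the DATABRICKS_HOST and DATABRICKS_TOKEN environment variables are my own assumptions, and the start_time/end_time fields may change since they aren't in the spec):

import os
from datetime import datetime

import requests

def list_update_durations(pipeline_id: str) -> dict:
    """Return {update_id: duration_seconds} for completed updates of a pipeline.

    Assumptions: a PAT in DATABRICKS_TOKEN and the workspace URL in DATABRICKS_HOST;
    relies on the undocumented start_time/end_time fields shown above.
    """
    host = os.environ["DATABRICKS_HOST"]    # e.g. https://<workspace>.cloud.databricks.com
    token = os.environ["DATABRICKS_TOKEN"]

    resp = requests.get(
        f"{host}/api/2.0/pipelines/{pipeline_id}/updates",
        headers={"Authorization": f"Bearer {token}"},
    )
    resp.raise_for_status()

    fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
    durations = {}
    for update in resp.json().get("updates", []):
        start, end = update.get("start_time"), update.get("end_time")
        if start and end:
            # Duration in seconds between the undocumented start/end timestamps
            durations[update["update_id"]] = (
                datetime.strptime(end, fmt) - datetime.strptime(start, fmt)
            ).total_seconds()
    return durations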
 

3 REPLIES

szymon_dybczak
Esteemed Contributor III

Hi @jakesippy ,

Maybe you have an outdated Java SDK version, and that's why you can't see them.

But an alternative way to obtain this information programmatically is to use the pipeline's event log. You can read more about it at the following link.

Lakeflow Declarative Pipelines event log | Databricks on AWS

So you can prepare your query first and then use the Statement Execution API to obtain the result:

Statement Execution API | REST API reference | Databricks on AWS
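For example, something along these lines (a rough sketch, assuming the databricks-sdk Python package is installed and authenticated, and that you have a running SQL warehouse; the warehouse and pipeline IDs below are placeholders, and the query is a simplified version of the event log approach):

from databricks.sdk import WorkspaceClient

# Assumptions: replace with a real SQL warehouse ID and your pipeline ID
WAREHOUSE_ID = "<your-sql-warehouse-id>"
PIPELINE_ID = "<your-pipeline-id>"

# Simplified duration query against the pipeline event log
QUERY = f"""
SELECT origin.update_id,
       MIN(timestamp) AS start_time,
       MAX(timestamp) AS end_time
FROM event_log('{PIPELINE_ID}')
WHERE event_type IN ('create_update', 'update_progress')
GROUP BY origin.update_id
"""

w = WorkspaceClient()
resp = w.statement_execution.execute_statement(
    warehouse_id=WAREHOUSE_ID,
    statement=QUERY,
    wait_timeout="30s",
)
for row in resp.result.data_array or []:
    print(row)  # [update_id, start_time, end_time]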

 

K_Anudeep
Databricks Employee

Hello @jakesippy ,

Instead of using any REST APIs, just run the cell below (or call it programmatically via spark.sql()), and you will get all the info. The query below relies on the pipeline event log and will always give you accurate information.

%sql
DROP VIEW IF EXISTS event_log_raw;
CREATE VIEW event_log_raw AS
SELECT * FROM event_log('3a08341f-1acd-4edd-bf93-691c65907436');

WITH last_status_per_update AS (
  SELECT
    origin.pipeline_id,
    origin.pipeline_name,
    origin.update_id,
    FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_state,
    timestamp,
    ROW_NUMBER() OVER (PARTITION BY origin.update_id ORDER BY timestamp DESC) AS rn
  FROM event_log_raw
  WHERE event_type = 'update_progress'
  QUALIFY rn = 1
),
update_durations AS (
  SELECT
    origin.pipeline_id,
    origin.pipeline_name,
    origin.update_id,
    MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
    COALESCE(
      MAX(CASE WHEN event_type = 'update_progress'
                AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state
                    IN ('COMPLETED','FAILED','CANCELED')
               THEN timestamp END),
      current_timestamp()
    ) AS end_time
  FROM event_log_raw
  WHERE event_type IN ('create_update','update_progress')
    AND origin.update_id IS NOT NULL
  GROUP BY origin.pipeline_id, origin.pipeline_name, origin.update_id
  HAVING start_time IS NOT NULL
)
SELECT
  d.pipeline_id,
  d.pipeline_name,
  d.update_id,
  d.start_time,
  d.end_time,
  ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time)/1000) AS duration_seconds,
  s.last_state AS update_state
FROM update_durations d
JOIN last_status_per_update s
  ON d.pipeline_id = s.pipeline_id AND d.update_id = s.update_id

WHERE d.update_id IN ('<update_id>')
ORDER BY d.start_time DESC;

 

Programmatic way (Python) :

from pyspark.sql import Row

def get_dlt_update_duration_seconds(pipeline_id: str, update_id: str) -> Row:
    """
    Returns a Row with: start_time, end_time, duration_seconds, update_state
    for the given pipeline_id & update_id. Raises ValueError if not found.
    """
    def _sq(s: str) -> str:
        return s.replace("'", "''")
    
    # Step 1: Create the temp view
    spark.sql(
        f"""
        CREATE OR REPLACE TEMP VIEW event_log_raw AS
        SELECT * FROM event_log('{_sq(pipeline_id)}')
        """
    )
    
    # Step 2: Run the main query
    sql = f"""
    WITH last_status_per_update AS (
      SELECT
        origin.pipeline_id,
        origin.pipeline_name,
        origin.update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_state,
        timestamp,
        ROW_NUMBER() OVER (PARTITION BY origin.update_id ORDER BY timestamp DESC) AS rn
      FROM event_log_raw
      WHERE event_type = 'update_progress'
      QUALIFY rn = 1
    ),
    update_durations AS (
      SELECT
        origin.pipeline_id,
        origin.pipeline_name,
        origin.update_id,
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
        COALESCE(
          MAX(CASE WHEN event_type = 'update_progress'
                    AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state
                        IN ('COMPLETED','FAILED','CANCELED')
                   THEN timestamp END),
          current_timestamp()
        ) AS end_time
      FROM event_log_raw
      WHERE event_type IN ('create_update','update_progress')
        AND origin.update_id IS NOT NULL
      GROUP BY origin.pipeline_id, origin.pipeline_name, origin.update_id
      HAVING start_time IS NOT NULL
    )
    SELECT
      d.start_time,
      d.end_time,
      ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time)/1000) AS duration_seconds,
      s.last_state AS update_state
    FROM update_durations d
    JOIN last_status_per_update s
      ON d.pipeline_id = s.pipeline_id AND d.update_id = s.update_id
    WHERE d.update_id = '{_sq(update_id)}'
    """
    res = spark.sql(sql)
    rows = res.collect()

    if not rows:
        raise ValueError(f"No update found for pipeline_id={pipeline_id} update_id={update_id}")
    return rows[0]

pipeline_id = "3a08341f-1acd-4edd-bf93-691c65907436"
update_id   = "532c3e5f-d757-4a39-94e3-ebd201edfd75"
r = get_dlt_update_duration_seconds(pipeline_id, update_id)
print(f"Pipeline ID: {pipeline_id}, Update ID: {update_id},Duration(s): {r['duration_seconds']}, state: {r['update_state']}, "
      f"start: {r['start_time']}, end: {r['end_time']}")

 

Let me know if you have any further questions.

szymon_dybczak
Esteemed Contributor III

Yep, this is the exact same SQL query reading from the event log that I shared in the link above:

Lakeflow Declarative Pipelines event log | Databricks on AWS


 
