Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Databricks Workflow

abhirupa7
New Contributor

I have a question. There are multiple jobs (workflows) in my workspace that run regularly, and each job contains multiple tasks. A few of those tasks run notebooks that contain for-each code, so when a job runs, that task executes the for-each loop and creates multiple iterations of the loop. My requirement is to capture information about those iterations from a notebook using the Databricks SDK or REST API (the same information I can see in the Databricks workspace UI).

Iteration-level information such as task ID, iteration task run ID, start time, end time, duration, status, etc.

I want to see how many iterations the for-each loop ran, and how many succeeded or failed.

I can already capture job-, run-, and task-level information.

Please point me to some docs, code samples, explanations, etc.

TIA

1 ACCEPTED SOLUTION


mark_ott
Databricks Employee

To programmatically capture iteration-level information for tasks running inside a Databricks Workflow Job that uses the "for each" loop construct, you will primarily rely on the Databricks Jobs REST API (v2.1) and possibly the Databricks Python SDK. However, there are some gaps and workarounds you should be aware of, especially for capturing “for each” iteration details at the same granularity as seen in the UI.

Key Points

  • The Databricks Jobs API lets you list jobs, get details on job runs, and inspect each task inside a run, including start time, end time, duration, and status.

  • Standard API endpoints:

    • /api/2.1/jobs/runs/list: List runs for a job, including status info.

    • /api/2.1/jobs/runs/get: Get details for a specific run, including subtask info.

    • Subtasks of a "for each" task are tied to the parent via parent_run_id, and their info (status, timing, etc.) is available only once you have gathered the run ID of each iteration.

  • The main hurdle: as of now, the API does not directly return all for-each iteration run IDs from a single call on the parent. In the UI you can view each iteration, but the API often returns only the root "for each" task, not the individual iterations, in a single list response.
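
For illustration, here is a minimal sketch of calling these two endpoints with the requests library (the workspace URL, token, and job ID are placeholders you must substitute):

python
import requests

# Placeholders: substitute your workspace URL, token, and job ID
instance = "https://<your-instance>.cloud.databricks.com"
headers = {"Authorization": "Bearer <your-databricks-token>"}

# List recent runs for a job
runs = requests.get(
    f"{instance}/api/2.1/jobs/runs/list",
    headers=headers,
    params={"job_id": 987654},
).json().get("runs", [])

# Get full details, including the tasks array, for the most recent run
if runs:
    run_detail = requests.get(
        f"{instance}/api/2.1/jobs/runs/get",
        headers=headers,
        params={"run_id": runs[0]["run_id"]},
    ).json()
    for task in run_detail.get("tasks", []):
        print(task["task_key"], task["run_id"], task.get("state", {}).get("result_state"))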

Workaround and Process

  1. Identify the Job Run (Parent):

    • Use /api/2.1/jobs/runs/get?run_id=<parent_run_id> to get the root run and see any subtasks.

  2. Find Subtask (Iteration) Run IDs:

    • Use the web UI to get an iteration run ID, then call /api/2.1/jobs/runs/get?run_id=<iteration_run_id>.

    • Check whether the response includes a parent_run_id that links the iteration back to the parent (see the sketch after this list).

  3. Automate Retrieval:

    • List all runs using /api/2.1/jobs/runs/list?parent_run_id=<parent_run_id>, if supported for your environment. This filter can help you gather all iterations linked to the parent task.

    • For each run, collect: run_id, start_time, end_time, duration, and state (status).

  4. Aggregate Iteration Results:

    • Within your Databricks notebook (Python), call the REST API and aggregate:

      • Count of total iterations.

      • Number successful and failed.

      • Per-iteration timing and status.
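
The following minimal sketch illustrates steps 1 and 2 (the run IDs are placeholders; whether iteration runs expose parent_run_id exactly this way should be verified in your workspace):

python
import requests

instance = "https://<your-instance>.cloud.databricks.com"
headers = {"Authorization": "Bearer <your-databricks-token>"}

def get_run(run_id):
    # Fetch full details for a single run
    resp = requests.get(
        f"{instance}/api/2.1/jobs/runs/get",
        headers=headers,
        params={"run_id": run_id},
    )
    resp.raise_for_status()
    return resp.json()

# Step 1: inspect the parent run; the "for each" task appears in its tasks array
parent = get_run(123456)  # replace with your parent run ID
for task in parent.get("tasks", []):
    print(task["task_key"], task["run_id"], task.get("state", {}).get("life_cycle_state"))

# Step 2: take an iteration run ID from the UI and confirm it links back to the parent
iteration = get_run(111111)  # replace with an iteration run ID copied from the UI
print("parent_run_id:", iteration.get("parent_run_id"))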

Example: Python Code Snippet

python
import requests

# Set your Databricks workspace URL and token
instance = "https://<your-instance>.cloud.databricks.com"
token = "<your-databricks-token>"
parent_run_id = 123456  # Replace with your parent run ID

# List all runs, filtering by parent if your workspace supports it
response = requests.get(
    f"{instance}/api/2.1/jobs/runs/list",
    headers={"Authorization": f"Bearer {token}"},
    params={"parent_run_id": parent_run_id},
)
runs = response.json().get("runs", [])

# Summarize per-iteration info
results = []
for run in runs:
    results.append({
        "run_id": run["run_id"],
        "start_time": run["start_time"],
        "end_time": run.get("end_time"),
        "duration": run.get("end_time", run["start_time"]) - run["start_time"],  # milliseconds
        "status": run["state"].get("result_state"),  # absent while a run is still in progress
    })

# Count successes and failures
success_count = sum(1 for r in results if r["status"] == "SUCCESS")
fail_count = sum(1 for r in results if r["status"] == "FAILED")
total = len(results)
  • Note: The above assumes your workspace and Databricks version support filtering by parent_run_id in /jobs/runs/list. Results may vary; some manual parsing may be necessary.
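
If you prefer the Python SDK mentioned above, a rough equivalent sketch using the databricks-sdk package's WorkspaceClient (the run IDs are placeholders, and iteration discovery still depends on the same parent_run_id workaround):

python
from databricks.sdk import WorkspaceClient

# In a Databricks notebook this picks up authentication from the runtime context
w = WorkspaceClient()

# Fetch the parent run and print its tasks, including the "for each" task
parent = w.jobs.get_run(run_id=123456)  # replace with your parent run ID
for task in parent.tasks or []:
    print(task.task_key, task.run_id, task.state.result_state if task.state else None)

# Fetch a single iteration run gathered via the workaround above
iteration = w.jobs.get_run(run_id=111111)  # replace with an iteration run ID
print(iteration.state.result_state if iteration.state else None)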

Additional Documentation

  • Databricks "For Each" Documentation: Explains the UI setup as well as how parameters are passed, but the API documentation for direct iteration-level retrieval is still evolving.​

  • Community discussions highlight that consistently retrieving all "for each" iterations without manual run ID extraction might not be supported yet; keep an eye on API updates for improved support.​

Summary Table: API Calls and Information

Info to Capture             | API Endpoint                                         | Notes
List all runs               | /api/2.1/jobs/runs/list?job_id=...                   | Use filters if needed
Get run details             | /api/2.1/jobs/runs/get?run_id=...                    | Includes timing/status per run
Connect iteration to parent | /api/2.1/jobs/runs/get?run_id=<iter_id>              | Check the parent_run_id field
Count successes/failures    | Aggregate state.result_state for each iteration run  | Must script this in the notebook

This approach lets you automate the extraction of each "for each" iteration's status and timing from your Databricks notebook, though you may need to adapt it based on your platform's API capabilities.


2 REPLIES

AbhaySingh
Databricks Employee

Just to confirm: are you already using this API endpoint?

https://docs.databricks.com/api/workspace/jobs/getrun
