
Trigger a Databricks Workflow when other workflows have succeeded

dbuserng
New Contributor II

Hi,

I have 3 separate workflows with 3 different triggers. What I would like to achieve is this: after all 3 of these jobs have completed and succeeded, trigger another job. Is that possible?

These 3 jobs have to stay separate (I cannot combine them into a single pipeline and add a dependent task to them).

1 REPLY

Alberto_Umana
Databricks Employee

Hi @dbuserng,

It is possible, but it requires a custom code setup based on your use case. You can use the Jobs REST API: https://docs.databricks.com/api/workspace/jobs.

Create a Monitoring Job: Set up a job that will monitor the completion status of the three workflows. This job will periodically check if all three workflows have completed successfully.
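For that first step, here is a minimal sketch of creating the monitoring job on a schedule with the jobs/create endpoint. The job name, notebook path, cron expression, cluster ID, and token are placeholder assumptions you would adapt to your workspace:

import requests

DATABRICKS_HOST = 'https://<databricks-instance>'
HEADERS = {'Authorization': 'Bearer <personal-access-token>'}

# Sketch of a scheduled monitoring job; name, notebook path, schedule, and cluster are placeholders
monitoring_job = {
    'name': 'monitor-upstream-workflows',
    'schedule': {
        'quartz_cron_expression': '0 0/15 * * * ?',  # every 15 minutes (example schedule)
        'timezone_id': 'UTC'
    },
    'tasks': [{
        'task_key': 'check_and_trigger',
        'notebook_task': {'notebook_path': '/Workspace/Shared/monitor_and_trigger'},
        'existing_cluster_id': '<cluster-id>'  # or a new_cluster spec, depending on your setup
    }]
}

response = requests.post(f'{DATABRICKS_HOST}/api/2.1/jobs/create',
                         headers=HEADERS, json=monitoring_job)
response.raise_for_status()
print('Created monitoring job:', response.json()['job_id'])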

Use the Jobs API: Call the Databricks Jobs API to check the status of each workflow. The jobs/runs/list endpoint returns the runs for a given job, including their state.

Conditional Trigger: In the monitoring job, implement logic to check whether all three workflows have completed successfully. If they have, trigger the desired job.

Here is a high-level example of how you can implement this:

import requests

# Workspace URL and personal access token (placeholders)
DATABRICKS_HOST = 'https://<databricks-instance>'
HEADERS = {'Authorization': 'Bearer <personal-access-token>'}

# Job IDs for the three upstream workflows and the job to trigger (placeholders)
workflow_ids = [workflow1_id, workflow2_id, workflow3_id]

# Check whether the most recent run of a job terminated successfully
def check_workflow_status(job_id):
    response = requests.get(f'{DATABRICKS_HOST}/api/2.0/jobs/runs/list',
                            headers=HEADERS, params={'job_id': job_id, 'limit': 1})
    response.raise_for_status()
    runs = response.json().get('runs', [])
    return bool(runs
                and runs[0]['state']['life_cycle_state'] == 'TERMINATED'
                and runs[0]['state'].get('result_state') == 'SUCCESS')

# Check the status of all workflows
all_completed = all(check_workflow_status(job_id) for job_id in workflow_ids)

# If all workflows completed successfully, trigger the next job
if all_completed:
    trigger_response = requests.post(f'{DATABRICKS_HOST}/api/2.0/jobs/run-now',
                                     headers=HEADERS, json={'job_id': next_job_id})
    if trigger_response.status_code == 200:
        print("Next job triggered successfully")
    else:
        print("Failed to trigger the next job")
