cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Trigger Databricks Workflow when other workflows succeeded

dbuserng
New Contributor II

Hi,

I have 3 separate workflows with 3 different triggers and the thing I would like to achieve is - after all of these 3 jobs completed & succeeded I would like to trigger another job. Is it possible?

These 3 jobs have to stay separate (I cannot combine them into a single pipeline and add a dependent task to them).

1 REPLY 1

Alberto_Umana
Databricks Employee
Databricks Employee

Hi @dbuserng,

It is possible but it requires custom code setup based on your use-case. You can use job REST API: https://docs.databricks.com/api/workspace/jobs.

Create a Monitoring Job: Set up a job that will monitor the completion status of the three workflows. This job will periodically check if all three workflows have completed successfully.

Use the Jobs API: Utilize the Databricks Jobs API to check the status of the workflows. You can use the jobs/runs/list endpoint to get the status of the runs for each workflow.

 

Conditional Trigger: In the monitoring job, implement logic to check if all three workflows have completed successfully. If they have, trigger the desired job.

 

Here is a high-level example of how you can implement this:

 

import requests

 

# Define the job IDs for the three workflows

workflow_ids = [workflow1_id, workflow2_id, workflow3_id]

 

# Function to check the status of a workflow

def check_workflow_status(job_id):

    response = requests.get(f'https://<databricks-instance>/api/2.0/jobs/runs/list?job_id={job_id}')

    runs = response.json().get('runs', [])

    if runs and runs[0]['state']['life_cycle_state'] == 'TERMINATED' and runs[0]['state']['result_state'] == 'SUCCESS':

        return True

    return False

 

# Check the status of all workflows

all_completed = all(check_workflow_status(job_id) for job_id in workflow_ids)

 

# If all workflows are completed successfully, trigger the next job

if all_completed:

    trigger_response = requests.post('https://<databricks-instance>/api/2.0/jobs/run-now', json={"job_id": next_job_id})

    if trigger_response.status_code == 200:

        print("Next job triggered successfully")

    else:

        print("Failed to trigger the next job")

Join Us as a Local Community Builder!

Passionate about hosting events and connecting people? Help us grow a vibrant local communityโ€”sign up today to get started!

Sign Up Now