Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Need to automatically rerun failed jobs in Databricks

Vishalakshi
New Contributor II

Hi all,

 

I need to automatically retrigger failed jobs in Databricks. Can you please help me with the possible ways to do this?

5 REPLIES

ranged_coop
Valued Contributor II

Have a look at this link: https://docs.databricks.com/en/jobs/settings.html#retry-policies

You can set a retry policy on the tasks, or run the job from a loop that checks the run status and re-runs the job if it was not successful.
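
For illustration, here is a minimal sketch of what a task-level retry policy might look like when the job is defined through the Jobs API rather than the UI. The field names (`max_retries`, `min_retry_interval_millis`, `retry_on_timeout`, `timeout_seconds`) are the task settings as I understand them from the Jobs API 2.1, so please verify them against the linked documentation. The task key and notebook path are hypothetical, and compute settings are left out.

```python
import json


def task_with_retries(task_key, notebook_path, max_retries=3,
                      min_retry_interval_ms=60_000, retry_on_timeout=True,
                      timeout_seconds=3600):
    """Build a task definition (a plain dict) that carries a retry policy."""
    return {
        "task_key": task_key,
        "notebook_task": {"notebook_path": notebook_path},
        # Retry the task up to max_retries times after an unsuccessful attempt
        "max_retries": max_retries,
        # Wait at least this long between attempts
        "min_retry_interval_millis": min_retry_interval_ms,
        # Also retry when the attempt hit timeout_seconds
        "retry_on_timeout": retry_on_timeout,
        "timeout_seconds": timeout_seconds,
    }


# Hypothetical task key and notebook path, for illustration only
task = task_with_retries("ingest", "/Workspace/Shared/ingest")
print(json.dumps(task, indent=2))
```

Note that a retry policy like this retries on any failure, not only on timeouts or server issues, so it may not be selective enough on its own.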

I tried the retry logic, but I need to retrigger those jobs only if they fail due to timeout/server issues. Can you help me with this?

filipniziol
Contributor

Hi @ranged_coop ,

To automatically retrigger jobs that failed within the last 24 hours, you can use the Databricks REST API to list the job runs, keep the failed ones, and then retrigger the corresponding jobs. Below is a Python script that will help you achieve this.

import requests
import datetime

# Databricks configurations
DATABRICKS_HOST = "<your workspace url>"  # Replace with your Databricks workspace URL
DATABRICKS_TOKEN = "<your personal access token>"  # Replace with your Databricks Personal Access Token

# API endpoints
LIST_RUNS_ENDPOINT = f"{DATABRICKS_HOST}/api/2.1/jobs/runs/list"
RUN_NOW_ENDPOINT = f"{DATABRICKS_HOST}/api/2.1/jobs/run-now"

# Headers for API requests
HEADERS = {
    "Authorization": f"Bearer {DATABRICKS_TOKEN}"
}
# Get the timestamp for 24 hours ago
twenty_four_hours_ago = datetime.datetime.now() - datetime.timedelta(hours=24)

def get_failed_runs():
    """
    Retrieve all failed job runs within the last 24 hours.
    """
    failed_runs = []
    has_more = True
    offset = 0
    
    while has_more:
        # Fetch job runs with pagination
        response = requests.get(LIST_RUNS_ENDPOINT, headers=HEADERS, params={"offset": offset, "limit": 25})
        
        # Check the HTTP status before parsing the body, which may not be JSON on errors
        if response.status_code != 200:
            print(f"Failed to fetch job runs: HTTP {response.status_code}: {response.text}")
            break
        data = response.json()
        
        # A page without a "runs" key means there is nothing (more) to read
        if "runs" not in data:
            break
        
        for run in data["runs"]:
            # Check if the run failed and was within the last 24 hours
            # (end_time can be 0 or missing while a run is still in progress)
            run_end_time = datetime.datetime.fromtimestamp(run.get("end_time", 0) / 1000)
            result_state = run.get("state", {}).get("result_state")
            if result_state == "FAILED" and run_end_time > twenty_four_hours_ago:
                failed_runs.append(run)
        
        # Check for more runs
        has_more = data.get("has_more", False)
        offset += 25  # Increment offset to fetch next set of runs
    
    return failed_runs

def retrigger_failed_runs(failed_runs):
    """
    Retrigger all failed job runs.
    """
    for run in failed_runs:
        job_id = run["job_id"]
        print(f"Retriggering job ID: {job_id}, Run ID: {run['run_id']}")
        response = requests.post(RUN_NOW_ENDPOINT, headers=HEADERS, json={"job_id": job_id})
        
        if response.status_code == 200:
            print(f"Successfully retriggered job {job_id}.")
        else:
            print(f"Failed to retrigger job {job_id}: {response.json().get('message', 'Unknown error')}")

# Main script execution
if __name__ == "__main__":
    failed_runs = get_failed_runs()
    if failed_runs:
        print(f"Found {len(failed_runs)} failed runs in the last 24 hours.")
        retrigger_failed_runs(failed_runs)
    else:
        print("No failed runs found in the last 24 hours.")
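
Two refinements may be worth considering for the script above; both are sketches under assumptions rather than tested additions. First, if a job failed several times in the window, the script triggers it once per failed run, so deduplicating the job IDs avoids duplicate triggers. Second, as far as I know the `runs/list` endpoint accepts `start_time_from` (epoch milliseconds) and `completed_only`, which lets the server narrow the results instead of paging through the full history. Keep in mind that `start_time_from` filters on start time, not end time, so a run that started earlier but failed inside the window would be missed. The helper names `build_list_params` and `unique_job_ids` are my own.

```python
import datetime


def build_list_params(now, hours=24, limit=25, offset=0):
    """Query parameters for runs/list restricted to runs started in the last `hours`."""
    window_start = now - datetime.timedelta(hours=hours)
    return {
        "completed_only": "true",
        # The API expects epoch milliseconds
        "start_time_from": int(window_start.timestamp() * 1000),
        "limit": limit,
        "offset": offset,
    }


def unique_job_ids(failed_runs):
    """Return each job_id once, in first-seen order, so a job is triggered only once."""
    return list(dict.fromkeys(run["job_id"] for run in failed_runs))


now = datetime.datetime.now(datetime.timezone.utc)
print(build_list_params(now))
print(unique_job_ids([{"job_id": 7}, {"job_id": 3}, {"job_id": 7}]))
```

The params dict can be passed as `params=` to the `requests.get` call, and `retrigger_failed_runs` can loop over `unique_job_ids(failed_runs)` instead of over the runs themselves.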

 

Thanks for the script.

But I need to write code within the existing notebook that detects whether the notebook failed due to a timeout/server issue and, if so, retriggers it.

Can you please help me with the script?
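
One possible shape for this, sketched under assumptions: a notebook that has already failed cannot restart itself, so the retry logic usually lives in a small wrapper (for example a parent notebook) that calls the real work and retries only when the error looks transient. The marker strings in `TRANSIENT_MARKERS` are guesses and should be replaced with the messages you actually see in your failures. The names `is_transient` and `run_with_retry` are hypothetical helpers, not Databricks APIs.

```python
import time

# Assumed substrings that indicate a timeout or server-side problem; adjust to your errors
TRANSIENT_MARKERS = ("timed out", "timeout", "temporarily unavailable", "connection reset")


def is_transient(error):
    """Return True if the error message contains one of the assumed transient markers."""
    text = str(error).lower()
    return any(marker in text for marker in TRANSIENT_MARKERS)


def run_with_retry(run_fn, max_attempts=3, delay_seconds=30, sleep=time.sleep):
    """Call run_fn, retrying only on errors that look transient."""
    for attempt in range(1, max_attempts + 1):
        try:
            return run_fn()
        except Exception as error:
            # Give up on the last attempt or on errors that are not transient
            if attempt == max_attempts or not is_transient(error):
                raise
            print(f"Attempt {attempt} failed with a transient error: {error}; retrying")
            sleep(delay_seconds)
```

Inside Databricks, `run_fn` could be something like `lambda: dbutils.notebook.run("<path to your notebook>", 3600)`, where the path is a placeholder and `dbutils` is only available in the Databricks runtime.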

 

filipniziol
Contributor

Hi @Vishalakshi ,

I responded over the weekend, but it seems the responses were lost.
The run object is available inside the loop. The current filter keeps only runs where run["state"]["result_state"] == "FAILED", so it matches every failed run regardless of the cause.

[Screenshot: filipniziol_0-1726589554376.png]

What you can do is to print(run) and you will see the object structure:

[Screenshot: filipniziol_1-1726589796071.png]

For example, to rerun only the jobs where run["state"]["state_message"] contains "Task df_regular failed with message: Workload failed", the code would be:

        for run in data["runs"]:
            # Check if the run failed with the expected message and was within the last 24 hours
            # (end_time and state_message may be missing, so read them defensively)
            run_end_time = datetime.datetime.fromtimestamp(run.get("end_time", 0) / 1000)
            state = run.get("state", {})
            if state.get("result_state") == "FAILED" \
                and "Task df_regular failed with message: Workload failed" in (state.get("state_message") or "") \
                and run_end_time > twenty_four_hours_ago:
                failed_runs.append(run)

So I recommend just printing the run object, and building the filtering logic according to your criteria.
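
As a starting point for the timeout/server-issue case, the criteria could be pulled into a small predicate like the sketch below. As far as I know, runs that exceed their timeout are reported with result_state "TIMEDOUT" rather than "FAILED", so a filter on "FAILED" alone would skip them. The message markers are assumptions; replace them with the text you see when you print the run object. `is_retryable` is a hypothetical helper name.

```python
# Result states that should always be retried
RETRYABLE_RESULT_STATES = {"TIMEDOUT"}
# Assumed substrings of state_message that indicate a server-side problem
RETRYABLE_MESSAGE_MARKERS = ("timed out", "temporarily unavailable", "internal error")


def is_retryable(run):
    """Return True for timed-out runs and for failed runs whose message looks transient."""
    state = run.get("state", {})
    result_state = state.get("result_state")
    if result_state in RETRYABLE_RESULT_STATES:
        return True
    if result_state != "FAILED":
        return False
    # state_message may be missing or null, so fall back to an empty string
    message = (state.get("state_message") or "").lower()
    return any(marker in message for marker in RETRYABLE_MESSAGE_MARKERS)


sample_runs = [
    {"run_id": 1, "state": {"result_state": "TIMEDOUT"}},
    {"run_id": 2, "state": {"result_state": "FAILED", "state_message": "Internal error, please retry"}},
    {"run_id": 3, "state": {"result_state": "FAILED", "state_message": "Workload failed"}},
    {"run_id": 4, "state": {"result_state": "SUCCESS"}},
]
print([run["run_id"] for run in sample_runs if is_retryable(run)])
```

In the loop, the condition would then become `if is_retryable(run) and run_end_time > twenty_four_hours_ago:`.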

Hope it helps!


