โ09-08-2024 07:53 AM
Hi all,
I need to retrigger the failed jobs automatically in data bricks, can you please help me with all the possible ways to make it possible
โ09-08-2024 08:23 AM
have a look at this link - https://docs.databricks.com/en/jobs/settings.html#retry-policies
you can set retry logic for tasks or have the job run in a loop and check manually for the status and re-run if not successful.
โ09-13-2024 02:46 AM
I tried with retry logic, but I need to trigger those job if they fail only due to timeout/server issues, Can you help me on this
โ09-08-2024 10:10 AM
Hi @ranged_coop ,
To automatically retrigger failed jobs in Databricks within the last 24 hours, you can use the Databricks REST API to list the jobs, filter out the failed runs, and then retrigger those failed jobs. Below is a Python script that will help you achieve this.
import requests
import datetime
# Databricks configurations
DATABRICKS_HOST = "<your workspace url>" # Replace with your Databricks workspace URL
DATABRICKS_TOKEN = "<your personal access token>" # Replace with your Databricks Personal Access Token
# API endpoints
LIST_RUNS_ENDPOINT = f"{DATABRICKS_HOST}/api/2.1/jobs/runs/list"
RUN_NOW_ENDPOINT = f"{DATABRICKS_HOST}/api/2.1/jobs/run-now"
# Headers for API requests
HEADERS = {
"Authorization": f"Bearer {DATABRICKS_TOKEN}"
}
print(datetime.datetime.now())
# Get the timestamp for 24 hours ago
twenty_four_hours_ago = datetime.datetime.now() - datetime.timedelta(hours=24)
def get_failed_runs():
"""
Retrieve all failed job runs within the last 24 hours.
"""
failed_runs = []
has_more = True
offset = 0
while has_more:
# Fetch job runs with pagination
response = requests.get(LIST_RUNS_ENDPOINT, headers=HEADERS, params={"offset": offset, "limit": 25})
data = response.json()
# Check if data fetch was successful
if response.status_code != 200 or "runs" not in data:
print(f"Failed to fetch job runs: {data.get('message', 'Unknown error')}")
break
for run in data["runs"]:
# Check if the run failed and was within the last 24 hours
run_end_time = datetime.datetime.fromtimestamp(run["end_time"] / 1000)
if "state" in run and "result_state" in run["state"] and run["state"]["result_state"] == "FAILED" and run_end_time > twenty_four_hours_ago:
failed_runs.append(run)
# Check for more runs
has_more = data.get("has_more", False)
offset += 25 # Increment offset to fetch next set of runs
return failed_runs
def retrigger_failed_runs(failed_runs):
"""
Retrigger all failed job runs.
"""
for run in failed_runs:
job_id = run["job_id"]
print(f"Retriggering job ID: {job_id}, Run ID: {run['run_id']}")
response = requests.post(RUN_NOW_ENDPOINT, headers=HEADERS, json={"job_id": job_id})
if response.status_code == 200:
print(f"Successfully retriggered job {job_id}.")
else:
print(f"Failed to retrigger job {job_id}: {response.json().get('message', 'Unknown error')}")
# Main script execution
if __name__ == "__main__":
failed_runs = get_failed_runs()
if failed_runs:
print(f"Found {len(failed_runs)} failed runs in the last 24 hours.")
retrigger_failed_runs(failed_runs)
else:
print("No failed runs found in the last 24 hours.")
โ09-13-2024 02:49 AM
Thanks for the script.
But I need to write a code within the existing notebook, that code should filter and retrigger the notebook if it will fail due to any timeout/server issues.
Can you please help me with the script
โ09-17-2024 09:19 AM
Hi @Vishalakshi ,
I have responded during the weekend, but it seems the responses were lost.
You have here the run object. For example the current criteria is to return only runs where run[state][result_state] == "FAILED" so basically all failed jobs.
What you can do is to print(run) and you will see the object structure:
For example to rerun the jobs where run["state"]["state_message"] contains "Task df_regular failed with message: Workload failed" the code would be:
for run in data["runs"]:
# Check if the run failed and was within the last 24 hours
run_end_time = datetime.datetime.fromtimestamp(run["end_time"] / 1000)
if "state" in run and "result_state" in run["state"] and run["state"]["result_state"] == "FAILED" \
and "Task df_regular failed with message: Workload failed" in run["state"]["state_message"] \
and run_end_time > twenty_four_hours_ago:
failed_runs.append(run)
So I recommend just printing the run object, and building the filtering logic according to your criteria.
Hope it helps!
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group