Hi @ranged_coop ,
To automatically retrigger jobs that failed in Databricks within the last 24 hours, you can use the Databricks REST API to list recent job runs, keep only the runs that failed, and then trigger a new run of each corresponding job. Below is a Python script that will help you achieve this.
import requests
import datetime
# Connection settings -- replace both placeholders before running the script.
DATABRICKS_HOST = "<your workspace url>"  # Databricks workspace URL
DATABRICKS_TOKEN = "<your personal access token>"  # Databricks Personal Access Token

# Jobs API 2.1 endpoints, built from the workspace URL
LIST_RUNS_ENDPOINT = DATABRICKS_HOST + "/api/2.1/jobs/runs/list"
RUN_NOW_ENDPOINT = DATABRICKS_HOST + "/api/2.1/jobs/run-now"

# Bearer-token header attached to every API request
HEADERS = {"Authorization": "Bearer " + DATABRICKS_TOKEN}

# Show the current local time when the script starts
print(datetime.datetime.now())

# Cutoff timestamp (naive local time): runs that ended after this are "recent"
twenty_four_hours_ago = datetime.datetime.now() - datetime.timedelta(days=1)
def get_failed_runs():
    """
    Retrieve all failed job runs that ended within the last 24 hours.

    Pages through the runs/list endpoint 25 runs at a time and keeps every
    run whose ``state.result_state`` is ``"FAILED"`` and whose ``end_time``
    (treated as epoch milliseconds) is later than ``twenty_four_hours_ago``.

    Returns:
        list: The matching run dictionaries exactly as returned by the API.
        If a request fails part-way through, an error is printed and the
        runs collected so far are returned (best effort).
    """
    page_size = 25
    request_timeout = 30  # seconds; without a timeout a stalled call hangs forever
    failed_runs = []
    has_more = True
    offset = 0
    # NOTE(review): Jobs API 2.1 also documents page_token-based pagination;
    # confirm that offset paging is still accepted by your workspace.
    while has_more:
        # Fetch one page of job runs
        try:
            response = requests.get(
                LIST_RUNS_ENDPOINT,
                headers=HEADERS,
                params={"offset": offset, "limit": page_size},
                timeout=request_timeout,
            )
        except requests.RequestException as exc:
            print(f"Failed to fetch job runs: {exc}")
            break

        # Error responses are not guaranteed to carry a JSON object body
        try:
            data = response.json()
        except ValueError:
            data = {}
        if not isinstance(data, dict):
            data = {}

        if response.status_code != 200:
            print(f"Failed to fetch job runs: {data.get('message', 'Unknown error')}")
            break

        # A successful response without "runs" is an empty page, not an error
        runs = data.get("runs") or []
        if not runs:
            break

        for run in runs:
            if (run.get("state") or {}).get("result_state") != "FAILED":
                continue
            end_time_ms = run.get("end_time")
            if not end_time_ms:
                # Missing or zero end time: the run has no usable end timestamp
                continue
            run_end_time = datetime.datetime.fromtimestamp(end_time_ms / 1000)
            if run_end_time > twenty_four_hours_ago:
                failed_runs.append(run)

        # Check for more runs
        has_more = data.get("has_more", False)
        offset += page_size  # Advance to the next page
    return failed_runs
def retrigger_failed_runs(failed_runs):
    """
    Trigger a new run of the job behind each failed run.

    Sends one run-now request per entry in ``failed_runs`` and prints the
    outcome. A failure for one job (network error, non-200 response, or an
    entry without a ``job_id``) is reported and the remaining entries are
    still processed.

    Args:
        failed_runs: Iterable of run dictionaries, each expected to carry
            ``job_id`` and ``run_id`` keys.
    """
    request_timeout = 30  # seconds; without a timeout a stalled call hangs forever
    for run in failed_runs:
        job_id = run.get("job_id")
        if job_id is None:
            print(f"Skipping run without a job ID: {run.get('run_id')}")
            continue
        print(f"Retriggering job ID: {job_id}, Run ID: {run.get('run_id')}")
        try:
            response = requests.post(
                RUN_NOW_ENDPOINT,
                headers=HEADERS,
                json={"job_id": job_id},
                timeout=request_timeout,
            )
        except requests.RequestException as exc:
            # Keep going so one unreachable call does not block the other jobs
            print(f"Failed to retrigger job {job_id}: {exc}")
            continue

        if response.status_code == 200:
            print(f"Successfully retriggered job {job_id}.")
            continue

        # Error bodies are not guaranteed to be a JSON object
        try:
            message = response.json().get('message', 'Unknown error')
        except (ValueError, AttributeError):
            message = response.text or 'Unknown error'
        print(f"Failed to retrigger job {job_id}: {message}")
# Script entry point: collect recent failures, then retrigger them
if __name__ == "__main__":
    failed_runs = get_failed_runs()
    if not failed_runs:
        # Nothing to do
        print("No failed runs found in the last 24 hours.")
    else:
        print(f"Found {len(failed_runs)} failed runs in the last 24 hours.")
        retrigger_failed_runs(failed_runs)