Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

API for Restarting Individual Failed Tasks within a Job?

minhhung0507
Contributor III

Hi everyone,

I'm exploring ways to streamline my workflow in Databricks and could really use some expert advice. In my current setup, I have a job (named job_silver) with multiple tasks (e.g., task 1, task 2, task 3). When one of these tasks fails—say task 2—I want the ability to restart just that specific task without rerunning the entire job.

I did some research and came across the “Repair and Rerun” feature (Databricks Blog). While that's a great tool for saving time and money in data and ML workflows, my use case requires more flexibility. Specifically, I'm looking for an API-based solution that I can integrate into my code, allowing dynamic control over which task to restart based on custom logic.

Some points I’m particularly interested in:

  1. Is there an existing API (or a combination of APIs) that allows for restarting individual tasks within a job?

  2. Could this be done via the REST API, and if so, what endpoints or methods should I look at?

  3. Are there any workarounds or best practices for implementing this functionality if a dedicated API is not available?

  4. How might this approach scale in environments with a large number of jobs and complex dependency graphs?

I’d love to hear about your experiences and any code snippets or documentation pointers that could help me get started. Thanks in advance for your insights!
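To make the ask concrete, here is a rough sketch of the kind of call I'm hoping exists, assuming the Repair and Rerun capability from the blog is also exposed through the Jobs REST API; the workspace URL, token, run ID, and task key below are all placeholders:

import requests

# Placeholders - replace with your workspace URL and a personal access token.
DATABRICKS_INSTANCE = "https://<your-workspace>.cloud.databricks.com"
headers = {"Authorization": "Bearer <personal-access-token>"}

# Repair an existing job run, rerunning only the task that failed (e.g. task 2),
# instead of triggering the whole job_silver job again.
repair_url = f"{DATABRICKS_INSTANCE}/api/2.1/jobs/runs/repair"
payload = {
    "run_id": 123456789,        # run_id of the failed job_silver run (placeholder)
    "rerun_tasks": ["task_2"]   # task keys to restart (placeholder)
}
response = requests.post(repair_url, headers=headers, json=payload)
print(response.status_code, response.text)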

Regards,
Hung Nguyen
15 REPLIES

Aviral-Bhardwaj
Esteemed Contributor III

Use the retry job or task option; it is available at the bottom right of the job or task pane.

AviralBhardwaj

Hi @Aviral-Bhardwaj , 

Thanks for your suggestion. But what I really need is a code/API-driven approach rather than having to click around in the UI to retry a specific task in a job.

Regards,
Hung Nguyen

Aviral-Bhardwaj
Esteemed Contributor III

Thank you so much for sharing that information. After reviewing the /api/2.2/jobs/reset API documentation, it looks like this endpoint is designed for updating the entire job configuration for future runs and doesn’t support restarting only the failed tasks of an active job run.

I appreciate your help, and if you have any other suggestions or ideas on achieving that level of granularity, I'd be happy to hear them.

Regards,
Hung Nguyen

aayrm5
Honored Contributor

hi @minhhung0507 

Please check whether the code below, which uses the REST API, caters to your needs.

import json
import requests

# Placeholders - set these for your workspace.
DATABRICKS_INSTANCE = "https://<your-workspace>.cloud.databricks.com"
JOB_ID = 123456  # the job_silver job ID
headers = {"Authorization": "Bearer <personal-access-token>"}

# Get the most recent run of the job.
list_runs_url = f"{DATABRICKS_INSTANCE}/api/2.1/jobs/runs/list"
params = {
    "job_id": JOB_ID,
    "limit": 1,
    "active_only": False
}
response = requests.get(list_runs_url, headers=headers, params=params)
latest_run_id = response.json()["runs"][0]["run_id"]

# Fetch the run details to inspect each task's result state.
run_details_url = f"{DATABRICKS_INSTANCE}/api/2.1/jobs/runs/get"
response = requests.get(run_details_url, headers=headers, params={"run_id": latest_run_id})
tasks = response.json().get("tasks", [])

# Collect the task keys that ended in a FAILED state.
failed_tasks = [task["task_key"] for task in tasks if task.get("state", {}).get("result_state") == "FAILED"]
print(failed_tasks)

if not failed_tasks:
    print("No failed tasks found.")
else:
    print(f"Retrying failed tasks: {failed_tasks}")

# Submit a new one-off run for each failed task (assuming the same notebook and cluster setup).
for task in tasks:
    if task["task_key"] in failed_tasks:
        submit_url = f"{DATABRICKS_INSTANCE}/api/2.1/jobs/runs/submit"
        payload = {
            "run_name": f"Rerun failed task: {task['task_key']}",
            "tasks": [
                {
                    "task_key": task["task_key"],
                    "notebook_task": task["notebook_task"],
                    "existing_cluster_id": task["existing_cluster_id"]
                }
            ]
        }
        submit_resp = requests.post(submit_url, headers=headers, data=json.dumps(payload))
        print(f"Submitted rerun for task {task['task_key']}: {submit_resp.status_code} - {submit_resp.text}")

Riz

aayrm5
Honored Contributor

Hey @minhhung0507 

Keep me posted on how this works for you.

Looking forward 

Cheers!

Riz

Sure, we will try again based on the solution you provided and will let you know whether it works. Thank you very much.

Regards,
Hung Nguyen

Hi @aayrm5 , after applying your code to the pipeline, it seems it doesn't work because of the following error:

400 --- {"error_code":"INVALID_PARAMETER_VALUE","message":"One of job_cluster_key, new_cluster, or existing_cluster_id must be specified. Serverless compute for workflows is not enabled in the workspace.","details":[{"@type":"type.googleapis.com/google.rpc.RequestInfo","request_id":"74ef6feb-9cff-4d3d-8cb6-3f45f3525768","serving_data":""}]}

Could you please help us investigate this error?

Regards,
Hung Nguyen

aayrm5
Honored Contributor

Hey @minhhung0507 - quick question - what is the cluster type you're using to run your workflow?

I'm using a shared, interactive cluster, so I'm passing the parameter 

{'existing_cluster_id' : task['existing_cluster_id']}

in the payload. This parameter will change if you're using a job_cluster; I'd have to check what it would be for serverless, though.
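For instance, here's a rough sketch (not verified on serverless) of swapping in a new_cluster spec per task in place of existing_cluster_id, reusing the task dict from the loop in the earlier snippet; the spark_version and node_type_id values are placeholders to adjust for your workspace:

# Sketch: per-task new_cluster spec in the runs/submit payload instead of existing_cluster_id.
# spark_version / node_type_id / num_workers are placeholders - use values valid in your workspace.
payload = {
    "run_name": f"Rerun failed task: {task['task_key']}",
    "tasks": [
        {
            "task_key": task["task_key"],
            "notebook_task": task["notebook_task"],
            "new_cluster": {
                "spark_version": "15.4.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2
            }
        }
    ]
}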

Hope it helps.

Cheers!

Riz

Hi @aayrm5 ,

Thanks for showing how to pass the existing_cluster_id in the payload. One thing I’m worried about from a production standpoint is what happens if the ID we supply doesn’t correspond to an active cluster—will Databricks automatically spin up a new cluster under the hood, or will the run fail immediately?

Auto‑provisioning clusters without explicit control could lead to unwanted resource sprawl and cost overruns. Could you please clarify the behavior of the Submit Run API when existing_cluster_id can’t be resolved, and recommend best practices to ensure we only target known, managed clusters?

Regards,
Hung Nguyen

aayrm5
Honored Contributor

Hi @minhhung0507 

The trigger won't succeed if the cluster ID is not provided. I tried to replicate it and found that it throws the error below:

{"error_code":"INVALID_PARAMETER_VALUE","message":"Missing required fields: settings.job_clusters.job_cluster_key, settings.job_clusters.new_cluster","details":[{"@type":"type.googleapis.com/google.rpc.RequestInfo","request_id":"b9631652-3101-4b69-95a6-45be477c85a5","serving_data":""}]}

 

Riz

@aayrm5 FYI:

400 --- {"error_code":"INVALID_PARAMETER_VALUE","message":"The cluster 0417-102632-cuhsyfm0 is not an all-purpose cluster. existing_cluster_id only supports all-purpose cluster IDs.","details":[{"@type":"type.googleapis.com/google.rpc.RequestInfo","request_id":"3379c0ea-0fc0-405a-808c-7f06759aad83","serving_data":""}]}

Regards,
Hung Nguyen

aayrm5
Honored Contributor

The error indicates that you've passed a job cluster ID under the key existing_cluster_id, which only accepts all-purpose (interactive) cluster IDs. Please pass job_cluster_details if you're using the key `job_cluster_key`. Thank you
Riz

Hi @aayrm5 ,

I have two quick questions I’d appreciate your help with:

  1. Why do we need to use an interactive (all‑purpose) cluster when submitting a job, rather than a job cluster?

  2. I can’t find any reference to a parameter called job_cluster_details in the official docs—could you point me to where it’s documented or share a link?

Thanks in advance for any pointers!

Regards,
Hung Nguyen
