Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

In Databricks Workflows, can we stop the loop run if one of the iterations fails?

varshini_reddy
New Contributor III
 
14 REPLIES

filipniziol
Contributor

Hi @varshini_reddy ,

What you can do is add another task at the end of the job that runs if one/all dependencies failed.

[screenshot: job graph with a final task configured to run if dependencies fail]
This task would be a notebook that calls the REST API to pause the job.

import requests

# Set up the Databricks API endpoint and token
databricks_instance = "<your-databricks-instance>"  # e.g., "https://<your-region>.azuredatabricks.net"
job_id = <your_job_id>  # Replace with the ID of the job you want to disable
personal_access_token = "<your_personal_access_token>"

# API endpoint to update the job
api_url = f"{databricks_instance}/api/2.1/jobs/update"

# Function to disable the job by setting the pause status to PAUSED
def disable_job():
    # Set the payload to pause the job
    payload = {
        "job_id": job_id,
        "new_settings": {
            "schedule": {
                "pause_status": "PAUSED"
            }
        }
    }

    # Send the API request
    response = requests.post(
        api_url,
        headers={"Authorization": f"Bearer {personal_access_token}"},
        json=payload
    )

    # Check if the request was successful
    if response.status_code == 200:
        print(f"Job {job_id} has been successfully paused.")
    else:
        print(f"Failed to pause the job. Status Code: {response.status_code}")
        print(f"Response: {response.text}")

disable_job()

Hi @filipniziol ,

That's quite clever 🙂 One suggestion though: instead of using the /api/2.1/jobs/update endpoint, I would use the /api/2.1/jobs/runs/cancel endpoint, passing the run_id of the for_each task that I want to cancel.
Cancel a run | Jobs API | REST API reference | Databricks on AWS

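The runs/cancel call above can be sketched as follows. This is a minimal sketch with placeholder values: the workspace URL and personal access token are assumptions that must be replaced, and the run_id of the for_each task has to be supplied by the caller.

```python
import requests

# Placeholder workspace URL and token -- both are assumptions, replace before use
databricks_instance = "https://<your-databricks-instance>"
personal_access_token = "<your_personal_access_token>"

# Jobs 2.1 endpoint for cancelling a specific run
cancel_url = f"{databricks_instance}/api/2.1/jobs/runs/cancel"

def cancel_run(run_id):
    """Cancel the given job run (e.g., the for_each task's run)."""
    response = requests.post(
        cancel_url,
        headers={"Authorization": f"Bearer {personal_access_token}"},
        json={"run_id": run_id},
    )
    return response.status_code
```

A 200 status code means the cancel request was accepted; note that the run then ends in a cancelled state, not a failed one.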

Hi @szymon_dybczak ,
As far as I understand, the point is not to cancel the job run, but to stop the job.

I already tried this, but the next task only starts once all iterations have looped. I want the job to be stopped at the very first failed iteration.

filipniziol
Contributor

Hi @varshini_reddy   ,

Could you clarify what you want to achieve:

  1. If the job run fails, then you want to stop the job (so it does not run the next time). That is what my answer covered.
  2. If the job run fails, then you want to stop the job run. That is what @szymon_dybczak covered.
  3. You have a "For each" task in your job, and you want to stop the job run if one of the iterations failed?
    • If that's the case, then you can create a Delta table to keep track of job run failures.
    • It can basically have just 2 columns: Job Root Run Id and Status.
    • In the notebook that is executed by the For each, have an if statement to first check whether there is an entry in the table where Job Root Run Id = <current job root run id> and Status = Failure.
    • Wrap the remaining code in your notebook in a try/except clause. In except, insert the data into your Delta table.
    • You can get the Job Root Run Id using this code snippet:
# Retrieve the job root run ID using dbutils
import json
# Get the context object
context_json = dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()

# Load the JSON object
context_dict = json.loads(context_json)

# Extract the rootRunId
root_run_id = context_dict.get("rootRunId").get("id")

print(root_run_id)

The code:

CREATE TABLE IF NOT EXISTS job_run_failures (
    job_root_run_id STRING,
    status STRING -- e.g., "SUCCESS", "FAILURE"
) USING DELTA;

import json

# Get the context object
context_json = dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()

# Load the JSON object
context_dict = json.loads(context_json)

# Retrieve the current job root run ID
job_root_run_id = context_dict.get("rootRunId").get("id")

# Check if a failure is already logged for this job run
existing_failure = spark.sql(f"""
    SELECT COUNT(*)
    FROM job_run_failures
    WHERE job_root_run_id = '{job_root_run_id}' AND status = 'FAILURE'
""").collect()[0][0]

# If a failure is detected, stop execution
if existing_failure > 0:
    dbutils.notebook.exit("Exiting due to a previously logged failure.")

try:
    # Your main code logic here
    print("Executing main task...")
    # Simulate a task failure
    # raise ValueError("Simulated task failure")  # Uncomment to test failure handling
except Exception as e:
    # Log the failure to the Delta table using MERGE to handle concurrency
    spark.sql(f"""
        MERGE INTO job_run_failures AS target
        USING (SELECT '{job_root_run_id}' AS job_root_run_id, 'FAILURE' AS status) AS source
        ON target.job_root_run_id = source.job_root_run_id
        WHEN NOT MATCHED THEN
            INSERT (job_root_run_id, status)
            VALUES (source.job_root_run_id, source.status)
    """)
    # Exit the notebook with an error message
    dbutils.notebook.exit(f"FAILED: {str(e)}")

 

Yes, it's 3.

Does this stop the whole job run or just the notebook?

 

The whole context I'm talking about is the workflow: the workflow should stop if the loop task had at least 1 failure.

It would not stop the iterations, but effectively skip them.
Let's say you have 20 iterations:

  • 1-3 were successful
  • 4 failed -> now the failure is added to the table
  • 5-20 still run, but they check that there is a failure, so effectively they do nothing; they just exit the notebook.

How does this work in a Databricks task or for loop? Because rootRunId is with respect to Databricks notebooks, right?

filipniziol
Contributor

Root Run Id is the run ID of the job that is running.
You run the job, and it is assigned a Run Id.
The iterations inside For Each then additionally get their own Run Ids.
So to make the solution work, you need to check whether one of the iterations sharing the same Root Run Id has failed (this is why the code I shared is based on the Root Run Id, not just the Run Id).
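To illustrate the difference between the two IDs, here is a sketch of pulling both out of the context JSON. The sample JSON shape is an assumption modeled on what getContext().toJson() typically returns; verify the field names in your own workspace.

```python
import json

# Sample context JSON -- the field layout is an assumption, not captured
# from a real workspace; check getContext().toJson() in your environment.
context_json = """
{
  "rootRunId": {"id": 1111},
  "currentRunId": {"id": 2222}
}
"""

context = json.loads(context_json)
root_run_id = context.get("rootRunId", {}).get("id")        # shared by all iterations of the job run
current_run_id = context.get("currentRunId", {}).get("id")  # unique to this iteration's task run

print(root_run_id, current_run_id)  # prints: 1111 2222
```

All iterations of one job run share the same rootRunId, which is why the failure table is keyed on it rather than on the per-iteration run ID.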

Got it,   

But here you are just stopping the NB execution, right? I want the for loop iterations to get stopped for that rootRunId. Let's say I have a for each loop in a workflow, inside which I have a job; each iteration should be stopped/failed if one of the iterations fails.

if existing_failure > 0:
    dbutils.notebook.exit("Exiting due to a previously logged failure.")

 

 

I tried something like this to cancel the job based on the run ID for that loop job, but it's cancelling; I need to make it fail, not cancel:

 

import requests
def cancel_job(run_id, token):
    url = "https://<databricks-instance>/api/2.1/jobs/runs/cancel"
    headers = {"Authorization": f"Bearer {token}"}
    data = {"run_id": run_id}
    response = requests.post(url, headers=headers, json=data)
    return response.status_code

 

filipniziol
Contributor

If you want to fail the notebook, then instead of dbutils.notebook.exit simply raise an exception:

if existing_failure > 0:
    raise Exception("Exiting due to a previously logged failure.")

Failing the NB just fails that particular iteration; it isn't stopping the loop. I have to stop the for-each.

filipniziol
Contributor

Hi @varshini_reddy ,
There is no option to stop all the other iterations while the for each is running and one of the iterations has failed.
This is why I shared the workaround that simply skips/fails all the next iterations without doing anything.

The job can only be failed once the for each task has finished executing.

"This is why the shared workaround, that will simply skip/fail all the next iterations without doing anything."

Which one? All of the above will only exit the NB, right?
