a month ago
Hi @varshini_reddy ,
You could add another task at the end of the job, configured to run if one or all of its dependencies failed.
This task would be a notebook that calls the REST API to stop the job (the code below does this by pausing the job's schedule).
import requests

# Set up the Databricks API endpoint and token
databricks_instance = "<your-databricks-instance>"  # e.g., "https://<your-region>.azuredatabricks.net"
job_id = <your_job_id>  # Replace with the ID of the job you want to disable
personal_access_token = "<your_personal_access_token>"

# API endpoint to update the job
api_url = f"{databricks_instance}/api/2.1/jobs/update"

# Function to disable the job by setting the pause status to PAUSED
def disable_job():
    # Set the payload to pause the job
    payload = {
        "job_id": job_id,
        "new_settings": {
            "schedule": {
                "pause_status": "PAUSED"
            }
        }
    }
    # Send the API request
    response = requests.post(
        api_url,
        headers={"Authorization": f"Bearer {personal_access_token}"},
        json=payload
    )
    # Check if the request was successful
    if response.status_code == 200:
        print(f"Job {job_id} has been successfully paused.")
    else:
        print(f"Failed to pause the job. Status Code: {response.status_code}")
        print(f"Response: {response.text}")

disable_job()
a month ago
Hi @filipniziol ,
That's quite clever. One suggestion though: instead of the /api/2.1/jobs/update endpoint, I would use the /api/2.1/jobs/runs/cancel endpoint, passing the run_id of the for_each task that I want to cancel.
Cancel a run | Jobs API | REST API reference | Databricks on AWS
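A minimal sketch of the cancel call suggested above, using only the Python standard library. The helper names `build_cancel_request` and `cancel_run` are made up for illustration, and the instance URL, token, and run ID are placeholders you would supply yourself. Building the request is kept separate from sending it so the payload can be inspected without touching a real workspace.

```python
import json
import urllib.request


def build_cancel_request(instance_url: str, token: str, run_id: int) -> urllib.request.Request:
    """Build (but do not send) a POST request for /api/2.1/jobs/runs/cancel.

    instance_url, token and run_id are placeholders supplied by the caller.
    """
    body = json.dumps({"run_id": run_id}).encode("utf-8")
    return urllib.request.Request(
        url=f"{instance_url.rstrip('/')}/api/2.1/jobs/runs/cancel",
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def cancel_run(instance_url: str, token: str, run_id: int, timeout: int = 30) -> int:
    """Send the cancel request and return the HTTP status code."""
    request = build_cancel_request(instance_url, token, run_id)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Inside a notebook task this would be called as `cancel_run(databricks_instance, personal_access_token, run_id)`. Keep in mind that this cancels the run rather than failing it.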
a month ago
Hi @szymon_dybczak ,
As far as I understand, the point is not to cancel the job run, but to stop the job.
a month ago
I already tried this, but the next task only starts once all iterations have been looped through. I want the job to be stopped at the very first failed iteration.
a month ago
Hi @varshini_reddy ,
Could you clarify what you want to achieve:
# Retrieve the job root run ID using dbutils
import json
# Get the context object
context_json = dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()
# Load the JSON object
context_dict = json.loads(context_json)
# Extract the rootRunId
root_run_id = context_dict.get("rootRunId").get("id")
print(root_run_id)
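The snippet above assumes that "rootRunId" is always present in the context. A slightly more defensive sketch is shown here; the helper name `extract_root_run_id` and the sample JSON string are hypothetical, and the assumption is that the key can be missing or null in some situations (for example when the notebook is not running as part of a job).

```python
import json
from typing import Optional


def extract_root_run_id(context_json: str) -> Optional[str]:
    """Return the root run ID as a string, or None if the context has none."""
    context = json.loads(context_json)
    # "rootRunId" may be absent or null; fall back to an empty dict in that case
    root_run = context.get("rootRunId") or {}
    run_id = root_run.get("id")
    return None if run_id is None else str(run_id)


# Hypothetical context payload, shaped like the one the snippet above reads
sample_context = '{"rootRunId": {"id": 123456}}'
print(extract_root_run_id(sample_context))
```

In a notebook, the string passed in would be the `context_json` value obtained from `dbutils` as shown above.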
The code:
CREATE TABLE IF NOT EXISTS job_run_failures (
    job_root_run_id STRING,
    status STRING -- e.g., "SUCCESS", "FAILURE"
) USING DELTA;
import json
from datetime import datetime

# Get the context object
context_json = dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()
# Load the JSON object
context_dict = json.loads(context_json)
# Retrieve the current job root run ID
job_root_run_id = context_dict.get("rootRunId").get("id")

# Check if a failure is already logged for this job run
existing_failure = spark.sql(f"""
    SELECT COUNT(*)
    FROM job_run_failures
    WHERE job_root_run_id = '{job_root_run_id}' AND status = 'FAILURE'
""").collect()[0][0]

# If a failure is detected, stop execution
if existing_failure > 0:
    dbutils.notebook.exit("Exiting due to a previously logged failure.")

try:
    # Your main code logic here
    print("Executing main task...")
    # Simulate a task failure
    # raise ValueError("Simulated task failure")  # Uncomment to test failure handling
except Exception as e:
    # Log the failure to the Delta table using MERGE to handle concurrency
    spark.sql(f"""
        MERGE INTO job_run_failures AS target
        USING (SELECT '{job_root_run_id}' AS job_root_run_id, 'FAILURE' AS status) AS source
        ON target.job_root_run_id = source.job_root_run_id
        WHEN NOT MATCHED THEN
            INSERT (job_root_run_id, status)
            VALUES (source.job_root_run_id, source.status)
    """)
    # Exit the notebook with an error message
    dbutils.notebook.exit(f"FAILED: {str(e)}")
a month ago
Yes, it's 3.
Does this stop the whole job run or just the notebook?
The context I'm talking about is the workflow: the workflow should stop if the loop task had at least 1 failure.
a month ago - last edited a month ago
It would not stop the iterations, but effectively skip them.
Let's say you have 20 iterations
a month ago
How does this work in a Databricks task or for loop? The rootRunId is with respect to Databricks notebooks, right?
a month ago - last edited a month ago
Root Run Id is the run ID of the job that is running.
When you run the job, it is assigned a Run Id.
But when you then run iterations inside For Each, each of those iterations additionally gets its own Run Id.
So in order to make the solution work you need to check if there is an error of one of the iterations sharing the same Root Run Id (this is why the code I shared is based on the Root Run Id, and not just Run Id).
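To illustrate the idea, here is a small pure-Python simulation, not Databricks code. An in-memory set stands in for the job_run_failures table, the iterations run one after another, and the function names and the root run ID "1001" are made up. Every iteration first checks whether a failure was already logged for its root run ID and skips its work if so.

```python
from typing import Callable, List, Set


def run_iteration(root_run_id: str, iteration: int,
                  work: Callable[[int], None], failures: Set[str]) -> str:
    """Run one iteration unless a failure is already logged for this root run."""
    if root_run_id in failures:
        return "SKIPPED"  # an earlier iteration of the same job run failed
    try:
        work(iteration)
    except Exception:
        failures.add(root_run_id)  # stands in for the MERGE into the Delta table
        return "FAILED"
    return "SUCCESS"


def simulate(root_run_id: str, n_iterations: int, failing_iteration: int) -> List[str]:
    """Simulate a sequential For Each in which exactly one iteration raises."""
    failures: Set[str] = set()

    def work(iteration: int) -> None:
        if iteration == failing_iteration:
            raise ValueError(f"Simulated failure in iteration {iteration}")

    return [run_iteration(root_run_id, i, work, failures)
            for i in range(1, n_iterations + 1)]


statuses = simulate("1001", 20, 3)
print(statuses)
```

With 20 iterations and a failure in the third, the first two succeed, the third fails, and the remaining 17 are skipped. When iterations run concurrently, any that have already passed the check would presumably still run to completion.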
a month ago
Got it.
But here you are just stopping the NB execution, right? I want the for loop iterations to be stopped for that rootRunId. Let's say I have a for each loop in a workflow, inside which I have a job; I want every iteration to be stopped/failed if one of the iterations fails.
if existing_failure > 0:
    dbutils.notebook.exit("Exiting due to a previously logged failure.")
I tried something like this to cancel the job based on the run ID for that loop job, but it is cancelling the run. I need to make it fail, not cancel:
a month ago
If you want to fail the notebook, then instead of dbutils.notebook.exit simply raise an exception:
if existing_failure > 0:
    raise Exception("Exiting due to a previously logged failure.")
a month ago
Failing the NB just fails that particular iteration; it doesn't stop the loop. I have to stop the for-each.
a month ago
Hi @varshini_reddy ,
There is no option to stop all the other iterations while the for each is running and one of the iterations has failed.
That is why I shared the workaround, which will simply skip/fail all the next iterations without doing anything.
You can then fail the job after the for each has executed.
a month ago
"This is why the shared workaround, that will simply skip/fail all the next iterations without doing anything"?
Which one? All of the above will only exit the NB, right?