I have tasks that depend on each other. I would like task2 to read variables set by task1, the task that triggers it.
This is how I solved my problem:
Following the suggestion in https://community.databricks.com/t5/data-engineering/how-to-pass-parameters-to-a-quot-job-as-task-qu... I can set taskValues in task1 and get them in task2.
task1 sets it like:
dbutils.jobs.taskValues.set(key, value)
task2 gets it like:
dbutils.jobs.taskValues.get(taskKey, key, default, debugValue)
However, in task2 I must know the upstream task's name (task1), because the taskKey argument expects it.
A question to Databricks development: Why? Why does task2 have to know the {{task.name}} of the triggering task just to get its taskValues? It would be much easier to have it in a context variable just like we know {{task.name}} or {{job.id}}.
My solution to get the name of the triggering task:
I defined task parameters so the code in task2 knows its own {{task.name}} and {{job.id}}:
"task_name": "{{task.name}}",
"job_id": "{{job.id}}"
Then, inside the code in task2, I made sure to get the job id and task name into the Python execution context:
run_as_job = False
try:
    # The widgets only exist when the notebook runs as a job task with these parameters.
    job_id = dbutils.widgets.get("job_id")
    task_name = dbutils.widgets.get("task_name")
    run_as_job = True
except Exception:
    print('Pass these parameters to the task in workflows job: { "job_id": "{{job.id}}", "task_name": "{{task.name}}" }')
Then I called the API to get job configuration details and parsed the first element of task2's 'depends_on' from there.
import requests
import json

def get_triggering_task_name(job_id):
    # Read the workspace URL and an API token from the notebook context.
    API_URL = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().getOrElse(None)
    TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)
    params = {'job_id': job_id}
    response = requests.get(API_URL + '/api/2.1/jobs/get'
                            , headers={"Authorization": "Bearer " + TOKEN}
                            , params=params)
    #print(json.dumps(response.json(), indent=4))
    if response.status_code != 200:
        print(f'Error: {response.json()["error_code"]}: {response.json()["message"]}')
        return None
    # task_name is the notebook-level variable read from the widget above.
    # Only the first entry of depends_on is used.
    triggering_task = [i for i in response.json()['settings']['tasks'] if i['task_key'] == task_name][0]['depends_on'][0]['task_key']
    return triggering_task
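The function above only looks at the first element of 'depends_on'. If task2 may depend on more than one task, the parsing step could be separated from the API call, roughly as sketched below. This is only a sketch: upstream_task_keys and sample_settings are hypothetical names, and the sample dict is made up to mimic the 'settings' part of the jobs/get response that the function above indexes into.

```python
from typing import Any, Dict, List


def upstream_task_keys(job_settings: Dict[str, Any], task_name: str) -> List[str]:
    """Return the task_key of every task that `task_name` depends on."""
    for task in job_settings.get("tasks", []):
        if task.get("task_key") == task_name:
            # A task without upstream dependencies has no 'depends_on' entry.
            return [dep["task_key"] for dep in task.get("depends_on", [])]
    raise KeyError(f"task {task_name!r} not found in job settings")


# Hypothetical sample shaped like response.json()['settings'] in the code above.
sample_settings = {
    "tasks": [
        {"task_key": "task1"},
        {"task_key": "task2", "depends_on": [{"task_key": "task1"}]},
    ]
}

print(upstream_task_keys(sample_settings, "task2"))
```

In the notebook this would be called with response.json()['settings'] and the task_name read from the widget, and each returned key could then be tried as taskKey.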
The whole exercise above was only to make it possible to:
dbutils.jobs.taskValues.get(taskKey=get_triggering_task_name(job_id), key='src_table', default='defaultvalue', debugValue='debugvalue')
This is very complicated. Is there a simpler way to access taskValues set by the triggering task if we don't hardcode its name?