Get the triggering task's name

ttamas
New Contributor III

Hi,

I have tasks that depend on each other. I would like to get variables from task1 that triggers task2.

This is how I solved my problem:

Following the suggestion in https://community.databricks.com/t5/data-engineering/how-to-pass-parameters-to-a-quot-job-as-task-qu... I can set taskValues in task1 and get them in task2.

task1 sets it like:

 

dbutils.jobs.taskValues.set(key, value)

 

task2 gets it like:

 

dbutils.jobs.taskValues.get(taskKey, key, default, debugValue)

 

However, in task2 I must know the upstream task's name (task1), because taskKey expects it.
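To make the two calls concrete, here is a minimal sketch of the set/get pair wrapped in small helpers. The helper names (publish_value, read_value) and the choice to pass dbutils in as an argument are my own, made only so the snippet can be read and exercised outside a notebook; inside a Databricks notebook dbutils is already available as a global.

```python
def publish_value(dbutils, key, value):
    """Call from the upstream task (task1) to publish a task value."""
    dbutils.jobs.taskValues.set(key=key, value=value)


def read_value(dbutils, upstream_task, key, default, debug_value):
    """Call from the downstream task (task2) to read a value set upstream.

    upstream_task is the task key of the task that set the value;
    this is the part that currently has to be known in advance.
    debug_value is passed as debugValue, which is used when the
    notebook runs interactively instead of as part of a job.
    """
    return dbutils.jobs.taskValues.get(
        taskKey=upstream_task,
        key=key,
        default=default,
        debugValue=debug_value,
    )
```

In task1 this would be called as publish_value(dbutils, "src_table", "my_table"), and in task2 as read_value(dbutils, "task1", "src_table", "defaultvalue", "debugvalue"), where "my_table" is just a placeholder value.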

A question to Databricks development: Why? Why does task2 have to know the {{task.name}} of the triggering task just to get its taskValues? It would be much easier to have it in a context variable just like we know {{task.name}} or {{job.id}}.

My solution to get the name of the triggering task:

I defined task parameters so the code in task2 knows its own {{task.name}} and {{job.id}}:

 

 

{
  "task_name": "{{task.name}}",
  "job_id": "{{job.id}}"
}

 

 

Then, inside the code in task2, I read the job id and task name into the Python execution context:

 

run_as_job = False
try:
    # These widgets only exist when the job passes the task parameters.
    job_id = dbutils.widgets.get("job_id")
    task_name = dbutils.widgets.get("task_name")
    run_as_job = True
except Exception:
    print('Pass these parameters to the task in workflows job: { "job_id": "{{job.id}}", "task_name": "{{task.name}}" }')

 

Then I called the API to get job configuration details and parsed the first element of task2's 'depends_on' from there.

 

import requests

def get_triggering_task_name(job_id):
    # Note: task_name is read from the notebook's global scope (set from the task parameters).
    # The workspace URL and API token are taken from the notebook context.
    context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    api_url = context.apiUrl().getOrElse(None)
    token = context.apiToken().getOrElse(None)

    response = requests.get(
        api_url + '/api/2.1/jobs/get',
        headers={"Authorization": "Bearer " + token},
        params={'job_id': job_id},
    )

    try:
        response.raise_for_status()
    except requests.HTTPError:
        # Print the raw body, since an error response is not guaranteed to be JSON.
        print(f'Error: {response.status_code}: {response.text}')
        raise

    # Find this task in the job settings and return the first task it depends on.
    tasks = response.json()['settings']['tasks']
    this_task = [t for t in tasks if t['task_key'] == task_name][0]
    return this_task['depends_on'][0]['task_key']
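The parsing step can also be looked at in isolation, without the API call. The following is only a sketch: the function name upstream_task_keys and the sample dictionary are made up for illustration, and the response shape (settings → tasks → task_key / depends_on) is assumed to match what the code above already relies on. It returns every upstream task key rather than just the first, and returns an empty list when the task has no 'depends_on' entry.

```python
def upstream_task_keys(job_config, task_name):
    """Return the task keys that task_name depends on, in declared order."""
    tasks = job_config.get("settings", {}).get("tasks", [])
    for task in tasks:
        if task.get("task_key") == task_name:
            # A task without upstream dependencies has no 'depends_on' key.
            return [dep["task_key"] for dep in task.get("depends_on", [])]
    raise KeyError(f"task {task_name!r} not found in job configuration")


# Hypothetical response body, trimmed to the fields used here.
sample_job = {
    "job_id": 123,
    "settings": {
        "tasks": [
            {"task_key": "task1"},
            {"task_key": "task2", "depends_on": [{"task_key": "task1"}]},
        ]
    },
}

print(upstream_task_keys(sample_job, "task2"))  # ['task1']
```

Returning a list makes the case of a task with several upstream tasks explicit, instead of silently picking the first one.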

 

The whole exercise above was only to make it possible to:

 

dbutils.jobs.taskValues.get(taskKey=get_triggering_task_name(job_id), key='src_table', default='defaultvalue', debugValue='debugvalue')

 

This is very complicated. Is there a simpler way to access taskValues set by the triggering task without hardcoding its name?

 

tt