Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Get the triggering task's name

ttamas
New Contributor III

Hi,

I have tasks that depend on each other. I would like task2 to read variables set by task1, the task that triggers it.

This is how I solved my problem:

Following the suggestion in https://community.databricks.com/t5/data-engineering/how-to-pass-parameters-to-a-quot-job-as-task-qu... I can set taskValues in task1 and get them in task2.

task1 sets it like:

 

dbutils.jobs.taskValues.set(key, value)

 

task2 gets it like:

 

dbutils.jobs.taskValues.get(taskKey, key, default, debugValue)

 

However, in task2 I must know the upstream task's name (task1), because taskKey expects it.

A question to Databricks development: Why? Why does task2 have to know the {{task.name}} of the triggering task just to get its taskValues? It would be much easier to have it in a context variable just like we know {{task.name}} or {{job.id}}.

My solution to get the name of the triggering task:

I defined task parameters so the code in task2 knows its own {{task.name}} and {{job.id}}:

 

 

{
  "task_name": "{{task.name}}",
  "job_id": "{{job.id}}"
}

 

 

Then, in task2's code, I read the job id and task name into the Python execution context:

 

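run_as_job = False
try:
    # These widgets exist only when the job passes the task parameters above.
    job_id = dbutils.widgets.get("job_id")
    task_name = dbutils.widgets.get("task_name")
    run_as_job = True
except Exception:
    # Not running as a job task (or parameters missing): say what to configure.
    print('Pass these parameters to the task in workflows job: { "job_id": "{{job.id}}", "task_name": "{{task.name}}" }')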

 

Then I called the API to get job configuration details and parsed the first element of task2's 'depends_on' from there.

 

import requests

def get_triggering_task_name(job_id):
    # Returns the task_key of the first entry in this task's 'depends_on'.
    # Relies on the notebook-level variable task_name set from the widgets above.
    context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    API_URL = context.apiUrl().getOrElse(None)
    TOKEN = context.apiToken().getOrElse(None)
    params = {'job_id': job_id}

    try:
        response = requests.get(API_URL + '/api/2.1/jobs/get',
                                headers={"Authorization": "Bearer " + TOKEN},
                                params=params)
        response.raise_for_status()
    except requests.exceptions.HTTPError:
        # Only HTTP errors have a response body to report; other failures propagate as-is.
        print(f'Error: {response.json()["error_code"]}: {response.json()["message"]}')
        raise

    tasks = response.json()['settings']['tasks']
    triggering_task = [i for i in tasks if i['task_key'] == task_name][0]['depends_on'][0]['task_key']
    return triggering_task

 

The whole exercise above was only to make it possible to:

 

dbutils.jobs.taskValues.get(taskKey=get_triggering_task_name(job_id), key='src_table', default='defaultvalue', debugValue='debugvalue')

 

This is very complicated. Is there a simpler way to access taskValues set by the triggering task if we don't hardcode its name?

 

tt
2 REPLIES

Kaniz_Fatma
Community Manager

Hi @ttamas, thank you for sharing your approach! It’s true that handling task dependencies and passing values between tasks in Databricks can sometimes be complex.

  • Databricks now supports dynamic value references in notebooks. Instead of using dbutils.jobs.taskValues.get(), you can directly reference task values set in upstream tasks using expressions like {{tasks.Get_user_data.values.name}}.
  • This approach avoids hardcoding task names and simplifies the process.

ttamas
New Contributor III

Hi @Kaniz_Fatma,

Thank you for your response.

I understand you can use {{tasks.[task_name].values.[value_name]}} as described in Pass context about job runs into job tasks | Databricks on AWS (example in the attached screenshot), but this still requires knowing and hardcoding the upstream task's name when you define input parameters for the downstream task.

Namely, in the {{tasks.Get_user_data.values.name}} expression you suggest, 'Get_user_data' is still the hardcoded name of the previous task. I am looking for a simple solution where I don't have to hardcode the upstream task's name and can just get the taskValues that it set. The names of the taskValues keys are a requirement of the downstream task, so the previous task must have set those keys. But the downstream task does not know the upstream task's name, and hence can't get the values that it set. With my approach, the notebook does not have to know the upstream task's name.

So in short, I am looking for a simple solution where I don't have to care about the upstream task's name.

Calling the jobs/get API endpoint works best if there is only one upstream task, but it could also be extended to work with an array of upstream tasks.
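
For what it's worth, here is a rough sketch of how that extension to several upstream tasks might look. It is not tested on a cluster. The helper names (upstream_task_keys, first_upstream_value), the get_value callable and the sample data are all hypothetical; only the 'tasks' / 'task_key' / 'depends_on' layout comes from the response.json()['settings'] structure used in my function above.

```python
_MISSING = object()  # sentinel: tells "key not set" apart from a stored None


def upstream_task_keys(job_settings, task_name):
    """Return the task_key of every task that task_name depends on."""
    for task in job_settings["tasks"]:
        if task["task_key"] == task_name:
            # A task with no upstream tasks may have no 'depends_on' entry at all.
            return [dep["task_key"] for dep in task.get("depends_on", [])]
    raise KeyError(f"task {task_name!r} not found in job settings")


def first_upstream_value(upstream_keys, key, get_value, default=None):
    """Return key's value from the first upstream task that set it.

    get_value is a hypothetical callable (task_key, key, default) -> value.
    """
    for task_key in upstream_keys:
        value = get_value(task_key, key, _MISSING)
        if value is not _MISSING:
            return value
    return default


# Hand-written sample shaped like response.json()['settings'] (not real output).
sample_settings = {
    "tasks": [
        {"task_key": "task1a"},
        {"task_key": "task1b"},
        {
            "task_key": "task2",
            "depends_on": [{"task_key": "task1a"}, {"task_key": "task1b"}],
        },
    ]
}

# In-memory stand-in for task values so the sketch runs outside Databricks.
fake_values = {("task1b", "src_table"): "sales.orders"}


def fake_get(task_key, key, default):
    return fake_values.get((task_key, key), default)


keys = upstream_task_keys(sample_settings, "task2")
value = first_upstream_value(keys, "src_table", fake_get, default="defaultvalue")
print(keys, value)
```

In a notebook, get_value would presumably be a small wrapper around dbutils.jobs.taskValues.get(taskKey=..., key=..., default=...). Whether the sentinel default passes through that call unchanged is an assumption I have not verified. If two upstream tasks set the same key, this sketch simply takes the first one listed in 'depends_on'.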

Thank you

tt