Airflow - How to pull XComs value in the notebook task?

Choolanadu
New Contributor

Using Airflow, I have created a DAG with a sequence of notebook tasks. The first notebook returns a batch_id; the subsequent notebook tasks need this batch_id.

I am using the DatabricksSubmitRunOperator to run the notebook task. This operator pushes two values (run_id, run_page_url) to Airflow XCom. Using the run_id and the Databricks REST API, I retrieved the notebook output.
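For reference, the retrieval step described above might look roughly like the sketch below. It assumes the Jobs API `runs/get-output` endpoint and a personal access token; the helper names (`build_get_output_request`, `extract_notebook_result`, `fetch_notebook_result`) and the host value are hypothetical, not taken from my actual code.

```python
import json
import urllib.request
from typing import Optional


def build_get_output_request(host: str, token: str, run_id: int) -> urllib.request.Request:
    """Build a GET request for the Jobs API runs/get-output endpoint."""
    url = f"{host.rstrip('/')}/api/2.0/jobs/runs/get-output?run_id={run_id}"
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})


def extract_notebook_result(payload: dict) -> Optional[str]:
    """Return the value the notebook passed to dbutils.notebook.exit(), if present."""
    return payload.get("notebook_output", {}).get("result")


def fetch_notebook_result(host: str, token: str, run_id: int) -> Optional[str]:
    """Call the endpoint and pull the notebook result out of the JSON response."""
    request = build_get_output_request(host, token, run_id)
    with urllib.request.urlopen(request) as response:
        return extract_notebook_result(json.load(response))
```

Inside a PythonOperator callable, the run_id pulled from XCom would be passed to `fetch_notebook_result`, and the returned batch_id could then be pushed to XCom for downstream tasks.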

Retrieving the XCom value works inside the callable function of a PythonOperator.

How would I do the same operation in the DatabricksSubmitRunOperator?

How do I get the handle to the task instance inside the notebook?

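Working PythonOperator code:

# Import paths assume Airflow 2.x with the Databricks provider installed.
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

logger = logging.getLogger(__name__)

# default_args, notebook_task1_dict and default_cluster_conf are defined elsewhere (not shown).

def pull_databricks_task_run_xcom_values(**kwargs):
  ti = kwargs['ti']
  # Both values are pushed to XCom by the notebook_task1 operator below.
  run_id = ti.xcom_pull(key='run_id', task_ids='notebook_task1')
  logger.info("======pulled value based on run_id:===== " + str(run_id))
  run_page_url = ti.xcom_pull(key='run_page_url', task_ids='notebook_task1')
  logger.info("======pulled value based on run_page_url:===== " + str(run_page_url))

with DAG('trigger_db_notebooks_tasks', start_date=days_ago(2), schedule_interval=None, default_args=default_args) as dag:
  # Task 1: run the notebook; do_xcom_push=True publishes run_id and run_page_url.
  notebook_task1 = DatabricksSubmitRunOperator(task_id="notebook_task1", notebook_task=notebook_task1_dict, new_cluster=default_cluster_conf['clusters_spec'], do_xcom_push=True)
  # Task 2: read the XCom values pushed by Task 1.
  xcom_pull = PythonOperator(task_id='download_files', provide_context=True, python_callable=pull_databricks_task_run_xcom_values)
  # Run the pull only after the notebook task has finished.
  notebook_task1 >> xcom_pull
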

1 REPLY

daniel_sahal
Honored Contributor III

From what I understand, you want to pass the run_id as a parameter to the second notebook task?

You can:

  1. Create a widget parameter in your Databricks notebook (https://docs.databricks.com/notebooks/widgets.html) that receives the run_id
  2. Pass the parameter in DatabricksSubmitRunOperator (example here: https://stackoverflow.com/questions/61542653/airflow-databrickssubmitrunoperator-does-not-take-in-no...
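
A minimal sketch of the two steps above, under some assumptions: the operator renders Jinja templates inside `notebook_task` (the provider templates its `json` field, into which `notebook_task` is merged), and the XCom key is `run_id` as in the question. The helper names `xcom_template` and `build_notebook_task`, the notebook path `/Shared/notebook2`, and the task id `notebook_task2` are hypothetical.

```python
def xcom_template(task_id: str, key: str) -> str:
    """Return a Jinja expression that Airflow renders to an XCom value at run time."""
    return "{{ ti.xcom_pull(task_ids='" + task_id + "', key='" + key + "') }}"


def build_notebook_task(notebook_path: str, upstream_task_id: str) -> dict:
    """Build a notebook_task dict whose base_parameters carry the upstream run_id."""
    return {
        "notebook_path": notebook_path,
        "base_parameters": {
            # Rendered by Airflow just before the second notebook is submitted.
            "run_id": xcom_template(upstream_task_id, "run_id"),
        },
    }


# Hypothetical path for the second notebook.
notebook_task2_dict = build_notebook_task("/Shared/notebook2", "notebook_task1")
print(notebook_task2_dict["base_parameters"]["run_id"])

# In the DAG (requires Airflow and the Databricks provider, so shown as comments):
# notebook_task2 = DatabricksSubmitRunOperator(
#     task_id="notebook_task2",
#     notebook_task=notebook_task2_dict,
#     new_cluster=default_cluster_conf['clusters_spec'],
# )
# notebook_task1 >> notebook_task2
#
# In the second notebook (dbutils exists only on Databricks):
# dbutils.widgets.text("run_id", "")
# run_id = dbutils.widgets.get("run_id")
```

The notebook never gets a handle to the Airflow task instance; it only sees the rendered string in the `run_id` widget. If the notebook needs the batch_id rather than the run_id, it can be looked up in a PythonOperator first and passed through `base_parameters` the same way.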