Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Airflow - How to pull XComs value in the notebook task?

Choolanadu
New Contributor

Using Airflow, I have created a DAG with a sequence of notebook tasks. The first notebook returns a batch id; the subsequent notebook tasks need this batch_id.

I am using the DatabricksSubmitRunOperator to run the notebook tasks. This operator pushes two values (run_id and run_page_url) to Airflow XCom. Using the run_id and the Databricks REST API, I retrieved the notebook output.

The XCom value retrieval works within the callable function of the PythonOperator.

How would I do the same operation in the DatabricksSubmitRunOperator?

How do I get the handle to the task instance inside the notebook?

Working PythonOperator code:

def pull_databricks_task_run_xcom_values(**kwargs):
    ti = kwargs['ti']
    run_id = ti.xcom_pull(key='run_id', task_ids='notebook_task1')
    logger.info("======pulled value for run_id:===== " + str(run_id))
    run_page_url = ti.xcom_pull(key='run_page_url', task_ids='notebook_task1')
    logger.info("======pulled value for run_page_url:===== " + str(run_page_url))

with DAG('trigger_db_notebooks_tasks',
         start_date=days_ago(2),
         schedule_interval=None,
         default_args=default_args) as dag:

    # Task 1
    notebook_task1 = DatabricksSubmitRunOperator(
        task_id='notebook_task1',
        notebook_task=notebook_task1_dict,
        new_cluster=default_cluster_conf['clusters_spec'],
        do_xcom_push=True,
    )

    # Task 2
    xcom_pull = PythonOperator(
        task_id='download_files',
        provide_context=True,
        python_callable=pull_databricks_task_run_xcom_values,
    )

    notebook_task1 >> xcom_pull
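For reference, the REST call described above (retrieving the notebook output by run_id) can be sketched roughly like this with only the standard library; the workspace host, token, and the `fetch_notebook_output` helper name are my own assumptions, not part of the original DAG:

```python
import json
import urllib.request


def get_output_url(host: str, run_id: str) -> str:
    """Build the Jobs API get-output endpoint URL for a given run_id."""
    return f"{host}/api/2.1/jobs/runs/get-output?run_id={run_id}"


def fetch_notebook_output(host: str, token: str, run_id: str) -> str:
    """Call the Databricks Jobs API and return the notebook's exit value,
    i.e. whatever the notebook passed to dbutils.notebook.exit()."""
    req = urllib.request.Request(
        get_output_url(host, run_id),
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(req) as resp:
        payload = json.load(resp)
    # notebook_output.result holds the value returned by the notebook
    return payload["notebook_output"]["result"]
```

A callable like `pull_databricks_task_run_xcom_values` above could call `fetch_notebook_output` with the pulled run_id to recover the batch id.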

   

1 REPLY

daniel_sahal
Esteemed Contributor

From what I understand, you want to pass a run_id parameter to the second notebook task?

You can:

  1. Create a widget parameter inside your Databricks notebook (https://docs.databricks.com/notebooks/widgets.html) that will consume your run_id.
  2. Pass the parameter in DatabricksSubmitRunOperator (example here: https://stackoverflow.com/questions/61542653/airflow-databrickssubmitrunoperator-does-not-take-in-no...).
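Combining those two steps, the wiring might look roughly like the sketch below. The widget name `batch_id`, the helper `make_notebook_task`, and the task ids are assumptions on my side; Airflow renders the Jinja template in `base_parameters` at runtime, pulling the XCom value pushed by the first DatabricksSubmitRunOperator:

```python
# On the Databricks side, the second notebook reads the widget
# (assumed widget name; runs inside the notebook, not in Airflow):
#   batch_id = dbutils.widgets.get("batch_id")

# On the Airflow side, build the notebook_task spec with base_parameters.
def make_notebook_task(notebook_path: str, upstream_task_id: str) -> dict:
    """Return a notebook_task spec whose base_parameters carry a Jinja
    template that pulls run_id from the upstream task's XCom."""
    return {
        "notebook_path": notebook_path,
        "base_parameters": {
            "batch_id": (
                "{{ ti.xcom_pull(task_ids='"
                + upstream_task_id
                + "', key='run_id') }}"
            ),
        },
    }

# The dict is then passed to the operator, e.g.:
# notebook_task2 = DatabricksSubmitRunOperator(
#     task_id="notebook_task2",
#     notebook_task=make_notebook_task("/Repos/etl/notebook2", "notebook_task1"),
#     new_cluster=default_cluster_conf["clusters_spec"],
# )
```

This works because `notebook_task` is merged into the operator's templated `json` field, so the XCom pull is resolved before the run is submitted.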
