Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Airflow - How to pull XComs value in the notebook task?

Choolanadu
New Contributor

Using Airflow, I have created a DAG with a sequence of notebook tasks. The first notebook returns a batch_id; the subsequent notebook tasks need this batch_id.

I am using the DatabricksSubmitRunOperator to run the notebook task. This operator pushes two values (run_id, run_page_url) to Airflow XCom. Using the run_id and the Databricks REST API, I retrieved the notebook output.

The XCom value retrieval works within the callable function of the PythonOperator.

How would I do the same operation in the DatabricksSubmitRunOperator?

How do I get the handle to the task instance inside the notebook?

Working PythonOperator code:

import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

logger = logging.getLogger(__name__)


def pull_databricks_task_run_xcom_values(**kwargs):
    ti = kwargs['ti']
    run_id = ti.xcom_pull(key='run_id', task_ids='notebook_task1')
    logger.info("pulled run_id: %s", run_id)
    run_page_url = ti.xcom_pull(key='run_page_url', task_ids='notebook_task1')
    logger.info("pulled run_page_url: %s", run_page_url)


with DAG(
    'trigger_db_notebooks_tasks',
    start_date=days_ago(2),
    schedule_interval=None,
    default_args=default_args,
) as dag:

    # Task 1
    notebook_task1 = DatabricksSubmitRunOperator(
        task_id='notebook_task1',
        notebook_task=notebook_task1_dict,
        new_cluster=default_cluster_conf['clusters_spec'],
        do_xcom_push=True,
    )

    # Task 2
    xcom_pull = PythonOperator(
        task_id='download_files',
        provide_context=True,
        python_callable=pull_databricks_task_run_xcom_values,
    )

    notebook_task1 >> xcom_pull

   

1 REPLY

daniel_sahal
Esteemed Contributor

From what I understand, you want to pass the run_id parameter to the second notebook task?

You can:

  1. Create a widget parameter inside your Databricks notebook (https://docs.databricks.com/notebooks/widgets.html) that will consume your run_id
  2. Pass the parameter in DatabricksSubmitRunOperator (example here: https://stackoverflow.com/questions/61542653/airflow-databrickssubmitrunoperator-does-not-take-in-no...
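The two steps above can be sketched together: XCom values are available to an operator's templated fields via Jinja, and anything placed in the notebook task's base_parameters shows up as a widget inside the Databricks notebook. A minimal sketch, assuming the Airflow Databricks provider is installed; the names batch_run_id, notebook_task2, and the notebook path are illustrative, not from this thread:

```python
# Jinja template that pulls the upstream run_id from XCom at runtime.
# Works inside any templated field of an Airflow operator.
run_id_template = "{{ ti.xcom_pull(task_ids='notebook_task1', key='run_id') }}"

# Keyword arguments for a second DatabricksSubmitRunOperator
# (hypothetical task/notebook names; new_cluster etc. omitted):
notebook_task2_kwargs = {
    "task_id": "notebook_task2",
    "notebook_task": {
        "notebook_path": "/Shared/second_notebook",  # hypothetical path
        # base_parameters are surfaced as widgets in the target notebook
        "base_parameters": {"batch_run_id": run_id_template},
    },
}

# Inside the Databricks notebook (dbutils only exists on a cluster):
# dbutils.widgets.text("batch_run_id", "")
# batch_run_id = dbutils.widgets.get("batch_run_id")
```

In the DAG you would instantiate it as DatabricksSubmitRunOperator(**notebook_task2_kwargs, new_cluster=...), and the template is rendered with the actual run_id when the task runs.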