Airflow - How to pull XComs value in the notebook task?
12-17-2022 11:16 AM
Using Airflow, I have created a DAG with a sequence of notebook tasks. The first notebook returns a batch_id; the subsequent notebook tasks need this batch_id.
I am using the DatabricksSubmitRunOperator to run each notebook task. This operator pushes two values (run_id, run_page_url) to Airflow XCom. Using the run_id and the Databricks REST API, I retrieved the notebook output.
The XCom value retrieval works within the callable function of the PythonOperator.
How would I do the same operation in the DatabricksSubmitRunOperator?
How do I get a handle to the task instance inside the notebook?
Working PythonOperator code:

def pull_databricks_task_run_xcom_values(**kwargs):
    ti = kwargs['ti']
    run_id = ti.xcom_pull(key='run_id', task_ids='notebook_task1')
    logger.info("======pulled value based on run_id:===== " + str(run_id))
    run_page_url = ti.xcom_pull(key='run_page_url', task_ids='notebook_task1')
    logger.info("======pulled value based on run_page_url:===== " + str(run_page_url))

with DAG('trigger_db_notebooks_tasks', start_date=days_ago(2),
         schedule_interval=None, default_args=default_args) as dag:
    # Task 1
    notebook_task1 = DatabricksSubmitRunOperator(
        task_id='notebook_task1', notebook_task=notebook_task1_dict,
        new_cluster=default_cluster_conf['clusters_spec'], do_xcom_push=True)
    # Task 2
    xcom_pull = PythonOperator(
        task_id='download_files', provide_context=True,
        python_callable=pull_databricks_task_run_xcom_values)
    notebook_task1 >> xcom_pull
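One way to consume the pushed run_id without an intermediate PythonOperator is Jinja templating: the DatabricksSubmitRunOperator's json (and thus notebook_task) field is templated, so an xcom_pull expression in base_parameters is rendered at runtime. A minimal sketch, assuming the DAG above; the parameter name batch_run_id and the notebook path are illustrative, not from the thread:

```python
# Template string rendered by Airflow at runtime (plain dict, no imports needed).
# "batch_run_id" is an assumed parameter name for illustration.
batch_params = {
    "batch_run_id": "{{ ti.xcom_pull(task_ids='notebook_task1', key='run_id') }}",
}

# Inside the DAG from the question (sketch, reusing its names):
# notebook_task2 = DatabricksSubmitRunOperator(
#     task_id='notebook_task2',
#     notebook_task={'notebook_path': '/Shared/notebook_task2',  # assumed path
#                    'base_parameters': batch_params},
#     new_cluster=default_cluster_conf['clusters_spec'])
```

When the second task runs, the rendered value arrives in the notebook as a widget named batch_run_id.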
- Labels:
  - Airflow
  - Notebook Task
  - Task
12-22-2022 05:32 AM
From what I understand, you want to pass the run_id parameter to the second notebook task? You can:
- Create a widget parameter inside your Databricks notebook (https://docs.databricks.com/notebooks/widgets.html) that will consume your run_id
- Pass the parameter in DatabricksSubmitRunOperator (example here: https://stackoverflow.com/questions/61542653/airflow-databrickssubmitrunoperator-does-not-take-in-no...)
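On the notebook side, the value arrives as a widget and is read with dbutils.widgets.get. A hedged sketch: the widget name batch_run_id and the wrapper function are illustrative (dbutils is only available on a Databricks cluster, so the wrapper takes the widgets object as an argument):

```python
def read_batch_run_id(widgets):
    """Return the batch_run_id widget value.

    On Databricks, call this as read_batch_run_id(dbutils.widgets);
    the widget name "batch_run_id" is an assumed example.
    """
    return widgets.get("batch_run_id")

# On Databricks (not runnable off-cluster):
# upstream_run_id = read_batch_run_id(dbutils.widgets)
```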

