Using Airflow, I have created a DAG with a sequence of notebook tasks. The first notebook returns a batch id; the subsequent notebook tasks need this batch_id.
I am using the DatabricksSubmitRunOperator to run the notebook task. This operator pushes two values (run_id and run_page_url) to Airflow XCom. Using the run_id and the Databricks REST API, I retrieved the notebook output.
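For context, retrieving the notebook output from the run_id can be sketched like this against the Databricks Jobs API `runs/get-output` endpoint. This is a minimal sketch, not the poster's actual code: the `DATABRICKS_HOST`/`DATABRICKS_TOKEN` environment variable names and the default host URL are assumptions.

```python
import json
import os
import urllib.parse
import urllib.request

# Assumed configuration: adjust the env var names / host for your workspace.
DATABRICKS_HOST = os.environ.get("DATABRICKS_HOST", "https://example.cloud.databricks.com")
DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN", "")


def build_get_output_request(run_id):
    """Build URL, headers, and query params for the Jobs runs/get-output endpoint."""
    url = f"{DATABRICKS_HOST}/api/2.0/jobs/runs/get-output"
    headers = {"Authorization": f"Bearer {DATABRICKS_TOKEN}"}
    params = {"run_id": run_id}
    return url, headers, params


def extract_notebook_result(payload):
    """Return the value the notebook passed to dbutils.notebook.exit()."""
    return payload.get("notebook_output", {}).get("result")


def get_notebook_output(run_id):
    """Fetch the notebook task's exit value for the given run_id."""
    url, headers, params = build_get_output_request(run_id)
    request = urllib.request.Request(
        url + "?" + urllib.parse.urlencode(params), headers=headers
    )
    with urllib.request.urlopen(request) as response:
        return extract_notebook_result(json.load(response))
```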
The XCom value retrieval works within the callable function of the PythonOperator.
How would I do the same operation in the DatabricksSubmitRunOperator?
How do I get the handle to the task instance inside the notebook?
Working PythonOperator code:
def pull_databricks_task_run_xcom_values(**kwargs):
    ti = kwargs['ti']
    run_id = ti.xcom_pull(key='run_id', task_ids='notebook_task1')
    logger.info("======pulled value based on run_id:===== %s", run_id)
    # Pull from the same Databricks task that pushed the value
    run_page_url = ti.xcom_pull(key='run_page_url', task_ids='notebook_task1')
    logger.info("======pulled value based on run_page_url:===== %s", run_page_url)
with DAG('trigger_db_notebooks_tasks', start_date=days_ago(2),
         schedule_interval=None, default_args=default_args) as dag:
    # Task 1
    notebook_task1 = DatabricksSubmitRunOperator(
        task_id='notebook_task1',
        notebook_task=notebook_task1_dict,
        new_cluster=default_cluster_conf['clusters_spec'],
        do_xcom_push=True,
    )
    # Task 2
    xcom_pull = PythonOperator(
        task_id='download_files',
        provide_context=True,
        python_callable=pull_databricks_task_run_xcom_values,
    )
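On the question of doing the XCom pull inside the DatabricksSubmitRunOperator itself: in the Databricks provider the operator's `json` field (into which `notebook_task` is merged) is a templated field, so one possible approach is to embed Jinja `xcom_pull` expressions in `base_parameters`. The sketch below is an assumption about how this could look, not code from the original DAG: the notebook path is hypothetical, and it presumes a `batch_id` value has been pushed to XCom under that key by an upstream task (the DAG above does not yet do that push).

```python
# Hypothetical downstream task spec. The Jinja expressions are rendered at
# task runtime; the resulting values reach the notebook as widget parameters.
downstream_notebook_task = {
    "notebook_path": "/Shared/process_batch",  # hypothetical notebook path
    "base_parameters": {
        "upstream_run_id": "{{ ti.xcom_pull(task_ids='notebook_task1', key='run_id') }}",
        # Assumes an upstream task has pushed 'batch_id' to XCom.
        "batch_id": "{{ ti.xcom_pull(task_ids='download_files', key='batch_id') }}",
    },
}
```

This spec would then be passed as `notebook_task=downstream_notebook_task` to a second DatabricksSubmitRunOperator. Note that there is no handle to the Airflow task instance from inside the notebook itself; parameters passed via `base_parameters` are read in the notebook with `dbutils.widgets.get("batch_id")`.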