
How to pass all dag_run.conf parameters to python_wheel_task

Bartek
Contributor

I want to trigger a Databricks job from Airflow using DatabricksSubmitRunDeferrableOperator, and I need to pass configuration parameters to it. Here is an excerpt from my code (the definition is not complete; only the crucial properties are shown):

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunDeferrableOperator


class PythonWheelOperator(DatabricksSubmitRunDeferrableOperator):
    def __init__(self, action_name, **kwargs):
        # dag_config_json is defined elsewhere in the DAG file (not shown here).
        super().__init__(
            databricks_conn_id=dag_config_json["databricksConnId"],
            task_id=action_name,
            tasks=[
                {
                    "python_wheel_task": {
                        "package_name": "PName",
                        "entry_point": "ReviewSuppliers",
                        "named_parameters": "{{dict(dag_run.conf)}}",
                    },
                    "libraries": [
                        {"whl": "dbfs:/FileStore/path-to-whl.whl"}
                    ],
                }
            ],
            # remaining arguments omitted from this excerpt
        )

The error I got:

airflow.exceptions.AirflowException: Response: b'{"error_code":"MALFORMED_REQUEST","message":"Could not parse request object: Expected \'START_OBJECT\' not \'VALUE_STRING\'\\n at [Source: (ByteArrayInputStream); line: 1, column: 187]\\n at [Source: java.io.ByteArrayInputStream@47fb05c6; line: 1, column: 187]"}', Status Code: 400

How should I change the line "named_parameters": "{{dict(dag_run.conf)}}" so that these config parameters are passed correctly?
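For context on the error: the Jinja template is rendered to a string, so the request carries a JSON string where the API expects a JSON object (hence "Expected 'START_OBJECT' not 'VALUE_STRING'"). One possible direction, sketched below with plain Python only, is to build named_parameters as a real dict at run time instead of templating it. The helper names to_named_parameters and build_wheel_task are hypothetical, and the conversion of values to strings is an assumption about what the API accepts for named_parameters, not something confirmed in this thread.

```python
import json
from typing import Any, Dict, Mapping, Optional


def to_named_parameters(conf: Optional[Mapping[str, Any]]) -> Dict[str, str]:
    """Turn a dag_run.conf-like mapping into a flat dict of strings.

    Hypothetical helper. Assumption: named_parameters should map string
    keys to string values, so non-string values are JSON-encoded.
    """
    params: Dict[str, str] = {}
    for key, value in (conf or {}).items():
        params[str(key)] = value if isinstance(value, str) else json.dumps(value)
    return params


def build_wheel_task(conf: Optional[Mapping[str, Any]]) -> Dict[str, Any]:
    """Build the task entry from the question with a dict for named_parameters."""
    return {
        "python_wheel_task": {
            "package_name": "PName",
            "entry_point": "ReviewSuppliers",
            # A real dict here serializes to a JSON object, not a JSON string.
            "named_parameters": to_named_parameters(conf),
        },
        "libraries": [{"whl": "dbfs:/FileStore/path-to-whl.whl"}],
    }


if __name__ == "__main__":
    # Example conf as it might arrive in dag_run.conf (made-up values).
    example_conf = {"supplier": "acme", "limit": 10}
    print(json.dumps(build_wheel_task(example_conf), indent=2))
```

Inside the operator, one way to use this might be to override execute(self, context), read context["dag_run"].conf, and write the resulting dict into the operator's request payload before calling super().execute(context). Where exactly the payload lives (for example a json attribute on the operator) is an assumption that may vary between provider versions. Another option that may be worth trying is setting render_template_as_native_obj=True on the DAG, so that a template such as "{{ dag_run.conf }}" renders to a native dict rather than a string.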
