I want to trigger a Databricks job from Airflow using DatabricksSubmitRunDeferrableOperator, and I need to pass configuration parameters to it. Here is an excerpt from my code (the definition is not complete, only the crucial properties):
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunDeferrableOperator

class PythonWheelOperator(DatabricksSubmitRunDeferrableOperator):
    def __init__(self, action_name, **kwargs):
        super().__init__(
            databricks_conn_id=dag_config_json["databricksConnId"],
            task_id=action_name,
            tasks=[
                {
                    "python_wheel_task": {
                        "package_name": "PName",
                        "entry_point": "ReviewSuppliers",
                        "named_parameters": "{{dict(dag_run.conf)}}",
                    },
                    "libraries": [
                        {"whl": "dbfs:/FileStore/path-to-whl.whl"},
                    ],
                },
            ],
            **kwargs,
        )
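For context, I trigger the DAG with a run configuration, for example (the DAG id and conf keys here are just illustrative):

airflow dags trigger review_suppliers_dag --conf '{"supplier_id": "123", "region": "EU"}'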
The error I got:
airflow.exceptions.AirflowException: Response: b'{"error_code":"MALFORMED_REQUEST","message":"Could not parse request object: Expected \'START_OBJECT\' not \'VALUE_STRING\'\\n at [Source: (ByteArrayInputStream); line: 1, column: 187]\\n at [Source: java.io.ByteArrayInputStream@47fb05c6; line: 1, column: 187]"}', Status Code: 400
How should I change the line "named_parameters": "{{dict(dag_run.conf)}}" so that these config parameters are passed properly? From the error, it looks like the template renders to a plain string, so the request body contains a quoted string where Databricks expects a JSON object.
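For reference, here is the direction I'm currently experimenting with: a minimal, untested sketch that assumes Airflow 2.1+ (for render_template_as_native_obj on the DAG) and, for simplicity, uses DatabricksSubmitRunDeferrableOperator directly instead of my subclass. The DAG id, connection id, and cluster id are placeholders:

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import (
    DatabricksSubmitRunDeferrableOperator,
)

with DAG(
    dag_id="review_suppliers_dag",  # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    # Render templated fields to native Python objects, so the template
    # below becomes a real dict instead of its string representation.
    render_template_as_native_obj=True,
) as dag:
    run_wheel = DatabricksSubmitRunDeferrableOperator(
        task_id="review_suppliers",
        databricks_conn_id="databricks_default",  # placeholder connection id
        tasks=[
            {
                "task_key": "review_suppliers",
                "existing_cluster_id": "1234-567890-abcde123",  # placeholder
                "python_wheel_task": {
                    "package_name": "PName",
                    "entry_point": "ReviewSuppliers",
                    # With native rendering this resolves to a dict and is
                    # serialized as a JSON object, not a quoted string.
                    "named_parameters": "{{ dag_run.conf }}",
                },
                "libraries": [
                    {"whl": "dbfs:/FileStore/path-to-whl.whl"},
                ],
            },
        ],
    )

My understanding is that with native rendering enabled the template resolves to an actual dict before the request is serialized, so named_parameters should be sent as a JSON object. Is this the right approach, or is there a cleaner way to pass dag_run.conf?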