Merge operation with outputMode update in Databricks Auto Loader

db_knowledge
New Contributor II

Hi team,

I am trying to do a merge operation using outputMode('update') and foreachBatch with the code below, but it is not updating the data. Could you please help with this?

from delta.tables import DeltaTable

# Upsert one micro-batch into the target Delta table, matching rows on key_column.
def upsertToDelta(casting_df, batch_id):
  casting_df.persist()
  delta_df = DeltaTable.forName(spark, f"{target_schema}.{table_name}")
  (delta_df.alias("t")
  .merge(
    source = casting_df.alias("m"),
    condition = "m.key_column = t.key_column"
    )
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
  )
  casting_df.unpersist()

print(f"{target_schema}.{table_name}")
output = (casting_df.writeStream.format('delta').trigger(availableNow=True)
    .option("checkpointLocation", f"{output_path}/checkpoint")
    .option("mergeSchema", "true")
    .foreachBatch(lambda casting_df, batch_id: upsertToDelta(casting_df, batch_id))
    .outputMode("update")
    .start()
    )
Any help is appreciated

anardinelli
Databricks Employee

Hi @db_knowledge 

Please try .foreachBatch(upsertToDelta) instead of creating the lambda inside it.
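
A minimal sketch of what passing a named function to foreachBatch might look like. The helper names build_merge_condition and make_upsert are hypothetical and not from the thread. The sketch assumes a PySpark version where the batch DataFrame exposes sparkSession (3.3 or later), and it assumes that dropping duplicate keys within a micro-batch is acceptable, since a Delta merge fails when several source rows match the same target row. Note that dropDuplicates keeps an arbitrary row per key.

```python
from typing import Callable, Sequence


def build_merge_condition(key_columns: Sequence[str],
                          target_alias: str = "t",
                          source_alias: str = "m") -> str:
    """Join key columns into a merge condition such as 't.id = m.id'."""
    if not key_columns:
        raise ValueError("at least one key column is required")
    return " AND ".join(
        f"{target_alias}.{col} = {source_alias}.{col}" for col in key_columns
    )


def make_upsert(target_table: str, key_columns: Sequence[str]) -> Callable:
    """Return a (batch_df, batch_id) function suitable for foreachBatch."""
    condition = build_merge_condition(key_columns)

    def upsert_to_delta(batch_df, batch_id):
        # Imported lazily so the helpers load even where Delta Lake is absent.
        from delta.tables import DeltaTable

        # Assumption: one row per key is enough for this micro-batch.
        deduped = batch_df.dropDuplicates(list(key_columns))
        # Use the session attached to the batch rather than a global `spark`.
        target = DeltaTable.forName(batch_df.sparkSession, target_table)
        (target.alias("t")
            .merge(deduped.alias("m"), condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())

    return upsert_to_delta


# Usage in a notebook, with casting_df, output_path, target_schema and
# table_name taken from the question:
# query = (casting_df.writeStream
#     .foreachBatch(make_upsert(f"{target_schema}.{table_name}", ["key_column"]))
#     .option("checkpointLocation", f"{output_path}/checkpoint")
#     .trigger(availableNow=True)
#     .start())
# query.awaitTermination()  # block until the available data has been processed
```

Building the function through a factory keeps the table name and key columns out of global state, which may make it easier to see exactly what each micro-batch merges against.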

Best,

Alessandro

Hi @anardinelli, I tried foreachBatch(upsertToDelta) instead of creating the lambda inside it, but it is not working.