Hi team,
I am trying to do merge operation along with outputMode('update') and foreachmode byusing below code but it is not updating data could you please any help on this?
output=(casting_df.writeStream.format('delta').trigger(availableNow=True).option(
"checkpointLocation", f"{output_path}/checkpoint"
).option("mergeSchema", "true").foreachBatch(lambda casting_df,batch_id:upsertToDelta(casting_df,batch_id)).outputMode("update").start()
)
print(f"{target_schema}.{table_name}")
def upsertToDelta(casting_df,bacthId😞
casting_df.persist()
delta_df = DeltaTable.forName(spark, f"{target_schema}.{table_name}")
(delta_df.alias("t")\
.merge(
source = casting_df.alias("m"),
condition = "m.key_column=t.key_column"
)\
.whenMatchedUpdateAll()\
.whenNotMatchedInsertAll()\
.execute()
)
casting_df.unpersist()
Any help is appreciated