Since .foreachBatch() is "hijacking" the stream and executing arbitrary code on each micro-batch, do I still need to specify the output mode and path, like this:
(df.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", "check_point_location")
    .foreachBatch(data_load)
    .outputMode("update")
    .option("path", output_filepath)
    .start()
)
Or can I leave them out:
(df.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", "check_point_location")
    .foreachBatch(data_load)
    .start()
)
Code for data_load:
def data_load(df, batchId):
    # `target` is assumed to be a DeltaTable defined elsewhere (not shown here).
    (target.alias("target")
        .merge(
            source=df.alias("source"),
            condition="target.key = source.key"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
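To make clear what I expect data_load to do on each micro-batch: it is meant to be an upsert keyed on key. Below is a minimal plain-Python sketch of those semantics, with dicts standing in for the Delta target and the batch DataFrame. The names upsert_batch, target_rows and batch_rows are made up for illustration and are not part of the Spark or Delta API.

```python
# Plain-Python analogy of the upsert in data_load (illustration only, no Spark involved).
def upsert_batch(target_rows, batch_rows):
    """Update rows whose key already exists in the target, insert the rest."""
    for key, row in batch_rows.items():
        # A key already present is overwritten (like whenMatchedUpdateAll);
        # a key not present is added (like whenNotMatchedInsertAll).
        target_rows[key] = row
    return target_rows


# Hypothetical sample data: one existing row, then a batch with one update and one insert.
target_rows = {1: {"key": 1, "value": "old"}}
batch_rows = {
    1: {"key": 1, "value": "new"},
    2: {"key": 2, "value": "added"},
}
upsert_batch(target_rows, batch_rows)
```

After the call, the row with key 1 carries the batch's values and the row with key 2 has been inserted, which is the behaviour I want from the merge regardless of what the writer's output mode or path is set to.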