06-10-2025 01:39 AM
Since .foreachBatch() is "hijacking" the stream and executing arbitrary code in it, do I still need to specify the output mode and path, as here:
(df.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", "check_point_location")
    .foreachBatch(data_load)
    .outputMode("update")
    .option("path", output_filepath)
    .start()
)
Or can I do it without them:
(df.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", "check_point_location")
    .foreachBatch(data_load)
    .start()
)
Code for data_load:
def data_load(df, batchId):
    # target is assumed to be a DeltaTable defined elsewhere (not shown in the post)
    (target.alias("target").merge(
            source=df.alias("source"),
            condition="target.key = source.key"
        ).whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
06-11-2025 09:47 AM - edited 06-11-2025 09:49 AM
Hi @IGRACH
Good day!!
When you're using .foreachBatch(), it "hijacks" the stream and gives you the DataFrame for each micro-batch. Inside that function, you define exactly what happens — whether you merge, update, insert, or write somewhere else.
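Because of this:
- Spark doesn’t need to know how to write the data; it delegates that entirely to your code, so format("delta") and option("path", ...) on the writer are not needed either.
- outputMode("append") / outputMode("update") / outputMode("complete") no longer controls how rows reach the target. It only matters when df is a stateful query such as an aggregation, where it decides which result rows each micro-batch contains.
So for a plain (stateless) stream with a MERGE inside foreachBatch, you need not specify outputMode(...).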
You can use the below approach.
df.writeStream \
.trigger(availableNow=True) \
.option("checkpointLocation", "check_point_location") \
.foreachBatch(data_load) \
.start()
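To make the delegation concrete, here is a rough plain-Python model of what happens. It does not use Spark or Delta at all: `run_stream`, the dict-based `target`, and the sample rows are hypothetical stand-ins, and the upsert only approximates whenMatchedUpdateAll() / whenNotMatchedInsertAll().

```python
# Plain-Python model (no Spark): a micro-batch is a list of row dicts and
# the "target table" is a dict keyed by the merge key.
target = {1: {"key": 1, "value": "a"}}


def data_load(batch, batch_id):
    """Upsert one micro-batch into target, roughly like the MERGE above."""
    for row in batch:
        # Matched key: replace the whole row. Unmatched key: insert it.
        # Simplification: the last row wins if a key repeats within a batch,
        # whereas a real Delta MERGE raises an error in that case.
        target[row["key"]] = dict(row)


def run_stream(micro_batches, batch_fn):
    """Stand-in for writeStream.foreachBatch(batch_fn).start().

    The engine only hands each batch to batch_fn; all writing happens
    inside batch_fn, so no sink format or path is involved here.
    """
    for batch_id, batch in enumerate(micro_batches):
        batch_fn(batch, batch_id)


micro_batches = [
    [{"key": 1, "value": "a2"}, {"key": 2, "value": "b"}],
    [{"key": 3, "value": "c"}],
]
run_stream(micro_batches, data_load)
print(sorted(target))        # [1, 2, 3]
print(target[1]["value"])    # a2
```

In this model, replaying the same batches leaves `target` unchanged, which is one reason an upsert inside foreachBatch tends to tolerate a micro-batch being retried after a failure.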
Kindly let me know if you have any questions on this.
06-11-2025 12:01 PM
Thanks xD