Specifing Output mode and Path when using For Each Batch

IGRACH
New Contributor III

Since .foreachBatch() is "hijacking" the stream and executing arbitrary code in it, do I need to specify Output mode and Path:

(df.writeStream
.format("delta")
.trigger(availableNow = True)
.option("checkpointLocation", "check_point_location")

.foreachBatch(data_load)

.outputMode('update')
.option('path', output_filepath)
.start()
)

 Or I can do it without it:

(df.writeStream
  .format("delta")
  .trigger(availableNow = True)
  .option("checkpointLocation", "check_point_location")

  .foreachBatch(data_load)

  .start()
  
)

code for load_data:

def data_load(df, batchId):
  (target.alias("target").merge(
    source = df.alias("source"),
    condition = "target.key = source.key"
  ).whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
  )

 

Saritha_S
Databricks Employee
Databricks Employee

Hi @IGRACH 

Good day!!

When you're using .foreachBatch(),  it "hijacks" the stream and gives you the DataFrame for each micro-batch. Inside that function, you define exactly what happens — whether you merge, update, insert, or write somewhere else.

Because of this:

  • Spark doesn’t need to know how to output the data — it delegates that entirely to your code.

  • So, outputMode("append") / outputMode("update") / outputMode("complete") become irrelevant.

So you need not specify outputMode(...)

You can use the below approach. 

df.writeStream \
.trigger(availableNow=True) \
.option("checkpointLocation", "check_point_location") \
.foreachBatch(data_load) \
.start()

Kindly let me know if you have any questions on this. 

 

View solution in original post

Branislav
New Contributor II

Thanks xD