Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Specifying Output mode and Path when using For Each Batch

IGRACH
New Contributor III

Since .foreachBatch() is "hijacking" the stream and executing arbitrary code in it, do I need to specify the output mode and path?

(df.writeStream
.format("delta")
.trigger(availableNow = True)
.option("checkpointLocation", "check_point_location")

.foreachBatch(data_load)

.outputMode('update')
.option('path', output_filepath)
.start()
)

Or can I do it without them:

(df.writeStream
  .format("delta")
  .trigger(availableNow = True)
  .option("checkpointLocation", "check_point_location")

  .foreachBatch(data_load)

  .start()
  
)

Code for data_load:

def data_load(df, batchId):
  (target.alias("target").merge(
    source = df.alias("source"),
    condition = "target.key = source.key"
  ).whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
  )
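For readers less familiar with Delta's MERGE, here is a rough plain-Python sketch (no Spark involved) of what the function above does to the target on each micro-batch. The dictionary-based `target`, the `merge_upsert` helper, and the sample rows are all hypothetical and only illustrate the upsert semantics.

```python
def merge_upsert(target, batch, key="key"):
    """Apply one micro-batch to a keyed target: update matches, insert the rest."""
    for row in batch:
        # whenMatchedUpdateAll: an existing key has its whole row overwritten.
        # whenNotMatchedInsertAll: an unseen key is inserted as a new row.
        # Note: unlike this sketch, a real Delta MERGE raises an error when
        # several source rows match the same target row.
        target[row[key]] = dict(row)
    return target


# Hypothetical starting state of the target table, indexed by key.
target = {1: {"key": 1, "value": "a"}}

# Two hypothetical micro-batches, processed in order like foreachBatch would.
batches = [
    [{"key": 1, "value": "a2"}, {"key": 2, "value": "b"}],
    [{"key": 3, "value": "c"}],
]

for batch_id, batch in enumerate(batches):
    merge_upsert(target, batch)
```

After both batches, key 1 holds the updated row and keys 2 and 3 have been inserted.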

 

1 ACCEPTED SOLUTION

Accepted Solutions

Saritha_S
Databricks Employee

Hi @IGRACH 

Good day!!

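When you use .foreachBatch(), it "hijacks" the stream and hands you the DataFrame for each micro-batch. Inside that function, you define exactly what happens: whether you merge, update, insert, or write somewhere else.

Because of this:

  • Spark doesn't need to know how to write the data; it delegates that entirely to your code.

  • So, for a stream without stateful operations such as aggregations, outputMode("append") / outputMode("update") / outputMode("complete") make no practical difference. If the query is stateful, the output mode still determines which rows Spark passes to each micro-batch.

So you need not specify outputMode(...). The same goes for .format("delta") and .option('path', ...): with foreachBatch, the write destination is whatever your function writes to.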

You can use the below approach. 

df.writeStream \
.trigger(availableNow=True) \
.option("checkpointLocation", "check_point_location") \
.foreachBatch(data_load) \
.start()
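A slightly fuller sketch of the same approach, in case it helps. It assumes the target is a Delta table that can be looked up by name and that the join column is called `key`; `make_upsert`, `start_upsert_stream`, and the table and checkpoint names in the usage comment are hypothetical. It also assumes a Spark version where `DataFrame.sparkSession` is available (3.3 or later).

```python
def make_upsert(target_table, key="key"):
    """Build a foreachBatch handler that MERGEs each micro-batch into target_table."""

    def upsert(batch_df, batch_id):
        # Imported here so the handler carries its own dependency.
        from delta.tables import DeltaTable

        # Resolve the target from the micro-batch's own session
        # instead of relying on a global `target` variable.
        target = DeltaTable.forName(batch_df.sparkSession, target_table)
        (
            target.alias("target")
            .merge(batch_df.alias("source"), f"target.{key} = source.{key}")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

    return upsert


def start_upsert_stream(df, checkpoint_location, target_table):
    """Start the stream; no format, path, or outputMode is set on the writer."""
    return (
        df.writeStream
        .trigger(availableNow=True)
        .option("checkpointLocation", checkpoint_location)
        .foreachBatch(make_upsert(target_table))
        .start()
    )


# Hypothetical usage:
# query = start_upsert_stream(df, "check_point_location", "my_schema.my_target")
# query.awaitTermination()
```

The checkpoint location is still required, since it is how the stream tracks which micro-batches have already been processed.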

Kindly let me know if you have any questions on this. 

 


2 REPLIES

Branislav
New Contributor II

Thanks xD