Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Auto Loader write strategy (APPEND, MERGE, UPDATE, COMPLETE, OVERWRITE)

seefoods
Contributor

Hello guys,

I want to know whether operations like overwrite, merge, and update behave the same with Auto Loader as they do in static (batch) writes. I'm confused about the behavior of the output modes (complete, update, and append).

I also want to know the correct strategy for writing the output data idempotently to our Delta lake. I have seen in the Databricks documentation that we can use foreachBatch (with a lambda function) or call writeStream directly.

Cordially, 


4 ACCEPTED SOLUTIONS

6 REPLIES

lingareddy_Alva
Honored Contributor II

Hi @seefoods 

Let me break down your questions about Delta Lake write operations and streaming strategies.

Static vs Streaming Write Operations

The write modes behave differently between static writes and streaming (Auto Loader):

Static Write Operations:
overwrite: Replaces entire table/partition data
append: Adds new records without checking for duplicates
merge: Uses MERGE SQL operations for upserts/complex logic

Streaming Write Operations:
complete: Outputs the entire result table each trigger (mostly useful for aggregations)
update: Outputs only the rows that changed since the last trigger; the Delta table sink does not support it directly, so upserts are usually done with MERGE inside foreachBatch
append: Default streaming mode - adds new records as they arrive

The key difference is that streaming operations are incremental and trigger-based, while static operations process the entire dataset at once.
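
For reference, here is a minimal sketch of the static (batch) side, assuming a Delta table named target_table with an id key column and a batch DataFrame batch_df (all names are placeholders):

from delta.tables import DeltaTable

# append: add new rows without any duplicate check
batch_df.write.format("delta").mode("append").saveAsTable("target_table")

# overwrite: replace the whole table in one transaction
batch_df.write.format("delta").mode("overwrite").saveAsTable("target_table")

# merge: upsert via the DeltaTable API
DeltaTable.forName(spark, "target_table").alias("t") \
    .merge(batch_df.alias("u"), "t.id = u.id") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()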

Write Modes in Streaming Context

append: Most common for streaming. Processes new data incrementally
complete: Rewrites entire output each trigger - only works with aggregations that can be fully recomputed
update: A standard Spark output mode that emits only the changed rows each trigger, but the Delta table sink doesn't accept it; in practice, update-style logic is implemented with MERGE inside foreachBatch
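
As a hedged illustration of complete mode, here is a sketch of a streaming aggregation; the events source table, country column, and checkpoint path are placeholders:

counts = spark.readStream.table("events").groupBy("country").count()

counts.writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "/mnt/checkpoints/country_counts") \
    .toTable("country_counts")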

Idempotency Strategies for Delta Lake

For idempotent writes to Delta Lake, you have two main approaches:

1. Direct writeStream (simple appends)

# Simple append: the checkpoint plus Delta's idempotent streaming sink
# give exactly-once guarantees for this pattern.
df.writeStream \
    .option("checkpointLocation", checkpoint_path) \
    .option("mergeSchema", "true") \
    .trigger(availableNow=True) \
    .toTable("target_table")

2. foreachBatch with Custom Logic

def upsert_to_delta(microBatchDF, batchId):
    # Expose the micro-batch to SQL so the MERGE below can reference it
    microBatchDF.createOrReplaceTempView("updates")

    # Use the micro-batch's own SparkSession rather than the internal _jdf API
    microBatchDF.sparkSession.sql("""
        MERGE INTO target_table t
        USING updates u ON t.id = u.id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

df.writeStream \
    .foreachBatch(upsert_to_delta) \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .start()

Recommended Strategy for Idempotency

Use foreachBatch when you need:
- Complex merge logic
- Multiple target tables
- Custom deduplication logic
- Cross-batch deduplication

Use direct writeStream when you have:
- Simple append scenarios
- Natural deduplication keys
- Partitioned data that naturally avoids duplicates
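
If you go the foreachBatch route, one way to make the batch write itself idempotent (on top of checkpointing) is Delta's txnAppId/txnVersion writer options, which let Delta skip a batch that was already committed. A minimal sketch, assuming the same target_table and checkpoint_path placeholders as above:

app_id = "orders_stream_v1"  # any stable, unique string for this query (placeholder)

def write_idempotent(microBatchDF, batchId):
    microBatchDF.write.format("delta") \
        .mode("append") \
        .option("txnAppId", app_id) \
        .option("txnVersion", batchId) \
        .saveAsTable("target_table")

df.writeStream \
    .foreachBatch(write_idempotent) \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .start()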

 

LR

seefoods
Contributor

Thank you @lingareddy_Alva, it's clear now.

lingareddy_Alva
Honored Contributor II

Welcome @seefoods . 

LR

Sankha_Mondal
New Contributor II

When writing streaming data:

spark.readStream.table('table_name').writeStream works fine (a streaming read is required before writeStream can be called).

A few important settings are required for that (a combined sketch follows after this list):

A. Checkpointing:

1. Stores the current state of the streaming job in combination with a write-ahead log.

2. Each stream needs its own unique checkpoint location.

B. Output Mode:

1. outputMode('append'): only new records are added; this is the default setting.

2. outputMode('complete'): the full result is recalculated each time a write is triggered; used mostly for aggregated tables.

3. outputMode('update'): writes only the rows that changed since the last trigger; complex upserts are usually handled with foreachBatch instead.

C. Trigger Interval:

1. trigger(): if unspecified, the default is processingTime='500ms'; each micro-batch automatically picks up the data added to the source since the last run.

2. trigger(processingTime='2 minutes'): processes the available data in micro-batches at the user-specified interval.

3. trigger(once=True): processes all available data in one go (a single batch).

4. trigger(availableNow=True): processes all available data in multiple batches and then stops; a good option because long-running streams can be avoided while keeping the fault-tolerance guarantees.

D. .awaitTermination(): blocks execution of the next cell until the incremental batch write has finished.
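
Putting the settings above together, a minimal sketch (table names and checkpoint path are placeholders):

query = spark.readStream.table("source_table") \
    .writeStream \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/checkpoints/sink_table") \
    .trigger(availableNow=True) \
    .toTable("sink_table")

query.awaitTermination()  # block the next cell until this run has finished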

seefoods
Contributor

Thanks @Sankha_Mondal 

chanukya-pekala
New Contributor III

Thanks for the discussion. I have a tiny suggestion. Based on my experience with streaming loads, I often find the checkpoint location hard to track down when I need to inspect the offset information or delete the directory for a fresh load of the data. So I record it on the table itself with TBLPROPERTIES ('checkpointLocation' = '/path/to/checkpoint') via ALTER TABLE. Any user can then see where the checkpoint of that particular table lives. Databricks/Spark deliberately avoid coupling the checkpoint tightly to the table for scalability reasons, but keeping a reference to it on the table makes sense; without this piece of information, a table user has to reach out to the developer just to get the location.
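
A minimal sketch of that suggestion, assuming a Delta table named target_table and a placeholder checkpoint path (checkpointLocation here is just a custom table property key, not something Spark reads automatically):

checkpoint_path = "/mnt/checkpoints/target_table"  # placeholder

# Record the checkpoint location on the table itself
spark.sql(f"ALTER TABLE target_table SET TBLPROPERTIES ('checkpointLocation' = '{checkpoint_path}')")

# Any table user can later look it up without asking the developer
spark.sql("SHOW TBLPROPERTIES target_table").show(truncate=False)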

Chanukya
