a week ago
Hello guys,
I want to know whether operations like overwrite, merge, and update in a static write behave the same way when we use Auto Loader. I'm confused about the behavior of the output modes (complete, update and append).
I also want to know the correct strategy for writing output data idempotently to our Delta Lake. I have seen in the Databricks documentation that we can use foreachBatch (with a lambda function) or call writeStream directly.
Cordially,
a week ago - last edited a week ago
While writing streaming data:
spark.readStream.table('table_name').writeStream works fine (note that spark.table() returns a static DataFrame, so the streaming read has to come from spark.readStream).
The important settings are listed below; a combined sketch follows the list:
A. Checkpointing :
1. Stores the current progress of the streaming job, in combination with a write-ahead log.
2. Each stream needs its own unique checkpoint location; checkpoints cannot be shared between streams.
B. Output Mode:
1. outputMode('append') : Only new records are appended to the sink. This is the default.
2. outputMode('complete') : The entire result is recalculated and rewritten each time a write is triggered. Used mostly for aggregate tables.
3. outputMode('update') : Writes only the rows that changed since the last trigger. The Delta sink does not support this mode directly, so complex upserts are usually done with foreachBatch and MERGE instead.
C. Trigger Interval:
1. trigger(): if unspecified, defaults to processingTime='500ms'. The stream automatically detects and processes all data added to the source since the last run.
2. trigger(processingTime='2 minutes'): processes the available data in a micro-batch at the user-specified interval.
3. trigger(once=True): processes all available data in one go (a single batch).
4. trigger(availableNow=True): processes all available data in multiple batches and then stops. A good option, since long-running streams can be avoided while keeping the same fault-tolerance guarantees.
D. .awaitTermination(): blocks execution of the next cell until the incremental batch write has completed.
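A minimal sketch combining these settings; the table names and checkpoint path are placeholders, so adjust them to your pipeline:

# Hypothetical example: incremental read from a Delta table, appended to a target table.
query = (spark.readStream
    .table("source_table")                                           # streaming read from a Delta table
    .writeStream
    .outputMode("append")                                            # only new records are written
    .option("checkpointLocation", "/tmp/checkpoints/target_table")   # unique per stream
    .trigger(availableNow=True)                                      # process everything available, then stop
    .toTable("target_table"))                                        # starts the query

query.awaitTermination()   # block until the incremental load has finished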
Wednesday
Thanks for the discussion. I have a small suggestion. Based on my experience with streaming loads, I often find the checkpoint location hard to track down when I need to check the offset information or delete the directory for a fresh load of the data. So I set TBLPROPERTIES ('checkpointLocation' = '/path/to/checkpoint') by altering the table; any user can then see where the checkpoint of that particular table lives. Databricks/Spark deliberately avoid tightly coupling the checkpoint with the table for scalability reasons, but keeping a reference to it on the table makes sense. Without this piece of information, a table user has to reach out to the developer just to get the location.
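A small sketch of that approach; note that 'checkpointLocation' here is just a custom, informational table property (Spark does not read it), and the table name and path are placeholders:

# Record the checkpoint path as a custom table property (purely informational).
spark.sql("""
    ALTER TABLE target_table
    SET TBLPROPERTIES ('checkpointLocation' = '/path/to/checkpoint')
""")

# Any user can later look it up without contacting the developer.
spark.sql("SHOW TBLPROPERTIES target_table").show(truncate=False)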
a week ago
Hi @seefoods
Let me break down your questions about Delta Lake write operations and streaming strategies.
Static vs Streaming Write Operations
The write modes behave differently between static writes and streaming (autoloader):
Static Write Operations:
overwrite: Replaces entire table/partition data
append: Adds new records without checking for duplicates
merge: Uses MERGE SQL operations for upserts/complex logic
Streaming Write Operations:
complete: Outputs entire result table each trigger (rare use case)
update: A valid streaming output mode (only changed rows are emitted), but the Delta sink does not support it directly; upserts are handled with foreachBatch instead
append: Default streaming mode - adds new records as they arrive
The key difference is that streaming operations are incremental and trigger-based, while static operations process the entire dataset at once.
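A minimal sketch of the contrast, assuming a Delta table named target_table; batch_df, stream_df, and the checkpoint path are placeholders:

# Static (batch) writes: the whole DataFrame is processed at once.
batch_df.write.format("delta").mode("overwrite").saveAsTable("target_table")   # replace the table contents
batch_df.write.format("delta").mode("append").saveAsTable("target_table")      # add rows, no duplicate check

# Streaming write: incremental, driven by a trigger and a checkpoint.
(stream_df.writeStream
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/target_table")
    .trigger(availableNow=True)
    .toTable("target_table"))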
Write Modes in Streaming Context
append: Most common for streaming. Processes new data incrementally
complete: Rewrites the entire output each trigger; it only works with aggregations that can be fully recomputed (see the sketch after this list)
update: A standard Spark streaming output mode that emits only the rows changed since the last trigger, but it is not supported when writing directly to a Delta table; for updates into Delta, use MERGE inside foreachBatch
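For example, complete mode only makes sense when the full result can be recomputed on each trigger, as with a streaming aggregation; a sketch with placeholder table and column names:

# Count per event_type; the whole aggregate table is rewritten on every trigger.
counts = (spark.readStream.table("events")
    .groupBy("event_type")
    .count())

(counts.writeStream
    .outputMode("complete")
    .option("checkpointLocation", "/tmp/checkpoints/event_counts")
    .trigger(availableNow=True)
    .toTable("event_counts"))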
Idempotency Strategies for Delta Lake
For idempotent writes to Delta Lake, you have two main approaches:
1. Direct writeStream with Deduplication
# Direct streaming append; the checkpoint guarantees each micro-batch is committed exactly once
df.writeStream \
    .option("checkpointLocation", checkpoint_path) \
    .option("mergeSchema", "true") \
    .trigger(availableNow=True) \
    .toTable("target_table")
2. foreachBatch with Custom Logic
def upsert_to_delta(microBatchDF, batchId):
    # Register the micro-batch so it can be referenced from SQL
    microBatchDF.createOrReplaceTempView("updates")
    # Use the micro-batch's own session (Spark 3.3+) rather than the private _jdf API
    microBatchDF.sparkSession.sql("""
        MERGE INTO target_table t
        USING updates u ON t.id = u.id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

df.writeStream \
    .foreachBatch(upsert_to_delta) \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .start()
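If the batch inside foreachBatch is a plain append rather than a MERGE, Delta also offers idempotent table writes through the txnAppId / txnVersion writer options (Delta Lake 2.0+ / recent Databricks runtimes); a minimal sketch with hypothetical names:

app_id = "orders_stream_v1"   # hypothetical, stable identifier for this streaming application

def idempotent_append(microBatchDF, batchId):
    # Delta skips the write if this (txnAppId, txnVersion) pair was already committed,
    # so a retried micro-batch does not produce duplicates.
    (microBatchDF.write
        .format("delta")
        .option("txnAppId", app_id)
        .option("txnVersion", batchId)
        .mode("append")
        .saveAsTable("target_table"))

df.writeStream \
    .foreachBatch(idempotent_append) \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .start()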
Recommended Strategy for Idempotency
Use foreachBatch when you need:
- Complex merge logic
- Multiple target tables
- Custom deduplication logic
- Cross-batch deduplication
Use direct writeStream when you have:
- Simple append scenarios
- Natural deduplication keys
- Partitioned data that naturally avoids duplicates
a week ago
Thank you @lingareddy_Alva, it's clear now.