4 weeks ago
Hi Databricks Community, I'm encountering an issue with watermarks in Delta Live Tables that's causing data loss in my streaming pipeline. Let me explain my specific problem:
I've implemented watermarks for stateful processing in my Delta Live Tables pipeline. However, I'm observing that some records are being dropped, as evidenced by the monitoring graphs I've attached. The graphs show:
I understand that watermarks are essential for managing state and preventing out-of-memory issues, but I need to ensure data completeness for my use case. Any guidance on best practices or alternative approaches would be greatly appreciated.
3 weeks ago
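Short answer: this requires a custom design.
Capture late records in a separate table, then apply them with APPLY CHANGES (or a merge-based approach) into your main table. Use a unique key and a sequencing column so that reprocessed records update existing rows instead of creating duplicates.
Details (example):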
Capturing Dropped Records
-- No watermark (or a much larger one) on this table, so records that
-- arrive too late for the main flow are still ingested here
CREATE STREAMING LIVE TABLE late_events
AS SELECT *
FROM STREAM(live.source_data);
Reprocessing & Updating the Final Table
-- live.final_table must already be declared as a streaming table in this pipeline
APPLY CHANGES INTO live.final_table
FROM STREAM(live.late_events)
KEYS (id)
SEQUENCE BY event_time
STORED AS SCD TYPE 1;
Use a unique key (for example id) in KEYS together with SEQUENCE BY to avoid duplicates when reprocessing late arrivals. APPLY CHANGES inserts new keys and updates existing ones on its own.
Auto-Triggering Updates
Efficiency Tips
By splitting your pipeline into “on-time processing” (with a reasonable watermark) and “late-arrivals handling” (with no/loose watermark), you ensure minimal data loss and an automated path to reconcile late records without duplications.
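To make the "reconcile without duplicates" idea concrete, here is a minimal plain-Python sketch of a keyed, sequence-aware upsert. It only imitates the behaviour described above and is not the DLT API; the function name `apply_changes` and the sample rows are hypothetical.

```python
def apply_changes(target, changes, key="id", sequence_by="event_time"):
    """Upsert `changes` into `target`, a dict keyed by the record key.

    A row replaces the stored one only when its sequence value is newer,
    so replaying the same late records leaves the table unchanged.
    """
    for row in changes:
        current = target.get(row[key])
        if current is None or row[sequence_by] > current[sequence_by]:
            target[row[key]] = row
    return target


final_table = {}

# Rows that made it through the on-time path.
on_time = [
    {"id": 1, "event_time": 100, "value": "a"},
    {"id": 2, "event_time": 105, "value": "b"},
]

# Rows picked up by the late-arrivals path.
late = [
    {"id": 3, "event_time": 90, "value": "c"},   # missed by the on-time path
    {"id": 1, "event_time": 100, "value": "a"},  # already present: same key and sequence
]

apply_changes(final_table, on_time)
apply_changes(final_table, late)
apply_changes(final_table, late)  # replaying the late batch is harmless

print(sorted(final_table))  # keys present after reconciliation
```

The late table can overlap with the on-time data because the key plus the sequencing column decide what is written, not the order in which batches arrive.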
That said, simply extending the watermark might still be the simplest solution, provided the use case can handle the increased state and potentially higher latency.
Questions to consider:
These questions help decide if adjusting the watermark alone addresses your needs or if you truly need a more involved multi-pipeline approach.
Hope this is helpful. Please remember to follow common best practices and test it in a development environment before deploying it in production; this is not a trivial change.
4 weeks ago
The watermark threshold determines how late data can be before it is considered too late and dropped. A smaller threshold results in lower latency but increases the likelihood of dropping late records. Conversely, a larger threshold reduces data loss but may increase latency and require more resources.
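The trade-off can be seen in a small plain-Python simulation. This is a simplified sketch, not Spark code: it updates the watermark after every record, whereas Structured Streaming advances it per micro-batch. The function name and the sample events are made up for illustration.

```python
from datetime import datetime, timedelta


def split_by_watermark(events, threshold):
    """Simulate watermark filtering over events listed in arrival order.

    events: list of (record_id, event_time) tuples.
    Returns (kept_ids, dropped_ids).
    """
    max_event_time = None
    kept, dropped = [], []
    for record_id, event_time in events:
        # The watermark trails the newest event time seen so far by `threshold`.
        if max_event_time is not None and event_time < max_event_time - threshold:
            dropped.append(record_id)
        else:
            kept.append(record_id)
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
    return kept, dropped


t0 = datetime(2024, 1, 1, 12, 0)
events = [
    ("a", t0),
    ("b", t0 + timedelta(minutes=20)),
    ("c", t0 + timedelta(minutes=5)),   # arrives 15 minutes behind "b"
    ("d", t0 + timedelta(minutes=25)),
]

print(split_by_watermark(events, timedelta(minutes=10)))  # "c" is dropped
print(split_by_watermark(events, timedelta(minutes=30)))  # nothing is dropped
```

With a 10-minute threshold record "c" is treated as too late; with 30 minutes it is kept, at the cost of holding state for longer.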
Continuously monitor your streaming pipeline to understand the patterns of late data and adjust the watermark threshold accordingly. This can help balance between latency and data completeness.
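One way to turn that monitoring into a number is to measure how far each record trails the newest event time seen so far and pick a threshold that covers most of them. The sketch below is plain Python over a sample of event times in arrival order; the helper names and the sample data are hypothetical, and in practice the sample would come from your own pipeline metrics or source table.

```python
import math
from datetime import datetime, timedelta


def lateness_profile(event_times):
    """For events in arrival order, return how far each one trails
    the newest event time seen so far."""
    newest = None
    lags = []
    for ts in event_times:
        if newest is None or ts > newest:
            newest = ts
        lags.append(newest - ts)
    return lags


def threshold_for_coverage(lags, coverage_pct):
    """Smallest observed lag that covers at least `coverage_pct` percent
    of the events (nearest-rank percentile)."""
    ordered = sorted(lags)
    rank = max(1, math.ceil(coverage_pct * len(ordered) / 100))
    return ordered[rank - 1]


t0 = datetime(2024, 1, 1, 12, 0)
offsets_min = [0, 10, 8, 20, 5, 30, 29, 31, 12, 40]  # event times, in arrival order
lags = lateness_profile([t0 + timedelta(minutes=m) for m in offsets_min])

print(threshold_for_coverage(lags, 90))   # lag that covers 90% of the sample
print(threshold_for_coverage(lags, 100))  # lag that covers every record in the sample
```

A threshold chosen this way reflects only the sample it was computed from, so it is worth recomputing as lateness patterns change.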
To reduce the number of records dropped as late while the initial snapshot is processed, you can set the withEventTimeOrder option on the Delta source. This ensures that the initial snapshot is processed in event time order, reducing the likelihood of records being dropped as late events.
.option("withEventTimeOrder", "true")
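For context, this is roughly where the option sits in a PySpark streaming read. It is a sketch under assumptions: `spark` is an active SparkSession (as in a Databricks notebook or pipeline), the table name passed in is yours, and the column name `event_time` and the 10-minute delay are placeholders.

```python
def read_in_event_time_order(spark, table_name,
                             event_time_col="event_time",
                             delay="10 minutes"):
    """Build a streaming read of a Delta table whose initial snapshot
    is processed in event-time order."""
    return (
        spark.readStream
        # Delta source option; it affects how the initial snapshot is read.
        .option("withEventTimeOrder", "true")
        .table(table_name)
        # The watermark names the event-time column and the allowed lateness.
        .withWatermark(event_time_col, delay)
    )
```

Calling `read_in_event_time_order(spark, "source_data")` would return a streaming DataFrame that can feed the stateful part of the pipeline.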
You can implement a recovery mechanism by storing the dropped records in a separate Delta table for later processing. This can be achieved by configuring your streaming query to write late records to a different sink.
3 weeks ago
Dear @Walter_C, thank you for your detailed response regarding watermark handling in Delta Live Tables (DLT). I appreciate the guidance provided, but I would like further clarification on a couple of points related to our use case.
We are currently using the APPLY CHANGES API in Delta Live Tables for our pipeline. Is there a built-in mechanism or recommended approach to automatically save records that are dropped due to watermark thresholds? Specifically, we are looking for:
If this is not natively supported, could you recommend an alternative approach to achieve this while maintaining pipeline efficiency?
Once the dropped records are saved, we aim to reprocess them and update the final table automatically. Could you provide guidance on:
We would greatly appreciate any additional insights or references to best practices for managing late-arriving data in DLT pipelines. Looking forward to your response!
3 weeks ago
I hope this message finds you well. I'm following up on the technical issue I submitted last week, which remains unresolved. This issue has become our team's most significant blocker, preventing us from progressing with our project implementation.
We greatly value your expertise and would deeply appreciate your guidance on this matter. Given the critical nature of this blocker, any insights you could provide would be immensely helpful in moving our project forward.
Please let me know if you need any additional information from our side to assist with the resolution.
Thank you for your time and attention to this matter.
Best regards.
3 weeks ago
I wanted to take a moment to express my sincere gratitude for your incredibly detailed explanation and thoughtful suggestions. Your guidance has been immensely valuable and has provided us with a clear path forward in addressing the challenge with late-arriving records in DLT.
We’ve decided to adopt the approach you recommended, including implementing a secondary pipeline to handle late events. The clarity and depth of your explanation have given us great confidence in the solution, and we truly appreciate the time and effort you’ve put into sharing your expertise.
Once again, thank you so much for your invaluable support.
Best regards,
Hung.