Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Handling Dropped Records in Delta Live Tables with Watermark - Need Optimization Strategy

minhhung0507
New Contributor III

Hi Databricks Community, I'm encountering an issue with watermarks in Delta Live Tables that's causing data loss in my streaming pipeline. Let me explain my specific problem:

Current Situation

I've implemented watermarks for stateful processing in my Delta Live Tables pipeline. However, I'm observing that some records are being dropped, as evidenced by the monitoring graphs I've attached. The graphs show:

  • Spikes in the number of late rows dropped by watermark
  • One graph shows a peak of around 20 records dropped
  • Another graph displays multiple peaks with up to 250 records being dropped

Questions

  1. What are the recommended strategies to optimize watermark configuration to minimize data loss while maintaining processing efficiency?
  2. Is there a way to:
    • Capture these dropped records for later processing?
    • Implement a recovery mechanism to reprocess these dropped records?
    • Update the final table with these recovered records without causing duplicates or inconsistencies?

Additional Context

I understand that watermarks are essential for managing state and preventing out-of-memory issues, but I need to ensure data completeness for my use case. Any guidance on best practices or alternative approaches would be greatly appreciated.

1 ACCEPTED SOLUTION

Accepted Solutions

VZLA
Databricks Employee

@minhhung0507 

Short answer: this requires a custom design.

  1. DLT does not natively “auto-save” records dropped by a watermark. You must configure a secondary output or pipeline to capture potentially late records (e.g., ingest the same source with a more permissive watermark or no watermark at all).
  2. Once saved, reprocess those late records with APPLY CHANGES (or a merge-based approach) into your main table. Use a unique key in KEYS together with SEQUENCE BY so that reprocessed rows update existing rows instead of creating duplicates.

Details (Example):

  1. Capturing Dropped Records

    • Separate “Late Events” Table: Create a second DLT table reading from the same source but with a more relaxed watermark (or no watermark) to capture events that arrive after the main pipeline’s cutoff. This effectively quarantines all potential late events.
    • Example:
      CREATE STREAMING LIVE TABLE late_events
      AS SELECT *
      FROM STREAM(live.source_data)
      -- Optionally set a larger watermark or omit it here

    • Why: DLT itself does not offer a built-in “auto-save dropped records” feature, so this extra DLT table prevents data loss and isolates potentially late data.
  2. Reprocessing & Updating the Final Table

    • Use APPLY CHANGES: Point it to both your main input table and the late events table, either in the same pipeline or in a separate “cleanup” pipeline.
    • Example:
      -- The target streaming table must exist before APPLY CHANGES can write to it
      CREATE OR REFRESH STREAMING TABLE final_table;

      APPLY CHANGES INTO live.final_table
        FROM STREAM(live.late_events)
        KEYS (id)
        SEQUENCE BY event_time
        STORED AS SCD TYPE 1;

    • Deduplication: APPLY CHANGES has no separate DEDUPLICATE ON or WHEN MATCHED clauses. It upserts on the unique key in KEYS (id) and uses SEQUENCE BY to keep the latest version of each key, which is what avoids duplicates when reprocessing late arrivals.
  3. Auto-Triggering Updates

    • Incremental Pipeline: Schedule a job to process the quarantined late_events table on a periodic basis. This way, any new late arrivals automatically flow in.
    • Best Practice: Keep your main pipeline’s watermark at a balanced threshold for normal loads. Let the “late events” pipeline handle stragglers in batches to avoid ballooning resource usage.
  4. Efficiency Tips

    • Monitoring: Track late arrivals in the “late_events” table. If volumes are consistently high, that is a signal to widen your main pipeline’s watermark.
    • Resource Usage: Keep your “late events” pipeline idle between scheduled runs if possible, minimizing overhead and costs.

By splitting your pipeline into “on-time processing” (with a reasonable watermark) and “late-arrivals handling” (with no/loose watermark), you ensure minimal data loss and an automated path to reconcile late records without duplications.
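To make the "reconcile without duplications" part concrete, here is a minimal plain-Python sketch of a keyed, sequence-ordered upsert. It only models the idea behind KEYS and SEQUENCE BY; it is not how DLT implements APPLY CHANGES. The function name `upsert_latest` and the sample rows are hypothetical.

```python
def upsert_latest(target, changes, key="id", sequence_by="event_time"):
    """Upsert rows into `target` (a dict keyed by `key`).

    A row only replaces an existing row with the same key when its
    sequence value is strictly higher, so stale or repeated rows are ignored.
    """
    for row in changes:
        current = target.get(row[key])
        if current is None or row[sequence_by] > current[sequence_by]:
            target[row[key]] = row
    return target


# Rows already written by the on-time pipeline (hypothetical data).
final_table = {}
on_time = [
    {"id": 1, "event_time": 10, "value": "a"},
    {"id": 2, "event_time": 12, "value": "b"},
]
upsert_latest(final_table, on_time)

# Rows picked up later from the late-events table (hypothetical data).
late = [
    {"id": 3, "event_time": 5, "value": "c"},      # new key: inserted
    {"id": 1, "event_time": 8, "value": "stale"},  # older than current: ignored
    {"id": 2, "event_time": 12, "value": "b"},     # exact repeat: ignored
]
upsert_latest(final_table, late)
upsert_latest(final_table, late)  # reprocessing the same batch changes nothing

print(sorted(final_table))
print(final_table[1]["value"])
```

Because the comparison is by key and sequence value, rerunning the late batch is harmless, which is the property you want from a scheduled recovery job.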

But... Having said that, simply extending the watermark might still be the simplest solution, provided the use case can handle the increased state and potential higher latency.

Questions you should consider:

  • Frequency of Late Events: How common are these late arrivals? If they’re rare, slightly extending the watermark might be enough.
  • Resource Constraints: Can your cluster handle the extra memory overhead of a larger watermark window?
  • Acceptable Latency: If you extend the watermark, are you prepared for a longer overall processing delay?
  • Downstream Impact: Will delaying your final table updates to accommodate out-of-order data create downstream bottlenecks?
  • Business Requirements: Do you actually need 100% completeness in near real-time, or can late-arriving records be reconciled less frequently?

These questions help decide if adjusting the watermark alone addresses your needs or if you truly need a more involved multi-pipeline approach.

Hope this is helpful. Please remember to follow common best practices and test it in a development environment before deploying it in production; this is not a trivial change.


5 REPLIES 5

Walter_C
Databricks Employee

The watermark threshold determines how late data can be before it is considered too late and dropped. A smaller threshold results in lower latency but increases the likelihood of dropping late records. Conversely, a larger threshold reduces data loss but may increase latency and require more resources.
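As a rough illustration of that trade-off, the sketch below models a watermark in plain Python: the watermark is the highest event time seen so far minus the threshold, and anything older is treated as dropped. This is a simplification (Spark advances the watermark per micro-batch, not per record), and the function name and sample events are hypothetical.

```python
from datetime import datetime, timedelta


def split_by_watermark(events, threshold):
    """Split (event_id, event_time) pairs, given in arrival order,
    into kept and dropped ids under a simplified per-record watermark."""
    max_seen = None
    kept, dropped = [], []
    for event_id, event_time in events:
        # Watermark trails the highest event time seen so far by `threshold`.
        if max_seen is not None and event_time < max_seen - threshold:
            dropped.append(event_id)
        else:
            kept.append(event_id)
        if max_seen is None or event_time > max_seen:
            max_seen = event_time
    return kept, dropped


t0 = datetime(2024, 1, 1, 12, 0, 0)
events = [
    ("a", t0),
    ("b", t0 + timedelta(minutes=10)),
    ("c", t0 + timedelta(minutes=2)),   # 8 minutes behind the max seen
    ("d", t0 + timedelta(minutes=20)),
    ("e", t0 + timedelta(minutes=5)),   # 15 minutes behind the max seen
]

for minutes in (5, 10, 20):
    _, dropped = split_by_watermark(events, timedelta(minutes=minutes))
    print(minutes, dropped)
```

With this sample, a 5-minute threshold drops both stragglers, 10 minutes drops only the later one, and 20 minutes drops none, at the cost of holding state for longer.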

Continuously monitor your streaming pipeline to understand the patterns of late data and adjust the watermark threshold accordingly. This can help balance between latency and data completeness.
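One way to turn that monitoring into a number is to measure how far each record lags behind the highest event time seen so far, then pick a threshold that would have covered most records. The sketch below is a plain-Python illustration under that assumption; the helper names, the nearest-rank percentile choice, and the sample data are all hypothetical.

```python
from datetime import datetime, timedelta


def event_time_lags(event_times):
    """For event times in arrival order, return how far each one
    lags behind the highest event time seen up to that point."""
    lags = []
    max_seen = None
    for t in event_times:
        if max_seen is None or t > max_seen:
            max_seen = t
        lags.append(max_seen - t)
    return lags


def suggest_threshold(lags, coverage_pct=99):
    """Smallest observed lag that covers `coverage_pct` percent of records
    (nearest-rank percentile, using integer arithmetic)."""
    if not lags:
        raise ValueError("no lags observed")
    ordered = sorted(lags)
    rank = max(1, -(-coverage_pct * len(ordered) // 100))  # ceiling division
    return ordered[rank - 1]


t0 = datetime(2024, 1, 1, 12, 0, 0)
arrivals = [
    t0,
    t0 + timedelta(minutes=10),
    t0 + timedelta(minutes=2),
    t0 + timedelta(minutes=20),
    t0 + timedelta(minutes=5),
]
lags = event_time_lags(arrivals)
print(suggest_threshold(lags, coverage_pct=80))
print(suggest_threshold(lags, coverage_pct=100))
```

Whether 99% or some other coverage is right depends on how much state and latency you can afford; the remaining tail is what a separate late-records path would have to handle.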

To reduce the number of records dropped as late, you can use the withEventTimeOrder option. It does not capture dropped records; it ensures that the initial snapshot is processed in event time order, reducing the likelihood of records being dropped as late events.

.option("withEventTimeOrder", "true")

You can implement a recovery mechanism by storing late records in a separate Delta table for later processing. Records dropped by the watermark are not emitted anywhere automatically, so this means adding a separate streaming query that reads the same source and writes late records to a different sink.

 

minhhung0507
New Contributor III

 

Dear @Walter_C, thank you for your detailed response regarding watermark handling in Delta Live Tables (DLT). I appreciate the guidance provided, but I would like further clarification on a couple of points related to our use case.

1. Auto-Saving Dropped Records Due to Watermark

We are currently using the APPLY CHANGES API in Delta Live Tables for our pipeline. Is there a built-in mechanism or recommended approach to automatically save records that are dropped due to watermark thresholds? Specifically, we are looking for:

  • Step-by-step guidelines or documentation on how to implement this functionality.
  • Code examples or configurations that demonstrate how to capture these late records into a separate Delta table for later processing.

If this is not natively supported, could you recommend an alternative approach to achieve this while maintaining pipeline efficiency?

2. Auto-Triggering Updates for Dropped Records

Once the dropped records are saved, we aim to reprocess them and update the final table automatically. Could you provide guidance on:

  • How to design a recovery mechanism that triggers updates for these saved records without manual intervention?
  • The best practices to ensure these updates do not introduce duplicates or inconsistencies in the final table.
  • Any optimizations we can apply to minimize resource usage and processing time during this recovery process.

We would greatly appreciate any additional insights or references to best practices for managing late-arriving data in DLT pipelines. Looking forward to your response!

minhhung0507
New Contributor III

Dear @Walter_C @VZLA,

I hope this message finds you well. I'm following up on the technical issue I submitted last week, which remains unresolved. This issue has become our team's most significant blocker, preventing us from progressing with our project implementation.

We greatly value your expertise and would deeply appreciate your guidance on this matter. Given the critical nature of this blocker, any insights you could provide would be immensely helpful in moving our project forward.

Please let me know if you need any additional information from our side to assist with the resolution.

Thank you for your time and attention to this matter.

Best regards.

minhhung0507
New Contributor III

 

Dear @VZLA @Walter_C,

I wanted to take a moment to express my sincere gratitude for your incredibly detailed explanation and thoughtful suggestions. Your guidance has been immensely valuable and has provided us with a clear path forward in addressing the challenge with late-arriving records in DLT.

We’ve decided to adopt the approach you recommended, including implementing a secondary pipeline to handle late events. The clarity and depth of your explanation have given us great confidence in the solution, and we truly appreciate the time and effort you’ve put into sharing your expertise.

Once again, thank you so much for your invaluable support.

Best regards,
Hung.
