Streaming Table data leakage to historical permanent table

Akash_Varuna
New Contributor II

Data Leakage in Historical Table from Streaming Table

Environment

  • Platform: Azure Databricks + Azure Event Hubs
  • Streaming Framework: Spark Structured Streaming
  • Storage: Delta Lake

Pipeline

Event Hubs → stream_messages (live 24hr rolling window) → messages
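
For reference, the ingestion hop is a plain Structured Streaming job that reads from Event Hubs and appends into stream_messages. Simplified, it looks something like the sketch below; the connection settings, column mapping and paths are placeholders (shown here via the Kafka-compatible endpoint of Event Hubs), not our exact configuration, and the 24-hour rolling-window cleanup is omitted:

```python
# Simplified sketch of the Event Hubs -> stream_messages hop.
# All connection values and paths are placeholders; this shows one possible
# wiring through the Kafka-compatible endpoint of Event Hubs.
raw = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', '<namespace>.servicebus.windows.net:9093')
    .option('subscribe', '<event-hub-name>')
    .option('kafka.security.protocol', 'SASL_SSL')
    .option('kafka.sasl.mechanism', 'PLAIN')
    .option(
        'kafka.sasl.jaas.config',
        'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required '
        'username="$ConnectionString" password="<event-hubs-connection-string>";',
    )
    .option('startingOffsets', 'latest')
    .load()
)

# Keep the broker metadata so downstream jobs can track their own position;
# the exact renaming/parsing in our job differs, this only shows the shape.
stream_messages_df = raw.selectExpr(
    'partition AS technical_partition',
    'offset AS technical_sequenceNumber',
    'CAST(value AS STRING) AS body',
    'timestamp AS context_updatedAt',  # placeholder; the real column comes from the payload
)

(
    stream_messages_df.writeStream
    .option('checkpointLocation', '/tmp/checkpoints/stream_messages')  # placeholder path
    .toTable('stream_messages')
)
```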

The load_messages notebook reads from stream_messages as a static batch inside foreachBatch, manually tracking its position using technical_sequenceNumber per technical_partition, stored in last_seq_nums_per_partition. It applies an EXPIRING_TIME filter of 30 minutes, where now is derived from max(context_updatedAt) of the current micro-batch:

```python
from functools import reduce

from pyspark.sql import functions as F

# Newest event time seen in the current micro-batch.
now = microbatch_df.agg(F.max('context_updatedAt')).collect()[0][0]

# Re-read stream_messages as a static batch and keep only rows that are newer
# than the last processed sequence number of their partition, but older than
# the 30-minute EXPIRING_TIME window (e.g. a timedelta(minutes=30)).
delayed_stream_increment = (
    spark.read.table(input_stream_table_name)
    .where(
        reduce(
            lambda x, y: x | y,
            [
                (F.col('technical_partition') == partition)
                & (F.col('technical_sequenceNumber') > last_record_number)
                for partition, last_record_number in last_seq_nums_per_partition.items()
            ]
        )
    )
    .where(F.col('context_updatedAt') < now - EXPIRING_TIME)
)
```
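
For completeness, the surrounding wiring looks roughly like the sketch below. The helper build_delayed_increment is hypothetical and just wraps the filter shown above; the append write, checkpoint path and other details are simplified placeholders rather than our exact code:

```python
from functools import reduce

from pyspark.sql import functions as F


def build_delayed_increment(now):
    # Hypothetical helper: the same filter as in the snippet above.
    seq_filter = reduce(
        lambda x, y: x | y,
        [
            (F.col('technical_partition') == p)
            & (F.col('technical_sequenceNumber') > s)
            for p, s in last_seq_nums_per_partition.items()
        ],
    )
    return (
        spark.read.table(input_stream_table_name)
        .where(seq_filter)
        .where(F.col('context_updatedAt') < now - EXPIRING_TIME)
    )


def process_batch(microbatch_df, batch_id):
    # Newest event time in the current micro-batch.
    now = microbatch_df.agg(F.max('context_updatedAt')).collect()[0][0]

    delayed_stream_increment = build_delayed_increment(now)

    # Append the expired rows to the historical table (append mode assumed).
    delayed_stream_increment.write.mode('append').saveAsTable('messages')

    # Advance the manually tracked position for each partition that was written.
    new_positions = (
        delayed_stream_increment
        .groupBy('technical_partition')
        .agg(F.max('technical_sequenceNumber').alias('max_seq'))
        .collect()
    )
    for row in new_positions:
        last_seq_nums_per_partition[row['technical_partition']] = row['max_seq']


(
    spark.readStream.table('stream_messages')
    .writeStream
    .foreachBatch(process_batch)
    .option('checkpointLocation', '/tmp/checkpoints/load_messages')  # placeholder path
    .start()
)
```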

The Problem

There is a scheduled maintenance window for optimizing the messages table, and the load_messages job is paused during it. When we compare counts between stream_messages and messages for the affected dates, some days show discrepancies while other days show none.
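
The comparison itself is just a per-day count on both tables, along the lines of the sketch below (deriving the day from context_updatedAt is a simplification; the real grouping column may differ):

```python
from pyspark.sql import functions as F

def daily_counts(table_name, count_col):
    # Rows per calendar day, keyed on the event timestamp.
    return (
        spark.read.table(table_name)
        .groupBy(F.to_date('context_updatedAt').alias('event_date'))
        .agg(F.count('*').alias(count_col))
    )

comparison = (
    daily_counts('stream_messages', 'stream_messages_count')
    .join(daily_counts('messages', 'messages_count'), on='event_date', how='full_outer')
    .orderBy('event_date')
)
comparison.show()
```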

For example, today (2026-02-17) stream_messages and messages have the same count, which confirms the leakage is tied specifically to the dates of the optimization/maintenance window.

I have looked into this but am not sure: is it the optimizing task that causes this leakage, because a micro-batch is dropped while the job is paused?

If you have any idea of the possible reason for this in the current setup, or of alternatives that would avoid it, please let me know.
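
For example, one alternative I have been wondering about is persisting last_seq_nums_per_partition to a small Delta table, upserted in the same foreachBatch that appends to messages, so that a pause/restart or maintenance run cannot lose or skip positions. A rough sketch of the idea, with a placeholder table name:

```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F

positions_table = 'load_messages_positions'  # placeholder name

def load_positions():
    # Rebuild the dict of last processed sequence numbers from the Delta table.
    rows = spark.read.table(positions_table).collect()
    return {r['technical_partition']: r['technical_sequenceNumber'] for r in rows}

def save_positions(increment_df):
    # Upsert the new high-water mark per partition, derived from what was written.
    new_positions = (
        increment_df
        .groupBy('technical_partition')
        .agg(F.max('technical_sequenceNumber').alias('technical_sequenceNumber'))
    )
    (
        DeltaTable.forName(spark, positions_table).alias('t')
        .merge(new_positions.alias('s'), 't.technical_partition = s.technical_partition')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```

Would something along those lines be a safer setup, or is there a more idiomatic way to handle this on Databricks?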

Thanks