<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Data Loss in Incremental Batch Jobs Due to Latency in delta file write to blob in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/data-loss-in-incremental-batch-jobs-due-to-latency-in-delta-file/m-p/155244#M54218</link>
    <description>&lt;P&gt;Hi everyone,&lt;/P&gt;&lt;P&gt;I am facing a data consistency issue in my Databricks incremental pipeline where records are being skipped because of a time gap between when a record is processed and when the physical file is finalized in Azure Blob Storage (ABFS).&lt;/P&gt;&lt;P&gt;&lt;FONT size="5"&gt;&lt;STRONG&gt;Our Architecture:&lt;/STRONG&gt;&lt;/FONT&gt;&lt;BR /&gt;1. &lt;STRONG&gt;Ingestion&lt;/STRONG&gt;: Data is pulled from Kafka and written as Delta files at a blob path. During this write, we create a column called `loadts` using `current_timestamp()`.&lt;BR /&gt;2. &lt;STRONG&gt;Downstream ETL&lt;/STRONG&gt;: A scheduled batch job reads from these Delta files incrementally. It fetches the `max(loadts)` from an audit table and filters the source: `WHERE loadts &amp;gt;= last_max_date`.&lt;/P&gt;&lt;P&gt;&lt;FONT size="5"&gt;&lt;STRONG&gt;The Issue:&lt;/STRONG&gt;&lt;/FONT&gt;&lt;BR /&gt;We are seeing a significant gap (often 20+ minutes) between the `&lt;STRONG&gt;loadts&lt;/STRONG&gt;` inside the data and the system-level `&lt;STRONG&gt;_file_modified_at&lt;/STRONG&gt;` timestamp for some of the Delta files.&lt;/P&gt;&lt;P&gt;&lt;FONT size="4"&gt;&lt;STRONG&gt;Example&lt;/STRONG&gt;&lt;/FONT&gt;: A record is assigned a `loadts` of 09:05:00 (when the Spark executor processes it).&lt;BR /&gt;&lt;STRONG&gt;Latency&lt;/STRONG&gt;: Due to write volume and Delta transaction overhead, the file is not committed and visible in ABFS until 09:27:55.&lt;BR /&gt;&lt;FONT size="4"&gt;&lt;STRONG&gt;The Conflict:&lt;/STRONG&gt;&lt;/FONT&gt; If an incremental job runs at 09:15:00, it doesn't "see" that file yet. The job finishes, and the watermark in the audit table moves forward. When the next job runs at 10:00:00, it searches for data where `loadts &amp;gt;= 09:15:00`. 
Consequently, the record stamped at 09:05:00 (but delivered at 09:27:55) is skipped forever.&lt;/P&gt;&lt;P&gt;&lt;FONT size="5"&gt;&lt;STRONG&gt;Current Workaround&lt;/STRONG&gt;:&lt;/FONT&gt;&lt;BR /&gt;We currently use a hardcoded 15-minute look-back window: `timestampadd(MINUTE, -15, last_max_date)`. However, this is brittle because processing times vary, and we cannot guarantee that a 15- or even 30-minute window will always cover the commit latency.&lt;/P&gt;&lt;P&gt;&lt;FONT size="5"&gt;Question:&lt;/FONT&gt;&lt;BR /&gt;What is the industry-standard way to handle this in Databricks without "guessing" a look-back window?&lt;/P&gt;&lt;P&gt;Any advice on how to make this pipeline bulletproof would be greatly appreciated.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Note: We are using a Serverless performance-optimized cluster, as the data needs to be highly available.&lt;/STRONG&gt;&lt;/P&gt;</description>
    <pubDate>Wed, 22 Apr 2026 21:01:45 GMT</pubDate>
    <dc:creator>Avinash_Narala</dc:creator>
    <dc:date>2026-04-22T21:01:45Z</dc:date>
    <item>
      <title>Data Loss in Incremental Batch Jobs Due to Latency in delta file write to blob</title>
      <link>https://community.databricks.com/t5/data-engineering/data-loss-in-incremental-batch-jobs-due-to-latency-in-delta-file/m-p/155244#M54218</link>
      <pubDate>Wed, 22 Apr 2026 21:01:45 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/data-loss-in-incremental-batch-jobs-due-to-latency-in-delta-file/m-p/155244#M54218</guid>
      <dc:creator>Avinash_Narala</dc:creator>
      <dc:date>2026-04-22T21:01:45Z</dc:date>
    </item>
    <item>
      <title>Re: Data Loss in Incremental Batch Jobs Due to Latency in delta file write to blob</title>
      <link>https://community.databricks.com/t5/data-engineering/data-loss-in-incremental-batch-jobs-due-to-latency-in-delta-file/m-p/155579#M54278</link>
      <description>&lt;P&gt;You’re running into the classic “event-time watermark based on a value that’s known &lt;EM&gt;before&lt;/EM&gt; the data is actually committed” problem. The fix is to anchor incrementality on &lt;STRONG&gt;commit/offset semantics&lt;/STRONG&gt;, not on &lt;CODE&gt;loadts&lt;/CODE&gt;. That column's value comes from &lt;CODE&gt;current_timestamp()&lt;/CODE&gt;, which is fixed when the writing query (or micro-batch) starts, well before its commit becomes visible.&lt;/P&gt;
&lt;P&gt;Below are &lt;STRONG&gt;three options&lt;/STRONG&gt;. None of them relies on guessing a look-back window.&lt;/P&gt;
&lt;HR /&gt;
&lt;H3&gt;Option 1 – Make the downstream ETL a &lt;EM&gt;Structured Streaming&lt;/EM&gt; read from Delta (Trigger.Once / AvailableNow) &lt;STRONG&gt;[Recommended]&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;STRONG&gt;Idea:&lt;/STRONG&gt;&lt;BR /&gt;Treat your bronze Delta table as a &lt;STRONG&gt;streaming source&lt;/STRONG&gt; and let Structured Streaming track offsets in the Delta log, not your &lt;CODE&gt;loadts&lt;/CODE&gt;. The Delta streaming source reads the transaction log version by version, and a version only exists once its commit succeeds. A file that takes 20 minutes to finalize is therefore picked up by the first run that sees its commit, so nothing is skipped regardless of how long the write took.&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;Pattern (Python):&lt;/STRONG&gt;&lt;/P&gt;
&lt;PRE&gt;&lt;CODE class="language-python"&gt;# your_silver_logic is a placeholder for your own transform (takes and returns a DataFrame)
df = (spark.readStream
      .format("delta")
      .table("bronze_kafka_delta"))

query = (df
    .transform(your_silver_logic)          # joins, filters, etc.
    .writeStream
    .option("checkpointLocation", "...")   # on Unity Catalog, keep the checkpoint on a volume
    .trigger(availableNow=True)            # process everything committed so far, then stop
    .toTable("silver_table"))

query.awaitTermination()                   # keep the job alive until this run has finished
&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;Key points:&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;&lt;STRONG&gt;No &lt;CODE&gt;WHERE loadts &amp;gt;= last_max_date&lt;/CODE&gt; anymore&lt;/STRONG&gt;; the checkpoint tracks which Delta versions/files have been processed.&lt;/LI&gt;
&lt;LI&gt;You can still keep &lt;CODE&gt;loadts&lt;/CODE&gt; for lineage/audit, but it’s not the incremental anchor.&lt;/LI&gt;
&lt;LI&gt;Fits &lt;STRONG&gt;Serverless performance-optimized&lt;/STRONG&gt; compute. Schedule it as a regular job: each run processes everything committed since the previous run, then stops. &lt;CODE&gt;availableNow&lt;/CODE&gt; is the successor to &lt;CODE&gt;Trigger.Once&lt;/CODE&gt; and splits a large backlog into several micro-batches instead of one.&lt;/LI&gt;
&lt;/UL&gt;
&lt;P&gt;&lt;STRONG&gt;Pros:&lt;/STRONG&gt; Exactly-once processing into a Delta sink, no look-back guessing, and the standard bronze-to-silver pattern on Databricks.&lt;BR /&gt;&lt;STRONG&gt;Cons:&lt;/STRONG&gt; Requires refactoring your downstream job to Structured Streaming semantics.&lt;/P&gt;
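&lt;P&gt;The following toy model shows why offset tracking cannot skip a late commit. It is pure Python with no Spark involved, and it replays the timeline from the question. &lt;CODE&gt;ToyDeltaLog&lt;/CODE&gt; and &lt;CODE&gt;run_available_now&lt;/CODE&gt; are hypothetical names, and the row with id 1 is an assumed earlier batch. The real Delta streaming source also tracks individual files within a version and applies rate limits; this sketch ignores both.&lt;/P&gt;

```python
from dataclasses import dataclass, field


@dataclass
class ToyDeltaLog:
    """Hypothetical stand-in for a Delta transaction log: maps version to rows."""
    commits: dict = field(default_factory=dict)

    def commit(self, rows):
        # A version is only created once the commit succeeds, whatever loadts says.
        version = len(self.commits)
        self.commits[version] = list(rows)
        return version

    def latest_version(self):
        return len(self.commits) - 1


def run_available_now(log, checkpoint):
    """Process every version committed after checkpoint["version"], then stop.

    Mirrors the idea behind trigger(availableNow=True): the end of the run is
    fixed when the run starts, and the checkpoint only advances over versions
    that were actually read.
    """
    end = log.latest_version()
    processed = []
    for version in range(checkpoint["version"] + 1, end + 1):
        processed.extend(log.commits[version])
    checkpoint["version"] = end
    return processed


log = ToyDeltaLog()
checkpoint = {"version": -1}          # nothing processed yet

# ~09:01: an earlier batch commits quickly (hypothetical row)
log.commit([{"id": 1, "loadts": "09:00:00"}])

# 09:15 run: the batch stamped loadts=09:05:00 is still being written
first_run = run_available_now(log, checkpoint)

# 09:27:55: the late batch finally commits, carrying its early loadts
log.commit([{"id": 2, "loadts": "09:05:00"}])

# 10:00 run: the late commit is a new version, so it is picked up
second_run = run_available_now(log, checkpoint)

print([r["id"] for r in first_run], [r["id"] for r in second_run])  # [1] [2]
```

&lt;P&gt;The late row still carries &lt;CODE&gt;loadts&lt;/CODE&gt; = 09:05:00, but the reader never consults it. The 10:00 run reads that row because version 1 did not exist when the 09:15 run recorded its checkpoint.&lt;/P&gt;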
&lt;HR /&gt;
&lt;H3&gt;Option 2 – Drive incrementality by &lt;STRONG&gt;Delta table version / commit timestamp&lt;/STRONG&gt; (batch but commit-aware)&lt;/H3&gt;
&lt;P&gt;If you must stay with pure batch SQL:&lt;/P&gt;
&lt;OL&gt;
&lt;LI&gt;Store the &lt;STRONG&gt;last processed Delta version (or commit timestamp)&lt;/STRONG&gt; of the source table in your audit table.&lt;/LI&gt;
&lt;LI&gt;On each run:
&lt;UL&gt;
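&lt;LI&gt;Read only the commits &lt;STRONG&gt;after&lt;/STRONG&gt; that version using &lt;STRONG&gt;Change Data Feed&lt;/STRONG&gt; (&lt;CODE&gt;table_changes&lt;/CODE&gt;). CDF must be enabled on the bronze table (&lt;CODE&gt;delta.enableChangeDataFeed = true&lt;/CODE&gt;) and only covers versions written after it was enabled. Diffing two &lt;CODE&gt;VERSION AS OF&lt;/CODE&gt; snapshots also works but is far more expensive.&lt;/LI&gt;
&lt;LI&gt;After successful processing, set “last_processed_version” to the highest version you actually read. Capture that end version before reading rather than looking it up afterwards, because later commits may have landed in between.&lt;/LI&gt;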
&lt;/UL&gt;
&lt;/LI&gt;
&lt;/OL&gt;
&lt;P&gt;Example pattern with CDF:&lt;/P&gt;
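&lt;PRE&gt;&lt;CODE class="language-sql"&gt;-- Audit table stores last_processed_version for 'bronze_kafka_delta'
-- (requires delta.enableChangeDataFeed = true on the bronze table)
SELECT last_processed_version FROM audit_table WHERE source_table = 'bronze_kafka_delta';

-- Capture the latest committed version BEFORE reading; commits that land
-- while this job runs are left for the next run.
-- (If end_version = last_processed_version, there is nothing new: skip the run.)
DESCRIBE HISTORY bronze_kafka_delta LIMIT 1;   -- end_version = the "version" column

-- In job (placeholders filled in by your orchestration code):
SELECT *
FROM table_changes('bronze_kafka_delta', &amp;lt;last_processed_version + 1&amp;gt;, &amp;lt;end_version&amp;gt;)
WHERE _change_type = 'insert';   -- bronze is append-only, so new rows arrive as inserts

-- After job succeeds:
UPDATE audit_table
SET last_processed_version = &amp;lt;end_version&amp;gt;
WHERE source_table = 'bronze_kafka_delta';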
&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;Because &lt;STRONG&gt;Delta versions are assigned at commit time&lt;/STRONG&gt;, you never advance past data that hasn’t committed yet, regardless of &lt;CODE&gt;loadts&lt;/CODE&gt; or storage &lt;CODE&gt;_file_modified_at&lt;/CODE&gt; gaps.&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;Pros:&lt;/STRONG&gt; Still batch, fully deterministic, no time look-back guessing.&lt;BR /&gt;&lt;STRONG&gt;Cons:&lt;/STRONG&gt; Slightly more plumbing for version tracking and CDF, less “drop-in” than Option 1.&lt;/P&gt;
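&lt;P&gt;The commit-version guarantee holds only if each run is bounded by a version captured before reading. The pure-Python toy model below makes this concrete. It compares recording that captured end version with recording whatever the latest version happens to be when the audit row is updated, while a bronze commit lands mid-run. All names are hypothetical, and &lt;CODE&gt;toy_table_changes&lt;/CODE&gt; merely imitates the inclusive start/end range of &lt;CODE&gt;table_changes&lt;/CODE&gt;.&lt;/P&gt;

```python
SOURCE = "bronze_kafka_delta"


def toy_table_changes(commits, start, end):
    """Rows from versions start..end inclusive (imitates table_changes' range)."""
    return [row for v in range(start, end + 1) for row in commits[v]]


def run_batch(commits, audit, bound_first, during_run=None):
    """One scheduled run of the version-tracking batch job (toy model).

    bound_first=True:  record the end version captured before reading (safe).
    bound_first=False: record the latest version at update time (unsafe).
    """
    start = audit[SOURCE] + 1
    end = len(commits) - 1                 # latest committed version at run start
    rows = toy_table_changes(commits, start, end)
    if during_run is not None:
        during_run()                       # simulate a commit landing mid-run
    audit[SOURCE] = end if bound_first else len(commits) - 1
    return rows


def demo(bound_first):
    commits = [[{"id": 1}]]                # version 0 already committed
    audit = {SOURCE: -1}                   # nothing processed yet

    def late_commit():
        commits.append([{"id": 2}])        # version 1 commits while run 1 is busy

    seen = run_batch(commits, audit, bound_first, during_run=late_commit)
    seen += run_batch(commits, audit, bound_first)
    return [row["id"] for row in seen]


print(demo(bound_first=True))              # [1, 2]
print(demo(bound_first=False))             # [1]: version 1 was never read
```

&lt;P&gt;In the unsafe variant, the audit row jumps to version 1 even though only version 0 was read. That is the same skip the question describes, just keyed on versions instead of &lt;CODE&gt;loadts&lt;/CODE&gt;.&lt;/P&gt;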
&lt;HR /&gt;
&lt;H3&gt;Option 3 – Use a true &lt;STRONG&gt;ingestion/commit-time column&lt;/STRONG&gt; as the watermark (if you can change ingestion)&lt;/H3&gt;
&lt;P&gt;If you control the ingestion job, you can:&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;Stamp rows in your Structured Streaming Kafka → Delta writer as late as possible (for example, inside &lt;CODE&gt;foreachBatch&lt;/CODE&gt; just before the write). This narrows the gap, but any value computed inside the data is still earlier than the commit, so on its own it does not remove the skip risk; or&lt;/LI&gt;
&lt;LI&gt;Populate an &lt;CODE&gt;ingest_commit_ts&lt;/CODE&gt; column from &lt;STRONG&gt;Delta metadata&lt;/STRONG&gt; in a post-step. For example, a small job could stamp new rows with the commit timestamp from the log, such as the &lt;CODE&gt;_commit_timestamp&lt;/CODE&gt; column that Change Data Feed returns.&lt;/LI&gt;
&lt;/UL&gt;
&lt;P&gt;Your downstream incremental then keys off &lt;STRONG&gt;&lt;CODE&gt;ingest_commit_ts&lt;/CODE&gt;&lt;/STRONG&gt;, not &lt;CODE&gt;loadts&lt;/CODE&gt;, e.g.:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE class="language-sql"&gt;WHERE ingest_commit_ts &amp;gt;= last_max_ingest_commit_ts
&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;This moves the logical watermark to when the data actually became visible. Only a timestamp taken at or after commit (the post-step variant) fully closes the gap. A writer-side stamp is still earlier than the commit and only makes the skip less likely.&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;Pros:&lt;/STRONG&gt; Keeps a timestamp-based pattern if some tooling requires one.&lt;BR /&gt;&lt;STRONG&gt;Cons:&lt;/STRONG&gt; More custom work; still subtly more fragile than Options 1–2, and you must ensure &lt;CODE&gt;ingest_commit_ts&lt;/CODE&gt; increases monotonically in commit order.&lt;/P&gt;
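&lt;P&gt;A toy comparison shows the difference between the two watermark columns. This pure-Python sketch uses hypothetical rows based on the question's timeline, plus one assumed batch (id 1) that committed quickly before the 09:15 run. Times are zero-padded &lt;CODE&gt;HH:MM:SS&lt;/CODE&gt; strings, so string comparison orders them correctly within a day. The sketch assumes that commit timestamps increase in commit order and that the sink deduplicates on &lt;CODE&gt;id&lt;/CODE&gt;, since a &lt;CODE&gt;&amp;gt;=&lt;/CODE&gt; filter re-reads boundary rows.&lt;/P&gt;

```python
# Hypothetical rows: loadts is stamped by the writer, ingest_commit_ts is when
# the Delta commit became visible (id 2 mirrors the question's late batch).
ROWS = [
    {"id": 1, "loadts": "09:10:00", "ingest_commit_ts": "09:12:00"},
    {"id": 2, "loadts": "09:05:00", "ingest_commit_ts": "09:27:55"},
]


def visible_at(run_time):
    """Rows a reader starting at run_time can see: committed ones only."""
    return [r for r in ROWS if run_time >= r["ingest_commit_ts"]]


def incremental(run_times, column):
    """Run the job at each time with WHERE column >= last max (toy model)."""
    watermark, seen = "", []
    for run_time in run_times:
        new = [
            r for r in visible_at(run_time)
            if r[column] >= watermark and r["id"] not in seen   # dedupe on id
        ]
        seen += [r["id"] for r in new]
        if new:
            watermark = max(r[column] for r in new)
    return seen


runs = ["09:15:00", "10:00:00"]
print(incremental(runs, "loadts"))            # [1]: row 2 is skipped forever
print(incremental(runs, "ingest_commit_ts"))  # [1, 2]
```

&lt;P&gt;Both variants see exactly the same committed data at each run; only the column used for the watermark differs.&lt;/P&gt;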
&lt;HR /&gt;
&lt;H3&gt;Recommendation&lt;/H3&gt;
&lt;P&gt;Given your requirements (high availability, Serverless, and desire for a &lt;EM&gt;bulletproof&lt;/EM&gt; pattern), I strongly recommend:&lt;/P&gt;
&lt;BLOCKQUOTE&gt;
&lt;P&gt;&lt;STRONG&gt;Adopt Option 1: downstream as a Structured Streaming read from the bronze Delta table, using &lt;CODE&gt;Trigger.Once&lt;/CODE&gt; or &lt;CODE&gt;availableNow&lt;/CODE&gt; and relying on Delta log offsets instead of &lt;CODE&gt;loadts&lt;/CODE&gt;.&lt;/STRONG&gt;&lt;/P&gt;
&lt;/BLOCKQUOTE&gt;
&lt;P&gt;If organizationally you must stay “batch-only,” Option 2 (Delta version / CDF-based incrementality) is the next-best alternative.&lt;/P&gt;</description>
      <pubDate>Mon, 27 Apr 2026 15:36:42 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/data-loss-in-incremental-batch-jobs-due-to-latency-in-delta-file/m-p/155579#M54278</guid>
      <dc:creator>Lu_Wang_ENB_DBX</dc:creator>
      <dc:date>2026-04-27T15:36:42Z</dc:date>
    </item>
    <item>
      <title>Re: Data Loss in Incremental Batch Jobs Due to Latency in delta file write to blob</title>
      <link>https://community.databricks.com/t5/data-engineering/data-loss-in-incremental-batch-jobs-due-to-latency-in-delta-file/m-p/155617#M54282</link>
      <description>&lt;P&gt;You can handle it as follows:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;STRONG&gt;Fix the bronze write&lt;/STRONG&gt;: A 20+ minute commit gap suggests metadata contention or small-file problems in the bronze Delta tables. You can optimize the tables manually, or enable optimized writes and auto compaction (Auto Optimize) if feasible. Optimized writes shuffle data before writing so that each partition produces fewer, larger Parquet files. This significantly reduces the number of file entries each Delta commit has to record. It shortens the gap, but a &lt;CODE&gt;loadts&lt;/CODE&gt;-based watermark can still skip late commits.&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Use streaming jobs to avoid the look-back&lt;/STRONG&gt;: A streaming read from the bronze table records processed commits in its checkpoint and uses that checkpoint to find unprocessed records. You then no longer need the look-back window or the audit table.&lt;/LI&gt;&lt;/UL&gt;</description>
      <pubDate>Mon, 27 Apr 2026 18:26:59 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/data-loss-in-incremental-batch-jobs-due-to-latency-in-delta-file/m-p/155617#M54282</guid>
      <dc:creator>balajij8</dc:creator>
      <dc:date>2026-04-27T18:26:59Z</dc:date>
    </item>
  </channel>
</rss>

