Data Engineering

Data Loss in Incremental Batch Jobs Due to Latency in delta file write to blob

Avinash_Narala
Databricks Partner

Hi everyone,

I am facing a data consistency issue in my Databricks incremental pipeline where records are being skipped because of a time gap between when a record is processed and when the physical file is finalized in Azure Blob Storage (ABFS).

Our Architecture:
1. Ingestion: Data is pulled from Kafka and written as Delta files to a blob path (sketched just after this list). During this write, we create a column called `loadts` using `current_timestamp()`.
2. Downstream ETL: A scheduled batch job reads from these Delta files incrementally. It fetches the `max(loadts)` from an audit table and filters the source: `WHERE loadts >= last_max_date`.
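
A simplified sketch of the ingestion step (assuming a streaming Kafka reader; broker, topic, and path names are placeholders, not our actual values):

from pyspark.sql import functions as F

# Kafka -> bronze Delta; loadts is stamped when the executor processes the record,
# which can be well before the Delta commit becomes visible in ABFS.
bronze = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "<brokers>")
          .option("subscribe", "<topic>")
          .load()
          .withColumn("loadts", F.current_timestamp()))

(bronze.writeStream
       .format("delta")
       .option("checkpointLocation", "<bronze_checkpoint>")
       .start("<bronze_blob_path>"))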

The Issue:
We are seeing a significant gap (often 20+ minutes) between the `loadts` inside the data and the system-level `_file_modified_at` timestamp for some of the delta files.

Example: A record is assigned a `loadts` of 09:05:00 (when the Spark executor processes it).
Latency: Due to write volume and Delta transaction overhead, the file is not committed and visible in ABFS until 09:27:55.
The Conflict: If an incremental job runs at 09:15:00, it doesn't "see" that file yet. The job finishes, and the checkpoint moves forward. When the next job runs at 10:00:00, it searches for data where `loadts >= 09:15:00`. Consequently, the record born at 09:05:00 (but delivered at 09:27:55) is skipped forever.

Current Workaround:
We currently use a hardcoded 15-minute look-back window: `timestampadd(MINUTE, -15, last_max_date)` (sketched below). However, this is brittle because processing times vary, and we cannot guarantee that a 15- or even 30-minute window will always cover the commit latency.
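
In code, the downstream filter with the look-back is roughly (table and path names are placeholders):

from datetime import timedelta
from pyspark.sql import functions as F

# High-water mark recorded by the previous run
last_max_date = spark.table("audit_table").agg(F.max("loadts")).first()[0]

# Current workaround: subtract a fixed 15-minute buffer and hope it covers the commit latency
cutoff = last_max_date - timedelta(minutes=15)
incremental = (spark.read.format("delta")
               .load("<bronze_blob_path>")
               .where(F.col("loadts") >= F.lit(cutoff)))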

Question:
What is the industry-standard way to handle this in Databricks without "guessing" a look-back window?

Any advice on how to make this pipeline bulletproof would be greatly appreciated.

Note: We are using a Serverless performance-optimized cluster, as the data needs to be highly available.

2 REPLIES

Lu_Wang_ENB_DBX
Databricks Employee

You’re running into the classic “event-time watermark based on a value that’s known before the data is actually committed” problem. The fix is to anchor incrementality on commit/offset semantics, not on a `loadts` computed inside the write tasks.

Below are three options, none of which require guessing a look-back window.


Option 1 – Make the downstream ETL a Structured Streaming read from Delta (Trigger.Once / AvailableNow) [Recommended]

Idea:
Treat your bronze Delta table as a streaming source and let Structured Streaming track offsets in the Delta log, not your loadts. The engine only advances its checkpoint after a commit is visible, so you never skip files regardless of how long they took to finalize.

Pattern (Python-ish):

df = (spark.readStream
        .format("delta")
        .table("bronze_kafka_delta"))

query = (df
         .transform(your_silver_logic)         # joins, filters, etc.
         .writeStream
         .option("checkpointLocation", "...")  # on UC, put the checkpoint in a Volume
         .trigger(availableNow=True)           # or Trigger.Once for batch-like behavior
         .toTable("silver_table"))
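
(Here your_silver_logic is just an ordinary DataFrame-in, DataFrame-out function; a hypothetical example with made-up column names:)

from pyspark.sql import functions as F

def your_silver_logic(df):
    # hypothetical cleanup: dedupe and drop malformed events
    return (df.dropDuplicates(["event_id"])
              .where(F.col("event_type").isNotNull()))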

Key points:

  • No WHERE loadts >= last_max_date anymore; the checkpoint tracks which Delta versions/files have been processed.
  • You can still keep loadts for lineage/audit, but it’s not the incremental anchor.
  • Works well on Serverless performance-optimized compute with low-ops semantics and batch-like scheduling (run-once jobs that use availableNow / Trigger.Once).

Pros: Exactly-once, no look-back guessing, fully “industry standard” Delta Architecture pattern.
Cons: Requires refactoring your downstream job to Structured Streaming semantics.


Option 2 – Drive incrementality by Delta table version / commit timestamp (batch but commit-aware)

If you must stay with pure batch SQL:

  1. Maintain an audit record of the last processed Delta version or commit timestamp for the source table.
  2. On each run:
    • Read changes since that version using Change Data Feed (table_changes) or VERSION AS OF ranges.
    • After successful processing, update the stored “last_processed_version”.

Example pattern with CDF:

-- Audit table stores last_processed_version for 'bronze_kafka_delta'
SELECT last_processed_version FROM audit_table WHERE source_table = 'bronze_kafka_delta';

-- In job (substitute the version fetched above):
SELECT *
FROM table_changes('bronze_kafka_delta', last_processed_version + 1);

-- After job succeeds:
UPDATE audit_table
SET last_processed_version = <current_max_version>
WHERE source_table = 'bronze_kafka_delta';

Because Delta versions are assigned at commit time, you never advance past data that hasn’t committed yet, regardless of loadts or storage _file_modified_at gaps.
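
To fill in <current_max_version>, one option (assuming the delta Python API is available) is to read the latest version from the table history:

from delta.tables import DeltaTable

# Most recent commit version of the source table
current_max_version = (DeltaTable.forName(spark, "bronze_kafka_delta")
                       .history(1)
                       .select("version")
                       .first()[0])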

Pros: Still batch, fully deterministic, no time look-back guessing.
Cons: Slightly more plumbing for version tracking and CDF, less “drop-in” than Option 1.


Option 3 – Use a true ingestion/commit-time column as the watermark (if you can change ingestion)

If you control the ingestion job, you can:

  • Use Auto Loader or a Structured Streaming Kafka → Delta writer that adds an ingestion timestamp reflecting “commit-ish” time (or at least ingestion time on the writer, not record-creation time); or
  • Populate an ingest_commit_ts based on Delta metadata in a post-step (e.g., a small job that stamps new rows using commit info from the log).

Your downstream incremental then keys off ingest_commit_ts, not loadts, e.g.:

WHERE ingest_commit_ts >= last_max_ingest_commit_ts

This makes the logical watermark much closer to when the data actually became visible, so even if loadts is early, you still don’t skip data.
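
One way to derive ingest_commit_ts from commit info in the log is to read the bronze table via Change Data Feed, which exposes _commit_timestamp per row (a rough sketch, assuming CDF is enabled on the bronze table; everything else is illustrative):

last_processed_version = 42   # example value; in practice, read this from your audit table

ingest_stamped = (spark.read.format("delta")
                  .option("readChangeFeed", "true")
                  .option("startingVersion", last_processed_version + 1)
                  .table("bronze_kafka_delta")
                  .where("_change_type = 'insert'")
                  .withColumnRenamed("_commit_timestamp", "ingest_commit_ts"))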

Pros: Keeps a timestamp-based pattern if that’s required by some tooling.
Cons: More custom work; still subtly more fragile than Options 1–2, and you must ensure ingest_commit_ts is monotonic with commit order.


Recommendation

Given your requirements (high availability, Serverless, and desire for a bulletproof pattern), I strongly recommend:

Adopt Option 1: downstream as a Structured Streaming read from the bronze Delta table, using Trigger.Once or availableNow and relying on Delta log offsets instead of loadts.

If organizationally you must stay “batch-only,” Option 2 (Delta version / CDF–based incrementality) is the next-best alternative.

balajij8
Contributor III

You can handle it as below:

  • Fix the Bronze Write - The 20+ minute commit gap suggests metadata contention or small-file issues in the bronze Delta table. You can optimize the table manually or enable Optimized Write and Auto Optimize if feasible (see the sketch below). This ensures the Spark driver balances data across executors to write larger, more efficient Parquet files, significantly reducing the number of entries the Delta log needs to commit.
  • Streaming jobs to avoid the look-back: You can use streaming jobs to avoid maintaining a look-back window and audit table, as the checkpoint automatically tracks which records from bronze have not yet been processed.
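
For the first point, Optimized Write and Auto Compaction can be enabled as standard Delta table properties (assuming the bronze table is registered as bronze_kafka_delta, as in the earlier reply):

# Enable optimized writes and auto compaction on the bronze table
spark.sql("""
    ALTER TABLE bronze_kafka_delta SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact'   = 'true'
    )
""")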