04-22-2026 02:01 PM
Hi everyone,
I am facing a data consistency issue in my Databricks incremental pipeline where records are being skipped because of a time gap between when a record is processed and when the physical file is finalized in Azure Blob Storage (ABFS).
Our Architecture:
1. Ingestion: Data is pulled from Kafka and written as Delta Files at a blob path. During this write, we create a column called `loadts` using `current_timestamp()`.
2. Downstream ETL: A scheduled batch job reads from these Delta files incrementally. It fetches the `max(loadts)` from an audit table and filters the source: `WHERE loadts >= last_max_date`.
The Issue:
We are seeing a significant gap (often 20+ minutes) between the `loadts` inside the data and the system-level `_file_modified_at` timestamp for some of the delta files.
Example: A record is assigned a `loadts` of 09:05:00 (when the Spark executor processes it).
Latency: Due to write volume and Delta transaction overhead, the file is not committed and visible in ABFS until 09:27:55.
The Conflict: If an incremental job runs at 09:15:00, it doesn't "see" that file yet. The job finishes, and the checkpoint moves forward. When the next job runs at 10:00:00, it searches for data where `loadts >= 09:15:00`. Consequently, the record born at 09:05:00 (but delivered at 09:27:55) is skipped forever.
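To make the failure concrete, here is a minimal simulation of that timeline in plain Python (no Spark involved). The record layout, the `run_incremental` helper, the starting watermark of 09:00:00, and the second record stamped at 09:14:00 are assumptions made up for illustration; only the 09:05:00, 09:27:55, 09:15:00, and 10:00:00 times come from the example above.

```python
from datetime import datetime

def t(hhmmss):
    """Build a datetime on a fixed day from an 'HH:MM:SS' string."""
    return datetime.strptime("2026-04-22 " + hhmmss, "%Y-%m-%d %H:%M:%S")

# loadts is stamped by the executor; visible_at is when the file is committed.
records = [
    {"id": "early", "loadts": t("09:14:00"), "visible_at": t("09:14:30")},  # hypothetical fast commit
    {"id": "late", "loadts": t("09:05:00"), "visible_at": t("09:27:55")},   # slow commit from the example
]

def run_incremental(run_time, last_max_loadts):
    """Mimic `WHERE loadts >= last_max_date` over the files visible at run_time."""
    visible = [r for r in records if r["visible_at"] <= run_time]
    picked = [r for r in visible if r["loadts"] >= last_max_loadts]
    # The audit table stores max(loadts) of whatever was processed.
    new_max = max([r["loadts"] for r in picked], default=last_max_loadts)
    return [r["id"] for r in picked], new_max

first_ids, watermark = run_incremental(t("09:15:00"), t("09:00:00"))
second_ids, watermark = run_incremental(t("10:00:00"), watermark)

print(first_ids)   # ['early']
print(second_ids)  # ['early']; 'late' is never picked up
```

The `>=` filter also re-reads the `early` record on the second run, so the same comparison that drops late data can duplicate on-time data unless the target is deduplicated.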
Current Workaround:
We currently use a hardcoded 15-minute look-back window: `timestampadd(MINUTE, -15, last_max_date)`. However, this is brittle because processing times vary, and we cannot guarantee that a 15 or even 30-minute window will always cover the commit latency.
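A rough way to see why a fixed window is fragile: a late file is only recovered if its `loadts` still falls inside the window measured back from the stored watermark. The sketch below is plain Python; the helper name `is_recovered` and the two watermark values are hypothetical.

```python
from datetime import datetime, timedelta

def is_recovered(record_loadts, last_max_date, lookback_minutes):
    """True if `loadts >= last_max_date - lookback` would still select the record."""
    return record_loadts >= last_max_date - timedelta(minutes=lookback_minutes)

late_loadts = datetime(2026, 4, 22, 9, 5, 0)  # stamped by the executor

# Hypothetical watermarks stored by runs that finished before the late file was visible.
watermark_a = datetime(2026, 4, 22, 9, 15, 0)
watermark_b = datetime(2026, 4, 22, 9, 25, 0)

print(is_recovered(late_loadts, watermark_a, 15))  # True: 09:05 >= 09:00
print(is_recovered(late_loadts, watermark_b, 15))  # False: 09:05 < 09:10
```

Whether the record survives depends on how far the watermark advanced before the file was committed, which is exactly the quantity we cannot bound.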
Question:
What is the industry-standard way to handle this in Databricks without "guessing" a look-back window?
Any advice on how to make this pipeline bulletproof would be greatly appreciated.
Note: We are using a Serverless performance-optimized cluster, as the data needs to be highly available.
a month ago
You’re running into the classic “event-time watermark based on a value that’s known before the data is actually committed” problem. The fix is to anchor incrementality on commit/offset semantics, not on loadts that’s computed inside the write tasks.
Below are 3 options (no guessing look-back).
Option 1: Structured Streaming read from the bronze Delta table
Idea:
Treat your bronze Delta table as a streaming source and let Structured Streaming track offsets in the Delta log, not your loadts. The engine only advances its checkpoint after a commit is visible, so you never skip files regardless of how long they took to finalize.
Pattern (Python-ish):
    # Read the bronze Delta table as a stream; progress is tracked in the checkpoint
    df = (spark.readStream
          .format("delta")
          .table("bronze_kafka_delta"))

    query = (df
             .transform(your_silver_logic)          # joins, filters, etc.
             .writeStream
             .option("checkpointLocation", "...")   # on UC, use a checkpoint on a volume
             .trigger(availableNow=True)            # or .trigger(once=True) for batchy behavior
             .toTable("silver_table"))
Key points:
- You don't need `WHERE loadts >= last_max_date` anymore; the checkpoint tracks which Delta versions/files have been processed.
- You can keep `loadts` for lineage/audit, but it's not the incremental anchor.
- You can still run it as a scheduled, batch-style job (`availableNow` / `Trigger.Once`).
Pros: Exactly-once, no look-back guessing, fully "industry standard" Delta architecture pattern.
Cons: Requires refactoring your downstream job to Structured Streaming semantics.
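To see why tracking committed versions avoids the skip, here is a toy model in plain Python. It is not how the Delta streaming source is implemented; the `commits` list, the version numbers, and the `run_by_version` helper are invented for illustration and only mimic the idea that the checkpoint records the last committed version that was processed.

```python
from datetime import datetime

def t(hhmmss):
    """Build a datetime on a fixed day from an 'HH:MM:SS' string."""
    return datetime.strptime("2026-04-22 " + hhmmss, "%Y-%m-%d %H:%M:%S")

# Versions are assigned in commit order, independent of the loadts inside the rows.
commits = [
    {"version": 1, "committed_at": t("09:14:30"), "row_loadts": t("09:14:00")},
    {"version": 2, "committed_at": t("09:27:55"), "row_loadts": t("09:05:00")},  # slow commit
]

def run_by_version(run_time, last_version):
    """Process every commit visible at run_time whose version is > last_version."""
    new = [c for c in commits
           if c["committed_at"] <= run_time and c["version"] > last_version]
    new_last = max([c["version"] for c in new], default=last_version)
    return [c["version"] for c in new], new_last

first, checkpoint = run_by_version(t("09:15:00"), 0)
second, checkpoint = run_by_version(t("10:00:00"), checkpoint)

print(first)   # [1]
print(second)  # [2]; the slow commit is picked up on the next run
```

The row with `loadts` 09:05:00 arrives in version 2, so the 10:00:00 run reads it even though its `loadts` is older than everything processed earlier.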
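Option 2: Delta version / CDF-based incrementality
If you must stay with pure batch SQL:
- Track the last processed Delta version instead of a timestamp, and read new data with Change Data Feed (`table_changes`) or `VERSION AS OF` ranges.
Example pattern with CDF: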
    -- Audit table stores last_processed_version for 'bronze_kafka_delta'
    SELECT last_processed_version FROM audit_table WHERE source_table = 'bronze_kafka_delta';

    -- In job:
    SELECT *
    FROM table_changes('bronze_kafka_delta', last_processed_version + 1);

    -- After job succeeds (<current_max_version> = highest _commit_version the job actually read):
    UPDATE audit_table
    SET last_processed_version = <current_max_version>
    WHERE source_table = 'bronze_kafka_delta';
Because Delta versions are assigned at commit time, you never advance past data that hasn’t committed yet, regardless of loadts or storage _file_modified_at gaps.
Pros: Still batch, fully deterministic, no time look-back guessing.
Cons: Slightly more plumbing for version tracking and CDF, less “drop-in” than Option 1.
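A PySpark version of the same pattern, as a sketch only. It assumes Change Data Feed is enabled on `bronze_kafka_delta`, that a `spark` session is passed in, and that the caller looks up and updates `last_processed_version` in the audit table. The helper names `next_version_range` and `read_new_changes` are hypothetical. The end version is captured before reading, so the value written back to the audit table matches what was actually read.

```python
def next_version_range(last_processed_version, latest_version):
    """Return (start, end) versions to read, or None if nothing new was committed."""
    start = last_processed_version + 1
    if start > latest_version:
        return None
    return (start, latest_version)

def read_new_changes(spark, table_name, last_processed_version):
    """Read changes committed after last_processed_version.

    Returns (DataFrame or None, end_version to store in the audit table).
    """
    # DESCRIBE HISTORY lists the most recent commit first.
    latest = spark.sql(f"DESCRIBE HISTORY {table_name} LIMIT 1").collect()[0]["version"]
    version_range = next_version_range(last_processed_version, latest)
    if version_range is None:
        return None, last_processed_version
    start, end = version_range
    df = (spark.read
          .option("readChangeFeed", "true")
          .option("startingVersion", start)
          .option("endingVersion", end)
          .table(table_name))
    return df, end

print(next_version_range(41, 45))  # (42, 45)
print(next_version_range(45, 45))  # None
```

The `None` guard is there because asking the change feed for a start version beyond the latest commit is, by default, expected to fail rather than return an empty result. Store `end` in the audit table only after the downstream write succeeds.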
Option 3: Commit-time timestamp column
If you control the ingestion job, you can:
- Populate an `ingest_commit_ts` column based on Delta metadata in a post-step (e.g., a small job that stamps new rows using commit info from the log).
Your downstream incremental then keys off `ingest_commit_ts`, not `loadts`, e.g.:
    WHERE ingest_commit_ts >= last_max_ingest_commit_ts
This makes the logical watermark much closer to when the data actually became visible, so even if loadts is early, you still don’t skip data.
Pros: Keeps a timestamp-based pattern if that’s required by some tooling.
Cons: More custom work; still subtly more fragile than Options 1–2, and you must ensure ingest_commit_ts is monotonic with commit.
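One cheap guard for the monotonicity requirement is to check that `ingest_commit_ts` never decreases as the Delta version increases. The sketch below is plain Python; it assumes the `(version, ingest_commit_ts)` pairs have already been collected from wherever the post-step records them, and `find_non_monotonic` is a hypothetical helper.

```python
from datetime import datetime

def find_non_monotonic(stamps):
    """Return versions whose ingest_commit_ts is earlier than that of a lower version."""
    bad = []
    highest_seen = None
    for version, ts in sorted(stamps):  # tuples sort by version first
        if highest_seen is not None and ts < highest_seen:
            bad.append(version)
        else:
            highest_seen = ts
    return bad

stamps = [
    (10, datetime(2026, 4, 22, 9, 14, 30)),
    (11, datetime(2026, 4, 22, 9, 27, 55)),
    (12, datetime(2026, 4, 22, 9, 20, 0)),  # stamped earlier than version 11
]

print(find_non_monotonic(stamps))  # [12]
```

Any version this returns is a commit that a `>=` filter on `ingest_commit_ts` could skip, which is the same failure mode as with `loadts`.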
Given your requirements (high availability, Serverless, and desire for a bulletproof pattern), I strongly recommend:
Adopt Option 1: run the downstream job as a Structured Streaming read from the bronze Delta table, using `Trigger.Once` or `availableNow` and relying on Delta log offsets instead of `loadts`.
If organizationally you must stay “batch-only,” Option 2 (Delta version / CDF–based incrementality) is the next-best alternative.
a month ago
You can handle it as below