You’re running into the classic “event-time watermark based on a value that’s known before the data is actually committed” problem. The fix is to anchor incrementality on commit/offset semantics, not on loadts that’s computed inside the write tasks.
Below are three options, none of which relies on a guessed look-back window.
Option 1 – Make the downstream ETL a Structured Streaming read from Delta (Trigger.Once / AvailableNow) [Recommended]
Idea:
Treat your bronze Delta table as a streaming source and let Structured Streaming track offsets in the Delta log, not your loadts. The engine only advances its checkpoint after a commit is visible, so you never skip files regardless of how long they took to finalize.
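To make the failure mode concrete, here is a minimal, stdlib-only simulation (no Spark; file names, timestamps, and commit sequence numbers are all made up for illustration). A file whose `loadts` predates the previous run's watermark but whose commit lands afterwards is skipped by a timestamp watermark, while a commit-offset watermark picks it up:

```python
# Minimal simulation: two incremental strategies over a commit log.
# Each commit makes a file visible; loadts was assigned earlier, inside the
# write task, so it can be older than the watermark of a prior run.

commits = [
    # (commit_seq, file_name, loadts)  -- hypothetical values
    (1, "f1", 100),
    (2, "f2", 105),
    # f3 was written slowly: loadts=103, but it only committed as seq 3,
    # after a downstream run had already advanced its watermark to 105.
    (3, "f3", 103),
]

def run_with_loadts_watermark(visible, watermark):
    """Timestamp watermark: process files with loadts > watermark."""
    picked = [f for _, f, ts in visible if ts > watermark]
    new_watermark = max(ts for _, _, ts in visible)
    return picked, new_watermark

def run_with_offset_watermark(visible, last_seq):
    """Commit-offset watermark: process commits with seq > last_seq."""
    picked = [f for seq, f, _ in visible if seq > last_seq]
    new_last_seq = max(seq for seq, _, _ in visible)
    return picked, new_last_seq

# Run 1 sees only f1 and f2 (f3 has not committed yet).
_, ts_wm = run_with_loadts_watermark(commits[:2], watermark=0)   # ts_wm = 105
_, seq_wm = run_with_offset_watermark(commits[:2], last_seq=0)   # seq_wm = 2

# Run 2 sees all three commits.
late_by_ts, _ = run_with_loadts_watermark(commits, ts_wm)
late_by_seq, _ = run_with_offset_watermark(commits, seq_wm)

print(late_by_ts)   # [] -- f3 skipped: loadts 103 <= watermark 105
print(late_by_seq)  # ['f3'] -- commit seq 3 > last processed seq 2
```

This is exactly the guarantee the Delta-log-based options below give you for free.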
Pattern (Python-ish):
```python
df = (spark.readStream
      .format("delta")
      .table("bronze_kafka_delta"))

query = (df
         .transform(your_silver_logic)           # joins, filters, etc.
         .writeStream
         .option("checkpointLocation", "...")    # on UC, put the checkpoint in a Volume
         .trigger(availableNow=True)             # or Trigger.Once for batch-like behavior
         .toTable("silver_table"))
```
Key points:
- No more `WHERE loadts >= last_max_date`; the checkpoint tracks which Delta versions/files have been processed.
- You can still keep `loadts` for lineage/audit, but it is no longer the incremental anchor.
- Works well on performance-optimized Serverless with low-ops, batch-like scheduling (run-once jobs that use `availableNow` / `Trigger.Once`).
Pros: Exactly-once, no look-back guessing, fully “industry standard” Delta Architecture pattern.
Cons: Requires refactoring your downstream job to Structured Streaming semantics.
Option 2 – Drive incrementality by Delta table version / commit timestamp (batch but commit-aware)
If you must stay with pure batch SQL:
- Maintain an audit table that stores the last processed Delta version (or commit timestamp) for the source table.
- On each run:
  - Read changes since that version using Change Data Feed (`table_changes`) or `VERSION AS OF` ranges.
  - After successful processing, update the stored `last_processed_version`.
Example pattern with CDF:
```sql
-- Audit table stores last_processed_version for 'bronze_kafka_delta'
SELECT last_processed_version
FROM audit_table
WHERE source_table = 'bronze_kafka_delta';

-- In the job (CDF must be enabled on the source table):
SELECT *
FROM table_changes('bronze_kafka_delta', last_processed_version + 1);

-- After the job succeeds:
UPDATE audit_table
SET last_processed_version = <current_max_version>
WHERE source_table = 'bronze_kafka_delta';
```
Because Delta versions are assigned at commit time, you never advance past data that hasn’t committed yet, regardless of loadts or storage _file_modified_at gaps.
Pros: Still batch, fully deterministic, no time look-back guessing.
Cons: Slightly more plumbing for version tracking and CDF, less “drop-in” than Option 1.
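The safety of Option 2 comes from its control flow: the audit row only advances after processing succeeds, so a failed run simply retries the same version range. A stdlib-only sketch of that flow (the Spark/CDF read is stubbed out as `read_changes`, and the table name, versions, and in-memory `audit` dict are illustrative assumptions, not real APIs):

```python
# Commit-aware batch incrementality: advance the stored version only after
# the processing step succeeds, so a failed run retries the same range.

audit = {"bronze_kafka_delta": 0}   # source_table -> last_processed_version

def read_changes(table, starting_version, current_version):
    # Stand-in for: table_changes('bronze_kafka_delta', starting_version).
    # Returns the (version, payload) rows committed in that range.
    log = {1: "rows@v1", 2: "rows@v2", 3: "rows@v3"}
    return [(v, log[v]) for v in range(starting_version, current_version + 1)]

def run_incremental(table, current_version, process):
    last = audit[table]
    if current_version <= last:
        return []                       # nothing new committed
    changes = read_changes(table, last + 1, current_version)
    process(changes)                    # may raise -> audit NOT updated
    audit[table] = current_version      # advance only after success
    return [v for v, _ in changes]

def failing(_):
    raise RuntimeError("simulated job failure")

processed = []
run_incremental("bronze_kafka_delta", 2, processed.extend)

# A failing run must not advance the watermark:
try:
    run_incremental("bronze_kafka_delta", 3, failing)
except RuntimeError:
    pass

# The retry picks up exactly the missed range:
run_incremental("bronze_kafka_delta", 3, processed.extend)
print(audit["bronze_kafka_delta"])  # 3 -- no version skipped, none double-counted
```

In the real job, `read_changes` becomes the `table_changes(...)` query from the SQL above, and `audit` becomes the audit table updated in the same post-success step.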
Option 3 – Use a true ingestion/commit-time column as the watermark (if you can change ingestion)
If you control the ingestion job, you can:
- Use Auto Loader or a Structured Streaming Kafka → Delta writer that adds an ingestion timestamp reflecting commit time (or at least write time on the writer, not record-creation time); or
- Populate an `ingest_commit_ts` column in a post-step, based on Delta metadata (e.g., a small job that stamps new rows using commit info from the log).
Your downstream incremental logic then keys off `ingest_commit_ts`, not `loadts`, e.g.:

```sql
WHERE ingest_commit_ts >= last_max_ingest_commit_ts
```
This makes the logical watermark much closer to when the data actually became visible, so even if loadts is early, you still don’t skip data.
Pros: Keeps a timestamp-based pattern if that’s required by some tooling.
Cons: More custom work; still subtly more fragile than Options 1–2, and you must ensure ingest_commit_ts is monotonic with commit.
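That monotonicity requirement is checkable. A small stdlib-only sanity test you could run over (commit version, `ingest_commit_ts`) pairs — the function name and sample values are hypothetical, not part of any library:

```python
# Sanity check: ingest_commit_ts must be non-decreasing in commit-version
# order, otherwise a timestamp watermark on it can skip late rows again.

def is_monotonic_with_commit(rows):
    """rows: iterable of (commit_version, ingest_commit_ts), any order."""
    ordered = sorted(rows)                       # sort by commit_version
    stamps = [ts for _, ts in ordered]
    return all(a <= b for a, b in zip(stamps, stamps[1:]))

ok  = [(1, 100), (2, 101), (3, 101)]
bad = [(1, 100), (2, 105), (3, 103)]   # v3 stamped earlier than v2

print(is_monotonic_with_commit(ok))    # True
print(is_monotonic_with_commit(bad))   # False -- this table would skip rows
```

If this check ever fails on real data, fall back to Option 1 or 2, which do not depend on the stamp at all.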
Recommendation
Given your requirements (high availability, Serverless, and desire for a bulletproof pattern), I strongly recommend:
Adopt Option 1: downstream as a Structured Streaming read from the bronze Delta table, using Trigger.Once or availableNow and relying on Delta log offsets instead of loadts.
If organizationally you must stay “batch-only,” Option 2 (Delta version / CDF–based incrementality) is the next-best alternative.