Lu_Wang_ENB_DBX
Databricks Employee
Databricks Employee

You’re running into the classic “event-time watermark based on a value that’s known before the data is actually committed” problem. The fix is to anchor incrementality on commit/offset semantics, not on loadts that’s computed inside the write tasks.

Below are 3 options (no guessing look-back).


Option 1 – Make the downstream ETL a Structured Streaming read from Delta (Trigger.Once / AvailableNow) [Recommended]

Idea:
Treat your bronze Delta table as a streaming source and let Structured Streaming track offsets in the Delta log, not your loadts. The engine only advances its checkpoint after a commit is visible, so you never skip files regardless of how long they took to finalize.

Pattern (Python-ish):

df = (spark.readStream
          .format("delta")
          .table("bronze_kafka_delta"))

query = (df
   .transform(your_silver_logic)          # joins, filters, etc
   .writeStream
   .option("checkpointLocation", "...")   # on UC, use checkpoint on volume
   .trigger(availableNow=True)            # or Trigger.Once for batchy behavior
   .toTable("silver_table"))

Key points:

  • No WHERE loadts >= last_max_date anymore; the checkpoint tracks which Delta versions/files have been processed.
  • You can still keep loadts for lineage/audit, but it’s not the incremental anchor.
  • Works well on Serverless performance-optimized with low-ops semantics and batch-like scheduling (run-once jobs that use availableNow / Trigger.Once).

Pros: Exactly-once, no look-back guessing, fully “industry standard” Delta Architecture pattern.
Cons: Requires refactoring your downstream job to Structured Streaming semantics.


Option 2 – Drive incrementality by Delta table version / commit timestamp (batch but commit-aware)

If you must stay with pure batch SQL:

  1. Maintain an audit of last processed Delta version or commit timestamp for the source table.
  2. On each run:
    • Read changes since that version using Change Data Feed (table_changes) or VERSION AS OF ranges.
    • After successful processing, update the stored “last_processed_version”.

Example pattern with CDF:

-- Audit table stores last_processed_version for 'bronze_kafka_delta'
SELECT last_processed_version FROM audit_table WHERE source_table = 'bronze_kafka_delta';

-- In job:
SELECT * 
FROM table_changes('bronze_kafka_delta', last_processed_version + 1);

-- After job succeeds:
UPDATE audit_table
SET last_processed_version = <current_max_version>
WHERE source_table = 'bronze_kafka_delta';

Because Delta versions are assigned at commit time, you never advance past data that hasn’t committed yet, regardless of loadts or storage _file_modified_at gaps.

Pros: Still batch, fully deterministic, no time look-back guessing.
Cons: Slightly more plumbing for version tracking and CDF, less “drop-in” than Option 1.


Option 3 – Use a true ingestion/commit-time column as the watermark (if you can change ingestion)

If you control the ingestion job, you can:

  • Use Auto Loader or a structured streaming Kafka → Delta writer that adds an ingestion timestamp that reflects “commit-ish” time (or at least ingestion-time on the writer, not at record creation); or
  • Populate a ingest_commit_ts based on Delta metadata in a post-step (e.g., a small job that stamps new rows using commit info from the log).

Your downstream incremental then keys off ingest_commit_ts, not loadts, e.g.:

WHERE ingest_commit_ts >= last_max_ingest_commit_ts

This makes the logical watermark much closer to when the data actually became visible, so even if loadts is early, you still don’t skip data.

Pros: Keeps a timestamp-based pattern if that’s required by some tooling.
Cons: More custom work; still subtly more fragile than Options 1–2, and you must ensure ingest_commit_ts is monotonic with commit.


Recommendation

Given your requirements (high availability, Serverless, and desire for a bulletproof pattern), I strongly recommend:

Adopt Option 1: downstream as a Structured Streaming read from the bronze Delta table, using Trigger.Once or availableNow and relying on Delta log offsets instead of loadts.

If organizationally you must stay “batch-only,” Option 2 (Delta version / CDF–based incrementality) is the next-best alternative.

View solution in original post