Architecture Advice: DLT Strategy for Daily Snapshots to SCD2 with "Grace Period" Deletes

samuelperezh
New Contributor

Hi everyone,

I’m looking for architectural advice on building a Silver layer in DLT. I am dealing with inventory data and need to handle historical tracking, "sold" logic based on disappearance, and storage cost optimization.

Here's what the situation looks like:

  • Source (Bronze): I receive a full snapshot of the inventory every day (~60GB/day).

  • History: I have raw snapshots sitting in object storage starting from September 2025 (it's already at around 7TB).

  • Goal (Silver): I need an SCD Type 2 table to track changes in attributes (price, status) and determine when a product is sold.

These are the challenges I'm having:

  1. "Sold" Logic with Grace Period:
    Since the source is a full snapshot, a missing product id implies it was sold. However, data quality issues mean products sometimes disappear for 1-2 days and then reappear.

    • I need to implement a 3-day grace period.

    • If a product id is missing for >3 consecutive days, mark it as is_sold = True and set sold_date to the first day it went missing.

    • If it reappears within 3 days, it should remain active (ignore the gap).

  2. Backfill + Incremental:
    I need to process the history (Sept 2025 to Today) sequentially to build the SCD2 history correctly, and then switch to daily incremental processing for new snapshots.

  3. Storage Costs:
    My cloud storage costs are high ($300+/mo) because I am retaining daily 60GB full snapshots in Bronze indefinitely. Once the data is processed into the SCD2 Silver layer, I want to aggressively clean up the Bronze table/files and keep only 7 days.

I am looking at using dlt.create_auto_cdc_from_snapshot_flow. However, I'm unsure how to inject the Grace Period logic into the native CDC flow, as the native function tends to mark deletes immediately upon disappearance.

My Questions:

  1. Pattern for Grace Period: Is it better to stick with create_auto_cdc_from_snapshot_flow and manipulate the snapshot frame before passing it to the CDC engine (e.g., carrying forward missing rows for up to 3 days)? Or should I write custom apply_changes logic that manages a "consecutive_missing_days" counter manually?

  2. Backfill Strategy: For the initial run, should I create logic that loops through all historical dates in order within the DLT definition? Or is there a better way to "replay" history into an SCD2 table?

  3. Bronze Cleanup: Since Silver will contain the full history (SCD2), is it safe/recommended to run a daily VACUUM and DELETE on the Bronze table to keep only the last 7 days of snapshots? I want to ensure this doesn't break the DLT lineage or backfill capabilities if I ever need to recompute from recent history.

Any snippets or patterns for handling this problem on full snapshots would be greatly appreciated!

aleksandra_ch
Databricks Employee

Hi @samuelperezh ,

Pattern for Grace Period

An auto_cdc_from_snapshot flow with SCD Type 2 should naturally take care of your scenario. Let's walk through an example:

1. Initial load with 3 products on 25/02/2026:

product_id  name  price
1           foo   10
2           bar   20
3           baz   30

The target of the auto_cdc_from_snapshot SCD2 will be:

product_id  name  price  __START_AT  __END_AT
1           foo   10     25/02/2026  NULL
2           bar   20     25/02/2026  NULL
3           baz   30     25/02/2026  NULL

2. Assume that bar and baz disappear on the load of 26/02/2026:

product_id  name  price
1           foo   10

The target of auto_cdc_from_snapshot will be updated to:

product_id  name  price  __START_AT  __END_AT
1           foo   10     25/02/2026  NULL
2           bar   20     25/02/2026  26/02/2026
3           baz   30     25/02/2026  26/02/2026

3. The bar reappears the next day, on 27/02/2026:

product_id  name  price
1           foo   10
2           bar   20

The target CDC table now reflects the reappearance:

product_id  name  price  __START_AT  __END_AT
1           foo   10     25/02/2026  NULL
2           bar   20     25/02/2026  26/02/2026
2           bar   20     27/02/2026  NULL
3           baz   30     25/02/2026  26/02/2026

The wrongly deleted record will reappear as a duplicate, which needs to be handled downstream:

  • If the new __START_AT is earlier than the previous __END_AT + 3 days, we keep the record. Otherwise, it is ignored.
  • For each set of identical rows, we retrieve only the newest record.

CREATE MATERIALIZED VIEW target_auto_cdc_clean AS
WITH filtered AS (
  SELECT *,
    -- __END_AT of the previous identical version of this row, if any
    LEAD(__END_AT) OVER (PARTITION BY product_id, name, price ORDER BY __END_AT DESC NULLS FIRST) AS prev_end_at
  FROM target_auto_cdc  -- the AUTO CDC SCD2 target table (name assumed)
),
valid AS (
  SELECT *
  FROM filtered
  WHERE prev_end_at IS NULL
     OR __START_AT <= prev_end_at + INTERVAL 3 DAYS
),
ranked AS (
  SELECT *, row_number() OVER (PARTITION BY product_id, name, price ORDER BY __START_AT DESC) AS rn
  FROM valid
)
SELECT * FROM ranked WHERE rn = 1

This will return:

product_id  name  price  __START_AT  __END_AT
1           foo   10     25/02/2026  NULL
2           bar   20     27/02/2026  NULL
3           baz   30     25/02/2026  26/02/2026

Then, to apply the grace logic:

SELECT *,
    CASE
        WHEN __END_AT IS NOT NULL
         AND __END_AT < current_date() - INTERVAL 3 DAYS THEN TRUE
        ELSE FALSE
    END AS is_sold,
    CASE
        WHEN __END_AT IS NOT NULL
         AND __END_AT < current_date() - INTERVAL 3 DAYS THEN __END_AT
        ELSE NULL
    END AS sold_date
FROM target_auto_cdc_clean

Feel free to double check the queries to make sure they follow the business logic.
 
2. Backfill strategy. You can leverage the next_snapshot_and_version function to loop over the past dates and perform the historical backfill. However, this is an expensive operation if you have a lot of history. There is a tradeoff between the history depth of the SCD2 table and the time and cost spent building it.
 
3. Bronze Cleanup. Be mindful that if you delete data from the bronze tables, you won't be able to recompute the whole history. For example, if you hit Run pipeline with full table refresh, the AUTO CDC target table is dropped and rebuilt from scratch; if only the last 7 days of data are present in the bronze layer, the freshly rebuilt target will contain only those 7 days. You can move the old bronze data into cold storage to reduce costs and retrieve it whenever a full refresh is needed. This is again a tradeoff between cost and history depth. You can also prevent full table refreshes entirely by setting pipelines.reset.allowed = false on the pipeline.
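
For illustration, a minimal sketch of the archive-then-trim idea, assuming the raw snapshots sit in dated folders (the bucket names, folder layout, and 7-day window below are placeholders; the archive bucket would typically have a Glacier/Archive lifecycle rule applied to it):

from datetime import date, timedelta

bronze_root = "s3://your-bucket/bronze/inventory/"            # hypothetical current layout
archive_root = "s3://your-archive-bucket/bronze/inventory/"   # bucket with a cold-tier lifecycle rule

cutoff = date.today() - timedelta(days=7)

for entry in dbutils.fs.ls(bronze_root):
    # Snapshot folders are assumed to be named YYYY-MM-DD/
    try:
        folder_date = date.fromisoformat(entry.name.strip("/"))
    except ValueError:
        continue  # skip anything that is not a dated snapshot folder
    if folder_date < cutoff:
        # Move the old snapshot out of the hot Bronze location into the archive bucket
        dbutils.fs.mv(entry.path, archive_root + entry.name, recurse=True)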
 
Hope it helps.
 
Best regards,

SteveOstrowski
Databricks Employee

Hi @samuelperezh,

Building on @aleksandra_ch's reply, I wanted to add some additional detail around each of your three questions, especially around the grace period implementation and the backfill strategy.

1. GRACE PERIOD PATTERN

As aleksandra_ch noted, create_auto_cdc_from_snapshot_flow with stored_as_scd_type=2 is the right starting point. The SCD2 target table automatically sets __END_AT when a product disappears from a snapshot, then creates a new active row if it reappears. This gives you the raw history you need.

The grace period logic is best handled as a downstream materialized view in your pipeline. The idea is: let the AUTO CDC target capture every disappearance and reappearance faithfully, then apply your 3-day grace window in a clean-up layer on top.

Here is a corrected and more complete pattern. The key insight is that you want to "stitch together" consecutive active periods for the same product_id when the gap between them is 3 days or fewer:

CREATE MATERIALIZED VIEW inventory_scd2_with_grace AS
WITH ordered AS (
SELECT
  *,
  LAG(__END_AT) OVER (
    PARTITION BY product_id
    ORDER BY __START_AT
  ) AS prev_end_at
FROM inventory_scd2_raw
),
stitched AS (
SELECT
  *,
  CASE
    WHEN prev_end_at IS NOT NULL
      AND __START_AT <= prev_end_at + INTERVAL 3 DAYS
    THEN 0
    ELSE 1
  END AS new_group_flag
FROM ordered
),
grouped AS (
SELECT
  *,
  SUM(new_group_flag) OVER (
    PARTITION BY product_id
    ORDER BY __START_AT
    ROWS UNBOUNDED PRECEDING
  ) AS group_id
FROM stitched
)
SELECT
product_id,
-- take attributes from the latest row in each group
LAST_VALUE(name) OVER (
  PARTITION BY product_id, group_id
  ORDER BY __START_AT
  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS name,
LAST_VALUE(price) OVER (
  PARTITION BY product_id, group_id
  ORDER BY __START_AT
  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS price,
MIN(__START_AT) OVER (
  PARTITION BY product_id, group_id
) AS valid_from,
CASE
  WHEN MAX(CASE WHEN __END_AT IS NULL THEN 1 ELSE 0 END) OVER (
    PARTITION BY product_id, group_id
  ) = 1
  -- the stitched period is still open (its latest row has no end date)
  THEN NULL
  ELSE MAX(__END_AT) OVER (
    PARTITION BY product_id, group_id
  )
END AS valid_to
FROM grouped

Then to derive is_sold and sold_date:

CREATE MATERIALIZED VIEW inventory_sold_status AS
SELECT DISTINCT
product_id,
name,
price,
valid_from,
valid_to,
CASE
  WHEN valid_to IS NOT NULL
    AND valid_to < current_date() - INTERVAL 3 DAYS
  THEN TRUE
  ELSE FALSE
END AS is_sold,
CASE
  WHEN valid_to IS NOT NULL
    AND valid_to < current_date() - INTERVAL 3 DAYS
  THEN valid_to
  ELSE NULL
END AS sold_date
FROM inventory_scd2_with_grace

The logic here: a product is considered sold only when it has been absent for more than 3 days (valid_to is more than 3 days in the past). Products that disappeared recently (within the grace window) are not yet marked as sold.

An alternative approach that avoids the downstream stitching entirely is to "carry forward" missing rows in your snapshot before passing it to create_auto_cdc_from_snapshot_flow. You would create a view that unions the current snapshot with products from the prior 3 snapshots that are not in the current one. This way the AUTO CDC engine never sees the temporary disappearances at all. The tradeoff is that it adds complexity to your source definition but simplifies the downstream.
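
As a rough sketch of that carry-forward idea (the paths, dated-folder layout, and helper name are assumptions to adapt), you could build a "padded" snapshot like this and return it from the next_snapshot_and_version callback shown below instead of the raw read:

from datetime import date, timedelta
from pyspark.sql import DataFrame

def padded_snapshot(snapshot_date: date) -> DataFrame:
    # Current snapshot plus any product seen in the prior 3 days but missing today
    root = "s3://your-bucket/bronze/inventory/"   # hypothetical layout
    current = spark.read.parquet(f"{root}{snapshot_date.isoformat()}/")

    # Union the previous 3 daily snapshots, skipping days with no file
    prior = None
    for lag in range(1, 4):
        path = f"{root}{(snapshot_date - timedelta(days=lag)).isoformat()}/"
        try:
            day_df = spark.read.parquet(path)
        except Exception:
            continue
        prior = day_df if prior is None else prior.unionByName(day_df)

    if prior is None:
        return current

    # One row per product from the prior days, restricted to products absent today
    # (a real implementation would prefer the most recent prior row per product)
    carried = (
        prior.dropDuplicates(["product_id"])
             .join(current.select("product_id"), on="product_id", how="left_anti")
    )
    return current.unionByName(carried)

With this, the AUTO CDC engine only ever sees a product disappear once it has been absent from the padded snapshot for more than 3 days.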

2. BACKFILL STRATEGY

For the historical backfill (Sept 2025 to today), use the next_snapshot_and_version callback with create_auto_cdc_from_snapshot_flow. This is specifically designed for replaying historical snapshots in order. Here is a pattern:

import dlt
from datetime import date, timedelta

def next_snapshot_and_version(latest_version):
    start_date = date(2025, 9, 1)

    if latest_version is None:
        next_date = start_date
    else:
        next_date = date.fromisoformat(str(latest_version)) + timedelta(days=1)

    if next_date > date.today():
        return None

    snapshot_path = f"s3://your-bucket/bronze/inventory/{next_date.isoformat()}/"

    try:
        df = spark.read.parquet(snapshot_path)
        return (df, next_date.isoformat())
    except Exception:
        # Skip dates with no snapshot file
        return next_snapshot_and_version(next_date.isoformat())

dlt.create_auto_cdc_from_snapshot_flow(
    target="inventory_scd2_raw",
    source=next_snapshot_and_version,
    keys=["product_id"],
    stored_as_scd_type=2,
    track_history_column_list=["name", "price", "status"]
)

A few tips for the backfill:
- Use track_history_column_list to limit which column changes generate new history rows. This keeps the SCD2 table from growing unnecessarily large.
- The flow keeps requesting snapshots from next_snapshot_and_version until it returns None, so the initial backfill over ~180 historical 60GB snapshots can be one long, expensive run; size the cluster accordingly. If you prefer smaller increments, you can have the function stop early (return None after a batch of dates), run the pipeline in triggered mode, and schedule it until it is caught up.
- After the backfill is complete, switch the source to a periodic snapshot (a live view of your current bronze table) for daily incremental processing.
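
As a sketch of that switch (the table name is a placeholder, and worth verifying against the docs linked below: if I recall correctly, the source argument can also be the name of a table or view for periodic snapshot ingestion):

import dlt

# After the backfill, point the flow at a table/view that always holds the latest
# daily snapshot; each pipeline update then ingests it as a new snapshot version.
dlt.create_auto_cdc_from_snapshot_flow(
    target="inventory_scd2_raw",
    source="bronze.inventory_current_snapshot",  # hypothetical table holding only the newest snapshot
    keys=["product_id"],
    stored_as_scd_type=2,
    track_history_column_list=["name", "price", "status"]
)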

Documentation reference for historical snapshot ingestion:
https://docs.databricks.com/en/dlt/cdc.html

3. BRONZE CLEANUP

You can safely reduce Bronze retention once the SCD2 Silver layer is built, but there are important caveats:

- If you do a full refresh of the Lakeflow Spark Declarative Pipeline (SDP), it drops the AUTO CDC target table and rebuilds from scratch. If Bronze only has 7 days of data, you lose all history prior to that.
- To prevent accidental full refreshes, set the pipeline configuration: pipelines.reset.allowed = false. This blocks the "Full refresh" button.
- Consider moving older Bronze snapshots to a cold storage tier (S3 Glacier, Azure Cool/Archive) rather than deleting them. This gives you a safety net for reprocessing at lower cost.
- VACUUM on managed tables respects the delta.deletedFileRetentionDuration setting (default 7 days). You can DELETE rows older than 7 days from your Bronze table and then VACUUM to reclaim space.
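
A minimal sketch of that last step, assuming the Bronze snapshots live in a Delta table with a snapshot_date column (table and column names are placeholders):

# Keep only the last 7 days of snapshots in the Bronze Delta table...
spark.sql("""
    DELETE FROM bronze.inventory_snapshots
    WHERE snapshot_date < current_date() - INTERVAL 7 DAYS
""")

# ...then reclaim storage for files that are no longer referenced and are past
# the retention window (default 7 days, governed by delta.deletedFileRetentionDuration).
spark.sql("VACUUM bronze.inventory_snapshots")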

One more note on naming: the feature formerly known as DLT is now called Lakeflow Spark Declarative Pipelines (SDP). The APIs (dlt.create_auto_cdc_from_snapshot_flow, etc.) remain the same; only the product branding has been updated.

Documentation references:
- AUTO CDC from snapshot: https://docs.databricks.com/en/dlt/cdc.html
- Pipeline configuration: https://docs.databricks.com/en/dlt/configure-pipeline.html
- VACUUM: https://docs.databricks.com/en/sql/language-manual/delta-vacuum.html

* This reply was researched and drafted with an agent system I built, based on the wide set of documentation I have available and previous memory. I personally review each draft for obvious issues, monitor the system's reliability, and update the reply when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand-new features.

If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.