SteveOstrowski
Databricks Employee

Hi @samuelperezh,

Building on @aleksandra_ch's reply, I wanted to add some additional detail around each of your three questions, especially around the grace period implementation and the backfill strategy.

1. GRACE PERIOD PATTERN

As aleksandra_ch noted, create_auto_cdc_from_snapshot_flow with stored_as_scd_type=2 is the right starting point. The SCD2 target table automatically sets __END_AT when a product disappears from a snapshot, then creates a new active row if it reappears. This gives you the raw history you need.

The grace period logic is best handled as a downstream materialized view in your pipeline. The idea is: let the AUTO CDC target capture every disappearance and reappearance faithfully, then apply your 3-day grace window in a clean-up layer on top.

Here is a corrected and more complete pattern. The key insight is that you want to "stitch together" consecutive active periods for the same product_id when the gap between them is 3 days or fewer:

CREATE MATERIALIZED VIEW inventory_scd2_with_grace AS
WITH ordered AS (
  SELECT
    *,
    LAG(__END_AT) OVER (
      PARTITION BY product_id
      ORDER BY __START_AT
    ) AS prev_end_at
  FROM inventory_scd2_raw
),
stitched AS (
  SELECT
    *,
    CASE
      WHEN prev_end_at IS NOT NULL
        AND __START_AT <= prev_end_at + INTERVAL 3 DAYS
      THEN 0
      ELSE 1
    END AS new_group_flag
  FROM ordered
),
grouped AS (
  SELECT
    *,
    SUM(new_group_flag) OVER (
      PARTITION BY product_id
      ORDER BY __START_AT
      ROWS UNBOUNDED PRECEDING
    ) AS group_id
  FROM stitched
)
SELECT
  product_id,
  -- take attributes from the latest row in each stitched group
  max_by(name, __START_AT) AS name,
  max_by(price, __START_AT) AS price,
  MIN(__START_AT) AS valid_from,
  -- MAX() ignores NULLs, so guard the still-active case: if any row in the
  -- group has no end date, the group is still open and valid_to must be NULL
  CASE
    WHEN COUNT(*) > COUNT(__END_AT) THEN NULL
    ELSE MAX(__END_AT)
  END AS valid_to
FROM grouped
GROUP BY product_id, group_id

Then to derive is_sold and sold_date:

CREATE MATERIALIZED VIEW inventory_sold_status AS
SELECT DISTINCT
  product_id,
  name,
  price,
  valid_from,
  valid_to,
  CASE
    WHEN valid_to IS NOT NULL
      AND valid_to < current_date() - INTERVAL 3 DAYS
    THEN TRUE
    ELSE FALSE
  END AS is_sold,
  CASE
    WHEN valid_to IS NOT NULL
      AND valid_to < current_date() - INTERVAL 3 DAYS
    THEN valid_to
    ELSE NULL
  END AS sold_date
FROM inventory_scd2_with_grace

The logic here: a product is considered sold only when it has been absent for more than 3 days (valid_to is more than 3 days in the past). Products that disappeared recently (within the grace window) are not yet marked as sold.
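If you want to sanity-check the stitching rule outside the pipeline, here is a minimal pure-Python sketch of the same gap-merge logic (plain (start, end) tuples standing in for SCD2 rows; `stitch` and the 3-day default are illustrative names, not part of any API):

```python
from datetime import date, timedelta

def stitch(periods, grace_days=3):
    """Merge consecutive (start, end) periods for one product when the
    gap between them is <= grace_days. `end` is None while still active."""
    merged = []
    for start, end in sorted(periods, key=lambda p: p[0]):
        prev_end = merged[-1][1] if merged else None
        if prev_end is not None and start <= prev_end + timedelta(days=grace_days):
            merged[-1] = (merged[-1][0], end)  # extend the previous period
        else:
            merged.append((start, end))
    return merged

history = [
    (date(2025, 9, 1), date(2025, 9, 10)),
    (date(2025, 9, 12), date(2025, 9, 20)),  # 2-day gap -> stitched
    (date(2025, 9, 28), None),               # 8-day gap -> new period
]
stitch(history)
# the first two periods merge into (2025-09-01, 2025-09-20);
# the 8-day gap starts a new, still-active period
```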

An alternative approach that avoids the downstream stitching entirely is to "carry forward" missing rows in your snapshot before passing it to create_auto_cdc_from_snapshot_flow. You would create a view that unions the current snapshot with any products from the prior 3 snapshots that are absent from the current one. This way the AUTO CDC engine never sees the temporary disappearances at all. The tradeoff is added complexity in your source definition in exchange for simpler downstream logic.
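To make the carry-forward idea concrete, here is a minimal sketch using plain dicts in place of DataFrames (in the real pipeline each dict would be a snapshot DataFrame and the merge would be a left-anti join plus union; all names here are illustrative):

```python
def carry_forward(current, priors):
    """current: {product_id: row} for today's snapshot.
    priors: snapshots from previous days, most recent first (pass the
    last 3 for a 3-day grace window). Products missing from `current`
    but present in a recent prior snapshot are carried forward, so the
    AUTO CDC engine never sees the temporary disappearance."""
    merged = dict(current)
    for prior in priors:
        for pid, row in prior.items():
            merged.setdefault(pid, row)  # keep the newest version of each row
    return merged

today = {"A": {"price": 10}, "C": {"price": 7}}
yesterday = {"A": {"price": 9}, "B": {"price": 5}}  # B temporarily missing today
merged = carry_forward(today, [yesterday])
# "B" is carried forward with its last-seen attributes;
# "A" keeps today's price, not yesterday's
```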

2. BACKFILL STRATEGY

For the historical backfill (Sept 2025 to today), use the next_snapshot_and_version callback with create_auto_cdc_from_snapshot_flow. This is specifically designed for replaying historical snapshots in order. Here is a pattern:

import dlt
from datetime import date, timedelta

START_DATE = date(2025, 9, 1)

def next_snapshot_and_version(latest_version):
    if latest_version is None:
        next_date = START_DATE
    else:
        next_date = date.fromisoformat(str(latest_version)) + timedelta(days=1)

    # Walk forward past dates with no snapshot file; a loop avoids
    # unbounded recursion when many dates are missing
    while next_date <= date.today():
        snapshot_path = f"s3://your-bucket/bronze/inventory/{next_date.isoformat()}/"
        try:
            df = spark.read.parquet(snapshot_path)
            return (df, next_date.isoformat())
        except Exception:
            next_date += timedelta(days=1)

    return None  # caught up

# The target streaming table must exist before the flow is attached to it
dlt.create_streaming_table("inventory_scd2_raw")

dlt.create_auto_cdc_from_snapshot_flow(
    target="inventory_scd2_raw",
    source=next_snapshot_and_version,
    keys=["product_id"],
    stored_as_scd_type=2,
    track_history_column_list=["name", "price", "status"]
)

A few tips for the backfill:
- Use track_history_column_list to limit which column changes generate new history rows. This keeps the SCD2 table from growing unnecessarily large.
- Within a single update, the pipeline keeps calling next_snapshot_and_version and processing the returned snapshot until the function returns None, so one triggered run can work through all ~180 historical dates. Expect that first run to be long; if it is interrupted, run the pipeline again and it resumes from the last processed version.
- After the backfill is complete, switch the source to a periodic snapshot (a live view of your current bronze table) for daily incremental processing.
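Once caught up, the daily source callback reduces to "serve today's snapshot exactly once." Here is a sketch of that logic with the reader injected as a parameter (in the pipeline, read_snapshot would be spark.read.parquet on your bronze path; the function name and signature are illustrative):

```python
from datetime import date

def next_daily_snapshot(latest_version, read_snapshot, today=None):
    """Return (df, version) for today's snapshot, or None if it has
    already been processed. `read_snapshot(iso_date)` loads the snapshot,
    e.g. lambda d: spark.read.parquet(f"s3://your-bucket/bronze/inventory/{d}/")."""
    today_iso = (today or date.today()).isoformat()
    if latest_version is not None and str(latest_version) >= today_iso:
        return None  # nothing new to process in this update
    return (read_snapshot(today_iso), today_iso)

# In the pipeline, pass a wrapper with the reader bound:
#   source=lambda v: next_daily_snapshot(v, read_snapshot)
```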

Documentation reference for historical snapshot ingestion:
https://docs.databricks.com/en/dlt/cdc.html

3. BRONZE CLEANUP

You can safely reduce Bronze retention once the SCD2 Silver layer is built, but there are important caveats:

- If you do a full refresh of the Lakeflow Spark Declarative Pipeline (SDP), it drops the AUTO CDC target table and rebuilds from scratch. If Bronze only has 7 days of data, you lose all history prior to that.
- To prevent accidental full refreshes, set the pipeline configuration: pipelines.reset.allowed = false. This blocks the "Full refresh" button.
- Consider moving older Bronze snapshots to a cold storage tier (S3 Glacier, Azure Cool/Archive) rather than deleting them. This gives you a safety net for reprocessing at lower cost.
- VACUUM on managed tables respects the delta.deletedFileRetentionDuration setting (default 7 days). You can DELETE rows older than 7 days from your Bronze table and then VACUUM to reclaim space.
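As a sketch of that last retention step (the table and column names here are assumptions, adjust to your schema; building the statements as strings keeps them easy to review before running them with spark.sql):

```python
def bronze_cleanup_statements(table="bronze.inventory_snapshots", keep_days=7):
    """Build the DELETE + VACUUM statements for trimming old snapshots.
    Assumes a `snapshot_date` DATE column; VACUUM's default retention
    (delta.deletedFileRetentionDuration, 7 days) is left untouched."""
    return [
        f"DELETE FROM {table} "
        f"WHERE snapshot_date < current_date() - INTERVAL {keep_days} DAYS",
        f"VACUUM {table}",
    ]

# In a notebook: for stmt in bronze_cleanup_statements(): spark.sql(stmt)
```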

One more note: since you mentioned the product name changed, the feature formerly known as DLT is now called Lakeflow Spark Declarative Pipelines (SDP). The APIs (dlt.create_auto_cdc_from_snapshot_flow, etc.) remain the same; only the product branding has been updated.

Documentation references:
- AUTO CDC from snapshot: https://docs.databricks.com/en/dlt/cdc.html
- Pipeline configuration: https://docs.databricks.com/en/dlt/configure-pipeline.html
- VACUUM: https://docs.databricks.com/en/sql/language-manual/delta-vacuum.html

* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.

If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.