03-08-2026 10:02 PM
Hi @samuelperezh,
Building on @aleksandra_ch's reply, I wanted to add some detail on each of your three questions, especially the grace period implementation and the backfill strategy.
1. GRACE PERIOD PATTERN
As aleksandra_ch noted, create_auto_cdc_from_snapshot_flow with stored_as_scd_type=2 is the right starting point. The SCD2 target table automatically sets __END_AT when a product disappears from a snapshot, then creates a new active row if it reappears. This gives you the raw history you need.
The grace period logic is best handled as a downstream materialized view in your pipeline. The idea is: let the AUTO CDC target capture every disappearance and reappearance faithfully, then apply your 3-day grace window in a clean-up layer on top.
Here is a corrected and more complete pattern. The key insight is that you want to "stitch together" consecutive active periods for the same product_id when the gap between them is 3 days or fewer:
CREATE MATERIALIZED VIEW inventory_scd2_with_grace AS
WITH ordered AS (
SELECT
*,
LAG(__END_AT) OVER (
PARTITION BY product_id
ORDER BY __START_AT
) AS prev_end_at
FROM inventory_scd2_raw
),
stitched AS (
SELECT
*,
CASE
WHEN prev_end_at IS NOT NULL
AND __START_AT <= prev_end_at + INTERVAL 3 DAYS
THEN 0
ELSE 1
END AS new_group_flag
FROM ordered
),
grouped AS (
SELECT
*,
SUM(new_group_flag) OVER (
PARTITION BY product_id
ORDER BY __START_AT
ROWS UNBOUNDED PRECEDING
) AS group_id
FROM stitched
)
SELECT DISTINCT
product_id,
-- take attributes from the latest row in each group; DISTINCT collapses the result to one row per stitched period
LAST_VALUE(name) OVER (
PARTITION BY product_id, group_id
ORDER BY __START_AT
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS name,
LAST_VALUE(price) OVER (
PARTITION BY product_id, group_id
ORDER BY __START_AT
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS price,
MIN(__START_AT) OVER (
PARTITION BY product_id, group_id
) AS valid_from,
-- take __END_AT from the latest row so a still-open period keeps a NULL valid_to
-- (MAX would skip the NULL and wrongly close a period that is still active)
LAST_VALUE(__END_AT) OVER (
PARTITION BY product_id, group_id
ORDER BY __START_AT
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS valid_to
FROM grouped
Then to derive is_sold and sold_date:
CREATE MATERIALIZED VIEW inventory_sold_status AS
SELECT DISTINCT
product_id,
name,
price,
valid_from,
valid_to,
CASE
WHEN valid_to IS NOT NULL
AND valid_to < current_date() - INTERVAL 3 DAYS
THEN TRUE
ELSE FALSE
END AS is_sold,
CASE
WHEN valid_to IS NOT NULL
AND valid_to < current_date() - INTERVAL 3 DAYS
THEN CAST(valid_to AS DATE)
ELSE NULL
END AS sold_date
FROM inventory_scd2_with_grace
The logic here: a product is considered sold only when it has been absent for more than 3 days (valid_to is more than 3 days in the past). Products that disappeared recently (within the grace window) are not yet marked as sold.
An alternative approach that avoids the downstream stitching entirely is to "carry forward" missing rows in your snapshot before passing it to create_auto_cdc_from_snapshot_flow: create a view that unions the current snapshot with any products from the prior 3 snapshots that are absent from the current one. That way the AUTO CDC engine never sees the temporary disappearances at all. The tradeoff is extra complexity in your source definition in exchange for simpler downstream views.
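If you go that route, here is a rough PySpark sketch. It assumes a hypothetical layout of one Parquet folder per day under s3://your-bucket/bronze/inventory/ (the same path pattern used in the backfill example below); adjust names and paths to your environment:

from datetime import timedelta
from functools import reduce

def snapshot_with_carry_forward(snapshot_date):
    # Hypothetical layout: one Parquet folder per day under this prefix.
    base = "s3://your-bucket/bronze/inventory"
    current = spark.read.parquet(f"{base}/{snapshot_date.isoformat()}/")

    # Read the prior 3 daily snapshots, skipping any missing days.
    prior_dfs = []
    for i in range(1, 4):
        day = snapshot_date - timedelta(days=i)
        try:
            prior_dfs.append(spark.read.parquet(f"{base}/{day.isoformat()}/"))
        except Exception:
            pass
    if not prior_dfs:
        return current

    # Keep one row per product_id from the recent days (an arbitrary one; use a
    # window over a snapshot-date column if you need the most recent attributes),
    # then carry forward only the products that are absent from today's snapshot.
    prior = reduce(lambda a, b: a.unionByName(b), prior_dfs).dropDuplicates(["product_id"])
    carried = prior.join(current.select("product_id"), on="product_id", how="left_anti")
    return current.unionByName(carried)

You would then return this combined DataFrame from your snapshot callback (or expose it as a view) instead of the raw daily file, so the AUTO CDC flow never records the short gaps.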
2. BACKFILL STRATEGY
For the historical backfill (Sept 2025 to today), use the next_snapshot_and_version callback with create_auto_cdc_from_snapshot_flow. This is specifically designed for replaying historical snapshots in order. Here is a pattern:
import dlt
from datetime import date, timedelta

def next_snapshot_and_version(latest_version):
    start_date = date(2025, 9, 1)
    if latest_version is None:
        next_date = start_date
    else:
        next_date = date.fromisoformat(str(latest_version)) + timedelta(days=1)
    if next_date > date.today():
        # No more snapshots to process; returning None ends the replay.
        return None
    snapshot_path = f"s3://your-bucket/bronze/inventory/{next_date.isoformat()}/"
    try:
        df = spark.read.parquet(snapshot_path)
        return (df, next_date.isoformat())
    except Exception:
        # Skip dates with no snapshot file and try the following day.
        return next_snapshot_and_version(next_date.isoformat())

# The target streaming table must exist before the flow is attached to it.
dlt.create_streaming_table("inventory_scd2_raw")

dlt.create_auto_cdc_from_snapshot_flow(
    target="inventory_scd2_raw",
    source=next_snapshot_and_version,
    keys=["product_id"],
    stored_as_scd_type=2,
    track_history_column_list=["name", "price", "status"],
)
A few tips for the backfill:
- Use track_history_column_list to limit which column changes generate new history rows. This keeps the SCD2 table from growing unnecessarily large.
- The pipeline will process one snapshot per update cycle. For ~180 historical dates, expect multiple pipeline runs. You can set the pipeline to run in triggered mode and schedule it to run repeatedly until caught up.
- After the backfill is complete, switch the source to a periodic snapshot (a live view of your current bronze table) for daily incremental processing; a sketch of that switch follows below.
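For the periodic mode, a minimal sketch could look like the following. It assumes a hypothetical table named bronze.inventory_current that is fully replaced with the latest snapshot each day; the source argument of create_auto_cdc_from_snapshot_flow also accepts a plain table name for this case:

import dlt

dlt.create_streaming_table("inventory_scd2_raw")

# Each pipeline update diffs the current full snapshot against the SCD2 target.
dlt.create_auto_cdc_from_snapshot_flow(
    target="inventory_scd2_raw",
    source="bronze.inventory_current",  # hypothetical name; point this at your bronze snapshot table
    keys=["product_id"],
    stored_as_scd_type=2,
    track_history_column_list=["name", "price", "status"],
)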
Documentation reference for historical snapshot ingestion:
https://docs.databricks.com/en/dlt/cdc.html
3. BRONZE CLEANUP
You can safely reduce Bronze retention once the SCD2 Silver layer is built, but there are important caveats:
- If you do a full refresh of the Lakeflow Spark Declarative Pipeline (SDP), it truncates the AUTO CDC target table and rebuilds it from whatever source data still exists. If Bronze only holds 7 days of snapshots, you lose all history prior to that.
- To protect the SCD2 target from accidental full refreshes, set the table property pipelines.reset.allowed = false on that table. This prevents the table from being truncated and rebuilt when someone triggers a full refresh (see the sketch after this list).
- Consider moving older Bronze snapshots to a cold storage tier (S3 Glacier, Azure Cool/Archive) rather than deleting them. This gives you a safety net for reprocessing at lower cost.
- VACUUM on managed tables respects the delta.deletedFileRetentionDuration setting (default 7 days). You can DELETE rows older than 7 days from your Bronze table and then VACUUM to reclaim space.
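For the reset protection, a minimal Python sketch reusing the inventory_scd2_raw target from above:

import dlt

# pipelines.reset.allowed = false excludes this table from full refreshes, so its
# history is not rebuilt from a Bronze layer that only retains a few days of snapshots.
dlt.create_streaming_table(
    name="inventory_scd2_raw",
    table_properties={"pipelines.reset.allowed": "false"},
)

And for the Bronze cleanup itself, assuming a hypothetical Bronze table named bronze.inventory_snapshots with a snapshot_date column, the DELETE-then-VACUUM step could be run from a scheduled notebook or job:

# Remove snapshot rows older than 7 days, then reclaim the underlying data files.
spark.sql("""
    DELETE FROM bronze.inventory_snapshots
    WHERE snapshot_date < date_sub(current_date(), 7)
""")
spark.sql("VACUUM bronze.inventory_snapshots")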
One more note on naming: since you mentioned the product name changed, the feature formerly known as DLT is now called Lakeflow Spark Declarative Pipelines (SDP). The APIs (dlt.create_auto_cdc_from_snapshot_flow, etc.) remain the same; only the product branding has been updated.
Documentation references:
- AUTO CDC from snapshot: https://docs.databricks.com/en/dlt/cdc.html
- Pipeline configuration: https://docs.databricks.com/en/dlt/configure-pipeline.html
- VACUUM: https://docs.databricks.com/en/sql/language-manual/delta-vacuum.html
* This reply was drafted with an agent system I built, which researches and drafts responses from the wide set of documentation I have available and from previous memory. I personally review each draft for obvious issues, monitor the system's reliability, and update the reply when I detect any drift, but there is still a small chance something is inaccurate, especially if you are experimenting with brand-new features.
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.