The "Target table is not empty" error occurs because the DLT/LDP engine needs to initialize its own internal metadata and SCD2 tracking columns (__START_AT, __END_AT) from the first micro-batch.
To solve this, instead of manually copying the data into the target table, you should feed the historical state through the flow as the initial snapshot.
1. Prepare your Migration Data
Ensure your migration data from day t-1 is available in a source location (like a Parquet/Delta table or a landing folder). Even if it's an SCD2 table now, the snapshot flow works best if you can provide it as a state snapshot.
2. Create an Empty Target Streaming Table
Do not use INSERT INTO. Use the DLT/LDP declaration to create a clean slate.
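Assuming the Lakeflow Declarative Pipelines module is imported as `dp` (to match the flow call later in this answer), the declaration is a one-liner:

```python
from pyspark import pipelines as dp  # Lakeflow Declarative Pipelines module

# Declare an empty streaming table that the snapshot flow will populate.
# Do NOT manually INSERT INTO this table -- that is what triggers the
# "Target table is not empty" error in the first place.
dp.create_streaming_table("customer_scd2")
```
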
3. Use a Versioned Source Function for the Migration
The key is the source argument. You can pass a function that allows you to "replay" history. If you have multiple daily snapshots from the legacy system, this function can iterate through them. If you only have the final t-1 state, the function will load that first.
```python
from pyspark.errors import AnalysisException

def get_snapshot(version):
    # First run (migration day): version is None, so seed the flow with
    # the legacy t-1 state as snapshot version 1.
    if version is None:
        df = spark.read.table("legacy_db.customer_migration_t_minus_1")
        return (df, 1)
    # Post-migration: load the next daily snapshot with a batch read
    # (cloudFiles / Auto Loader is streaming-only and cannot be used here).
    # 'version' is the last snapshot already processed; returned versions
    # must be strictly increasing. Assumes one folder per snapshot version.
    next_version = version + 1
    try:
        df_daily = spark.read.parquet(f"/path/to/daily/snapshots/{next_version}")
    except AnalysisException:
        return None  # no new snapshot available: end this update
    return (df_daily, next_version)
```
```python
dp.create_auto_cdc_from_snapshot_flow(
    target = "customer_scd2",
    source = get_snapshot,
    keys = ["customer_id"],
    # Note: there is no sequence_by here. The snapshot flow orders history
    # by the monotonically increasing version returned by get_snapshot.
    stored_as_scd_type = 2
)
```
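To make the version contract concrete, here is a plain-Python sketch of how the engine drives the source function (the loop below is a toy model, not the real implementation): the function is called with the last processed version, `None` on the very first update, and returning `None` signals there is nothing new to process.

```python
def simulate_engine(next_snapshot_and_version):
    """Toy model of the engine loop: call the source function with the
    last processed version (None on the first call) until it returns None."""
    processed = []
    latest = None
    while True:
        result = next_snapshot_and_version(latest)
        if result is None:
            break
        snapshot, version = result
        processed.append(version)
        latest = version
    return processed

# A source function over three staged snapshots keyed by version number.
snapshots = {1: "t-1 state", 2: "day t", 3: "day t+1"}

def get_snapshot(version):
    next_version = 1 if version is None else version + 1
    if next_version in snapshots:
        return (snapshots[next_version], next_version)
    return None  # no new snapshot: end of update

print(simulate_engine(get_snapshot))  # [1, 2, 3]
```
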
If your legacy data is already in SCD2 format (multiple rows per ID with valid_from/valid_to dates), keep in mind that each snapshot the source function returns should contain exactly one row per key. Either collapse the history to the latest row per key and load it as a single initial snapshot, or replay each historical state as its own snapshot with a strictly increasing version so the flow can rebuild the history in the correct order.
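As a plain-Python illustration of collapsing multi-row SCD2 history to the latest row per key (no Spark required; the `valid_from` and `tier` column names are hypothetical):

```python
# Legacy SCD2 history: several rows per customer_id, ordered by valid_from.
legacy_rows = [
    {"customer_id": 1, "valid_from": "2023-01-01", "valid_to": "2023-06-01", "tier": "bronze"},
    {"customer_id": 1, "valid_from": "2023-06-01", "valid_to": None, "tier": "silver"},
    {"customer_id": 2, "valid_from": "2023-03-15", "valid_to": None, "tier": "gold"},
]

# Keep only the row with the greatest valid_from per key: the state snapshot.
snapshot = {}
for row in legacy_rows:
    key = row["customer_id"]
    if key not in snapshot or row["valid_from"] > snapshot[key]["valid_from"]:
        snapshot[key] = row

print(sorted((r["customer_id"], r["tier"]) for r in snapshot.values()))
# [(1, 'silver'), (2, 'gold')]
```

In Spark you would express the same thing with a window function partitioned by the key and ordered by valid_from descending, keeping rank 1.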