Hello Brahma,

Hope you are having a good day.

I took your suggestion to rewrite my logic using MERGE to simulate SCD Type 2 in my DLT pipeline, and I need your help. I am keeping the test simple: in my customers table I change only the email column, and I expect dim_customers to track the history by closing the old record and inserting a new one. The changed email value is reflected, but I never see the newly inserted record in dim_customers. My SCD Type 2 code is below. Please let me know if you need more details.

Thanks!

Code for SCD Type 2:

import dlt
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit

@dlt.table(name="dim_customers", comment="Customer dimension with manual SCD Type 2")
def dim_customers():
    new_df = (dlt.read("customers_silver")
                .withColumn("start_date", col("updated_at"))  # Use source timestamp
                .withColumn("is_current", lit(True))
                .withColumn("end_date", lit(None).cast("timestamp"))
    )

    try:
        # Read the existing dimension table (dim_customers)
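        # DeltaTable.forName takes the SparkSession first (spark is the notebook's session);
        # without it the call raises, the except below fires, and the MERGE never runs
        delta_table = DeltaTable.forName(spark, "dim_customers")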
    except Exception:
        # First run: dim_customers does not exist yet, so return the staged rows as-is
        # (new_df already carries is_current and end_date)
        return new_df

    # Perform MERGE operation to implement SCD Type 2
    merge_condition = """
        existing.customer_id = new.customer_id
        AND existing.is_current = true
        AND (
            existing.email <> new.email
        )
    """

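    # Close the current record when the email changes.
    # whenMatchedUpdate handles source rows that satisfy merge_condition;
    # whenNotMatchedInsertAll inserts only source rows that matched no target row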
    delta_table.alias("existing").merge(
        new_df.alias("new"),
        merge_condition
    ).whenMatchedUpdate(
        set={
            "is_current": lit(False),
            "end_date": col("new.updated_at")
        }
    ).whenNotMatchedInsertAll().execute()

    return delta_table.toDF()
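The missing row follows from how MERGE routes each source row: a row that satisfies the merge condition is handled only by the `whenMatched` clause, so the new version of a changed customer is used to close the old record and is never inserted. Because the email comparison sits inside the merge condition, unchanged customers do not match either, so they fall through to `whenNotMatchedInsertAll` and are inserted again. The Delta Lake MERGE examples handle SCD Type 2 with "staged updates": every source row is sent once with a merge key equal to `customer_id`, and changed rows are sent a second time with a NULL merge key so they can never match and are inserted. Below is a minimal plain-Python model of both behaviors, not the Delta API; `Row`, `original_merge`, and `scd2_merge` are hypothetical names, and only `email` is compared, matching the test described above.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class Row:
    """One dim_customers version (hypothetical model of the table's columns)."""
    customer_id: int
    email: str
    start_date: datetime
    end_date: Optional[datetime] = None
    is_current: bool = True


def _close(result: list[Row], old: Row, when: datetime) -> None:
    """WHEN MATCHED THEN UPDATE: end the old version in place."""
    result[result.index(old)] = replace(old, is_current=False, end_date=when)


def original_merge(dim: list[Row], updates: list[Row]) -> list[Row]:
    """Models: customer_id matches AND is_current AND existing.email <> new.email."""
    current = {r.customer_id: r for r in dim if r.is_current}
    result = list(dim)
    for u in updates:
        old = current.get(u.customer_id)
        if old is not None and old.email != u.email:
            _close(result, old, u.start_date)  # matched: closed, new version never inserted
        else:
            result.append(u)  # not matched: new AND unchanged customers are inserted
    return result


def scd2_merge(dim: list[Row], updates: list[Row]) -> list[Row]:
    """Models the staged-updates MERGE: mergeKey matches AND is_current."""
    current = {r.customer_id: r for r in dim if r.is_current}
    # Every source row once, with mergeKey = customer_id ...
    staged = [(u.customer_id, u) for u in updates]
    # ... plus changed rows again with mergeKey = NULL, so they cannot match and get inserted
    staged += [(None, u) for u in updates
               if u.customer_id in current and current[u.customer_id].email != u.email]
    result = list(dim)
    for key, u in staged:
        old = current.get(key)  # a None key never matches, like NULL in SQL
        if old is None:
            result.append(u)  # WHEN NOT MATCHED THEN INSERT
        elif old.email != u.email:
            _close(result, old, u.start_date)  # WHEN MATCHED AND email changed THEN UPDATE
        # matched and unchanged: no clause applies, the row is left alone
    return result


t0, t1 = datetime(2024, 1, 1), datetime(2024, 2, 1)
dim = [Row(1, "a@example.com", t0), Row(2, "b@example.com", t0)]
updates = [Row(1, "a.new@example.com", t1), Row(2, "b@example.com", t1)]

for r in scd2_merge(dim, updates):
    print(r)
```

In PySpark the staged DataFrame would typically be a union of `new_df.withColumn("mergeKey", col("customer_id"))` and the changed rows (found by joining against current dim rows) with `lit(None)` as `mergeKey`, with the merge condition reduced to the key and `is_current` and the email comparison moved into the `condition` argument of `whenMatchedUpdate`. Treat that translation as a sketch to check against the Delta documentation.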