04-18-2025 01:01 PM
Hello Brahma,
Hope you are having a good day.
I took your suggestion and rewrote my logic to use MERGE to simulate SCD Type 2 in my DLT pipeline, and I need your help. I am keeping the test simple: I change the email column in my customers table and check that Dim Customers tracks the history by updating the existing record and inserting a new one. However, I do not see the newly inserted record in Dim Customers, although the update with the new email value is reflected. My SCD 2 code is below. Please let me know if you need more details.
Thanks!
Code for SCD 2 -
import dlt
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit

@dlt.table(name="dim_customers", comment="Customer dimension with manual SCD Type 2")
def dim_customers():
    new_df = (
        dlt.read("customers_silver")
        .withColumn("start_date", col("updated_at"))  # Use source timestamp
        .withColumn("is_current", lit(True))
        .withColumn("end_date", lit(None).cast("timestamp"))
    )

    try:
        # Read the existing dimension table (dim_customers)
        delta_table = DeltaTable.forName("dim_customers")
    except Exception:
        # First run: no dim_customers yet
        return new_df.withColumn("is_current", lit(True)).withColumn("end_date", lit(None).cast("timestamp"))

    # Perform MERGE operation to implement SCD Type 2
    merge_condition = """
        existing.customer_id = new.customer_id
        AND existing.is_current = true
        AND (
            existing.email <> new.email
        )
    """

    # We want to close old records when there is a change in data (email)
    # Apply the MERGE statement
    delta_table.alias("existing").merge(
        new_df.alias("new"),
        merge_condition
    ).whenMatchedUpdate(
        set={
            "is_current": lit(False),
            "end_date": col("new.updated_at")
        }
    ).whenNotMatchedInsertAll().execute()

    return delta_table.toDF()
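
To make the expected result concrete, here is a minimal plain-Python sketch of the SCD Type 2 behavior I am after for an email change. It does not use DLT or Delta at all. The function name `apply_scd2`, the sample rows, and the example email addresses are hypothetical and only for illustration; the column names follow the code above.

```python
from datetime import datetime


def apply_scd2(dim_rows, source_rows):
    """Return a new dimension list with SCD Type 2 applied for email changes.

    Hypothetical helper for illustration only; it mimics the outcome I expect,
    not how Delta MERGE works internally.
    """
    result = [dict(row) for row in dim_rows]  # copy so the input is not mutated
    current = {row["customer_id"]: row for row in result if row["is_current"]}

    for src in source_rows:
        old = current.get(src["customer_id"])
        if old is not None and old["email"] == src["email"]:
            continue  # email unchanged, keep the current version as-is

        if old is not None:
            # Close the existing version of a changed customer
            old["is_current"] = False
            old["end_date"] = src["updated_at"]

        # Insert a new current version (changed or brand-new customer)
        new_row = {
            "customer_id": src["customer_id"],
            "email": src["email"],
            "start_date": src["updated_at"],
            "end_date": None,
            "is_current": True,
        }
        result.append(new_row)
        current[src["customer_id"]] = new_row

    return result


# Illustrative data: one existing customer whose email changes
dim = [{
    "customer_id": 1,
    "email": "old@example.com",
    "start_date": datetime(2025, 1, 1),
    "end_date": None,
    "is_current": True,
}]
src = [{
    "customer_id": 1,
    "email": "new@example.com",
    "updated_at": datetime(2025, 4, 18),
}]

after = apply_scd2(dim, src)
```

In this sketch, `after` holds two rows for customer 1: the old email with `is_current` set to False and an `end_date`, plus a new current row with the new email. That second row is the one I am not seeing in Dim Customers.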