If you’ve ever needed to maintain historical truth in a data warehouse, you’ve likely bumped into Slowly Changing Dimensions (SCD)—specifically Type 2. In SCD2, we keep every version of a record as it changes over time, so analysis can answer questions like “What was the customer’s region when they placed that order in March?”.
In this post, I’ll show you how to implement a robust, production‑style SCD Type 2 pipeline on Delta Lake using Databricks Community Edition (CE). No enterprise bells and whistles—just lean, elegant techniques you can run today.
What you’ll learn
- A clean SCD2 schema pattern for Delta tables
- How to compute change detection at scale (using hashes)
- A two‑phase upsert strategy that’s easy to reason about
- How to add time travel, auditing, and vacuum hygiene
- Optional performance tuning tips that work even on CE
Prerequisites
- A running cluster in Databricks Community Edition
- A Python notebook (SQL will be used where it makes sense)
- Basic familiarity with Spark DataFrames and Delta tables
The SCD2 Table Shape (Delta)
We’ll maintain a dimension table with the following columns:
- business_key: natural key (e.g., customer_id)
- attributes: e.g., name, region, segment
- valid_from, valid_to: versioning window
- is_current: fast filter for the latest version
- record_hash: hash of attributes for quick change detection
- ingest_ts and batch_id: for auditing lineage
Tip: This pattern keeps history queries simple and makes current-state lookups blazing fast.
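The idea behind record_hash can be illustrated outside Spark with a minimal sketch in plain Python (hashlib's sha256 stands in for Spark's sha2; `record_hash` here is a hypothetical helper for illustration, not part of the pipeline code below):

```python
import hashlib

def record_hash(*attrs):
    # Join attribute values with a separator unlikely to occur in the data,
    # treating NULL/None as an empty string, then hash the result.
    joined = "||".join("" if a is None else str(a) for a in attrs)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

old = record_hash("Ravi", "West", "Enterprise")
new = record_hash("Ravi", "North", "Enterprise")
print(old != new)  # True: any changed attribute changes the hash
```

Comparing one hash per row instead of every attribute column keeps the change-detection join narrow and cheap.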
Step 1: Set Up a Workspace and Some Sample Data
We’ll create a small, realistic dataset with a few evolving records.
```python
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Workspace config
catalog_db = "scd2_demo_db"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog_db}")
spark.sql(f"USE {catalog_db}")

# Helper: stable hash for change detection (pick the cols you care about)
def hash_cols(*cols):
    return F.sha2(
        F.concat_ws("||", *[F.coalesce(c.cast("string"), F.lit("")) for c in cols]),
        256,
    )

# Initial batch (b1)
data_b1 = [
    (101, "Asha", "North", "SMB"),
    (102, "Ravi", "West", "Enterprise"),
    (103, "Meera", "South", "MidMarket"),
]
schema = T.StructType([
    T.StructField("customer_id", T.IntegerType(), False),
    T.StructField("name", T.StringType(), True),
    T.StructField("region", T.StringType(), True),
    T.StructField("segment", T.StringType(), True),
])

b1_df = spark.createDataFrame(data_b1, schema) \
    .withColumn("record_hash", hash_cols(F.col("name"), F.col("region"), F.col("segment"))) \
    .withColumn("batch_id", F.lit("b1")) \
    .withColumn("ingest_ts", F.current_timestamp())

b1_df.display()
```

Step 2: Create the SCD2 Delta Table
```sql
CREATE TABLE IF NOT EXISTS scd2_customer_dim (
  customer_id INT NOT NULL,
  name STRING,
  region STRING,
  segment STRING,
  valid_from TIMESTAMP,
  valid_to TIMESTAMP,
  is_current BOOLEAN,
  record_hash STRING,
  ingest_ts TIMESTAMP,
  batch_id STRING
)
USING DELTA;

-- Optional: add simple expectation-like constraints you’ll police in your pipeline
-- (In CE, we do this via queries/process rather than Unity Catalog-level constraints.)
```

Load the first batch as the initial current state:

```python
from pyspark.sql import functions as F

b1_for_insert = b1_df.select(
    "customer_id", "name", "region", "segment", "record_hash", "ingest_ts", "batch_id"
).withColumn("valid_from", F.current_timestamp()) \
 .withColumn("valid_to", F.lit(None).cast("timestamp")) \
 .withColumn("is_current", F.lit(True))

b1_for_insert.write.format("delta").mode("append").saveAsTable("scd2_customer_dim")
```

Step 3: The Two‑Phase SCD2 Algorithm
Why two phases? Because a single MERGE can get tricky when you need to both expire the old version and insert a new one for the same key in the same run. The two‑phase approach is explicit, auditable, and easy to test.
Phase A — Detect and expire changed records
Phase B — Insert new versions for changed or brand‑new records
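Before writing the Spark version, the two phases can be sketched over an in-memory list of dicts (a toy model with hypothetical field names, not the pipeline's actual API):

```python
from datetime import datetime

def two_phase_upsert(dim, batch, now=None):
    """dim: rows with key, hash, is_current, valid_from, valid_to.
    batch: incoming rows with key and hash. Mutates and returns dim."""
    now = now or datetime.utcnow()
    current = {r["key"]: r for r in dim if r["is_current"]}

    # Phase A: expire current rows whose hash differs from the incoming hash
    changed = {b["key"] for b in batch
               if b["key"] in current and current[b["key"]]["hash"] != b["hash"]}
    for row in dim:
        if row["is_current"] and row["key"] in changed:
            row["is_current"], row["valid_to"] = False, now

    # Phase B: insert new versions for changed keys and brand-new keys
    for b in batch:
        if b["key"] in changed or b["key"] not in current:
            dim.append({**b, "is_current": True, "valid_from": now, "valid_to": None})
    return dim

dim = [{"key": 102, "hash": "h_west", "is_current": True,
        "valid_from": datetime(2024, 1, 1), "valid_to": None}]
two_phase_upsert(dim, [{"key": 102, "hash": "h_north"}, {"key": 104, "hash": "h_new"}])
# dim now holds one expired row for 102, a new current 102, and a new current 104
```

Note that the set of changed keys is computed once, up front, and reused by both phases; the Spark version has to take the same care so Phase B does not re-derive it from already-expired rows.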
Step 4: Simulate a New Ingest (Changes + New Rows)
Let’s say:
- Ravi moves from West → North (change)
- Meera is unchanged
- New customer Kiran arrives
```python
data_b2 = [
    (101, "Asha", "North", "SMB"),         # unchanged
    (102, "Ravi", "North", "Enterprise"),  # changed region
    (103, "Meera", "South", "MidMarket"),  # unchanged
    (104, "Kiran", "East", "SMB"),         # brand new
]

b2_df = spark.createDataFrame(data_b2, schema) \
    .withColumn("record_hash", hash_cols(F.col("name"), F.col("region"), F.col("segment"))) \
    .withColumn("batch_id", F.lit("b2")) \
    .withColumn("ingest_ts", F.current_timestamp())
```

Phase A — Detect Changes and Expire Old Versions
- Identify the current target rows (fast filter on is_current = true).
- Join with the incoming batch on the business key.
- Mark those where the hash differs as changed.
- Expire those in the target (set valid_to and is_current=false).
```python
from pyspark.sql import functions as F

target_cur = spark.table("scd2_customer_dim").where("is_current = true").select(
    "customer_id", "record_hash"
).withColumnRenamed("record_hash", "tgt_hash")

incoming = b2_df.select("customer_id", "record_hash")

# localCheckpoint() materializes the changed keys NOW. Without it, the lazy
# plan would be re-evaluated after the UPDATE below has already flipped
# is_current to false, and Phase B would find no changed keys at all.
changed_keys = incoming.join(target_cur, "customer_id", "inner") \
    .where(F.col("record_hash") != F.col("tgt_hash")) \
    .select("customer_id").distinct() \
    .localCheckpoint()

changed_keys.createOrReplaceTempView("changed_keys_view")

# Expire the previous versions for changed keys
spark.sql("""
    UPDATE scd2_customer_dim
    SET valid_to = current_timestamp(), is_current = false
    WHERE is_current = true
      AND customer_id IN (SELECT customer_id FROM changed_keys_view)
""")
```

Phase B — Insert New Versions (Changed + Brand‑New)
We’ll prepare two sets:
- changed_now: keys we just expired
- brand_new: keys never seen before
```python
# Changed records (for new version rows)
changed_now = b2_df.join(changed_keys, "customer_id", "inner")

# Brand new keys: anti-join against ANY historical target row
all_target_keys = spark.table("scd2_customer_dim").select("customer_id").distinct()
brand_new = b2_df.join(all_target_keys, "customer_id", "left_anti")

to_insert = changed_now.unionByName(brand_new) \
    .withColumn("valid_from", F.current_timestamp()) \
    .withColumn("valid_to", F.lit(None).cast("timestamp")) \
    .withColumn("is_current", F.lit(True))

to_insert.select(
    "customer_id", "name", "region", "segment",
    "valid_from", "valid_to", "is_current",
    "record_hash", "ingest_ts", "batch_id"
).write.format("delta").mode("append").saveAsTable("scd2_customer_dim")
```

Validate the result:

```sql
SELECT customer_id, name, region, segment, is_current, valid_from, valid_to, batch_id
FROM scd2_customer_dim
ORDER BY customer_id, valid_from;
```

You should now see Ravi’s prior record closed (is_current=false, valid_to filled) and a new current row with region='North'. Kiran appears as a first‑time insert.

Time Travel (Debugging & Audits)
Delta Lake’s time travel makes SCD2 pleasant to operate. Check historical snapshots:
```sql
DESCRIBE HISTORY scd2_customer_dim;

-- Pick a version from the history output:
SELECT * FROM scd2_customer_dim VERSION AS OF 0;
```

Hygiene: Vacuum & Retention
CE is perfect for practicing data lifecycle management:
```sql
-- See current retention
SHOW TBLPROPERTIES scd2_customer_dim;

-- Clean up old files (default retention 7 days; don’t go below unless you know what you’re doing)
VACUUM scd2_customer_dim;
```

⚠️ If you tweak retention below 7 days for learning, remember you’ll lose the ability to time travel that far back.
Optional: Performance Tips That Work in CE
Even small clusters benefit from good habits:
- Selective columns in joins and writes — trim the DataFrame early.
- Predicate pushdown — filter by is_current before joining.
- Skew awareness — if one key is super hot, try salting (add a random bucket on the source, then aggregate).
- Optimize small dimensions — broadcast join them when referencing from facts:
```python
from pyspark.sql.functions import broadcast

fact_join = facts_df.join(
    broadcast(spark.table("scd2_customer_dim").where("is_current = true")),
    "customer_id",
    "left",
)
```

- Z‑Ordering / Optimize — if your CE runtime exposes it in your account, you can try:

```sql
OPTIMIZE scd2_customer_dim ZORDER BY (customer_id);
```

Turning This Into a Stream (Bonus)
When you’re ready, you can convert the batch logic into micro‑batch streaming using foreachBatch and call the same two‑phase functions:
```python
def scd2_upsert(batch_df, batch_id):
    # Compute hashes, then run Phase A + Phase B against scd2_customer_dim
    from pyspark.sql import functions as F

    incoming = batch_df \
        .withColumn("record_hash", hash_cols(F.col("name"), F.col("region"), F.col("segment"))) \
        .withColumn("ingest_ts", F.current_timestamp()) \
        .withColumn("batch_id", F.lit(str(batch_id)))
    incoming.createOrReplaceTempView("incoming_view")

    # Collect changed keys BEFORE Phase A: this query reads the very rows the
    # UPDATE below rewrites, so a lazily re-evaluated plan would see the
    # post-update state and find nothing changed. Dimension deltas are small,
    # so pulling the keys to the driver is cheap.
    changed_keys = [r.customer_id for r in spark.sql("""
        SELECT DISTINCT inc.customer_id
        FROM incoming_view inc
        JOIN scd2_customer_dim tgt
          ON inc.customer_id = tgt.customer_id
        WHERE tgt.is_current = true
          AND inc.record_hash <> tgt.record_hash
    """).collect()]

    # Phase A (expire changed)
    if changed_keys:
        key_list = ",".join(str(k) for k in changed_keys)  # customer_id is INT
        spark.sql(f"""
            UPDATE scd2_customer_dim
            SET valid_to = current_timestamp(), is_current = false
            WHERE is_current = true AND customer_id IN ({key_list})
        """)

    # Phase B (insert changed + brand-new)
    if changed_keys:
        changed_now = incoming.where(F.col("customer_id").isin(changed_keys))
    else:
        changed_now = incoming.where(F.lit(False))
    all_target_keys = spark.table("scd2_customer_dim").select("customer_id").distinct()
    brand_new = incoming.join(all_target_keys, "customer_id", "left_anti")

    to_insert = changed_now.unionByName(brand_new) \
        .withColumn("valid_from", F.current_timestamp()) \
        .withColumn("valid_to", F.lit(None).cast("timestamp")) \
        .withColumn("is_current", F.lit(True))

    to_insert.select(
        "customer_id", "name", "region", "segment",
        "valid_from", "valid_to", "is_current",
        "record_hash", "ingest_ts", "batch_id"
    ).write.format("delta").mode("append").saveAsTable("scd2_customer_dim")

# Example streaming source: drop CSVs into /FileStore/landing/customers
landing_path = "/FileStore/landing/customers"

schema_stream = schema  # reuse
spark.readStream.format("csv").schema(schema_stream).option("header", True).load(landing_path) \
    .writeStream.outputMode("append").trigger(processingTime="10 seconds") \
    .foreachBatch(scd2_upsert).start()
```

Note: CE doesn’t provide Auto Loader, but file‑based streams work well for demos. Just upload small CSVs to the landing path between triggers.

Why This Approach Works in the Real World
- Deterministic logic: expire → insert is explicit and testable
- Auditable: every row has valid_from/valid_to, batch_id, ingest_ts
- Fast queries: is_current = true makes most BI joins trivial
- Time travel: snapshot consistency for investigations and “as‑of” analytics
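The "as‑of" lookup that the valid_from/valid_to window enables can be sketched in plain Python (a linear scan stands in for the SQL range predicate; the rows and field names are illustrative):

```python
from datetime import datetime

def as_of(history, key, when):
    # Return the version of `key` valid at time `when`:
    # valid_from <= when, and valid_to is either open-ended or still ahead.
    for row in history:
        if (row["key"] == key
                and row["valid_from"] <= when
                and (row["valid_to"] is None or when < row["valid_to"])):
            return row
    return None

history = [
    {"key": 102, "region": "West",  "valid_from": datetime(2024, 1, 1),
     "valid_to": datetime(2024, 6, 1)},
    {"key": 102, "region": "North", "valid_from": datetime(2024, 6, 1),
     "valid_to": None},
]

# What was Ravi's region when the March order was placed?
print(as_of(history, 102, datetime(2024, 3, 15))["region"])  # West
```

In SQL this is the familiar `valid_from <= :ts AND (valid_to IS NULL OR :ts < valid_to)` predicate joined from the fact table.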
Recap
You just built a clean, production‑style SCD Type 2 pipeline on Delta Lake using Databricks Community Edition: hash‑based change detection, a two‑phase expire‑then‑insert upsert, time travel for debugging and audits, and vacuum hygiene for the data lifecycle.