Building a Production‑Style SCD Type 2 Dimension on Delta Lake — Using Databricks Community Edition

Prosenjeet33
New Contributor III

If you’ve ever needed to maintain historical truth in a data warehouse, you’ve likely bumped into Slowly Changing Dimensions (SCD), specifically Type 2. In SCD2, we keep every version of a record as it changes over time, so analysis can answer questions like “What was the customer’s region when they placed that order in March?”

In this post, I’ll show you how to implement a robust, production‑style SCD Type 2 pipeline on Delta Lake using Databricks Community Edition (CE). No enterprise bells and whistles—just lean, elegant techniques you can run today.

What you’ll learn

  • A clean SCD2 schema pattern for Delta tables
  • How to compute change detection at scale (using hashes)
  • A two‑phase upsert strategy that’s easy to reason about
  • How to add time travel, auditing, and vacuum hygiene
  • Optional performance tuning tips that work even on CE

Prerequisites

  • A running cluster in Databricks Community Edition
  • A Python notebook (SQL will be used where it makes sense)
  • Basic familiarity with Spark DataFrames and Delta tables

The SCD2 Table Shape (Delta)

We’ll maintain a dimension table with the following columns:

  • business_key: natural key (e.g., customer_id)
  • attributes: e.g., name, region, segment
  • valid_from, valid_to: versioning window
  • is_current: fast filter for the latest version
  • record_hash: hash of attributes for quick change detection
  • ingest_ts and batch_id: for auditing lineage

Tip: This pattern keeps history queries simple and makes current-state lookups blazing fast.
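Before writing any Spark, it helps to see what the versioning window buys you. Here is a minimal plain-Python sketch (hypothetical sample rows, not part of the pipeline) of the “as-of” lookup that valid_from/valid_to makes possible:

```python
from datetime import datetime

# Hypothetical SCD2 history for one business key: (valid_from, valid_to, region).
# valid_to = None marks the current version.
history = [
    (datetime(2024, 1, 1), datetime(2024, 6, 1), "West"),
    (datetime(2024, 6, 1), None,                 "North"),
]

def region_as_of(rows, ts):
    """Return the version whose [valid_from, valid_to) window covers ts."""
    for valid_from, valid_to, region in rows:
        if valid_from <= ts and (valid_to is None or ts < valid_to):
            return region
    return None

print(region_as_of(history, datetime(2024, 3, 15)))  # West
print(region_as_of(history, datetime(2024, 7, 1)))   # North
```

The half-open window (inclusive valid_from, exclusive valid_to) is what lets exactly one version match any timestamp.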

Step 1: Set Up a Workspace and Some Sample Data

We’ll create a small, realistic dataset with a few evolving records.

```python
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Workspace config
catalog_db = "scd2_demo_db"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog_db}")
spark.sql(f"USE {catalog_db}")

# Helper: stable hash for change detection (pick the cols you care about)
def hash_cols(*cols):
    return F.sha2(F.concat_ws("||", *[F.coalesce(c.cast("string"), F.lit("")) for c in cols]), 256)

# Initial batch (b1)
data_b1 = [
    (101, "Asha",  "North", "SMB"),
    (102, "Ravi",  "West",  "Enterprise"),
    (103, "Meera", "South", "MidMarket"),
]
schema = T.StructType([
    T.StructField("customer_id", T.IntegerType(), False),
    T.StructField("name",        T.StringType(),  True),
    T.StructField("region",      T.StringType(),  True),
    T.StructField("segment",     T.StringType(),  True),
])

b1_df = spark.createDataFrame(data_b1, schema) \
    .withColumn("record_hash", hash_cols(F.col("name"), F.col("region"), F.col("segment"))) \
    .withColumn("batch_id", F.lit("b1")) \
    .withColumn("ingest_ts", F.current_timestamp())

b1_df.display()
```
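A quick aside on why the hash helper coalesces NULLs and joins with a "||" delimiter: without a separator, different column splits can concatenate to the same string and hash identically, masking real changes. A plain-Python mirror of the same logic (hashlib stands in for F.sha2; `row_hash` is a hypothetical name used only for this sketch):

```python
import hashlib

def row_hash(*vals):
    """Mirror of hash_cols: coalesce None to "", join with "||", SHA-256 hex."""
    joined = "||".join("" if v is None else str(v) for v in vals)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

# Without a delimiter, ("ab", "c") and ("a", "bc") would concatenate identically;
# the "||" separator keeps them distinct.
assert row_hash("ab", "c") != row_hash("a", "bc")

# NULL and empty string hash the same here -- fine for change detection,
# but worth knowing if you must distinguish the two.
assert row_hash(None, "x") == row_hash("", "x")
```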
Step 2: Create the SCD2 Delta Table

```sql
CREATE TABLE IF NOT EXISTS scd2_customer_dim (
  customer_id INT NOT NULL,
  name        STRING,
  region      STRING,
  segment     STRING,
  valid_from  TIMESTAMP,
  valid_to    TIMESTAMP,
  is_current  BOOLEAN,
  record_hash STRING,
  ingest_ts   TIMESTAMP,
  batch_id    STRING
)
USING DELTA;

-- Optional: add simple expectation-like constraints you’ll police in your pipeline
-- (In CE, we do this via queries/process rather than Unity Catalog-level constraints.)
```

Load the first batch as the initial current state:

```python
from pyspark.sql import functions as F

b1_for_insert = b1_df.select(
    "customer_id", "name", "region", "segment", "record_hash", "ingest_ts", "batch_id"
).withColumn("valid_from", F.current_timestamp()) \
 .withColumn("valid_to", F.lit(None).cast("timestamp")) \
 .withColumn("is_current", F.lit(True))

b1_for_insert.write.format("delta").mode("append").saveAsTable("scd2_customer_dim")
```
Step 3: The Two‑Phase SCD2 Algorithm

Why two phases? Because a single MERGE can get tricky when you need to both expire the old version and insert a new one for the same key in the same run. The two‑phase approach is explicit, auditable, and easy to test.

Phase A — Detect and expire changed records
Phase B — Insert new versions for changed or brand‑new records
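As a mental model, the two phases can be sketched over plain-Python dicts before we do it with Delta tables (the mini-rows and hash strings here are hypothetical):

```python
# Dimension as a list of version rows; incoming batch as key -> attribute hash.
dim = [
    {"key": 101, "hash": "h1", "is_current": True},
    {"key": 102, "hash": "h2", "is_current": True},
]
incoming = {102: "h2_new",   # changed attributes
            104: "h4"}       # brand-new key

current = {r["key"]: r for r in dim if r["is_current"]}

# Phase A: expire the current version of every key whose hash changed
changed = {k for k, h in incoming.items() if k in current and current[k]["hash"] != h}
for k in changed:
    current[k]["is_current"] = False  # the real pipeline also sets valid_to = now

# Phase B: append a new current version for changed and brand-new keys
for k, h in incoming.items():
    if k in changed or k not in current:
        dim.append({"key": k, "hash": h, "is_current": True})

# Key 102 now has an expired row plus exactly one current row
assert sum(r["is_current"] for r in dim if r["key"] == 102) == 1
```

Unchanged keys (matching hash) are ignored by both phases, which is what keeps the history free of duplicate versions.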

         

Step 4: Simulate a New Ingest (Changes + New Rows)

Let’s say:

  • Ravi moves from West → North (change)
  • Meera is unchanged
  • New customer Kiran arrives

```python
data_b2 = [
    (101, "Asha",  "North", "SMB"),         # unchanged
    (102, "Ravi",  "North", "Enterprise"),  # changed region
    (103, "Meera", "South", "MidMarket"),   # unchanged
    (104, "Kiran", "East",  "SMB"),         # brand new
]

b2_df = spark.createDataFrame(data_b2, schema) \
    .withColumn("record_hash", hash_cols(F.col("name"), F.col("region"), F.col("segment"))) \
    .withColumn("batch_id", F.lit("b2")) \
    .withColumn("ingest_ts", F.current_timestamp())
```

Phase A — Detect Changes and Expire Old Versions

1. Identify the current target rows (fast filter on is_current = true).
2. Join with the incoming batch on the business key.
3. Mark those where the hash differs as changed.
4. Expire those in the target (set valid_to and is_current = false).

```python
from pyspark.sql import functions as F

target_cur = spark.table("scd2_customer_dim").where("is_current = true") \
    .select("customer_id", "record_hash") \
    .withColumnRenamed("record_hash", "tgt_hash")

incoming = b2_df.select("customer_id", "record_hash")

changed_keys = incoming.join(target_cur, "customer_id", "inner") \
    .where(F.col("record_hash") != F.col("tgt_hash")) \
    .select("customer_id").distinct()

changed_keys.createOrReplaceTempView("changed_keys_view")

# Expire the previous versions for changed keys
spark.sql("""
  UPDATE scd2_customer_dim
  SET valid_to = current_timestamp(), is_current = false
  WHERE is_current = true
    AND customer_id IN (SELECT customer_id FROM changed_keys_view)
""")
```

Phase B — Insert New Versions (Changed + Brand‑New)

We’ll prepare two sets:

  • changed_now: keys we just expired
  • brand_new: keys never seen before

```python
# Changed records (for new version rows)
changed_now = b2_df.join(changed_keys, "customer_id", "inner")

# Brand-new keys: anti-join against ANY historical target row
all_target_keys = spark.table("scd2_customer_dim").select("customer_id").distinct()
brand_new = b2_df.join(all_target_keys, "customer_id", "left_anti")

to_insert = changed_now.unionByName(brand_new) \
    .withColumn("valid_from", F.current_timestamp()) \
    .withColumn("valid_to", F.lit(None).cast("timestamp")) \
    .withColumn("is_current", F.lit(True))

to_insert.select(
    "customer_id", "name", "region", "segment",
    "valid_from", "valid_to", "is_current",
    "record_hash", "ingest_ts", "batch_id"
).write.format("delta").mode("append").saveAsTable("scd2_customer_dim")
```

Validate the result:

```sql
SELECT customer_id, name, region, segment, is_current, valid_from, valid_to, batch_id
FROM scd2_customer_dim
ORDER BY customer_id, valid_from;
```

You should now see Ravi’s prior record closed (is_current = false, valid_to filled) and a new current row with region = 'North'. Kiran appears as a first‑time insert.
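Beyond eyeballing the output, it’s worth asserting the core SCD2 invariants after every load: at most one current row per business key, and is_current must agree with valid_to (current rows open, expired rows closed). Here is a plain-Python sketch of such a check (`check_scd2_invariants` is a hypothetical helper; on Databricks you’d express the same idea as GROUP BY / HAVING queries):

```python
from collections import Counter

def check_scd2_invariants(rows):
    """rows: dicts with customer_id, is_current, valid_to. Returns (dupes, open_errors)."""
    current = [r for r in rows if r["is_current"]]
    # Invariant 1: at most one current version per business key
    dupes = sorted(k for k, n in Counter(r["customer_id"] for r in current).items() if n > 1)
    # Invariant 2: current rows have NULL valid_to; expired rows have it filled
    open_errors = [r["customer_id"] for r in rows
                   if r["is_current"] == (r["valid_to"] is not None)]
    return dupes, open_errors

rows = [
    {"customer_id": 102, "is_current": False, "valid_to": "2025-01-05"},
    {"customer_id": 102, "is_current": True,  "valid_to": None},
    {"customer_id": 104, "is_current": True,  "valid_to": None},
]
assert check_scd2_invariants(rows) == ([], [])  # healthy table
```

If either list comes back non-empty, the batch should be investigated before downstream consumers read the dimension.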

Time Travel (Debugging & Audits)

Delta Lake’s time travel makes SCD2 pleasant to operate. Check historical snapshots:

```sql
DESCRIBE HISTORY scd2_customer_dim;

-- Pick a version from the history output:
SELECT * FROM scd2_customer_dim VERSION AS OF 0;
```

Hygiene: Vacuum & Retention

CE is perfect for practicing data lifecycle management:

```sql
-- See current retention settings
SHOW TBLPROPERTIES scd2_customer_dim;

-- Clean up old files (default retention 7 days; don’t go below unless you know what you’re doing)
VACUUM scd2_customer_dim;
```

⚠️ If you tweak retention below 7 days for learning, remember you’ll lose the ability to time travel that far back.

Optional: Performance Tips That Work in CE

Even small clusters benefit from good habits:

1. Selective columns in joins and writes — trim the DataFrame early.
2. Predicate pushdown — filter by is_current before joining.
3. Skew awareness — if one key is super hot, try salting (add a random bucket on the source, then aggregate).
4. Optimize small dimensions — broadcast join them when referencing from facts:

```python
from pyspark.sql.functions import broadcast

fact_join = facts_df.join(
    broadcast(spark.table("scd2_customer_dim").where("is_current = true")),
    "customer_id", "left"
)
```

5. Z‑Ordering / Optimize — if your CE runtime exposes it in your account, you can try:

```sql
OPTIMIZE scd2_customer_dim ZORDER BY (customer_id);
```
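The salting trick from tip 3 can be sketched in plain Python: spread a hot key across N salt buckets for a first partial aggregation, then strip the salt and combine the partials (the data and bucket count here are hypothetical; in Spark the two stages are two groupBy passes):

```python
import random

# Hypothetical skewed input: key 102 dominates the batch
rows = [(102, 1)] * 8 + [(101, 1)] * 2

N = 4  # number of salt buckets

# Stage 1: pre-aggregate by (key, salt) -- spreads the hot key over N buckets,
# which in Spark means N partitions instead of one overloaded partition
partial = {}
for key, val in rows:
    salted = (key, random.randrange(N))
    partial[salted] = partial.get(salted, 0) + val

# Stage 2: strip the salt and combine the partial sums per key
final = {}
for (key, _salt), val in partial.items():
    final[key] = final.get(key, 0) + val

assert final == {102: 8, 101: 2}  # same totals as an unsalted aggregation
```

The final result is identical to a direct aggregation; the salt only changes how the intermediate work is distributed.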

Turning This Into a Stream (Bonus)

When you’re ready, you can convert the batch logic into micro‑batch streaming using foreachBatch and run the same two‑phase logic per micro‑batch. One subtlety: the changed keys must be computed (and materialized) before Phase A’s UPDATE, because once the old versions are expired they no longer match is_current = true:

```python
def scd2_upsert(batch_df, batch_id):
    # Compute hashes, then run Phase A + Phase B against scd2_customer_dim
    from pyspark.sql import functions as F
    spark = batch_df.sparkSession  # use the micro-batch's session, not a notebook global

    incoming = batch_df.withColumn("record_hash", hash_cols(F.col("name"), F.col("region"), F.col("segment"))) \
                       .withColumn("ingest_ts", F.current_timestamp()) \
                       .withColumn("batch_id", F.lit(str(batch_id)))

    # Changed keys: current target rows whose hash differs from the incoming hash.
    # Computed once, cached, and reused by both phases.
    target_cur = spark.table("scd2_customer_dim").where("is_current = true") \
        .select("customer_id", F.col("record_hash").alias("tgt_hash"))
    changed_keys_df = incoming.select("customer_id", "record_hash") \
        .join(target_cur, "customer_id", "inner") \
        .where(F.col("record_hash") != F.col("tgt_hash")) \
        .select("customer_id").distinct() \
        .persist()
    changed_keys_df.count()  # materialize before the UPDATE rewrites the table
    changed_keys_df.createOrReplaceTempView("changed_keys_view")

    # Phase A (expire changed)
    spark.sql("""
      UPDATE scd2_customer_dim
      SET valid_to = current_timestamp(), is_current = false
      WHERE is_current = true
        AND customer_id IN (SELECT customer_id FROM changed_keys_view)
    """)

    # Phase B (insert changed + brand-new)
    changed_now = incoming.join(changed_keys_df, "customer_id", "inner")
    all_target_keys = spark.table("scd2_customer_dim").select("customer_id").distinct()
    brand_new = incoming.join(all_target_keys, "customer_id", "left_anti")

    to_insert = changed_now.unionByName(brand_new) \
        .withColumn("valid_from", F.current_timestamp()) \
        .withColumn("valid_to", F.lit(None).cast("timestamp")) \
        .withColumn("is_current", F.lit(True))

    to_insert.select(
        "customer_id", "name", "region", "segment",
        "valid_from", "valid_to", "is_current",
        "record_hash", "ingest_ts", "batch_id"
    ).write.format("delta").mode("append").saveAsTable("scd2_customer_dim")

    changed_keys_df.unpersist()
```

Wire it to a file-based source:

```python
# Example streaming source: drop CSVs into /FileStore/landing/customers
landing_path = "/FileStore/landing/customers"

schema_stream = schema  # reuse the batch schema
(spark.readStream.format("csv").schema(schema_stream).option("header", True).load(landing_path)
      .writeStream
      .option("checkpointLocation", "/FileStore/landing/_checkpoints/customers")  # any durable path works
      .trigger(processingTime="10 seconds")
      .foreachBatch(scd2_upsert)
      .start())
```

Note: CE doesn’t provide Auto Loader, but file‑based streams work well for demos. Just upload small CSVs to the landing path between triggers.

Why This Approach Works in the Real World

  • Deterministic logic: expire → insert is explicit and testable
  • Auditable: every row has valid_from/valid_to, batch_id, ingest_ts
  • Fast queries: is_current = true makes most BI joins trivial
  • Time travel: snapshot consistency for investigations and “as‑of” analytics


Recap

You just built a clean, production‑style SCD Type 2 pipeline on Delta Lake using Databricks Community Edition:

  • Two‑phase upsert (expire then insert)
  • Hash‑based change detection
  • Delta time travel + vacuum hygiene
  • Optional tuning that works even on small clusters

