Warehousing & Analytics
Engage in discussions on data warehousing, analytics, and BI solutions within the Databricks Community. Share insights, tips, and best practices for leveraging data for informed decision-making.

Need a Sample MERGE INTO Query for SCD Type 2 Implementation

Akshay_Petkar
Contributor

Can anyone provide a sample MERGE INTO SQL query for implementing SCD Type 2 in Databricks using Delta Tables?

5 REPLIES

David_Torrejon
New Contributor III
Here is an example for a customer table.
 
The source_table contains new or updated customer data, and the target_table is the Delta table that maintains historical records.
 
Table structures

source_table (contains the latest customer data):
- customer_id: Unique identifier for the customer.
- name: Customer's name.
- address: Customer's address.
- email: Customer's email.
- phone: Customer's phone number.

target_table (contains the historical customer data):
- customer_id: Unique identifier for the customer.
- name: Customer's name.
- address: Customer's address.
- email: Customer's email.
- phone: Customer's phone number.
- valid_from: Date when the record became effective.
- valid_to: Date until which the record is effective.
- is_current: Flag indicating the current active record.
- hash_value: Hash of the tracked attributes, used to detect changes.
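
As a sketch, the target table described above could be created like this (the column types are assumptions — adjust them to your data):

```sql
CREATE TABLE IF NOT EXISTS target_table (
  customer_id BIGINT,
  name        STRING,
  address     STRING,
  email       STRING,
  phone       STRING,
  valid_from  DATE,
  valid_to    DATE,
  is_current  BOOLEAN,
  hash_value  STRING   -- md5 hex digest of the tracked attributes
) USING DELTA;
```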
 
WITH source_with_hash AS (
  SELECT
    customer_id,
    name,
    address,
    email,
    phone,
    md5(concat_ws('|', name, address, email, phone)) AS hash_value
  FROM source_table
)

MERGE INTO target_table AS target
USING source_with_hash AS source
ON target.customer_id = source.customer_id
AND target.is_current = true

-- Tracked attributes changed: close the current version
WHEN MATCHED AND target.hash_value != source.hash_value THEN
  UPDATE SET
    target.valid_to = date_sub(current_date(), 1),
    target.is_current = false

-- Brand-new customers: open their first version
WHEN NOT MATCHED BY TARGET THEN
  INSERT (customer_id, name, address, email, phone, valid_from, valid_to, is_current, hash_value)
  VALUES (source.customer_id, source.name, source.address, source.email, source.phone,
          current_date(), DATE'9999-12-31', true, source.hash_value)

-- Customers no longer in the source: close their current version
WHEN NOT MATCHED BY SOURCE AND target.is_current = true THEN
  UPDATE SET
    target.valid_to = date_sub(current_date(), 1),
    target.is_current = false;
 
Here is an explanation of each part of the statement.
 
WITH Clause:
Creates a subquery source_with_hash that adds a hash_value column to the source_table. This column contains an MD5 hash of the relevant attributes to detect changes.
 
MATCHED Clause:
Handles updates where there are changes in the source data (source.hash_value is different from target.hash_value).
Updates the valid_to date of the current record in the target table to the previous day and sets is_current to false.
 
NOT MATCHED BY TARGET Clause:
Inserts new records that do not exist in the target table.
Inserts the new records with valid_from set to the current date, valid_to set to '9999-12-31', and is_current set to true.
 
NOT MATCHED BY SOURCE Clause:
Handles records that are in the target table but not in the source table (optional, if you want to handle deletions).
Updates the valid_to date to the previous day and sets is_current to false.
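
One caveat worth noting: as written, a changed customer only gets its old row closed — the new version is never inserted, because the changed row still matches on customer_id. A common workaround is the staged-updates pattern (as sketched in the Delta Lake SCD Type 2 examples): union the source with its changed rows under a NULL merge key, so those rows fall through to the insert branch. A sketch, reusing the same hashed source (the NOT MATCHED BY SOURCE branch for deletions can be kept as above):

```sql
WITH source_with_hash AS (
  SELECT customer_id, name, address, email, phone,
         md5(concat_ws('|', name, address, email, phone)) AS hash_value
  FROM source_table
)

MERGE INTO target_table AS target
USING (
  -- Rows keyed on customer_id: used to close changed versions
  SELECT s.customer_id AS merge_key, s.*
  FROM source_with_hash s
  UNION ALL
  -- Changed rows again, with a NULL key so they never match
  -- and are inserted as the new current version
  SELECT NULL AS merge_key, s.*
  FROM source_with_hash s
  JOIN target_table t
    ON s.customer_id = t.customer_id
   AND t.is_current = true
   AND s.hash_value <> t.hash_value
) AS source
ON target.customer_id = source.merge_key
AND target.is_current = true

WHEN MATCHED AND target.hash_value <> source.hash_value THEN
  UPDATE SET
    target.valid_to = date_sub(current_date(), 1),
    target.is_current = false

WHEN NOT MATCHED THEN
  INSERT (customer_id, name, address, email, phone, valid_from, valid_to, is_current, hash_value)
  VALUES (source.customer_id, source.name, source.address, source.email, source.phone,
          current_date(), DATE'9999-12-31', true, source.hash_value);
```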
 
You only have to adjust the column names and logic according to your specific schema and requirements.
 
I hope it helps you.

Is there any limitation on the length of the string passed to the md5 function when concatenating multiple columns to generate the hash_value field?
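
For intuition (not a Databricks-specific answer): MD5 accepts input of effectively arbitrary length and always produces a 128-bit (32 hex character) digest, so concatenating many columns is not a problem for the hash itself. Spark's md5(concat_ws('|', ...)) can be reproduced in plain Python with hashlib — the column names below are just the ones from the example above:

```python
import hashlib

def row_hash(name, address, email, phone, sep="|"):
    """Mimic Spark's md5(concat_ws('|', ...)) for non-null string columns.

    Note: Spark's concat_ws silently skips NULLs, which this sketch does not handle.
    """
    concatenated = sep.join([name, address, email, phone])
    return hashlib.md5(concatenated.encode("utf-8")).hexdigest()

h1 = row_hash("Alice", "1 Main St", "alice@example.com", "555-0100")
h2 = row_hash("Alice", "2 Oak Ave", "alice@example.com", "555-0100")
h3 = row_hash("Alice", "x" * 1_000_000, "alice@example.com", "555-0100")  # ~1 MB input

print(len(h1))   # 32 hex characters, regardless of input length
print(len(h3))   # still 32 — digest size is fixed
print(h1 == h2)  # any attribute change changes the hash
```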

David_Torrejon
New Contributor III

Also, here is the same example in PySpark:

from delta.tables import DeltaTable
from pyspark.sql.functions import col, concat_ws, current_date, date_sub, lit, md5

source_df = spark.table("source_table")

# Add the change-detection hash to the source data
source_with_hash_df = source_df.withColumn(
    "hash_value",
    md5(concat_ws("|", col("name"), col("address"), col("email"), col("phone")))
)

# The target must be referenced as a DeltaTable to use the merge API
target_table = DeltaTable.forName(spark, "target_table")

target_table.alias("target").merge(
    source_with_hash_df.alias("source"),
    "target.customer_id = source.customer_id AND target.is_current = true"
).whenMatchedUpdate(
    condition="target.hash_value != source.hash_value",
    set={
        "valid_to": date_sub(current_date(), 1),
        "is_current": lit(False)
    }
).whenNotMatchedInsert(
    values={
        "customer_id": col("source.customer_id"),
        "name": col("source.name"),
        "address": col("source.address"),
        "email": col("source.email"),
        "phone": col("source.phone"),
        "valid_from": current_date(),
        "valid_to": lit("9999-12-31").cast("date"),
        "is_current": lit(True),
        "hash_value": col("source.hash_value")
    }
).whenNotMatchedBySourceUpdate(
    condition="target.is_current = true",
    set={
        "valid_to": date_sub(current_date(), 1),
        "is_current": lit(False)
    }
).execute()

Note that the merge only runs when you finish the builder chain with .execute().

 

 

JissMathew
New Contributor III

Hi @Akshay_Petkar, please refer to this code:

bronze_df = spark.read.format("delta").load(f"{bronze_folder_path}/Test_new")
 
Table structure:
%sql
CREATE TABLE IF NOT EXISTS test_project_ws.demo.Test_merge (
  ID INT,
  Name STRING,
  Address STRING,
  date DATE,
  createdDate TIMESTAMP,
  updatedDate TIMESTAMP
)
USING DELTA
from delta.tables import DeltaTable

table_name = "test_project_ws.demo.Test_merge"
deltaTable = DeltaTable.forName(spark, table_name)

# Upsert: matched rows are overwritten in place (an SCD Type 1-style merge)
deltaTable.alias("tgt").merge(
    bronze_df.alias("upd"),
    "tgt.Id = upd.Id"
).whenMatchedUpdate(
    set={
        "Id": "upd.Id",
        "Name": "upd.Name",
        "Address": "upd.Address",
        "date": "upd.date",
        "updatedDate": "current_timestamp()"
    }
).whenNotMatchedInsert(
    values={
        "Id": "upd.Id",
        "Name": "upd.Name",
        "Address": "upd.Address",
        "date": "upd.date",
        "createdDate": "current_timestamp()"
    }
).execute()

bhanu_gautam
Contributor

@JissMathew and @David_Torrejon, thanks for sharing the examples.

Regards
Bhanu Gautam

Kudos are appreciated
