cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Warehousing & Analytics
Engage in discussions on data warehousing, analytics, and BI solutions within the Databricks Community. Share insights, tips, and best practices for leveraging data for informed decision-making.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Need a Sample MERGE INTO Query for SCD Type 2 Implementation

Akshay_Petkar
Contributor

Can anyone provide a sample MERGE INTO SQL query for implementing SCD Type 2 in Databricks using Delta Tables?

5 REPLIES 5

David_Torrejon
New Contributor III
Here an example for a customer table. 
 
The source_table contains new or updated customer data, and the target_table is the Delta table that maintains historical records.
 
Table Structures
source_table: contains the latest customer data.
customer_id: Unique identifier for the customer.
name: Customer's name.
address: Customer's address.
email: Customer's email.
phone: Customer's phone number.
 
target_table: contains the historical customer data.
customer_id: Unique identifier for the customer.
name: Customer's name.
address: Customer's address.
email: Customer's email.
phone: Customer's phone number.
valid_from: Date when the record became effective.
valid_to: Date until the record is effective.
is_current: Flag indicating the current active record.
hash_value: Hash of the attributes to detect changes.
 
WITH source_with_hash AS (
  SELECT 
    customer_id,
    name,
    address,
    email,
    phone,
    md5(concat_ws('|', name, address, email, phone)) AS hash_value
  FROM source_table
)
 
MERGE INTO target_table AS target
USING source_with_hash AS source
ON target.customer_id = source.customer_id
AND target.is_current = true
 
WHEN MATCHED AND target.hash_value != source.hash_value THEN
  UPDATE SET
    target.valid_to = current_date - 1,
    target.is_current = false
 
WHEN NOT MATCHED BY TARGET THEN
  INSERT (customer_id, name, address, email, phone, valid_from, valid_to, is_current, hash_value)
  VALUES (source.customer_id, source.name, source.address, source.email, source.phone, current_date, '9999-12-31', true, source.hash_value)
 
WHEN NOT MATCHED BY SOURCE AND target.is_current = true THEN
  UPDATE SET
    target.valid_to = current_date - 1,
    target.is_current = false;
 
Here the explanation about all parts of the sentence.
 
WITH Clause:
Creates a subquery source_with_hash that adds a hash_value column to the source_table. This column contains an MD5 hash of the relevant attributes to detect changes.
 
MATCHED Clause:
Handles updates where there are changes in the source data (source.hash_value is different from target.hash_value).
Updates the valid_to date of the current record in the target table to the previous day and sets is_current to false.
 
NOT MATCHED BY TARGET Clause:
Inserts new records that do not exist in the target table.
Inserts the new records with valid_from set to the current date, valid_to set to '9999-12-31', and is_current set to true.
 
NOT MATCHED BY SOURCE Clause:
Handles records that are in the target table but not in the source table (optional, if you want to handle deletions).
Updates the valid_to date to the previous day and sets is_current to false.
 
You only have to adjust the column names and logic according to your specific schema and requirements.
 
I hope it helps you.

Is there any limitation to the length of the string passed to md5 function when concatenating multiple columns to generate hash_value field ?

David_Torrejon
New Contributor III

also, in PySpark, the same example in pyspark:

from pyspark.sql.functions import col, concat_ws, current_date, lit, md5

source_df = spark.table("source_table")
target_df = spark.table("target_table")

source_with_hash_df = source_df.withColumn("hash_value", md5(concat_ws("|", col("name"), col("address"),  col("email"), col("phone"))))

target_df.alias("target").merge(
source_with_hash_df.alias("source"),
"target.customer_id = source.customer_id AND target.is_current = true"
).whenMatchedUpdate(
condition="target.hash_value != source.hash_value",
set={
"valid_to": current_date() - 1,
"is_current": lit(False)
}
).whenNotMatchedInsert(
values={
"customer_id": col("source.customer_id"),
"name": col("source.name"),
"address": col("source.address"),
"email": col("source.email"),
"phone": col("source.phone"),
"valid_from": current_date(),
"valid_to": lit("9999-12-31"),
"is_current": lit(True),
"hash_value": col("source.hash_value")
}
).whenNotMatchedBySourceUpdate(
condition="target.is_current = true",
set={
"valid_to": current_date() - 1,
"is_current": lit(False)
}
)

You have to add an action to execute.

 

 

JissMathew
New Contributor III

Hi @Akshay_Petkar , please refer this code ,

df = spark.read.format("delta").load(f"{bronze_folder_path}/Test_new")
 
Table Structure 
%sql
CREATE TABLE IF NOT EXISTS test_project_ws.demo.Test_merge (
ID INT,
 Name STRING ,
Address STRING,
date DATE,
 createdDate TIMESTAMP,
 updatedDate TIMESTAMP
 )
 USING DELTA
from pyspark.sql.functions import current_timestamp
from delta.tables import DeltaTable

table_name = "test_project_ws.demo.Test_merge"
deltaTable = DeltaTable.forName(spark, table_name)

deltaTable.alias("tgt").merge(
    bronze_df.alias("upd"),
    "tgt.Id = upd.Id"
).whenMatchedUpdate(
    set={
        "Id": "upd.Id",
        "Name": "upd.Name",
        "Address": "upd.Address",
        "date": "upd.date",
        "updatedDate": "current_timestamp()"
    }
).whenNotMatchedInsert(
    values={
        "Id": "upd.Id",
        "Name": "upd.Name",
        "Address": "upd.Address",
        "date": "upd.date",
        "createdDate": "current_timestamp()"
    }
).execute()

bhanu_gautam
Contributor

@JissMathew  and @David_Torrejon , Thanks for sharing the example

Regards
Bhanu Gautam

Kudos are appreciated

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group