11-15-2024 02:58 AM
Can anyone provide a sample MERGE INTO SQL query for implementing SCD Type 2 in Databricks using Delta Tables?
yesterday
-- Step 1: Close out records that changed (updates and deletes)
-- Note: MERGE fails if several source rows match the same target row, so this
-- assumes each customer_id changes at most once between versions 2 and 5.
MERGE INTO west_division.retail_data.customers_type2 AS target
USING (
  SELECT DISTINCT customer_id, _commit_timestamp
  FROM table_changes('east_division_shared.retail.customers', 2, 5)
  WHERE _change_type IN ('update_postimage', 'delete')
  ORDER BY _commit_timestamp
) AS source
ON target.customer_id = source.customer_id AND target.is_current = true
WHEN MATCHED THEN
  UPDATE SET
    end_date = source._commit_timestamp,
    is_current = false;

-- Step 2: Insert new versions (inserts and updates)
INSERT INTO west_division.retail_data.customers_type2
SELECT
  customer_id, customer_name, email, country, signup_date, customer_segment,
  _commit_timestamp AS start_date,
  NULL AS end_date,
  true AS is_current
FROM table_changes('east_division_shared.retail.customers', 2, 5)
WHERE _change_type IN ('insert', 'update_postimage')
ORDER BY _commit_timestamp;
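One caveat with a multi-version range such as 2 to 5: a customer may change more than once, and then each intermediate version needs its own end_date instead of every version being inserted as current. Below is a minimal plain-Python sketch of that chaining. The rows stand in for insert/update_postimage output of table_changes, the helper name chain_versions is hypothetical, and deletes are not handled.

```python
from itertools import groupby
from operator import itemgetter


def chain_versions(changes):
    """Turn change rows into SCD Type 2 versions.

    Each version ends when the next change for the same customer begins;
    only the latest version per customer stays current.
    """
    ordered = sorted(changes, key=itemgetter("customer_id", "commit_ts"))
    versions = []
    for _, group in groupby(ordered, key=itemgetter("customer_id")):
        rows = list(group)
        for i, row in enumerate(rows):
            is_last = i == len(rows) - 1
            versions.append({
                "customer_id": row["customer_id"],
                "email": row["email"],
                "start_date": row["commit_ts"],
                "end_date": None if is_last else rows[i + 1]["commit_ts"],
                "is_current": is_last,
            })
    return versions


# Hypothetical change rows: customer 1 changed twice in the range.
changes = [
    {"customer_id": 1, "commit_ts": "2024-11-01", "email": "a@old.example"},
    {"customer_id": 1, "commit_ts": "2024-11-05", "email": "a@new.example"},
    {"customer_id": 2, "commit_ts": "2024-11-03", "email": "b@example.com"},
]
for version in chain_versions(changes):
    print(version)
```

In Spark SQL the same idea is usually written with LEAD(_commit_timestamp) OVER (PARTITION BY customer_id ORDER BY _commit_timestamp) to derive end_date.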
11-15-2024 03:42 AM
11-26-2024 12:36 AM
Is there any limit on the length of the string passed to the md5 function when concatenating multiple columns to generate the hash_value field?
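On the length question: MD5 as an algorithm accepts input of any length and always returns a 128-bit digest (32 hex characters), so the practical limit is how large a single string value the engine can hold, not anything in md5 itself. What tends to matter more is how the columns are concatenated: concat_ws skips NULL values, so different rows can produce the same string and therefore the same hash. The plain-Python sketch below illustrates both points with hashlib; concat_ws_like is a hypothetical stand-in that mimics the NULL-skipping behaviour and is not a Spark API.

```python
import hashlib


def concat_ws_like(sep, *values):
    """Join values with sep, skipping None the way concat_ws skips NULL."""
    return sep.join(str(v) for v in values if v is not None)


def md5_hex(text):
    """Return the MD5 digest of text as a hex string."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()


# The digest length does not depend on the input length.
short_hash = md5_hex("a")
long_hash = md5_hex("x" * 1_000_000)
print(len(short_hash), len(long_hash))

# Two different rows collapse to the same string once NULLs are skipped.
row_a = ("Ann", None, "ann@example.com")
row_b = ("Ann", "ann@example.com", None)
same_hash = md5_hex(concat_ws_like("|", *row_a)) == md5_hex(concat_ws_like("|", *row_b))
print(same_hash)
```

One common way around the collision is to replace NULLs with a marker value (for example via coalesce) before concatenating, so the column positions stay distinguishable.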
11-15-2024 03:57 AM
Also, here is the same example in PySpark:
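from delta.tables import DeltaTable
from pyspark.sql.functions import col, concat_ws, current_date, date_sub, lit, md5

source_df = spark.table("source_table")
# merge() is a DeltaTable method, so load the target as a DeltaTable rather than a DataFrame.
target_table = DeltaTable.forName(spark, "target_table")

# Hash the tracked columns so a change can be detected with a single comparison.
source_with_hash_df = source_df.withColumn(
    "hash_value",
    md5(concat_ws("|", col("name"), col("address"), col("email"), col("phone"))),
)

(
    target_table.alias("target")
    .merge(
        source_with_hash_df.alias("source"),
        "target.customer_id = source.customer_id AND target.is_current = true",
    )
    # Changed record: close the current row.
    .whenMatchedUpdate(
        condition="target.hash_value != source.hash_value",
        set={"valid_to": date_sub(current_date(), 1), "is_current": lit(False)},
    )
    # Key not matched in the target: insert it as the current row.
    .whenNotMatchedInsert(
        values={
            "customer_id": col("source.customer_id"),
            "name": col("source.name"),
            "address": col("source.address"),
            "email": col("source.email"),
            "phone": col("source.phone"),
            "valid_from": current_date(),
            # Assumes valid_to is a DATE column.
            "valid_to": lit("9999-12-31").cast("date"),
            "is_current": lit(True),
            "hash_value": col("source.hash_value"),
        }
    )
    # Key no longer in the source: close the current row.
    .whenNotMatchedBySourceUpdate(
        condition="target.is_current = true",
        set={"valid_to": date_sub(current_date(), 1), "is_current": lit(False)},
    )
    .execute()
)
The final .execute() is the action that actually runs the merge; without it the builder chain does nothing.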
01-06-2025 03:21 AM
Doesn't your example code perform SCD Type 1 rather than Type 2?
whenMatchedUpdate() updates an existing record.
whenNotMatchedInsert() inserts new records.
whenNotMatchedBySourceUpdate() updates records that are not present in the source.
In SCD Type 2, when an existing record changes, a corresponding new row needs to be inserted with is_current set to true. Where is this happening?
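To make the difference concrete, here is a small plain-Python sketch of what a Type 2 load has to do for a changed record: expire the current row and append a new current row, so the old values are kept as history. The row layout and the helper name apply_scd2 are assumptions for illustration, not code from the posts above.

```python
from datetime import date

OPEN_END = date(9999, 12, 31)  # sentinel end date for the current version


def apply_scd2(target, source, load_date):
    """Apply source rows to a list-of-dicts dimension table, Type 2 style."""
    current = {r["customer_id"]: r for r in target if r["is_current"]}
    for new in source:
        old = current.get(new["customer_id"])
        if old is not None and old["email"] == new["email"]:
            continue  # unchanged: nothing to do
        if old is not None:
            # Changed: expire the existing version instead of overwriting it.
            old["valid_to"] = load_date
            old["is_current"] = False
        # New or changed: append a fresh current version.
        target.append({
            "customer_id": new["customer_id"],
            "email": new["email"],
            "valid_from": load_date,
            "valid_to": OPEN_END,
            "is_current": True,
        })
    return target


dim = [{"customer_id": 1, "email": "a@old.example",
        "valid_from": date(2024, 1, 1), "valid_to": OPEN_END, "is_current": True}]
incoming = [{"customer_id": 1, "email": "a@new.example"},
            {"customer_id": 2, "email": "b@example.com"}]
apply_scd2(dim, incoming, date(2025, 1, 6))
for row in dim:
    print(row)
```

A Type 1 load would instead overwrite the email on the existing row for customer 1 and leave a single row per customer.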
01-07-2025 02:16 AM
Yep, I was thinking the same. The only way I know is to run a separate INSERT INTO command before the MERGE INTO.
-- "columns" and "column" are placeholders for the tracked attributes.
INSERT INTO target_table (
  columns,
  effectiveStartDate,
  effectiveEndDate,
  isCurrent,
  version
)
SELECT
  new.columns,
  DATE(new.timestamp),
  DATE('9999-12-31'),
  TRUE,
  target.version + 1
FROM df AS new
LEFT JOIN target_table AS target
  ON new.customerId = target.customerId AND target.isCurrent
-- Repeat the comparison for every tracked column.
WHERE (
  target.column <> new.column
  OR target.column <> new.column
);
Monday - last edited Monday
Same concern: where is this happening? Is there another example that handles it correctly by maintaining history?
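One widely used way to keep history with a single MERGE is to stage each changed source row twice: once with its real key, so the MERGE matches and expires the current row, and once with a NULL merge key, so it cannot match and falls through to the insert branch. The sketch below simulates that staging and the merge in plain Python to show the row flow. stage_rows and simulate_merge are hypothetical names, and this illustrates the idea only; it is not Delta Lake code.

```python
def stage_rows(target, source):
    """Build (merge_key, row) pairs; changed rows are staged twice."""
    current = {r["customer_id"]: r for r in target if r["is_current"]}
    staged = []
    for row in source:
        old = current.get(row["customer_id"])
        # Copy with the real key: a match on this copy expires the old version.
        staged.append((row["customer_id"], row))
        if old is not None and old["address"] != row["address"]:
            # Copy with a None key: it can never match, so it is inserted.
            staged.append((None, row))
    return staged


def simulate_merge(target, staged, load_date):
    """Mimic MERGE: matched and changed rows expire, unmatched rows insert."""
    current = {r["customer_id"]: r for r in target if r["is_current"]}
    inserts = []
    for merge_key, row in staged:
        old = current.get(merge_key) if merge_key is not None else None
        if old is None:
            # WHEN NOT MATCHED: insert as the new current version.
            inserts.append({**row, "valid_from": load_date,
                            "valid_to": None, "is_current": True})
        elif old["address"] != row["address"]:
            # WHEN MATCHED AND changed: expire the old version.
            old["valid_to"] = load_date
            old["is_current"] = False
    target.extend(inserts)
    return target


dim = [
    {"customer_id": 1, "address": "Old St 1", "valid_from": "2024-01-01",
     "valid_to": None, "is_current": True},
    {"customer_id": 2, "address": "Same Rd 2", "valid_from": "2024-01-01",
     "valid_to": None, "is_current": True},
]
source = [
    {"customer_id": 1, "address": "New Ave 9"},   # changed
    {"customer_id": 2, "address": "Same Rd 2"},   # unchanged
    {"customer_id": 3, "address": "First Ln 3"},  # new
]
staged = stage_rows(dim, source)
simulate_merge(dim, staged, "2025-01-07")
for row in dim:
    print(row)
```

In SQL the staging step is typically a UNION ALL of the source with a NULL merge key and the source with its real key, and the MERGE joins on that merge key.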
11-25-2024 10:16 PM
Hi @Akshay_Petkar, please refer to this code.
11-28-2024 02:51 AM
@JissMathew and @David_Torrejon, thanks for sharing the example.