11-15-2024 02:58 AM
Can anyone provide a sample MERGE INTO SQL query for implementing SCD Type 2 in Databricks using Delta Tables?
11-15-2024 03:42 AM
11-26-2024 12:36 AM
Is there any limit on the length of the string passed to the md5 function when concatenating multiple columns to generate the hash_value field?
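On the length question: MD5 as an algorithm accepts input of any length and always returns a 128-bit digest (32 hex characters), so the hash itself should not be the limiting factor; as far as I know the only ceiling is how large a single string value can get. A more likely pitfall is how the columns are joined. Spark's concat_ws skips NULL values, so two rows with a NULL in different columns can end up with the same hash. Below is a small pure-Python sketch of that effect using hashlib. The helpers concat_ws_like and row_hash are made up for illustration and only mimic the Spark behaviour; they are not part of any library.

```python
import hashlib


def concat_ws_like(sep, *values):
    """Join values with sep, skipping None the way Spark's concat_ws skips NULLs."""
    return sep.join(str(v) for v in values if v is not None)


def row_hash(*values, null_token=None):
    """MD5 hex digest of the joined values; optionally swap None for a token first."""
    if null_token is not None:
        values = tuple(null_token if v is None else v for v in values)
    joined = concat_ws_like("|", *values)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


# The digest length does not depend on the input length
short_hash = row_hash("Ann", "1 Main St")
long_hash = row_hash("Ann", "x" * 1_000_000)

# A NULL in different columns collapses to the same joined string "Ann|a@x.io"
same = row_hash("Ann", None, "a@x.io") == row_hash("Ann", "a@x.io", None)

# Replacing NULL with a placeholder keeps the column positions distinct
distinct = (
    row_hash("Ann", None, "a@x.io", null_token="<NULL>")
    != row_hash("Ann", "a@x.io", None, null_token="<NULL>")
)
```

In Spark, a comparable guard would be to wrap each column in coalesce(col(...), lit("<NULL>")) before passing it to concat_ws; the placeholder string is an arbitrary choice.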
11-15-2024 03:57 AM
Also, here is the same example in PySpark:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, concat_ws, current_date, date_sub, lit, md5

source_df = spark.table("source_table")
# merge() is a DeltaTable method, so load the target as a DeltaTable rather than a DataFrame
target_delta = DeltaTable.forName(spark, "target_table")

# Hash the tracked attributes so a change in any of them shows up in one comparison
source_with_hash_df = source_df.withColumn(
    "hash_value",
    md5(concat_ws("|", col("name"), col("address"), col("email"), col("phone"))),
)

target_delta.alias("target").merge(
    source_with_hash_df.alias("source"),
    "target.customer_id = source.customer_id AND target.is_current = true",
).whenMatchedUpdate(
    # Close out the current row when the tracked attributes changed
    condition="target.hash_value != source.hash_value",
    set={
        "valid_to": date_sub(current_date(), 1),
        "is_current": lit(False),
    },
).whenNotMatchedInsert(
    # Insert source rows that have no current row in the target
    values={
        "customer_id": col("source.customer_id"),
        "name": col("source.name"),
        "address": col("source.address"),
        "email": col("source.email"),
        "phone": col("source.phone"),
        "valid_from": current_date(),
        "valid_to": lit("9999-12-31").cast("date"),
        "is_current": lit(True),
        "hash_value": col("source.hash_value"),
    },
).whenNotMatchedBySourceUpdate(
    # Close out current rows whose customer no longer appears in the source
    condition="target.is_current = true",
    set={
        "valid_to": date_sub(current_date(), 1),
        "is_current": lit(False),
    },
).execute()
Note that the merge builder does nothing on its own: you have to finish the chain with an action, .execute(), for the merge to run.
01-06-2025 03:21 AM
Doesn't your example code perform SCD Type 1 rather than Type 2?
whenMatchedUpdate() updates an existing record.
whenNotMatchedInsert() inserts new records.
whenNotMatchedBySourceUpdate() updates records that are no longer present in the source.
In SCD Type 2, when an old record is updated, a corresponding new row needs to be inserted with is_current as 'true'. Where is this happening?
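To make the expected end state concrete, here is a minimal pure-Python sketch of what a Type 2 load should do to the rows, independent of Spark or Delta. It is not the MERGE itself: the function apply_scd2 and the dictionary layout are hypothetical, chosen to mirror the column names used above, with the tracked attributes reduced to a single address field and deletions from the source left out. The point is that a changed customer ends up with two rows, the expired one and a new current one.

```python
from datetime import date, timedelta

OPEN_END = date(9999, 12, 31)  # sentinel "no end date" value


def apply_scd2(target, source, today):
    """Return a new target list after applying source rows with SCD Type 2 rules."""
    result = [dict(row) for row in target]  # copy so the input is not mutated
    current = {r["customer_id"]: r for r in result if r["is_current"]}
    for src in source:
        existing = current.get(src["customer_id"])
        if existing is not None and existing["address"] == src["address"]:
            continue  # unchanged customer: nothing to do
        if existing is not None:
            # Step 1: expire the old version
            existing["valid_to"] = today - timedelta(days=1)
            existing["is_current"] = False
        # Step 2: insert the new version (new customer or changed customer)
        result.append({
            "customer_id": src["customer_id"],
            "address": src["address"],
            "valid_from": today,
            "valid_to": OPEN_END,
            "is_current": True,
        })
    return result


target = [{
    "customer_id": 1, "address": "Old St",
    "valid_from": date(2024, 1, 1), "valid_to": OPEN_END, "is_current": True,
}]
source = [
    {"customer_id": 1, "address": "New St"},  # changed customer
    {"customer_id": 2, "address": "Elm St"},  # new customer
]
rows = apply_scd2(target, source, today=date(2025, 1, 6))
```

Here customer 1 has two rows afterwards (the old address closed on 2025-01-05 and the new address current) and customer 2 has one. A single MERGE that only updates matched rows covers step 1 for changed customers but not step 2.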
01-07-2025 02:16 AM
Yep, I was thinking the same. The only way I know is to run a separate INSERT INTO command before the MERGE INTO.
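INSERT INTO target_table (
  columns,  -- placeholder for the business columns
  effectiveStartDate,
  effectiveEndDate,
  isCurrent,
  version
)
SELECT
  new.columns,
  DATE(new.timestamp),
  DATE('9999-12-31'),
  TRUE,
  target.version + 1
FROM df AS new
-- Join each incoming row to the current version of the same customer
LEFT JOIN target_table AS target
  ON new.customerId = target.customerId AND target.isCurrent
-- Keep only rows where a tracked column changed (one comparison per tracked column);
-- rows without a match in target compare as NULL and are filtered out here
WHERE (
  target.column_a <> new.column_a
  OR target.column_b <> new.column_b
)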
11-25-2024 10:16 PM
Hi @Akshay_Petkar, please refer to this code.
11-28-2024 02:51 AM
@JissMathew and @David_Torrejon, thanks for sharing the example.