11-15-2024 02:58 AM
Can anyone provide a sample MERGE INTO SQL query for implementing SCD Type 2 in Databricks using Delta Tables?
11-15-2024 03:42 AM
11-26-2024 12:36 AM
Is there any limit on the length of the string passed to the md5 function when concatenating multiple columns to generate the hash_value field?
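A hedged note rather than an official answer: MD5 always produces a 128-bit digest (32 hex characters) however long the input is, so the hash column does not grow with the number of concatenated columals; the input itself should only be bounded by Spark's general limits on a single string value. A more practical pitfall is that concat_ws skips NULLs, so two different rows can concatenate to the same string and therefore the same hash. The sketch below models this with Python's hashlib; concat_ws, md5_hex and null_safe_concat are hypothetical stand-ins written to mimic Spark's behaviour, not Spark itself.

```python
import hashlib
from typing import Optional


def concat_ws(sep: str, *values: Optional[str]) -> str:
    """Hypothetical stand-in for Spark's concat_ws: NULLs (None) are skipped."""
    return sep.join(v for v in values if v is not None)


def md5_hex(text: str) -> str:
    """MD5 digest as 32 lowercase hex characters, like Spark's md5()."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def null_safe_concat(sep: str, *values: Optional[str]) -> str:
    """Replace NULLs with a marker first so their position still counts."""
    return sep.join("<NULL>" if v is None else v for v in values)


# Digest length does not depend on input length.
short_hash = md5_hex("a")
long_hash = md5_hex("x" * 1_000_000)

# NULL-skipping makes these two different rows hash identically.
row_a = ("Alice", None, "alice@example.com")
row_b = ("Alice", "alice@example.com", None)
collides = md5_hex(concat_ws("|", *row_a)) == md5_hex(concat_ws("|", *row_b))

# With an explicit NULL marker the rows hash differently.
distinct = md5_hex(null_safe_concat("|", *row_a)) != md5_hex(null_safe_concat("|", *row_b))

print(len(short_hash), len(long_hash), collides, distinct)  # prints: 32 32 True True
```

In PySpark the same guard can be applied by wrapping each column as coalesce(col("address"), lit("<NULL>")) inside concat_ws; pick a marker that cannot occur in real data.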
11-15-2024 03:57 AM
Also, here is the same example in PySpark:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, concat_ws, current_date, date_sub, lit, md5, to_date

# Source rows plus a hash of the tracked attributes, used for change detection
source_df = spark.table("source_table")
source_with_hash_df = source_df.withColumn(
    "hash_value",
    md5(concat_ws("|", col("name"), col("address"), col("email"), col("phone")))
)

# MERGE is called on a DeltaTable handle, not on a DataFrame
target = DeltaTable.forName(spark, "target_table")

(
    target.alias("target")
    .merge(
        source_with_hash_df.alias("source"),
        "target.customer_id = source.customer_id AND target.is_current = true"
    )
    # Close the current version when the tracked attributes changed
    .whenMatchedUpdate(
        condition="target.hash_value != source.hash_value",
        set={
            "valid_to": date_sub(current_date(), 1),
            "is_current": lit(False)
        }
    )
    # Insert customers that have no current row in the target yet
    .whenNotMatchedInsert(
        values={
            "customer_id": col("source.customer_id"),
            "name": col("source.name"),
            "address": col("source.address"),
            "email": col("source.email"),
            "phone": col("source.phone"),
            "valid_from": current_date(),
            "valid_to": to_date(lit("9999-12-31")),
            "is_current": lit(True),
            "hash_value": col("source.hash_value")
        }
    )
    # Close current versions whose key no longer appears in the source
    .whenNotMatchedBySourceUpdate(
        condition="target.is_current = true",
        set={
            "valid_to": date_sub(current_date(), 1),
            "is_current": lit(False)
        }
    )
    .execute()
)
You have to call .execute() at the end; until then the merge builder only describes the operation and nothing is written.
2 weeks ago
Doesn't your example code perform SCD Type 1 rather than Type 2?
whenMatchedUpdate() updates an existing record.
whenNotMatchedInsert() inserts new records.
whenNotMatchedBySourceUpdate() updates records that are not present in the source.
In SCD Type 2, when an existing record changes, a corresponding new row needs to be inserted with is_current set to true. Where is this happening?
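The point stands: one MERGE applies at most one action per matched row, so a changed row can be closed but not also re-inserted in the same pass. A common workaround (I believe it is the approach used in the Delta Lake documentation's SCD Type 2 merge example) is "staged updates": union the source with an extra copy of each changed row whose merge key is NULL, so that copy never matches and falls through to whenNotMatchedInsert as the new current version. Below is a plain-Python model of that logic, not the Delta API; the dict row layout, the function name scd2_merge and the sample data are hypothetical, and deletions (whenNotMatchedBySource) are left out for brevity.

```python
from datetime import date, timedelta
from typing import Dict, List

Row = Dict[str, object]
OPEN_END = date(9999, 12, 31)


def scd2_merge(target: List[Row], source: List[Row], today: date) -> List[Row]:
    """Model of a staged-updates SCD2 merge keyed on customer_id."""
    current = {r["customer_id"]: r for r in target if r["is_current"]}

    # Stage: every source row with its normal key, plus a NULL-keyed copy of
    # each changed row so that copy cannot match and reaches the insert branch.
    staged = [(s["customer_id"], s) for s in source]
    for s in source:
        old = current.get(s["customer_id"])
        if old is not None and old["hash_value"] != s["hash_value"]:
            staged.append((None, s))

    result = [dict(r) for r in target]
    # Matches are evaluated against the current rows of the original target.
    by_key = {r["customer_id"]: r for r in result if r["is_current"]}
    for merge_key, s in staged:
        match = by_key.get(merge_key) if merge_key is not None else None
        if match is not None:
            # whenMatchedUpdate: close the old version if it changed
            if match["hash_value"] != s["hash_value"]:
                match["valid_to"] = today - timedelta(days=1)
                match["is_current"] = False
        else:
            # whenNotMatchedInsert: brand-new key or new version of a changed key
            result.append({**s, "valid_from": today, "valid_to": OPEN_END, "is_current": True})
    return result


target = [
    {"customer_id": 1, "name": "Ann", "hash_value": "h1",
     "valid_from": date(2024, 1, 1), "valid_to": OPEN_END, "is_current": True},
    {"customer_id": 2, "name": "Bob", "hash_value": "h2",
     "valid_from": date(2024, 1, 1), "valid_to": OPEN_END, "is_current": True},
]
source = [
    {"customer_id": 1, "name": "Ann", "hash_value": "h1"},      # unchanged
    {"customer_id": 2, "name": "Robert", "hash_value": "h2b"},  # changed
    {"customer_id": 3, "name": "Cid", "hash_value": "h3"},      # new
]
rows = scd2_merge(target, source, date(2024, 11, 28))
```

In PySpark the staging step would roughly be a union of source_with_hash_df carrying a hypothetical mergeKey column equal to customer_id and the changed rows carrying mergeKey = lit(None), with the merge condition joining target.customer_id = source.mergeKey.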
a week ago
Yep, I was thinking the same. The only way I know is to run a separate INSERT INTO command before the MERGE INTO.
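-- df: the incoming batch, registered as a temp view
-- name, address: hypothetical stand-ins for the tracked columns
INSERT INTO target_table (
  customerId,
  name,
  address,
  effectiveStartDate,
  effectiveEndDate,
  isCurrent,
  version
)
SELECT
  new.customerId,
  new.name,
  new.address,
  DATE(new.timestamp),
  DATE('9999-12-31'),
  TRUE,
  COALESCE(target.version, 0) + 1  -- version 1 for brand-new customers
FROM df AS new
LEFT JOIN target_table AS target
  ON new.customerId = target.customerId AND target.isCurrent
WHERE target.customerId IS NULL          -- new customer
   OR NOT (target.name <=> new.name)     -- null-safe change check
   OR NOT (target.address <=> new.address)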
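The two-step approach can be modelled in plain Python to check the versioning: step 1 adds a new current row (version + 1) for every new or changed key, step 2 closes only the older current row, because the freshly inserted one already equals the batch. This is a sketch, not Databricks code; the functions insert_new_versions and close_old_versions, the single tracked column name, and the explicit start date (standing in for DATE(new.timestamp)) are hypothetical.

```python
from datetime import date, timedelta
from typing import Dict, List

Row = Dict[str, object]
OPEN_END = date(9999, 12, 31)


def insert_new_versions(target: List[Row], batch: List[Row], start: date) -> List[Row]:
    """Step 1 (INSERT INTO): add a current row for each new or changed key."""
    current = {r["customerId"]: r for r in target if r["isCurrent"]}
    out = [dict(r) for r in target]
    for new in batch:
        old = current.get(new["customerId"])
        if old is None or old["name"] != new["name"]:
            version = (old["version"] if old else 0) + 1
            out.append({**new, "effectiveStartDate": start, "effectiveEndDate": OPEN_END,
                        "isCurrent": True, "version": version})
    return out


def close_old_versions(target: List[Row], batch: List[Row], start: date) -> None:
    """Step 2 (MERGE INTO): close current rows whose values differ from the batch."""
    incoming = {b["customerId"]: b for b in batch}
    for r in target:
        b = incoming.get(r["customerId"])
        if r["isCurrent"] and b is not None and r["name"] != b["name"]:
            r["effectiveEndDate"] = start - timedelta(days=1)
            r["isCurrent"] = False


start = date(2024, 12, 1)
table = [{"customerId": 1, "name": "Ann", "effectiveStartDate": date(2024, 1, 1),
          "effectiveEndDate": OPEN_END, "isCurrent": True, "version": 1}]
batch = [{"customerId": 1, "name": "Anne"}, {"customerId": 2, "name": "Bo"}]

table = insert_new_versions(table, batch, start)
close_old_versions(table, batch, start)
```

The order matters: between the two steps a changed customer briefly has two current rows, and the MERGE's change condition is what leaves only the new one current.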
11-25-2024 10:16 PM
Hi @Akshay_Petkar, please refer to this code.
11-28-2024 02:51 AM
@JissMathew and @David_Torrejon, thanks for sharing the examples.