In Part 1 of this blog series, we explored the various types of duplicates, considerations for remediation, and the impacts of unchecked duplicated records on strategic decision-making.
If you want to learn more about this, please read: How to implement Slowly Changing Dimensions when you have duplicates - Part 1: What to look out for?
In this blog, we will focus on leveraging Delta Live Tables pipelines as a robust solution for handling duplicates and building an efficient data pipeline to maintain your Slowly Changing Dimensions.
Our sample data pipeline implements the Medallion Architecture: it loads source data into a Bronze table, applies data quality rules and expectations in a Silver table, and maintains the slowly changing dimensions in a Gold table.
Figure 1 - High level solution architecture diagram of the sample data pipeline
The data pipeline begins with the incremental loading of source data with Databricks Auto Loader into a Bronze table. The Bronze table acts as the initial landing zone for incoming streaming data, where records are inserted in an append-only fashion.
To aid with the identification of all kinds of duplicates, we will append a new column called “data_hash” which we can specify to be calculated on all columns or a select subset. Records with the same data_hash value will indicate that they are duplicate records.
If opting for a select subset of columns - such as omitting technical columns to identify same business meaning duplicates - ensure you have determined and agreed this with your business stakeholders.
import dlt
from pyspark.sql.functions import concat_ws, md5, col, current_timestamp

@dlt.table
def bronze_customers():
    return (
        # Incrementally ingest CSV files with Auto Loader
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("header", "true")
        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
        .load("cloud-storage-source-data-directory")
        # Hash the business columns so that duplicate records share a data_hash
        .withColumn(
            "data_hash",
            md5(concat_ws("-", col("cust_id"), col("cust_name"), col("cust_location"))),
        )
        # Technical columns: source file name and load timestamp
        .withColumn("file_name", col("_metadata.file_name"))
        .withColumn("insert_timestamp", current_timestamp())
    )
The Bronze table will store all data received from the source including all duplicates received. These duplicates need to be effectively processed and resolved in subsequent stages of the pipeline.
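To make the data_hash idea concrete outside of Spark, here is a minimal plain-Python sketch. The helper `data_hash` and the constant `BUSINESS_COLUMNS` are hypothetical names used only for illustration; the sketch assumes values are joined with "-" and that null values are skipped, mirroring what concat_ws does in the Bronze table code. The two rows are taken from the sample data used later in this blog.

```python
import hashlib

def data_hash(record, columns, sep="-"):
    """Hex MD5 over the chosen columns joined with `sep`; None values are skipped."""
    parts = [str(record[c]) for c in columns if record[c] is not None]
    return hashlib.md5(sep.join(parts).encode("utf-8")).hexdigest()

# Hypothetical constant: the business columns agreed with stakeholders
BUSINESS_COLUMNS = ["cust_id", "cust_name", "cust_location"]

row_a = {"cust_id": 4, "cust_name": "Sachin Singh", "cust_location": "India",
         "cust_last_updt_ts": "2023-03-02 13:31:59"}
# Same business content, only the technical timestamp differs
row_b = dict(row_a, cust_last_updt_ts="2023-03-02 13:32:00")

# Hashing only the business columns treats the two rows as duplicates
same_business_hash = data_hash(row_a, BUSINESS_COLUMNS) == data_hash(row_b, BUSINESS_COLUMNS)

# Hashing every column, including the timestamp, does not
all_columns = BUSINESS_COLUMNS + ["cust_last_updt_ts"]
same_full_hash = data_hash(row_a, all_columns) == data_hash(row_b, all_columns)
```

One design point worth checking against your own data: because nulls are skipped before joining, two rows whose values sit in different columns can produce the same hash, so you may want to replace nulls with a placeholder before hashing.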
The next step in the pipeline involves further data cleaning of records as incrementally received from the Bronze Table.
Here, we apply business data quality (DQ) rules and perform data cleaning operations in the Silver table with Delta Live Tables Expectations. Expectations provide a simple but powerful DQ framework that enhances the overall reliability and trustworthiness of your data pipeline.
@dlt.table(name="silver_customers")
@dlt.expect_or_drop("valid_cust_id", "cust_id > 0")
def silver_customers():
    return (
        dlt.read_stream("bronze_customers")
        .select("*")
    )
To read more about DLT Expectations - please refer to the following documentation.
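As a rough mental model of what an expectation does with a failing record, the plain-Python sketch below applies one rule to a list of rows. The function `apply_expectation` and its `action` parameter are hypothetical and are not part of the DLT API; the three actions loosely correspond to keeping invalid records while counting them, dropping them (as expect_or_drop does above), or failing the update.

```python
def apply_expectation(rows, predicate, action="drop"):
    """Apply one data quality rule and return (kept_rows, failed_count).

    action="warn" keeps failing rows, "drop" removes them,
    and "fail" raises on the first failing row.
    """
    kept, failed = [], 0
    for row in rows:
        if predicate(row):
            kept.append(row)
            continue
        failed += 1
        if action == "fail":
            raise ValueError(f"expectation violated by {row}")
        if action == "warn":
            kept.append(row)
    return kept, failed

def valid_cust_id(row):
    # Same rule as the Silver table: cust_id > 0
    return row["cust_id"] > 0

# Illustrative rows only; the row with cust_id 0 violates the rule
rows = [{"cust_id": 1}, {"cust_id": 0}, {"cust_id": 3}]

kept, failed = apply_expectation(rows, valid_cust_id, action="drop")
warned, warned_failed = apply_expectation(rows, valid_cust_id, action="warn")
```

With "drop", the invalid row never reaches the Silver table; with "warn", it flows through and only the failure count records the problem.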
The final stage of the data pipeline focuses on maintaining slowly changing dimensions in the Gold table which serves as the trusted source for historical analysis and decision-making. Here, we will remove the duplicates in 2 steps: first the intra-batch duplicates in a view, followed by the inter-batch duplicates.
In the code snippet below, we use the native dropDuplicates() function and pass in the previously calculated data_hash column to handle the intra-batch duplicates.
This will drop any records with the same value in the data_hash column. As hashes are deterministic, we can safely remove intra-batch duplicate records, including those we identified as having the same business meaning. However, we still need to handle inter-batch duplicates.
@dlt.view
def gold_customers_intra_dedup():
    return (
        dlt.read_stream("silver_customers")
        .select("*")
        .dropDuplicates(["data_hash"])
    )
As we have defined this as a DLT View, data will not be persisted.
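The effect of de-duplicating on data_hash can be sketched in plain Python as follows. The function `drop_duplicates_by` is a hypothetical helper, and the hash strings are placeholders rather than real MD5 values. The sketch keeps the first occurrence of each hash, which is an assumption made for illustration and should not be read as a guarantee about which row Spark retains.

```python
def drop_duplicates_by(rows, key):
    """Keep the first row seen for each distinct value of `key`."""
    seen = set()
    unique = []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            unique.append(row)
    return unique

# One batch containing an exact intra-batch duplicate for John Doe
# (data_hash values are placeholders, not real hashes)
batch = [
    {"cust_id": 1, "cust_name": "Jane Smith", "data_hash": "hash_jane"},
    {"cust_id": 2, "cust_name": "John Doe", "data_hash": "hash_john"},
    {"cust_id": 2, "cust_name": "John Doe", "data_hash": "hash_john"},
]

deduped = drop_duplicates_by(batch, "data_hash")
```

Because the comparison uses only data_hash, rows that differ solely in columns excluded from the hash are treated as duplicates too.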
To maintain your SCD Type 1 and Type 2 dimensions, Delta Live Tables provides the APPLY CHANGES INTO syntax which streamlines the process of synchronizing data and maintaining the integrity of the SCD tables. With minimal code, it can efficiently handle any INSERT, UPDATE and/or DELETE operations required when merging changes from the source table. This provides a seamless and scalable solution for managing evolving data in real-time.
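# Target SCD table maintained by APPLY CHANGES
dlt.create_streaming_table("gold_customers")

dlt.apply_changes(
    target = "gold_customers",
    source = "gold_customers_intra_dedup",       # view with intra-batch duplicates removed
    keys = ["cust_id"],                          # business key of the dimension
    sequence_by = col("cust_last_updt_ts"),      # ordering column for changes
    stored_as_scd_type = "2",                    # keep history (SCD Type 2)
    track_history_column_list = ["data_hash"]    # new version only when data_hash changes
)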
Let's walk through what's happening in the above code snippet.
As discussed in Part 1 of the blog series, there is no functional need to specifically handle inter-batch duplicates for SCD Type 1 tables, as they do not maintain historical versions or track changes over time. However, inter-batch duplicates do need to be handled for SCD Type 2 tables in order to maintain data accuracy and integrity.
To mitigate this, setting track_history_column_list to the data_hash column tells DLT to generate a new history record only when the data_hash value differs between source and target. If the values are the same, DLT updates the existing record in place instead (similar to SCD Type 1).
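The decision described here can be sketched in plain Python for changes that arrive in order. This is not how DLT is implemented internally; `apply_change` is a hypothetical helper, the hash strings are placeholders, and the `__START_AT` / `__END_AT` names follow the validity columns that DLT adds to SCD Type 2 targets. The Sachin Singh records are taken from the sample data used later in this blog.

```python
def apply_change(history, record, key="cust_id",
                 sequence="cust_last_updt_ts", tracked=("data_hash",)):
    """Apply one in-order change to a list of SCD Type 2 rows (mutates `history`)."""
    current = next(
        (r for r in history if r[key] == record[key] and r["__END_AT"] is None),
        None,
    )
    if current is None:
        # First time we see this key: open a new version
        history.append({**record, "__START_AT": record[sequence], "__END_AT": None})
    elif all(current[c] == record[c] for c in tracked):
        # Tracked columns unchanged: update in place, no new history row
        current.update(record)
    else:
        # Tracked columns changed: close the current version and open a new one
        current["__END_AT"] = record[sequence]
        history.append({**record, "__START_AT": record[sequence], "__END_AT": None})
    return history

history = []
changes = [
    {"cust_id": 4, "cust_location": "India", "data_hash": "hash_india",
     "cust_last_updt_ts": "2023-03-02 13:31:59"},
    # Same data_hash, newer technical timestamp: updated in place
    {"cust_id": 4, "cust_location": "India", "data_hash": "hash_india",
     "cust_last_updt_ts": "2023-03-02 13:32:00"},
    # Different data_hash: a genuine change that creates a new version
    {"cust_id": 4, "cust_location": "Australia", "data_hash": "hash_australia",
     "cust_last_updt_ts": "2023-06-01 00:00:00"},
]
for change in changes:
    apply_change(history, change)
```

After the three changes, the history holds two versions: a closed India version and an open Australia version. The second India record only refreshed the timestamp on the existing row.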
To use track_history_column_list in Delta Live Tables, you must explicitly enable the feature by setting pipelines.enableTrackHistory to true in your pipeline settings.
This can be done in the Advanced section of the Pipeline Settings in the UI.
Alternatively, you can modify the DLT JSON configuration:
{
  "configuration": {
    "pipelines.enableTrackHistory": "true"
  }
}
And that is it!
You can now save and run your DLT pipeline which will look a little like this!
In this section, we will analyze the outcomes of executing the Delta Live Tables (DLT) pipeline against each test case mentioned earlier, specifically focusing on the SCD Type 2 tables.
Our first test will perform the initial load of our Gold SCD Type 2 table with the first batch of data containing an exact, intra-batch duplicate for John Doe.
cust_id,cust_name,cust_location,cust_last_updt_ts
1,Jane Smith,United States,2023-03-02 12:31:53
2,John Doe,Australia,2023-03-02 12:31:53
2,John Doe,Australia,2023-03-02 12:31:53
After successful execution of the above code, we can see that DLT has only loaded the correct two records.
Our second test builds on the successful completion of the first. This time, we load the batch below, which contains an exact inter-batch duplicate for Jane Smith and a new record for Angela.
cust_id,cust_name,cust_location,cust_last_updt_ts
1,Jane Smith,United States,2023-03-02 12:31:53
3,Angela Hamilton,England,2023-03-02 12:31:59
After successful execution of the above code, we can see that DLT has appropriately handled the duplicate record for Jane.
Our next batch contains both intra- and inter-batch duplicates that are not exact duplicates, as the cust_last_updt_ts column holds different values.
cust_id,cust_name,cust_location,cust_last_updt_ts
3,Angela Hamilton,England,2023-03-02 13:32:00
4,Sachin Singh,India,2023-03-02 13:31:59
4,Sachin Singh,India,2023-03-02 13:32:00
Having agreed with the business that a change only to technical columns, such as cust_last_updt_ts, does not constitute a new version of a record, our DLT pipeline does not trigger a new ‘change’ record in our SCD Type 2 Gold table for Angela and ingests only a single additional record for Sachin.
Our final batch does not consist of duplicates but rather intra- and inter-batch records that have been received out of order. Not managing these edge-cases correctly could lead to severe issues in data quality that could significantly impact decision making.
In the example below, we receive an earlier record for Jane Smith, showing a location prior to the move to the United States, and two records for Sachin: a move to Indonesia before his move to India, and a later move to Australia.
cust_id,cust_name,cust_location,cust_last_updt_ts
1,Jane Smith,England,2022-01-01 00:00:00
4,Sachin Singh,Indonesia,2022-01-01 00:00:00
4,Sachin Singh,Australia,2023-06-01 00:00:00
As you can see from the result set below, DLT was able to remediate and stitch these records correctly into the Gold Table!
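To illustrate what "stitching" out-of-order records means, the plain-Python sketch below rebuilds the SCD Type 2 versions for a single customer from events listed in arrival order. It is a simplified model and not DLT's internal algorithm: `rebuild_history` is a hypothetical helper that sorts all events by the sequencing column, whereas a streaming engine works incrementally. It also tracks cust_location directly instead of data_hash to keep the example readable.

```python
def rebuild_history(events, sequence="cust_last_updt_ts", tracked="cust_location"):
    """Rebuild SCD Type 2 versions for ONE key from events received in any order."""
    versions = []
    for event in sorted(events, key=lambda e: e[sequence]):
        if versions and versions[-1][tracked] == event[tracked]:
            continue  # tracked value unchanged: no new version
        if versions:
            # Close the previous version where the new one starts
            versions[-1]["__END_AT"] = event[sequence]
        versions.append({
            tracked: event[tracked],
            "__START_AT": event[sequence],
            "__END_AT": None,
        })
    return versions

# Sachin's events in ARRIVAL order: India first, then the late Indonesia
# record and the Australia record from the final batch
sachin_events = [
    {"cust_location": "India", "cust_last_updt_ts": "2023-03-02 13:31:59"},
    {"cust_location": "Indonesia", "cust_last_updt_ts": "2022-01-01 00:00:00"},
    {"cust_location": "Australia", "cust_last_updt_ts": "2023-06-01 00:00:00"},
]

sachin_history = rebuild_history(sachin_events)
```

In this model the late Indonesia record becomes the earliest version, India is closed off when the Australia move starts, and Australia remains the open, current version. The timestamps are compared as strings, which works here only because they share the same zero-padded format.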
In conclusion, leveraging Delta Live Tables enhances the robustness of your data pipelines and simplifies the management of duplicate records when implementing your slowly changing dimensions. With its provided APPLY CHANGES INTO syntax, you can effectively handle intra-batch, inter-batch, and same business meaning duplicates as well as out-of-order records that could further hinder data quality. With clean and accurate dimensions, organizations can unlock the full potential of their data to enable accurate historical analysis, data-driven decision-making, and better foster trust in the data.
In Part 3 of this blog series, we will walk through a practical implementation of a Spark Structured Streaming data pipeline exposed to the same challenges!