cancel
Showing results for 
Search instead for 
Did you mean: 
Technical Blog
cancel
Showing results for 
Search instead for 
Did you mean: 
Avnish_Jain
Moderator
Moderator

Introduction

In Part 1 of this blog series, we explored the various types of duplicates, considerations for remediation, and the impacts of unchecked duplicated records on strategic decision-making. 

If you want to learn more about this, please read: How to implement Slowly Changing Dimensions when you have duplicates - Part 1: What to look out for?

In this blog, we will focus on leveraging Delta Live Tables pipelines as a robust solution for handling duplicates and building an efficient data pipeline to maintain your Slowly Changing Dimensions.

Table of Contents

About your data pipeline

Your data pipeline implements the Medallion Architecture which loads source data into a Bronze table, applies data quality rules and expectations in the Silver table, and maintains the slowly changing dimensions in a Gold table.

Avnish_Jain_0-1694079325009.pngFigure 1 - High level solution architecture diagram of the sample data pipeline

Loading your Bronze table with Databricks Autoloader

The data pipeline begins with the incremental loading of source data with Databricks Auto Loader into a Bronze table. The Bronze table acts as the initial landing zone for incoming streaming data, where records are inserted into in an append-only fashion. 

To aid with the identification of all kinds of duplicates, we will append a new column called data_hash which we can specify to be calculated on all columns or a select subset. Records with the same data_hash value will indicate that they are duplicate records. 

If opting for a select subset of records - such as omitting technical columns to identify same business meaning duplicates - ensure you have determined and agreed this with your business stakeholders.

 

 

import dlt
from pyspark.sql.functions import concat_ws, md5, col, current_timestamp

@dlt.table
def bronze_customers():
  return (
    spark.readStream.format("cloudFiles") \
      .option("cloudFiles.format", "csv") \
      .option("cloudFiles.inferColumnTypes", "true") \
      .option("header", "true") \
      .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
      .load("cloud-storage-source-data-directory") \
      .withColumn("data_hash", \
        md5(concat_ws('-', col("cust_id"), \ 
        col("cust_name"), \
        col("cust_location")))) \
      .withColumn("file_name", col("_metadata.file_name")) \
      .withColumn("insert_timestamp", current_timestamp())
)

 

 

The Bronze table will store all data received from the source including all duplicates received. These duplicates need to be effectively processed and resolved in subsequent stages of the pipeline.

Note!

Ensure your chosen hash function is deterministic (i.e. given the same input, the same exact output value will always be generated).


Cleanse from other data quality issues in the Silver Layer

The next step in the pipeline involves further data cleaning of records as incrementally received from the Bronze Table. 

Here, we apply business data quality (DQ) rules and perform data cleaning operations in the silver table with Delta Live Tables Expectations. Expectations provides you with a simple but powerful DQ framework that will enhance the overall reliability and trustworthiness of your data pipeline.

 

 

@dlt.table(name="silver_customers")
@dlt.expect_or_drop("valid_cust_id", "cust_id > 0")
def silver_customers():
  return (
    dlt.read_stream("bronze_customers")
      .select("*")
  )

 

 

To read more about DLT Expectations - please refer to the following documentation.

Maintaining your slowly changing dimensions in your Gold Layer

The final stage of the data pipeline focuses on maintaining slowly changing dimensions in the Gold table which serves as the trusted source for historical analysis and decision-making. Here, we will remove the duplicates in 2 steps: first the intra-batch duplicates in a view, followed by the inter-batch duplicates. 

Dropping Intra-batch duplicates

In the below code snippet, we are leveraging the native dropDuplicates() function and are passing in the previously calculated data_hash column to handle the intra-batch duplicates.

This will drop any records with the same value in the data_hash column. As hashes are deterministic, we can safely remove intra-batch duplicate records including those we identified as having the same business meaning. However, we are still to handle inter-batch duplicates. 

 

 

@dlt.view
def gold_customers_intra_dedup():
  return (
    dlt.read_stream("silver_customers")
      .select("*")
      .dropDuplicates(["data_hash"])
  )

 

 

As we have defined this as a DLT View, data will not be persisted.

Handling Inter-batch duplicates

To maintain your SCD Type 1 and Type 2 dimensions, Delta Live Tables provides the APPLY CHANGES INTO syntax which streamlines the process of synchronizing data and maintaining the integrity of the SCD tables. With minimal code, it can efficiently handle any INSERT, UPDATE and/or DELETE operations required when merging changes from the source table. This provides a seamless and scalable solution for managing evolving data in real-time.

 

 

dlt.create_streaming_table("gold_customers")

dlt.apply_changes(
  target = "gold_customers",
  source = "gold_customers_intra_dedup",
  keys = ["cust_id"],
  sequence_by = col("cust_last_updt_ts"),
  stored_as_scd_type = "2",
  track_history_column_list = ["data_hash"]
)

 

 

Let’s walk-through what’s happening in the above code-snippet:

  • target: the name of our Gold slowly changing dimension table.
  • source: the name of our Silver cleansed table.
  • keys: the column(s) used for joining to identify matching (and unmatching) rows between the target and source tables. This will be the primary key(s) of the Gold/Silver table.
  • sequence_by: the column that determines the logical order of records. DLT also uses this sequencing to handle change events that arrive out of order. Timestamps are commonly used such as the insert_timestamp (of when the record was ingested by your platform), or a business timestamp as received in the source data.
  • stored_as_scd_type: Either “1” or “2” to determine which SCD Type the target table will be maintained as.
  • track_history_column_list: For SCD Type 2 tables, we can specify the column(s) that are used to track for history and trigger a changed record. This is the parameter that will help us handle any inter-batch duplicates. 

As discussed in Part 1 of the blog series, there is no functional need to specifically handle inter-batch duplicates for SCD Type 1 tables as they do not maintain historical versions or track changes over time. However, inter-batch duplicates do need to be handled for SCD Type 2 tables in order to maintain data accuracy or integrity. 

To mitigate this, the track_history_column_list specified on the data_hash column tells DLT to only generate a changed, upserted record when there is a different value between source and target. If the values are the same, then just rather hard-update the record (similar to an SCD Type 1).

Configuring your Delta Live Tables pipeline

To use the track_history_column_list in Delta Live Tables, you must explicitly enable the feature in your pipeline by adding the following configuration to your Delta Live Tables pipeline settings. 

This can be done by adding in the below configuration in Advanced section of the Pipeline Settings in the UI. 

Avnish_Jain_0-1694081511796.png


You can also modify the DLT JSON configuration:

{
  "configuration": {
    "pipelines.enableTrackHistory": "true"
  }
}

And that is it! 

You can now save and run your DLT pipeline which will look a little like this!

Avnish_Jain_0-1694081643022.png

Let’s run some tests!

In this section, we will analyze the outcomes of executing the Delta Live Tables (DLT) pipeline against each test case mentioned earlier, specifically focusing on the SCD Type 2 tables.


Test #1 - Initial Load with Intra-Batch Exact Duplicates

Our first test will perform the initial load of our Gold SCD Type 2 table with the first batch of data containing an exact, intra-batch duplicate for John Doe.

cust_id,cust_name,cust_location,cust_last_updt_ts
1,Jane Smith,United States,2023-03-02 12:31:53
2,John Doe,Australia,2023-03-02 12:31:53
2,John Doe,Australia,2023-03-02 12:31:53

After successful execution of the above code, we can see that DLT has only loaded the correct two records.

Avnish_Jain_0-1694081886071.png

 

 

 


Test #2 - Next batch with Inter-Batch, Exact Duplicates

Our second batch builds on after the successful completion of the first test but this time we will be loading the below batch of data containing an inter-batch, exact duplicate for Jane Smith; and a new record for Angela.

cust_id,cust_name,cust_location,cust_last_updt_ts
1,Jane Smith,United States,2023-03-02 12:31:53
3,Angela Hamilton,England,2023-03-02 12:31:59

After successful execution of the above code, we can see that DLT has appropriately handled the duplicate record for Jane.

Avnish_Jain_1-1694082079933.png


Test #3 - Load with Intra- & Inter- Batch Same Business Meaning Duplicates

Our next batch continues this time with intra- and inter-batch duplicates where we have received records that are not exact duplicates as the cust_last_updt_ts column holds different values.

cust_id,cust_name,cust_location,cust_last_updt_ts
3,Angela Hamilton,England,2023-03-02 13:32:00
4,Sachin Singh,India,2023-03-02 13:31:59
4,Sachin Singh,India,2023-03-02 13:32:00

After agreeing with the business that a change only to technical columns, such as cust_last_updt_ts, our DLT pipeline does not trigger a new ‘change’ record in our SCD Type 2 Gold Table for Angela and only ingests the additional single record for Sachin.

Avnish_Jain_0-1694082297092.png


Test #4 - Load with Intra- & Inter- Batch Out of Order Records

Our final batch does not consist of duplicates but rather intra- and inter-batch records that have been received out of order. Not managing these edge-cases correctly could lead to severe issues in data quality that could significantly impact decision making.

In the example below, we receive a previous record for Jane Smith location prior to moving to the United States and two records for Sachin - a move to Indonesia before his move to India, and a move after to Australia.

cust_id,cust_name,cust_location,cust_last_updt_ts
1,Jane Smith,England,2022-01-01 00:00:00
4,Sachin Singh,Indonesia,2022-01-01 00:00:00
4,Sachin Singh,Australia,2023-06-01 00:00:00

As you can see from the result set below, DLT was able to remediate and stitch these records correctly into the Gold Table!

Avnish_Jain_1-1694082386133.png


Conclusion

In conclusion, leveraging Delta Live Tables enhances the robustness of your data pipelines and simplifies the management of duplicate records when implementing your slowly changing dimensions. With its provided APPLY CHANGES INTO syntax, you can effectively handle intra-batch, inter-batch, and same business meaning duplicates as well as out-of-order records that could further hinder data quality. With clean and accurate dimensions, organizations can unlock the full potential of their data to enable accurate historical analysis, data-driven decision-making, and better foster trust in the data.

Coming up next!

In Part 3 of this blog series, we will walk through a practical implementation of a Spark Structured Streaming data pipeline exposed to the same challenges!

2 Comments
sandeep_gunda
New Contributor

Hey @Avnish_Jain, we're implementing a very similar inter and intra-batch deduplication process albeit with SCD type-1. However,  we are afraid the drop_duplicates() (in your case dropDuplicates(["data_hash"])) might be looking at the whole stream of data as it could be stateful. How do we ensure, the drop_duplicates is only looking at the data inside the microbatch?

rantav
New Contributor

Thank you for this, when is Part 3 which includes Spark Structured Streaming due?