Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Implement SCD Type 2 in Bronze Layer of DLT Pipeline with Structured Streaming

mkEngineer
New Contributor

Hi everyone,

I am implementing SCD Type 2 in the Bronze layer of a Delta Live Tables (DLT) pipeline using Structured Streaming. I'm wondering whether a staging table or view is necessary before loading data into the Bronze table. Without one, it seems like change tracking (e.g., Change Data Feed) doesn't start until the Silver table.

Here’s an example where Change Data Feed is enabled on the Bronze table, but change tracking effectively begins only at the Silver table (the next layer):

import dlt

@dlt.table(
    comment="New customer data ingested from cloud landing source",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)
def Customer_Bronze():
    df = (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)  # Loaded from the raw storage location.
    )
    return df

What are the best practices for setting up the Bronze table in this case? Should I rely on staging tables or views for proper ingestion and change tracking? Any advice on optimizing SCD Type 2 with DLT and Structured Streaming would be appreciated!

Thank you!


Alberto_Umana
Databricks Employee

To implement Slowly Changing Dimensions (SCD) Type 2 in the Bronze layer of a Delta Live Table (DLT) pipeline using Structured Streaming, it is essential to understand the best practices and optimization techniques. Here are some key points to consider:

 

  1. Change Data Feed (CDF) Activation:
    • Change Data Feed (CDF) can be enabled at the table level to track changes. However, it is important to note that CDF starts tracking changes only after the table is created and data is loaded into it. This means that if you enable CDF at the Silver table, change tracking will begin from that point onward.
  2. Staging Tables or Views:
    • Using staging tables or views before loading data into the Bronze table can be beneficial. This approach allows you to preprocess and clean the data, ensuring that only the necessary changes are captured and propagated through the pipeline.
    • Staging tables can help in managing schema evolution and handling data quality issues before the data is ingested into the Bronze table.
  3. APPLY CHANGES API:
    • The APPLY CHANGES API simplifies change data capture (CDC) with Delta Live Tables. It supports both SCD Type 1 and Type 2 updates.
    • For SCD Type 2, the API retains a history of records by propagating sequencing values to the target table’s __START_AT and __END_AT columns. This ensures that historical data is preserved for each update.
  4. Creating Streaming Tables:
    • To perform CDC processing, you first create a streaming table and then use the APPLY CHANGES statement in SQL or the apply_changes() function in Python to specify the source, keys, and sequencing for the change feed.
    • Example for creating a streaming table and applying changes:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
    return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
    target="target",
    source="users",
    keys=["userId"],
    sequence_by=col("sequenceNum"),
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "sequenceNum"],
    stored_as_scd_type=2
)
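The SCD Type 2 behavior this produces can be illustrated with a small pure-Python sketch (no Spark required — `apply_scd2` is a hypothetical helper for illustration, not part of DLT): each incoming change closes the current version of a key by stamping its __END_AT with the sequencing value, and opens a new version whose __END_AT stays open.

```python
def apply_scd2(history, changes, key="userId", seq="sequenceNum"):
    """Toy SCD Type 2 merge over a list of dicts (illustration only --
    in DLT, apply_changes does this against a Delta streaming table)."""
    for change in sorted(changes, key=lambda c: c[seq]):  # order by sequence column
        # Close the currently open version of this key, if any.
        for row in history:
            if row[key] == change[key] and row["__END_AT"] is None:
                row["__END_AT"] = change[seq]
        # A delete only closes history; other operations open a new version.
        if change.get("operation") != "DELETE":
            new_row = {k: v for k, v in change.items()
                       if k not in ("operation", seq)}  # except_column_list analogue
            new_row["__START_AT"] = change[seq]
            new_row["__END_AT"] = None  # None marks the current version
            history.append(new_row)
    return history

history = []
apply_scd2(history, [
    {"userId": 1, "city": "Lima", "operation": "INSERT", "sequenceNum": 1},
    {"userId": 1, "city": "Cusco", "operation": "UPDATE", "sequenceNum": 2},
])
# history now holds two versions of userId 1; only the "Cusco" row is still open
```

Querying the real target table for current rows works the same way: filter on __END_AT being null.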

Alberto_Umana
Databricks Employee

Optimizing SCD Type 2:

  • Ensure that the column used for sequencing is a sortable data type.
  • Handle out-of-sequence records by specifying a column in the source data that represents the proper ordering of the source data.
  • Use the track_history_except_column_list option to exclude certain columns from history tracking if needed.

Example for SCD Type 2 with History Tracking:

dlt.apply_changes(
    target="target",
    source="users",
    keys=["userId"],
    sequence_by=col("sequenceNum"),
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "sequenceNum"],
    stored_as_scd_type=2,
    track_history_except_column_list=["city"]
)
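The effect of track_history_except_column_list can be sketched in plain Python (`scd2_with_exclusions` is a hypothetical illustration, not DLT's implementation): a change that differs only in an excluded column updates the current row in place, while a change to any tracked column closes the current version and opens a new one.

```python
def scd2_with_exclusions(history, changes, key="userId", seq="sequenceNum",
                         no_history=("city",)):
    """Toy sketch of track_history_except_column_list semantics: changes
    confined to `no_history` columns update the current row in place
    instead of opening a new history version (illustration only)."""
    def tracked(row):
        # Columns compared when deciding whether a new version is needed.
        return {k: v for k, v in row.items()
                if k not in no_history and k != seq
                and k != "operation" and not k.startswith("__")}

    for change in sorted(changes, key=lambda c: c[seq]):
        current = next((r for r in history
                        if r[key] == change[key] and r["__END_AT"] is None), None)
        new_vals = {k: v for k, v in change.items() if k not in ("operation", seq)}
        if current is not None and tracked(current) == tracked(new_vals):
            current.update(new_vals)  # only excluded columns changed: no new version
            continue
        if current is not None:
            current["__END_AT"] = change[seq]  # close the open version
        history.append({**new_vals, "__START_AT": change[seq], "__END_AT": None})
    return history

rows = []
scd2_with_exclusions(rows, [
    {"userId": 1, "name": "Ana", "city": "Lima", "sequenceNum": 1},
    {"userId": 1, "name": "Ana", "city": "Cusco", "sequenceNum": 2},  # city only
    {"userId": 1, "name": "Ann", "city": "Cusco", "sequenceNum": 3},  # name changed
])
```

After these three changes, the city-only update leaves a single row in place, while the name change produces a second history version.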

By following these best practices and utilizing the APPLY CHANGES API, you can effectively implement and optimize SCD Type 2 in your Delta Live Tables pipeline with Structured Streaming. For more detailed information, you can refer to the Delta Live Tables documentation on CDC.
