Implement SCD Type 2 in Bronze Layer of DLT Pipeline with Structured Streaming
12-12-2024 04:37 AM
Hi everyone,
I am implementing SCD Type 2 in the Bronze layer of a Delta Live Tables (DLT) pipeline using Structured Streaming. Is it necessary to have a staging table or view before loading data into the Bronze table? Without one, it seems that change tracking (e.g., Change Data Feed) doesn't start until the Silver table.
Here’s an example of code where the Change Data Feed is enabled, but it begins only at the Silver table (the next layer):
import dlt

@dlt.table(
    comment="New customer data ingested from cloud landing source",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)
def Customer_Bronze():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)  # Loaded from the raw storage location.
    )
What are the best practices for setting up the Bronze table in this case? Should I rely on staging tables or views for proper ingestion and change tracking? Any advice on optimizing SCD Type 2 with DLT and Structured Streaming would be appreciated!
Thank you!
12-12-2024 05:16 AM
To implement Slowly Changing Dimensions (SCD) Type 2 in the Bronze layer of a Delta Live Tables (DLT) pipeline with Structured Streaming, a few best practices are worth keeping in mind:
- Change Data Feed (CDF) Activation:
- Change Data Feed (CDF) can be enabled at the table level to track row-level changes. However, CDF records changes only from the point the property is set onward. If it is first enabled on the Silver table, no change history will exist for the Bronze layer, which matches the behavior you are observing.
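Once CDF is enabled on the Bronze table (as in your `table_properties`), downstream consumers can read the feed directly. A minimal sketch, assuming a Databricks runtime with an active Spark session; the table name and starting version are illustrative:

```python
# Read the Change Data Feed of a bronze table that has
# delta.enableChangeDataFeed = true. The table name below is a
# placeholder, not a name from your pipeline.
changes = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)  # or "startingTimestamp"
    .table("my_catalog.bronze.customer_bronze")
)
```

Each row of the feed carries `_change_type`, `_commit_version`, and `_commit_timestamp` columns alongside the table's own columns.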
- Staging Tables or Views:
- Using staging tables or views before loading data into the Bronze table can be beneficial. This approach allows you to preprocess and clean the data, ensuring that only the necessary changes are captured and propagated through the pipeline.
- Staging tables can help in managing schema evolution and handling data quality issues before the data is ingested into the Bronze table.
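A staging layer along these lines can be a simple DLT view with data-quality expectations in front of the Bronze table. The names and the expectation predicate below are hypothetical, for illustration only:

```python
import dlt

# Hypothetical staging view: reads the raw landing files and drops
# records that fail a basic quality check before the bronze table
# (or apply_changes) consumes them.
@dlt.view(comment="Staged customer records with basic quality checks")
@dlt.expect_or_drop("valid_id", "customerId IS NOT NULL")
def customer_staging():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(landing_json_path)  # assumed raw storage path variable
    )
```

Because a view is not materialized, this adds no extra storage; it only shapes what the downstream table ingests.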
- APPLY CHANGES API:
- The APPLY CHANGES API simplifies change data capture (CDC) with Delta Live Tables. It supports both SCD Type 1 and Type 2 updates.
- For SCD Type 2, the API retains a history of records by propagating sequencing values to the target table’s __START_AT and __END_AT columns. This ensures that historical data is preserved for each update.
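To make the `__START_AT`/`__END_AT` bookkeeping concrete, here is a plain-Python sketch of what SCD Type 2 versioning does conceptually (this is an illustration of the semantics, not the actual `apply_changes()` implementation; the function and field names are hypothetical):

```python
# Conceptual SCD Type 2 bookkeeping: each incoming change closes the
# currently open row for its key (sets __END_AT) and appends a new
# open row (sets __START_AT, leaves __END_AT as None).
def apply_scd2_change(rows, key, attrs, seq):
    """Close the open version for `key`, then append a new version."""
    for row in rows:
        if row["key"] == key and row["__END_AT"] is None:
            row["__END_AT"] = seq  # close the previous version
    rows.append({"key": key, **attrs, "__START_AT": seq, "__END_AT": None})
    return rows

rows = []
apply_scd2_change(rows, "u1", {"city": "Oslo"}, 1)
apply_scd2_change(rows, "u1", {"city": "Bergen"}, 2)
# rows now holds two versions of u1: the Oslo row closed at sequence 2,
# and the Bergen row still open (__END_AT is None).
```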
- Creating Streaming Tables:
- To perform CDC processing, you first create a streaming table and then use the APPLY CHANGES statement in SQL or the apply_changes() function in Python to specify the source, keys, and sequencing for the change feed.
- Example for creating a streaming table and applying changes:
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
    return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
    target="target",
    source="users",
    keys=["userId"],
    sequence_by=col("sequenceNum"),
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "sequenceNum"],
    stored_as_scd_type=2
)
12-12-2024 05:17 AM
Optimizing SCD Type 2:
- Ensure that the column used for sequencing is a sortable data type.
- Handle out-of-sequence records by specifying a column in the source data that represents its proper ordering.
- Use the track_history_except_column_list option to exclude certain columns from history tracking if needed.
Example for SCD Type 2 with History Tracking:
dlt.apply_changes(
    target="target",
    source="users",
    keys=["userId"],
    sequence_by=col("sequenceNum"),
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "sequenceNum"],
    stored_as_scd_type=2,
    track_history_except_column_list=["city"]
)
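After the pipeline runs, the SCD Type 2 target can be queried by filtering on the generated validity columns. A short sketch, assuming an active Spark session and the `target` table from the example above:

```python
# Current (open) versions have a NULL __END_AT; closed versions carry
# the sequence interval during which they were valid.
current_rows = spark.table("target").where("__END_AT IS NULL")
historical_rows = spark.table("target").where("__END_AT IS NOT NULL")
```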
By following these best practices and utilizing the APPLY CHANGES API, you can effectively implement and optimize SCD Type 2 in your Delta Live Tables pipeline with Structured Streaming. For more detailed information, you can refer to the Delta Live Tables documentation on CDC.

