To implement Slowly Changing Dimensions (SCD) Type 2 in the Bronze layer of a Delta Live Tables (DLT) pipeline using Structured Streaming, it helps to follow a few best practices and optimization techniques. Key points to consider:
- Change Data Feed (CDF) Activation:
  - CDF is enabled per table (through the `delta.enableChangeDataFeed` table property) to record row-level changes. CDF only records changes committed after it is enabled; earlier changes are not captured retroactively. So if you enable CDF on the Silver table, change tracking begins from that point onward.[1]
- Staging Tables or Views:
  - Using staging tables or views before loading data into the Bronze table can be beneficial. They let you preprocess and clean the data so that only the necessary changes are captured and propagated through the pipeline.
  - Staging tables also help manage schema evolution and handle data quality issues before the data is ingested into the Bronze table.
- APPLY CHANGES API:
  - The APPLY CHANGES API simplifies change data capture (CDC) in Delta Live Tables. It supports both SCD Type 1 and SCD Type 2 updates.
  - For SCD Type 2, the API retains a history of records by propagating sequencing values to the target table's __START_AT and __END_AT columns. When a new change arrives for a key, the current row is closed by setting its __END_AT, and the new version is written with its own __START_AT, so every historical version of a record is preserved.
- Creating Streaming Tables:
  - To perform CDC processing, first create a streaming table, then use the APPLY CHANGES statement in SQL or the apply_changes() function in Python to specify the source, keys, and sequencing for the change feed.
  - Example of creating a streaming table and applying changes as SCD Type 2:
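```python
import dlt
from pyspark.sql.functions import col, expr

# Source view over the raw CDC feed; `spark` is provided by the DLT runtime.
@dlt.view
def users():
    return spark.readStream.table("cdc_data.users")

# Target streaming table that APPLY CHANGES maintains.
dlt.create_streaming_table("target")

dlt.apply_changes(
    target="target",
    source="users",
    keys=["userId"],                                  # key identifying a record
    sequence_by=col("sequenceNum"),                   # orders changes; fills __START_AT/__END_AT
    apply_as_deletes=expr("operation = 'DELETE'"),    # rows treated as deletes
    except_column_list=["operation", "sequenceNum"],  # CDC metadata not stored in target
    stored_as_scd_type=2,                             # keep full history (SCD Type 2)
)
```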
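Relating this back to the CDF activation point: when a CDF-enabled table feeds APPLY CHANGES downstream, its change feed has to be mapped onto a CDC feed. Delta's change feed tags each row with `_change_type` (`insert`, `update_preimage`, `update_postimage`, `delete`) plus `_commit_version` and `_commit_timestamp`. The sketch below is a plain-Python model (no Spark), assuming rows arrive as dicts. It drops `update_preimage` rows, which only carry the old values, and uses `_commit_version` as the sequencing value. The helper name `cdf_to_cdc` and the output fields `operation`/`sequenceNum` are hypothetical, chosen to match the Python example above.

```python
from typing import Dict, List

# CDF metadata columns that should not be copied into the CDC event body.
CDF_META = {"_change_type", "_commit_version", "_commit_timestamp"}

# Hypothetical mapping from CDF change types to CDC operations;
# update_preimage is intentionally absent so those rows are skipped.
OP_MAP = {"insert": "INSERT", "update_postimage": "UPDATE", "delete": "DELETE"}


def cdf_to_cdc(cdf_rows: List[Dict]) -> List[Dict]:
    """Convert CDF-style rows into CDC events (hypothetical helper)."""
    events = []
    for row in cdf_rows:
        change = row["_change_type"]
        if change not in OP_MAP:  # skips update_preimage (old values)
            continue
        event = {k: v for k, v in row.items() if k not in CDF_META}
        event["operation"] = OP_MAP[change]
        event["sequenceNum"] = row["_commit_version"]  # commit order as sequence
        events.append(event)
    return events
```

Using the commit version as the sequence assumes at most one post-change image per key per commit, which holds for typical MERGE or UPDATE writes.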
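The staging point can be illustrated with a small plain-Python model. The function below is a hypothetical example of the kind of cleanup a staging view might do before data reaches the Bronze table: it quarantines rows without a key or with an unknown operation, normalizes the `operation` value, and drops exact duplicates. The rules and the `userId`/`operation` field names (borrowed from the Python example above) are assumptions; in an actual DLT pipeline, similar checks are usually written as a `@dlt.view` combined with expectations such as `@dlt.expect_or_drop`.

```python
from typing import Dict, List, Tuple

VALID_OPS = {"INSERT", "UPDATE", "DELETE"}  # assumed set of CDC operations


def clean_staging(rows: List[Dict], key: str = "userId") -> Tuple[List[Dict], List[Dict]]:
    """Hypothetical staging cleanup; returns (clean_rows, rejected_rows)."""
    seen = set()
    clean, rejected = [], []
    for row in rows:
        op = str(row.get("operation", "")).strip().upper()
        if row.get(key) is None or op not in VALID_OPS:
            rejected.append(row)  # quarantine instead of silently dropping
            continue
        normalized = {**row, "operation": op}
        # Assumes all values are hashable so the row can act as a fingerprint.
        fingerprint = tuple(sorted(normalized.items()))
        if fingerprint in seen:  # exact duplicate of an earlier row
            continue
        seen.add(fingerprint)
        clean.append(normalized)
    return clean, rejected
```

Returning rejected rows separately, rather than discarding them, mirrors the common practice of routing bad records to a quarantine table for inspection.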
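To make the __START_AT/__END_AT behavior described under the APPLY CHANGES API more concrete, here is a small plain-Python model of SCD Type 2 semantics. It is not how APPLY CHANGES is implemented internally; it only reproduces the described outcome under simplifying assumptions: changes are applied in `sequenceNum` order (so out-of-order input is re-sequenced), an insert or update closes the open version of that key and opens a new one, and a delete closes the open version without opening a new one. Field names follow the Python example above.

```python
from typing import Dict, List


def apply_scd2(events: List[Dict], key: str = "userId", seq: str = "sequenceNum") -> List[Dict]:
    """Model SCD Type 2 history: one row per version with __START_AT/__END_AT."""
    history: List[Dict] = []
    current: Dict = {}  # key value -> index of its open (current) row in `history`
    for ev in sorted(events, key=lambda e: e[seq]):  # re-sequence out-of-order input
        k = ev[key]
        if k in current:  # close the open version at this sequence value
            history[current.pop(k)]["__END_AT"] = ev[seq]
        if ev["operation"] != "DELETE":  # inserts and updates open a new version
            row = {c: v for c, v in ev.items() if c not in ("operation", seq)}
            row["__START_AT"] = ev[seq]
            row["__END_AT"] = None  # None marks the current version
            current[k] = len(history)
            history.append(row)
    return history
```

Like `except_column_list` in the example above, the model drops `operation` and `sequenceNum` from the stored rows while still using the sequence value for the validity columns.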
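Once a table such as `target` in the example above holds SCD Type 2 history, the current version of each record is the row whose `__END_AT` is null, and the version valid at sequence value `t` is the row with `__START_AT <= t` and either a null `__END_AT` or `t < __END_AT`. Treating each interval as half-open avoids double-counting at the boundary where one version's `__END_AT` equals the next version's `__START_AT`. The plain-Python helper below is a hypothetical sketch of that filter over rows shaped like the SCD Type 2 target; against the real table, the same predicate would go in a `WHERE` clause.

```python
from typing import Dict, List, Optional


def as_of(history: List[Dict], t: Optional[int] = None) -> List[Dict]:
    """Rows valid at sequence value t; with t=None, return only current rows."""
    if t is None:
        # Current versions are the rows that have not been closed yet.
        return [r for r in history if r["__END_AT"] is None]
    # Half-open validity interval: [__START_AT, __END_AT)
    return [
        r for r in history
        if r["__START_AT"] <= t and (r["__END_AT"] is None or t < r["__END_AT"])
    ]
```

Keys whose last change was a delete have no open row, so they disappear from the current view while their earlier versions remain queryable by sequence value.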