Hi, I'm facing a scenario in my DLT pipeline: in my silver layer I apply filtering to prevent test data from reaching the silver schema, and at the end I use apply_changes to create the tables, with a sequence_by clause so that only the most recently updated version of each record is kept.
The issue arises when the most recent version of a record has its test flag set to true, making it a test record. My filter then drops that version, so by the time the data reaches the sequence_by clause inside apply_changes, the latest entry is already gone. Because the row is simply dropped rather than propagated as a delete, apply_changes treats the previous version as the most recent one and forwards it to the silver schema, even though that version is now outdated and we don't want it there.
In short, outdated records are making it into the silver schema because of this filtering scenario.
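To make it concrete, here is a tiny standalone repro of the sequencing problem (is_test, updated_at, and the values are made-up stand-ins for my actual columns):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Two CDC versions of the same record; the LATEST one is flagged as test.
rows = [
    (1, "old value", False, "2024-01-01 10:00:00"),  # earlier version
    (1, "new value", True,  "2024-01-01 11:00:00"),  # latest version, now a test record
]
df = spark.createDataFrame(rows, "id INT, value STRING, is_test BOOLEAN, updated_at STRING")

# My silver-layer filter drops the 11:00 row entirely...
filtered = df.filter(F.col("is_test") == False)

# ...so sequencing by updated_at over what's left crowns the stale
# 10:00 row as the "latest" version, and it gets merged into silver.
filtered.orderBy(F.col("updated_at").desc()).show()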
Could you please suggest the best approach to handle this situation?
This is my silver layer's code structure (the view body is a simplified sketch; is_test and bronze_table stand in for my actual names):
import dlt
from pyspark.sql.functions import col

@dlt.view(name = bronze_dlt_view)
def bronze_view():
    # Fetch the table from bronze and filter out test records
    # (is_test / bronze_table are stand-ins for my actual names)
    return (
        spark.readStream.table(bronze_table)
        .filter(col("is_test") == False)
    )

dlt.create_streaming_table(
    name = silver_table,
    table_properties = table_props,
    comment = "Silver table with MERGE into logic from bronze",
)

dlt.apply_changes(
    target = silver_table,
    source = bronze_dlt_view,
    keys = primary_keys,
    sequence_by = col(sequence_col),
    stored_as_scd_type = 1,
)
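
One idea I've been wondering about: instead of filtering test records out in the view, let them flow through and have apply_changes treat them as deletes via its apply_as_deletes parameter, so sequence_by still sees the newest version of every key and removes it from the target. A rough sketch of what I mean (is_test is again a stand-in for my actual flag column):

import dlt
from pyspark.sql.functions import col, expr

dlt.apply_changes(
    target = silver_table,
    source = bronze_dlt_view,   # view WITHOUT the test-record filter
    keys = primary_keys,
    sequence_by = col(sequence_col),
    apply_as_deletes = expr("is_test = true"),  # treat flagged rows as deletes
    stored_as_scd_type = 1,
)

Would that be a sound way to handle this, or is there a better pattern?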