DLT Pipeline from Streaming Table
09-10-2025 06:43 AM
Hi
I have a bronze table with Product_id, the product attributes (*), start_at and end_at. It is a streaming table stored as SCD Type 2, so any change to a product's attributes inserts a new row with end_at set to null. If I filter this table on end_at IS NULL, I therefore get a complete product table without any duplicates.
I want to load this into a downstream table incrementally, so that new records are added and changed records are updated. What kind of solution should I use here? I know I should use a streaming table with apply_changes, but my code for that does not work.
SOURCE_FULL is my bronze table.
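To make the setup above concrete, here is a toy, pure-Python model of the SCD Type 2 behaviour being described. It is only an illustration and not the DLT apply_changes API: the row layout (product_id, attrs, start_at, end_at) and the helpers apply_scd2_change and current_products are hypothetical. Each change closes the product's open row by setting end_at and appends a new open row with end_at = None, so keeping only rows where end_at is None leaves exactly one row per product.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Scd2Row:
    # Hypothetical row layout mirroring the bronze table described above
    product_id: int
    attrs: dict
    start_at: int
    end_at: Optional[int] = None  # None marks the current (open) version


def apply_scd2_change(history: list, product_id: int, attrs: dict, ts: int) -> None:
    """Close the open row for product_id (if any) and append a new open row."""
    for row in history:
        if row.product_id == product_id and row.end_at is None:
            if row.attrs == attrs:
                return  # attributes unchanged: no new version is recorded
            row.end_at = ts  # close the previous version
    history.append(Scd2Row(product_id, dict(attrs), start_at=ts))


def current_products(history: list) -> dict:
    """Equivalent of filtering on `end_at IS NULL`: one row per product."""
    return {r.product_id: r.attrs for r in history if r.end_at is None}


history = []
apply_scd2_change(history, 1, {"price": 10}, ts=1)
apply_scd2_change(history, 2, {"price": 5}, ts=1)
apply_scd2_change(history, 1, {"price": 12}, ts=2)  # product 1 changes

print(len(history))               # 3 rows of history
print(current_products(history))  # {2: {'price': 5}, 1: {'price': 12}}
```

The history keeps every version, while the filtered view behaves like an SCD Type 1 table, which is why a simple filter on the open rows is enough to get the current product list.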
09-11-2025 03:32 AM - edited 09-11-2025 03:32 AM
Hi @km1837 ,
Instead of building a streaming table on top of another streaming table, for your use case I think a materialized view for the downstream (child) table would be the best choice.
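For example (SOURCE_FULL is the name of your bronze table):
```python
import dlt

@dlt.table(name="workspace.silver.current_product")
def current_product():
    # Batch read of the SCD Type 2 table, keeping only the open (current) rows
    return dlt.read(SOURCE_FULL).filter("__end_at IS NULL")
```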
Materialized views are refreshed using one of two methods:
- Incremental refresh - The system evaluates the view's query to identify changes that happened after the last update and merges only the new or modified data.
- Full refresh - If an incremental refresh can't be performed, the system runs the entire query and replaces the existing data in the materialized view with the new results.
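The difference between the two refresh methods can be illustrated with a small, self-contained Python model of the current_product view. This is not how Databricks implements incremental refresh internally; the functions full_refresh and incremental_refresh and the dict-based row layout are hypothetical. The full refresh recomputes the filter over the whole history, while the incremental version only looks at rows that were closed or inserted after the last refresh timestamp and patches the previous result. Both should produce the same snapshot.

```python
def full_refresh(history: list) -> dict:
    # Recompute the whole view: keep only open (current) rows
    return {r["product_id"]: r["attrs"] for r in history if r["end_at"] is None}


def incremental_refresh(previous: dict, history: list, last_ts: int) -> dict:
    # Start from the previous result and touch only rows that changed after last_ts
    snapshot = dict(previous)
    for r in history:
        if r["end_at"] is not None and r["end_at"] > last_ts:
            # Version closed since the last refresh: drop it (a newer row may re-add it)
            snapshot.pop(r["product_id"], None)
    for r in history:
        if r["start_at"] > last_ts and r["end_at"] is None:
            # Version opened since the last refresh and still current: upsert it
            snapshot[r["product_id"]] = r["attrs"]
    return snapshot


history = [
    {"product_id": 1, "attrs": {"price": 10}, "start_at": 1, "end_at": None},
    {"product_id": 2, "attrs": {"price": 5}, "start_at": 1, "end_at": None},
]
v1 = full_refresh(history)

# Changes at ts=2: product 1 is updated, product 3 is added
history[0]["end_at"] = 2
history.append({"product_id": 1, "attrs": {"price": 12}, "start_at": 2, "end_at": None})
history.append({"product_id": 3, "attrs": {"price": 7}, "start_at": 2, "end_at": None})

v2 = incremental_refresh(v1, history, last_ts=1)
print(v2 == full_refresh(history))  # True
```

The incremental path only needs the rows whose start_at or end_at moved past the last refresh, which is what makes it cheaper than recomputing the whole filter when few products change.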
I also tried to replicate your setup by defining an SCD Type 2 table in bronze and an SCD Type 1 table in silver, but it doesn't work: it throws an error related to the SCD. So I'm not sure it is possible to implement it that way in DLT.
Hope that helps. Let me know how it goes!
Best, Ilir