Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT Pipeline from Streaming Table

km1837
New Contributor

Hi

I have a bronze table with Product_id, *, start_at, end_at. It is a streaming SCD Type 2 table: any change in the product attributes inserts a new row with end_at set to null and closes out the previous row. So if we filter this table on end_at is null, we get the full current product table without any duplicates.
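To make the SCD2 "current rows" idea concrete, here is a plain-Python illustration (the column names and values are made up for the example; the real table uses Spark):

```python
# Hypothetical SCD Type 2 history: each attribute change inserts a new row
# and the previous version gets its end_at closed; the current version of
# each product is the row whose end_at is still None.
rows = [
    {"product_id": 1, "price": 10, "start_at": "2024-01-01", "end_at": "2024-03-01"},
    {"product_id": 1, "price": 12, "start_at": "2024-03-01", "end_at": None},
    {"product_id": 2, "price": 99, "start_at": "2024-02-01", "end_at": None},
]

# Keeping only rows with end_at = None yields exactly one row per product,
# i.e. a deduplicated "current" product table.
current = [r for r in rows if r["end_at"] is None]
```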

I want to load this table incrementally as new records are added or existing records change. What kind of solution should I use here? I believe I should use a streaming table with apply_changes; my code is below, but it does not work.

SOURCE_FULL is my bronze_table

import dlt
from pyspark.sql.functions import col

# Define the source streaming table
@dlt.table(
    name="current_product_records",
    comment="Current records only from product_history (SCD2 table)"
)
def current_product_records():
    return (
        spark.readStream.table(SOURCE_FULL)
        .filter(col("__END_AT").isNull())
    )

# The function above reads the bronze table and keeps only rows where
# __END_AT is null. Next I need to create a streaming table with
# apply_changes. I tried the following, but it is not working:
def current_product():
    dlt.create_streaming_table(
        name="current_product",
        table_properties={"delta.enableChangeDataFeed": "false"}
    )
    return dlt.apply_changes(
        target="current_product",
        source="current_product_records",
        keys=["product_key"],
        sequence_by=col("db_rowload_ts_est"),
        stored_as_scd_type="1",
        except_column_list=["__START_AT", "__END_AT"]
    )
This is giving the error: "Cannot have multiple queries named catalog.schema.current_product. Additional queries on that table must be named. Note that unnamed queries default to the same name as the table."
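For context, this error typically comes from wrapping dlt.create_streaming_table and dlt.apply_changes inside a plain function: both are meant to be called at module level in the pipeline source, and apply_changes registers its own flow against the target rather than being returned from a table function. A minimal sketch of that pattern, reusing the names from the code above (untested here, since it needs the Databricks DLT runtime):

```python
import dlt
from pyspark.sql.functions import col

# Declare the target streaming table once, at module level.
dlt.create_streaming_table(
    name="current_product",
    table_properties={"delta.enableChangeDataFeed": "false"}
)

# apply_changes registers a change flow into the target; it is called
# directly, not returned from a decorated function.
dlt.apply_changes(
    target="current_product",
    source="current_product_records",
    keys=["product_key"],
    sequence_by=col("db_rowload_ts_est"),
    stored_as_scd_type=1,
    except_column_list=["__START_AT", "__END_AT"]
)
```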
 
1 REPLY

ilir_nuredini
Honored Contributor

Hi @km1837 ,

Instead of trying to build a streaming table on top of another streaming table, for your use case I think a materialized view as the next child table would be the best choice.
For example:

 

import dlt

@dlt.table(name="workspace.silver.current_product")
def current_product():
    return dlt.read(SOURCE_FULL).filter("__end_at IS NULL")

 

Materialized views are refreshed using one of two methods.

  • Incremental refresh - The system evaluates the view's query to identify changes that happened after the last update and merges only the new or modified data.
  • Full refresh - If an incremental refresh can't be performed, the system runs the entire query and replaces the existing data in the materialized view with the new results.
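As a rough mental model of the two refresh methods (plain Python, all names made up; deletions are omitted for simplicity), incremental refresh merges only the keys that changed since the last update, while full refresh recomputes the whole view, and both should land on the same result:

```python
# Materialized view state after the last update: key -> value
mv = {1: "a", 2: "b", 3: "c"}
# Current source data: key 2 changed, key 4 is new
source = {1: "a", 2: "B", 3: "c", 4: "d"}

# Incremental refresh: find and merge only the new/modified keys
changes = {k: v for k, v in source.items() if mv.get(k) != v}
incremental = {**mv, **changes}

# Full refresh: rerun the entire query and replace the view's contents
full = dict(source)
```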




I have also tried to replicate your solution by defining SCD Type 2 in bronze and SCD Type 1 in silver; it doesn't work and throws an error regarding the SCD configuration. So I am not sure it is possible to implement it that way in DLT.


Hope that helps. Let me know how it goes!

Best, Ilir
 
