Hi
I have a bronze table with Product_id, *, start_at, end_at, which is a streaming SCD Type 2 table: any change in the product attributes inserts a new row with end_at as null (and closes the previous row by setting its end_at). So if we filter this table on end_at is null, we get the full current product table without any duplicates.
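To illustrate what I mean by that, here is a toy sketch in plain Python (not Spark; the column names mirror my table):

```python
# Toy SCD Type 2 history: each attribute change closes the old row
# (sets end_at) and inserts a new open row with end_at = None.
history = [
    {"product_id": 1, "color": "red",   "start_at": "2024-01-01", "end_at": "2024-03-01"},
    {"product_id": 1, "color": "blue",  "start_at": "2024-03-01", "end_at": None},
    {"product_id": 2, "color": "green", "start_at": "2024-02-01", "end_at": None},
]

# Keeping only the open rows (end_at is None) yields the current
# product table: exactly one row per product_id, no duplicates.
current = [row for row in history if row["end_at"] is None]
print([r["product_id"] for r in current])  # → [1, 2]
```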
I want to do incremental loading for this table: new records are added and existing records are changed. What kind of solution should I use here? I believe I should use a streaming table with apply_changes. Below is my code, but it does not work.
SOURCE_FULL is my bronze table.
import dlt
from pyspark.sql.functions import col
# ----- Define the source streaming table
@dlt.table(
    name="current_product_records",
    comment="Current records only from product_history (SCD2 table)"
)
def current_product_records():
    return (
        spark.readStream.table(SOURCE_FULL)
        .filter(col("__END_AT").isNull())
    )
# The code above reads the bronze table as a stream and keeps only rows where __END_AT is null.
# ----- Next I need a streaming table populated with apply_changes. I tried the below, but it is not working.
def current_product():
    dlt.create_streaming_table(
        name="current_product",
        table_properties={"delta.enableChangeDataFeed": "false"}
    )
    return dlt.apply_changes(
        target="current_product",
        source="current_product_records",
        keys=["product_key"],
        sequence_by=col("db_rowload_ts_est"),
        stored_as_scd_type="1",
        except_column_list=["__START_AT", "__END_AT"]
    )
This gives the error: "Cannot have multiple queries named 'catalog.schema.current_product'. Additional queries on that table must be named. Note that unnamed queries default to the same name as the table." How should I structure this?
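For reference, my reading of the DLT documentation is that dlt.create_streaming_table and dlt.apply_changes are meant to be called at module level, not wrapped in a function with a return. A sketch of what I understand the intended structure to be (reusing the table and column names from my code above; I am not certain this is what resolves the naming error):

```python
import dlt
from pyspark.sql.functions import col

# Declare the target streaming table once, at module level.
dlt.create_streaming_table(
    name="current_product",
    table_properties={"delta.enableChangeDataFeed": "false"}
)

# apply_changes is also a module-level call, not returned from a function.
dlt.apply_changes(
    target="current_product",
    source="current_product_records",
    keys=["product_key"],
    sequence_by=col("db_rowload_ts_est"),
    stored_as_scd_type=1,  # SCD1 target: keep only the latest row per key
    except_column_list=["__START_AT", "__END_AT"]
)
```

(This only runs inside a Databricks DLT pipeline, so I cannot test it standalone.)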