Hi @LauJohansson, Here's example source code for a DLT pipeline that reads the change data feed (CDF) from a bronze table and uses the apply_changes function to upsert into your silver table, sequencing the rows by the _commit_timestamp column so the latest row-level change wins:
import dlt
from pyspark.sql.functions import col

# Fully qualified name of the CDF-enabled source table
source = "catalog.schema.cdf_enabled_table"
bronze = "cdf_enabled_table_sequency_by_bronze"
silver = "cdf_enabled_table_sequency_by_silver"

@dlt.view(
    name=bronze,
)
def cdf_enabled_table_sequency_by_bronze():
    # Stream the change data feed from the source table
    return spark.readStream.option("readChangeFeed", "true").table(source)

dlt.create_streaming_table(name=silver)

# Upsert into the silver table, keeping only the latest change per key,
# ordered by the CDF _commit_timestamp column
dlt.apply_changes(
    target = silver,
    source = bronze,
    keys = ["id"],
    sequence_by = col("_commit_timestamp"),
    stored_as_scd_type = 1
)
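Note that reading with readChangeFeed requires the change data feed to be enabled on the source table. If it isn't already, a minimal way to turn it on (assuming the catalog.schema.cdf_enabled_table name from the example above) is:

# Enable the change data feed on the source table, if not already enabled
# (table name is taken from the example; adjust to your own)
spark.sql(
    "ALTER TABLE catalog.schema.cdf_enabled_table "
    "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)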
It seems you're using this setup for data deduplication. That should work, but please consider the APPLY_CHANGES limitations and the CDF limitations while designing your pipeline.
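One thing to watch for with this pattern: the change feed also emits update_preimage rows, and they carry the same _commit_timestamp as their matching update_postimage rows, which can make the sequence_by ordering ambiguous. Below is a minimal sketch of a variant that filters out the pre-image rows in the bronze view and routes CDF deletes through apply_as_deletes; the view and table names here are placeholders, not something from your pipeline:

import dlt
from pyspark.sql.functions import col, expr

source = "catalog.schema.cdf_enabled_table"    # assumed source table
bronze = "cdf_enabled_table_filtered_bronze"   # hypothetical view name
silver = "cdf_enabled_table_filtered_silver"   # hypothetical target name

@dlt.view(name=bronze)
def cdf_enabled_table_filtered_bronze():
    # Read the change feed and drop pre-image rows so only one row
    # per change remains for sequencing
    return (
        spark.readStream
        .option("readChangeFeed", "true")
        .table(source)
        .filter(col("_change_type") != "update_preimage")
    )

dlt.create_streaming_table(name=silver)

dlt.apply_changes(
    target = silver,
    source = bronze,
    keys = ["id"],
    sequence_by = col("_commit_timestamp"),
    apply_as_deletes = expr("_change_type = 'delete'"),  # propagate deletes from CDF
    stored_as_scd_type = 1
)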
Best regards,
Raphael Balogo
Sr. Technical Solutions Engineer
Databricks