raphaelblg
Databricks Employee
Databricks Employee

Hi @LauJohansson , Here's an example source code for a DLT pipeline that reads from a bronze table in CDF and uses the apply_changes function to upsert to your silver table, sequencing the upsert rows in order by the _commit_timestamp column for the latest row-level changes:

 
import dlt
from pyspark.sql.functions import col, expr

source = f"catalog.schema.cdf_enabled_table"

bronze = "cdf_enabled_table_sequency_by_bronze"
silver = "cdf_enabled_table_sequency_by_silver"

@dlt.view(
  name=bronze,
)
def cdf_enabled_table_sequency_by_bronze():
  return spark.readStream.option("readChangeFeed", "true").table(source)

dlt.create_streaming_table(name=silver)

dlt.apply_changes(
  target = silver,
  source = bronze,
  keys = ["id"],
  sequence_by = col("_commit_timestamp"),
  stored_as_scd_type = 1
)​

It seems that you're using this setup for data deduplication, it should work but please consider the APPLY_CHANGES limitations and CDF limitations while designing your pipeline.

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

View solution in original post