Delta Live Tables: Retrieve CDF columns

LauJohansson
Databricks Partner

I want to use the apply_changes feature to go from a bronze table to a silver table. The bronze table has no "natural" sequence_by column, so I want to use the CDF column "_commit_timestamp" as the sequence_by.

How do I retrieve the CDF columns in a DLT setup?


raphaelblg
Databricks Employee

Hi @LauJohansson, here's example source code for a DLT pipeline that reads the change data feed (CDF) of a bronze table and uses the apply_changes function to upsert into your silver table. The upserted rows are sequenced by the _commit_timestamp column, so the latest row-level change for each key wins:

 
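import dlt
from pyspark.sql.functions import col, expr

# Source Delta table with Change Data Feed enabled (delta.enableChangeDataFeed = true)
source = "catalog.schema.cdf_enabled_table"

bronze = "cdf_enabled_table_sequency_by_bronze"
silver = "cdf_enabled_table_sequency_by_silver"

@dlt.view(
  name=bronze,
)
def cdf_enabled_table_sequency_by_bronze():
  # readChangeFeed adds _change_type, _commit_version and _commit_timestamp to each row.
  # update_preimage rows hold the old values and share the _commit_timestamp of the
  # matching update_postimage, so drop them before sequencing.
  return (
    spark.readStream.option("readChangeFeed", "true").table(source)
    .filter(col("_change_type") != "update_preimage")
  )

dlt.create_streaming_table(name=silver)

dlt.apply_changes(
  target = silver,
  source = bronze,
  keys = ["id"],
  sequence_by = col("_commit_timestamp"),
  # Propagate deletes from the source instead of upserting the deleted row's values
  apply_as_deletes = expr("_change_type = 'delete'"),
  # Keep the CDF metadata columns out of the silver table
  except_column_list = ["_change_type", "_commit_version", "_commit_timestamp"],
  stored_as_scd_type = 1
)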
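To make the effect of the pipeline above on the data concrete, here is a plain-Python model of SCD type 1 apply_changes semantics over CDF rows. It is not DLT's implementation: the function name apply_changes_scd1, the last_seq bookkeeping dict, and the integers standing in for _commit_timestamp values are all hypothetical, chosen only to illustrate sequencing, deletes and out-of-order arrival.

```python
from typing import Any, Dict, Iterable

Row = Dict[str, Any]
CDF_META = {"_change_type", "_commit_version", "_commit_timestamp"}


def apply_changes_scd1(
    target: Dict[Any, Row],
    last_seq: Dict[Any, Any],
    cdf_rows: Iterable[Row],
    key: str = "id",
    sequence_by: str = "_commit_timestamp",
) -> None:
    """Hypothetical, simplified model of an SCD type 1 upsert (mutates target)."""
    # update_preimage rows carry the old values, so they are not changes to apply
    changes = [r for r in cdf_rows if r["_change_type"] != "update_preimage"]
    # Apply in sequence order so the latest change per key wins
    for row in sorted(changes, key=lambda r: r[sequence_by]):
        k, seq = row[key], row[sequence_by]
        # Ignore a change that is not newer than what this key has already seen
        if k in last_seq and seq <= last_seq[k]:
            continue
        last_seq[k] = seq  # remembered for deletes too, so stale upserts stay ignored
        if row["_change_type"] == "delete":
            target.pop(k, None)
        else:
            # Mirrors except_column_list: drop the CDF metadata columns
            target[k] = {c: v for c, v in row.items() if c not in CDF_META}


target: Dict[Any, Row] = {}
last_seq: Dict[Any, Any] = {}
batch1 = [
    {"id": 1, "name": "a", "_change_type": "insert", "_commit_version": 1, "_commit_timestamp": 100},
    {"id": 2, "name": "b", "_change_type": "insert", "_commit_version": 1, "_commit_timestamp": 100},
]
batch2 = [
    {"id": 1, "name": "a", "_change_type": "update_preimage", "_commit_version": 2, "_commit_timestamp": 200},
    {"id": 1, "name": "a2", "_change_type": "update_postimage", "_commit_version": 2, "_commit_timestamp": 200},
    {"id": 2, "name": "b", "_change_type": "delete", "_commit_version": 3, "_commit_timestamp": 300},
]
apply_changes_scd1(target, last_seq, batch1)
apply_changes_scd1(target, last_seq, batch2)
print(target)  # {1: {'id': 1, 'name': 'a2'}}
```

In this model, tracking the last sequence value per key (including for deletes) is what lets an older change that arrives late be skipped instead of overwriting newer data.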

It looks like you're using this setup for deduplication. It should work, but please review the APPLY CHANGES and CDF limitations while designing your pipeline; for example, CDF only records changes committed after delta.enableChangeDataFeed has been set on the source table.
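Deduplicating on a sequence column only works when each key has a single row per sequence value. The plain-Python sketch below uses a hypothetical latest_per_key helper (not a DLT or Spark API, with integers standing in for timestamps) that keeps the highest-sequence row per key and refuses ties. It shows why CDF update_preimage rows should be filtered out: they carry the same _commit_timestamp as the matching update_postimage.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def latest_per_key(
    rows: List[Row], key: str = "id", sequence_by: str = "_commit_timestamp"
) -> Dict[Any, Row]:
    """Hypothetical helper: keep the highest-sequence row per key, reject ties."""
    latest: Dict[Any, Row] = {}
    for row in rows:
        k = row[key]
        current = latest.get(k)
        if current is None or row[sequence_by] > current[sequence_by]:
            latest[k] = row
        elif row[sequence_by] == current[sequence_by]:
            # Two versions of one key with the same sequence value: "latest" is undefined
            raise ValueError(f"ambiguous sequence value {row[sequence_by]!r} for key {k!r}")
    return latest


# A single commit that updates id=1 emits a pre-image and a post-image with one timestamp
commit = [
    {"id": 1, "name": "old", "_change_type": "update_preimage", "_commit_timestamp": 200},
    {"id": 1, "name": "new", "_change_type": "update_postimage", "_commit_timestamp": 200},
]

try:
    latest_per_key(commit)
except ValueError as err:
    print(err)  # ambiguous sequence value 200 for key 1

deduped = latest_per_key([r for r in commit if r["_change_type"] != "update_preimage"])
print(deduped[1]["name"])  # new
```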

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks


Delta Live Tables Python language reference: https://docs.databricks.com/en/delta-live-tables/python-ref.html

Delta Live Tables SQL language reference: https://docs.databricks.com/en/delta-live-tables/sql-ref.html
