cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Delta live table: Retrieve CDF columns

LauJohansson
Contributor

I have want to use the apply_changes feature from a bronze table to a silver table.The bronze table have no "natural" sequence_by column. Therefore, I want to use the CDF column "_commit_timestamp" as the sequence_by.

How do I retrieve the columns in a dlt setup? 


2 ACCEPTED SOLUTIONS

Accepted Solutions

raphaelblg
Databricks Employee
Databricks Employee

Hi @LauJohansson , Here's an example source code for a DLT pipeline that reads from a bronze table in CDF and uses the apply_changes function to upsert to your silver table, sequencing the upsert rows in order by the _commit_timestamp column for the latest row-level changes:

 
import dlt
from pyspark.sql.functions import col, expr

source = f"catalog.schema.cdf_enabled_table"

bronze = "cdf_enabled_table_sequency_by_bronze"
silver = "cdf_enabled_table_sequency_by_silver"

@dlt.view(
  name=bronze,
)
def cdf_enabled_table_sequency_by_bronze():
  return spark.readStream.option("readChangeFeed", "true").table(source)

dlt.create_streaming_table(name=silver)

dlt.apply_changes(
  target = silver,
  source = bronze,
  keys = ["id"],
  sequence_by = col("_commit_timestamp"),
  stored_as_scd_type = 1
)​

It seems that you're using this setup for data deduplication, it should work but please consider the APPLY_CHANGES limitations and CDF limitations while designing your pipeline.

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

View solution in original post

LauJohansson
Contributor
3 REPLIES 3

raphaelblg
Databricks Employee
Databricks Employee

Hi @LauJohansson , Here's an example source code for a DLT pipeline that reads from a bronze table in CDF and uses the apply_changes function to upsert to your silver table, sequencing the upsert rows in order by the _commit_timestamp column for the latest row-level changes:

 
import dlt
from pyspark.sql.functions import col, expr

source = f"catalog.schema.cdf_enabled_table"

bronze = "cdf_enabled_table_sequency_by_bronze"
silver = "cdf_enabled_table_sequency_by_silver"

@dlt.view(
  name=bronze,
)
def cdf_enabled_table_sequency_by_bronze():
  return spark.readStream.option("readChangeFeed", "true").table(source)

dlt.create_streaming_table(name=silver)

dlt.apply_changes(
  target = silver,
  source = bronze,
  keys = ["id"],
  sequence_by = col("_commit_timestamp"),
  stored_as_scd_type = 1
)​

It seems that you're using this setup for data deduplication, it should work but please consider the APPLY_CHANGES limitations and CDF limitations while designing your pipeline.

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

Delta Live Tables Python language reference: https://docs.databricks.com/en/delta-live-tables/python-ref.html

Delta Live Tables SQL language reference: https://docs.databricks.com/en/delta-live-tables/sql-ref.html

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

LauJohansson
Contributor

Thank you @raphaelblg!

I chose to write an article on the subject after this discussion: https://www.linkedin.com/pulse/databricks-delta-live-tables-merging-lau-johansson-cdtce/?trackingId=...

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group