09-02-2024 05:42 AM
I want to use the apply_changes feature to go from a bronze table to a silver table. The bronze table has no "natural" sequence_by column, so I want to use the CDF column _commit_timestamp as the sequence_by.
How do I retrieve the CDF columns in a DLT setup?
Accepted Solutions
09-03-2024 11:54 AM
Hi @LauJohansson, here's example source code for a DLT pipeline that reads the change feed of a CDF-enabled bronze table and uses the apply_changes function to upsert into your silver table, sequencing the upserted rows by the _commit_timestamp column so that the latest row-level change wins:
import dlt
from pyspark.sql.functions import col

# Fully qualified name of the CDF-enabled source table
source = "catalog.schema.cdf_enabled_table"
bronze = "cdf_enabled_table_sequency_by_bronze"
silver = "cdf_enabled_table_sequency_by_silver"

@dlt.view(name=bronze)
def cdf_enabled_table_sequency_by_bronze():
    # Reading the change feed exposes the CDF metadata columns,
    # including _change_type, _commit_version and _commit_timestamp
    return spark.readStream.option("readChangeFeed", "true").table(source)

dlt.create_streaming_table(name=silver)

dlt.apply_changes(
    target=silver,
    source=bronze,
    keys=["id"],
    sequence_by=col("_commit_timestamp"),
    stored_as_scd_type=1,
)
It seems you're using this setup for data deduplication. It should work, but please consider the APPLY_CHANGES limitations and CDF limitations while designing your pipeline.
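In case your source table doesn't have the change data feed enabled yet, here's a minimal sketch of how you could turn it on. This is a one-time setup step run outside the DLT pipeline, and the table name is just the placeholder used in this thread:

    # Enable CDF on the existing source Delta table (placeholder name).
    # For new tables, the same property can be set in the CREATE TABLE statement.
    spark.sql("""
        ALTER TABLE catalog.schema.cdf_enabled_table
        SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    """)

Only commits made after the property is set will appear in the change feed.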
Raphael Balogo
Sr. Technical Solutions Engineer
Databricks
09-03-2024 12:47 PM
Delta Live Tables Python language reference: https://docs.databricks.com/en/delta-live-tables/python-ref.html
Delta Live Tables SQL language reference: https://docs.databricks.com/en/delta-live-tables/sql-ref.html
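The Python reference above also documents further apply_changes parameters. As a hedged sketch building on the earlier example (not part of the accepted answer, and the view and table names below are hypothetical): if your change feed also carries deletes, you could drop pre-update images in the bronze view and map delete events with apply_as_deletes:

    import dlt
    from pyspark.sql.functions import col, expr

    source = "catalog.schema.cdf_enabled_table"  # placeholder from the thread

    @dlt.view(name="cdf_bronze_filtered")  # hypothetical view name
    def cdf_bronze_filtered():
        # Drop pre-update images so each commit contributes one row per key
        return (
            spark.readStream.option("readChangeFeed", "true")
            .table(source)
            .filter(col("_change_type") != "update_preimage")
        )

    dlt.create_streaming_table(name="cdf_silver_with_deletes")  # hypothetical

    dlt.apply_changes(
        target="cdf_silver_with_deletes",
        source="cdf_bronze_filtered",
        keys=["id"],
        sequence_by=col("_commit_timestamp"),
        apply_as_deletes=expr("_change_type = 'delete'"),
        stored_as_scd_type=1,
    )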
Raphael Balogo
Sr. Technical Solutions Engineer
Databricks
09-04-2024 02:48 AM
Thank you @raphaelblg!
I chose to write an article on the subject after this discussion: https://www.linkedin.com/pulse/databricks-delta-live-tables-merging-lau-johansson-cdtce/?trackingId=...

