โ09-02-2024 05:42 AM
I have want to use the apply_changes feature from a bronze table to a silver table.The bronze table have no "natural" sequence_by column. Therefore, I want to use the CDF column "_commit_timestamp" as the sequence_by.
How do I retrieve the columns in a dlt setup?
โ09-03-2024 11:54 AM
Hi @LauJohansson , Here's an example source code for a DLT pipeline that reads from a bronze table in CDF and uses the apply_changes function to upsert to your silver table, sequencing the upsert rows in order by the _commit_timestamp
column for the latest row-level changes:
import dlt
from pyspark.sql.functions import col, expr
source = f"catalog.schema.cdf_enabled_table"
bronze = "cdf_enabled_table_sequency_by_bronze"
silver = "cdf_enabled_table_sequency_by_silver"
@dlt.view(
name=bronze,
)
def cdf_enabled_table_sequency_by_bronze():
return spark.readStream.option("readChangeFeed", "true").table(source)
dlt.create_streaming_table(name=silver)
dlt.apply_changes(
target = silver,
source = bronze,
keys = ["id"],
sequence_by = col("_commit_timestamp"),
stored_as_scd_type = 1
)โ
It seems that you're using this setup for data deduplication, it should work but please consider the APPLY_CHANGES limitations and CDF limitations while designing your pipeline.
โ09-04-2024 02:48 AM
Thank you @raphaelblg!
I chose to write an article on the subject after this discussion: https://www.linkedin.com/pulse/databricks-delta-live-tables-merging-lau-johansson-cdtce/?trackingId=...
โ09-03-2024 11:54 AM
Hi @LauJohansson , Here's an example source code for a DLT pipeline that reads from a bronze table in CDF and uses the apply_changes function to upsert to your silver table, sequencing the upsert rows in order by the _commit_timestamp
column for the latest row-level changes:
import dlt
from pyspark.sql.functions import col, expr
source = f"catalog.schema.cdf_enabled_table"
bronze = "cdf_enabled_table_sequency_by_bronze"
silver = "cdf_enabled_table_sequency_by_silver"
@dlt.view(
name=bronze,
)
def cdf_enabled_table_sequency_by_bronze():
return spark.readStream.option("readChangeFeed", "true").table(source)
dlt.create_streaming_table(name=silver)
dlt.apply_changes(
target = silver,
source = bronze,
keys = ["id"],
sequence_by = col("_commit_timestamp"),
stored_as_scd_type = 1
)โ
It seems that you're using this setup for data deduplication, it should work but please consider the APPLY_CHANGES limitations and CDF limitations while designing your pipeline.
โ09-03-2024 12:47 PM
Delta Live Tables Python language reference: https://docs.databricks.com/en/delta-live-tables/python-ref.html
Delta Live Tables SQL language reference: https://docs.databricks.com/en/delta-live-tables/sql-ref.html
โ09-04-2024 02:48 AM
Thank you @raphaelblg!
I chose to write an article on the subject after this discussion: https://www.linkedin.com/pulse/databricks-delta-live-tables-merging-lau-johansson-cdtce/?trackingId=...
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group