06-05-2025 12:14 PM
Hi All,
I'm working on a Databricks Delta Live Tables (DLT) pipeline where we receive daily full-snapshot CSV files in Azure cloud storage. These files contain HR data (e.g., an employee file), and I'm using Auto Loader to ingest them into a bronze-layer DLT table.
My goal is to:
1. Detect changes in this snapshot data (inserts/deletes/updates) at the bronze layer, and append every insert and update to another table. (To append, I'm using the APPLY CHANGES API here with SCD1, using all columns as key columns, so every change is treated as a new record.)
2. On top of the appended table, create SCD type 1 and SCD type 2 dimension tables using the APPLY CHANGES API.
Question: Can I reference the output of one APPLY CHANGES INTO (the first) as input to another APPLY CHANGES INTO (the second) in the same pipeline, or do I need a separate pipeline? Please suggest a better solution if there is one. A rough sketch of what I have so far is below.
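Rough sketch of what I have so far (the table names, column names, and storage path here are just placeholders, not my real ones):

import dlt
from pyspark.sql.functions import col, current_timestamp

# Bronze: ingest the daily full-snapshot CSVs with Auto Loader
@dlt.table(name="bronze_employee")
def bronze_employee():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .load("abfss://landing@<storage_account>.dfs.core.windows.net/hr/employee/")
        .withColumn("ingest_ts", current_timestamp())
    )

# Silver: APPLY CHANGES with all columns as keys, so every changed row lands as a new record
dlt.create_streaming_table("silver_employee_changes")

dlt.apply_changes(
    target="silver_employee_changes",
    source="bronze_employee",
    keys=["employee_id", "name", "department"],  # all snapshot columns as keys
    sequence_by=col("ingest_ts"),
    stored_as_scd_type=1,
)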
06-05-2025 12:45 PM
Yes, You Can Chain APPLY CHANGES INTO Operations in the Same Pipeline.
You can reference the output of one APPLY CHANGES INTO operation as input to another APPLY CHANGES INTO operation within the same DLT pipeline. This is a supported pattern and actually quite common for building layered SCD architectures.
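As a minimal sketch of what chaining might look like (table names, key columns, and the sequencing column here are placeholders; depending on how the first target is updated, you may also need the change data feed approach discussed further down this thread):

import dlt
from pyspark.sql.functions import col

# First APPLY CHANGES: bronze -> silver, stored as SCD type 1
dlt.create_streaming_table("silver_employee")

dlt.apply_changes(
    target="silver_employee",
    source="bronze_employee",
    keys=["employee_id"],
    sequence_by=col("ingest_ts"),
    stored_as_scd_type=1,
)

# Second APPLY CHANGES: the first target becomes the source for the gold dimension (SCD type 2)
dlt.create_streaming_table("dim_employee_scd2")

dlt.apply_changes(
    target="dim_employee_scd2",
    source="silver_employee",
    keys=["employee_id"],
    sequence_by=col("ingest_ts"),
    stored_as_scd_type=2,
)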
06-06-2025 11:06 AM - edited 06-06-2025 11:11 AM
Hi @Ranga_naik1180, adding to the above point:
Since we have enabled APPLY CHANGES on the silver layer, it updates records coming from the bronze table successfully. However, in the silver layer that record is now marked as an UPDATE instead of an INSERT, so we need to enable the change data feed on the gold layer's source table (the silver-layer table).
To resolve this, enable the change data feed and read from it, as shown below:
import dlt
from pyspark.sql.functions import col

# Enable the change data feed on the silver target table
dlt.create_target_table(
    name="silver",
    table_properties={"delta.enableChangeDataFeed": "true"}
)

# Read the change feed as a stream, keeping inserts, updated post-images, and deletes
@dlt.view
def intermediate_update_view():
    return (
        spark.readStream
        .option("readChangeFeed", "true")
        .table("source_table")
        .filter(col("_change_type").isin(["update_postimage", "insert", "delete"]))
    )
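As a rough sketch of the next step (the gold table name, business key, and SCD type here are just assumptions for illustration), that view can then be used as the source of the gold-layer apply_changes:

from pyspark.sql.functions import col, expr

dlt.create_streaming_table("gold_dim_employee")

dlt.apply_changes(
    target="gold_dim_employee",
    source="intermediate_update_view",
    keys=["employee_id"],                      # placeholder business key
    sequence_by=col("_commit_version"),        # the change feed exposes _commit_version / _commit_timestamp
    apply_as_deletes=expr("_change_type = 'delete'"),
    except_column_list=["_change_type", "_commit_version", "_commit_timestamp"],
    stored_as_scd_type=2,
)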
06-06-2025 01:17 PM
Hi @nikhilj0421, I get a full snapshot file daily. I'm using Auto Loader to load the file into a bronze DLT table; after that I apply APPLY CHANGES to implement SCD1 (with all columns as keys, so updates/deletes get inserted as new records) and store all the delta records (append source) in a silver table. After that I will use APPLY CHANGES again to create the gold tables (dimensions).
This is my requirement: 1. capture the daily changes in one table and use that table as an append source, and 2. implement SCD1/SCD2 on top of it using APPLY CHANGES. So if I use the change data feed on the silver table, will it work? A more detailed explanation would be appreciated if possible.
06-06-2025 01:19 PM
"apply changes on the silver layer, it is updating the records successfully coming from the bronze table."
As you mentioned above, this is exactly what is happening.
06-06-2025 06:30 PM
Hi @Ranga_naik1180, let's take an example to understand this:
Flow of the pipeline: Bronze -> Silver -> Gold
In storage you have two files: 1.json is the original file, and 2.json updates the value of B from "b" to "b_new".
1.json -> {A: "a", B: "b"}
2.json -> {A: "a", B: "b_new"}
This is where the change data feed is the solution.
_delta_log -> the Delta log records the second write as an UPDATE of the existing row, and a streaming read only supports append-only sources, so it cannot consume that update directly. The change data feed, however, writes every change (insert and update alike) as newly appended rows, which is why we read from the change feed.
Data files in the table:
Abc1.parquet
  {A: "a", B: "b"} (INSERT)
Abc2.parquet
  {A: "a", B: "b_new"} (UPDATE)
_change_feed files (each one appended, so readable as a stream):
abc1_change.parquet (INSERT)
  {A: "a", B: "b", "change_type": "INSERT"}
abc2_change.parquet (INSERT)
  {A: "a", B: "b", "change_type": "UPDATE", "change_image": "pre_image"}
  {A: "a", B: "b_new", "change_type": "UPDATE", "change_image": "post_image"}
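To see this for yourself, you can do a quick batch read of the change feed (the table name and starting version here are just examples):

# Batch read of the change feed starting from version 0 (table name is a placeholder)
changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("silver")
)

# Updates appear twice (update_preimage and update_postimage); inserts and deletes appear once each
changes.select("A", "B", "_change_type", "_commit_version", "_commit_timestamp").show()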
06-08-2025 01:41 PM
Hi @nikhilj0421, thanks for your clarification. The solution works for me when I use Python DLT code, but I need one more piece of help.
What would the equivalent SQL be for the code below? When I try something like
create or replace temporary view intermediate_view as select * from table_changes('source', 2) where _change_type = 'insert';
apply changes into
  target_table = "gold_layer_tbl"
  source_table = "intermediate_view"
I get an error that the view is not a streaming table, so how can I tackle this?
@dlt.view
def intermediate_update_view():
    return (
        spark.readStream
        .option("readChangeFeed", "true")
        .table("source_table")
        .filter(col("_change_type").isin(["update_postimage", "insert", "delete"]))
    )
06-12-2025 10:28 AM
Hi @Ranga_naik1180,
There is no need to create an intermediate view in SQL. You can read the change data feed from silver directly into the gold table, with something like the code below:
CREATE STREAMING LIVE TABLE gold_table
AS SELECT *
FROM table_changes('live.silver_table', 0); -- Reads change feed starting from version 0
Please let me know if this works for you.