Data Engineering

Delta Live table

Ranga_naik1180
New Contributor III

Hi All,

I'm working on a Databricks Delta Live Tables (DLT) pipeline where we receive daily full snapshot CSV files in Azure cloud storage. These files contain HR data (e.g. an employee file), and I'm using Auto Loader to ingest them into a bronze layer DLT table.

My goal is to:

1. Detect changes in this snapshot data (inserts/deletes/updates) at the bronze layer, and append every insert and update to another table. (To append, I'm using the APPLY CHANGES API here with SCD Type 1 and all columns as key columns, so every change is treated as a new record.)

2. On top of the appended table, I want to create SCD Type 1 and SCD Type 2 dimension tables using the APPLY CHANGES API.

 

Question: Can I reference the output of one APPLY CHANGES INTO (the first) as input to another APPLY CHANGES INTO (the second) in the same pipeline, or do I need to use a different pipeline? Please suggest a better solution if there is one.
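For concreteness, the bronze ingestion described above could look roughly like the following. This is only a sketch; the storage path, table name, and the _ingest_ts column are placeholders, not the actual pipeline code.

import dlt
from pyspark.sql.functions import current_timestamp

@dlt.table(name = "bronze_employee")
def bronze_employee():
    # Auto Loader incrementally picks up the daily full-snapshot CSV files
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .load("abfss://hr@mystorage.dfs.core.windows.net/landing/employee/")  # placeholder path
        .withColumn("_ingest_ts", current_timestamp())  # used later to sequence changes
    )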


7 REPLIES

lingareddy_Alva
Honored Contributor II

Hi @Ranga_naik1180 

Yes, you can chain APPLY CHANGES INTO operations in the same pipeline.

You can reference the output of one APPLY CHANGES INTO operation as input to another APPLY CHANGES INTO operation within the same DLT pipeline. This is a supported pattern and actually quite common for building layered SCD architectures.
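As a rough illustration of the first hop (bronze into an append-style silver table built with all columns as keys, as described in the question), the code might look like the sketch below. The table and column names are assumptions, not code from this thread.

import dlt
from pyspark.sql.functions import col

# Silver target that accumulates every detected change from bronze
dlt.create_target_table(name = "silver_employee_changes")

dlt.apply_changes(
    target = "silver_employee_changes",
    source = "bronze_employee",                    # assumed bronze table name
    keys = ["employee_id", "name", "department"],  # "all columns as keys", per the approach above
    sequence_by = col("_ingest_ts"),               # assumed ingestion timestamp column
    stored_as_scd_type = 1
)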

 

LR

nikhilj0421
Databricks Employee

Hi @Ranga_naik1180, adding to the above point: 

Since we have enabled APPLY CHANGES on the silver layer, it is successfully updating the records coming from the bronze table. Now, in the silver layer, that record is marked as an UPDATE instead of an INSERT, so we need to enable the change data feed on the source table (the silver layer table) of the gold layer.

To resolve this, follow the process below:

  • Enable the change data feed in the silver table:

dlt.create_target_table(
    name = "silver",
    table_properties = {"delta.enableChangeDataFeed": "true"}
)
  • Read the change data feed from the source table:

from pyspark.sql.functions import col

@dlt.view
def intermediate_update_view():
    # Stream the silver table's change feed; keep inserts, post-update images and deletes
    return (
        spark.readStream
        .option("readChangeFeed", "true")
        .table("source_table")
        .filter(col("_change_type").isin(["update_postimage", "insert", "delete"]))
    )
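Building on this, the gold dimension can then be produced by a second apply_changes call that reads from that view. The following is only a sketch; the business key (employee_id) and the use of the change feed's _commit_timestamp as the sequencing column are assumptions, not code from this thread.

import dlt
from pyspark.sql.functions import col, expr

# Hypothetical SCD Type 2 gold dimension fed by the change-feed view above
dlt.create_target_table(name = "gold_employee_dim")

dlt.apply_changes(
    target = "gold_employee_dim",
    source = "intermediate_update_view",       # the view defined above
    keys = ["employee_id"],                    # assumed business key
    sequence_by = col("_commit_timestamp"),    # change-feed commit timestamp orders the changes
    apply_as_deletes = expr("_change_type = 'delete'"),
    except_column_list = ["_change_type", "_commit_version", "_commit_timestamp"],
    stored_as_scd_type = 2
)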

 

Ranga_naik1180
New Contributor III

Hi @nikhilj0421, I get a full snapshot file daily. I'm using Auto Loader to load the file into a bronze DLT table. After that, I apply APPLY CHANGES to implement SCD Type 1 (with all columns as keys, so updates/deletes may get inserted as new records) and store all the changed records (append source) in a silver table. After that, I will use APPLY CHANGES again to create the gold tables (dimensions).

 

This is my requirement: 1. capture the daily changes in one table and use that table as an append source, and 2. implement the SCD Type 1 / Type 2 dimensions using APPLY CHANGES. So, if I use the change data feed on the silver table, will it work? A more detailed explanation would be appreciated if possible.

Ranga_naik1180
New Contributor III

"apply changes on the silver layer, it is updating the records successfully coming from the bronze table"

As you mentioned above, this is exactly what is happening.

nikhilj0421
Databricks Employee

Hi @Ranga_naik1180, let's take an example to understand this:

Flow of the pipeline: Bronze -> Silver -> Gold

In storage you have two files: 1.json is the original file, and 2.json updates the value of B from b to b_new.

1.json -> {A: "a", B: "b"}

2.json -> {A: "a", B: "b_new"}

Here, the change data feed is the solution.

_delta_log -> the transaction log records the second write as an UPDATE of the existing row, but the change feed exposes it as newly appended change rows. Since streaming reads only support append-only sources, we need to read from the change feed (_change_feed below), where both operations are available as appended records.

Abc1.parquet
{A: "a", B: "b"} (INSERT)

Abc2.parquet
{A: "a", B: "b_new"} (UPDATE)

_change_feed

abc1_change.parquet (read as INSERT)
{A: "a", B: "b", "change_type": "INSERT"}

abc2_change.parquet (read as INSERT)
{A: "a", B: "b", "change_type": "UPDATE", "change_image": "pre_image"}
{A: "a", B: "b_new", "change_type": "UPDATE", "change_image": "post_image"}
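If it helps to see this outside DLT, the change feed of any CDF-enabled table can be inspected with a plain batch read; the table name below is just an example.

# Batch-read the change feed of a CDF-enabled Delta table, starting from version 0
changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("silver_employee_changes")
)

# Updates appear as update_preimage / update_postimage row pairs,
# alongside the _commit_version and _commit_timestamp metadata columns.
changes.select("_change_type", "_commit_version", "_commit_timestamp").show()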

 

Ranga_naik1180
New Contributor III

Hi @nikhilj0421, thanks for the clarification. The solution works for me if I use Python DLT code, but I need help with one more thing.

What would be the equivalent SQL code for the Python code below? When I try something like this:

CREATE OR REPLACE TEMPORARY VIEW intermediate_view AS
SELECT * FROM table_changes('source', 2) WHERE _change_type = 'insert';

APPLY CHANGES INTO
target_table = "gold_layer_tbl"
source_table = "intermediate_view"

I get an error that the view is not a streaming table. How can I tackle this?

 

@dlt.view
def intermediate_update_view():
    return (
        spark.readStream
        .option("readChangeFeed", "true")
        .table("source_table")
        .filter(col("_change_type").isin(["update_postimage", "insert", "delete"]))
    )

 

nikhilj0421
Databricks Employee

Hi @Ranga_naik1180,

There is no need to create an intermediate view in SQL. You can read the change data feed from silver directly into the gold table, with code something like the below:

CREATE STREAMING LIVE TABLE gold_table
AS SELECT * 
FROM table_changes('live.silver_table', 0); -- Reads change feed starting from version 0

Please let me know if this works for you.