Urgent: APPLY CHANGES INTO Delta Live Tables

Faisal
Contributor

Hi @Kaniz , @Avnish_Jain 

How can I implement CDC in SQL DLT pipelines with a live table (not a streaming table)? I am reading from external tables, loading the data into a bronze layer, and then want to apply those changes into a silver table, but I am getting an error with the APPLY CHANGES INTO syntax.

 

CREATE OR REFRESH LIVE TABLE bronze_dtl (
  name string,
  age string,
  trk_nbr string,
  perd_nbr string,
  ld_timestamp timestamp
)
COMMENT "raw data"
TBLPROPERTIES (
  'quality' = 'silver',
  'delta.enableChangeDataFeed' = 'true'
)
PARTITIONED BY (perd_nbr)
AS (
  SELECT
    name,
    age,
    trk_nbr,
    perd_nbr,
    current_timestamp() AS ld_timestamp
);

CREATE OR REFRESH LIVE TABLE silver_dtl (
  name string,
  age string,
  trk_nbr string,
  perd_nbr string,
  ld_timestamp timestamp
)
COMMENT "curated data"
TBLPROPERTIES (
  'quality' = 'silver',
  'delta.enableChangeDataFeed' = 'true'
)
PARTITIONED BY (perd_nbr);

APPLY CHANGES INTO live.silver_dtl
FROM live.bronze_dtl
KEYS (trk_nbr, perd_nbr)
SEQUENCE BY ld_timestamp
STORED AS SCD TYPE 1;

 

Please let me know where the issue is, and any shortcomings in the above example, specifically in the CDC part.


Faisal
Contributor

Below is the error I am getting. I am extracting data using queries into a live table (a materialized view) and then intend to use it to merge changes into a streaming silver table. I need to know the correct approach; the official documentation is unclear.

Error details
org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query terminated with exception: Detected a data update in the source table at version 3. This is currently not supported. If you'd like to ignore updates, set the option 'skipChangeCommits' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.

Avnish_Jain
Moderator

Hi Faisal, can you please elaborate on the reason why the bronze_dtl table is a live table and not a streaming live table?

Faisal
Contributor

The reason is that the source data is subject to restatements, and a streaming table (ST) does not allow those changes to be merged using APPLY CHANGES INTO.

Avnish_Jain
Moderator

Hi Faisal, APPLY CHANGES INTO does not support a materialized view as a source; the source must be a streaming table.

Ideally, your bronze tables are append-only, with the source providing data incrementally. If you do get revisions to previous records, these should be appended as separate rows in your bronze table, which you can then use with APPLY CHANGES INTO on your silver table to maintain the accurate, most up-to-date version of each record.

For example:

Assume we have already received and ingested values for IDs 1-4, and the source then sends changes for IDs 1-3:

ID,Value
1,Updated_V1
2,Updated_V2
3,Updated_V3

Our Bronze table would look like:

ID,Value,Insert_Ts,...
1,Old_V1,Some_Past_Date,...
2,Old_V2,Some_Past_Date,...
3,Old_V3,Some_Past_Date,...
4,Old_V4,Some_Past_Date,...
1,Updated_V1,Today_Date,...
2,Updated_V2,Today_Date,...
3,Updated_V3,Today_Date,...

You can then use APPLY CHANGES INTO the silver table, which will yield the below when using SCD type 1:

ID,Value,Insert_Ts,...
4,Old_V4,Some_Past_Date,...
1,Updated_V1,Today_Date,...
2,Updated_V2,Today_Date,...
3,Updated_V3,Today_Date,...
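
Putting this guidance together, a minimal sketch of the corrected pipeline might look like the following. Table and column names follow the original post; `source_ext` is a placeholder for the external source table, which is an assumption, not something from the thread:

```sql
-- Bronze: a streaming table that is append-only; restatements from the
-- source simply arrive as new rows with a fresh load timestamp.
CREATE OR REFRESH STREAMING LIVE TABLE bronze_dtl
COMMENT "raw data"
TBLPROPERTIES ('quality' = 'bronze')
AS SELECT
  name,
  age,
  trk_nbr,
  perd_nbr,
  current_timestamp() AS ld_timestamp
FROM STREAM(source_ext);  -- placeholder name for the external table

-- Silver: the APPLY CHANGES INTO target must be declared as a
-- streaming table, with no AS clause.
CREATE OR REFRESH STREAMING LIVE TABLE silver_dtl
COMMENT "curated data"
TBLPROPERTIES ('quality' = 'silver');

-- Merge the latest version of each record, keyed on (trk_nbr, perd_nbr)
-- and ordered by ld_timestamp, keeping only the current row (SCD type 1).
APPLY CHANGES INTO live.silver_dtl
FROM STREAM(live.bronze_dtl)
KEYS (trk_nbr, perd_nbr)
SEQUENCE BY ld_timestamp
STORED AS SCD TYPE 1;
```

Because the bronze table only ever appends rows, the streaming read no longer hits the "data update in the source table" error, and the SCD type 1 merge collapses the appended revisions into one current row per key.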

 
