One-time backfill for DLT streaming table before apply_changes

Anske
New Contributor III

Hi,

Absolute Databricks noob here, but I'm trying to set up a DLT pipeline that processes CDC records from an external SQL Server instance to create a mirrored table in my Databricks Delta lakehouse.

For this, I need a one-time backfill of the final target table that the changes are merged into, since CDC was only turned on in the source recently; I plan to remove the backfill code from the notebook after the initial run.

However, when I start the pipeline, the first run fails with the message: "Target 'cdctest' had both an APPLY CHANGES flow 'cdctest' and a non-APPLY CHANGES flow 'snapshot_inifill'. APPLY CHANGES targets may only have APPLY CHANGES flows."

Does anyone know of, or can anyone point me to, documentation that shows how to initially fill the target table of an apply_changes operation? Or should I somehow hack the existing records into the source as inserts (append them to the source with a small sequence_by value so they are applied first, and remove that code after the initial run)?

P.S. In the notebook it looks like this (preceding the code below there is some initial creation of DLT tables that builds the source table for the apply_changes operation, cdctest_cdc_enriched):

 

import dlt
from pyspark.sql.functions import col, expr

dlt.create_streaming_table(
    name="cdctest",
    comment="The table that mirrors the dbo.cdctest table in the application database. CDC records obtained from the source are merged into this table to keep it in sync.",
)

@dlt.append_flow(target="cdctest")
def snapshot_inifill():
    return spark.readStream.table("hive_metastore.default.cdctest_hist")
 
dlt.apply_changes(
    target = "cdctest",
    source = "cdctest_cdc_enriched",
    keys = ["ID"],
    sequence_by = col("tran_begin_time"),
    apply_as_deletes = expr("operation = 1"),
    except_column_list = ["operation","tran_begin_time"],
    stored_as_scd_type = 1
)
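
For what it's worth, the "hack it into the source" idea I mean would look roughly like the sketch below. It's only a sketch: it assumes cdctest_cdc_enriched is a streaming table that can take an extra append flow, that operation = 2 marks an insert (as in the SQL Server CDC tables), and that the dummy timestamp sorts before any real tran_begin_time.

import dlt
from pyspark.sql.functions import lit, to_timestamp

@dlt.append_flow(target="cdctest_cdc_enriched")
def snapshot_backfill():
    # Replay the pre-CDC snapshot as insert records (operation = 2) with an
    # artificially early sequence value, so that apply_changes orders them
    # before any real CDC rows. Remove this flow after the initial run.
    return (
        spark.readStream.table("hive_metastore.default.cdctest_hist")
        .withColumn("operation", lit(2))
        .withColumn("tran_begin_time", to_timestamp(lit("1900-01-01 00:00:00")))
    )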
1 REPLY

Anske
New Contributor III

So, since nobody responded, I decided to try my own suggestion and hack the snapshot data into the table that gathers the change data capture. After some trial and error I ended up with the notebook as attached.

The notebook first creates two DLT tables (lookup_time_table and cdctest_cdc_raw) reflecting the CDC data captured by SQL Server for source table dbo.cdctest (cdc.lsn_time_mapping and cdc.dbo_cdctest_CT). It then defines another DLT table named cdctest_cdc_enriched as a join of these two initial tables. When run for the first time, the notebook also appends some previously existing data (rows that were already in dbo.cdctest before CDC was turned on) by using @dlt.append_flow(target="cdctest_cdc_enriched"). It then creates a streaming table called cdctest, into which the changes from cdctest_cdc_enriched are applied.
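
In outline, the relevant part of the notebook looks like the sketch below (a sketch rather than the attached notebook verbatim: the join on __$start_lsn = start_lsn and the column renames are just my way of mapping the SQL Server CDC columns to what apply_changes expects):

import dlt

# cdctest_cdc_enriched is a streaming table fed by append flows: the join of the
# two raw CDC tables below, plus the one-time snapshot backfill flow sketched in
# my first post, which targets the same table.
dlt.create_streaming_table(name="cdctest_cdc_enriched")

@dlt.append_flow(target="cdctest_cdc_enriched")
def cdc_enriched():
    changes = spark.readStream.table("LIVE.cdctest_cdc_raw")
    lsn_map = spark.readStream.table("LIVE.lookup_time_table")
    # pick up tran_begin_time for each change row via the lsn/time mapping
    return (
        changes.join(lsn_map, changes["__$start_lsn"] == lsn_map["start_lsn"])
        .select(
            changes["ID"],
            changes["__$operation"].alias("operation"),
            lsn_map["tran_begin_time"],
        )
    )

# the streaming table cdctest and the apply_changes call stay as in my first post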

This pipeline runs fine the first time (or when fully refreshed), and also updates successfully when no DML is done on the source table dbo.cdctest in the SQL Server database after the initial run/full refresh.

However, when I did some DML on the dbo.cdctest table in SQL Server (causing appends to the tables cdc.lsn_time_mapping and cdc.dbo_cdctest_CT, but of course no deletes or updates in those tables), and then ran the pipeline again (to process the changes), the pipeline failed with the following error:

Flow 'cdctest_cdc_enriched' has FAILED fatally. An error occurred because we detected an update or delete to one or more rows in the source table. Streaming tables may only use append-only streaming sources. If you expect to delete or update rows to the source table in the future, please convert table cdctest_cdc_enriched to a materialized view instead of a streaming table. If you are fine to skip changes, set the option 'skipChangeCommits' to 'true'......The non-append change can be found at version 28. Operation: WRITE Username: ... Source table name: lookup_time_table

The problem with this is that I already set skipChangeCommits to true for the two source tables (lookup_time_table and cdctest_cdc_raw), and also that I'm pretty sure there are no actual updates or deletes in the source at the moment. I set this option because in a future update run there might be deletes, caused by the fact that CDC records are only kept in the CDC tables for three days (default SQL Server behaviour).
I tried defining the cdctest_cdc_enriched table as a materialized view instead, but then the append_flow fails.
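
For reference, this is roughly where I set the option; as far as I understand, skipChangeCommits only works on a spark.readStream read with .option(), not via dlt.read_stream():

import dlt

@dlt.append_flow(target="cdctest_cdc_enriched")
def cdc_enriched():
    changes = (
        spark.readStream
        .option("skipChangeCommits", "true")   # ignore non-append commits on this source
        .table("LIVE.cdctest_cdc_raw")
    )
    lsn_map = (
        spark.readStream
        .option("skipChangeCommits", "true")
        .table("LIVE.lookup_time_table")
    )
    # join and column mapping as in the sketch above
    return changes.join(lsn_map, changes["__$start_lsn"] == lsn_map["start_lsn"])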

So, two questions:
One: where is the failure coming from, given that there are no updates in the source (yet)?
Two: how do I get the DLT pipeline to ignore the updates and just process the inserts to the CDC tables?