
Delta Live Table Merge/Upserts

RLH
New Contributor

Hello,

I am trying to create a basic DLT pipeline that does an incremental load. The first run completes without any issues, but when there are records to be updated, the pipeline fails with the following error:

"Flow silver has FAILED fatally. An error occurred because we detected an update or delete to one or more rows in the source table. Streaming tables may only use append-only streaming sources. If you expect to delete or update rows to the source table in the future, please convert table silver_Employee to a live table instead of a streaming live table. To resolve this issue, perform a Full Refresh to table silver. A Full Refresh will attempt to clear all data from table silver and then load all data from the streaming source. The non-append change can be found at version 2. Operation: WRITE Username: [Not specified] Source table name: bronze"

The code I am using for the DLT script is below:

import dlt
from pyspark.sql.functions import col

@dlt.create_table(
    name=bronze_tablename,
    comment="Raw historical transactions",
    path=f"{path_target_bronze}/{system}/{tablename}"
)
def bronze_incremental():
    # Batch read of every CSV file under the source path on each run
    df = (spark.read.format('csv')
          .options(header=True, inferSchema=True, delimiter=';', ignoreChanges=True)
          .load(f"{path_source}/"))
    return df

dlt.create_target_table(
    name=silver_tablename,
    path=f"{path_target_silver}/{system}/{tablename}"
)

# sequence_by and keys are mandatory fields for apply_changes
dlt.apply_changes(
    source=bronze_tablename,
    target=silver_tablename,
    keys=["EmpId"],
    sequence_by=col("modified_date"),
    stored_as_scd_type=1
)

Could you please let me know how we can perform upserts or merges?

Thanks in advance.

RLH

2 REPLIES

Anonymous
Not applicable

@Ram LH:

The error message indicates that updates or deletes were made to the source table, which streaming sources do not support. Streaming tables in Databricks are meant to be append-only, and any updates or deletions to the source table can result in data inconsistencies in the streaming table.

To resolve this issue, you can perform a full refresh of the target table instead of an incremental load. If you expect updates or deletions to the source table in the future, you should also convert the target table to a live table instead of a streaming live table.
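For reference, here is a minimal sketch of the "live table" approach, assuming the table and column names from the original post (a bronze table named "bronze", key EmpId, ordering column modified_date). The silver table is recomputed from bronze on each update rather than streamed, so upstream updates and deletes are tolerated, and a window function keeps only the latest row per key:

import dlt
from pyspark.sql import functions as F
from pyspark.sql.window import Window

@dlt.table(name="silver_Employee", comment="Latest row per employee, recomputed each run")
def silver_employee():
    # Batch (non-streaming) read of the bronze table; tolerates upstream updates/deletes
    df = dlt.read("bronze")
    # Keep only the most recent record per key, ordered by modified_date
    w = Window.partitionBy("EmpId").orderBy(F.col("modified_date").desc())
    return (df.withColumn("_rn", F.row_number().over(w))
              .filter("_rn = 1")
              .drop("_rn"))

The trade-off is that the whole silver table is recomputed on each update, rather than processing only new data.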

You can also consider implementing a change data capture (CDC) mechanism to capture only the changed data from the source table and apply those changes to the target table incrementally.
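One common pattern for this (a sketch, assuming the paths and names from the original post) is to ingest with Auto Loader so that bronze only ever appends — each source file is processed exactly once — and to let apply_changes perform the merge into silver:

import dlt
from pyspark.sql.functions import col

@dlt.table(name="bronze", comment="Append-only ingest of new CSV files")
def bronze():
    # Auto Loader (cloudFiles) picks up each new file once, so bronze never rewrites rows
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("header", True)
            .option("delimiter", ";")
            .load(f"{path_source}/"))

dlt.create_target_table(name="silver")

# Upsert by key: the row with the newest modified_date wins (SCD type 1)
dlt.apply_changes(
    source="bronze",
    target="silver",
    keys=["EmpId"],
    sequence_by=col("modified_date"),
    stored_as_scd_type=1
)

With this layout, updated records arrive as new rows in bronze (via new files), and apply_changes merges them into silver by EmpId, which gives the upsert behavior you are asking for.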

Please also ensure the source table is append-only, with no updates or deletions applied to it.

Anonymous
Not applicable

Hi @Ram LH,

Thank you for your question! To assist you better, please take a moment to review the answer and let us know whether it fits your needs.

Please help us select the best solution by clicking on "Select As Best" if it does.

Your feedback will help us ensure that we are providing the best possible service to you. Thank you!
