Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta Live Table Merge/Upserts

RLH
New Contributor

Hello,

I am trying to create a basic DLT pipeline that does an incremental load. The first time, it runs perfectly without any issues. However, when there are records to be updated, the pipeline fails with the following error:

"Flow silver has FAILED fatally. An error occurred because we detected an update or delete to one or more rows in the source table. Streaming tables may only use append-only streaming sources. If you expect to delete or update rows to the source table in the future, please convert table silver_Employee to a live table instead of a streaming live table. To resolve this issue, perform a Full Refresh to table silver. A Full Refresh will attempt to clear all data from table silver and then load all data from the streaming source. The non-append change can be found at version 2. Operation: WRITE Username: [Not specified] Source table name: bronze"

The code I am using for the DLT script is below:

import dlt
from pyspark.sql.functions import col

@dlt.create_table(
    name=bronze_tablename,
    comment="Raw historical transactions",
    path=f"{path_target_bronze}/{system}/{tablename}"
)
def bronze_incremental():
    # Batch-read every CSV file in the source folder
    df = spark.read.format('csv') \
        .options(header=True, inferSchema=True, delimiter=';', ignoreChanges=True) \
        .load(f"{path_source}/")
    return df

dlt.create_target_table(
    name=silver_tablename,
    path=f"{path_target_silver}/{system}/{tablename}",
)

# sequence_by and keys are mandatory fields for apply_changes
dlt.apply_changes(
    source=bronze_tablename,
    target=silver_tablename,
    keys=["EmpId"],
    sequence_by=col("modified_date"),
    stored_as_scd_type=1
)

Could you please let me know how we can perform upserts or a merge?

Thanks in advance.

RLH

2 REPLIES

Anonymous
Not applicable

@Ram LH:

The error message indicates that updates or deletes were made to the source table, which streaming tables do not support. Streaming tables in Databricks expect append-only sources; any update or delete in the source table can cause data inconsistencies in the streaming table.

To resolve this issue, perform a Full Refresh of the target table instead of an incremental load. If you expect updates or deletions to the source table in the future, you should also convert the target table to a live table instead of a streaming live table; a minimal sketch of that approach follows.
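For illustration, here is a minimal sketch of the live-table approach, reusing the bronze/silver names from the error message in your post (those names are assumptions taken from your pipeline, not requirements). A live table is recomputed in full on every pipeline update, so non-append changes in the source no longer fail the flow:

import dlt

@dlt.table(
    name="silver_Employee",
    comment="Recomputed in full on every pipeline update"
)
def silver_employee():
    # dlt.read() consumes bronze as a complete (batch) dependency,
    # so updates and deletes in bronze are tolerated
    return dlt.read("bronze")

The trade-off is that you lose incrementality: silver is rebuilt from bronze on each run rather than merged row by row.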

You can also implement a change data capture (CDC) pattern: land only newly arrived data in the source table and apply those changes to the target table incrementally, as in the sketch below.
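A minimal sketch of that pattern, assuming the path and column names from your post (path_source, EmpId, and modified_date come from your code; the rest is illustrative): ingest the files with Auto Loader so bronze stays append-only, then let apply_changes merge into silver.

import dlt
from pyspark.sql.functions import col

@dlt.table(name="bronze", comment="Append-only raw feed")
def bronze():
    # Auto Loader only reads files that arrived since the last update,
    # so bronze receives appends only and remains a valid streaming source
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")  # infer types instead of all-string
        .option("header", "true")
        .option("delimiter", ";")
        .load(f"{path_source}/")
    )

dlt.create_target_table(name="silver_Employee")

# Merge by key, keeping the latest row per EmpId (SCD type 1 upsert)
dlt.apply_changes(
    source="bronze",
    target="silver_Employee",
    keys=["EmpId"],
    sequence_by=col("modified_date"),
    stored_as_scd_type=1
)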

Please also ensure the source table is append-only and that no updates or deletes are performed on it. If the source is a Delta table that unavoidably receives updates, the streaming reader can be told to skip change commits, as sketched below.
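For completeness, a sketch of that last option. The Delta streaming reader option skipChangeCommits (the successor to the ignoreChanges option your bronze definition sets, which has no effect on a CSV batch read since it is a Delta streaming option) makes the stream ignore commits that update or delete rows. The view name below is illustrative, and note the trade-off in the comments:

import dlt

@dlt.view(name="bronze_appends_only")
def bronze_appends_only():
    # skipChangeCommits makes the stream skip commits that rewrite or
    # delete existing rows. Caveat: rows touched by those commits are
    # never emitted, so updates will NOT reach silver; use this only
    # when downstream does not need the updated values.
    return (
        spark.readStream
        .option("skipChangeCommits", "true")
        .table("LIVE.bronze")
    )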

Anonymous
Not applicable

Hi @Ram LH,

Thank you for your question! To assist you better, please take a moment to review the answer and let me know if it best fits your needs.

Please help us select the best solution by clicking on "Select As Best" if it does.

Your feedback will help us ensure that we are providing the best possible service to you. Thank you!
