Hello,
I am trying to create a basic DLT pipeline that does an incremental load. The first time, it runs perfectly without any issues. However, when there are records to be updated, the pipeline fails with the following error:
"Flow silver has FAILED fatally. An error occurred because we detected an update or delete to one or more rows in the source table. Streaming tables may only use append-only streaming sources. If you expect to delete or update rows to the source table in the future, please convert table silver_Employee to a live table instead of a streaming live table. To resolve this issue, perform a Full Refresh to table silver. A Full Refresh will attempt to clear all data from table silver and then load all data from the streaming source. The non-append change can be found at version 2. Operation: WRITE Username: [Not specified] Source table name: bronze"
The code I am using for the DLT script is below:
import dlt
from pyspark.sql.functions import col

@dlt.create_table(
    name=bronze_tablename,
    comment="Raw historical transactions",
    path=f"{path_target_bronze}/{system}/{tablename}"
)
def bronze_incremental():
    # Batch read of all CSV files in the source folder on every pipeline update
    df = (
        spark.read.format("csv")
        .options(header=True, inferSchema=True, delimiter=";", ignoreChanges=True)
        .load(f"{path_source}/")
    )
    return df

dlt.create_target_table(
    name=silver_tablename,
    path=f"{path_target_silver}/{system}/{tablename}",
)

# sequence_by and keys are mandatory fields for apply_changes
dlt.apply_changes(
    source=bronze_tablename,
    target=silver_tablename,
    keys=["EmpId"],
    sequence_by=col("modified_date"),
    stored_as_scd_type=1
)
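From what I can tell, the problem may be that my bronze table is rebuilt by a full batch read on every update, so its rows get rewritten rather than appended, while apply_changes streams from it. Below is a rough sketch of what I think an append-only bronze would look like using Auto Loader (cloudFiles) instead of the batch CSV read; the table name and path variables are the same ones from my script above, and I'm not sure this is the right fix:

@dlt.create_table(
    name=bronze_tablename,
    comment="Raw historical transactions (append-only via Auto Loader)",
    path=f"{path_target_bronze}/{system}/{tablename}"
)
def bronze_incremental():
    # Incremental, append-only ingestion: Auto Loader only picks up new files,
    # so the bronze table never sees updates or deletes to existing rows.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("delimiter", ";")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(f"{path_source}/")
    )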
Could you please let me know how we can perform upserts or a merge?
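For clarity, the end result I'm after on the silver table is the equivalent of a plain Delta MERGE keyed on EmpId, something like the sketch below (written outside DLT; silver_path and bronze_path are placeholders for my actual table locations):

from delta.tables import DeltaTable

# Upsert semantics I want: update matching EmpId rows, insert new ones.
silver = DeltaTable.forPath(spark, silver_path)          # placeholder path
updates = spark.read.format("delta").load(bronze_path)   # placeholder path

(
    silver.alias("t")
    .merge(updates.alias("s"), "t.EmpId = s.EmpId")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)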
Thanks in advance.
RLH