I am trying to set up Delta Live Tables pipelines to ingest data into bronze and silver tables. Bronze and silver are separate schemas.
The pipelines will be triggered by a daily job. Everything appears to run fine when the pipeline is set to continuous, but it fails when run as triggered.
Tables:
datasource.bronze.customer
datasource.silver.customerview
datasource.silver.customer
Currently the Unity Catalog integration with Delta Live Tables requires separate pipelines for the bronze schema and the silver schema. A pipeline also cannot directly reference a table that was created outside of it, so I start by copying the data from bronze (datasource.bronze.customer) into an intermediate silver table (datasource.silver.customerview). From that intermediate table I then apply the changes to the silver target table (datasource.silver.customer).
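For reference, the bronze pipeline is along these lines. This is only a sketch with placeholder values (the Auto Loader path, file format, and table comment are illustrative, not my exact code):

import dlt

# Bronze pipeline (runs separately from the silver pipeline); the target
# catalog and schema (datasource.bronze) are set in the pipeline settings.
@dlt.table(
    name="customer",
    comment="Raw customer records ingested into the bronze schema"
)
def customer():
    # Placeholder source path and format, for illustration only
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load("/path/to/raw/customer")
    )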
My setup is failing in the silver pipeline with the following error:
"Flow customer has FAILED fatally. An error occurred because we detected an update or delete to one or more rows in the source table. Streaming tables may only use append-only streaming sources. If you expect to delete or update rows to the source table in the future, please convert table customer to a live table instead of a streaming live table. To resolve this issue, perform a Full Refresh to table customer. A Full Refresh will attempt to clear all data from table customer and then load all data from the streaming source.
The non-append change can be found at version 16.
Operation: WRITE
Username: [Not specified]
Source table name: customerview"
Any suggestions on this error, or on how to correctly set up this DLT pipeline?
Even just an example/template/demo of how to set this up with Unity Catalog pipelines and the three-level namespace would be much appreciated.
CODE:
import dlt
from pyspark.sql.functions import col

# tableNameBronze, viewNameSilver, tableNameSilver, primaryKeyCol and sCDType
# are defined elsewhere in the notebook.

# Create temp silver table for UC workaround until three-level namespace is available
@dlt.table(
    name=viewNameSilver
)
def create_silver_temp_view():
    return spark.table(f'datasource.bronze.{tableNameBronze}')

# Create the target table definition
# (create_streaming_live_table() and create_target_table() are deprecated)
dlt.create_streaming_table(
    name=tableNameSilver,
    comment=f"Clean, merged {tableNameSilver}",
    table_properties={
        "quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)

# Apply SCD2 changes from the intermediate table to the silver table
dlt.apply_changes(
    target=tableNameSilver,
    source=viewNameSilver,
    keys=primaryKeyCol,
    sequence_by=col('__EffectiveStartDate'),
    except_column_list=['__EffectiveStartDate'],
    stored_as_scd_type=sCDType
)