Setup for Unity Catalog, Auto Loader, three-level namespace, SCD2
06-01-2023 05:45 PM
I am trying to set up Delta Live Tables pipelines to ingest data into bronze and silver tables. Bronze and silver are separate schemas.
The pipeline will be triggered by a daily job. It appears to run fine when set to continuous, but fails when triggered.
Tables:
datasource.bronze.customer
datasource.silver.customerview
datasource.silver.customer
Currently the Unity Catalog integration with Delta Live Tables requires separate pipelines for the bronze schema and the silver schema. I therefore start by copying the data from bronze (datasource.bronze.customer) to silver (datasource.silver.customerview), since a pipeline cannot directly reference a table that was created outside of the pipeline. From that table (datasource.silver.customerview) I apply the changes to the silver table (datasource.silver.customer).
My setup is failing in the silver pipeline with the following error:
“Flow customer has FAILED fatally. An error occurred because we detected an update or delete to one or more rows in the source table. Streaming tables may only use append-only streaming sources. If you expect to delete or update rows to the source table in the future, please convert table customer to a live table instead of a streaming live table. To resolve this issue, perform a Full Refresh to table customer. A Full Refresh will attempt to clear all data from table customer and then load all data from the streaming source.
The non-append change can be found at version 16.
Operation: WRITE
Username: [Not specified]
Source table name: customerview”
Any suggestions on this error, or ways to correctly set up this DLT pipeline?
Even just an example/template/demo of how to set this up with Unity Catalog pipelines and the three-level namespace would be much appreciated.
CODE:
import dlt
from pyspark.sql.functions import col

# viewNameSilver, tableNameBronze, tableNameSilver, primaryKeyCol, and sCDType
# are assumed to be defined earlier in the notebook.

# Create temp silver table for UC workaround until three-level namespace is available
@dlt.table(
    name=viewNameSilver
)
def create_silver_temp_view():
    return spark.table(f"datasource.bronze.{tableNameBronze}")

# Create the target table definition
# (create_streaming_live_table() and create_target_table() are deprecated)
dlt.create_streaming_table(
    name=tableNameSilver,
    comment=f"Clean, merged {tableNameSilver}",
    table_properties={
        "quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)

# Apply SCD2 changes to the silver table
dlt.apply_changes(
    target=tableNameSilver,
    source=viewNameSilver,
    keys=primaryKeyCol,
    sequence_by=col("__EffectiveStartDate"),
    except_column_list=["__EffectiveStartDate"],
    stored_as_scd_type=sCDType
)
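A likely reason for the non-append error: customerview above is defined with spark.table(...), so it is a materialized (live) table that is fully rewritten on every triggered update, and the streaming apply_changes flow then sees updates/deletes on its source. A minimal sketch of one alternative, assuming datasource.bronze.customer is itself append-only and keeping the same viewNameSilver so the apply_changes call above stays unchanged (this is an assumption, not something confirmed in the thread):
import dlt

# Sketch: expose the bronze table to this pipeline as a streaming view instead of
# a materialized copy, so the apply_changes source only ever receives appends.
@dlt.view(
    name=viewNameSilver
)
def create_silver_temp_view():
    return spark.readStream.table(f"datasource.bronze.{tableNameBronze}")
Because the view is not materialized, nothing in datasource.silver gets rewritten between runs, which is what the streaming flow was complaining about.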
- Labels: Catalog, Delta, DLT Pipeline, PROBLEM, Schema, Unity Catalog
06-02-2023 07:07 AM
@Jennette Shepard, per the thread below, merges and deletes are not supported as streaming sources; only appends are supported, which appears to be a limitation. Can you please follow the recommendations in that thread?
06-02-2023 07:48 AM
Thanks. Since I'm running this in triggered mode, it feels like it's reprocessing existing files. Am I missing something in my definition of the bronze table? Shouldn't it only process new files (as long as I'm not doing a full refresh)? The existing files are not being changed; it should just read in the new file that arrives each day.
Here is my code for Bronze:
import dlt
from pyspark.sql.functions import col, current_timestamp, to_date, substring

# tableNameBronze, schemaHintsBronze, sourceFormat, fileNamePrefix, and dataPathBronze
# are assumed to be defined earlier in the notebook.

@dlt.table(
    name=tableNameBronze,
    comment="Raw data ingested from bronze",
    table_properties={
        "myCompanyPipeline.quality": "bronze",
        "pipelines.autoOptimize.managed": "true"
    }
)
def create_bronze():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.inferColumnTypes", "false")
        .option("cloudFiles.schemaHints", schemaHintsBronze)
        .option("cloudFiles.format", sourceFormat)
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        .option("cloudFiles.rescuedDataColumn", "__RescuedData")
        .option("pathGlobFilter", fileNamePrefix)
        .load(dataPathBronze)
        .select(
            "*",
            col("_metadata.file_name").alias("__SourceFile"),
            current_timestamp().alias("__IngestionDate"),
            to_date(substring(col("_metadata.file_name"), -21, 8), "yyyyMMdd").alias("__EffectiveStartDate")
        )
    )
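One way to confirm whether the bronze ingestion is actually reprocessing files, or whether the intermediate customerview is being rewritten, is to inspect the Delta history of both tables from an ordinary notebook (a sketch, using the table names from the thread; run outside the pipeline):
# Sketch: inspect the Delta history of both tables to see where the
# non-append write comes from.
for t in ["datasource.bronze.customer", "datasource.silver.customerview"]:
    (
        spark.sql(f"DESCRIBE HISTORY {t}")
        .select("version", "timestamp", "operation", "operationMetrics")
        .show(truncate=False)
    )
If the bronze history shows only streaming appends while customerview shows a full rewrite on each triggered run, the problem is the materialized intermediate table rather than Auto Loader reprocessing files; Auto Loader tracks already-ingested files in its checkpoint, so triggered runs should only pick up new files.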
06-14-2023 12:17 AM
Hi @Jennette Shepard
Thank you for posting your question in our community! We are happy to assist you.
To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?
This will also help other community members who may have similar questions in the future. Thank you for your participation, and let us know if you need any further assistance!

