Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Setup for Unity Catalog, autoloader, three-level namespace, SCD2

js54123875
New Contributor III

I am trying to set up Delta Live Tables pipelines to ingest data into bronze and silver tables. Bronze and silver are separate schemas.

The pipeline will be triggered by a daily job. It appears to run fine when set to continuous mode, but fails when run as triggered.

Tables:

datasource.bronze.customer

datasource.silver.customerview

datasource.silver.customer

Currently the Unity Catalog integration with Delta Live Tables requires separate pipelines for the bronze schema and the silver schema. Therefore I start by copying the data from bronze (datasource.bronze.customer) to silver (datasource.silver.customerview), since a pipeline cannot directly reference a table that was created outside of the pipeline. From that table (datasource.silver.customerview) I apply the changes to the silver table (datasource.silver.customer).

My setup is failing in the silver pipeline with the following error:

“Flow customer has FAILED fatally. An error occurred because we detected an update or delete to one or more rows in the source table. Streaming tables may only use append-only streaming sources. If you expect to delete or update rows to the source table in the future, please convert table customer to a live table instead of a streaming live table. To resolve this issue, perform a Full Refresh to table customer. A Full Refresh will attempt to clear all data from table customer and then load all data from the streaming source.

The non-append change can be found at version 16.

Operation: WRITE

Username: [Not specified]

Source table name: customerview”

Any suggestions on this error, or ways to correctly set up this DLT pipeline?

Even just an example/template/demo of how to set this up with Unity Catalog pipelines and the three-level namespace would be much appreciated.

CODE:

# Parameter values (viewNameSilver, tableNameSilver, tableNameBronze,
# primaryKeyCol, sCDType) are defined earlier in the notebook.
import dlt
from pyspark.sql.functions import col

# Create temp silver table for UC workaround until three-level namespace is available
@dlt.table(
  name = viewNameSilver
)
def create_silver_temp_view():
  # Batch copy of the bronze table created by the separate bronze pipeline
  return spark.table(f'datasource.bronze.{tableNameBronze}')

# Create the target table definition
# (create_streaming_live_table() and create_target_table() are deprecated)
dlt.create_streaming_table(
  name = tableNameSilver,
  comment = f"Clean, merged {tableNameSilver}",
  table_properties = {
    "quality": "silver",
    "pipelines.autoOptimize.managed": "true"
  }
)

# Apply SCD2 changes from the intermediate table to the silver target
dlt.apply_changes(
  target = tableNameSilver,
  source = viewNameSilver,
  keys = primaryKeyCol,
  sequence_by = col('__EffectiveStartDate'),
  except_column_list = ['__EffectiveStartDate'],
  stored_as_scd_type = sCDType
)

3 REPLIES

karthik_p
Esteemed Contributor

@Jennette Shepard As per the thread below, it looks like merges and deletes are not supported for streaming sources; only appends are supported, and this seems to be a limitation. Can you please follow the recommendations in the thread below?

https://community.databricks.com/s/question/0D53f00001sRRiGCAW/getting-error-when-using-cdc-in-delta...
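
If the bronze table is only ever appended to, one option (a rough sketch only, reusing the variable names from your code, not tested against your pipeline) is to define the intermediate silver table as a streaming read of the bronze table rather than a batch copy. The batch spark.table() copy is recomputed on every triggered update, which the downstream apply_changes flow can see as a non-append change; a streaming read keeps the source append-only. If the bronze table can also receive updates or deletes, the Delta streaming option skipChangeCommits can skip those commits, with the caveat that those changes are not propagated downstream:

import dlt

@dlt.table(
  name = viewNameSilver
)
def create_silver_temp_view():
  return (
    spark.readStream
      # Optional: skip commits that update or delete rows in the source table;
      # those changes are NOT propagated downstream.
      .option("skipChangeCommits", "true")
      .table(f'datasource.bronze.{tableNameBronze}')
  )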

js54123875
New Contributor III

Thanks. Since I'm running this in triggered mode, it feels like it's reprocessing existing files. I wonder if I'm missing something in my definition of the bronze table. Shouldn't it only process new files (as long as I'm not doing a full refresh)? The existing files are not being changed; it should just read in the new file that is received each day.

Here is my code for Bronze:

import dlt
from pyspark.sql.functions import col, current_timestamp, to_date, substring

# Parameter values (tableNameBronze, schemaHintsBronze, sourceFormat,
# fileNamePrefix, dataPathBronze) are defined earlier in the notebook.
@dlt.table(
  name = tableNameBronze,
  comment = "Raw data ingested from bronze",
  table_properties = {
    "myCompanyPipeline.quality" : "bronze",
    "pipelines.autoOptimize.managed": "true"
  }
)
def create_bronze():
  return (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.inferColumnTypes", "false")
      .option("cloudFiles.schemaHints", schemaHintsBronze)
      .option("cloudFiles.format", sourceFormat)
      .option("cloudFiles.schemaEvolutionMode", "rescue")
      .option("cloudFiles.rescuedDataColumn", "__RescuedData")
      .option("pathGlobFilter", fileNamePrefix)
      .load(dataPathBronze)
      .select(
        "*",
        col("_metadata.file_name").alias("__SourceFile"),
        current_timestamp().alias("__IngestionDate"),
        to_date(substring(col("_metadata.file_name"), -21, 8), 'yyyyMMdd').alias("__EffectiveStartDate")
      )
  )
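
A side note on the __EffectiveStartDate derivation: substring with a negative position counts from the end of the string, so the expression assumes the yyyyMMdd stamp always sits exactly 21 characters before the end of the file name. A quick standalone check (file name made up for illustration):

from pyspark.sql.functions import col, substring, to_date

# In "customer_20230415_batch-01.csv" the date starts exactly 21 characters
# before the end of the name, so substring(-21, 8) picks out "20230415".
df = spark.createDataFrame([("customer_20230415_batch-01.csv",)], ["file_name"])
df.select(
  to_date(substring(col("file_name"), -21, 8), 'yyyyMMdd').alias("__EffectiveStartDate")
).show()
# __EffectiveStartDate -> 2023-04-15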

Anonymous
Not applicable

Hi @Jennette Shepard

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 
