Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT Pipeline with unknown deleted source data

janglais
New Contributor
Hello, I need help.
 
So the context is:
- ERP data for a company in my group is stored in SQL tables
- Currently, once per day we copy the last 2 months of data (by creation date) from each table into our data lake landing zone (we can, however, do full copies if needed)
- A Databricks DLT pipeline then ingests this data: append into bronze using a readStream with Auto Loader, then apply_changes upsert (SCD type 1) into silver (a simplified sketch of this setup is shown below)
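
Roughly, the current setup looks like this (a simplified sketch only; the table names, landing path, file format, and key column are placeholders, not our actual ones):

python
import dlt
from pyspark.sql.functions import col

# Bronze: append-only ingestion of the daily landing-zone drop with Auto Loader
@dlt.table(name="bronze_erp_table")  # placeholder name
def bronze_erp_table():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")   # assumed file format
        .load("/mnt/landing/erp_table/")          # placeholder landing path
    )

# Silver: SCD type 1 upsert of the bronze rows
dlt.create_streaming_table("silver_erp_table")

dlt.apply_changes(
    target="silver_erp_table",
    source="bronze_erp_table",
    keys=["id"],                       # placeholder business key
    sequence_by=col("creation_date"),  # placeholder ordering column
    stored_as_scd_type=1,
)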
 
This was working fine until we realised there were rows being deleted from the source data, with no flag and no reference table - i.e. the only way for us to know that a row has been deleted is to compare with the data we already have.
So we want to update the silver table with this information by adding an is_deleted = True flag to the rows that have been deleted.
 
My basic plan is based around doing a weekly catch-up: every Saturday we collect a full copy of all the tables.
There have been two main attempts.
 
1. Compare the latest data ingestion with the current silver table
-> Immediate no-go: you cannot read the silver table that is the target table of the current pipeline.
 
2. Compare the latest data ingestion with the previous bronze data.
- At the start of the silver pipeline -> load bronze -> split it into the latest ingestion and the rest (**)
- If a row is found in the previous data but not in the latest full ingestion, it has been deleted
- I added a parameter to the function so that if delete_missing_rows = True (signifying a full catch-up), it does this comparison; if False, it behaves like before and just upserts the last 2 months of data.
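
Roughly, the logic I am describing is the following (a simplified sketch, not the attached function itself; table, column, and key names are placeholders):

python
from pyspark.sql.functions import col, lit

def build_silver_source(delete_missing_rows: bool):
    bronze = spark.read.table("bronze_erp_table")             # placeholder name

    # Split bronze into the latest ingestion and everything before it
    latest_date = bronze.agg({"ingestion_date": "max"}).collect()[0][0]
    latest = bronze.filter(col("ingestion_date") == latest_date)
    previous = bronze.filter(col("ingestion_date") < latest_date)

    if not delete_missing_rows:
        # Normal daily run: just upsert the last 2 months of data
        return latest.withColumn("is_deleted", lit(False))

    # Weekly catch-up: rows present before but missing from the full copy were deleted
    deleted = (
        previous.join(latest, on="id", how="left_anti")       # placeholder key
        .withColumn("is_deleted", lit(True))
    )
    return latest.withColumn("is_deleted", lit(False)).unionByName(deleted)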
 
Then I started hitting problems.
My first attempt at doing a full catch-up with delete_missing_rows = True failed because the pipeline claimed there was a schema change: the column "__enzyme__row__id" existed in the target silver table but not in the incoming data.
To bypass this I did a full refresh.
This worked and I was really happy: my silver table now has flags on the rows that had been deleted from the source data.
However, a new problem came when I tested going back to a normal run (the full catch-up is only once per week; the rest of the week the table is upserted into with 2 months of data).
When going back to a normal update I get the problem seen in the attached image.png, which as far as I can tell comes from the fact that the source table (the non-temporary staging dlt.table) has changed, since it no longer does the same comparison and addition of is_deleted = True flags.
 
I have attached the function. 
 
Hopefully that's enough info.
Thanks in advance!
1 ACCEPTED SOLUTION

Accepted Solutions

mark_ott
Databricks Employee

The issue you're experiencing is a common challenge in Delta Live Tables (DLT) when implementing mixed refresh patterns (weekly full refresh + daily incremental updates) with schema evolution. The "__enzyme__row__id" column and schema mismatch errors are indicators of DLT's internal tracking mechanisms conflicting with your conditional logic.

Root Cause Analysis

The core problem stems from DLT's expectation of a consistent schema across pipeline runs. When you switch between full catch-up mode (with is_deleted flag logic) and normal incremental mode, you're essentially creating two different schemas:

  1. Full catch-up mode: Includes additional columns and logic for deleted record detection

  2. Normal mode: Standard upsert without deleted record tracking

The "__enzyme__row__id" column is DLT's internal row tracking mechanism, and schema mismatches occur when DLT expects consistent column structures between runs.

Recommended Solutions

Solution 1: Consistent Schema Approach (Recommended)

Always include the is_deleted column in your silver table schema, regardless of the refresh type:

python
import dlt
from pyspark.sql.functions import lit

@dlt.table
def silver_table():
    df = dlt.read("bronze_table")
    # Always add the is_deleted column, defaulting to False for normal runs
    if spark.conf.get("delete_missing_rows", "false").lower() != "true":
        df = df.withColumn("is_deleted", lit(False))
    else:
        # Your existing full catch-up logic here
        pass
    return df

Solution 2: Separate Tables Strategy

Create separate silver tables for different refresh patterns:

  • silver_table_incremental: For daily updates

  • silver_table_full: For weekly full refreshes with delete tracking

Then use a view or downstream table to union them appropriately.
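
For example, the combining view could look like this (a minimal sketch, assuming both tables share the same schema; the names are hypothetical and any deduplication logic between the two sources is omitted):

python
import dlt

@dlt.view
def silver_table_combined():
    incremental = dlt.read("silver_table_incremental")  # hypothetical name
    full = dlt.read("silver_table_full")                # hypothetical name
    return incremental.unionByName(full)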

Solution 3: Schema Evolution Configuration

Enable automatic schema evolution in your DLT pipeline configuration:

python
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

However, use this cautiously as it can mask unintended schema changes.

Best Practices for Your Use Case

Weekly Full Refresh Strategy

  1. Use DLT's built-in full refresh capability rather than custom logic

  2. Schedule separate pipelines for full vs. incremental refreshes

  3. Leverage Change Data Feed (CDF) for deleted record tracking (see the sketch after this list)
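
If the bronze tables are Delta tables with Change Data Feed enabled, deleted rows can be read from the change feed (a minimal sketch; the table name and starting version are assumptions):

python
# Requires delta.enableChangeDataFeed = true on the table
changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)            # placeholder: track your own watermark
    .table("main.bronze.erp_table")          # placeholder table name
)

# _change_type is a built-in CDF column: insert / update_preimage / update_postimage / delete
deleted_rows = changes.filter(changes._change_type == "delete")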

Handling Deleted Records

Consider implementing a more robust deleted record detection strategy:

python
import dlt
from pyspark.sql.functions import lit

@dlt.table
def silver_table_with_deletes():
    current_data = dlt.read("bronze_latest")
    if is_full_refresh_run():  # placeholder helper, e.g. driven by a pipeline parameter
        # Compare with the previous state
        previous_data = dlt.read("bronze_previous")
        deleted_records = detect_deleted_records(previous_data, current_data)  # placeholder helper
        # Add the flag to both branches so the union schemas match
        return (
            current_data.withColumn("is_deleted", lit(False))
            .unionByName(deleted_records.withColumn("is_deleted", lit(True)))
        )
    else:
        return current_data.withColumn("is_deleted", lit(False))

Implementation Recommendations

Option A: Redesign with Consistent Schema

  • Always include is_deleted column in silver table

  • Use pipeline parameters to control delete detection logic

  • Maintain schema consistency across all runs

Option B: Use DLT's Native Capabilities

  • Leverage apply_changes with the apply_as_deletes parameter (see the sketch after this list)

  • Enable Change Data Feed on source tables

  • Use SCD Type 1 for complete record removal
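
A minimal sketch of what Option B could look like, assuming the bronze feed carries a change-type column and an ordering column (all names here are placeholders):

python
import dlt
from pyspark.sql.functions import col, expr

dlt.create_streaming_table("silver_table")

dlt.apply_changes(
    target="silver_table",
    source="bronze_changes",                            # placeholder source
    keys=["id"],                                        # placeholder business key
    sequence_by=col("ingestion_ts"),                    # placeholder ordering column
    apply_as_deletes=expr("_change_type = 'delete'"),   # assumes a change-type column exists
    stored_as_scd_type=1,
)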

Option C: Dual Pipeline Approach

  • Separate pipeline for weekly full refresh with delete detection

  • Daily incremental pipeline for standard upserts

  • Downstream merge process to combine results
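
For Option C, the downstream merge could be a plain Delta MERGE run outside DLT (a sketch with hypothetical table names and key; adapt to your schema):

python
# Fold the weekly full-refresh results (which carry the is_deleted flags)
# into the daily incremental table
spark.sql("""
    MERGE INTO silver.target_incremental AS t
    USING silver.target_full AS s
    ON t.id = s.id
    WHEN MATCHED AND s.is_deleted = true THEN
      UPDATE SET t.is_deleted = true
    WHEN NOT MATCHED THEN
      INSERT *
""")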

Avoiding Full Refresh Requirements

To prevent the need for full refreshes due to schema changes:

  1. Define the complete schema upfront, including optional columns (see the sketch after this list)

  2. Use schema evolution settings appropriate for your use case

  3. Test schema changes in development before production deployment

  4. Consider using streaming tables instead of materialized views for more flexibility
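
For instance, declaring the schema explicitly on the table definition keeps is_deleted present from the very first run (a sketch with placeholder column names):

python
import dlt
from pyspark.sql.functions import lit

@dlt.table(
    name="silver_table",
    schema="""
        id BIGINT,
        name STRING,
        creation_date TIMESTAMP,
        is_deleted BOOLEAN
    """,  # placeholder columns; declare the full expected schema up front
)
def silver_table():
    return dlt.read("bronze_table").withColumn("is_deleted", lit(False))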

The most sustainable approach would be Option A (consistent schema) combined with DLT's native delete handling capabilities, as it maintains schema consistency while providing the flexibility you need for both refresh patterns.


2 REPLIES

madams
Contributor III

Your solution #1 is very frustrating to me as well, for a number of reasons.  Simply put, we have to be able to compare incoming data to target data for normal ETL operations. 

One way around this is to create a view of your target silver table, outside of your pipeline (this part is key), and compare against that view. The pipeline will give you a warning telling you that you're being naughty, but it will let you do the comparison anyway. We have had to do this and it does work. It's clearly not a desirable solution to have to maintain this set of views, but it does allow you to perform the deletion check.

Example: `create or replace view silver.vw_target_table as select * from silver.target_table;`
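
With such a view in place, the deletion check inside the pipeline could look roughly like this (a sketch; the view, table, and key names are placeholders):

python
from pyspark.sql.functions import lit

# Rows that exist in the current silver (read via the external view) but are
# missing from the latest full load were deleted at the source
current_silver = spark.read.table("silver.vw_target_table")
latest_full_load = spark.read.table("bronze.latest_full_load")   # placeholder name

deleted_rows = (
    current_silver
    .join(latest_full_load, on="id", how="left_anti")            # placeholder key
    .withColumn("is_deleted", lit(True))
)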

Someone else might be able to chime in with a more elegant solution, but that's what I've got for you at least!
