Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Urgent Help Needed - Databricks Notebook Failure Handling for Incremental Processing

rrajan
New Contributor II

I have created a notebook which builds three different gold layer objects from one single silver table. All of these tables are processed incrementally. I want to develop failure handling for the scenario where the pipeline fails after loading a few of the records into the first table, or where one of the gold tables loads successfully and the second one fails.

In this case, when re-running the pipeline from scratch, I don't want to insert already inserted records again into any of the gold tables. How can I handle this type of scenario?

4 REPLIES

Walter_C
Databricks Employee

To handle the scenario where your pipeline fails after loading some records into the first gold table or if one gold table loads successfully while the second fails, you can implement a failure handling mechanism that ensures already inserted records are not reprocessed when the pipeline is re-run. Here are some steps you can follow:

  1. Use Delta Lake for ACID Transactions: Delta Lake provides ACID transactions, which can help ensure that your data is consistent and reliable. If a failure occurs, you can use Delta Lake's transaction log to identify which records have already been processed.

  2. Implement Checkpoints: Use checkpoints to save the state of your data processing at various stages. This way, if a failure occurs, you can restart the pipeline from the last successful checkpoint rather than from scratch.

  3. Idempotent Writes: Ensure that your write operations are idempotent, meaning that re-running the same operation multiple times will not result in duplicate records. You can achieve this by using upsert operations (MERGE) instead of plain inserts; see the sketch after this list.

  4. Delta Live Tables (DLT): Consider using Delta Live Tables, which provide built-in capabilities for handling incremental data processing and failure recovery. DLT can automatically manage the state of your data pipeline and ensure that only new or changed data is processed.

  5. Repair and Rerun: Utilize the "Repair and Rerun" feature in Databricks jobs. This feature allows you to rerun only the tasks that were impacted by a failure, without reprocessing the entire pipeline. This can save time and resources. You can find more details about this feature in the Databricks blog post titled "Save Time and Money on Data and ML Workflows With 'Repair and Rerun'".
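
To make point 3 concrete, here is a minimal PySpark sketch of an idempotent gold-table load using a Delta MERGE. The table names (silver, gold_table_1), the business key id, and the watermark column last_update_timestamp are assumptions based on this thread, so substitute your own schema:

from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Read only records newer than what gold_table_1 already holds, so a rerun
# after a partial failure does not pick up already-loaded data again.
max_ts = (
    spark.table("gold_table_1")
    .agg(F.max("last_update_timestamp").alias("max_ts"))
    .collect()[0]["max_ts"]
)

incremental_df = spark.table("silver")
if max_ts is not None:
    incremental_df = incremental_df.filter(F.col("last_update_timestamp") > F.lit(max_ts))

# MERGE makes the write idempotent: existing rows are updated, new rows are
# inserted, and re-running the same batch does not create duplicates.
(
    DeltaTable.forName(spark, "gold_table_1")
    .alias("t")
    .merge(incremental_df.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

The same pattern applies to the other two gold tables; because each MERGE is a single ACID transaction, a failure part-way through leaves the target table in its previous consistent state.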

rrajan
New Contributor II

Hi @Walter_C ,
Thanks for your suggestion.

Can you help me with all the possible failure scenarios that need to be handled while doing an incremental load? The gold tables are Delta only. The data coming into the gold tables comes from a DLT silver table. We are not supposed to use DLT in the gold layer because of the complex transformations. We have created a single PySpark notebook that reads the silver table and extracts the incremental data from it based on the last run timestamp. We then perform the transformations on the DataFrame and, once that is done, load all three final objects. We have a single task, so repair run is not possible here.

filipniziol
Esteemed Contributor

Hi @rrajan ,

The simplest solution is to check the max timestamp in each gold table when processing incrementally to get the source data. Here's how you can handle this (this would be your source in the MERGE statement on every run):

 

SELECT * FROM silver AS s
WHERE s.last_update_timestamp 
   > (SELECT MAX(last_update_timestamp) FROM gold_table_1)

 

Similarly for the other gold tables.

This way:

  • If one gold table fails, other tables' timestamps remain unchanged
  • On retry, you'll only process records newer than what's already in each gold table
  • No need for additional tracking tables or complex logic

For more robust pipelines, you could consider:

1. A separate metadata table to avoid scanning full gold tables for timestamps

 

CREATE TABLE metadata_control (
    table_name STRING,
    last_processed_timestamp TIMESTAMP
)

 

In this scenario, when loading each gold table, you filter your silver source data based on the last processed timestamp for that specific gold table:

 

SELECT * FROM silver 
WHERE last_update_timestamp > (
    SELECT last_processed_timestamp 
    FROM metadata_control 
    WHERE table_name = 'gold_table_1'
)

 

After successful processing of each gold table, you update its timestamp in the metadata table. This ensures that if a later table fails, you won't reprocess already loaded data on retry.
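
As a sketch of that update step (reusing the metadata_control table above; update_watermark is a hypothetical helper, and the timestamp formatting assumes a standard Python datetime), a MERGE keeps it working even on the very first run, when no row exists yet for the table:

# Hypothetical helper: record the watermark for a gold table after its load succeeds.
# max_ts is the maximum last_update_timestamp of the batch that was just written.
def update_watermark(table_name: str, max_ts):
    if max_ts is None:
        return  # nothing was processed in this run
    spark.sql(f"""
        MERGE INTO metadata_control AS m
        USING (SELECT '{table_name}' AS table_name,
                      TIMESTAMP '{max_ts}' AS new_ts) AS u
        ON m.table_name = u.table_name
        WHEN MATCHED THEN UPDATE SET m.last_processed_timestamp = u.new_ts
        WHEN NOT MATCHED THEN INSERT (table_name, last_processed_timestamp)
                              VALUES (u.table_name, u.new_ts)
    """)

# Example, called only after gold_table_1 finishes successfully:
# update_watermark("gold_table_1", max_ts)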

2. You could also consider using Structured Streaming with separate checkpoints for each gold table as an alternative approach. This provides automatic failure handling and exactly-once guarantees. See the Databricks documentation on streaming writes for details: https://docs.databricks.com/en/structured-streaming/delta-lake.html
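
A minimal sketch of that streaming approach, assuming the same table and column names as above (the checkpoint path and the merge key id are placeholders): each gold table gets its own checkpoint location, so failure recovery is tracked independently per table, and foreachBatch lets you keep the complex transformations plus an idempotent MERGE:

from delta.tables import DeltaTable

# Placeholder checkpoint location; each gold table needs its own path.
CHECKPOINT_GOLD_1 = "/path/to/checkpoints/gold_table_1"

def upsert_to_gold_1(batch_df, batch_id):
    # Apply the complex transformations here, then MERGE so that a
    # re-delivered micro-batch does not create duplicates.
    transformed_df = batch_df  # placeholder for the real transformation logic
    (
        DeltaTable.forName(spark, "gold_table_1")
        .alias("t")
        .merge(transformed_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

(
    spark.readStream.table("silver")
    .writeStream
    .foreachBatch(upsert_to_gold_1)
    .option("checkpointLocation", CHECKPOINT_GOLD_1)
    .trigger(availableNow=True)  # process everything available, then stop
    .start()
)

The other two gold tables would each get their own foreachBatch function and their own checkpoint path.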

CamdenJacobs
New Contributor II

Thank you so much for the suggestion.
