Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta Live Tables Incremental Batch Loads & Failure Recovery

Valentin1
New Contributor III

Hello Databricks community,

I'm working on a pipeline and would like to implement a common use case using Delta Live Tables. The pipeline should include the following steps:

  1. Incrementally load data from Table A as a batch.
  2. If the pipeline has previously failed, include older batches that were not processed due to the failure.
  3. Perform some transformations or processing on the data.
  4. Write the output to a destination table.

The motivation behind this implementation is to handle new data as a Spark batch because Spark Streaming does not support many commonly required aggregations. Additionally, this approach is intended to handle pipeline failures that may arise due to new deployments or unexpected changes in the data. These changes can potentially break the transformations or processing on the data, resulting in downtime. After deploying fixes, the pipeline should recover by loading and processing failed batches without recomputing everything historically. This recovery mechanism helps avoid incurring huge costs when dealing with large volumes of data.

I am seeking guidance on the best practices for implementing this scenario using Delta Live Tables. In particular, how can I ensure that the pipeline correctly handles previously failed batches and processes them along with new data, while also providing a robust recovery mechanism?

Any help or insights would be greatly appreciated!

Thank you in advance!

6 REPLIES

Anonymous
Not applicable

@Valentin Rosca:

Delta Live Tables can be used to implement the scenario you described in the following way:

  1. Incrementally load data from Table A as a batch: You can use Delta Live Tables' built-in capabilities for reading data from Delta tables, including support for incremental loading. You can specify the batch mode while reading data from Table A using the readStream method in Delta Live Tables, and configure the batch mode settings such as the maximum number of rows per batch, the maximum duration per batch, and the mode of data ordering.
  2. Handling previously failed batches: Delta Live Tables allows you to specify the start position for reading data from a Delta table using the startingVersion option. You can use this option to specify the version or timestamp of the previously failed batch as the start position for reading data. This ensures that the failed batch is included in the data being processed in the current run of the pipeline.
  3. Performing transformations or processing on the data: Once you have loaded the data from Table A as a batch, you can use Delta Live Tables' DataFrame API to perform any required transformations or processing on the data. You can apply Spark operations such as filtering, aggregations, joins, and custom functions to transform the data as needed.
  4. Writing output to a destination table: After processing the data, you can use Delta Live Tables' capabilities for writing data to Delta tables to write the output to a destination table. You can specify the mode of writing data, such as overwrite, append, or ignore, depending on the requirements of your pipeline.
  5. Recovery mechanism: Delta Live Tables provides built-in recovery mechanisms for handling failures during data processing. If an error occurs during data processing, Delta Live Tables automatically retries the failed batch according to the configured retry settings. You can configure the retry settings, such as the maximum number of retries and the delay between retries, to suit your needs. Additionally, Delta Live Tables allows you to monitor the progress of your pipeline, view the status of each batch, and track any failures or errors using the Live Tables UI or through the Delta Live Tables REST API.

By following these best practices and utilizing the features of Delta Live Tables, you can implement a robust pipeline that handles new data as a Spark batch, recovers from failures, and provides reliable data processing for your use case.
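
For reference, a minimal sketch of what points 1 and 2 could look like in a DLT Python notebook. The table name table_a, the starting version, and the value column are illustrative assumptions rather than details from this thread; note also that DLT exposes incremental Delta reads as a stream, and startingVersion is only honored the first time a new stream starts (afterwards it resumes from its checkpoint):

```python
import dlt
from pyspark.sql.functions import col

@dlt.table(name="table_a_incremental")
def table_a_incremental():
    # Incremental read of the Delta source; the version number is illustrative.
    # On later runs the stream resumes from its checkpoint instead.
    return (
        spark.readStream
        .option("startingVersion", "5")
        .table("table_a")
        .where(col("value").isNotNull())  # placeholder transformation
    )
```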

Valentin1
New Contributor III
  1. Incrementally load data from Table A as a batch: You can use Delta Live Tables' built-in capabilities for reading data from Delta tables, including support for incremental loading. -> Which built-in capabilities are you talking about? From my understanding of the documentation, the only way to load the table incrementally is by using readStream (you said so yourself: the readStream method in Delta Live Tables), and that provides a streaming DataFrame, not a batch one. As foreachBatch cannot be called in Delta Live Tables, the only way is to use @apply_change_data, accumulate the data in another table, and then use it from there. Is there another way? Can you provide an example of how to load incremental data as a batch? You also mentioned configuring batch mode settings such as the maximum number of rows per batch, the maximum duration per batch, and the mode of data ordering, but those only configure micro-batches in streaming; they do not let us handle the streaming pipeline at once or apply batch Spark operations per micro-batch.
  2. Handling previously failed batches: Delta Live Tables allows you to specify the start position for reading data from a Delta table using the startingVersion option. <- For this you need to build your own checkpointing logic. It also creates a circular dependency: at the end of the pipeline you have to write the version of the last successful run somewhere, then load it at startup. You also have to somehow involve a TRUNCATE operation if you are using @apply_change_data, so that incremental loads can be reset after a successful run. This logic is very cumbersome to implement (I am currently testing it as a solution to what I want to do, but it feels more like a hack than a proper solution). If for any reason you want to run the pipeline against different start/end versions of the input tables, you lose the accumulated data. There might be workarounds, but I get the feeling I am just hacking around limitations in Delta Live Tables.
  3. Yes, this we know. The problem is that if the previous step of the pipeline hands you a streaming DataFrame, you are limited in which aggregations you can do, either by memory if historical computation is required (the watermark will be large) or simply because any aggregation in streaming requires partitioning by time, which limits what you can do.
  4. You are very limited in which output modes you can actually use, since in Delta Live Tables you shouldn't call write or writeStream directly. We have a lot of upserts in our pipeline logic, because we only care about rows and aggregations that change. As @apply_change_data does not allow a custom merge (or even just detecting that a row has not changed), you have to join with the existing data yourself and basically reimplement the upsert functionality, which amounts to rebuilding the wheel that Databricks has already built.
  5. Do checkpoints in streams get rolled back if a task has failed? The recovery mechanism you mention is just a retry mechanism, not a recovery one. A recovery mechanism should support rollback, not only retry. From my understanding of the documentation there is no rollback mechanism in the pipelines in case of failure, but I may be wrong about this. Is there a part of the documentation that says anything about rolling back checkpoints and data?

By following these best practices and utilizing the features of Delta Live Tables, you can implement a robust pipeline that handles new data as a Spark batch, recovers from failures, and provides reliable data processing for your use case. -> We would love to, but it seems we are very limited in what we can do with best practices alone, especially when the documentation lacks proper examples for said best practices.
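
For anyone reading along, the manual upsert that point 4 describes having to re-implement would, outside of DLT, typically be expressed as a Delta merge. A minimal sketch, assuming a hypothetical destination table target, a source DataFrame updates_df, and illustrative columns id and payload:

```python
from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "target")  # hypothetical destination table

(
    target.alias("t")
    .merge(updates_df.alias("s"), "t.id = s.id")  # updates_df: the new batch of changes
    .whenMatchedUpdateAll(condition="t.payload <> s.payload")  # skip rows that have not changed
    .whenNotMatchedInsertAll()
    .execute()
)
```

This kind of change detection in the merge condition is what the reply above says is not available through @apply_change_data inside a DLT pipeline.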

Anonymous
Not applicable

Hi @Valentin Rosca,

Hope all is well! Just wanted to check in to see if you were able to resolve your issue. If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.

We'd love to hear from you.

Thanks!

Valentin1
New Contributor III

Hello, no, I did not find a solution to this (I am experimenting with something for now) and there is no answer yet. More help is always welcome.

Valentin1
New Contributor III

Some ways to do this are to build your own checkpointing logic and load data based on your own updated_at (or similar) field, or to use delta versions and readChangeFeed, although I have not tested the latter yet. The checkpointing logic would have to be added to DLT by reading the dependencies with limit 0, so better stick to non-DLT implementations if you require this, as it is a hack. If I continue down this route I will make sure to provide some code as well.
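
A rough sketch of the version-tracking approach described above, run outside DLT. It assumes change data feed is enabled on a hypothetical source table table_a and that the last processed version is persisted in a made-up bookkeeping table pipeline_state; none of these names come from the thread:

```python
from delta.tables import DeltaTable

# Last successfully processed version of table_a (hypothetical bookkeeping table).
last_version = (
    spark.table("pipeline_state")
    .where("source = 'table_a'")
    .agg({"version": "max"})
    .first()[0]
) or 0

current_version = DeltaTable.forName(spark, "table_a").history(1).first()["version"]

if current_version > last_version:
    changes = (
        spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", last_version + 1)
        .option("endingVersion", current_version)
        .table("table_a")
    )

    # ... batch transformations / aggregations on `changes` go here ...

    # Record the new high-water mark only after the batch succeeds, so a
    # failed run is simply picked up again on the next execution.
    spark.createDataFrame(
        [("table_a", current_version)], ["source", "version"]
    ).write.mode("append").saveAsTable("pipeline_state")
```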

lprevost
Contributor

I totally agree that this is a gap in the Databricks solution. The gap sits between a static read and real-time streaming. My problem (and I suspect there are many similar use cases) is that I have slowly changing data arriving in structured folders as CSV files that are updated monthly. Most of the documentation covers either a static read of that data or overly complex stream processing with watermarks, micro-batch processing, etc. on the scale of minutes, seconds, or real time.

With that said, here are a few thoughts for you:

  1. The DBRX medallion architecture and DLT provide a useful model where you use Auto Loader to incrementally stream into a Delta table. I have found that if you don't do too much transformation at that stage, it makes subsequent transforms or aggregations in the silver or gold tables easier.
  2. You are correct that there are many limitations on what you can do with streaming aggregations. But I have found that one can do a LOT with streaming aggregations using groupBy().agg(). Yes, you are limited with distinct counts, but counts and other aggregations work. Rather than count distinct, you can use approx_count_distinct, which may work fine for some workflows. I use approx_count_distinct and then a test process to ensure that the count is within some tolerance level (i.e. 10%); see the sketch after this list.
  3. I have found you can do almost anything you need with streaming transforms, and DLT takes care of state processing for you.
  4. Untested thoughts: I believe there may be some ways to do this using either watermarking (my watermark would be a day or a month, not "10 minutes"), or using foreach or foreachBatch processing, where you basically write a sink handler for each batch. The idempotent table writes (a subset of foreachBatch) are interesting to me. Perhaps that may be the key? https://docs.databricks.com/en/structured-streaming/delta-lake.html#idempotent-table-writes-in-forea...
  5. To your idea of building your own checkpoint logic: although DBRX cautions against using for loops, as this defeats the purpose of parallel processing, I've considered building (but have not tested) my own state machine such that:
    1. For the source data, I filter it on each window and then write to my Delta table if that window name doesn't already exist in the table (i.e. window1, window2, window3).
    2. This could possibly be done in conjunction with item 4?
  6. Materialized views are a way, in the medallion architecture, to do any aggregation on a streaming table. Just stream into a bronze-level table when you are loading the data, then read it (vs. readStream) and do your aggregations. This rebuilds the aggregate each time, but it happens in the background.
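
To illustrate point 2, a small sketch of a daily streaming aggregation with an approximate distinct count. events_stream and the column names are assumptions for illustration, not details from this thread:

```python
from pyspark.sql.functions import window, count, approx_count_distinct

# events_stream: a streaming DataFrame (e.g. from readStream); columns are illustrative.
daily_agg = (
    events_stream
    .withWatermark("event_time", "1 day")
    .groupBy(window("event_time", "1 day"), "customer_id")
    .agg(
        count("*").alias("events"),
        approx_count_distinct("session_id").alias("approx_sessions"),
    )
)
```

A downstream check can then compare approx_sessions against an exact count from a static read and flag results that fall outside the chosen tolerance (e.g. 10%).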

Just my two cents.

Lee
