Data Engineering

how to process a streaming lakeflow declarative pipeline in batches

Michał
New Contributor III

Hi, 

I've got a problem and I have run out of ideas as to what else I can try. Maybe you can help? 

I've got a delta table with hundreds of millions of records on which I have to perform relatively expensive operations. I'd like to be able to process some of the records, stop the process, then restart it from where it left off - the usual streaming thing. My problem is that, on its first run, the pipeline apparently has to process all of the records successfully before any output is persisted to the target table.

I tried setting Spark options limiting the maximum number of files and the maximum amount of data to read, always with the same behaviour: all-or-nothing processing on the first run.

Could you point me to a reliable resource documenting how to control the batch size of Lakeflow Declarative Pipelines?

5 REPLIES

szymon_dybczak
Esteemed Contributor III

Hi @Michał,

I'm assuming you're talking about a streaming table here. I think what you're trying to achieve is not possible. It's not a limitation of Lakeflow Declarative Pipelines per se, but rather how Spark Structured Streaming works. All rows will be processed on the first run; you can only influence the micro-batch size with options such as maxBytesPerTrigger or maxFilesPerTrigger.


So, you can't set it up like this:

- load only 10 files and shut down

- then on the next run consume another 10 files and shut down

What you can do is set the size of a micro-batch, e.g. to 10 files per trigger:

- all available data will be consumed, with at most 10 files as input to each micro-batch

One more note here. In the case of a declarative pipeline, the remaining question to verify is whether you can set properties such as maxFilesPerTrigger or maxBytesPerTrigger at all. Since it is a declarative framework, the available options may be limited.
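For reference, outside of declarative pipelines this is roughly how you'd apply those options on a Delta streaming source (a minimal sketch; the table names and checkpoint path are placeholders, and `spark` is the active SparkSession, as predefined in Databricks notebooks):

```python
# Minimal Structured Streaming sketch outside declarative pipelines.
# Table names and checkpoint path are placeholders.
stream = (
    spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 10)     # cap files per micro-batch
    .option("maxBytesPerTrigger", "1g")   # soft cap on bytes per micro-batch
    .table("source_catalog.source_schema.big_table")
)

(
    stream.writeStream
    .option("checkpointLocation", "/Volumes/main/default/checkpoints/big_table")
    .toTable("target_catalog.target_schema.processed_table")
)
```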

Michał
New Contributor III

Thanks @szymon_dybczak. From my experiments so far, you can set `maxFilesPerTrigger`, `maxBytesPerTrigger` and other settings in both Python and SQL code when you declare streaming tables in declarative pipelines. However, I don't see any evidence that they are actually taken into account.
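For illustration, this is roughly the shape of the Python declaration I've been experimenting with (the table and source names are placeholders):

```python
import dlt

# A sketch of where the reader options go when declaring a streaming table
# in Python; the table and source names are placeholders.
@dlt.table(name="processed_table")
def processed_table():
    return (
        spark.readStream
        .option("maxFilesPerTrigger", 10)
        .option("maxBytesPerTrigger", "1g")
        .table("source_catalog.source_schema.big_table")
    )
```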

The way I read the Structured Streaming Programming Guide (Spark 4.0.0 documentation) was that, in case of a failure (I'm not stopping the runs manually; it happens as a result of a failure), the progress of the micro-batches processed so far should be preserved. Running individual micro-batches was a way to test that and replicate a problem I had in our production system. But your explanation makes sense, and I have also run some local Spark tests to better understand the behaviour. Thanks.

So perhaps my question should have been about checkpoints. I assumed that declarative pipelines use checkpoints behind the scenes - I don't think we can set them explicitly in declarative pipelines - but the behaviour of my pipelines suggests they are not set or not working as I would expect. If I have a pipeline that runs for hours and eventually fails, I end up with no data in my sink tables, even though the first task in the pipeline has by that point read hundreds of GB of data, and even though the processing is relatively simple: one row at a time, no aggregation, no ordering, nothing that would require analysing the full source before writing. What am I missing?

 

mmayorga
Databricks Employee

Hi @Michał,

One detail/feature to consider when working with Declarative Pipelines is that they manage and auto-tune configuration aspects, including rate limiting (maxBytesPerTrigger or maxFilesPerTrigger). Perhaps that's why you could not see the configured behavior; outside Declarative Pipelines, setting those options should work as expected.

Yes, Declarative Pipelines use checkpoints for each flow writing to streaming tables. Unfortunately, these are not accessible, as the service handles them for us. There are more details in "Recover a Lakeflow Declarative Pipelines pipeline from streaming checkpoint failure".

Also, remember that with Declarative Pipelines you get an event_log table, which you can configure in the advanced settings. This will give you further details about the specific flows between your tables and more (auditing, data quality, etc.). Hopefully this can help you determine what happened with your pipeline in that first step/task. Perhaps the data does not follow your schema, even with the minimal processing you mentioned?
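For example, once the event log is published to a table via the advanced settings, you could inspect flow progress and errors with something along these lines (the event log table name is a placeholder):

```python
# Placeholder event log table name; use whatever you configured in the
# pipeline's advanced settings.
events = spark.read.table("my_catalog.my_schema.pipeline_event_log")

(
    events
    .where("event_type IN ('flow_progress', 'update_progress')")
    .select("timestamp", "event_type", "message", "error")
    .orderBy("timestamp", ascending=False)
    .show(50, truncate=False)
)
```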

We recommend following the medallion architecture, where the bronze table (raw data) allows you to examine the data as it arrives; you can then apply transformations into the silver layer and catch these possible issues there.

I hope this helps!

 

Michał
New Contributor III

Thank you both for the answers. 

@mmayorga the data is fine, the architecture is fine. After thinking about it, despite how I phrased the initial question, what I appear to be missing is something better than an all-or-nothing approach during the initial processing of hundreds of millions of rows from a streaming source.

Regardless of what the problem is, when I'm processing, for example, 400 million rows and after a few days of processing there is a problem with row 399,999,123, I'd like to be able to fix the problem and restart processing from, say, row 399,000,001 rather than from the very beginning. Is there a way to do that?

mmayorga
Databricks Employee

Hi @Michał,

Auto Loader is designed to provide exactly-once ingestion and can resume processing from the last successful checkpoint after a failure, without starting from the beginning. So when you restart the stream, it continues where it left off (unless you run a full refresh); just make sure to define your "checkpointLocation" option.

As far as I understand, there is no support for row-level restarts; as an alternative, you can leverage the "modifiedAfter" option. More details can be found under Recovery Options in the documentation.
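For reference, here is a minimal Auto Loader sketch outside a declarative pipeline with an explicit checkpoint and the "modifiedAfter" filter (the paths, file format, and table name are placeholders):

```python
# Paths, file format, and table name are placeholders.
stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    # Optional: only pick up files modified after this timestamp
    .option("modifiedAfter", "2025-01-01T00:00:00.000000Z")
    .load("/Volumes/main/default/landing/raw_files")
)

(
    stream.writeStream
    # The checkpoint is what lets a restart continue where it left off
    .option("checkpointLocation", "/Volumes/main/default/checkpoints/raw_files")
    .trigger(availableNow=True)
    .toTable("my_catalog.bronze.raw_files")
)
```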

I hope this helps! 

Thank you!