Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to process a streaming Lakeflow Declarative Pipeline in batches

Michał
Visitor

Hi, 

I've got a problem and I have run out of ideas as to what else I can try. Maybe you can help? 

I've got a Delta table with hundreds of millions of records on which I have to perform relatively expensive operations. I'd like to be able to process some of the records, stop the process, then restart it from where it left off - the usual streaming thing. My problem is that, apparently, the pipeline has to process all of the records successfully on its first run before any of the output is persisted in the target table.

I tried setting Spark options limiting the maximum number of files to read and the maximum amount of data to read, always with the same behaviour: all-or-nothing processing on the first run.

Could you point me to a reliable resource documenting how to control the batch size of Lakeflow Declarative Pipelines?

2 REPLIES

szymon_dybczak
Esteemed Contributor III

Hi @Michał,

I'm assuming you're talking about a streaming table here. I think what you're trying to achieve is not possible. It's not a limitation of Lakeflow Declarative Pipelines per se, but rather how Spark Structured Streaming works. All rows will be processed on the first run; you can only influence the micro-batch size with options such as maxBytesPerTrigger or maxFilesPerTrigger.


So, you can't set it up like this:

- load only 10 files and shut down

- then on the next run consume another 10 files and shut down

What you can do is set the micro-batch size to, e.g., 10 files per trigger (see the sketch after this list):

- all available data will eventually be consumed, with at most 10 files as input for each micro-batch
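As a minimal sketch of that idea in plain Structured Streaming (outside a declarative pipeline), assuming a Delta source and placeholder table names (`source_table`, `target_table`) and checkpoint path:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

stream = (
    spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 10)   # cap each micro-batch at 10 source files
    .table("source_table")
)

query = (
    stream.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/demo")  # micro-batch progress is recorded here
    .toTable("target_table")
)
query.awaitTermination()  # keeps consuming until all available data is processed or the query is stopped
```

The option only shapes the micro-batches; it does not make the query stop after the first 10 files.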

One more note here. For declarative pipelines, the remaining question to verify is whether you can set properties such as maxFilesPerTrigger or maxBytesPerTrigger at all. Since it is a declarative framework, the available options may be limited.

Michał
Visitor

Thanks @szymon_dybczak. From my experiments so far, you can set `maxFilesPerTrigger`, `maxBytesPerTrigger` and other settings in both Python and SQL code when you declare streaming tables in declarative pipelines. However, I don't see any evidence that they are actually taken into account.
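For reference, this is roughly how such a declaration could look in Python (a hypothetical sketch, not the actual production pipeline; `source_table`, `target_table` and the filter column are placeholders, and `spark` is assumed to be provided by the pipeline runtime):

```python
import dlt
from pyspark.sql.functions import col

@dlt.table(name="target_table")
def target_table():
    return (
        spark.readStream
        .format("delta")
        .option("maxFilesPerTrigger", 10)     # requested cap on files per micro-batch
        .option("maxBytesPerTrigger", "1g")   # requested cap on bytes per micro-batch
        .table("source_table")
        .where(col("amount") > 0)             # placeholder for the expensive per-row logic
    )
```

The declaration is accepted by the pipeline, but whether these reader options are honoured at runtime is exactly what I couldn't confirm.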

The way I read the Structured Streaming Programming Guide (Spark 4.0.0 documentation) was that in the case of a failure (I'm not stopping the runs manually; it happens as a result of a failure), the progress of the micro-batches processed so far should be preserved. Running individual micro-batches was a way to test that and to replicate a problem I had in our production system. But your explanation makes sense, and I have also run some local Spark tests to better understand the behaviour. Thanks.
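One of those local tests looked roughly like this (a sketch under assumptions: local paths, delta-spark available on the classpath, Spark 3.3+ for availableNow):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

def run_once():
    q = (
        spark.readStream
        .format("delta")
        .option("maxFilesPerTrigger", 10)
        .load("/tmp/source_delta")
        .writeStream
        .format("delta")
        .option("checkpointLocation", "/tmp/chk")   # offsets and commits recorded per micro-batch
        .trigger(availableNow=True)                 # drain what's currently available, then stop
        .start("/tmp/target_delta")
    )
    q.awaitTermination()

# The first call processes the available files in 10-file micro-batches; each
# committed micro-batch is recorded in the checkpoint, so a second call only
# picks up files added since the last committed batch.
run_once()
run_once()
```

In plain Structured Streaming this behaves as the guide describes: committed micro-batches survive a stop or failure.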

So perhaps my question should have been about checkpoints. I assumed that declarative pipelines use checkpoints behind the scenes - I don't think we can set them explicitly in declarative pipelines - but the behaviour of my pipelines suggests they are either not set or not working as I would expect. If I have a pipeline that runs for hours and eventually fails, I end up with no data in my sink tables, despite the first task in the pipeline having read hundreds of GB of data by that point, and despite the processing being relatively simple: one row at a time, no aggregation, no ordering, nothing that would require analysing the full source before writing. What am I missing?

