Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to limit number of files in each batch in streaming batch processing

sanjay
Valued Contributor II

Hi,

I am running a batch job which processes incoming files. I am trying to limit the number of files in each batch, so I added the maxFilesPerTrigger option, but it is not working. It processes all incoming files at once.

(spark.readStream.format("delta").load(silver_path)
    .writeStream
    .option("checkpointLocation", gold_checkpoint_path)
    .option("maxFilesPerTrigger", 200)
    .trigger(once=True)
    .foreachBatch(foreachBatchFunction)
    .start()
    .awaitTermination()
)

Please suggest.

Regards,

Sanjay

1 ACCEPTED SOLUTION


Sandeep
Contributor III

@Sanjay Jain sorry, I missed one thing: .trigger(once=True) doesn't support rate limiters. You can use .trigger(availableNow=True) instead.

ref: https://docs.databricks.com/structured-streaming/triggers.html#configuring-incremental-batch-process...

(spark.readStream.format("delta")
    .option("maxFilesPerTrigger", 200)
    .load(silver_path)
    .writeStream
    .option("checkpointLocation", gold_checkpoint_path)
    .trigger(availableNow=True)
    .foreachBatch(foreachBatchFunction)
    .start()
)
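
For completeness, a minimal runnable sketch of the full pipeline is below. The body of foreachBatchFunction (a Delta merge keyed on an illustrative "id" column) and the gold_path variable are assumptions for the example, not code from this thread; with availableNow the stream drains all pending data in micro-batches of at most maxFilesPerTrigger files and then stops.

from delta.tables import DeltaTable

def foreachBatchFunction(batch_df, batch_id):
    # Illustrative upsert of each micro-batch into a gold table.
    # gold_path and the "id" join key are assumptions; replace with your own logic.
    gold = DeltaTable.forPath(spark, gold_path)
    (gold.alias("t")
        .merge(batch_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(spark.readStream.format("delta")
    .option("maxFilesPerTrigger", 200)
    .load(silver_path)
    .writeStream
    .option("checkpointLocation", gold_checkpoint_path)
    .trigger(availableNow=True)
    .foreachBatch(foreachBatchFunction)
    .start()
    .awaitTermination()
)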


20 REPLIES

-werners-
Esteemed Contributor III

Can you try with trigger = availableNow?

sanjay
Valued Contributor II

I tried availableNow, but it also processes all available data at once. I want to process in batches of max 200 files each, even though I have 1,000 files to process.

-werners-
Esteemed Contributor III

OK, how do you know that 1000 files are selected?

I ask because Delta Lake (your source) also stores old versions of data, which will not be sent to the stream. Physically your Delta table might have 1000 files, but the current state is maybe only 150 files -> 1 microbatch.

Is that possible?
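
If it helps, a quick way to check how many data files make up the current version of the source table (a sketch using DESCRIBE DETAIL; silver_path is the path from this thread):

# numFiles is the count of data files in the current table version,
# i.e. what the stream would actually read.
spark.sql(f"DESCRIBE DETAIL delta.`{silver_path}`").select("numFiles").show()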

sanjay
Valued Contributor II

I have sent 1000 files to process in the previous layer and I don't want to process them all in one go. I can see all 1000 received in the current batch.

-werners-
Esteemed Contributor III

I think I found the issue.

The maxFilesPerTrigger option has to be set on the source, not on the sink (as you do).

Try to move the option before the load statement,

so readStream.option().load()...

sanjay
Valued Contributor II

Still getting all 1000 files.

(spark.readStream.format("delta").option("maxFilesPerTrigger", 100).load(silver_path)
    .writeStream
    .option("checkpointLocation", gold_checkpoint_path)
    .trigger(once=True)
    .foreachBatch(foreachBatchFunction)
    .start()
    .awaitTermination()
)

-werners-
Esteemed Contributor III

spark.readStream.format("delta")
    .option("maxFilesPerTrigger", "100")
    .load(<table>)
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "...")
    .table(<table2>)

sanjay
Valued Contributor II

Sorry, I'm not an expert in this. But how do I run my custom code?

.foreachBatch(foreachBatchFunction)
    .start()
    .awaitTermination()

-werners-
Esteemed Contributor III

Sorry, it is basically only the part up to load() that is important.

Also try to enter the number of files as a string instead of an int.

sanjay
Valued Contributor II

Still getting the full load.

df = (spark.readStream.format("delta")
    .option("maxFilesPerTrigger", "100")
    .load(silver_path)
)

(df.writeStream
    .option("checkpointLocation", gold_checkpoint_path)
    .trigger(once=True)
    .foreachBatch(foreachBatchFunction)
    .start()
    .awaitTermination())

-werners-
Esteemed Contributor III

Can you try to also set maxFilesPerTrigger in the sink?

(spark.readStream.format("delta")
    .option("maxFilesPerTrigger", "100")
    .load(silver_path)
    .writeStream
    .option("checkpointLocation", gold_checkpoint_path)
    .option("maxFilesPerTrigger", "100")
    .trigger(once=True)
    .foreachBatch(foreachBatchFunction)
    .start()
)


sanjay
Valued Contributor II

No, still getting all 1000 files.

-werners-
Esteemed Contributor III

Strange, it should work.
