How to limit number of files in each batch in streaming batch processing

sanjay
Valued Contributor II

Hi,

I am running a batch job that processes incoming files. I am trying to limit the number of files in each batch, so I added the maxFilesPerTrigger option, but it's not working: it processes all incoming files at once.

(spark.readStream.format("delta").load(silver_path)
  .writeStream
  .option("checkpointLocation", gold_checkpoint_path)
  .option("maxFilesPerTrigger", 200)
  .trigger(once=True)
  .foreachBatch(foreachBatchFunction)
  .start()
  .awaitTermination()
)

Please suggest.

Regards,

Sanjay

1 ACCEPTED SOLUTION


Sandeep
Contributor III

@Sanjay Jain, sorry, I missed one thing: .trigger(once=True) doesn't support rate limiters. You can use .trigger(availableNow=True) instead.

ref: https://docs.databricks.com/structured-streaming/triggers.html#configuring-incremental-batch-process...

spark.readStream.format("delta")
  .option("maxFilesPerTrigger", 200)
  .load(silver_path)
  .writeStream
  .option("checkpointLocation", gold_checkpoint_path)
  .trigger(availableNow=True)
  .foreachBatch(foreachBatchFunction)
  .start()
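
For reference, foreachBatchFunction is the user-defined function passed to foreachBatch; it receives each rate-limited micro-batch as a DataFrame together with a batch id. A minimal sketch of what it might look like (the gold table write below is an assumption for illustration, not code from this thread):

def foreachBatchFunction(batch_df, batch_id):
    # With maxFilesPerTrigger set on the source, each call sees data
    # from at most that many source files.
    # "gold_table" is a placeholder name, not from the thread.
    (batch_df.write
        .format("delta")
        .mode("append")
        .saveAsTable("gold_table"))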


20 REPLIES

-werners-
Esteemed Contributor III

Can you try with trigger = availableNow?

sanjay
Valued Contributor II

Tried availableNow, but it's also processing all the data available for processing. I want to process in batches of max 200 files each, even though I have 1,000 files to process.

-werners-
Esteemed Contributor III

OK, how do you know that 1,000 files are selected?

I ask because Delta Lake (your source) also stores old versions of data, which will not be sent to the stream. Physically your Delta Lake might have 1,000 files, but the current state may be only 150 files -> 1 micro-batch.

Is that possible?
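
One way to check this is to compare the physical file count against what Delta reports as the current table state; a quick sketch, assuming the silver_path variable from the question:

# numFiles reflects only the files that make up the current table version,
# not old versions retained for time travel.
detail = spark.sql(f"DESCRIBE DETAIL delta.`{silver_path}`")
detail.select("numFiles").show()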

sanjay
Valued Contributor II

I have sent 1,000 files to the previous layer for processing and I don't want to process them all in one go. I can see all 1,000 received in the current batch.

-werners-
Esteemed Contributor III

I think I found the issue.

The maxFilesPerTrigger option has to be set on the source, not on the sink (as you do).

Try to move the option before the load statement,

so readStream.option().load()...

sanjay
Valued Contributor II

Still getting all 1000 files.

(spark.readStream.format("delta").option("maxFilesPerTrigger", 100).load(silver_path)
  .writeStream
  .option("checkpointLocation", gold_checkpoint_path)
  .trigger(once=True)
  .foreachBatch(foreachBatchFunction)
  .start()
  .awaitTermination()
)

-werners-
Esteemed Contributor III

spark.readStream.format("delta")
  .option("maxFilesPerTrigger", "100")
  .load(<table>)
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "...")
  .table(<table2>)

sanjay
Valued Contributor II

Sorry, I'm not an expert in this. But how do I run my custom code, i.e.

.foreachBatch(foreachBatchFunction)
.start()
.awaitTermination()

-werners-
Esteemed Contributor III

Sorry, it is basically only the part up to load() that is important, so you can keep your foreachBatch sink as it is.

Also try passing the number of files as a string instead of an int.

sanjay
Valued Contributor II

Still getting full load.

df = (spark.readStream.format("delta")
  .option("maxFilesPerTrigger", "100")
  .load(silver_path)
)

(df.writeStream
  .option("checkpointLocation", gold_checkpoint_path)
  .trigger(once=True)
  .foreachBatch(foreachBatchFunction)
  .start()
  .awaitTermination())

-werners-
Esteemed Contributor III

Can you try also setting maxFilesPerTrigger in the sink?

spark.readStream.format("delta")
  .option("maxFilesPerTrigger", "100")
  .load(silver_path)
  .writeStream
  .option("checkpointLocation", gold_checkpoint_path)
  .option("maxFilesPerTrigger", "100")
  .trigger(once=True)
  .foreachBatch(foreachBatchFunction)
  .start()


sanjay
Valued Contributor II

no, still getting all 1000 files

-werners-
Esteemed Contributor III

strange, it should work.
