03-29-2023 11:59 PM
Hi,
I am running a batch job which processes incoming files. I am trying to limit the number of files processed in each batch, so I added the maxFilesPerTrigger option, but it's not working: it processes all incoming files at once.
(spark.readStream.format("delta").load(silver_path)
.writeStream
.option("checkpointLocation", gold_checkpoint_path)
.option("maxFilesPerTrigger", 200)
.trigger(once=True)
.foreachBatch(foreachBatchFunction)
.start()
.awaitTermination()
)
Please suggest.
Regards,
Sanjay
04-03-2023 03:46 AM
@Sanjay Jain sorry, I missed one thing: .trigger(once=True) doesn't support rate limiters. You can use .trigger(availableNow=True) instead.
(spark.readStream.format("delta")
.option("maxFilesPerTrigger", 200)
.load(silver_path)
.writeStream
.option("checkpointLocation", gold_checkpoint_path)
.trigger(availableNow=True)
.foreachBatch(foreachBatchFunction)
.start()
)
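For completeness, a minimal sketch of what foreachBatchFunction could look like; the actual function is never shown in this thread, so the body below (and the gold_path target) is only an assumption. With the rate limit set on the source, each call receives the rows from at most 200 newly committed files:

def foreachBatchFunction(batch_df, batch_id):
    # Hypothetical handler; the real foreachBatchFunction is not shown in the thread.
    # batch_df holds the rows read from at most maxFilesPerTrigger new files.
    print(f"batch {batch_id}: {batch_df.count()} rows")
    (batch_df.write
        .format("delta")
        .mode("append")
        .save(gold_path))  # gold_path is a placeholder target location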
03-30-2023 01:53 AM
Can you try with trigger = availableNow?
03-30-2023 02:20 AM
Tried availableNow, but it's also processing all the data available for processing. I want to process in batches, max 200 files in each batch, even though I have 1,000 files to process.
03-30-2023 02:36 AM
OK, how do you know that 1,000 files are selected?
I ask because Delta Lake (your source) also stores old versions of the data, which are not sent to the stream. Physically your Delta table might contain 1,000 files while the current version references maybe only 150 files -> 1 micro-batch.
Is that possible?
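One way to check (assuming silver_path points at the Delta table root) is DESCRIBE DETAIL, which reports the file count of the current table version only:

# numFiles covers only the current version; files kept for time travel
# (old versions) are not included and are never sent to the stream.
spark.sql(f"DESCRIBE DETAIL delta.`{silver_path}`").select("numFiles").show()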
03-30-2023 03:00 AM
I have sent 1,000 files to process in the previous layer and I don't want to process them all in one go. I can see all 1,000 received in the current batch.
03-30-2023 03:15 AM
I think I found the issue.
The maxFilesPerTrigger option has to be set on the source, not on the sink (as you do now).
Try moving the option before the load statement,
so readStream.option().load()...
03-30-2023 03:22 AM
Still getting all 1000 files.
(spark.readStream.format("delta").option("maxFilesPerTrigger", 100).load(silver_path)
.writeStream
.option("checkpointLocation", gold_checkpoint_path)
.trigger(once=True)
.foreachBatch(foreachBatchFunction)
.start()
.awaitTermination()
)
03-30-2023 03:29 AM
spark.readStream.format("delta")
.option("maxFilesPerTrigger", "100")
.load(<table>)
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "...")
.table(<table2>)
03-30-2023 03:33 AM
Sorry, not an expert in this. But how do I plug in my custom code?
.foreachBatch(foreachBatchFunction)
.start()
.awaitTermination()
03-30-2023 03:35 AM
Sorry, it is basically only the part up to load() that is important.
Also try passing the number of files as a string instead of an int.
03-30-2023 03:46 AM
Still getting full load.
df = (spark.readStream.format("delta")
.option("maxFilesPerTrigger", "100")
.load(silver_path)
)
(df.writeStream
.option("checkpointLocation", gold_checkpoint_path)
.trigger(once=True)
.foreachBatch(foreachBatchFunction)
.start()
.awaitTermination())
03-30-2023 03:49 AM
Can you try to also set maxFilesPerTrigger on the sink?
(spark.readStream.format("delta")
.option("maxFilesPerTrigger", "100")
.load(silver_path)
.writeStream
.option("checkpointLocation", gold_checkpoint_path)
.option("maxFilesPerTrigger", "100")
.trigger(once=True)
.foreachBatch(foreachBatchFunction)
.start()
)
03-30-2023 03:51 AM
Here is also an SO topic on how you can test it:
https://stackoverflow.com/questions/70134468/spark-structured-streaming-rate-limit
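Another quick check, sketched here assuming you keep the query handle from the earlier snippet in a variable: inspect the streaming query's progress after it finishes. numInputRows counts rows rather than files, but with the rate limit working you should see several smaller micro-batches instead of one big one.

# df is the rate-limited readStream DataFrame from the earlier snippet
query = (df.writeStream
    .option("checkpointLocation", gold_checkpoint_path)
    .trigger(availableNow=True)
    .foreachBatch(foreachBatchFunction)
    .start())
query.awaitTermination()

# one entry per completed micro-batch
for progress in query.recentProgress:
    print(progress["batchId"], progress["numInputRows"])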
03-30-2023 03:52 AM
No, still getting all 1,000 files.
03-30-2023 03:57 AM
strange, it should work.