03-29-2023 11:59 PM
Hi,
I am running a batch job that processes incoming files. I am trying to limit the number of files in each batch, so I added the maxFilesPerTrigger option. But it's not working: it processes all incoming files at once.
(spark.readStream.format("delta").load(silver_path)
.writeStream
.option("checkpointLocation", gold_checkpoint_path)
.option("maxFilesPerTrigger", 200)
.trigger(once=True)
.foreachBatch(foreachBatchFunction)
.start()
.awaitTermination()
)
Please suggest.
Regards,
Sanjay
04-03-2023 03:46 AM
@Sanjay Jain sorry, I missed one thing: .trigger(once=True) doesn't support rate limiters. You can use .trigger(availableNow=True) instead.
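(spark.readStream.format("delta")
# the rate limit is a source option, so it goes before load()
.option("maxFilesPerTrigger", 200)
.load(silver_path)
.writeStream
.option("checkpointLocation", gold_checkpoint_path)
# availableNow honors rate limits; once=True does not
.trigger(availableNow=True)
.foreachBatch(foreachBatchFunction)
.start()
)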
03-30-2023 01:53 AM
Can you try with .trigger(availableNow=True)?
03-30-2023 02:20 AM
Tried availableNow, but it also processes all the data available. I want to process in batches of at most 200 files each, even though I have 1,000 files to process.
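For reference, a rough sketch of the arithmetic behind that expectation, assuming the rate limit is honored and acts as an upper bound on files per micro-batch. `expected_micro_batches` is a hypothetical helper written for illustration, not a Spark API.

```python
import math


def expected_micro_batches(total_files: int, max_files_per_trigger: int) -> int:
    """Fewest micro-batches needed if each one reads at most
    max_files_per_trigger files (hypothetical helper, not part of Spark)."""
    if max_files_per_trigger <= 0:
        raise ValueError("max_files_per_trigger must be positive")
    return math.ceil(total_files / max_files_per_trigger)


# 1,000 pending files with a limit of 200 files per micro-batch
print(expected_micro_batches(1000, 200))  # 5
```

So with the limit applied, the job should report several batch ids rather than a single one.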
03-30-2023 02:36 AM
OK, how do you know that 1000 files are selected?
I ask because Delta Lake (your source) also stores old versions of the data, which will not be sent to the stream. Physically your Delta table might have 1000 files, but the current state may be only 150 files, which would fit in 1 micro-batch.
Is that possible?
03-30-2023 03:00 AM
I sent 1000 files for processing in the previous layer and I don't want to process them all in one go. I can see all 1000 received in the current batch.
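One way to check what each micro-batch actually receives is to log from inside the foreachBatch callback. This is only a minimal sketch: `log_batch_size` is a hypothetical helper (the real foreachBatchFunction is not shown in this thread), it counts rows rather than files, and calling `count()` adds an extra pass over the batch.

```python
def log_batch_size(batch_df, batch_id):
    """Print and return the row count of one micro-batch.

    Hypothetical helper: batch_df is the DataFrame that foreachBatch
    passes in, batch_id is the micro-batch id. It reports rows, not files.
    """
    row_count = batch_df.count()
    print(f"micro-batch {batch_id}: {row_count} rows")
    return row_count
```

It could be called as the first line of foreachBatchFunction. If only one batch id ever shows up, the rate limit is not being applied.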
03-30-2023 03:15 AM
I think I found the issue.
The maxFilesPerTrigger option has to be set on the source, not on the sink (as you do).
Try moving the option before the load statement,
so: readStream.option(...).load(...)
03-30-2023 03:22 AM
Still getting all 1000 files.
(spark.readStream.format("delta").option("maxFilesPerTrigger", 100).load(silver_path)
.writeStream
.option("checkpointLocation", gold_checkpoint_path)
.trigger(once=True)
.foreachBatch(foreachBatchFunction)
.start()
.awaitTermination()
)
03-30-2023 03:29 AM
(spark.readStream.format("delta")
.option("maxFilesPerTrigger", "100")
.load(<table>)
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "...")
.toTable(<table2>)
)
03-30-2023 03:33 AM
Sorry, I'm not an expert in this. But how do I run my custom code?
.foreachBatch(foreachBatchFunction)
.start()
.awaitTermination()
03-30-2023 03:35 AM
Sorry, basically only the part up to load() is important.
Also try passing the number of files as a string instead of an int.
03-30-2023 03:46 AM
Still getting full load.
df = (spark.readStream.format("delta")
.option("maxFilesPerTrigger", "100")
.load(silver_path)
)
(df.writeStream
.option("checkpointLocation", gold_checkpoint_path)
.trigger(once=True)
.foreachBatch(foreachBatchFunction)
.start()
.awaitTermination())
03-30-2023 03:49 AM
Can you try also setting maxFilesPerTrigger on the sink?
(spark.readStream.format("delta")
.option("maxFilesPerTrigger", "100")
.load(silver_path)
.writeStream
.option("checkpointLocation", gold_checkpoint_path)
.option("maxFilesPerTrigger", "100")
.trigger(once=True)
.foreachBatch(foreachBatchFunction)
.start()
)
03-30-2023 03:51 AM
Here is also a Stack Overflow topic on how you can test it:
https://stackoverflow.com/questions/70134468/spark-structured-streaming-rate-limit
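Another way to test it, sketched here under assumptions, is to look at the query's progress reports after the run. The helper below is hypothetical and assumes each entry of `query.recentProgress` is dict-like with `batchId` and `numInputRows` keys, which is how PySpark exposes progress events.

```python
def summarize_progress(progress_events):
    """Return a (batchId, numInputRows) pair for each progress event.

    Hypothetical helper: progress_events is expected to look like
    StreamingQuery.recentProgress, i.e. a list of dict-like entries.
    """
    return [(p["batchId"], p["numInputRows"]) for p in progress_events]


# Usage sketch, assuming `query` is the StreamingQuery returned by .start()
# and the run has finished:
# for batch_id, rows in summarize_progress(query.recentProgress):
#     print(batch_id, rows)
```

Several batch ids with smaller row counts suggest the limit is in effect. A single batch holding everything suggests it is being ignored.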
03-30-2023 03:52 AM
No, still getting all 1000 files.
03-30-2023 03:57 AM
Strange, it should work.