Community Platform Discussions
Connect with fellow community members to discuss general topics related to the Databricks platform, industry trends, and best practices. Share experiences, ask questions, and foster collaboration within the community.

maxFilesPerTrigger not working while loading data from Unity Catalog table

sanjay
Valued Contributor II

Hi,

I am using streaming on Unity Catalog tables and trying to limit the number of records read in each batch. Here is my code, but it's not respecting `maxFilesPerTrigger`; instead it reads all available data.

 

    (spark.readStream
        .option("skipChangeCommits", "true")
        .option("useNotifications", "true")
        .option("includeExistingFiles", "true")
        .option("allowOverwrites", True)
        .option("ignoreMissingFiles", True)
        .option("maxFilesPerTrigger", 2)
        .table(table_path)
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .foreachBatch(foreachBatchFunction)
        .start()
        .awaitTermination())
 
Appreciate any help here.
1 ACCEPTED SOLUTION

Accepted Solutions

sanjay
Valued Contributor II

I was able to resolve the issue. I'm not sure what the cause was; it's working now without any code change.


5 REPLIES

Witold
Contributor III

What does your table structure look like? Can you confirm that your data is actually spread across more than two files?

sanjay
Valued Contributor II

My table has multiple rows. For example, take a simple employee table with emp_id and emp_name columns, and use streaming to process any updates to this table. If there are more than 2 inserts, I want to process at most 2 rows at a time.

sanjay
Valued Contributor II

I was able to resolve the issue. I'm not sure what the cause was; it's working now without any code change.

Witold
Contributor III

I believe you misunderstand the fundamentals of Delta tables. `maxFilesPerTrigger` has nothing to do with how many rows you process at a time; it limits how many files are read in each micro-batch. If you really want to control the number of records per file, you need to adapt the writer accordingly. Besides, having two records per file is a very bad idea for multiple reasons. Depending on your load, you will end up with thousands of small files and dozens of useless history entries in the Delta table log.

Of course, I don't know the use case behind it, but I'm pretty sure that you don't want to process two rows at a time. Keep in mind that you're in a big data environment, and it's designed to process millions of rows at the same time.
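To make the files-versus-rows distinction concrete, here is a minimal pure-Python sketch. It is a simplified model, not Delta's actual batch planner: it assumes each micro-batch simply takes the next N files in order, and the function name `plan_micro_batches` and the row counts are hypothetical.

```python
def plan_micro_batches(file_row_counts, max_files_per_trigger):
    """Group files into micro-batches of at most `max_files_per_trigger` files.

    `file_row_counts` holds one entry per data file: the number of rows in it.
    Simplified illustration only; the real source also considers commit order.
    """
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    n = max_files_per_trigger
    return [file_row_counts[i:i + n] for i in range(0, len(file_row_counts), n)]


# Hypothetical table: five data files holding very different numbers of rows.
files = [1, 500, 3, 10_000, 42]

batches = plan_micro_batches(files, max_files_per_trigger=2)
rows_per_batch = [sum(batch) for batch in batches]

print(batches)         # [[1, 500], [3, 10000], [42]]
print(rows_per_batch)  # [501, 10003, 42]
```

Every batch respects the two-file cap, yet the row counts range from 42 to 10003. In the same model, a table whose data sits in only one or two files is read in a single batch regardless of how many rows it holds.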

sanjay
Valued Contributor II

Thank you Witold, 2 was just an example. I have thousands of files arriving every second and want to limit the number of files per batch; otherwise the process gets stuck when there are too many files in a given batch.

I am able to limit the batch size when running as a single job, but not as a continuous job.

    (spark.readStream
        .option("skipChangeCommits", "true")
        .option("useNotifications", "true")
        .option("includeExistingFiles", "true")
        .option("allowOverwrites", True)
        .option("ignoreMissingFiles", True)
        .option("maxFilesPerTrigger", 2)
        .table(table_path)
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(processingTime="1 second")
        .foreachBatch(foreachBatchFunction)
        .start())
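For comparison, here is a hedged sketch of the same continuous stream with the rate limit isolated in a small helper. It is not a confirmed fix for the behaviour described above. The helper names `rate_limit_options` and `start_limited_stream` are hypothetical, as are their parameters. `useNotifications`, `includeExistingFiles`, and `allowOverwrites` appear to be Auto Loader (`cloudFiles.*`) options rather than Delta table source options, so the sketch leaves them out; that is an assumption about this pipeline. `maxBytesPerTrigger` is the byte-based counterpart of `maxFilesPerTrigger`; check the Delta documentation for the value format it accepts.

```python
from typing import Callable, Dict, Optional


def rate_limit_options(max_files: Optional[int] = None,
                       max_bytes: Optional[str] = None) -> Dict[str, str]:
    """Build the rate-limit options for a Delta streaming read.

    Hypothetical helper: only returns a dict, it does not touch Spark.
    """
    options: Dict[str, str] = {}
    if max_files is not None:
        if max_files < 1:
            raise ValueError("max_files must be at least 1")
        options["maxFilesPerTrigger"] = str(max_files)
    if max_bytes is not None:
        # Passed through unchanged; the accepted format is defined by Delta.
        options["maxBytesPerTrigger"] = max_bytes
    return options


def start_limited_stream(spark, table_name: str, checkpoint_path: str,
                         process_batch: Callable, max_files: int = 2):
    """Start a continuous stream that caps the files read per micro-batch.

    `spark` is an existing SparkSession; `process_batch(batch_df, batch_id)`
    is the foreachBatch callback supplied by the caller.
    """
    return (
        spark.readStream
        .options(**rate_limit_options(max_files=max_files))
        .option("skipChangeCommits", "true")
        .table(table_name)
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(processingTime="1 second")
        .foreachBatch(process_batch)
        .start()
    )
```

Called as `start_limited_stream(spark, table_path, checkpoint_path, foreachBatchFunction)`, this mirrors the query above. Keeping the options in a plain dict makes it easy to log exactly what was passed to the reader when a limit seems to be ignored.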
