Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Autoloader cloudFiles.maxFilesPerTrigger ignored with .trigger(availableNow=True)?

johschmidt42
New Contributor II

Hi, 

I'm using Auto Loader to stream data from Delta Lake files and process it in batches. The trigger is set to `availableNow` so that all new data since the checkpoint offset is included, but I limit each batch to 10 Delta files with the `cloudFiles.maxFilesPerTrigger` option. However, the `process_batch` function always reports that it receives the default 1,000 files per batch. Am I misinterpreting the options here?

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col


def process_batch(df: DataFrame, batch_id: int) -> None:
    batch_id: int = batch_id
    num_files: int = df.select("source_file").distinct().count()
    num_rows_total: int = df.count()

    print(
        f"Batch: '{batch_id}' - Processing {num_files:,} delta files with {num_rows_total:,} rows."
    )


spark_session: SparkSession = SparkSession.getActiveSession()

checkpoint_path: str = "/Volumes/checkpoint_path"
table_path: str = "/Volumes/table_path"

df: DataFrame = (
    spark_session.readStream.format(source="delta")
    .option(key="cloudFiles.format", value="delta")
    .option(key="cloudFiles.schemaLocation", value=checkpoint_path)
    .option(key="cloudFiles.maxFilesPerTrigger", value=10)
    .load(path=table_path)
    .select("*", col("_metadata.file_path").alias("source_file"))
)

df.writeStream.trigger(
    availableNow=True
).foreachBatch(func=process_batch).start()
```

 

4 REPLIES

johschmidt42
New Contributor II

It works when I change `cloudFiles.maxFilesPerTrigger` to `maxFilesPerTrigger`, but this is unexpected.

p_romm
New Contributor III

Juan
New Contributor II

For others who run into this issue:

Changing `cloudFiles.maxFilesPerTrigger` to `maxFilesPerTrigger` is not the solution. Check your checkpoint state first.

If a previous Auto Loader run failed or was cancelled after files had already been discovered/planned, the checkpoint can retain that state. On a later run, changing `cloudFiles.maxFilesPerTrigger` may appear to be ignored because the stream is still working through the files that were already discovered under the previous configuration.

For example, if a run used the default/high value for `cloudFiles.maxFilesPerTrigger`, and then failed inside `foreachBatch` or was cancelled, the checkpoint may already contain a planned batch of files. If you later change the configuration to:

```python
.option("cloudFiles.maxFilesPerTrigger", "1")
```

the next run may still process the previously discovered files as one larger batch, making it look like `cloudFiles.maxFilesPerTrigger` is not being respected.
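One way to check for such a leftover batch is to compare the `offsets/` and `commits/` directories in the checkpoint. This is a minimal sketch, assuming the checkpoint follows the standard Structured Streaming layout (one file per batch ID in each directory) and is reachable as a local path, for example under `/Volumes/...`. The helper names `batch_ids` and `pending_batches` are hypothetical, not part of any Spark or Databricks API.

```python
from pathlib import Path


def batch_ids(directory: Path) -> set[int]:
    """Return the batch IDs found in one checkpoint sub-directory.

    Batch files are named after their batch ID ("0", "1", ...).
    Anything else (hidden files, checksum files, temp files) is skipped.
    """
    if not directory.is_dir():
        return set()
    return {int(p.name) for p in directory.iterdir() if p.name.isdecimal()}


def pending_batches(checkpoint_path: str) -> list[int]:
    """Batch IDs that were planned (offsets written) but never committed."""
    root = Path(checkpoint_path)
    planned = batch_ids(root / "offsets")
    committed = batch_ids(root / "commits")
    return sorted(planned - committed)
```

Calling `pending_batches("/Volumes/checkpoint_path")` before restarting the stream returns an empty list when every planned batch was committed. A non-empty result suggests that the next run will first replay a batch that was planned under the old settings.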

The cleanest fix is to use a new checkpoint.

If using a new checkpoint is not an option because it would cause a large historical reprocess, another workaround is:

1. Move the affected files out of the Auto Loader source path.
2. Rename them so they will be treated as new files later.
3. Run the stream once with the existing checkpoint. It should complete without processing those files.
4. Move the renamed files back into the source path.
5. Run the stream again with the desired setting, for example:

```python
.option("cloudFiles.maxFilesPerTrigger", "1")
```

At that point, Auto Loader should discover the renamed files as new files and respect the configured rate limit.
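Steps 1, 2, and 4 can be sketched roughly as follows, assuming the files are reachable through ordinary filesystem paths (on raw cloud storage you would use the storage provider's own tooling instead). The function names, the `holding_dir` location, and the `_retry` suffix are all hypothetical choices for illustration.

```python
import shutil
from pathlib import Path


def move_out_and_rename(
    files: list[str], holding_dir: str, tag: str = "retry"
) -> list[Path]:
    """Steps 1-2: move files out of the source path under a new name."""
    holding = Path(holding_dir)
    holding.mkdir(parents=True, exist_ok=True)
    moved = []
    for f in files:
        src = Path(f)
        # "data.json" becomes "data_retry.json", so the file comes back
        # under a path that has not been seen before.
        dst = holding / f"{src.stem}_{tag}{src.suffix}"
        shutil.move(str(src), str(dst))
        moved.append(dst)
    return moved


def move_back(moved: list[Path], source_dir: str) -> list[Path]:
    """Step 4: return the renamed files to the Auto Loader source path."""
    target = Path(source_dir)
    return [Path(shutil.move(str(p), str(target / p.name))) for p in moved]
```

Run the stream once between the two calls (step 3), then again after `move_back` with the desired `cloudFiles.maxFilesPerTrigger` value (step 5).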

ShamenParis
New Contributor II

Hi @johschmidt42,

This is a great question. The answer lies in the very first line of your read configuration: `spark_session.readStream.format(source="delta")`

Because you are using `.format("delta")` instead of `.format("cloudFiles")`, you are actually using native Delta Structured Streaming, not Auto Loader!

Here is exactly why you saw that behavior:

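  • Why `cloudFiles.maxFilesPerTrigger` was ignored: Spark silently ignores options that don't apply to the chosen format, so every `cloudFiles.*` option was dropped without an error.

  • Why `maxFilesPerTrigger` worked: That is the native rate-limit option for a standard Delta stream. Its default is 1000 files per batch, which matches the count you kept seeing.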
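Because nothing fails when an option does not apply to the chosen format, a small pre-flight check can catch the mismatch before the stream starts. The sketch below is plain Python, not a Spark or Databricks API, and the helper name `misplaced_cloudfiles_options` is hypothetical.

```python
def misplaced_cloudfiles_options(
    source_format: str, options: dict[str, object]
) -> list[str]:
    """Return the cloudFiles.* option keys set on a non-Auto-Loader source.

    Comparisons are lower-cased because Spark treats option keys
    case-insensitively.
    """
    if source_format.lower() == "cloudfiles":
        return []
    return sorted(k for k in options if k.lower().startswith("cloudfiles."))


# The options from the original post, checked against format "delta".
options = {
    "cloudFiles.format": "delta",
    "cloudFiles.schemaLocation": "/Volumes/checkpoint_path",
    "cloudFiles.maxFilesPerTrigger": 10,
}
print(misplaced_cloudfiles_options("delta", options))
```

For the configuration in the question this lists all three `cloudFiles.*` keys; with the format set to `cloudFiles` the list is empty.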

The good news? You accidentally did it the right way! Since your source data is already in Delta format, native Delta streaming (`.format("delta")`) is the appropriate choice here rather than Auto Loader, which is meant for ingesting raw files such as CSV or JSON.

Option 1: To clean up your code, you can safely remove the `cloudFiles` options entirely. Here is the idiomatic way to write it:

```python
df: DataFrame = (
    spark_session.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 10)  # native Delta rate limit
    .load(table_path)
    .select("*", col("_metadata.file_path").alias("source_file"))
)

(
    df.writeStream
    .trigger(availableNow=True)
    .foreachBatch(process_batch)
    .start()
)
```

Option 2: Reading raw files using Auto Loader

```python
# raw_files_path is a placeholder for the directory that holds the raw files.
df: DataFrame = (
    spark_session.readStream
    .format("cloudFiles")  # This invokes Auto Loader
    .option("cloudFiles.format", "parquet")  # Must be a raw file format
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.maxFilesPerTrigger", 10)  # Now this works!
    .load(raw_files_path)
    .select("*", col("_metadata.file_path").alias("source_file"))
)
```