03-17-2025 05:26 AM
Hi,
I'm using the Auto Loader feature to read streaming data from Delta Lake files and process them in a batch. The trigger is set to `availableNow` to include all new data since the checkpoint offset, but I limit the number of Delta files per batch to 10 using the `cloudFiles.maxFilesPerTrigger` option. However, the `process_batch` function always reports that it receives the default 1000 files for its batch. Am I misinterpreting the options here?
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col

def process_batch(df: DataFrame, batch_id: int) -> None:
    batch_id: int = batch_id
    num_files: int = df.select("source_file").distinct().count()
    num_rows_total: int = df.count()
    print(
        f"Batch: '{batch_id}' - Processing {num_files:,} delta files with {num_rows_total:,} rows."
    )

spark_session: SparkSession = SparkSession.getActiveSession()
checkpoint_path: str = "/Volumes/checkpoint_path"
table_path: str = "/Volumes/table_path"

df: DataFrame = (
    spark_session.readStream.format(source="delta")
    .option(key="cloudFiles.format", value="delta")
    .option(key="cloudFiles.schemaLocation", value=checkpoint_path)
    .option(key="cloudFiles.maxFilesPerTrigger", value=10)
    .load(path=table_path)
    .select("*", col("_metadata.file_path").alias("source_file"))
)

df.writeStream.trigger(
    availableNow=True
).foreachBatch(func=process_batch).start()
03-17-2025 01:55 PM
It works when I change `cloudFiles.maxFilesPerTrigger` to `maxFilesPerTrigger`, but that is unexpected.
03-26-2025 08:40 AM
In the docs, the option is listed as `cloudFiles.maxFilesPerTrigger` 😕
https://docs.databricks.com/aws/en/ingestion/cloud-object-storage/auto-loader/options
yesterday
For others who run into this issue:
Changing `cloudFiles.maxFilesPerTrigger` to `maxFilesPerTrigger` is not the solution. Check your checkpoint state first.
If a previous Auto Loader run failed or was cancelled after files had already been discovered/planned, the checkpoint can retain that state. On a later run, changing `cloudFiles.maxFilesPerTrigger` may appear to be ignored because the stream is still working through the files that were already discovered under the previous configuration.
For example, if a run used the default/high value for `cloudFiles.maxFilesPerTrigger`, and then failed inside `foreachBatch` or was cancelled, the checkpoint may already contain a planned batch of files. If you later change the configuration to:
```python
.option("cloudFiles.maxFilesPerTrigger", "1")
```
the next run may still process the previously discovered files as one larger batch, making it look like `cloudFiles.maxFilesPerTrigger` is not being respected.
The cleanest fix is to use a new checkpoint.
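As a rough sketch of what "a new checkpoint" could look like in code: the helper below starts an Auto Loader stream against a checkpoint directory that has not been used before. The function name, its parameters, and the `json` default for the file format are assumptions made for illustration and do not come from this thread; `spark` is expected to be an active `SparkSession`.

```python
from typing import Any, Callable


def start_with_fresh_checkpoint(
    spark: Any,
    source_path: str,
    checkpoint_path: str,
    process_batch: Callable[[Any, int], None],
    max_files: int = 1,
    file_format: str = "json",  # assumption: the raw file format is not stated in the thread
) -> Any:
    """Start an Auto Loader stream that tracks its state in `checkpoint_path`.

    `checkpoint_path` should point at a directory no earlier run has written to,
    so no previously planned batch can be replayed.
    """
    reader = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", file_format)
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .option("cloudFiles.maxFilesPerTrigger", str(max_files))
        .load(source_path)
    )
    return (
        reader.writeStream.option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .foreachBatch(process_batch)
        .start()
    )
```

A call might look like `start_with_fresh_checkpoint(spark, source_path, "/Volumes/checkpoint_path_v2", process_batch)`, where the `_v2` path is a hypothetical name for the unused directory. Because the new checkpoint has no record of earlier files, everything in the source path is treated as new.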
If using a new checkpoint is not an option because it would cause a large historical reprocess, another workaround is:
1. Move the affected files out of the Auto Loader source path.
2. Rename them so they will be treated as new files later.
3. Run the stream once with the existing checkpoint. It should complete without processing those files.
4. Move the renamed files back into the source path.
5. Run the stream again with the desired setting, for example:
```python
.option("cloudFiles.maxFilesPerTrigger", "1")
```
At that point, Auto Loader should discover the renamed files as new files and respect the configured rate limit.
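Steps 1, 2, and 4 above could be scripted roughly as follows. This is a minimal sketch that assumes the source files are raw files reachable through a regular filesystem path (for example a mounted volume) and that the holding directory lives outside the Auto Loader source path. The helper names and the `retry_` prefix are made up for illustration; for object storage that is not mounted, the platform's own file utilities would be needed instead.

```python
import shutil
from pathlib import Path


def park_files(
    source_dir: str, holding_dir: str, file_names: list[str], tag: str = "retry_"
) -> list[Path]:
    """Steps 1-2: move the affected files out of the source path under new names."""
    holding = Path(holding_dir)
    holding.mkdir(parents=True, exist_ok=True)
    parked: list[Path] = []
    for name in file_names:
        target = holding / f"{tag}{name}"  # the prefix gives the file a new path
        shutil.move(str(Path(source_dir) / name), str(target))
        parked.append(target)
    return parked


def restore_files(parked: list[Path], source_dir: str) -> list[Path]:
    """Step 4: move the renamed files back into the source path."""
    restored: list[Path] = []
    for path in parked:
        target = Path(source_dir) / path.name  # keeps the renamed file name
        shutil.move(str(path), str(target))
        restored.append(target)
    return restored
```

The stream runs from steps 3 and 5 happen between the two calls: first `park_files(...)`, then one run with the existing checkpoint, then `restore_files(...)`, then the run with the desired `cloudFiles.maxFilesPerTrigger`.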
yesterday
Hi @johschmidt42 ,
This is a great question, but the mystery actually lies in the very first line of your read configuration: `spark_session.readStream.format(source="delta")`
Because you are using `.format("delta")` instead of `.format("cloudFiles")`, you are actually using native Delta Structured Streaming, not Auto Loader!
Here is exactly why you saw that behavior:
- Why `cloudFiles` was ignored: Spark silently ignores options that don't apply to the chosen format.
- Why `maxFilesPerTrigger` worked: That is the correct, native option for controlling rate limits in a standard Delta stream.
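Since nothing fails when an option does not match the source format, a small pre-flight check can make the mismatch visible. The sketch below is plain Python with hypothetical helper names (not part of any Spark API), and it only covers the two formats discussed in this thread.

```python
def find_ignored_options(source_format: str, options: dict[str, str]) -> list[str]:
    """Return `cloudFiles.*` option keys set on a source that is not Auto Loader."""
    if source_format.lower() == "cloudfiles":
        return []
    # Option keys are compared case-insensitively.
    return sorted(key for key in options if key.lower().startswith("cloudfiles."))


def rate_limit_option(source_format: str) -> str:
    """Name of the files-per-batch option for the two formats discussed here."""
    if source_format.lower() == "cloudfiles":
        return "cloudFiles.maxFilesPerTrigger"
    if source_format.lower() == "delta":
        return "maxFilesPerTrigger"
    raise ValueError(f"Format not covered by this sketch: {source_format}")


options = {
    "cloudFiles.format": "delta",
    "cloudFiles.schemaLocation": "/Volumes/checkpoint_path",
    "cloudFiles.maxFilesPerTrigger": "10",
}
# With format "delta", all three options from the original post are flagged.
print(find_ignored_options("delta", options))
print(rate_limit_option("delta"))
```

Running a check like this before building the reader would have flagged every option in the original snippet and pointed to `maxFilesPerTrigger` as the name to use.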
The good news? You accidentally did it the right way! Since your source data is already in Delta format, using native Delta streaming (`.format("delta")`) is much more efficient than using Auto Loader (which is meant for raw files like CSV/JSON).
Option 1: To clean up your code, you can safely remove the `cloudFiles` options entirely. Here is the idiomatic way to write it:
df: DataFrame = (
    spark_session.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 10)
    .load(table_path)
    .select("*", col("_metadata.file_path").alias("source_file"))
)

df.writeStream \
    .trigger(availableNow=True) \
    .foreachBatch(process_batch) \
    .start()

Option 2: Reading raw files using Auto Loader
df: DataFrame = (
    spark_session.readStream
    .format("cloudFiles")  # This invokes Auto Loader
    .option("cloudFiles.format", "parquet")  # Must be a raw file format
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.maxFilesPerTrigger", 10)  # Now this works!
    .load(raw_files_path)
    .select("*", col("_metadata.file_path").alias("source_file"))
)