Autoloader cloudFiles.maxFilesPerTrigger ignored with .trigger(availableNow=True)?
Hi,
I'm using Auto Loader to read streaming data from Delta Lake files and process it in batches. The trigger is set to availableNow so that all new data since the checkpoint offset is included, and I limit each batch to 10 Delta files with the cloudFiles.maxFilesPerTrigger option. However, the `process_batch` function always reports that it receives the default 1000 files per batch. Am I misinterpreting the options here?
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col


def process_batch(df: DataFrame, batch_id: int) -> None:
    # Count the distinct source files and total rows in this micro-batch.
    num_files: int = df.select("source_file").distinct().count()
    num_rows_total: int = df.count()
    print(
        f"Batch: '{batch_id}' - Processing {num_files:,} delta files with {num_rows_total:,} rows."
    )


spark_session: SparkSession = SparkSession.getActiveSession()
checkpoint_path: str = "/Volumes/checkpoint_path"
table_path: str = "/Volumes/table_path"

df: DataFrame = (
    spark_session.readStream.format(source="delta")
    .option(key="cloudFiles.format", value="delta")
    .option(key="cloudFiles.schemaLocation", value=checkpoint_path)
    .option(key="cloudFiles.maxFilesPerTrigger", value=10)
    .load(path=table_path)
    .select("*", col("_metadata.file_path").alias("source_file"))
)

df.writeStream.trigger(
    availableNow=True
).foreachBatch(func=process_batch).start()
1 REPLY
It works when changing "cloudFiles.maxFilesPerTrigger" to "maxFilesPerTrigger", but this is unexpected.
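A likely explanation, based on my reading of the Spark and Databricks docs rather than this exact setup: cloudFiles.* options only apply when the stream is created with format("cloudFiles"). This stream uses format("delta"), so it is a Delta streaming source; the cloudFiles.* options are silently ignored, and the Delta source takes its rate limit from plain maxFilesPerTrigger (default 1000, which matches the batch sizes reported above). A minimal sketch of the Delta-source variant, reusing the paths and process_batch from the question (the checkpointLocation on the writer is my addition):

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col

spark_session: SparkSession = SparkSession.getActiveSession()
checkpoint_path: str = "/Volumes/checkpoint_path"
table_path: str = "/Volumes/table_path"

# Delta streaming source: the rate limit is plain "maxFilesPerTrigger"
# (default 1000); cloudFiles.* options belong to format("cloudFiles").
df: DataFrame = (
    spark_session.readStream.format("delta")
    .option("maxFilesPerTrigger", 10)  # cap each micro-batch at 10 files
    .load(table_path)
    .select("*", col("_metadata.file_path").alias("source_file"))
)

# availableNow drains the whole backlog, split into micro-batches that
# respect the rate limit above.
(
    df.writeStream.trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .foreachBatch(process_batch)
    .start()
)

As far as I know, Auto Loader's cloudFiles.format does not accept "delta" anyway, so for reading a Delta table incrementally the Delta streaming source above seems to be the right tool.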

