Autoloader cloudFiles.maxFilesPerTrigger ignored with .trigger(availableNow=True)?

johschmidt42
New Contributor III

Hi, 

I'm using the Auto Loader feature to read streaming data from Delta Lake files and process them in a batch. The trigger is set to availableNow to include all new data from the checkpoint offset but I limit the amount of delta files for the batch to be 10 using the cloudFiles.maxFilesPerTrigger option. However, the  `process_batch` function always reports that it receives the default 1000 files for its batch. Am I misinterpreting the options here?

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col


def process_batch(df: DataFrame, batch_id: int) -> None:
    batch_id: int = batch_id
    num_files: int = df.select("source_file").distinct().count()
    num_rows_total: int = df.count()

    print(
        f"Batch: '{batch_id}' - Processing {num_files:,} delta files with {num_rows_total:,} rows."
    )


spark_session: SparkSession = SparkSession.getActiveSession()

checkpoint_path: str = "/Volumes/checkpoint_path"
table_path: str = "/Volumes/table_path"

df: DataFrame = (
    spark_session.readStream.format(source="delta")
    .option(key="cloudFiles.format", value="delta")
    .option(key="cloudFiles.schemaLocation", value=checkpoint_path)
    .option(key="cloudFiles.maxFilesPerTrigger", value=10)
    .load(path=table_path)
    .select("*", col("_metadata.file_path").alias("source_file"))
)

df.writeStream.trigger(
    availableNow=True
).foreachBatch(func=process_batch).start()