Hi everyone,
I am seeing unexpected behavior with Auto Loader when using Managed File Events on Classic Compute.
The error message itself seems inconsistent with the behavior I am seeing:
[FAILED_READ_FILE.DBR_FILE_NOT_EXIST] Error while reading file ../TEMP_FILE.snappy.parquet. [CLOUD_FILE_SOURCE_FILE_NOT_FOUND] TEMP_FILE.snappy.parquet but it does not exist anymore. Please ensure that files are not deleted before they are processed. To continue your stream, you can set the Spark SQL configuration spark.sql.files.ignoreMissingFiles to true.
Even after applying the configuration suggested in the exception message, the error persists: the behavior does not change with ignoreMissingFiles set to true.
I tested the setting in all of the following ways:
- ignoreMissingFiles = true in the Auto Loader options
- spark.sql.files.ignoreMissingFiles set at notebook level with spark.conf.set(...)
- spark.sql.files.ignoreMissingFiles also configured directly in the cluster Spark config
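As a concrete sketch of the three variants (abbreviated; the stream definition here is not the full one, the complete repro script is at the end of the post):

```python
# 1) As an Auto Loader reader option (stream definition abbreviated):
reader = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("ignoreMissingFiles", "true")
)

# 2) At notebook/session level:
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")

# 3) As a line in the cluster's Spark config (cluster UI, Advanced options):
#    spark.sql.files.ignoreMissingFiles true
```

All three were tested independently and in combination; none of them changed the outcome on Classic Compute.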
In my test, I am using Managed File Events with both:
- cloudFiles.format = binaryFile
- cloudFiles.format = parquet
and the same error happens in both cases.
The test flow is basically this:
- Run Auto Loader successfully
- Create a new parquet file in the monitored folder
- Wait a few seconds so the event can be picked up
- Delete the file
- Run the same query again with the same checkpoint
I expected the second run to ignore the missing file, but on Classic Compute it fails with a missing file error.
I tested this on Classic Compute with:
- DBR 18.2 Beta
- DBR 18.1
- DBR 17.3 LTS
I also tested both with Photon enabled and without Photon, and the behavior was the same in all cases.
For comparison, I ran the same test on:
- Serverless Compute v4
- Serverless Compute v5
and in Serverless it works as expected. The missing file is ignored.
So at the moment, this seems to happen only on Classic Compute, and only in this setup with Managed File Events.
Correctly handling deleted files is also a requirement for our data platform, so this behavior is important for us.
Below is the code used in the tests.
Has anyone seen this before?
Is this a known limitation, or does it look like a bug?
Any feedback would be greatly appreciated. Thanks!
import time
from datetime import datetime
from pyspark.sql import functions as F
# ------------------------------------------------------------------------------
# Test purpose
# ------------------------------------------------------------------------------
# This script reproduces an issue where Auto Loader detects a new file event,
# but the file is deleted before the next execution.
#
# Expected behavior:
# Auto Loader should ignore the missing file because ignoreMissingFiles=true.
#
# Observed behavior:
# The second execution fails on Classic Compute when processing the deleted file.
# ------------------------------------------------------------------------------
# Base paths
ROOT_PATH = "/Volumes/devqa_catalog/staging/v_staging/temp/autoloader_ignore_missing_file_issue"
SOURCE_PATH = f"{ROOT_PATH}/parquet_files"
CHECKPOINT_PATH = f"{ROOT_PATH}/checkpoints"
SCHEMA_PATH = f"{ROOT_PATH}/schema"
# Source file used to generate a new file event
SOURCE_FILE_NAME = "LOAD0001.snappy.parquet"
SOURCE_FILE_PATH = f"{SOURCE_PATH}/{SOURCE_FILE_NAME}"
# Output table used only to persist the processed metadata
OUTPUT_TABLE = "operations.autoloader_ignore_missing_file_issue"
def cleanup_test_artifacts():
    """Removes test table and streaming state from previous executions."""
    print("Cleaning up previous test artifacts...")
    spark.sql(f"DROP TABLE IF EXISTS {OUTPUT_TABLE}")
    dbutils.fs.rm(CHECKPOINT_PATH, recurse=True)
    dbutils.fs.rm(SCHEMA_PATH, recurse=True)
    print("Cleanup finished.")
def run_autoloader():
    """
    Runs Auto Loader once using availableNow=True.
    It reads the source folder with cloudFiles + binaryFile and appends the
    discovered file metadata directly into a Delta table.
    """
    print("Starting Auto Loader run...")
    stream_df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "binaryFile")
        .option("cloudFiles.useManagedFileEvents", "true")
        .option("cloudFiles.schemaLocation", SCHEMA_PATH)
        .option("cloudFiles.includeExistingFiles", "true")
        .option("ignoreMissingFiles", "true")
        .option("cloudFiles.maxFilesPerTrigger", "1")
        .load(SOURCE_PATH)
        .select(
            F.col("path"),
            F.col("modificationTime"),
            F.col("length"),
        )
    )

    query = (
        stream_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", CHECKPOINT_PATH)
        .trigger(availableNow=True)
        .toTable(OUTPUT_TABLE)
    )
    query.awaitTermination()
    print("Auto Loader run finished.")
def create_then_delete_temp_parquet_file(wait_seconds=5):
    """
    Creates a temporary parquet file by copying an existing file, waits for a few
    seconds so the file event can be registered, and then deletes the file.
    This simulates a file arrival followed by deletion before the next run.
    """
    print("Creating a temporary parquet file and deleting it shortly after...")
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    temp_file_name = f"TEMP_{SOURCE_FILE_NAME.split('.')[0]}_{timestamp}.snappy.parquet"
    temp_file_path = f"{SOURCE_PATH}/{temp_file_name}"

    dbutils.fs.cp(SOURCE_FILE_PATH, temp_file_path)
    print(f"Temporary file created: {temp_file_path}")

    time.sleep(wait_seconds)

    dbutils.fs.rm(temp_file_path)
    print(f"Temporary file deleted: {temp_file_path}")
def main():
    # Step 1: Reset previous state
    cleanup_test_artifacts()

    # Step 2: Initial run
    # Expected: succeeds without errors
    run_autoloader()

    # Step 3: Create a new file event, then remove the file
    create_then_delete_temp_parquet_file(wait_seconds=5)

    # Step 4: Run Auto Loader again
    # Expected: missing file should be ignored because ignoreMissingFiles=true
    # Observed: this fails on Classic Compute
    time.sleep(5)
    run_autoloader()

    print("Test completed.")


main()