I am using Auto Loader to ingest source JSON files into Bronze-layer Delta tables, relying on schema inference together with schema evolution (cloudFiles.schemaEvolutionMode set to addNewColumns). Because this mode intentionally fails the stream when new columns appear so that it can be restarted with the updated schema, I wrap the streaming query in a while loop (code below). I also call query.awaitTermination() so that subsequent commands run sequentially and exceptions raised during streaming can be caught.
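For reference, reader_options looks roughly like this (paths are placeholders, not my exact values; the relevant part is the schema settings):

reader_options = {
    "cloudFiles.format": "json",
    # Schema tracking location used for inference and evolution (placeholder path)
    "cloudFiles.schemaLocation": "/mnt/bronze/_schemas/my_source",
    # Infer column types instead of reading everything as strings
    "cloudFiles.inferColumnTypes": "true",
    # Fail the stream when new columns appear so it can restart with them
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
}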
However, when the Auto Loader stream finishes, the command fails with the following error:
Some streams terminated before this command could finish! with details:
org.apache.spark.sql.catalyst.util.UnknownFieldException: [UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_RECORD_WITH_FILE_PATH] Encountered unknown fields during parsing, which can be fixed by an automatic retry: true.
What confuses me is that:
1. The error appears after the streaming query has already terminated (the "Streaming job ended." line that follows query.awaitTermination() is already printed).
2. The exception is not caught by my except block.
3. I have already implemented retry logic for exactly this case, so I'm unsure why the error is still being raised.
Could you help clarify why this error still occurs despite my handling, and why it surfaces only after the query has terminated?
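For completeness: reader_options, writer_options, source_path, and my_table are defined earlier in the notebook, and set_trigger_kwargs() is a small helper that builds the arguments for .trigger(). It is roughly along these lines (the availableNow value is illustrative, not my exact configuration):

def set_trigger_kwargs():
    # Illustrative sketch: my real helper picks the trigger based on job config.
    # availableNow=True processes all available files as a finite batch, then stops.
    return {"availableNow": True}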
from pyspark.sql.functions import col

while True:
    try:
        # Read from the source with Auto Loader (schema inference + evolution)
        df = (
            spark.readStream.format("cloudFiles")
            .options(**reader_options)
            .load(source_path)
        )
        # Start the write to the Bronze Delta table
        streaming_query = (
            df.writeStream
            .format("delta")
            .options(**writer_options)
            .outputMode("append")
            .trigger(**set_trigger_kwargs())
            .table(my_table)
        )
        # Block until the stream terminates so later commands run sequentially
        streaming_query.awaitTermination()
        # Query terminated without raising an error: exit the loop
        break
    except Exception as e:
        error_msg = str(e)
        if "UnknownFieldException" in error_msg and "automatic retry: true" in error_msg:
            # Expected failure on schema evolution: loop again to restart the stream
            print("Schema evolution detected. Retrying with updated schema...")
        else:
            # Log and re-raise anything else
            print(f"Streaming query exception: {error_msg}")
            raise

print("Streaming job ended.")