
Autoloader: Unexpected UnknownFieldException after streaming query termination

yit
Contributor

I am using Autoloader to ingest source data into Bronze layer Delta tables. The source files are JSON, and I rely on schema inference along with schema evolution (using mode: addNewColumns). To handle errors triggered by schema updates in the stream, I wrap the streaming query inside a while loop (code shown below). I also use query.awaitTermination() to enable sequential execution of subsequent commands and catch exceptions raised during streaming.

However, when Autoloader finishes, it raises the following error:

Some streams terminated before this command could finish! with details:
 
org.apache.spark.sql.catalyst.util.UnknownFieldException: [UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_RECORD_WITH_FILE_PATH] Encountered unknown fields during parsing, which can be fixed by an automatic retry: true.

What confuses me is:

  • This error appears after the streaming query has terminated (the log line following query.awaitTermination() is already printed).

  • It is not caught in my exception handling block.

  • I have already implemented logic to handle this scenario, so I’m unsure why this error is still being raised.

Could you help clarify why this error might still occur despite my handling, and why it happens after the query termination?

 

from pyspark.sql.functions import col

while True:
    try:
        df = (
            spark.readStream.format("cloudFiles")      
            .options(**reader_options)
            .load(source_path)
        )

        streaming_query = (
            df.writeStream
            .format("delta")
            .options(**writer_options)
            .outputMode("append")
            .trigger(**set_trigger_kwargs())
            .table(my_table)
        )

        streaming_query.awaitTermination()

        # If the query terminated without raising an error, exit the loop
        break

    except Exception as e:
        error_msg = str(e)
        if "UnknownFieldException" in error_msg and "automatic retry: true" in error_msg:
            print("Schema evolution detected. Retrying with updated schema...")
        else:
            # Log and raise other exceptions
            print(f"Streaming query exception: {error_msg}")
            raise e

print("Streaming job ended.")

 

2 REPLIES

szymon_dybczak
Esteemed Contributor III

Hi @yit,

This is expected behaviour of Auto Loader with schema evolution enabled. The default mode is addNewColumns, which causes the stream to fail when new columns appear.
As the documentation says:

"Auto Loader detects the addition of new columns as it processes your data. When Auto Loader detects a new column, the stream stops with an UnknownFieldException. Before your stream throws this error, Auto Loader performs schema inference on the latest micro-batch of data and updates the schema location with the latest schema by merging new columns to the end of the schema. The data types of existing columns remain unchanged. Databricks recommends configuring Auto Loader streams with Lakeflow Jobs to restart automatically after such schema changes."

And since you didn't specify the mode, addNewColumns will be used by default:

[screenshot of the Auto Loader documentation showing the default schema evolution mode]
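
For completeness, here is a minimal sketch of how the evolution mode could be set explicitly in the reader options. The actual contents of reader_options are not shown in the question, so the dictionary below is an assumed example and the schema location path is a placeholder:

# Sketch only: an assumed reader_options dictionary with the evolution mode set explicitly.
# The schema location path is a placeholder; spark and source_path come from the notebook.
reader_options = {
    "cloudFiles.format": "json",
    # Auto Loader stores the inferred/merged schema here; required for schema inference.
    "cloudFiles.schemaLocation": "/path/to/schema",
    # "addNewColumns" is the default: the schema location is updated, then the stream fails
    # with UnknownFieldException so it can be restarted with the new schema.
    # Other supported modes are "rescue", "failOnNewColumns", and "none".
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
}

df = (
    spark.readStream.format("cloudFiles")
    .options(**reader_options)
    .load(source_path)
)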

 

 

yit
Contributor

@szymon_dybczak Yes, I'm aware of that behavior and actually expect it. That's why I'm handling it explicitly in the exception clause:

 if "UnknownFieldException" in error_msg and "automatic retry: true" in error_msg: print("Schema evolution detected. Retrying with updated schema...")

This works perfectly - it allows the loop to continue and rerun the Autoloader with the updated schema. (The exception branch is actually entered many times because of this, and the stream continues with the updated schema, so the retry logic itself works fine.)

What's strange is that the error message above ("Some streams terminated before this command could finish!") appears after the Autoloader has finished running. The logs even show 'Streaming job ended.', but the notebook cell still displays an ERROR. The cell contains only the Autoloader code.
