Data Engineering

Databricks Workflow Automatically Marked as Failed When Autoloader Stream Fails in a Task

r_g_s_cn
New Contributor II

Issue: I want my Databricks task/workflow, which runs a pytest test, not to be automatically marked as "Failed" when an Autoloader stream shuts down due to an issue. It seems that if an Autoloader / Structured Streaming stream fails, the whole Databricks task is automatically marked as Failed, even when the stream's failure is handled by catching the exception.

Context:

- I have a pytest test where the code looks like the below

```
# Load data with the expected schema and run Auto Loader
load_data_with_expected_schema()
query_one = run_autoloader_pipeline()
query_one.awaitTermination()

# Load data with an additional column and run Auto Loader again
load_data_with_unexpected_schema()
try:
    # This run fails due to the additional column in the new data
    query_two = run_autoloader_pipeline()
    query_two.awaitTermination()
except Exception as e:
    # This run succeeds because it has picked up the new schema changes
    query_three = run_autoloader_pipeline()
    query_three.awaitTermination()

assert ...
```

- My test passes, but the actual job is marked as a failure with this error:

```
ERROR: Some streams terminated before this command could finish!
org.apache.spark.sql.catalyst.util.UnknownFieldException: [UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_RECORD_WITH_FILE_PATH] Encountered unknown fields during parsing: {"some_new_field":"some_new_top_level_field"}, which can be fixed by an automatic retry: true
```

- I would like the Databricks job not to be marked as failed in cases like the above, where I am purposefully failing an Autoloader pipeline.

 

What I have tried (a rough sketch of the second and third attempts follows this list):

- Catching the exception raised for query_two

- Using time.sleep(...) instead of awaitTermination() for query_two

- Using dbutils.notebook.exit to gracefully exit the notebook where the pytest tests run
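
For reference, here is roughly what the second and third attempts looked like combined; the sleep duration and exit message are arbitrary, and the load/pipeline helpers are the same placeholders as in the snippet above:

```
import time

load_data_with_unexpected_schema()
query_two = run_autoloader_pipeline()

# Poll with a fixed sleep instead of awaitTermination(), hoping the stream's
# exception is not re-raised into the driver command
time.sleep(60)

if query_two.exception() is not None:
    # The stream failed as expected; rerun to pick up the evolved schema
    query_three = run_autoloader_pipeline()
    query_three.awaitTermination()

# Exit the notebook explicitly so the task result comes from this call
# rather than from the terminated stream
dbutils.notebook.exit("tests passed")
```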

 

Any help would be much appreciated! If this is the wrong place to post this, please direct me to the correct location.

2 REPLIES

SP_6721
Contributor III

Hi @r_g_s_cn ,

When a streaming query (like Auto Loader) fails in Databricks, especially due to a schema mismatch, the job or task is automatically marked as FAILED, even if you catch the exception in your code. That’s because the failure is detected at the engine level, outside the Python try/except block.
To avoid this, set:

cloudFiles.schemaEvolutionMode = "rescue"

This makes Auto Loader handle unexpected columns by placing them in the _rescued_data column, so your job continues running without being marked as failed.
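
A minimal sketch of wiring this option into an Auto Loader readStream (the format, paths, and schema location below are placeholders, not values from your pipeline):

```
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")  # placeholder format
    .option("cloudFiles.schemaLocation", "/tmp/schemas")  # placeholder path
    # Unknown fields are captured in the _rescued_data column
    # instead of terminating the query
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load("/tmp/input")  # placeholder path
)
```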

r_g_s_cn
New Contributor II

Thanks @SP_6721, but I'm not trying to stop the stream from failing; I'm trying to keep the Databricks workflow/task from being marked as Failed despite the failing streaming query.

Is there any way to override the failure at the engine level? Or some option I can configure so that a failing streaming query doesn't get reported to the engine?