
Structured Streaming Auto Loader UnknownFieldsException and Workflow Retries

ilarsen
Contributor

Hi.

 

I am using Structured Streaming and Auto Loader to read JSON files, orchestrated by a Workflow job. My difficulty is that the job fails when schema changes are detected, but it does not retry. Hopefully someone can point me in the right direction?

 

From what I understand of the documentation (Configure schema inference and evolution in Auto Loader - Azure Databricks | Microsoft Learn), the failure is expected behaviour. I have enabled retries on the Job Task, but the retries do not appear to be happening. What could be a key point is that the job task runs a notebook, which in turn runs another notebook containing the Auto Loader code via

dbutils.notebook.run()

Could the notebook.run() approach be conflicting with the retry setting on the Task, so that no retry happens?

 

My stream reader has these options defined:

options = {
        "cloudFiles.useNotifications": "false",                                # directory listing mode, no event notifications
        "cloudFiles.schemaLocation": <variable to checkpoint location here>,   # where the inferred schema is tracked
        "cloudFiles.format": "json",
        "cloudFiles.includeExistingFiles": True,                               # also pick up files already in the source path
        "cloudFiles.inferColumnTypes": True,                                   # infer column types instead of reading everything as string
        "cloudFiles.useIncrementalListing": "true"                             # incremental listing of the source directory
    }

I notice that my options do not include cloudFiles.schemaEvolutionMode, but according to the docs the default applies.

 

My stream writer has this option defined:

.option("mergeSchema", "true")
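
Putting the pieces together, the surrounding stream looks roughly like this, using the options dict above (the source path, checkpoint location, trigger, and target table below are simplified placeholders, not my real values):

# Rough sketch of the surrounding stream; paths, trigger, and table name are placeholders.
df = (
    spark.readStream
    .format("cloudFiles")
    .options(**options)
    .load("/mnt/raw/my_source/")  # placeholder source path
)

query = (
    df.writeStream
    .outputMode("append")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "/mnt/checkpoints/my_source/")  # placeholder
    .trigger(availableNow=True)  # assumed; the trigger is not stated above
    .toTable("bronze.my_source")  # placeholder target table
)
query.awaitTermination()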

 

Does anyone have any ideas please?

3 REPLIES

Kaniz_Fatma
Community Manager

Hi @ilarsen, let's troubleshoot the issue with schema changes and retries in your structured streaming job using Auto Loader.

 

Schema Inference and Evolution:

  • Auto Loader allows you to automatically detect the schema of loaded data, eliminating the need for explicit schema declaration.
  • When reading data for the first time, Auto Loader samples the first 50 GB or 1000 files (whichever limit is crossed first) to infer the schema.
  • The inferred schema is stored in a directory called _schemas at the configured cloudFiles.schemaLocation.
  • As new columns are introduced, Auto Loader updates the schema by merging new columns to the end of the existing schema.
  • Schema evolution ensures that your table schema evolves over time without manual intervention.

Retry Behavior:

  • You mentioned enabling retries on the Job Task, but it seems the retries are not happening.
  • The retries are typically managed at the job level, not within individual notebooks.
  • Ensure that the retry settings are correctly configured for the entire job, including any notebooks invoked using dbutils.notebook.run().

Notebook Execution and Retries:

  • The notebook.run() approach should not inherently conflict with retries.
  • However, consider the following:
    • If the notebook containing the Auto Loader code fails, the retry behaviour depends on the overall job configuration.
    • Verify that the retries are set at the job level and not overridden within the notebooks.
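
To make the last point concrete, here is a minimal sketch (the notebook path, parameters, and logging helper are hypothetical): if the parent notebook catches the child's exception and only logs it, the Task finishes successfully and the Task-level retry never fires; re-raising after logging lets the configured retry take over.

# Hypothetical sketch: log, then re-raise so the Task itself fails and the
# retry configured on the Workflow Task can run.
try:
    runResult = dbutils.notebook.run("path/to/autoloader_notebook", 600, {"source": "my_source"})
except Exception as e:
    log_exception(e)  # hypothetical logging helper
    raise             # without this, the Task "succeeds" and is never retried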

Schema Evolution Mode:

  • You mentioned that your options do not include cloudFiles.schemaEvolutionMode.
  • By default (when you do not provide a schema), Auto Loader uses the addNewColumns schema evolution mode.
  • If you want to explicitly set the schema evolution mode, you can add it to your options.
  • Example: .option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
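
As a sketch of how that looks on the reader (paths are placeholders): the default with inferred schemas is addNewColumns, which fails the stream once when new columns appear and evolves the stored schema, while rescue keeps the stream running and routes unexpected fields into the rescued data column.

# Hypothetical sketch: set the evolution mode explicitly on the Auto Loader reader.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/mnt/checkpoints/my_source/schema")  # placeholder
    .option("cloudFiles.schemaEvolutionMode", "rescue")  # or "addNewColumns", "failOnNewColumns", "none"
    .load("/mnt/raw/my_source/")  # placeholder
)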

Stream Writer Options:

  • The .option("mergeSchema", "true") setting in your stream writer ensures that schema merging occurs when writing data.
  • This option is relevant when appending data to an existing table.
  • Confirm that this behavior aligns with your use case.

Debugging Steps:

  • Check the job logs for any specific error messages related to schema changes.
  • Inspect the _schemas directory to see if it reflects the expected schema evolution.
  • Review the overall job configuration, including retries and checkpoint locations.
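
For the second point, a quick way to see what Auto Loader has inferred so far is to list the _schemas directory under the configured cloudFiles.schemaLocation (the path below is a placeholder):

# Hypothetical sketch: inspect the schema versions Auto Loader has recorded.
schema_dir = "/mnt/checkpoints/my_source/schema/_schemas"  # <schemaLocation>/_schemas

files = sorted(dbutils.fs.ls(schema_dir), key=lambda f: f.name)
for f in files:
    print(f.path, f.size)

# The newest file holds the most recently inferred/evolved schema.
print(dbutils.fs.head(files[-1].path))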

Remember that structured streaming jobs can have nuances, and it's essential to verify each component's behaviour.

ilarsen
Contributor

Hi Kaniz,

 

Thank you for your comprehensive response; I appreciate it. I have not resolved the issue in my situation yet, but I am perhaps a little closer.

 

Basically, my Job is a three-step chain of Tasks:

  1. Step 1 is a "set up" Task that queries metadata and so on to define the next step of tasks.
  2. Step 2 is 1 to n Tasks that run ingestion of groups of objects in parallel; it depends on step 1. The common Notebook in these Tasks uses dbutils.notebook.run() to call the Notebook containing the Auto Loader logic.
  3. Step 3 is basically the finishing Task that updates logs and so on; it depends on step 2 and is designed to fail if a previous step 2 Task fails.

In my last test, I confirmed I had retries configured on every Task in the Job.

I was running ingestion for a new source, and expected schema changes between older files and now.

After it had run for some time, I saw that the 3rd finalising task was retrying, and that one of the step 2 tasks (where the auto loader code is) had failed - and had not retried.

So, I suspect the issue lies somewhere in the code around using notebook.run(), or within the Notebook called by it. Above you mention:

Notebook Execution and Retries:

  • The notebook.run() approach should not inherently conflict with retries.
  • However, consider the following:
    • If the notebook containing the Auto Loader code fails, the retry behaviour depends on the overall job configuration.
    • Verify that the retries are set at the job level and not overridden within the notebooks.

 

I wonder if that is what is happening here.

The implementation with notebook.run() follows, in pseudo code:

Step 2 "parent" task:

 

 

try:
    runResult = dbutils.notebook.run(...)  # parameters elided here
except Exception as e:
    # code to log exception here
    pass
 

 

 and then in the called notebook, there is this around .writeStream:

 

 

try:
    streamWriter = (
        df.writeStream
        # ... source/sink details elided ...
        .outputMode("append")
        .option("mergeSchema", "true")
        # ... remaining options and .start() elided; awaitTermination() is called
        # on the query object that .start() returns
    )
    streamWriter.awaitTermination()

except Exception as e:
    # code to log exception here ...
    dbutils.notebook.exit(json.dumps(logger.log_info))

 

 

Not sure if there is something significant in there, or whether I know enough about the nuances of calling and exiting notebook runs to see what could be causing a problem here.
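
One thought, sketched below (the "status" key, notebook path, and structure are hypothetical, not what the real code does): because the child always exits via dbutils.notebook.exit(), notebook.run() in the parent returns normally even after a failure. Including a status in the exit payload and raising in the parent would surface the failure to the Task:

import json

# Hypothetical sketch only. In the child notebook's except block, the exit
# payload could carry a status flag alongside the log info:
#     dbutils.notebook.exit(json.dumps({"status": "failed", "log": logger.log_info}))

# In the parent notebook:
runResult = dbutils.notebook.run("path/to/autoloader_notebook", 600, {})  # placeholders
result = json.loads(runResult)
if result.get("status") == "failed":
    # Raising fails the Task, which is what lets the Task-level retry run.
    raise RuntimeError(f"Auto Loader notebook failed: {result.get('log')}")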

 

Appreciate it if anyone can provide any pearls of wisdom 😉

ilarsen
Contributor

Another point I have realised is that the task and the parent notebook (which then calls the child notebook that runs the Auto Loader part) do not fail if the schema-change failure occurs during the Auto Loader process. It's the child notebook, and the job created by notebook.run(), that fails. Which suggests to me that the task itself needs to fail to trigger the automatic retry. Except, the design here is that the parent notebook is an iterative orchestrator: it initiates the ingestion for a group of objects, so if the first object fails it needs to keep running to the next one, then the next. 🤔
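
One option for that design, purely as a sketch (the object list, notebook path, and logging helper below are hypothetical): handle retries per object inside the orchestrator loop, keep going to the next object, and only fail the Task at the end if something still failed:

# Hypothetical sketch of per-object retries inside the iterative orchestrator.
MAX_ATTEMPTS = 2
failed_objects = []

for obj in objects_to_ingest:  # hypothetical list of objects to ingest
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            dbutils.notebook.run("path/to/autoloader_notebook", 600, {"object": obj})
            break  # this object succeeded, move on to the next one
        except Exception as e:
            log_exception(obj, attempt, e)  # hypothetical logging helper
    else:
        failed_objects.append(obj)  # retries exhausted for this object

if failed_objects:
    # Fail the Task only at the end, so the remaining objects were still processed
    # and the downstream finishing Task sees the failure.
    raise RuntimeError(f"Ingestion failed for: {failed_objects}")

With the default addNewColumns mode, the first failed attempt has typically already evolved the stored schema, so a second attempt for the same object will usually run through.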

 

It's an issue that makes a paradigm change to DLT have more merit, as automatic retries are just part of it and don't need thinking about or coding around.
