cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Structured Streaming Auto Loader UnknownFieldsException and Workflow Retries

ilarsen
Contributor

Hi.

 

I am using structured streaming and auto loader to read json files, and it is automated by Workflow.  I am having difficulties with the job failing as schema changes are detected, but not retrying.  Hopefully someone can point me in the right direction?

 

From what I understand from the documentation (Configure schema inference and evolution in Auto Loader - Azure Databricks | Microsoft Learn) is that the failure is expected behaviour.  I have enabled retries on the Job Task, but it does not appear that the retries are happening.  What could be a key point, is that the job task runs a notebook, which in turn runs another notebook containing the auto loader code by 

dbutils.notebook.run()

Could the notebook.run() approach be conflicting with the retry setting on the Task, and therefore no retry?

 

My stream reader has these options defined:

options = {
        "cloudFiles.useNotifications": "false",
        "cloudFiles.schemaLocation": <variable to checkpoint location here>,
        "cloudFiles.format": "json",
        "cloudFiles.includeExistingFiles": True,
        "cloudFiles.inferColumnTypes": True,
        "cloudFiles.useIncrementalListing": "true"
    }

I notice that my options do not include 

 

cloudFiles.schemaEvolutionMode

 

, but according to the docs that is default.

 

My stream writer has this option defined:

.option("mergeSchema", "true")

 

Does anyone have any ideas please?

2 REPLIES 2

Hi Kaniz,

 

Thank you for your comprehensive response, I appreciate it.  I have not resolved the issue in my situation yet, but I am perhaps a little closer.

 

Basically, my Job is a 3-step chain of Tasks::

  1. Step 1 is a "set up" Task that queries metadata and so on to define the next step of tasks.
  2. Step 2 is 1 - n Tasks that run ingestion of groups of objects in parallel.  Depends on step 1.  The common Notebook in these Tasks uses dbutils.notebook.run() to call the Notebook containing the auto loader logic.
  3. Step 3 is basically the finishing Task to update logs and so on.  Depends on step 2.  This is designed to fail if a previous step 2 Task fails.

In my last test, I confirmed I had retries configured on every Task in the Job.

I was running ingestion for a new source, and expected schema changes between older files and now.

After it had run for some time, I saw that the 3rd finalising task was retrying, and that one of the step 2 tasks (where the auto loader code is) had failed - and had not retried.

So, I suspect the issue lies somewhere in the code around using notebook.run(), or within the Notebook called by it.  Above you mention :

Notebook Execution and Retries:

  • The notebook.run() approach should not inherently conflict with retries.
  • However, consider the following:
    • If the notebook containing the Auto Loader code fails, the retry behaviour depends on the overall job configuration.
    • Verify that the retries are set at the job level and not overridden within the notebooks.

 

I wonder if that is what is happening here.

The implementation with notebook.run() follows, in pseudo code:

Step 2 "parent" task:

 

 

try:
runResult = dbutils.notebook.run( #parameters here )

except:
# code to log exception here

 

 

 and then in the called notebook, there is this around .writeStream:

 

 

try:
streamWriter = (df.writeStream ...
.outputMode("append")
.option("mergeSchema", "true")
...
)
streamWriter.awaitTermination()

except:
#code to log exception here ...
dbutils.notebook.exit(json.dumps(logger.log_info))

 

 

Not sure if there is something significant in there; or that I know enough about the nuances of calling and exiting notebook runs to see what could be causing a problem here.

 

Appreciate it if anyone can provide any pearls of wisdom ๐Ÿ˜‰

ilarsen
Contributor

Another point I have realised, is that the task and the parent notebook (which then calls the child notebook that runs the auto loader part) does not fail if the schema-changed failure occurs during the auto loader process.  It's the child notebook and the job created by notebook.run() that fails.  Which suggests to me that this task needs to fail to trigger the automatic retry.  Except, the design here is that the parent notebook is an iterative orchestrator, it initiates the ingestion for a group of objects.  So if the first object fails, it needs to keep running to the next one, then the next.  ๐Ÿค”

 

It's an issue that makes a paradigm change to DLT have more merit, as automatic retries are just part of it and doesn't need thinking about or coding around.

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group