Structured Streaming Auto Loader UnknownFieldsException and Workflow Retries
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-21-2023 03:05 PM
Hi.
I am using structured streaming and auto loader to read json files, and it is automated by Workflow. I am having difficulties with the job failing as schema changes are detected, but not retrying. Hopefully someone can point me in the right direction?
From what I understand from the documentation (Configure schema inference and evolution in Auto Loader - Azure Databricks | Microsoft Learn) is that the failure is expected behaviour. I have enabled retries on the Job Task, but it does not appear that the retries are happening. What could be a key point, is that the job task runs a notebook, which in turn runs another notebook containing the auto loader code by
dbutils.notebook.run()
Could the notebook.run() approach be conflicting with the retry setting on the Task, and therefore no retry?
My stream reader has these options defined:
options = {
"cloudFiles.useNotifications": "false",
"cloudFiles.schemaLocation": <variable to checkpoint location here>,
"cloudFiles.format": "json",
"cloudFiles.includeExistingFiles": True,
"cloudFiles.inferColumnTypes": True,
"cloudFiles.useIncrementalListing": "true"
}
I notice that my options do not include
cloudFiles.schemaEvolutionMode
, but according to the docs that is default.
My stream writer has this option defined:
.option("mergeSchema", "true")
Does anyone have any ideas please?
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
01-23-2024 07:59 PM - edited 01-23-2024 08:01 PM
Hi Kaniz,
Thank you for your comprehensive response, I appreciate it. I have not resolved the issue in my situation yet, but I am perhaps a little closer.
Basically, my Job is a 3-step chain of Tasks::
- Step 1 is a "set up" Task that queries metadata and so on to define the next step of tasks.
- Step 2 is 1 - n Tasks that run ingestion of groups of objects in parallel. Depends on step 1. The common Notebook in these Tasks uses dbutils.notebook.run() to call the Notebook containing the auto loader logic.
- Step 3 is basically the finishing Task to update logs and so on. Depends on step 2. This is designed to fail if a previous step 2 Task fails.
In my last test, I confirmed I had retries configured on every Task in the Job.
I was running ingestion for a new source, and expected schema changes between older files and now.
After it had run for some time, I saw that the 3rd finalising task was retrying, and that one of the step 2 tasks (where the auto loader code is) had failed - and had not retried.
So, I suspect the issue lies somewhere in the code around using notebook.run(), or within the Notebook called by it. Above you mention :
Notebook Execution and Retries:
- The notebook.run() approach should not inherently conflict with retries.
- However, consider the following:
- If the notebook containing the Auto Loader code fails, the retry behaviour depends on the overall job configuration.
- Verify that the retries are set at the job level and not overridden within the notebooks.
I wonder if that is what is happening here.
The implementation with notebook.run() follows, in pseudo code:
Step 2 "parent" task:
try:
runResult = dbutils.notebook.run( #parameters here )
except:
# code to log exception here
and then in the called notebook, there is this around .writeStream:
try:
streamWriter = (df.writeStream ...
.outputMode("append")
.option("mergeSchema", "true")
...
)
streamWriter.awaitTermination()
except:
#code to log exception here ...
dbutils.notebook.exit(json.dumps(logger.log_info))
Not sure if there is something significant in there; or that I know enough about the nuances of calling and exiting notebook runs to see what could be causing a problem here.
Appreciate it if anyone can provide any pearls of wisdom 😉
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
01-24-2024 12:25 PM - edited 01-24-2024 12:28 PM
Another point I have realised, is that the task and the parent notebook (which then calls the child notebook that runs the auto loader part) does not fail if the schema-changed failure occurs during the auto loader process. It's the child notebook and the job created by notebook.run() that fails. Which suggests to me that this task needs to fail to trigger the automatic retry. Except, the design here is that the parent notebook is an iterative orchestrator, it initiates the ingestion for a group of objects. So if the first object fails, it needs to keep running to the next one, then the next. 🤔
It's an issue that makes a paradigm change to DLT have more merit, as automatic retries are just part of it and doesn't need thinking about or coding around.

