Data Engineering

Schema evolution issue

Confused
New Contributor III

Hi All

I am loading some data using Auto Loader but am having trouble with schema evolution.

A new column has been added to the data I am loading and I am getting the following error:

StreamingQueryException: Encountered unknown field(s) during parsing: {"SomeField":{}}

I'm not 100% sure whether this error is being thrown by Auto Loader or by Structured Streaming. I am not specifying a schema in the cloudFiles config (just a schema location), and I am setting the following option on the writeStream:

.option("mergeSchema", "true")

Does anyone have any thoughts on this?

Cheers

Mat

7 REPLIES

Anonymous
Not applicable

It's not on the writer that you need to evolve the schema; it's on the read side that you're running into the problem. The docs here describe how to adjust Auto Loader.

Confused
New Contributor III

Hi Josephk

I had read that doc but I don't see where I am having an issue.

Per the first example, it says I should be doing this:

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target>")

I have a few more cloudFiles options because I'm reading file notifications from a queue, but otherwise I am doing the same as above: not specifying a schema on the read, and setting mergeSchema on the write.

Anonymous
Not applicable

You'll need to add the option on the reader to add new columns. It's:

.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
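For context, here is a minimal sketch of where that option sits on the reader. The helper name `build_autoloader_reader` is made up for illustration, and `spark` is assumed to be an existing SparkSession on a Databricks runtime that provides the `cloudFiles` source; the paths are the same placeholders used earlier in the thread.

```python
def build_autoloader_reader(spark, schema_location, source_format="json"):
    """Return an Auto Loader streaming reader with schema evolution set explicitly.

    Hypothetical helper: `spark` is assumed to be a SparkSession on a
    Databricks runtime where the "cloudFiles" source is available.
    """
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", source_format)
        .option("cloudFiles.schemaLocation", schema_location)
        # Set explicitly on the reader, not on the writeStream.
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    )
```

It would be used as `build_autoloader_reader(spark, "<path_to_schema_location>").load("<path_to_source_data>")`, with the `mergeSchema` option still set on the writer.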

Confused
New Contributor III

Hmmmm, I hadn't added it because that doc says it is the default when you don't provide a schema:

addNewColumns: The default mode when a schema is not provided to Auto Loader.

I will give it a try though, thanks.

Confused
New Contributor III

Yeah, I get the same error. I ran the job twice, since per the docs the first run should fail and the second should succeed, and got an identical error both times.
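The fail-then-restart behaviour mentioned here could be wrapped in a small retry loop rather than rerunning the job by hand. This is only a sketch: `run_with_restarts` and `start_query` are hypothetical names, and `start_query` is assumed to be a zero-argument callable that builds and starts the stream and returns the resulting StreamingQuery. Catching every exception is deliberately broad for brevity; a real job would likely narrow it.

```python
def run_with_restarts(start_query, max_restarts=1):
    """Start a streaming query and restart it if it fails.

    Returns the number of restarts that were needed. Re-raises the last
    error once `max_restarts` has been used up.
    """
    restarts = 0
    while True:
        try:
            query = start_query()
            # awaitTermination() raises if the query stops with an error.
            query.awaitTermination()
            return restarts
        except Exception as exc:
            if restarts >= max_restarts:
                raise
            restarts += 1
            print(f"Stream stopped ({exc!r}); restart {restarts} of {max_restarts}")
```

As this post shows, a restart does not help when the failure has a different cause, so the loop gives up after `max_restarts` attempts instead of retrying forever.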

Soma
Valued Contributor

Hi all, this is due to an empty struct column, which Auto Loader is confusing with a struct that has some schema.

If we know the struct schema from past data, we can give Auto Loader a schema hint for the struct, or read this column as a string and parse it later using from_json or regexp_extract.

https://docs.databricks.com/spark/latest/structured-streaming/auto-loader-schema.html#schema-hints
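A rough sketch of the second suggestion, reading the problem column as a string via a schema hint. The helper name `read_with_hint` is hypothetical, `spark` is assumed to be a SparkSession on a Databricks runtime, and `SomeField` is simply the field name from the error message above; the hint string uses SQL DDL-style `name TYPE` syntax.

```python
def read_with_hint(spark, schema_location, source_path, hint="SomeField STRING"):
    """Return an Auto Loader stream where `hint` overrides the inferred type.

    Hypothetical helper: with the default hint, SomeField is read as a
    plain string instead of an (empty) struct.
    """
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", schema_location)
        # Schema hints apply only to the named columns; the rest is still inferred.
        .option("cloudFiles.schemaHints", hint)
        .load(source_path)
    )
```

The string column could then be parsed downstream with `from_json` from `pyspark.sql.functions` once the struct's fields are known.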

rgrosskopf
New Contributor II

I agree that hints are the way to go if you have the schema available, but the whole point of schema evolution is that you might not always know the schema in advance.

I received a similar error with a similar streaming query configuration. The issue was that the read schema is derived from a limited sample of the files to be imported (configurable, but 1000 files by default). The new field wasn't in the sample, so the stream errored out when it ran into the new field later in the ingest process.
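To make the sampling problem concrete, here is a toy illustration in plain Python. It is not Auto Loader's actual inference logic; the function names and the three sample records are invented for the example. It only shows how a schema inferred from the first few records can miss a field that appears later.

```python
import json


def infer_fields(lines, sample_size):
    """Collect top-level field names from the first `sample_size` JSON records."""
    fields = set()
    for line in lines[:sample_size]:
        fields.update(json.loads(line).keys())
    return fields


def unknown_fields(lines, known):
    """Return field names present in the data but missing from `known`."""
    seen = set()
    for line in lines:
        seen.update(json.loads(line).keys())
    return seen - known


# Made-up records: the new field only shows up after the sampled ones.
records = ['{"id": 1}', '{"id": 2}', '{"id": 3, "SomeField": {}}']

schema = infer_fields(records, sample_size=2)
print(sorted(schema))                           # ['id']
print(sorted(unknown_fields(records, schema)))  # ['SomeField']
```

With a sample that covers all three records, `SomeField` would be part of the inferred schema from the start.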
