Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Schema evolution for JSON files with AutoLoader

yit
Contributor

 

I am using Auto Loader to ingest JSON files into a managed table. Auto Loader saves only the first-level fields as new columns, while nested structs are stored as values within those columns.

My goal is to support schema evolution when loading new files. However, Auto Loader seems to detect changes only in top-level columns. What are possible solutions to track and handle schema evolution for nested JSON structures?

Here's the code that I'm using:

df = (
    spark.readStream
    .format("cloudFiles")
    .option("trigger", "true")
    .option("multiLine", "false")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("recursiveFileLookup", "true")
    # Add newly seen fields to the schema instead of ignoring them
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("readerCaseSensitive", "false")
    # Auto Loader stores the inferred schema versions here
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(source_path)
)

(
    df.writeStream
    .format("delta")
    # Let the Delta table accept the columns added by schema evolution
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")
    # Process everything that is available, then stop
    .trigger(availableNow=True)
    .table(target_table)
)
1 ACCEPTED SOLUTION


yit
Contributor

@BS_THE_ANALYST @szymon_dybczak thank you both for your replies.

It turns out Auto Loader already does exactly what I needed, even for nested fields.

What confused me was this: when the evolution mode is set to addNewColumns and a file with new schema fields is processed, the stream first fails with an error about the new fields. At that same moment, however, the merged schema is saved to the schema location. If we then run the stream again, the file is processed correctly and schema evolution succeeds. The new fields are added to the table, and existing records get null values for them.
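
For anyone who wants to automate the "run it again" step, here is a minimal sketch of a restart wrapper. It assumes the failure message contains the text UnknownFieldException (the error the Auto Loader docs describe for addNewColumns). The names run_with_restarts, looks_like_schema_change, and build_stream are made up for illustration, so treat this as one possible approach rather than the official pattern.

```python
from typing import Callable


def looks_like_schema_change(exc: Exception) -> bool:
    # Assumption: the failure message names UnknownFieldException.
    # Adjust this check if the error looks different in your environment.
    return "UnknownFieldException" in str(exc)


def run_with_restarts(run_once: Callable[[], None], max_restarts: int = 3) -> int:
    """Run the stream, restarting after schema-change failures.

    Returns how many restarts were needed. Any other error, or a
    schema-change error after max_restarts restarts, is re-raised.
    """
    restarts = 0
    while True:
        try:
            run_once()
            return restarts
        except Exception as exc:
            if not looks_like_schema_change(exc) or restarts >= max_restarts:
                raise
            # The merged schema was saved before the failure, so try again.
            restarts += 1


# Hypothetical usage on Databricks. build_stream is a placeholder that should
# re-create the readStream/writeStream pair on every call, so that the
# updated schema is read from the schema location:
#
# def run_once():
#     build_stream().awaitTermination()
#
# run_with_restarts(run_once)
```

The cap on restarts is there so that a file which keeps failing for some other reason does not loop forever.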


8 REPLIES

BS_THE_ANALYST
Esteemed Contributor

@yit perhaps it's worth investigating this as a potential route:
https://community.databricks.com/t5/knowledge-sharing-hub/handling-complex-nested-json-in-databricks... 

It's an interesting problem!

This route does seem supported in the documentation as well: https://docs.databricks.com/aws/en/dlt/from-json-schema-evolution 

All the best,
BS

yit
Contributor

Thanks for the suggestions, @BS_THE_ANALYST.

My schema is dynamic, so I can't use schema hints. The goal is that when a new nested field appears in incoming JSON files, it gets merged into the existing schema.
Most of the time, the problem arises when there is an empty nested field such as {"metadata":{}}. Any thoughts?
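
To make the goal concrete, here is a small plain-Python sketch of what "merging a new nested field into the existing schema" means, including the empty {"metadata":{}} case. It only illustrates the idea; it is not how Auto Loader does it internally, and infer_shape and merge_shapes are hypothetical helper names with simplified type names.

```python
import json


def infer_shape(value):
    """Return a nested dict for JSON objects and a type name for leaf values."""
    if isinstance(value, dict):
        return {key: infer_shape(child) for key, child in value.items()}
    return type(value).__name__


def merge_shapes(existing, incoming):
    """Add fields that `existing` lacks, recursing into nested objects."""
    if isinstance(existing, dict) and isinstance(incoming, dict):
        merged = dict(existing)
        for key, child in incoming.items():
            if key in existing:
                merged[key] = merge_shapes(existing[key], child)
            else:
                merged[key] = child
        return merged
    # On a conflict, keep what is already there (this sketch does not widen types).
    return existing


old = infer_shape(json.loads('{"id": 1, "metadata": {}}'))
new = infer_shape(json.loads('{"id": 2, "metadata": {"source": "api"}}'))
merged = merge_shapes(old, new)
print(merged)  # {'id': 'int', 'metadata': {'source': 'str'}}
```

Note that the empty metadata object contributes no nested fields of its own, so the nested field only shows up once a later file actually carries one.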

BS_THE_ANALYST
Esteemed Contributor

Could you build out the schema dynamically by using something like this:

[Screenshot: BS_THE_ANALYST_0-1752761847085.png]

https://docs.databricks.com/aws/en/sql/language-manual/functions/schema_of_json 

Perhaps there's a way to store your JSON in a variable and inject it into the function. I'm hopeful there'll be other routes as well.

All the best,
BS

szymon_dybczak
Esteemed Contributor III

Hi @yit ,

Maybe give the new VARIANT data type a try? VARIANT is flexible to schema and type changes and preserves the case sensitivity and NULL values present in the data source, so this pattern is robust to most ingestion scenarios (at least according to the documentation 🙂).

Ingest data as semi-structured variant type | Databricks Documentation
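
As a rough sketch of the VARIANT route with Auto Loader: the reader below loads each JSON record into a single VARIANT column through the singleVariantColumn option described on the linked documentation page. The function name build_variant_reader and the default column name "data" are placeholders of mine, and this assumes a Databricks Runtime recent enough to support VARIANT.

```python
def build_variant_reader(spark, source_path: str, column: str = "data"):
    """Return a streaming DataFrame whose JSON records land in one VARIANT column.

    `spark` is the active SparkSession on a Databricks cluster.
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        # Keep the whole record in one VARIANT column instead of inferring columns
        .option("singleVariantColumn", column)
        .load(source_path)
    )


# Hypothetical usage, reusing the names from the original question:
#
# df = build_variant_reader(spark, source_path)
# (df.writeStream
#    .option("checkpointLocation", checkpoint_path)
#    .trigger(availableNow=True)
#    .table(target_table))
```

With this layout new nested fields do not change the table schema at all; the trade-off is that fields have to be extracted from the VARIANT column at query time.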

BS_THE_ANALYST
Esteemed Contributor

@yit I've just been introduced to the VARIANT datatype on the course I'm doing. @szymon_dybczak  is bang on the mark here. This seems perfect!
https://www.youtube.com/watch?v=fWdxF7nL3YI  
https://www.databricks.com/blog/introducing-open-variant-data-type-delta-lake-and-apache-spark 

Really impressed with the speed benchmarking in the 2nd link! Can you let us know if this resolves the problem? Very interested 👌.

All the best,
BS


szymon_dybczak
Esteemed Contributor III

Thanks for sharing the solution @yit !

BS_THE_ANALYST
Esteemed Contributor

@yit awesome. Glad that you got this solved. I look forward to the next problem.

All the best,
BS