Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Schema evolution for JSON files with Auto Loader

yit
New Contributor III

 

I am using Auto Loader to ingest JSON files into a managed table. Auto Loader saves only the first-level fields as new columns, while nested structs are stored as values within those columns.

My goal is to support schema evolution when loading new files. However, Auto Loader seems to detect changes only in top-level columns. What are possible solutions to track and handle schema evolution for nested JSON structures?

Here's the code that I'm using:

df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("cloudFiles.schemaLocation", checkpoint_path)  # where the inferred schema and its evolution are tracked
        .option("multiLine", "false")
        .option("recursiveFileLookup", "true")
        .option("readerCaseSensitive", "false")
        .load(source_path)
)

(
    df.writeStream
        .format("delta")
        .option("mergeSchema", "true")
        .option("checkpointLocation", checkpoint_path)
        .outputMode("append")
        .trigger(availableNow=True)  # process all available files, then stop
        .toTable(target_table)
)

8 REPLIES

BS_THE_ANALYST
Honored Contributor III

@yit perhaps it's worth investigating this as a potential route:
https://community.databricks.com/t5/knowledge-sharing-hub/handling-complex-nested-json-in-databricks... 

It's an interesting problem!

This route does seem supported in the documentation as well: https://docs.databricks.com/aws/en/dlt/from-json-schema-evolution 

All the best,
BS

yit
New Contributor III

Thanks for the suggestions @BS_THE_ANALYST.

My schema is dynamic, so I can't use schema hints. The goal is that when a new nested field arrives in incoming JSON files, it gets merged into the existing schema.
Most of the time the problem arises when there is {"metadata": {}} as a nested field. Any thoughts?
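
For concreteness, a hypothetical pair of files illustrating this case: the nested struct is empty at first, and a later file introduces fields under it that should be merged into the existing schema.

# hypothetical sample documents (not from the original post)
file_1 = '{"id": 1, "metadata": {}}'
file_2 = '{"id": 2, "metadata": {"source": "api"}}'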

BS_THE_ANALYST
Honored Contributor III

Could you build out the schema dynamically by using something like this:

[screenshot of the schema_of_json documentation]

https://docs.databricks.com/aws/en/sql/language-manual/functions/schema_of_json 

Perhaps there's a way to store your JSON in a variable and inject it into the function, as in the sketch below. I'm hopeful there'll be other routes as well.
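
A rough sketch of that idea (the sample document, raw_df, and its json_str column are hypothetical): infer a DDL schema string from one sample document with schema_of_json, then parse the raw JSON with it.

from pyspark.sql import functions as F

sample = '{"id": 1, "metadata": {"source": "api"}}'

# schema_of_json needs a foldable (literal) argument, so evaluate it once
ddl = spark.range(1).select(F.schema_of_json(sample).alias("s")).first()["s"]

# apply the inferred schema to a string column of raw JSON
parsed = raw_df.select(F.from_json("json_str", ddl).alias("data")).select("data.*")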

All the best,
BS

szymon_dybczak
Esteemed Contributor III

Hi @yit ,

Maybe give the new VARIANT data type a try? VARIANT is flexible to schema and type changes and maintains case sensitivity and NULL values present in the data source, so this pattern is robust to most ingestion scenarios (at least according to the documentation 🙂).

Ingest data as semi-structured variant type | Databricks Documentation
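
A minimal sketch of that pattern based on the linked docs, assuming a runtime with VARIANT support (DBR 15.3 or later) and reusing source_path, checkpoint_path, and target_table from the question:

df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        # keep each document as a single VARIANT column instead of inferring a schema
        .option("singleVariantColumn", "data")
        .load(source_path)
)

(
    df.writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .toTable(target_table)
)

Because the whole document lands in one VARIANT column, new nested fields need no schema evolution at all; they can be read later with path expressions such as data:metadata.source::string.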

BS_THE_ANALYST
Honored Contributor III

@yit I've just been introduced to the VARIANT data type on the course I'm doing. @szymon_dybczak is bang on the mark here. This seems perfect!
https://www.youtube.com/watch?v=fWdxF7nL3YI  
https://www.databricks.com/blog/introducing-open-variant-data-type-delta-lake-and-apache-spark 

Really impressed with the speed benchmarking in the 2nd link! Can you let us know if this resolves the problem? Very interested 👌.

All the best,
BS

yit
New Contributor III (Accepted Solution)

@BS_THE_ANALYST @szymon_dybczak thank you both for your replies.

The solution is already implemented in Auto Loader — it does exactly what I needed, even for nested fields.

The behavior that confused me was that when the evolution mode is set to addNewColumns and a file containing new schema fields is processed, the stream initially fails with an error for the new fields. At that same moment, however, the merged schema is saved to the schema location. If the stream is then run again, the file is processed correctly and schema evolution succeeds: the new fields are added and populated with null values for existing records.
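
For anyone automating this, a minimal sketch of relying on that behavior in a job. Here start_stream is a hypothetical helper that builds and starts the writeStream from the question and returns its StreamingQuery; matching the exception by message text is a heuristic, not a documented API.

def run_with_schema_retry(start_stream, max_attempts=3):
    for attempt in range(max_attempts):
        try:
            start_stream().awaitTermination()
            return  # finished without hitting a schema change
        except Exception as e:
            # In addNewColumns mode the stream fails on new fields, but the
            # merged schema has already been written to the schema location,
            # so a restart picks it up and succeeds.
            if "UnknownFieldException" in str(e) and attempt < max_attempts - 1:
                continue
            raise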

szymon_dybczak
Esteemed Contributor III

Thanks for sharing the solution, @yit!

BS_THE_ANALYST
Honored Contributor III

@yit awesome. Glad that you got this solved. I look forward to the next problem 😏.

All the best,
BS
