Wednesday
Schema evolution (rescue mode) is being triggered for a complex JSON file. However, when a new column is added in the source, it does not appear in the target table automatically for multi-level nested JSON. How can we add newly added nested columns to the target table when using Databricks Auto Loader with rescue mode?
Wednesday
Hi @SantiNath_Dey,
In rescue mode, Auto Loader intentionally freezes the schema. This is by design. New columns, whether top-level or nested, will not appear in the target; they land in _rescued_data as a JSON string. That is the mode's contract.
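For illustration, a rescued value is just a JSON string you can parse downstream. The field names and path below are made up for the example (the exact contents depend on your data; the source file path is typically included as well):

```python
import json

# Hypothetical value of the _rescued_data column for a record whose source
# JSON contained a nested field ("user.plan") that is not in the frozen schema.
rescued = json.dumps({
    "user": {"plan": "enterprise"},  # the unexpected nested field
    "_file_path": "s3://your-bucket/raw/json/part-0001.json",
})

# Downstream you can parse it back to inspect what was rescued:
parsed = json.loads(rescued)
print(parsed["user"]["plan"])  # enterprise
```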
Refer to this page.
You'll have to switch the evolution mode to resolve this...
While you can consider other options like pre-declaring the fields you need with cloudFiles.schemaHints, that only works for fields you already know about; it's not something you'd want to use for complex JSON structures. Rescue mode is best thought of as a safety net for unexpected data, not as an evolution strategy.
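If you do go the schema-hints route, the hints are passed as a single DDL-style string, with dot notation for nested fields. A small helper (hypothetical, just to show the format; the field names are placeholders) can build it from a mapping:

```python
def build_schema_hints(hints: dict) -> str:
    """Render {dotted.path: sql_type} as the comma-separated DDL string
    expected by the cloudFiles.schemaHints option."""
    return ", ".join(f"{path} {sql_type}" for path, sql_type in hints.items())

hints = build_schema_hints({
    "event_id": "string",
    "payload.user.id": "bigint",       # nested field, dot notation
    "payload.amount": "decimal(18,2)",
})
print(hints)
# event_id string, payload.user.id bigint, payload.amount decimal(18,2)
```

You would then pass this as .option("cloudFiles.schemaHints", hints) on the reader. Again, this only helps for fields you can enumerate up front.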
Is there a specific reason why you'd want to do that?
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.
Thursday
Hi Ashwin, thanks for the quick response. Could you please share sample code to handle new column additions and data type changes for multi-level nested JSON files?
Thank you
Thursday
Hi @SantiNath_Dey,
There are two ways you can do it. The pattern below should work.
source_path = "s3://your-bucket/raw/json"
schema_location = "s3://your-bucket/_schemas/my_stream"
checkpoint_path = "s3://your-bucket/_checkpoints/my_stream"
target_table = "catalog.db.nested_events"

df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", schema_location)
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.inferColumnTypes", "true")   # infer nested structure
    .option("rescuedDataColumn", "_rescued_data")    # capture mismatches / unexpected fields
    .load(source_path)
)

(df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")   # apply evolved schema (incl. nested) to the Delta table
    .toTable(target_table))
The above uses .option("cloudFiles.schemaEvolutionMode", "addNewColumns"). With this option, when a new column appears the stream fails with an UnknownFieldException; Auto Loader infers the updated schema (including the new column), writes it to schemaLocation, and you restart the stream. On restart you must use mergeSchema so the Delta table picks up the new column. However, when a column's data type changes (e.g. INT → LONG), the schema is not evolved: mismatched values are set to NULL and the original values go into _rescued_data.
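Because addNewColumns evolves the schema by failing the stream, jobs are usually wrapped in a restart mechanism at the driver (or you rely on Databricks job retries). Here is a minimal sketch of that idea; run_with_schema_restarts and fake_stream are illustrative stand-ins, and matching on the exception text is a simplification — in a real job start_stream would build the readStream/writeStream above and call awaitTermination():

```python
def run_with_schema_restarts(start_stream, max_restarts=3):
    """Re-invoke start_stream when it fails due to schema evolution.

    start_stream: callable that starts the streaming query and blocks.
    Auto Loader surfaces new columns as an UnknownFieldException, so we
    match on the exception text here (illustrative simplification).
    """
    for attempt in range(max_restarts + 1):
        try:
            return start_stream()
        except Exception as e:
            if "UnknownFieldException" in str(e) and attempt < max_restarts:
                # The evolved schema was already written to schemaLocation;
                # a plain restart picks it up (mergeSchema handles Delta).
                continue
            raise

# Demo with a stand-in for the real streaming job: fails once with the
# schema-evolution error, then succeeds after the "restart".
_calls = {"n": 0}
def fake_stream():
    _calls["n"] += 1
    if _calls["n"] == 1:
        raise RuntimeError("UnknownFieldException: encountered new field")
    return "stream finished"

print(run_with_schema_restarts(fake_stream))  # stream finished
```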
A modified pattern is... instead of using .option("cloudFiles.schemaEvolutionMode", "addNewColumns"), use
.option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
The difference is that this mode can deal with both new columns and type changes. For new columns it behaves the same as addNewColumns (the column is added to the schema when the stream restarts). In addition, it supports widenable type changes (e.g. INT → LONG, FLOAT → DOUBLE, smaller → larger DECIMAL): Auto Loader automatically widens the column type in the schema instead of treating the new values as mismatches, so they land in the main column rather than _rescued_data. The caveat is that this mode is in public preview and needs DBR ≥ 16.4. So start with addNewColumns and move to addNewColumnsWithTypeWidening at a later time.
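To make "widenable" concrete, here is an illustrative check covering just the pairs mentioned above. This is not the engine's actual rule set (the full matrix is in the Databricks/Delta type-widening docs), and the decimal rule here is a simplification:

```python
# Illustrative only: widenings mentioned in the post.
WIDENABLE = {
    ("int", "long"),
    ("float", "double"),
}

def is_widenable(src: str, dst: str) -> bool:
    src, dst = src.lower(), dst.lower()
    if (src, dst) in WIDENABLE:
        return True
    # Simplified decimal rule: integer digits and scale must both not shrink.
    if src.startswith("decimal") and dst.startswith("decimal"):
        def parse(t):
            p, s = t[t.index("(") + 1 : t.index(")")].split(",")
            return int(p), int(s)
        (p1, s1), (p2, s2) = parse(src), parse(dst)
        return (p2 - s2) >= (p1 - s1) and s2 >= s1
    return False

print(is_widenable("INT", "LONG"))                     # True
print(is_widenable("decimal(10,2)", "decimal(18,2)"))  # True
print(is_widenable("long", "int"))                     # False (narrowing)
```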
Hope this helps.
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.
Thursday
Thank you for the quick response. I will check and get back to you.
Friday
Hi @SantiNath_Dey,
No... With cloudFiles.schemaEvolutionMode="addNewColumns", new columns are never put into the rescuedDataColumn. They always trigger an UnknownFieldException, Auto Loader writes an updated schema to schemaLocation, and then you restart the stream. There is no mode that both (a) evolves the schema and (b) rescues new columns instead of failing.
So you need a separate detection/logging step. A simple approach is to let Auto Loader run with addNewColumns (as you do now). After the schema has evolved and the table has been updated (after restart with mergeSchema), diff the table schema against a stored "previous schema snapshot" and log any new paths.
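A sketch of that diff step in plain Python. The nested-dict shape and helper names here are illustrative; in practice you would derive these dicts from the target table's StructType (and persist the snapshot somewhere durable, e.g. a small Delta table):

```python
def flatten_paths(schema: dict, prefix: str = "") -> dict:
    """Flatten a nested {field: type-or-dict} schema into dotted paths."""
    out = {}
    for name, typ in schema.items():
        path = f"{prefix}.{name}" if prefix else name
        if isinstance(typ, dict):
            out.update(flatten_paths(typ, path))
        else:
            out[path] = typ
    return out

def new_columns(previous: dict, current: dict) -> list:
    """Dotted paths present in the current schema but not in the snapshot."""
    prev, curr = flatten_paths(previous), flatten_paths(current)
    return sorted(p for p in curr if p not in prev)

before = {"id": "long", "payload": {"user": {"name": "string"}}}
after  = {"id": "long", "payload": {"user": {"name": "string", "plan": "string"}}}
print(new_columns(before, after))  # ['payload.user.plan']
```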
You can notify business users via email/alerts/dashboard, which can be built from the diff table created above. However, there's no supported combination where addNewColumns will both evolve the schema and also send those new fields into the rescue column, if that's what you are expecting.
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.
Friday
Hi @SantiNath_Dey,
I don't have sample code to share, but everything you need is in the ideas and docs already posted. Give it a go and see how far you get. If you hit a specific error or get stuck on a particular step, drop the code snippet and the error here, and I'll take a look.
Writing the full implementation for scenarios like this is beyond what I can scope over this community forum, but I'm happy to unblock you once you've got something started. Curious to see what you come up with!
Good luck!
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.
Saturday
Thank you for the quick response. We will check and get back to you.