Databricks Free Edition Help
Engage in discussions about the Databricks Free Edition within the Databricks Community. Share insights, tips, and best practices for getting started, troubleshooting issues, and maximizing the value of your trial experience to explore Databricks' capabilities effectively.

Handling New Columns Using Auto Loader Rescue Mode: How to Get Newly Added Columns

SantiNath_Dey
Contributor

Schema evolution (rescue mode) is being triggered for a complex JSON file. However, when a new column is added in the source, it does not appear in the target table automatically for multi-level nested JSON. How can we add newly added nested columns to the target table when using Databricks Auto Loader with rescue mode?

9 REPLIES

Ashwin_DSA
Databricks Employee

Hi @SantiNath_Dey,

In rescue mode, Auto Loader intentionally freezes the schema. This is by design: new columns, whether top-level or nested, will not appear in the target; they land in _rescued_data as a JSON string. That is the mode's contract.

Refer to this page.


You'll have to switch the evolution mode to resolve this...

While you can consider other options, like pre-declaring the fields you need with schemaHints, this only works for fields you already know about. It's not something you'd want to use for complex JSON structures. Rescue mode is best thought of as a safety net for unexpected data, not as an evolution strategy.
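For completeness, a minimal sketch of the schemaHints approach while staying in rescue mode. The bucket paths and field names here are made up, and this assumes a Databricks notebook where `spark` is already defined:

```python
# Sketch only (hypothetical paths/field names): pre-declare the nested fields
# you care about with schemaHints. Only the hinted paths become typed columns;
# anything else still lands in _rescued_data.
df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "s3://your-bucket/_schemas/my_stream")
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        # dotted paths can hint nested fields, but you must know them up front
        .option("cloudFiles.schemaHints", "id BIGINT, payload.user.email STRING")
        .load("s3://your-bucket/raw/json"))
```

As noted above, this defeats the purpose for deeply nested, evolving JSON: every field you don't hint still goes to the rescue column.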

Is there a specific reason why you'd want to do that?

If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.

 

Regards,
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***

Hi Ashwin, thanks for the quick response. Could you please share sample code to handle new column additions and data type changes for multi-level nested JSON files?

Thank you

 

Hi @SantiNath_Dey,

There are two ways you can do it. The pattern below should work. 

source_path     = "s3://your-bucket/raw/json"
schema_location = "s3://your-bucket/_schemas/my_stream"
checkpoint_path = "s3://your-bucket/_checkpoints/my_stream"
target_table    = "catalog.db.nested_events"

df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", schema_location)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")  # fail + evolve when a new column appears
        .option("cloudFiles.inferColumnTypes", "true")              # infer nested structure
        .option("rescuedDataColumn", "_rescued_data")               # capture mismatches/new unexpected fields
        .load(source_path)
)

(df.writeStream
   .format("delta")
   .option("checkpointLocation", checkpoint_path)
   .option("mergeSchema", "true")   # apply evolved schema (incl. nested) to Delta table
   .toTable(target_table))

The above uses .option("cloudFiles.schemaEvolutionMode", "addNewColumns"). With this option, when a new column appears, the stream fails with UnknownFieldException; Auto Loader infers the updated schema (including the new column), writes it to schemaLocation, and you restart the stream. On restart you must use mergeSchema so the Delta table picks up the new column. However, when a column's data type changes (e.g. INT → LONG), the schema is not evolved: mismatched values are set to NULL and the original values go into _rescued_data.
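Since the fail-and-restart cycle is the expected behaviour in this mode, one common pattern is a small restart wrapper around the stream. A hedged sketch (pure Python; `start_stream` is a stand-in for the readStream/writeStream pipeline above, and the string match on the exception text is a heuristic, since the Scala UnknownFieldException reaches Python wrapped in a streaming exception):

```python
def should_restart(exc):
    """Heuristic: restart only for Auto Loader schema-evolution failures."""
    msg = str(exc)
    return "UnknownFieldException" in msg or "UNKNOWN_FIELD" in msg

def run_with_restarts(start_stream, max_restarts=3):
    """start_stream() should build the query and block until it terminates,
    e.g. lambda: df.writeStream...toTable(target_table).awaitTermination()."""
    for attempt in range(max_restarts):
        try:
            start_stream()
            return attempt                      # number of restarts it took
        except Exception as exc:
            if attempt + 1 < max_restarts and should_restart(exc):
                print(f"Schema evolved, restarting (attempt {attempt + 1})")
                continue
            raise                               # unrelated failure: surface it
```

In production on Databricks you would usually just configure retries on the job task instead of hand-rolling this, but the wrapper makes the evolve-then-restart contract explicit.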

A modified pattern: instead of using .option("cloudFiles.schemaEvolutionMode", "addNewColumns"), use

.option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")

The difference is that this mode can deal with both new columns and type changes. For new columns, it results in the same behaviour as addNewColumns (added to the schema when the stream restarts). In addition, it also supports widenable type changes (e.g. INT → LONG, FLOAT → DOUBLE, smaller → larger DECIMAL): Auto Loader automatically widens the column type in the schema instead of treating the new values as mismatches, so they land in the main column rather than _rescued_data. The caveat is that this mode is in public preview and needs DBR ≥ 16.4, so start with addNewColumns and move to addNewColumnsWithTypeWidening at a later time.

Hope this helps.

If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.

 

 
Regards,
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***

Thank you for the quick response. I will check and get back to you.

SantiNath_Dey
Contributor
The solution using the option .option("cloudFiles.schemaEvolutionMode", "addNewColumns") is working as expected. With this configuration, when a new column appears in the source, the stream fails with an UnknownFieldException. However, we would like to capture any newly added columns using the schema drift (rescue column) mechanism. If this is not possible using the schema drift option, an alternative approach is required to print or log the names of the newly added columns so that business users can be notified.

Hi @SantiNath_Dey,

No... With cloudFiles.schemaEvolutionMode="addNewColumns", new columns are never put into the rescuedDataColumn. They always trigger an UnknownFieldException, Auto Loader writes an updated schema to schemaLocation, and then you restart the stream. There is no mode that both (a) evolves the schema and (b) rescues new columns instead of failing.

So you need a separate detection/logging step. A simple approach is to let Auto Loader run with addNewColumns (as you do now). After the schema has evolved and the table has been updated (after restart with mergeSchema), diff the table schema against a stored "previous schema snapshot" and log any new paths.
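To make that concrete, here is a minimal sketch of the diff step. It works on the JSON form of a Spark schema, which is what `StructType.jsonValue()` returns (e.g. from `spark.table(target_table).schema.jsonValue()`), so the function itself is pure Python; the toy schemas below are made up:

```python
def flatten_schema(struct, prefix=""):
    """Return the set of dotted leaf-field paths in a schema jsonValue() dict,
    recursing into nested structs so multi-level JSON is covered."""
    paths = set()
    for field in struct.get("fields", []):
        path = prefix + field["name"]
        ftype = field["type"]
        if isinstance(ftype, dict) and ftype.get("type") == "struct":
            paths |= flatten_schema(ftype, prefix=path + ".")
        else:
            paths.add(path)
    return paths

# Toy snapshots (real jsonValue() dicts carry extra keys like "nullable";
# the function ignores them).
old_schema = {"type": "struct", "fields": [
    {"name": "id", "type": "long"},
    {"name": "user", "type": {"type": "struct", "fields": [
        {"name": "name", "type": "string"}]}},
]}
new_schema = {"type": "struct", "fields": [
    {"name": "id", "type": "long"},
    {"name": "user", "type": {"type": "struct", "fields": [
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"}]}},  # newly added nested column
]}

added = sorted(flatten_schema(new_schema) - flatten_schema(old_schema))
print(added)  # ['user.email']
```

Persist each run's jsonValue() snapshot somewhere durable (e.g. a small Delta audit table) and alert whenever `added` is non-empty.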

You can notify business users via email/alerts/dashboard, which can be built from the diff table created above. However, there's no supported combination where addNewColumns will both evolve the schema and also send those new fields into the rescue column, if that's what you are expecting.

If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.

Regards,
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***

Thank you for your response. Do you have any sample schema-compare code between the old and new schemas for complex, nested JSON files? That would be very helpful.

Hi @SantiNath_Dey,

I don't have sample code to share, but everything you need is in the ideas and docs already posted. Give it a go and see how far you get. If you hit a specific error or get stuck on a particular step, drop the code snippet and the error here, and I'll take a look.

Writing the full implementation for scenarios like this is beyond what I can scope over this community forum, but I'm happy to unblock you once you've got something started. Curious to see what you come up with!

Good luck!

If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.

Regards,
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***

SantiNath_Dey
Contributor

Thank you for the quick response. We will check and get back to you.