
Autoloader Schema Hints are not taken into consideration in schema file

robertkoss
New Contributor III

I am using Autoloader with Schema Inference to automatically load some data into S3.

I have one column that is a Map, which overwhelms Autoloader: it infers the column as a struct with every key as a separate field. So I use a schema hint for that column.

My output data frame / Delta Table looks exactly as expected, so schema hint works great in that regard.

The only problem I am facing is that the schema hint does not seem to be taken into account during schema inference. That stage of the Spark job is very slow, and the schema file that Autoloader produces is still insanely huge, causing driver OOMs. Has anyone faced something similar before?

The code is very simple:

spark \
    .readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaHints", SCHEMA_HINT) \
    .option("cloudFiles.schemaLocation", f"{target_s3_bucket}/_schema/{source_table_name}") \
    .load(f"{source_s3_bucket}/{source_table_name}") \
    .writeStream \
    .trigger(availableNow=True) \
    .format("delta") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", f"{target_s3_bucket}/_checkpoint/{source_table_name}") \
    .option("streamName", source_table_name) \
    .start(f"{target_s3_bucket}/{target_table_name}")

Kaniz_Fatma
Community Manager

Hi @robertkoss, Make sure you're using the correct syntax for your schema hints and explicitly define the problematic column as a Map type in your schema hint.
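For reference, a minimal sketch of what a Map-typed schema hint could look like. The column name `attributes` and the helper names are hypothetical, not taken from the original post; only the option names come from the question above. Note that this only illustrates the hint syntax: as discussed in this thread, the hint shapes the output schema but may not shrink the inferred schema file.

```python
def build_schema_hints(hints: dict[str, str]) -> str:
    """Join column-name -> type pairs into the comma-separated string
    passed to the cloudFiles.schemaHints option."""
    return ", ".join(f"{column} {sql_type}" for column, sql_type in hints.items())


# Hypothetical column name; replace it with the real map-valued column.
SCHEMA_HINT = build_schema_hints({"attributes": "map<string,string>"})


def read_json_with_hints(spark, source_path: str, schema_location: str):
    """Return a streaming DataFrame that applies SCHEMA_HINT on top of inference.

    `spark` is an existing SparkSession on a Databricks cluster (assumption).
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaHints", SCHEMA_HINT)
        .option("cloudFiles.schemaLocation", schema_location)
        .load(source_path)
    )
```

Several hints can be combined in one string, for example `build_schema_hints({"attributes": "map<string,string>", "version": "int"})`.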

 

As I said, the resulting data table has the right schema. The only problem is the schema file produced by Autoloader, which still has the wrong schema. Since the column is a map, the schema file gets huge.

@Kaniz_Fatma how would you recommend dealing with Map types when Autoloader infers them as structs and treats millions of different keys as separate fields?

Witold
Contributor III

@robertkoss 

Have you tried to pass the schema of the target table instead?

Something like:

(
  spark.readStream
  .format('cloudFiles')
  # reuse the schema of the existing target table (format made explicit)
  .schema(spark.read.format('delta').load(f'{target_s3_bucket}/{target_table_name}').schema)
  ...
)

robertkoss
New Contributor III

@lsoewito How would this work when for example the schema of the source table changes? Or on first run?

Witold
Contributor III
"How would this work when for example the schema of the source table changes"

Schema evolution is a huge topic and can't be answered with a simple "do it like this". Do you want to support it at all? How do you deal with breaking and non-breaking changes? Features like Auto Loader's rescued data or Delta Lake's type widening will help, but essentially you need a whole concept.

 


"Or on first run"

It doesn't matter in a well-designed solution, as tables always exist during runtime.
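One way to make sure the table exists before the first run is to create it idempotently ahead of the stream. The following is only a sketch under assumptions: the table name, column names, and S3 location are hypothetical, and the helper is not part of any Databricks API.

```python
def create_table_ddl(table_name: str, columns: dict[str, str], location: str) -> str:
    """Build an idempotent CREATE TABLE statement for a Delta table."""
    column_list = ", ".join(f"{name} {sql_type}" for name, sql_type in columns.items())
    return (
        f"CREATE TABLE IF NOT EXISTS {table_name} ({column_list}) "
        f"USING DELTA LOCATION '{location}'"
    )


# Hypothetical table, columns, and location, for illustration only.
ddl = create_table_ddl(
    "bronze.events",
    {"id": "STRING", "attributes": "MAP<STRING, STRING>"},
    "s3://example-bucket/bronze/events",
)

# On a cluster, run spark.sql(ddl) once before starting the stream, so that
# reading the target table's schema also works on the very first run.
```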

robertkoss
New Contributor III

@Witold So my current solution is working for all other tables really well.

I am using Auto Loader with Schema Inference on incoming JSON files and writing the data as is to Delta. Since AutoLoader supports schema evolution, the Delta Table has all the incoming columns.

The only problem is with schema inference on Map types. All the keys of the map become fields of a struct type, making the schema huge. The Databricks documentation advises using schema hints in that case, which works for the output table. However, the schema file produced by the inference is not changed according to the hints, so map keys (i.e. data) leak into it and it becomes too huge to work with on subsequent runs.

It feels like this is a bug in Autoloader schema inference rather than a poorly designed solution.
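To illustrate why a map column inferred as a struct blows up the schema, here is a toy simulation in plain Python. It is not Auto Loader's actual inference algorithm, and the records and column name are made up: it only shows that a struct needs one field per distinct key, while a map type stays constant in size.

```python
def infer_struct_fields(records: list[dict], column: str) -> set[str]:
    """Toy struct inference: the field set is the union of all keys seen."""
    fields: set[str] = set()
    for record in records:
        fields.update(record.get(column, {}).keys())
    return fields


# Hypothetical records: a map-like column keyed by arbitrary IDs.
records = [{"attributes": {f"user_{i}": "x"}} for i in range(1000)]

# Inferred as a struct, the schema grows with every distinct key.
struct_fields = infer_struct_fields(records, "attributes")
print(len(struct_fields))  # 1000

# Declared as a map, the type description is the same for any number of keys.
map_type = "map<string,string>"
print(map_type)
```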

Witold
Contributor III
Sorry, I didn't mean that your solution is poorly designed. I was only referring to one of the main definitions of a bronze layer: you want a defined and optimized data layout that is source-driven at the same time. In other words: a pre-defined table with a schema, constraints, and various other Delta table configurations.

That said, we avoid schema inference in almost all cases, because we know the source schema and don't want to let Spark "guess" it. If you do it otherwise, things like your case can simply happen: Spark "guessed" wrong. Nested types (structs, maps, arrays) in particular can be quite tricky.

Long story short, we never had a good experience with schemaHints; it never worked the way you would expect. I honestly can't tell if that's just the way it works or a bug. Instead we follow a proper schema evolution concept with a pre-defined schema and rescued data.
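A rough sketch of what "pre-defined schema plus rescued data" could look like with Auto Loader. The schema string, column names, and helper names are hypothetical; the `rescuedDataColumn` option is used here on the assumption that it behaves as described in the Auto Loader documentation (fields not covered by the supplied schema are collected in that column).

```python
# Hypothetical source schema as a DDL string; the map column is declared
# explicitly, so nothing has to be inferred.
SOURCE_SCHEMA = "id STRING, attributes MAP<STRING, STRING>"


def explicit_schema_options() -> dict[str, str]:
    """Reader options for a fixed schema with a rescued-data column."""
    return {
        "cloudFiles.format": "json",
        # Assumption: fields missing from SOURCE_SCHEMA land in this column.
        "rescuedDataColumn": "_rescued_data",
    }


def read_with_explicit_schema(spark, source_path: str):
    """Return a streaming DataFrame that uses SOURCE_SCHEMA instead of inference.

    `spark` is an existing SparkSession on a Databricks cluster (assumption).
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .options(**explicit_schema_options())
        .schema(SOURCE_SCHEMA)
        .load(source_path)
    )
```

With this approach, new source fields do not change the table schema automatically; they would have to be promoted from the rescued column deliberately, which is the "whole concept" part mentioned earlier in the thread.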

Btw, I'm not sure if this is a proper alternative for you, but Databricks introduced the VARIANT data type in the latest runtime (it will also be part of Spark 4.0). You basically store your JSON structure in one column.

I hope it helps!
