07-15-2024 12:30 PM
I am using Auto Loader with schema inference to automatically load data from S3.
One column is a map, which overwhelms Auto Loader: it tries to infer it as a struct, turning every key into a struct field. So I use a schema hint for that column.
My output DataFrame / Delta table looks exactly as expected, so the schema hint works great in that regard.
The only problem I am facing is that the schema hint does not seem to be taken into account during schema inference. That stage of the Spark job is very slow, and the schema file that Auto Loader produces is still insanely huge, causing driver OOMs. Has anyone faced something similar before?
The code is very simple:
(
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaHints", SCHEMA_HINT)  # hint for the map column
    .option("cloudFiles.schemaLocation", f"{target_s3_bucket}/_schema/{source_table_name}")
    .load(f"{source_s3_bucket}/{source_table_name}")
    .writeStream
    .trigger(availableNow=True)
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", f"{target_s3_bucket}/_checkpoint/{source_table_name}")
    .option("streamName", source_table_name)
    .start(f"{target_s3_bucket}/{target_table_name}")
)
07-16-2024 03:40 AM
As I said, the resulting Delta table has the right schema; the only problem is the schema file produced by Auto Loader, which still has the wrong schema. Since the column is really a map (with an unbounded set of keys), the inferred schema file gets huge.
07-24-2024 02:27 AM
@Retired_mod how would you recommend dealing with map types when using Auto Loader, given that it infers them as a struct with millions of different keys as fields?
07-24-2024 02:41 AM
Have you tried to pass the schema of the target table instead?
Something like:
(
    spark.readStream
    .format('cloudFiles')
    .schema(spark.read.load(f'{target_s3_bucket}/{target_table_name}').schema)
    ...
)
07-24-2024 02:45 AM
@lsoewito How would this work when, for example, the schema of the source table changes? Or on the first run?
07-24-2024 04:06 AM
"How would this work when for example the schema of the source table changes?"
Schema evolution is a huge topic and can't be answered with a simple "do it like this". Do you want to support all of it? How do you deal with breaking and non-breaking changes? Features like Auto Loader's rescued data or Delta Lake's type widening will support you, but essentially you need a whole concept.
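As a minimal sketch (untested, option names taken from the Auto Loader docs, paths reusing the variables from the snippet above), the rescue approach would look roughly like this:

# Unexpected or new fields are captured in a rescue column instead of evolving
# the table schema, which is one way to absorb non-breaking source changes.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{target_s3_bucket}/_schema/{source_table_name}")
    .option("cloudFiles.schemaEvolutionMode", "rescue")   # don't add new columns, rescue them
    .option("rescuedDataColumn", "_rescued_data")         # where unexpected data lands
    .load(f"{source_s3_bucket}/{source_table_name}")
)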
"Or on first run?"
It doesn't matter in a well-designed solution, as tables always exist during runtime.
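For example, one way (sketch only, with placeholder column names) to make sure the target table exists before the first streaming run is to create an empty Delta table with the expected schema up front:

# Sketch: create the table if it does not exist yet; the schema here is made up.
from pyspark.sql.types import StructType, StructField, StringType, MapType

expected_schema = StructType([
    StructField("id", StringType()),
    StructField("event_properties", MapType(StringType(), StringType())),
])

(
    spark.createDataFrame([], expected_schema)
    .write
    .format("delta")
    .mode("ignore")   # no-op if data already exists at the target path
    .save(f"{target_s3_bucket}/{target_table_name}")
)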
07-24-2024 04:19 AM
@Witold So my current solution is working really well for all other tables.
I am using Auto Loader with schema inference on incoming JSON files and writing the data as-is to Delta. Since Auto Loader supports schema evolution, the Delta table has all the incoming columns.
The only problem is schema inference on map types. All the keys of the map become fields of a struct type, making the schema huge. The Databricks documentation advises using schema hints in that case, and that works for the output table. The problem is that the schema file produced by inference is not adjusted according to the hints, so it leaks the map keys into the schema and becomes too huge to work with on subsequent runs.
It feels like this is a bug in Auto Loader schema inference rather than a poorly designed solution.