How to efficiently process a 100GiB nested JSON file and store it in Delta?
12-07-2022 04:22 AM
Hi, I'm a fairly new user, and I'm using Azure Databricks to process a ~1000GiB nested JSON file containing insurance policy data. I uploaded the JSON file to Azure Data Lake Storage Gen2 and read it into a DataFrame.
df = spark.read.option("multiline", "true").json('mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz')
Reading the JSON file this way takes ~25 minutes for a 33GiB file.
My Cluster Configuration Summary:
The schema of the JSON file is as follows:
root
|-- reporting_entity_name: string (nullable = true)
|-- reporting_entity_type: string (nullable = true)
|-- reporting_structure: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- allowed_amount_files: struct (nullable = true)
| | | |-- description: string (nullable = true)
| | | |-- location: string (nullable = true)
| | |-- in_network_files: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- description: string (nullable = true)
| | | | |-- location: string (nullable = true)
| | |-- reporting_plans: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- plan_id: string (nullable = true)
| | | | |-- plan_id_type: string (nullable = true)
| | | | |-- plan_market_type: string (nullable = true)
| | | | |-- plan_name: string (nullable = true)
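For reference, the same schema written out explicitly in PySpark would look roughly like the sketch below (index_schema is just a name I picked), in case it helps to pass it to spark.read.schema(...) instead of letting Spark infer it from the compressed file:

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Explicit schema matching the printSchema() output above
index_schema = StructType([
    StructField("reporting_entity_name", StringType(), True),
    StructField("reporting_entity_type", StringType(), True),
    StructField("reporting_structure", ArrayType(StructType([
        StructField("allowed_amount_files", StructType([
            StructField("description", StringType(), True),
            StructField("location", StringType(), True),
        ]), True),
        StructField("in_network_files", ArrayType(StructType([
            StructField("description", StringType(), True),
            StructField("location", StringType(), True),
        ])), True),
        StructField("reporting_plans", ArrayType(StructType([
            StructField("plan_id", StringType(), True),
            StructField("plan_id_type", StringType(), True),
            StructField("plan_market_type", StringType(), True),
            StructField("plan_name", StringType(), True),
        ])), True),
    ])), True),
])

df = (spark.read
      .schema(index_schema)            # skip the schema-inference pass
      .option("multiline", "true")
      .json('mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz'))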
Because the data is a nested JSON file, I used the following function to flatten it.
from pyspark.sql.types import StructType, ArrayType
from pyspark.sql.functions import col, explode_outer

def flatten(df):
    # compute complex fields (arrays and structs) in the schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))
        # if StructType, convert all sub-elements to columns,
        # i.e. flatten structs
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)
        # if ArrayType, add the array elements as rows using the explode function,
        # i.e. explode arrays
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))
        # recompute the remaining complex fields in the schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df
Calling the flatten function
df_flatten = flatten(df)
This gives the following output:
Processing :reporting_structure Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_allowed_amount_files Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_in_network_files Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure_in_network_files Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_reporting_plans Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure_reporting_plans Type : <class 'pyspark.sql.types.StructType'>
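For this schema, the loop works out to roughly the explicit version below (the aliases rs, in_network_file and reporting_plan are mine; like the loop, it produces one row per (in-network file, reporting plan) pair):

from pyspark.sql.functions import col, explode_outer

# hand-written equivalent of flatten() for this particular schema
df_flat = (
    df
    .withColumn("rs", explode_outer("reporting_structure"))
    .withColumn("in_network_file", explode_outer("rs.in_network_files"))
    .withColumn("reporting_plan", explode_outer("rs.reporting_plans"))
    .select(
        "reporting_entity_name",
        "reporting_entity_type",
        col("rs.allowed_amount_files.description").alias("allowed_amount_files_description"),
        col("rs.allowed_amount_files.location").alias("allowed_amount_files_location"),
        col("in_network_file.description").alias("in_network_files_description"),
        col("in_network_file.location").alias("in_network_files_location"),
        col("reporting_plan.plan_id").alias("plan_id"),
        col("reporting_plan.plan_id_type").alias("plan_id_type"),
        col("reporting_plan.plan_market_type").alias("plan_market_type"),
        col("reporting_plan.plan_name").alias("plan_name"),
    )
)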
I then tried to display the flattened dataframe df_flatten
df_flatten.display()
This gives the following error after ~50 minutes of execution
FileReadException: Error while reading file mnt/monut/mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz
Caused by: OutOfMemoryError: GC overhead limit exceeded
If I try to write the data to an ADLS Gen2 location as Delta, the Spark job runs for ~60 minutes and then fails.
df.write.mode('overwrite').format('delta').save('mnt/monut/mnt/mount/Anthem/2022-10-11/processed/')
Labels: Apache Spark, Azure, Delta, Nested JSON, Spark
12-07-2022 05:37 AM
Do you have access to the data at the source? See if you can pull it down into storage in pieces instead of as one huge file.
If you have a single gzipped JSON file, everything gets allocated to a single core, and the decompression is going to exhaust your executor memory; that's why the garbage collector can't free up space and eventually dies. It won't really matter how big the cluster is with a file like that. You'll need to chop the file into chunks before feeding it to Spark, so the whole cluster can each take a small piece and process it in parallel.
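If you can get at the file from a single machine (e.g. the driver via the /dbfs mount), something along these lines could do the chopping. It's only a sketch: it assumes the third-party ijson package is installed, it keeps only the reporting_structure elements (the two top-level string fields would need to be carried separately), and the paths and chunk size are made up:

import gzip
import json
import os
import ijson  # third-party streaming JSON parser (assumed installed)

SRC = "/dbfs/mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz"
DST = "/dbfs/mnt/mount/Anthem/2022-10-11/IndexFile/chunks/"
ROWS_PER_FILE = 50000  # tune to taste

os.makedirs(DST, exist_ok=True)

def write_chunk(rows, part):
    # one JSON object per line -> splittable input for spark.read.json
    with open(f"{DST}part-{part:05d}.json", "w") as out:
        out.write("\n".join(json.dumps(r) for r in rows))

with gzip.open(SRC, "rb") as f:
    rows, part = [], 0
    # stream the elements of the top-level reporting_structure array
    for item in ijson.items(f, "reporting_structure.item"):
        rows.append(item)
        if len(rows) >= ROWS_PER_FILE:
            write_chunk(rows, part)
            rows, part = [], part + 1
    if rows:
        write_chunk(rows, part)

Once the chunks exist, spark.read.json on the chunks directory (no multiline option needed for line-delimited JSON) spreads the files across the whole cluster.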
12-07-2022 05:41 AM
Hi @Sameer Khalid,
Check this notebook on how to work with nested JSON: https://learn.microsoft.com/en-us/azure/databricks/kb/_static/notebooks/scala/nested-json-to-datafra...
What you need to look at is how to work with:
- structs: col_name.*
- arrays: explode
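For your reporting_structure column that pattern looks roughly like this (just a sketch, untested against your data):

from pyspark.sql.functions import explode_outer

# array -> one row per element
exploded = df.withColumn("rs", explode_outer("reporting_structure"))
# struct -> one column per field
exploded.select("reporting_entity_name", "reporting_entity_type", "rs.*").display()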
I might be able to look more closely at the logic of the flatten function later.
01-31-2023 08:20 AM
Hi Sameer, please refer to the following documentation on how to work with nested JSON:
https://docs.databricks.com/optimizations/semi-structured.html
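The colon syntax described on that page applies once the JSON records are sitting in a Delta table as strings; a rough sketch, assuming each row holds one reporting_structure element as a JSON string in a column payload of a table raw_index (all made-up names):

# assumes the index has already been split into records and landed in Delta
spark.sql("""
    SELECT
        payload:allowed_amount_files.location::string AS allowed_amount_files_location,
        payload:reporting_plans[0].plan_name::string  AS first_plan_name
    FROM raw_index
""").display()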

