12-07-2022 04:22 AM
Hi, I'm a fairly new user and I am using Azure Databricks to process a ~1000 GiB nested JSON file containing insurance policy data. I uploaded the JSON file to Azure Data Lake Storage Gen2 and read it into a DataFrame.
df=spark.read.option("multiline","true").json('mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz')
Reading the JSON file takes ~25 minutes for a 33 GiB file.
My cluster configuration summary: [screenshot of the cluster configuration]
The schema of the JSON file is as follows:
root
 |-- reporting_entity_name: string (nullable = true)
 |-- reporting_entity_type: string (nullable = true)
 |-- reporting_structure: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- allowed_amount_files: struct (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- location: string (nullable = true)
 |    |    |-- in_network_files: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- location: string (nullable = true)
 |    |    |-- reporting_plans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- plan_id: string (nullable = true)
 |    |    |    |    |-- plan_id_type: string (nullable = true)
 |    |    |    |    |-- plan_market_type: string (nullable = true)
 |    |    |    |    |-- plan_name: string (nullable = true)
Because the data is nested JSON, I used the following function to flatten it.
from pyspark.sql.types import *
from pyspark.sql.functions import *

def flatten(df):
    # collect the complex fields (arrays and structs) in the schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # if StructType, convert every sub-element to a column,
        # i.e. flatten the struct
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType, add the array elements as rows using explode_outer,
        # i.e. explode the array
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # recompute the remaining complex fields in the schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df
Calling the flatten function
df_flatten = flatten(df)
This gives the output as follows
Processing :reporting_structure Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_allowed_amount_files Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_in_network_files Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure_in_network_files Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_reporting_plans Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure_reporting_plans Type : <class 'pyspark.sql.types.StructType'>
I then tried to display the flattened dataframe df_flatten
df_flatten.display()
This gives the following error after ~50 minutes of execution
FileReadException: Error while reading file mnt/monut/mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz
Caused by: OutOfMemoryError: GC overhead limit exceeded
If I try to write the data to an ADLS Gen2 location as Delta, the Spark job runs for ~60 minutes and then fails.
df.write.mode('overwrite').format('delta').save('mnt/monut/mnt/mount/Anthem/2022-10-11/processed/')
12-07-2022 05:37 AM
Do you have access to the data at the source? See if you can pull the data down into storage in pieces instead of as one huge single file.
If you have a single gzipped JSON file, everything will be allocated to a single core, because gzip is not a splittable format, and the decompression is just going to nuke your executor memory - hence the garbage collector not being able to free up space and eventually dying. It won't really matter how big the cluster is with a file like that. You will need to chop the file up into chunks before feeding it to Spark so the entire cluster can each take a small chunk and process it in parallel - a rough sketch of that chunking step is below.
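For illustration only, here is a rough sketch of that pre-chunking step. It assumes the ijson streaming parser is available on the driver and that the top-level reporting_structure array is what you want to split; the paths, chunk size, and output layout are all hypothetical.

# Sketch only: stream the gzipped index file and split the top-level
# reporting_structure array into smaller JSON Lines files that Spark can
# read in parallel. Paths, chunk size, and output layout are illustrative.
import gzip
import json
import os
import ijson  # streaming JSON parser; not preinstalled, e.g. %pip install ijson

SRC = "/dbfs/mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz"
DST = "/dbfs/mnt/mount/Anthem/2022-10-11/IndexFile/chunks"  # hypothetical output dir
CHUNK_SIZE = 50_000  # records per output file; tune as needed

def write_chunk(lines, dst, part):
    # one JSON object per line, so Spark can split the work across tasks
    with open(f"{dst}/part-{part:05d}.jsonl", "w") as out:
        out.write("\n".join(lines) + "\n")

def split_index_file(src=SRC, dst=DST, chunk_size=CHUNK_SIZE):
    os.makedirs(dst, exist_ok=True)
    buf, part = [], 0
    with gzip.open(src, "rb") as f:
        # iterate over the elements of the top-level reporting_structure array
        # without loading the whole document into driver memory
        for record in ijson.items(f, "reporting_structure.item"):
            buf.append(json.dumps(record, default=str))
            if len(buf) >= chunk_size:
                write_chunk(buf, dst, part)
                buf, part = [], part + 1
    if buf:
        write_chunk(buf, dst, part)

split_index_file()

Once the file is split, spark.read.json on the chunk directory (without the multiline option) can spread the read across the whole cluster, since every line is a self-contained record. Note that this sketch keeps only the reporting_structure elements; the two top-level reporting_entity fields would need to be carried over separately.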
12-07-2022 05:41 AM
Hi @Sameer Khalid ,
Check this notebook on how to work with nested JSON: https://learn.microsoft.com/en-us/azure/databricks/kb/_static/notebooks/scala/nested-json-to-datafra...
What you need to look at is how to work with the nested arrays and structs.
I might be able to look more closely at the logic of the flatten function later.
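In the meantime, here is a minimal sketch (not taken from the linked notebook, just an illustration against the schema posted above) of the usual explode_outer / dot-notation approach:

from pyspark.sql.functions import col, explode_outer

# explode the top-level array, then pull nested struct fields with dot notation
df_plans = (
    df
    .select(
        "reporting_entity_name",
        "reporting_entity_type",
        explode_outer("reporting_structure").alias("rs"),
    )
    .select(
        "reporting_entity_name",
        "reporting_entity_type",
        col("rs.allowed_amount_files.location").alias("allowed_amount_files_location"),
        explode_outer("rs.reporting_plans").alias("plan"),
    )
    .select(
        "reporting_entity_name",
        "reporting_entity_type",
        "allowed_amount_files_location",
        col("plan.plan_id"),
        col("plan.plan_name"),
    )
)

Exploding one array at a time like this keeps the intermediate row counts visible; exploding several arrays in sequence, as the generic flatten function does, multiplies the row count very quickly.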
01-31-2023 08:20 AM
Hi Sameer, please refer to the following documentation on how to work with nested JSON:
https://docs.databricks.com/optimizations/semi-structured.html
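For reference, a small illustration of the ':' extraction syntax described in those docs. It assumes a recent Databricks Runtime and that the raw JSON has first been landed as a string column; the table name anthem_index_raw and the column name raw are hypothetical.

# Illustrative only: Databricks' ':' syntax extracts fields from a STRING
# column without flattening the whole document up front.
plans = spark.sql("""
    SELECT
        raw:reporting_entity_name AS reporting_entity_name,
        raw:reporting_structure   AS reporting_structure_json
    FROM anthem_index_raw
""")
plans.show(truncate=False)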