Hi, I'm a fairly new user, and I'm using Azure Databricks to process a ~1000 GiB nested JSON file containing insurance policy data. I uploaded the JSON file to Azure Data Lake Storage Gen2 and read it into a DataFrame:
df = spark.read.option("multiline", "true").json('mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz')
Reading the JSON file alone takes ~25 minutes for the 33 GiB compressed (.json.gz) file.
My Cluster Configuration Summary:
The schema of the JSON file is the following:
root
|-- reporting_entity_name: string (nullable = true)
|-- reporting_entity_type: string (nullable = true)
|-- reporting_structure: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- allowed_amount_files: struct (nullable = true)
| | | |-- description: string (nullable = true)
| | | |-- location: string (nullable = true)
| | |-- in_network_files: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- description: string (nullable = true)
| | | | |-- location: string (nullable = true)
| | |-- reporting_plans: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- plan_id: string (nullable = true)
| | | | |-- plan_id_type: string (nullable = true)
| | | | |-- plan_market_type: string (nullable = true)
| | | | |-- plan_name: string (nullable = true)
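For reference, here is the same schema written out explicitly as a StructType. This is only a sketch I reconstructed from the printSchema output above (the variable name index_schema is just illustrative, and I'm assuming every field is nullable, as shown), in case reading with an explicit schema is relevant.
from pyspark.sql.types import StructType, StructField, ArrayType, StringType

# Explicit schema reconstructed from the printSchema output above
# (assumption: all fields nullable, as reported by printSchema)
index_schema = StructType([
    StructField("reporting_entity_name", StringType(), True),
    StructField("reporting_entity_type", StringType(), True),
    StructField("reporting_structure", ArrayType(StructType([
        StructField("allowed_amount_files", StructType([
            StructField("description", StringType(), True),
            StructField("location", StringType(), True),
        ]), True),
        StructField("in_network_files", ArrayType(StructType([
            StructField("description", StringType(), True),
            StructField("location", StringType(), True),
        ])), True),
        StructField("reporting_plans", ArrayType(StructType([
            StructField("plan_id", StringType(), True),
            StructField("plan_id_type", StringType(), True),
            StructField("plan_market_type", StringType(), True),
            StructField("plan_name", StringType(), True),
        ])), True),
    ])), True),
])

# could be supplied up front instead of relying on schema inference, e.g.:
# df = spark.read.schema(index_schema).option("multiline", "true").json(path)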
Because the data is nested JSON, I used the following function to flatten it:
from pyspark.sql.types import *
from pyspark.sql.functions import *

def flatten(df):
    # compute complex fields (arrays and structs) in the schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # if StructType, convert all sub-elements to columns,
        # i.e. flatten structs
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType, add the array elements as rows using explode_outer,
        # i.e. explode arrays
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # recompute remaining complex fields in the schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df
Calling the flatten function:
df_flatten = flatten(df)
This gives the following output:
Processing :reporting_structure Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_allowed_amount_files Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_in_network_files Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure_in_network_files Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_reporting_plans Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure_reporting_plans Type : <class 'pyspark.sql.types.StructType'>
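For this particular schema, the generic flatten above boils down to roughly the explicit sequence of explodes and selects below. This is only a sketch (df_manual is a hypothetical name); the column names mirror the processing output above.
from pyspark.sql.functions import col, explode_outer

# Sketch of what flatten(df) effectively does for this schema
df_manual = (
    df
    # explode the top-level reporting_structure array into one row per element
    .withColumn("reporting_structure", explode_outer("reporting_structure"))
    # expand the reporting_structure struct (and its allowed_amount_files struct) into prefixed columns
    .select(
        "reporting_entity_name",
        "reporting_entity_type",
        col("reporting_structure.allowed_amount_files.description")
            .alias("reporting_structure_allowed_amount_files_description"),
        col("reporting_structure.allowed_amount_files.location")
            .alias("reporting_structure_allowed_amount_files_location"),
        col("reporting_structure.in_network_files").alias("reporting_structure_in_network_files"),
        col("reporting_structure.reporting_plans").alias("reporting_structure_reporting_plans"),
    )
    # explode the two nested arrays into rows
    # (note: sequential explodes multiply rows, in_network_files x reporting_plans)
    .withColumn("reporting_structure_in_network_files", explode_outer("reporting_structure_in_network_files"))
    .withColumn("reporting_structure_reporting_plans", explode_outer("reporting_structure_reporting_plans"))
    # expand the remaining structs into flat string columns
    .select(
        "reporting_entity_name",
        "reporting_entity_type",
        "reporting_structure_allowed_amount_files_description",
        "reporting_structure_allowed_amount_files_location",
        col("reporting_structure_in_network_files.description")
            .alias("reporting_structure_in_network_files_description"),
        col("reporting_structure_in_network_files.location")
            .alias("reporting_structure_in_network_files_location"),
        col("reporting_structure_reporting_plans.plan_id")
            .alias("reporting_structure_reporting_plans_plan_id"),
        col("reporting_structure_reporting_plans.plan_id_type")
            .alias("reporting_structure_reporting_plans_plan_id_type"),
        col("reporting_structure_reporting_plans.plan_market_type")
            .alias("reporting_structure_reporting_plans_plan_market_type"),
        col("reporting_structure_reporting_plans.plan_name")
            .alias("reporting_structure_reporting_plans_plan_name"),
    )
)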
I then tried to display the flattened DataFrame df_flatten:
df_flatten.display()
After ~50 minutes of execution, this fails with the following error:
FileReadException: Error while reading file mnt/monut/mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz
Caused by: OutOfMemoryError: GC overhead limit exceeded
If I try to write the data to an ADLS Gen2 location as Delta, the Spark job runs for ~60 minutes and then fails:
df.write.mode('overwrite').format('delta').save('mnt/monut/mnt/mount/Anthem/2022-10-11/processed/')