Hi, I'm a fairly new user. I am using Azure Databricks to process a ~1000GiB nested JSON file containing insurance policy data. I uploaded the JSON file to Azure Data Lake Storage Gen2 and read it into a dataframe.
df=spark.read.option("multiline","true").json('mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz')
Reading the JSON file takes ~25 minutes for a 33GiB file.
My Cluster Configuration Summary:
The schema for the JSON file is the following
root
 |-- reporting_entity_name: string (nullable = true)
 |-- reporting_entity_type: string (nullable = true)
 |-- reporting_structure: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- allowed_amount_files: struct (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- location: string (nullable = true)
 |    |    |-- in_network_files: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- location: string (nullable = true)
 |    |    |-- reporting_plans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- plan_id: string (nullable = true)
 |    |    |    |    |-- plan_id_type: string (nullable = true)
 |    |    |    |    |-- plan_market_type: string (nullable = true)
 |    |    |    |    |-- plan_name: string (nullable = true)
Because the data is nested JSON, I used the following function to flatten it.
from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import col, explode_outer

def flatten(df):
    # Collect the complex fields (arrays and structs) in the schema
    complex_fields = {field.name: field.dataType
                      for field in df.schema.fields
                      if isinstance(field.dataType, (ArrayType, StructType))}
    while complex_fields:
        col_name = next(iter(complex_fields))
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # StructType: promote every sub-field to its own column,
        # i.e. flatten the struct
        if isinstance(complex_fields[col_name], StructType):
            expanded = [col(col_name + '.' + f.name).alias(col_name + '_' + f.name)
                        for f in complex_fields[col_name].fields]
            df = df.select("*", *expanded).drop(col_name)

        # ArrayType: turn each array element into its own row,
        # i.e. explode the array
        elif isinstance(complex_fields[col_name], ArrayType):
            df = df.withColumn(col_name, explode_outer(col_name))

        # Recompute the remaining complex fields in the schema
        complex_fields = {field.name: field.dataType
                          for field in df.schema.fields
                          if isinstance(field.dataType, (ArrayType, StructType))}
    return df
Calling the flatten function
df_flatten = flatten(df)
This gives the following output:
Processing :reporting_structure Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_allowed_amount_files Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_in_network_files Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure_in_network_files Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_reporting_plans Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure_reporting_plans Type : <class 'pyspark.sql.types.StructType'>
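To make the intended result concrete, here is a small pure-Python sketch (no Spark) of what I understand flatten to do to a single record. The helper name flatten_record and all sample values are made up for illustration, and this is only an analogue of the Spark function, not the code I actually ran. It also shows that exploding the two sibling arrays (in_network_files and reporting_plans) one after the other multiplies the row count.

```python
def flatten_record(record):
    """Pure-Python analogue of flatten(): returns a list of flat dict rows."""
    pending = [record]
    done = []
    while pending:
        row = pending.pop()
        # Find the first column that still holds a struct (dict) or array (list)
        key = next((k for k, v in row.items() if isinstance(v, (dict, list))), None)
        if key is None:
            done.append(row)  # nothing left to flatten in this row
        elif isinstance(row[key], dict):
            # Struct: replace the column with prefixed sub-columns
            rest = {k: v for k, v in row.items() if k != key}
            expanded = {f"{key}_{k}": v for k, v in row[key].items()}
            pending.append({**rest, **expanded})
        else:
            # Array: one row per element; an empty array yields a single
            # row with None, similar to explode_outer
            for element in (row[key] or [None]):
                pending.append({**row, key: element})
    return done


# Hypothetical sample record that follows the schema above
sample = {
    "reporting_entity_name": "Example Insurer",
    "reporting_entity_type": "payer",
    "reporting_structure": [
        {
            "allowed_amount_files": {"description": "aa", "location": "loc-aa"},
            "in_network_files": [
                {"description": "in-1", "location": "loc-1"},
                {"description": "in-2", "location": "loc-2"},
            ],
            "reporting_plans": [
                {"plan_id": "1", "plan_id_type": "EIN", "plan_market_type": "group", "plan_name": "A"},
                {"plan_id": "2", "plan_id_type": "EIN", "plan_market_type": "group", "plan_name": "B"},
                {"plan_id": "3", "plan_id_type": "EIN", "plan_market_type": "group", "plan_name": "C"},
            ],
        }
    ],
}

rows = flatten_record(sample)
print(len(rows))  # 6 = 1 reporting_structure element x 2 in_network_files x 3 reporting_plans
```

So every reporting_structure element with m in_network_files and n reporting_plans becomes m x n flat rows.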
I then tried to display the flattened dataframe df_flatten:
df_flatten.display()
This fails with the following error after ~50 minutes of execution:
FileReadException: Error while reading file mnt/monut/mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz
Caused by: OutOfMemoryError: GC overhead limit exceeded
If I try to write the data to an ADLS Gen2 location as Delta, the Spark job runs for ~60 minutes and then fails.
df.write.mode('overwrite').format('delta').save('mnt/monut/mnt/mount/Anthem/2022-10-11/processed/')