
How to efficiently process a 100GiB JSON nested file and store it in Delta?

Sameer_876675
New Contributor III

Hi, I'm a fairly new user, and I am using Azure Databricks to process a ~100GiB nested JSON file containing insurance policy data. I uploaded the JSON file to Azure Data Lake Storage Gen2 and read it into a dataframe.

df=spark.read.option("multiline","true").json('mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz')

Reading the JSON file takes ~25 minutes for a 33GiB file.

My Cluster Configuration Summary:

(Image: Cluster Summary)

The schema for the JSON file is the following:

root
 |-- reporting_entity_name: string (nullable = true)
 |-- reporting_entity_type: string (nullable = true)
 |-- reporting_structure: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- allowed_amount_files: struct (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- location: string (nullable = true)
 |    |    |-- in_network_files: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- location: string (nullable = true)
 |    |    |-- reporting_plans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- plan_id: string (nullable = true)
 |    |    |    |    |-- plan_id_type: string (nullable = true)
 |    |    |    |    |-- plan_market_type: string (nullable = true)
 |    |    |    |    |-- plan_name: string (nullable = true)
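Since the schema is already known, one thing that may help is passing it to the reader explicitly, so Spark does not need an extra pass over the file just to infer it. The following is only a sketch: the DDL string is transcribed from the schema printed above, and `read_index` and `INDEX_SCHEMA_DDL` are hypothetical names introduced here, not part of the original code. It will not fix the single-file bottleneck on its own.

```python
# Schema transcribed from the printSchema() output in the post, as a DDL string.
# DataFrameReader.schema() accepts either a StructType or a DDL-formatted string.
INDEX_SCHEMA_DDL = (
    "reporting_entity_name STRING, "
    "reporting_entity_type STRING, "
    "reporting_structure ARRAY<STRUCT<"
    "allowed_amount_files: STRUCT<description: STRING, location: STRING>, "
    "in_network_files: ARRAY<STRUCT<description: STRING, location: STRING>>, "
    "reporting_plans: ARRAY<STRUCT<plan_id: STRING, plan_id_type: STRING, "
    "plan_market_type: STRING, plan_name: STRING>>"
    ">>"
)


def read_index(spark, path):
    """Read the multiline JSON index with an explicit schema (hypothetical helper).

    `spark` is the active SparkSession (predefined in Databricks notebooks).
    """
    return (
        spark.read
        .schema(INDEX_SCHEMA_DDL)       # skip schema inference
        .option("multiLine", "true")    # the file is one large JSON document
        .json(path)
    )
```

In a notebook this would be called as `df = read_index(spark, path)` with the same path used in the original read.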

Because the JSON is nested, I used the following function to flatten the data.

from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import ArrayType, StructType


def flatten(df):
    """Flatten structs into columns and explode arrays into rows until no complex fields remain."""

    def complex_fields_of(frame):
        # Map column name -> data type for every array or struct column in the schema
        return {
            field.name: field.dataType
            for field in frame.schema.fields
            if isinstance(field.dataType, (ArrayType, StructType))
        }

    complex_fields = complex_fields_of(df)
    while complex_fields:
        col_name, col_type = next(iter(complex_fields.items()))
        print("Processing :" + col_name + " Type : " + str(type(col_type)))

        if isinstance(col_type, StructType):
            # Struct: promote each sub-field to a top-level column named <parent>_<child>
            expanded = [
                col(col_name + "." + sub.name).alias(col_name + "_" + sub.name)
                for sub in col_type
            ]
            df = df.select("*", *expanded).drop(col_name)
        else:
            # Array: emit one row per element (explode_outer keeps rows with null or empty arrays)
            df = df.withColumn(col_name, explode_outer(col_name))

        # Recompute the remaining complex fields from the updated schema
        complex_fields = complex_fields_of(df)
    return df

Calling the flatten function:

df_flatten = flatten(df)

This prints the following output:

Processing :reporting_structure Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_allowed_amount_files Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_in_network_files Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure_in_network_files Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_reporting_plans Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure_reporting_plans Type : <class 'pyspark.sql.types.StructType'>
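One thing the log above makes visible: `in_network_files` and `reporting_plans` are sibling arrays inside each `reporting_structure` element, and both get exploded. Exploding two sibling arrays yields every combination of their elements, so an element with m files and n plans becomes m × n rows. The sketch below is plain Python arithmetic, not Spark code; `flattened_row_count` is a hypothetical helper and the sample counts are made up purely for illustration.

```python
def flattened_row_count(reporting_structure):
    """Count the rows produced when both sibling arrays of every element are exploded.

    Mirrors explode_outer semantics: a null or empty array still yields one row.
    """
    total = 0
    for element in reporting_structure:
        n_files = max(len(element.get("in_network_files") or []), 1)
        n_plans = max(len(element.get("reporting_plans") or []), 1)
        total += n_files * n_plans
    return total


# Hypothetical sizes, chosen only to show the multiplication effect
sample = [
    {"in_network_files": [{}] * 50, "reporting_plans": [{}] * 200},
    {"in_network_files": [{}] * 3, "reporting_plans": []},
]
print(flattened_row_count(sample))  # 50 * 200 + 3 * 1 = 10003
```

If the real arrays are large, the flattened result can be far bigger than the input, which would add to the memory pressure.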

I then tried to display the flattened dataframe df_flatten

df_flatten.display()

This fails with the following error after ~50 minutes of execution:

FileReadException: Error while reading file mnt/monut/mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz
Caused by: OutOfMemoryError: GC overhead limit exceeded

(Image: OOM Error)

If I try to write the data to an ADLS Gen2 location as Delta, the Spark job runs for ~60 minutes and then fails.

df.write.mode('overwrite').format('delta').save('mnt/monut/mnt/mount/Anthem/2022-10-11/processed/')

3 REPLIES

AndriusVitkausk
New Contributor III

Do you have access to the data at source? See if you can pull the data down into storage in pieces instead of it being such a huge single file.

If you have a single gzipped, multiline JSON file, everything will be allocated to a single core, because neither a gzip stream nor a multiline JSON document can be split across tasks. The decompression is then just going to nuke your executor memory, hence the garbage collector not being able to free up space and eventually dying. It won't really matter how big the cluster is with such a file. You will need to chop that file up into chunks before feeding it to Spark, so that each core in the cluster can take a small chunk and process it in parallel.
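As a rough illustration of the chopping step, here is a standard-library-only sketch that streams the gzipped file and writes the elements of `reporting_structure` out as newline-delimited JSON parts, which Spark can then read in parallel without the `multiline` option. `iter_array_items` and `split_to_ndjson` are hypothetical helpers. The sketch assumes the array elements are JSON objects and that the key name does not appear inside an earlier string value, and it drops the two top-level `reporting_entity_*` fields, which would need to be carried over separately.

```python
import gzip
import json
import os


def iter_array_items(fileobj, key, chunk_size=1 << 20):
    """Yield the elements of the array stored under `key`, reading the text file in chunks."""
    decoder = json.JSONDecoder()
    marker = '"%s"' % key
    buf = ""
    # Phase 1: advance to just after the '[' that opens the array
    while True:
        pos = buf.find(marker)
        bracket = buf.find("[", pos + len(marker)) if pos != -1 else -1
        if bracket != -1:
            buf = buf[bracket + 1:]
            break
        if pos == -1:
            buf = buf[-len(marker):]  # keep a tail in case the key straddles two chunks
        chunk = fileobj.read(chunk_size)
        if not chunk:
            return  # key not found
        buf += chunk
    # Phase 2: decode one element at a time, refilling the buffer when an element is incomplete
    i = 0
    while True:
        while i < len(buf) and buf[i] in " \t\r\n,":
            i += 1
        if i < len(buf) and buf[i] == "]":
            return
        try:
            item, i = decoder.raw_decode(buf, i)
        except json.JSONDecodeError:
            chunk = fileobj.read(chunk_size)
            if not chunk:
                raise  # input ended in the middle of the array
            buf = buf[i:] + chunk
            i = 0
            continue
        yield item


def split_to_ndjson(src_path, out_dir, key="reporting_structure", items_per_file=10_000):
    """Write the array elements as JSON Lines files of `items_per_file` rows each."""
    os.makedirs(out_dir, exist_ok=True)
    paths, out = [], None
    with gzip.open(src_path, "rt", encoding="utf-8") as src:
        for n, item in enumerate(iter_array_items(src, key)):
            if n % items_per_file == 0:
                if out:
                    out.close()
                paths.append(os.path.join(out_dir, "part-%05d.json" % (n // items_per_file)))
                out = open(paths[-1], "w", encoding="utf-8")
            out.write(json.dumps(item) + "\n")
    if out:
        out.close()
    return paths
```

The resulting directory could then be read with a plain `spark.read.json(out_dir)`, where each part file becomes its own unit of work. Splitting still has to happen on a single machine, so it is worth checking first whether the source can deliver the data in pieces.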

Pat
Honored Contributor III

Hi @Sameer Khalid,

Check this notebook on how to work with nested JSON: https://learn.microsoft.com/en-us/azure/databricks/kb/_static/notebooks/scala/nested-json-to-datafra...

What you need to look at is how to work with:

  • Structs - col_name.*
  • Arrays - explode
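To make those two building blocks concrete for this schema, here is a sketch that uses `selectExpr` with `col_name.*` for structs and `explode_outer` / `posexplode_outer` for arrays. Instead of one fully flattened table, it produces one table per nested array, keyed by the position of the parent `reporting_structure` element. `split_index`, `structure_idx`, `reporting_plan` and `in_network_file` are names made up for this example, and whether two tables suit the downstream use case is an assumption.

```python
def split_index(df):
    """Return (plans, files): one row per reporting plan and one row per in-network file.

    `df` is the DataFrame read from the index file (hypothetical helper, not from the post).
    """
    # Arrays -> rows: one row per reporting_structure element, with its array position
    structures = df.selectExpr(
        "reporting_entity_name",
        "posexplode_outer(reporting_structure) AS (structure_idx, rs)",
    )

    # Explode each nested array separately, then expand the struct with col_name.*
    plans = (
        structures
        .selectExpr("structure_idx", "explode_outer(rs.reporting_plans) AS reporting_plan")
        .selectExpr("structure_idx", "reporting_plan.*")
    )
    files = (
        structures
        .selectExpr("structure_idx", "explode_outer(rs.in_network_files) AS in_network_file")
        .selectExpr("structure_idx", "in_network_file.*")
    )
    return plans, files
```

The two results can be joined on `structure_idx` when needed, or written as separate Delta tables.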

I might be able to look more closely at the logic of the flatten function later.

Annapurna_Hiriy
New Contributor III