Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to efficiently process a 100 GiB nested JSON file and store it in Delta?

Sameer_876675
New Contributor III

Hi, I'm a fairly new user, and I am using Azure Databricks to process a ~1000 GiB nested JSON file containing insurance policy data. I uploaded the JSON file to Azure Data Lake Storage Gen2 and read it into a dataframe.

df=spark.read.option("multiline","true").json('mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz')

Reading the JSON file takes ~25 minutes for a 33 GiB file.
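
As a quick diagnostic (gzip is not a splittable codec, so a single .json.gz file read with multiline=true typically ends up in one partition, i.e. on one core), the partition count of the read can be checked:

# A single gzipped, multiline JSON file usually yields exactly one partition,
# which explains why only one core is doing the work.
print(df.rdd.getNumPartitions())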

My cluster configuration summary:

[Cluster summary screenshot]

The schema of the JSON file is the following:

root
 |-- reporting_entity_name: string (nullable = true)
 |-- reporting_entity_type: string (nullable = true)
 |-- reporting_structure: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- allowed_amount_files: struct (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- location: string (nullable = true)
 |    |    |-- in_network_files: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- location: string (nullable = true)
 |    |    |-- reporting_plans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- plan_id: string (nullable = true)
 |    |    |    |    |-- plan_id_type: string (nullable = true)
 |    |    |    |    |-- plan_market_type: string (nullable = true)
 |    |    |    |    |-- plan_name: string (nullable = true)
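
As an aside, since the schema is known up front, it can also be declared explicitly and passed to spark.read.schema(...) so the read does not need a separate schema-inference pass. A rough sketch of the same schema as a StructType (nullability is assumed):

from pyspark.sql.types import StructType, StructField, StringType, ArrayType
 
# Explicit schema matching the printSchema() output above (nullability assumed)
index_schema = StructType([
    StructField("reporting_entity_name", StringType(), True),
    StructField("reporting_entity_type", StringType(), True),
    StructField("reporting_structure", ArrayType(StructType([
        StructField("allowed_amount_files", StructType([
            StructField("description", StringType(), True),
            StructField("location", StringType(), True),
        ]), True),
        StructField("in_network_files", ArrayType(StructType([
            StructField("description", StringType(), True),
            StructField("location", StringType(), True),
        ])), True),
        StructField("reporting_plans", ArrayType(StructType([
            StructField("plan_id", StringType(), True),
            StructField("plan_id_type", StringType(), True),
            StructField("plan_market_type", StringType(), True),
            StructField("plan_name", StringType(), True),
        ])), True),
    ])), True),
])
 
# Passing the schema avoids the inference pass over the large file
df = spark.read.schema(index_schema).option("multiline", "true").json('mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz')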

Because the data is a nested JSON file, I used the following function to flatten it:

from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import col, explode_outer
 
def flatten(df):
    # Collect the complex fields (arrays and structs) in the schema
    complex_fields = {field.name: field.dataType
                      for field in df.schema.fields
                      if isinstance(field.dataType, (ArrayType, StructType))}

    while complex_fields:
        col_name = list(complex_fields.keys())[0]
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # StructType: promote every sub-field to a top-level column
        # i.e. flatten structs
        if isinstance(complex_fields[col_name], StructType):
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # ArrayType: turn the array elements into rows
        # i.e. explode arrays
        elif isinstance(complex_fields[col_name], ArrayType):
            df = df.withColumn(col_name, explode_outer(col_name))

        # Recompute the remaining complex fields in the schema
        complex_fields = {field.name: field.dataType
                          for field in df.schema.fields
                          if isinstance(field.dataType, (ArrayType, StructType))}
    return df

Calling the flatten function

df_flatten = flatten(df)

This produces the following output:

Processing :reporting_structure Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_allowed_amount_files Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_in_network_files Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure_in_network_files Type : <class 'pyspark.sql.types.StructType'>
Processing :reporting_structure_reporting_plans Type : <class 'pyspark.sql.types.ArrayType'>
Processing :reporting_structure_reporting_plans Type : <class 'pyspark.sql.types.StructType'>

I then tried to display the flattened dataframe df_flatten

df_flatten.display()

This gives the following error after ~50 minutes of execution

FileReadException: Error while reading file mnt/monut/mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz
Caused by: OutOfMemoryError: GC overhead limit exceeded

[OOM error screenshot]

If I try to write the data to an ADLS Gen2 location as Delta, the Spark job runs for ~60 minutes and then fails.

df.write.mode('overwrite').format('delta').save('mnt/monut/mnt/mount/Anthem/2022-10-11/processed/')
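
Assuming the read and flatten eventually succeed, one tweak worth trying on the write itself (an untested sketch; the partition count is an arbitrary guess) is to repartition the flattened dataframe first, so the Delta files are not produced by a handful of huge tasks:

# Untested sketch: spread the flattened rows over many tasks before writing.
# 400 is a placeholder and would need tuning to the cluster and data volume.
(df_flatten
    .repartition(400)
    .write
    .mode('overwrite')
    .format('delta')
    .save('mnt/monut/mnt/mount/Anthem/2022-10-11/processed/'))

This alone will not fix the single-core read of the gzipped file, though.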

3 REPLIES

AndriusVitkausk
New Contributor III

Do you have access to the data at the source? See if you can pull the data down into storage in pieces instead of as one huge single file.

If you have a single gzipped JSON file, everything gets allocated to a single core, and the decompression will exhaust your executor memory - hence the garbage collector being unable to free up space and the job eventually dying. It won't really matter how big the cluster is with such a file. You will need to chop the file into chunks before feeding it to Spark, so that the whole cluster can each take a small chunk and process it in parallel.
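
A rough sketch of that pre-processing step (all assumptions: it uses the ijson package, hypothetical /dbfs paths, and relies on reporting_structure being the large top-level array from the schema above): stream-parse the gzipped file outside Spark and rewrite the array elements as many smaller newline-delimited JSON files, which Spark can then split across the cluster.

import gzip
import itertools
import json
import os
 
import ijson  # streaming JSON parser; assumed to be installed on the driver
 
# Hypothetical /dbfs paths mirroring the mounted locations in the question
src = '/dbfs/mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz'
dst = '/dbfs/mnt/mount/Anthem/2022-10-11/IndexFileChunks/'
chunk_size = 100_000  # array elements per output file; tune as needed
 
os.makedirs(dst, exist_ok=True)
 
with gzip.open(src, 'rb') as f:
    # Stream the elements of the top-level reporting_structure array
    # one at a time instead of loading the whole document into memory
    items = ijson.items(f, 'reporting_structure.item')
    part = 0
    while True:
        chunk = list(itertools.islice(items, chunk_size))
        if not chunk:
            break
        with open(f'{dst}part-{part:05d}.json', 'w') as out:
            for item in chunk:
                out.write(json.dumps(item) + '\n')  # newline-delimited JSON
        part += 1

The resulting newline-delimited files can be read without multiline=true, so the scan parallelises across the cluster; note that the two top-level reporting_entity_* fields are not carried into the chunks and would need to be handled separately.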

Pat
Honored Contributor III

Hi @Sameer Khalid,

Check this notebook on how to work with nested JSON: https://learn.microsoft.com/en-us/azure/databricks/kb/_static/notebooks/scala/nested-json-to-datafra...

What you need to look at is how to work with (see the sketch after this list):

  • structs - col_name.*
  • arrays - explode
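
For this schema, a rough, untested sketch of that approach (exploding one array per step, since Spark allows only one generator per select, and expanding structs with ".*"; the aliases are mine) could look like:

from pyspark.sql.functions import col, explode_outer
 
# Untested sketch based on the schema posted above
flat = (
    df.select(
        "reporting_entity_name",
        "reporting_entity_type",
        explode_outer("reporting_structure").alias("rs"))
      .select(
        "reporting_entity_name",
        "reporting_entity_type",
        col("rs.allowed_amount_files.description").alias("allowed_amount_description"),
        col("rs.allowed_amount_files.location").alias("allowed_amount_location"),
        col("rs.reporting_plans").alias("reporting_plans"),
        explode_outer("rs.in_network_files").alias("inf"))
      .select(
        "*",
        col("inf.description").alias("in_network_description"),
        col("inf.location").alias("in_network_location"),
        explode_outer("reporting_plans").alias("plan"))
      .drop("inf", "reporting_plans")
      .select("*", "plan.*")
      .drop("plan")
)

Exploding both in_network_files and reporting_plans produces a cross product per reporting_structure element (the generic flatten() above does the same), so the row count can grow very quickly.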

I might be able to look more closely at the logic of the flatten function later.

Annapurna_Hiriy
New Contributor III