<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic How to efficiently process a 100GiB JSON nested file and store it in Delta? in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-to-efficiently-process-a-100gib-json-nested-file-and-store/m-p/18196#M12030</link>
    <pubDate>Wed, 07 Dec 2022 12:22:17 GMT</pubDate>
    <dc:creator>Sameer_876675</dc:creator>
    <dc:date>2022-12-07T12:22:17Z</dc:date>
    <item>
      <title>How to efficiently process a 100GiB JSON nested file and store it in Delta?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-efficiently-process-a-100gib-json-nested-file-and-store/m-p/18196#M12030</link>
      <description>&lt;P&gt;Hi, I'm a fairly new user. I am using Azure Databricks to process a ~1000GiB nested JSON file containing insurance policy data. I uploaded the file to Azure Data Lake Storage Gen2 and read it into a DataFrame:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;df = spark.read.option("multiline", "true").json('mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz')&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Reading the JSON file takes ~25 minutes for a 33GiB file.&lt;/P&gt;&lt;P&gt;&lt;U&gt;My Cluster Configuration Summary:&lt;/U&gt;&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="Cluster Summary"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/1041iD87D29E653EDD954/image-size/large?v=v2&amp;amp;px=999" role="button" title="Cluster Summary" alt="Cluster Summary" /&gt;&lt;/span&gt;The schema of the JSON file is:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;root
 |-- reporting_entity_name: string (nullable = true)
 |-- reporting_entity_type: string (nullable = true)
 |-- reporting_structure: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- allowed_amount_files: struct (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- location: string (nullable = true)
 |    |    |-- in_network_files: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- location: string (nullable = true)
 |    |    |-- reporting_plans: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- plan_id: string (nullable = true)
 |    |    |    |    |-- plan_id_type: string (nullable = true)
 |    |    |    |    |-- plan_market_type: string (nullable = true)
 |    |    |    |    |-- plan_name: string (nullable = true)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Because the data is nested, I used the following function to flatten it:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;from pyspark.sql.types import *
from pyspark.sql.functions import *

def flatten(df):
    # compute the complex fields (arrays and structs) in the schema
    complex_fields = {field.name: field.dataType
                      for field in df.schema.fields
                      if isinstance(field.dataType, (ArrayType, StructType))}
    while complex_fields:
        col_name = next(iter(complex_fields))
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # StructType: promote every sub-field to its own top-level column,
        # i.e. flatten structs
        if isinstance(complex_fields[col_name], StructType):
            expanded = [col(col_name + '.' + f.name).alias(col_name + '_' + f.name)
                        for f in complex_fields[col_name].fields]
            df = df.select("*", *expanded).drop(col_name)

        # ArrayType: add each array element as a separate row,
        # i.e. explode arrays
        elif isinstance(complex_fields[col_name], ArrayType):
            df = df.withColumn(col_name, explode_outer(col_name))

        # recompute the remaining complex fields in the schema
        complex_fields = {field.name: field.dataType
                          for field in df.schema.fields
                          if isinstance(field.dataType, (ArrayType, StructType))}
    return df&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;I then called the flatten function:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;df_flatten = flatten(df)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;This prints the following output:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;Processing :reporting_structure Type : &amp;lt;class 'pyspark.sql.types.ArrayType'&amp;gt;
Processing :reporting_structure Type : &amp;lt;class 'pyspark.sql.types.StructType'&amp;gt;
Processing :reporting_structure_allowed_amount_files Type : &amp;lt;class 'pyspark.sql.types.StructType'&amp;gt;
Processing :reporting_structure_in_network_files Type : &amp;lt;class 'pyspark.sql.types.ArrayType'&amp;gt;
Processing :reporting_structure_in_network_files Type : &amp;lt;class 'pyspark.sql.types.StructType'&amp;gt;
Processing :reporting_structure_reporting_plans Type : &amp;lt;class 'pyspark.sql.types.ArrayType'&amp;gt;
Processing :reporting_structure_reporting_plans Type : &amp;lt;class 'pyspark.sql.types.StructType'&amp;gt;&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;I then tried to display the flattened DataFrame df_flatten:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;df_flatten.display()&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;This fails with the following error after ~50 minutes of execution:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;FileReadException: Error while reading file mnt/monut/mnt/mount/Anthem/2022-10-11/IndexFile/2022-10-01_anthem_index.json.gz
Caused by: OutOfMemoryError: GC overhead limit exceeded&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="OOM Error"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/1038i2530AFBBB6B29A62/image-size/large?v=v2&amp;amp;px=999" role="button" title="OOM Error" alt="OOM Error" /&gt;&lt;/span&gt;If I instead try to write the data to an ADLS Gen2 location in Delta format, the Spark job runs for ~60 minutes and then fails:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;df.write.mode('overwrite').format('delta').save('mnt/monut/mnt/mount/Anthem/2022-10-11/processed/')&lt;/CODE&gt;&lt;/PRE&gt;</description>
      <pubDate>Wed, 07 Dec 2022 12:22:17 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-efficiently-process-a-100gib-json-nested-file-and-store/m-p/18196#M12030</guid>
      <dc:creator>Sameer_876675</dc:creator>
      <dc:date>2022-12-07T12:22:17Z</dc:date>
    </item>
    <item>
      <title>Re: How to efficiently process a 100GiB JSON nested file and store it in Delta?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-efficiently-process-a-100gib-json-nested-file-and-store/m-p/18197#M12031</link>
      <description>&lt;P&gt;Do you have access to the data at the source? See if you can pull the data down into storage in pieces instead of as one huge file.&lt;/P&gt;&lt;P&gt;If you have a single JSON file, everything will be allocated to a single core (a multiline JSON file is parsed as a whole by one task, and a gzip file cannot be split), and the decompression will exhaust that executor's memory. That is why the garbage collector cannot free up space and the job eventually dies. The size of the cluster makes little difference with a file like this. You will need to split the file into chunks before feeding it to Spark, so that the whole cluster can each take a small chunk and process them in parallel.&lt;/P&gt;</description>
      <pubDate>Wed, 07 Dec 2022 13:37:45 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-efficiently-process-a-100gib-json-nested-file-and-store/m-p/18197#M12031</guid>
      <dc:creator>AndriusVitkausk</dc:creator>
      <dc:date>2022-12-07T13:37:45Z</dc:date>
    </item>
    <item>
      <title>Re: How to efficiently process a 100GiB JSON nested file and store it in Delta?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-efficiently-process-a-100gib-json-nested-file-and-store/m-p/18198#M12032</link>
      <description>&lt;P&gt;Hi @Sameer Khalid,&lt;/P&gt;&lt;P&gt;Check this notebook on how to work with nested JSON: &lt;A href="https://learn.microsoft.com/en-us/azure/databricks/kb/_static/notebooks/scala/nested-json-to-dataframe.html" target="test_blank"&gt;https://learn.microsoft.com/en-us/azure/databricks/kb/_static/notebooks/scala/nested-json-to-dataframe.html&lt;/A&gt;&lt;/P&gt;&lt;P&gt;What you need to look at is how to work with:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;structs: select col_name.*&lt;/LI&gt;&lt;LI&gt;arrays: explode&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;I might be able to look more closely at the logic of the flatten function later.&lt;/P&gt;</description>
      <pubDate>Wed, 07 Dec 2022 13:41:25 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-efficiently-process-a-100gib-json-nested-file-and-store/m-p/18198#M12032</guid>
      <dc:creator>Pat</dc:creator>
      <dc:date>2022-12-07T13:41:25Z</dc:date>
    </item>
    <item>
      <title>Re: How to efficiently process a 100GiB JSON nested file and store it in Delta?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-efficiently-process-a-100gib-json-nested-file-and-store/m-p/18199#M12033</link>
      <description>&lt;P&gt;Hi Sameer, please refer to the following documents on how to work with nested JSON:&lt;/P&gt;&lt;P&gt;&lt;A href="https://docs.databricks.com/optimizations/semi-structured.html" target="test_blank"&gt;https://docs.databricks.com/optimizations/semi-structured.html&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;A href="https://learn.microsoft.com/en-us/azure/databricks/kb/_static/notebooks/scala/nested-json-to-dataframe.html" target="test_blank"&gt;https://learn.microsoft.com/en-us/azure/databricks/kb/_static/notebooks/scala/nested-json-to-dataframe.html&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 31 Jan 2023 16:20:49 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-efficiently-process-a-100gib-json-nested-file-and-store/m-p/18199#M12033</guid>
      <dc:creator>Annapurna_Hiriy</dc:creator>
      <dc:date>2023-01-31T16:20:49Z</dc:date>
    </item>
  </channel>
</rss>

