How to Efficiently Read Nested JSON in PySpark?

DarshilDesai
New Contributor II

I am having trouble efficiently reading and parsing a large number of stream files in PySpark!

Context

Here is the schema of the stream file that I am reading in as JSON. Blank spaces are edits for confidentiality purposes.

root
 |-- location_info: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant_type: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |-- other_data: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- other_data_1: string (nullable = true)
 |    |    |    |    |-- other_data_2: string (nullable = true)
 |    |    |    |    |-- other_data_3: string (nullable = true)
 |    |    |    |    |-- other_data_4: string (nullable = true)
 |    |    |    |    |-- other_data_5: string (nullable = true)
 |    |    |
 |    |    |-- latitude: string (nullable = true)
 |    |    |-- longitude: string (nullable = true)
 |    |    |-- timezone: string (nullable = true)
 |-- restaurant_id: string (nullable = true)

Current Method of Reading & Parsing (which works but takes TOO long)
  • Although the following method works, and was itself the solution that let me start reading the files at all, it takes very long once the number of files grows into the thousands
  • Each file is around 10MB
  • The files are essentially "stream" files and have names like this:
    s3://bucket_name/raw/2020/03/05/04/file-stream-6-2020-03-05-04-01-04-123-b978-2e2b-5672-fa243fs4aeb4
    Therefore I read them in as JSON in PySpark (not sure what else I would read them in as anyway?)
  • Note that I replace '{"restaurant_id' with '\n{"restaurant_id'. If I don't, the read operation only reads the first record in each file and ignores the rest of the contents...

```

from pyspark.sql.functions import col, explode

# Reading multiple files in the dir
source_df_1 = spark.read.json(
    sc.wholeTextFiles("file_path/*")
    .values()
    .flatMap(lambda x: x.replace('{"restaurant_id', '\n{"restaurant_id').split('\n'))
)

# explode here to have restaurant_id, and nested data
exploded_source_df_1 = source_df_1.select(
    col('restaurant_id'),
    explode(col('location_info')).alias('location_info'),
)

# Via SQL operation : this will solve the problem for parsing
exploded_source_df_1.createOrReplaceTempView('result_1')

# The column alias is location_info (singular), so the query must use the same name
subset_data_1 = spark.sql('''
    SELECT restaurant_id, location_info.latitude, location_info.longitude, location_info.timezone
    FROM result_1
''')

```
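
The replace trick above only works if every record starts with the key `restaurant_id`. As a minimal sketch, assuming each file is a run of JSON objects written back to back with no separator (which is what the "only reads the first record" behaviour suggests), the records could also be split with the standard library's `json.JSONDecoder.raw_decode`, which does not depend on key order. The function name `split_concatenated_json` and the sample string are hypothetical and only for illustration. This is not expected to be faster than the string replace; it only removes the dependence on how each record begins.

```python
import json


def split_concatenated_json(text):
    """Yield each top-level JSON value found back to back in `text`."""
    decoder = json.JSONDecoder()
    pos, end = 0, len(text)
    while pos < end:
        # raw_decode does not accept leading whitespace, so skip it first
        while pos < end and text[pos].isspace():
            pos += 1
        if pos >= end:
            break
        # raw_decode returns the parsed value and the index where it stopped
        obj, pos = decoder.raw_decode(text, pos)
        yield obj


# Hypothetical sample: two records concatenated without a newline
sample = (
    '{"restaurant_id": "1", "location_info": []}'
    '{"restaurant_id": "2", "location_info": []}'
)
records = list(split_concatenated_json(sample))
print([r["restaurant_id"] for r in records])
```

Inside the Spark job, the same function could stand in for the lambda passed to `flatMap`, re-serialising each record with `json.dumps` so that `spark.read.json` still receives one JSON string per record.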

Things I would love help with:
  • (1) Is there a faster way I can read this in?
  • (2) If I try to cache / persist the data frame, at what point should I do it? It seems like the
    .values().flatMap(lambda x: x.replace('{"restaurant_id','\n{"restaurant_id').split('\n'))
    step is an action in itself, so if I call persist() at the end it seems to redo the entire read?

You can refer to this thread to see how I arrived at this solution in the first place: link. Thank you very much for your time.

Chris_Shehu
Valued Contributor III

I'm interested in seeing what others have come up with. Currently I'm using json_normalize(), then taking any additional nested statements and using a loop to pull them out and re-combine them.
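
For readers unfamiliar with the flatten-then-loop idea, here is a rough standard-library sketch of it applied to the schema in the question. It is not pandas' `json_normalize` itself; the helper name `explode_locations` and the sample values are made up for illustration. It produces one flat row per `location_info` entry and skips fields that are still nested, which would need their own pass.

```python
def explode_locations(record):
    """Return one flat dict per entry in record['location_info']."""
    rows = []
    for loc in record.get("location_info") or []:
        row = {"restaurant_id": record.get("restaurant_id")}
        for key, value in loc.items():
            # Keep scalar fields; nested lists/dicts need a separate pass
            if not isinstance(value, (list, dict)):
                row[key] = value
        rows.append(row)
    return rows


# Hypothetical record shaped like the schema in the question
record = {
    "restaurant_id": "r1",
    "location_info": [
        {
            "latitude": "1.0",
            "longitude": "2.0",
            "timezone": "UTC",
            "other_data": [{"other_data_1": "x"}],
        },
    ],
}
rows = explode_locations(record)
print(rows)
```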
