06-16-2020 06:18 PM
I am having trouble efficiently reading and parsing a large number of stream files in PySpark!
Context

Here is the schema of the stream file that I am reading in (JSON). Blank spaces are edits for confidentiality purposes.
```
root
 |-- location_info: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant_type: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |-- other_data: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- other_data_1: string (nullable = true)
 |    |    |    |    |-- other_data_2: string (nullable = true)
 |    |    |    |    |-- other_data_3: string (nullable = true)
 |    |    |    |    |-- other_data_4: string (nullable = true)
 |    |    |    |    |-- other_data_5: string (nullable = true)
 |    |    |
 |    |    |-- latitude: string (nullable = true)
 |    |    |-- longitude: string (nullable = true)
 |    |    |-- timezone: string (nullable = true)
 |-- restaurant_id: string (nullable = true)
```
Current Method of Reading & Parsing (which works but takes TOO long)
- Although the following method works and is itself a solution to even getting started reading in the files, it takes a very long time when the number of files increases into the thousands
- Each file size is around 10MB
- The files are essentially "stream" files and have names like this: s3://bucket_name/raw/2020/03/05/04/file-stream-6-2020-03-05-04-01-04-123-b978-2e2b-5672-fa243fs4aeb4. Therefore I read them in as JSON in PySpark (not sure what else I would read them in as anyway?)
- If you notice, I replace restaurant_id with '\n{"restaurant_id'; this is because if I don't, the read operation only reads in the first record in the file and ignores the other contents (a short illustration follows below)...
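To illustrate what that replacement does (the file contents here are made up, not real data): each file body is a run of JSON objects with nothing separating them, so injecting a newline in front of every '{"restaurant_id' makes the body splittable into one record per line.

```
# Made-up example of a raw file body: two JSON objects back to back with
# nothing separating them, so a plain line-based JSON read only ever sees
# the first object.
raw = '{"restaurant_id": "id_1", "location_info": []}{"restaurant_id": "id_2", "location_info": []}'

records = raw.replace('{"restaurant_id', '\n{"restaurant_id').split('\n')
# records == ['',
#             '{"restaurant_id": "id_1", "location_info": []}',
#             '{"restaurant_id": "id_2", "location_info": []}']
# (the leading empty string comes from the newline injected before the first record)
```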
```
from pyspark.sql.functions import col, explode

# Reading multiple files in the dir: inject a newline before each record so
# every JSON object ends up on its own line, then let Spark parse the lines
source_df_1 = spark.read.json(
    sc.wholeTextFiles("file_path/*")
      .values()
      .flatMap(lambda x: x.replace('{"restaurant_id', '\n{"restaurant_id').split('\n'))
)

# Explode here to have restaurant_id and the nested location data on separate rows
exploded_source_df_1 = source_df_1.select(
    col('restaurant_id'),
    explode(col('location_info')).alias('location_info')
)

# Via SQL operation: this will solve the problem of parsing out the nested fields
exploded_source_df_1.createOrReplaceTempView('result_1')
subset_data_1 = spark.sql('''
    SELECT restaurant_id,
           location_info.latitude,
           location_info.longitude,
           location_info.timezone
    FROM result_1
''')
```
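For reference, one variation on the read above (not verified to be faster): build the printed schema explicitly as a StructType and pass it to spark.read.json, which should let Spark skip the extra pass it normally makes over the data to infer the schema. The field names and types below simply mirror the printSchema() output; the redacted fields would still need to be filled in.

```
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Explicit schema mirroring the printSchema() output above
# (the redacted fields are omitted and would need to be added)
other_data_schema = ArrayType(StructType([
    StructField("other_data_1", StringType(), True),
    StructField("other_data_2", StringType(), True),
    StructField("other_data_3", StringType(), True),
    StructField("other_data_4", StringType(), True),
    StructField("other_data_5", StringType(), True),
]))

location_info_schema = ArrayType(StructType([
    StructField("restaurant_type", StringType(), True),
    StructField("other_data", other_data_schema, True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("timezone", StringType(), True),
]))

stream_schema = StructType([
    StructField("location_info", location_info_schema, True),
    StructField("restaurant_id", StringType(), True),
])

# Same newline-injection trick as above, but with the schema supplied so
# spark.read.json does not have to scan the data to infer it
raw_rdd = (sc.wholeTextFiles("file_path/*")
             .values()
             .flatMap(lambda x: x.replace('{"restaurant_id', '\n{"restaurant_id').split('\n')))

source_df_1 = spark.read.json(raw_rdd, schema=stream_schema)
```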
Things I would love help with:
- (1) Is there a faster way I can read this in?
- (2) If I try and cache / persist the data frame, when would I be able to do it, since it seems like the `.values().flatMap(lambda x: x.replace('{"restaurant_id', '\n{"restaurant_id'))` is an action in itself, so if I call persist() at the end it seems to redo the entire read? (A sketch of the placement I mean follows below.)
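To make the placement in question (2) concrete, here is a sketch, continuing from the snippet above; it is only an illustration of the question, not a verified fix. persist() is called on the exploded frame as soon as it is built, and a cheap action forces the cache to actually be populated before the SQL queries run.

```
# Continuing from the snippet above: mark the exploded frame for caching
# right after it is built...
exploded_source_df_1 = source_df_1.select(
    col('restaurant_id'),
    explode(col('location_info')).alias('location_info')
).persist()

# ...and force it to materialise once with a cheap action, so the SQL
# queries below read from the cache instead of re-reading the raw files
exploded_source_df_1.count()

exploded_source_df_1.createOrReplaceTempView('result_1')
subset_data_1 = spark.sql('''
    SELECT restaurant_id,
           location_info.latitude,
           location_info.longitude,
           location_info.timezone
    FROM result_1
''')
```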
You can refer to this thread for how I arrived at this solution in the first place: link. Thank you very much for your time.
- Labels:
  - Nested json
  - Stream
Accepted Solutions
02-21-2022 11:00 AM
I'm interested in seeing what others have come up with. Currently I'm using json_normalize(), then taking any additional nested statements and using a loop to pull them out and re-combine them.
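To make that concrete, here is a rough sketch of that kind of json_normalize-plus-loop flattening against the schema from the original post (the file name is made up, and this assumes one JSON object per file):

```
import json
import pandas as pd

# Made-up file name; assumes the file holds a single JSON object shaped
# like the schema in the original post
with open("file-stream-sample.json") as f:
    data = json.load(f)

# Flatten the location_info array, carrying restaurant_id onto every row
flat = pd.json_normalize(data, record_path=["location_info"], meta=["restaurant_id"])

# other_data is still a nested list of dicts per row; pull each one out in a
# loop, flatten it, and re-combine it with the outer frame
pieces = []
for idx, nested in flat["other_data"].items():
    piece = pd.json_normalize(nested or [])   # nested is a list of dicts (or empty)
    piece["_row"] = idx                       # hypothetical helper column for the join
    pieces.append(piece)

other = pd.concat(pieces, ignore_index=True)
result = flat.drop(columns=["other_data"]).merge(
    other, left_index=True, right_on="_row", how="left"
)
```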