10-20-2021 07:39 AM
On Databricks, we use the following code to flatten JSON in Python. The data is from a REST API:
```
import pyspark.sql.functions as psf

# Read every JSON file for the source table; multiline is needed because each file is a single JSON document
df = spark.read.format("json").option("header", "true").option("multiline", "true").load(SourceFileFolder + sourcetable + "*.json")
# Explode the "value" array and flatten its fields into columns
df2 = df.select(psf.explode('value').alias('tmp')).select('tmp.*')
df2.write.format("delta").save(DeltaLakeFolder)
```
We don't know the schemas, as they change, so the code is as generic as possible. However, now that the JSON files have grown above 2.8 GB, I see the following error:
```
Caused by: java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 168 because the size after growing exceeds size limitation 2147483632
```
The JSON looks like this:
```
{
"@odata.context": "RANDOMSTRING)",
"value": [
{
"COL1": null,
"COL2": "VAL2",
"COL3": "VAL3",
"COL4": "VAL4",
"COL5": "VAL5",
"COL6": "VAL6",
"COL8": "VAL7",
"COL9": null
},
{
"COL1": null,
"COL2": "VAL2",
"COL3": "VAL3",
"COL4": "VAL4",
"COL5": "VAL5",
"COL6": "VAL6",
"COL8": "VAL7",
"COL9": null
},
{
"COL1": null,
"COL2": "VAL2",
"COL3": "VAL3",
"COL4": "VAL4",
"COL5": "VAL5",
"COL6": "VAL6",
"COL8": "VAL7",
"COL9": null
}
]
}
```
How can I resolve this or work around this?
Thanks in advance!
Kind regards,
Dennis
- Labels:
  - Azure Databricks
  - JSON
  - Python
10-20-2021 08:06 AM
If you use multiline = true, the file will be read as a whole
(see https://docs.databricks.com/data/data-sources/read-json.html).
So if you can, try single-line mode.
If that is not possible, you can try to split the JSON file on the storage side, or use larger workers/driver.
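A minimal sketch of that single-line read, assuming the files could be rewritten so each record sits on its own line (JSON Lines):
```
# Assumes the source files have been rewritten as JSON Lines (one self-contained object per line).
# Without the multiline option, Spark splits the input by line and can parallelize the parsing.
df = spark.read.json(SourceFileFolder + sourcetable + "*.json")
```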
10-20-2021 08:18 AM
Thank you for your quick response!
I tried with single-line, but that resulted in `Caused by: java.io.IOException: Too many bytes before newline: 2147483648`.
I am currently on 28 GB / 4 cores with 8 nodes and 1 driver. What would increase the buffer in my case?
10-20-2021 08:26 AM
I've edited the JSON above, as the file also contains a single string at the start. I think that's why single-line mode doesn't work.
10-20-2021 08:36 AM
If that string is too big, you will indeed run into problems.
Also keep in mind that each line must contain a separate, self-contained valid JSON object
(https://spark.apache.org/docs/3.2.0/sql-data-sources-json.html#content).
So you might have to do some editing first.
Here someone also posted a possible solution to edit the JSON.
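For illustration (using only the column names from the original post), the edited file would need to look something like this, with the @odata.context wrapper and the value array brackets removed:
```
{"COL1": null, "COL2": "VAL2", "COL3": "VAL3", "COL4": "VAL4", "COL5": "VAL5", "COL6": "VAL6", "COL8": "VAL7", "COL9": null}
{"COL1": null, "COL2": "VAL2", "COL3": "VAL3", "COL4": "VAL4", "COL5": "VAL5", "COL6": "VAL6", "COL8": "VAL7", "COL9": null}
```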
10-20-2021 08:56 AM
You can also read it as a text file into an RDD and then read that RDD as JSON. When reading it as a text file it is easy to set the partitioning, so it will not be one big object but, for example, 8 partitions:
```
rdd = sc.textFile(sourceFile, 8)
df = spark.read.json(rdd)
```
If there are problems with the JSON text, you can clean it using map:
```
rdd = sc.textFile(sourceFile, 8).map(your lambda function)
df = spark.read.json(rdd)
```
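As a hypothetical example of such a cleaning lambda (the exact fix depends on what is wrong with the text; trimming whitespace and trailing commas here is only an illustration):
```
# Hypothetical cleanup step: trim whitespace and drop trailing commas before parsing as JSON.
rdd = sc.textFile(sourceFile, 8).map(lambda line: line.strip().rstrip(","))
df = spark.read.json(rdd)
```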
10-20-2021 09:02 AM
Hi, thank you for your response!
I'll give this a try!
Thanks!
10-20-2021 09:14 AM
With the following code:
```
rdd = sc.textFile(SourceFileFolder + sourcetable + "*.json", 8)
df = spark.read.option("header", "true").option("multiline", "true").json(rdd)
```
I am getting:
```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 7.0 failed 4 times, most recent failure: Lost task 2.3 in stage 7.0 (TID 113) (10.139.64.6 executor 8): java.io.IOException: Too many bytes before newline: 2147483648
```
Am I missing something?
10-20-2021 08:57 AM
@Dennis D, what's happening here is that more than 2 GB (2147483648 bytes) is being loaded into a single column value. This is a hard limit for serialization. This KB article addresses it. The solution would be to find some way to have this loaded into different column values. I'll poke around and see if I can figure out a nice way to code it. I'll respond if I find something; otherwise you'll need to take werner's approach and edit the incoming JSON.
10-20-2021 08:59 AM
Hi, thanks for the response!
What I do is first load the full JSON (with the structure above) and then explode the value part.
Would it be possible to load and explode it into the DataFrame in one go, so we never reach that limit in a column?
10-20-2021 09:43 AM
Hm, you might be able to do something like this using a UDF. That would force the read and explode to happen on one node, but using a UDF would require you to know the output schema. I think the best bet is to set up some sort of streaming JSON parser, like SAX. I haven't done that before.
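As a rough sketch of that streaming-parser idea (not from this thread: it assumes the third-party ijson package, installed with %pip install ijson, and hypothetical file paths), each element of the value array could be rewritten as its own line, so Spark can then read the result in single-line mode:
```
import json
import ijson  # third-party streaming (SAX-style) JSON parser; assumed installed via %pip install ijson

# Hypothetical paths for illustration only
src = "/dbfs/tmp/comm_q_json.json"
dst = "/dbfs/tmp/comm_q_json.jsonl"

with open(src, "rb") as fin, open(dst, "w") as fout:
    # Stream over the objects inside the top-level "value" array without loading the whole file into memory
    for record in ijson.items(fin, "value.item"):
        fout.write(json.dumps(record) + "\n")

# The rewritten file is JSON Lines, so the default single-line reader can split it across tasks
df = spark.read.json("dbfs:/tmp/comm_q_json.jsonl")
```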
10-20-2021 09:46 AM
I do know the output schema, but there are loads of files, so I'd like to avoid having to create schemas for each of them. Would you be able to provide me with a small sample based on the schema in my original post? I am not familiar with UDFs.
10-20-2021 10:46 AM
So for instance:
```
from pyspark.sql import Row
from pyspark.sql.types import *

# One row per input file path
file = [Row(file="/dbfs/tmp/comm_q_json.json")]
df = spark.createDataFrame(file, ("file: String"))

def read_process_json(iterator):
    import pandas as pd

    # Read one JSON file with pandas and flatten the "value" array into a flat DataFrame
    def process_from_path(path):
        rawJSON = pd.io.json.read_json(path)
        return pd.json_normalize(rawJSON['value'])

    # Each batch contains file paths; process every path and concatenate the results
    for pdf in iterator:
        DFseries = pdf["file"].apply(process_from_path).tolist()
        yield pd.concat(DFseries)

outSchema = StructType([
    StructField("COL1", StringType(), True),
    StructField("COL2", StringType(), True),
    StructField("COL3", StringType(), True),
    StructField("COL4", StringType(), True),
    StructField("COL5", StringType(), True),
    StructField("COL6", StringType(), True),
    StructField("COL8", StringType(), True),
    StructField("COL9", StringType(), True)
])

display(df.mapInPandas(read_process_json, schema=outSchema))
```
Here we rely on the pandas API to do the JSON wrangling. I'm not 100% sure it would work, but who knows. You might have to tweak some configs if the results are too large to serialize.
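As a follow-up sketch (not part of the original answer): to run this over all files of a source table, the file list could be built from the folder listing, converting the dbfs: URIs into the /dbfs/ local paths that pandas expects. SourceFileFolder and sourcetable are the variables from the original post.
```
# Build one row per matching file; dbutils.fs.ls returns FileInfo objects with .path and .name
paths = [
    f.path.replace("dbfs:", "/dbfs")
    for f in dbutils.fs.ls(SourceFileFolder)
    if f.name.startswith(sourcetable) and f.name.endswith(".json")
]
file = [Row(file=p) for p in paths]
df = spark.createDataFrame(file, ("file: String"))
```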
10-20-2021 10:48 AM
Also, this is what I used to put the sample data on disk:
```
dbutils.fs.put("/tmp/comm_q_json.json",
"""
{
"@odata.context": "RANDOMSTRING)",
"value": [
{
"COL1": null,
"COL2": "VAL2",
"COL3": "VAL3",
"COL4": "VAL4",
"COL5": "VAL5",
"COL6": "VAL6",
"COL8": "VAL7",
"COL9": null
},
{
"COL1": null,
"COL2": "VAL2",
"COL3": "VAL3",
"COL4": "VAL4",
"COL5": "VAL5",
"COL6": "VAL6",
"COL8": "VAL7",
"COL9": null
},
{
"COL1": null,
"COL2": "VAL2",
"COL3": "VAL3",
"COL4": "VAL4",
"COL5": "VAL5",
"COL6": "VAL6",
"COL8": "VAL7",
"COL9": null
}
]
}
""", True)
```
10-21-2021 01:19 AM
Hi!
I tried your solution and it works for most tables. Thanks!
I tried another table, but I get:
`ValueError: Unexpected character found when decoding object value`
I am not sure how to solve this, as I can't figure out where in the JSON the problem is. Is there a way to handle it automatically?