
BufferHolder Exceeded on Json flattening

D3nnisd
New Contributor III

On Databricks, we use the following code to flatten JSON in Python. The data is from a REST API:

```
df = spark.read.format("json").option("header", "true").option("multiline", "true").load(SourceFileFolder + sourcetable + "*.json")
df2 = df.select(psf.explode('value').alias('tmp')).select('tmp.*')
df2.write.format("delta").save(DeltaLakeFolder)
```

We don't know the schemas, as they change, so the code is kept as generic as possible. However, now that the JSON files have grown above 2.8 GB, I see the following error:

```
Caused by: java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 168 because the size after growing exceeds size limitation 2147483632
```

The JSON looks like this:

```
{
  "@odata.context": "RANDOMSTRING)",
  "value": [
    {
      "COL1": null,
      "COL2": "VAL2",
      "COL3": "VAL3",
      "COL4": "VAL4",
      "COL5": "VAL5",
      "COL6": "VAL6",
      "COL8": "VAL7",
      "COL9": null
    },
    {
      "COL1": null,
      "COL2": "VAL2",
      "COL3": "VAL3",
      "COL4": "VAL4",
      "COL5": "VAL5",
      "COL6": "VAL6",
      "COL8": "VAL7",
      "COL9": null
    },
    {
      "COL1": null,
      "COL2": "VAL2",
      "COL3": "VAL3",
      "COL4": "VAL4",
      "COL5": "VAL5",
      "COL6": "VAL6",
      "COL8": "VAL7",
      "COL9": null
    }
  ]
}
```

How can I resolve this or work around this?

Thanks in advance!

Kind regards,

Dennis


15 REPLIES

-werners-
Esteemed Contributor III

If you use multiline = true, the file is read as a whole

(see https://docs.databricks.com/data/data-sources/read-json.html).

So if you can, try single-line mode.

If that is not possible, you can try to split the JSON file on the storage side, or use larger workers/driver.
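As a rough sketch of the single-line route (my own illustration, reusing the SourceFileFolder, sourcetable and DeltaLakeFolder variables from the question, and assuming the files could be rewritten with one self-contained JSON object per line):

```
# Sketch only: with multiline left at its default of false, Spark expects one
# JSON object per line and can split the input across many tasks instead of
# buffering each file as a single record. Each line then becomes a row, so the
# explode('value') step from the question is no longer needed.
df2 = spark.read.format("json").load(SourceFileFolder + sourcetable + "*.json")
df2.write.format("delta").save(DeltaLakeFolder)
```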

D3nnisd
New Contributor III

Thank you for your quick response!

I tried with single-line, but that resulted in: Caused by: java.io.IOException: Too many bytes before newline: 2147483648

I am currently on 28 GB / 4 cores with 8 nodes and 1 driver. What would increase the buffer in my case?

D3nnisd
New Contributor III

I've edited the JSON, as it also contains a single string at the start of the file. I think that's why single-line mode doesn't work.

-werners-
Esteemed Contributor III

If that string is too big, you will run into problems indeed.

Also keep in mind that each line must contain a separate, self-contained valid JSON object.

(https://spark.apache.org/docs/3.2.0/sql-data-sources-json.html#content)

So you might have to do some editing first.

Here someone also posted a possible solution to edit the json.
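For reference, this is roughly what the sample payload from the question would look like once rewritten as line-delimited JSON: the outer wrapper object is gone and every line is one self-contained record (a sketch of the target format, not something posted in the thread):

```
{"COL1": null, "COL2": "VAL2", "COL3": "VAL3", "COL4": "VAL4", "COL5": "VAL5", "COL6": "VAL6", "COL8": "VAL7", "COL9": null}
{"COL1": null, "COL2": "VAL2", "COL3": "VAL3", "COL4": "VAL4", "COL5": "VAL5", "COL6": "VAL6", "COL8": "VAL7", "COL9": null}
{"COL1": null, "COL2": "VAL2", "COL3": "VAL3", "COL4": "VAL4", "COL5": "VAL5", "COL6": "VAL6", "COL8": "VAL7", "COL9": null}
```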

Hubert-Dudek
Esteemed Contributor III

You can also just read it as a text file into an RDD and then read that RDD as JSON. When reading as a text file it is easy to set the partitioning, so it will not be one big object but, for example, 8 partitions:

rdd = sc.textFile(sourceFile, 8)
df = spark.read.json(rdd)

If there are problems with the JSON text, you can clean it using map:

rdd = sc.textFile(sourceFile, 8).map(lambda line: line.strip())  # replace strip() with whatever cleanup your JSON needs
df = spark.read.json(rdd)

D3nnisd
New Contributor III

Hi , thank you for your response!

I'll give this a try!

Thanks!

D3nnisd
New Contributor III

With the following code:

rdd = sc.textFile(SourceFileFolder + sourcetable + "*.json", 8)
 
df = spark.read.option("header", "true").option("multiline", "true").json(rdd)

I am getting

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 7.0 failed 4 times, most recent failure: Lost task 2.3 in stage 7.0 (TID 113) (10.139.64.6 executor 8): java.io.IOException: Too many bytes before newline: 2147483648

Am I missing something?

Dan_Z
Honored Contributor

@Dennis D, what's happening here is that more than 2 GB (2147483648 bytes) is being loaded into a single column value. This is a hard limit for serialization. This KB article addresses it. The solution would be to find some way to have this loaded into different column values. I'll poke around and see if I can figure out a nice way to code it. I will respond if I find something; otherwise you'll need to take werners' approach and edit the incoming JSON.

D3nnisd
New Contributor III

Hi, thanks for the response!

What I do is first load the full JSON (with the structure above) and then explode the value part.

Would it be possible to load and explode it into the DataFrame in one step, so we never reach that limit in a column?

Dan_Z
Honored Contributor

Hm, you might be able to do something like this using a UDF. That would force the read and explode to happen on one node, but using a UDF would require you to know the output schema. I think the best bet is to set up some sort of streaming JSON parser, like SAX. I haven't done that before.
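For what it's worth, here is a minimal sketch of that streaming-parser idea using the third-party ijson library (my own assumption; ijson is not mentioned in the thread). It walks the top-level "value" array one record at a time, so the multi-GB document never has to sit in a single string or column:

```
import ijson

def stream_value_records(path):
    # Yield each element of the top-level 'value' array without loading the whole file.
    # The prefix 'value.item' matches every element of the 'value' array, one at a time.
    with open(path, "rb") as f:
        for record in ijson.items(f, "value.item"):
            yield record
```

Each record could then be written back out as one JSON object per line, which Spark's default single-line reader can split across tasks.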

D3nnisd
New Contributor III

I do know the output schema, but there are loads of files, so I'd like to avoid having to create schemas for each of them. Would you be able to provide me with a small sample based on the schema in my original post? I am not familiar with UDFs.

Dan_Z
Honored Contributor

So for instance:

from pyspark.sql import Row
from pyspark.sql.types import *
 
# One-row DataFrame whose single column holds the path of the JSON file to process.
file = [Row(file="/dbfs/tmp/comm_q_json.json")]
df = spark.createDataFrame(file, "file: string")
 
def read_process_json(iterator):
  import pandas as pd
  
  # Read one JSON file with pandas and flatten its 'value' array into a pandas DataFrame.
  def process_from_path(path):
    rawJSON = pd.io.json.read_json(path)
    return pd.json_normalize(rawJSON['value'])
  
  # mapInPandas passes an iterator of pandas DataFrames (here: batches of file paths).
  for pdf in iterator:
    DFseries = pdf["file"].apply(process_from_path).tolist()
    yield pd.concat(DFseries)
    
outSchema = StructType([
  StructField("COL1", StringType(), True),
  StructField("COL2", StringType(), True),
  StructField("COL3", StringType(), True),
  StructField("COL4", StringType(), True),
  StructField("COL5", StringType(), True),
  StructField("COL6", StringType(), True),
  StructField("COL8", StringType(), True),
  StructField("COL9", StringType(), True)
])
 
display(df.mapInPandas(read_process_json, schema=outSchema))

Here we rely on the pandas API to do the JSON wrangling. I'm not 100% sure it would work, but who knows. You might have to tweak some configs if the results are too large to serialize.
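As a possible follow-up (my own sketch, not part of the accepted answer), the flattened result could be written straight to Delta instead of displayed, reusing the DeltaLakeFolder variable from the question; if the per-task pandas results are too large to serialize, the Arrow batch size is one config that can be lowered:

```
# Hedged sketch only: assumes the DeltaLakeFolder variable from the original question.
# Lowering the Arrow batch size can help if serialized batches get too large
# (an assumption, not something the thread confirms).
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 1000)

flattened = df.mapInPandas(read_process_json, schema=outSchema)
flattened.write.format("delta").mode("append").save(DeltaLakeFolder)
```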

Dan_Z
Honored Contributor

Also, this is what I used to load the data on disk:

dbutils.fs.put("/tmp/comm_q_json.json",
"""
{
"@odata.context": "RANDOMSTRING)",
"value": [
{
"COL1": null,
"COL2": "VAL2",
"COL3": "VAL3",
"COL4": "VAL4",
"COL5": "VAL5",
"COL6": "VAL6",
"COL8": "VAL7",
"COL9": null
},
{
"COL1": null,
"COL2": "VAL2",
"COL3": "VAL3",
"COL4": "VAL4",
"COL5": "VAL5",
"COL6": "VAL6",
"COL8": "VAL7",
"COL9": null
},
{
"COL1": null,
"COL2": "VAL2",
"COL3": "VAL3",
"COL4": "VAL4",
"COL5": "VAL5",
"COL6": "VAL6",
"COL8": "VAL7",
"COL9": null
}
]
}
""", True)

D3nnisd
New Contributor III

Hi!

I tried your solution and it works for most tables. Thanks!

I tried another table, but I get:

ValueError: Unexpected character found when decoding object value

I am not sure how to solve this, as I can't figure out where in the JSON the problem is. Is there a way to solve it automatically?
