
toPandas() causes IndexOutOfBoundsException in Apache Arrow

ivanychev
Contributor II

Using DBR 10.0

When calling toPandas() the worker fails with IndexOutOfBoundsException. It seems like ArrowWriter.sizeInBytes (which looks like a proprietary method since I can't find it in OSS) calls arrow's getBufferSizeFor which fails with this error. What is the root cause of this issue?

Here's a sample of the full stack trace:

java.lang.IndexOutOfBoundsException: index: 16384, length: 4 (expected: range(0, 16384))
at org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:318)
at org.apache.arrow.memory.ArrowBuf.chk(ArrowBuf.java:305)
at org.apache.arrow.memory.ArrowBuf.getInt(ArrowBuf.java:424)
at org.apache.arrow.vector.complex.BaseRepeatedValueVector.getBufferSizeFor(BaseRepeatedValueVector.java:229)
at org.apache.arrow.vector.complex.ListVector.getBufferSizeFor(ListVector.java:621)
at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.getSizeInBytes(ArrowWriter.scala:165)
at org.apache.spark.sql.execution.arrow.ArrowWriter.sizeInBytes(ArrowWriter.scala:118)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.$anonfun$next$1(ArrowConverters.scala:224)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1647)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:235)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:199)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)

Sergey
14 REPLIES

Anonymous
Not applicable

@Sergey Ivanychev, I think it's trying to return too much data to pandas and overloading the memory. What are you trying to do? You shouldn't need to use pandas much anymore now that Spark 3.2 has introduced the pandas API on Spark: https://databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html

I'm feeding the DataFrame to the ML model. The `toPandas()` works perfectly fine with `spark.sql.execution.arrow.pyspark.enabled` set to `false`.

But disabling Arrow pipeline by pipeline is far from perfect. The error above doesn't explain much, and the failure occurs in proprietary code. At this point I don't know where to look for the cause.

Sergey
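If the Arrow path has to be switched off for one conversion only, the setting can be scoped rather than changed for the whole pipeline. The following is a minimal sketch, not a tested fix for this error: `arrow_disabled` and `to_pandas_without_arrow` are hypothetical helper names, and `spark` is assumed to be an active `SparkSession` whose `conf` exposes the usual `get`/`set` methods.

```python
from contextlib import contextmanager

# Config key quoted in the thread.
ARROW_KEY = "spark.sql.execution.arrow.pyspark.enabled"


@contextmanager
def arrow_disabled(spark):
    """Turn Arrow off inside the block and restore the previous value after."""
    previous = spark.conf.get(ARROW_KEY)
    spark.conf.set(ARROW_KEY, "false")
    try:
        yield
    finally:
        # Restore even if toPandas() raises.
        spark.conf.set(ARROW_KEY, previous)


def to_pandas_without_arrow(spark, df):
    """Hypothetical helper: collect one DataFrame with Arrow disabled."""
    with arrow_disabled(spark):
        return df.toPandas()
```

Every other `toPandas()` call in the job keeps using Arrow, so only the failing conversion pays the slower non-Arrow cost.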

Weirdly, `getBufferSizeFor` is the cause of the failure. IMO a method with such a name shouldn't cause an out-of-bounds error.

Sergey
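The numbers in the exception message can be read directly: a 4-byte read was attempted at byte 16384 of a buffer that is only 16384 bytes long. The sketch below parses that message and does the arithmetic. `explain_bounds_error` is a hypothetical helper, and the reading that this is a list offset buffer with 4-byte entries is an inference from `getInt` being called under `BaseRepeatedValueVector.getBufferSizeFor`, not something the thread confirms.

```python
import re

# Matches the ArrowBuf bounds message format seen in the stack trace.
_PATTERN = re.compile(
    r"index: (\d+), length: (\d+) \(expected: range\((\d+), (\d+)\)\)"
)


def explain_bounds_error(message):
    """Extract index, length and capacity, and compute how far the access overruns."""
    match = _PATTERN.search(message)
    if match is None:
        raise ValueError("not an ArrowBuf bounds message")
    index, length, _low, capacity = map(int, match.groups())
    end = index + length
    return {
        "index": index,
        "length": length,
        "capacity": capacity,
        "end": end,
        "overrun": max(0, end - capacity),
        # Assumption: entries are `length` bytes wide (4-byte offsets here).
        "slots_available": capacity // length,
        "slot_requested": index // length,
    }


msg = (
    "java.lang.IndexOutOfBoundsException: index: 16384, length: 4 "
    "(expected: range(0, 16384))"
)
print(explain_bounds_error(msg))
```

Under that assumption the buffer holds 4096 offsets (slots 0 to 4095) and the read targets slot 4096, one past the end, which looks like the size query being made for one more entry than the buffer was allocated for.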

Hubert-Dudek
Esteemed Contributor III

toPandas() is only for small datasets.

Please use this instead:

to_pandas_on_spark()

It is essential to use Pandas on Spark instead of ordinary Pandas so that the work runs in a distributed way. More info: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html

So always import Pandas as:

import pyspark.pandas as ps

As I noted, `toPandas()` works great with `spark.sql.execution.arrow.pyspark.enabled` set to `false`. I understand that `to_pandas_on_spark()` is an option, but I need a Pandas DataFrame, not a Pandas-on-Spark DataFrame.

Sergey

Anonymous
Not applicable

Turning Arrow off is going to increase your execution time. It might be better to use something like `applyInPandas`. You might also want to adjust the batch size: https://spark.apache.org/docs/3.0.0/sql-pyspark-pandas-with-arrow.html#setting-arrow-batch-size
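The batch size mentioned in the linked page is controlled by `spark.sql.execution.arrow.maxRecordsPerBatch`. A minimal sketch of lowering it at runtime follows; `set_arrow_batch_size` is a hypothetical helper, `spark` is assumed to be an active `SparkSession`, and whether a smaller batch actually avoids the exception in this thread is untested.

```python
# Config key documented on the Spark "PySpark Usage Guide for Pandas with Apache Arrow" page.
BATCH_KEY = "spark.sql.execution.arrow.maxRecordsPerBatch"


def set_arrow_batch_size(spark, max_records):
    """Cap the number of rows Spark packs into a single Arrow record batch."""
    if max_records <= 0:
        raise ValueError("max_records must be positive")
    # Spark configuration values are stored as strings.
    spark.conf.set(BATCH_KEY, str(max_records))
    return spark.conf.get(BATCH_KEY)
```

For example, `set_arrow_batch_size(spark, 1000)` before calling `toPandas()` keeps each Arrow batch to at most 1000 rows; the value 1000 is arbitrary and would need tuning against the row width.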

Again, I can't use `applyInPandas` because I need to collect the data to feed it into an ML model. I need a *Pandas DataFrame*.

I have enough memory on my driver (turning off arrow makes the code work).

Sergey

Anonymous
Not applicable

`applyInPandas` takes a function argument, which can be an ML model.

We are training an ML model, not applying it. We need to fetch a batch of data as a Pandas DataFrame and feed it into the model for training.

Sergey
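One way to fetch data batch by batch on the driver without changing any Arrow setting is to stream rows with `DataFrame.toLocalIterator()` and group them into chunks. This is only a sketch under the assumption that `toLocalIterator()` does not go through the Arrow conversion path; `iter_row_chunks` is a hypothetical helper and it has not been checked against the failing dataset.

```python
from itertools import islice


def iter_row_chunks(df, chunk_size):
    """Yield lists of row dicts, at most chunk_size rows per list."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    rows = iter(df.toLocalIterator())
    while True:
        # Row.asDict() turns each Spark Row into a plain dict.
        chunk = [row.asDict() for row in islice(rows, chunk_size)]
        if not chunk:
            return
        yield chunk
```

Each chunk can then be turned into a Pandas DataFrame with `pandas.DataFrame.from_records(chunk)` and passed to the training step. This is slower than the Arrow path, so it is a fallback rather than a replacement.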

Anonymous
Not applicable

Yes, the ML model training is done with a function such as model.fit().

I know that. Is my question not clear?

Sergey

Anonymous
Not applicable

I have a similar situation.

sean_owen
Honored Contributor II

This could be an Arrow version mismatch. Did you by chance install anything that could pull in a different Arrow version? It can happen indirectly via other libraries.
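A quick way to look for an indirectly installed Arrow on the Python side is to list the installed `pyarrow` version and every installed package that declares a dependency on it. The sketch below uses only the standard library; `installed_version` and `packages_requiring` are hypothetical helper names, and it covers Python packages only, not Arrow JARs attached to the cluster.

```python
import re
from importlib import metadata


def installed_version(package="pyarrow"):
    """Return the installed version string, or None if the package is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def packages_requiring(package="pyarrow"):
    """Map each installed distribution that depends on `package` to its requirement string."""
    # Match the requirement name exactly, so "pyarrow-hotfix" is not counted as "pyarrow".
    pattern = re.compile(
        rf"^{re.escape(package)}(?![A-Za-z0-9._-])", re.IGNORECASE
    )
    found = {}
    for dist in metadata.distributions():
        for requirement in dist.requires or []:
            if pattern.match(requirement):
                found[dist.metadata["Name"]] = requirement
    return found


print("pyarrow version:", installed_version())
for name, requirement in sorted(packages_requiring().items(), key=lambda kv: str(kv[0])):
    print(f"{name} requires {requirement}")
```

Comparing the printed version with the one the runtime ships, and checking which listed packages pin a different range, narrows down whether a library install changed Arrow.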

vikas_ahlawat
New Contributor II

I am also facing the same issue. I have set `spark.sql.execution.arrow.pyspark.enabled` to `false`, but I am still getting the error.

Any idea what's going on? Please help me out.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 39.0 failed 4 times, most recent failure: Lost task 0.3 in stage 39.0 (TID 3789) (10.132.234.41 executor 39): java.lang.IndexOutOfBoundsException: index: 2147483640, length: 174 (expected: range(0, 2147483648))
	at org.apache.arrow.memory.ArrowBuf.checkIndex(ArrowBuf.java:699)
	at org.apache.arrow.memory.ArrowBuf.setBytes(ArrowBuf.java:890)
	at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1087)
	at org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:287)
	at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:151)
	at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:105)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:110)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.writeIteratorToStream(ArrowPythonRunner.scala:132)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:521)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2241)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:313)
 
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2873)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2820)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2814)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2814)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1350)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1350)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1350)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3081)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3022)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3010)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.IndexOutOfBoundsException: index: 2147483640, length: 174 (expected: range(0, 2147483648))
	at org.apache.arrow.memory.ArrowBuf.checkIndex(ArrowBuf.java:699)
	at org.apache.arrow.memory.ArrowBuf.setBytes(ArrowBuf.java:890)
	at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1087)
	at org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:287)
	at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:151)
	at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:105)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:110)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.writeIteratorToStream(ArrowPythonRunner.scala:132)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:521)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2241)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:313)
 
 
=== Streaming Query ===
Identifier: [id = 1f85f00f-6e6f-4b42-b178-0fe871f8ec02, runId = 46d257c6-3992-40bc-9353-7d8bb161925c]
Current Committed Offsets: {}
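This second trace differs from the original one in two ways that can be read off the text. The frames go through `ArrowPythonRunner` and `StringWriter.setValue`, which suggests a Python UDF path rather than `toPandas()`, and that may be why `spark.sql.execution.arrow.pyspark.enabled` had no effect. The buffer bound is also 2147483648, which is 2^31, so a string buffer in a single batch appears to have reached a 2 GiB ceiling. The sketch below shows the arithmetic and a rough cap on rows per batch; `write_fits` and `max_records_for` are hypothetical helpers and the 1 MB average string size is an invented figure for illustration.

```python
# Upper bound printed in the trace: range(0, 2147483648).
INT32_LIMIT = 2**31


def write_fits(index, length, capacity=INT32_LIMIT):
    """True if writing `length` bytes at `index` stays inside the buffer."""
    return index + length <= capacity


def max_records_for(avg_value_bytes, capacity=INT32_LIMIT):
    """Rough number of values of the given average size that fit in one buffer."""
    if avg_value_bytes <= 0:
        raise ValueError("avg_value_bytes must be positive")
    return capacity // avg_value_bytes


# Values from the trace: 2147483640 + 174 = 2147483814, past the 2^31 bound.
print(write_fits(2147483640, 174))  # False

# Hypothetical 1 MB strings: about 2147 rows before one buffer is full.
print(max_records_for(1_000_000))  # 2147
```

If the column really holds very large strings, keeping `spark.sql.execution.arrow.maxRecordsPerBatch` below such an estimate is one thing to try, though the thread does not confirm that it resolves the error.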
