toPandas() causes IndexOutOfBoundsException in Apache Arrow
04-19-2022 10:47 AM - last edited on 03-21-2025 06:30 AM by Advika
I'm using DBR 10.0.
When calling toPandas(), the worker fails with an IndexOutOfBoundsException. It seems that ArrowWriter.sizeInBytes (which looks like a proprietary method, since I can't find it in OSS Spark) calls Arrow's getBufferSizeFor, which fails with this error. What is the root cause of this issue?
Here's an excerpt of the stack trace:
java.lang.IndexOutOfBoundsException: index: 16384, length: 4 (expected: range(0, 16384))
at org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:318)
at org.apache.arrow.memory.ArrowBuf.chk(ArrowBuf.java:305)
at org.apache.arrow.memory.ArrowBuf.getInt(ArrowBuf.java:424)
at org.apache.arrow.vector.complex.BaseRepeatedValueVector.getBufferSizeFor(BaseRepeatedValueVector.java:229)
at org.apache.arrow.vector.complex.ListVector.getBufferSizeFor(ListVector.java:621)
at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.getSizeInBytes(ArrowWriter.scala:165)
at org.apache.spark.sql.execution.arrow.ArrowWriter.sizeInBytes(ArrowWriter.scala:118)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.$anonfun$next$1(ArrowConverters.scala:224)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1647)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:235)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:199)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
Labels: Databricks Runtime
04-19-2022 11:14 AM
@Sergey Ivanychev, I think it's trying to return too much data to pandas and running out of memory. What are you trying to do? With the pandas API on Spark introduced in Spark 3.2, you shouldn't need to use plain pandas much anymore: https://databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html
04-19-2022 12:17 PM
I'm feeding the DataFrame to an ML model. `toPandas()` works perfectly fine with `spark.sql.execution.arrow.pyspark.enabled` set to `false`.
But disabling Arrow pipeline by pipeline is far from ideal. The error above doesn't explain much, and the failure occurs in proprietary code. At this point I don't know where to look for the cause.
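One way to avoid changing the setting for a whole pipeline is to scope it to the single `toPandas()` call. This is a minimal sketch, assuming `spark` is the active SparkSession; the helper name `arrow_disabled` is made up for illustration, and it only works around the failure rather than explaining it:

```python
from contextlib import contextmanager

ARROW_KEY = "spark.sql.execution.arrow.pyspark.enabled"


@contextmanager
def arrow_disabled(spark):
    """Temporarily turn off Arrow-based conversion, then restore the old value."""
    previous = spark.conf.get(ARROW_KEY)  # remember the value before changing it
    spark.conf.set(ARROW_KEY, "false")
    try:
        yield
    finally:
        # Restore the previous value even if the body raises.
        spark.conf.set(ARROW_KEY, previous)


# Usage (hypothetical DataFrame name `df`):
# with arrow_disabled(spark):
#     pdf = df.toPandas()
```

The setting is read when `toPandas()` is called, so the rest of the pipeline keeps whatever value was configured before.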
04-19-2022 12:21 PM
Weirdly, `getBufferSizeFor` is where the failure occurs. IMO a method with such a name shouldn't throw an out-of-bounds error.
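The numbers in the exception message give a hint about what `getBufferSizeFor` was reading. The sketch below only redoes the arithmetic; the 4-byte entry width is the size of a 32-bit list offset in the Arrow format, and the conclusion drawn from it is an inference, not something confirmed in this thread:

```python
OFFSET_WIDTH = 4  # bytes per 32-bit offset entry in an Arrow list vector

# Values copied from the exception message:
# "index: 16384, length: 4 (expected: range(0, 16384))"
index, length, capacity = 16384, 4, 16384

entry = index // OFFSET_WIDTH                # which offset entry was being read
entries_that_fit = capacity // OFFSET_WIDTH  # entries the buffer can hold
bytes_needed = index + length                # end of the attempted read

print(entry, entries_that_fit, bytes_needed)  # 4096 4096 16388
```

The buffer holds entries 0 through 4095, and the read targeted entry 4096. In the Arrow format a list column with N values carries N + 1 offsets, so this would be consistent with the offsets buffer being one entry short for the value count passed in.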
04-19-2022 12:28 PM
toPandas() is only for small datasets.
Please use this instead:
to_pandas_on_spark()
It is essential to use pandas on Spark instead of ordinary pandas so that the work runs in a distributed way. More info: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html
So always import pandas as:
import pyspark.pandas as ps
My blog: https://databrickster.medium.com/
04-19-2022 12:33 PM
As I noted, `toPandas()` works great with `spark.sql.execution.arrow.pyspark.enabled` set to `false`. I understand that to_pandas_on_spark() is an option, but I need a pandas DataFrame, not a pandas-on-Spark DataFrame.
04-19-2022 12:50 PM
Turning Arrow off is going to increase your execution time. It might be better to use something like applyInPandas. You might also want to adjust the batch size: https://spark.apache.org/docs/3.0.0/sql-pyspark-pandas-with-arrow.html#setting-arrow-batch-size
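The batch size described on the linked page is controlled by `spark.sql.execution.arrow.maxRecordsPerBatch`. A rough sketch of lowering it, assuming `spark` is the active SparkSession; the helper name is hypothetical and the value in the usage comment is an arbitrary illustration, not a recommendation. Whether a smaller batch avoids this particular failure is untested here:

```python
BATCH_KEY = "spark.sql.execution.arrow.maxRecordsPerBatch"


def set_arrow_batch_size(spark, max_records):
    """Set the max rows per Arrow record batch and return the previous value."""
    # Spark treats zero or negative values as "no limit"; this helper rejects
    # them so that a typo cannot silently remove the limit.
    if max_records <= 0:
        raise ValueError("max_records must be positive")
    previous = spark.conf.get(BATCH_KEY)
    spark.conf.set(BATCH_KEY, str(max_records))
    return previous


# Usage (hypothetical):
# old_value = set_arrow_batch_size(spark, 1000)
```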
04-19-2022 12:57 PM
Again, I can't use `applyInPandas` because I need to collect the data to feed into an ML model. I need a *pandas DataFrame*.
I have enough memory on my driver (turning off Arrow makes the code work).
04-19-2022 01:23 PM
applyInPandas takes a function argument, which can be an ML model.
04-19-2022 01:29 PM
We are training an ML model, not applying one. We need to fetch a batch of data as a pandas DataFrame and feed it into the model for training.
04-19-2022 01:39 PM
Yes, ML model training is done with a function such as model.fit().
04-20-2022 01:29 AM
I know that. Is my question not clear?
08-11-2022 11:28 PM
I have a similar situation.
09-19-2022 03:41 PM
This could be an Arrow version mismatch. Did you by chance install anything that could pull in a different Arrow version? It can happen indirectly via other libraries.
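A quick way to see which Python-side versions are actually installed is to query the package metadata. This is a small sketch using only the standard library; the helper name `installed_version` is made up here, and it inspects only the Python packages in the current environment, not the Java Arrow libraries that appear in the stack trace:

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of a package, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Print the versions that could matter for toPandas().
print("pyarrow:", installed_version("pyarrow"))
print("pandas:", installed_version("pandas"))
```

The result can then be compared with the versions listed in the release notes for the runtime in use.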
12-11-2022 06:36 AM
I am facing the same issue. I have set `spark.sql.execution.arrow.pyspark.enabled` to `false`, but the error persists.
Any idea what's going on? Please help me out.
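The numbers in the exception message of the trace below can be checked by hand. A small sketch of that arithmetic follows; reading it as a string column's data buffer filling up within one batch is an inference from `StringWriter.setValue` in the trace, not a confirmed diagnosis:

```python
# Values copied from the exception message:
# "index: 2147483640, length: 174 (expected: range(0, 2147483648))"
index, length, capacity = 2147483640, 174, 2147483648

is_two_gib = capacity == 2**31            # the buffer limit is exactly 2 GiB
bytes_left = capacity - index             # room left before the end of the buffer
overshoot = index + length - capacity     # bytes by which the write goes past it

print(is_two_gib, bytes_left, overshoot)  # True 8 166
```

So a 174-byte value was being written with only 8 bytes left in a 2 GiB buffer. This trace also goes through `ArrowPythonRunner`, the path used for pandas UDFs, whereas `spark.sql.execution.arrow.pyspark.enabled` governs `toPandas()` and `createDataFrame()` conversion, which may be why changing that setting made no difference here.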
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 39.0 failed 4 times, most recent failure: Lost task 0.3 in stage 39.0 (TID 3789) (10.132.234.41 executor 39): java.lang.IndexOutOfBoundsException: index: 2147483640, length: 174 (expected: range(0, 2147483648))
at org.apache.arrow.memory.ArrowBuf.checkIndex(ArrowBuf.java:699)
at org.apache.arrow.memory.ArrowBuf.setBytes(ArrowBuf.java:890)
at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1087)
at org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:287)
at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:151)
at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:105)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:110)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.writeIteratorToStream(ArrowPythonRunner.scala:132)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:521)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2241)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:313)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2873)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2820)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2814)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2814)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1350)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1350)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1350)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3081)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3022)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3010)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.IndexOutOfBoundsException: index: 2147483640, length: 174 (expected: range(0, 2147483648))
at org.apache.arrow.memory.ArrowBuf.checkIndex(ArrowBuf.java:699)
at org.apache.arrow.memory.ArrowBuf.setBytes(ArrowBuf.java:890)
at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1087)
at org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:287)
at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:151)
at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:105)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:110)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.writeIteratorToStream(ArrowPythonRunner.scala:132)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:521)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2241)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:313)
=== Streaming Query ===
Identifier: [id = 1f85f00f-6e6f-4b42-b178-0fe871f8ec02, runId = 46d257c6-3992-40bc-9353-7d8bb161925c]
Current Committed Offsets: {}