04-19-2022 10:47 AM
Using DBR 10.0
When calling toPandas() the worker fails with IndexOutOfBoundsException. It seems like ArrowWriter.sizeInBytes (which looks like a proprietary method since I can't find it in OSS) calls arrow's getBufferSizeFor which fails with this error. What is the root cause of this issue?
Here's a sample of the full stack trace:
java.lang.IndexOutOfBoundsException: index: 16384, length: 4 (expected: range(0, 16384))
at org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:318)
at org.apache.arrow.memory.ArrowBuf.chk(ArrowBuf.java:305)
at org.apache.arrow.memory.ArrowBuf.getInt(ArrowBuf.java:424)
at org.apache.arrow.vector.complex.BaseRepeatedValueVector.getBufferSizeFor(BaseRepeatedValueVector.java:229)
at org.apache.arrow.vector.complex.ListVector.getBufferSizeFor(ListVector.java:621)
at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.getSizeInBytes(ArrowWriter.scala:165)
at org.apache.spark.sql.execution.arrow.ArrowWriter.sizeInBytes(ArrowWriter.scala:118)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.$anonfun$next$1(ArrowConverters.scala:224)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1647)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:235)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.next(ArrowConverters.scala:199)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
04-19-2022 11:14 AM
@Sergey Ivanychev , I think it's trying to return too much data to pandas and overloading the memory. What are you trying to do? You shouldn't need to use pandas much anymore with the 3.2 introduction of pandas API for Spark https://databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html
04-19-2022 12:17 PM
I'm feeding the DataFrame to the ML model. The `toPandas()` works perfectly fine with `spark.sql.execution.arrow.pyspark.enabled` set to `false`.
But disabling arrow pipeline by pipeline is far from perfect. The error above doesn't explain a lot and the fail occurs in the proprietary code. At this point I don't know where to look for an error
04-19-2022 12:21 PM
Weirdly, `getBufferSizeFor` is the cause of the failure. IMO the method with such a name shouldn't cause out of bounds error.
04-19-2022 12:28 PM
to_pandas() is only for a small dataset.
Please use instead:
to_pandas_on_spark()
It is essential to use Pandas on Spark instead of ordinary Pandas so that it will work in a distributed way. Here is more info https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html
So always import Pandas as:
import pyspark.pandas as ps
04-19-2022 12:33 PM
As I noted, `to_pandas() ` works great with `spark.sql.execution.arrow.pyspark.enabled` set to `false`. I understand that to_pandas_on_spark() is an option, but I need a Pandas DataFrame, not a Pandas-on-Spark DataFrame.
04-19-2022 12:50 PM
Turning arrow off is going to increase your execution time. It might be better to use something like applyinpandas. You might want to adjust the batch size https://spark.apache.org/docs/3.0.0/sql-pyspark-pandas-with-arrow.html#setting-arrow-batch-size
04-19-2022 12:57 PM
Again, I can't use `applyinpandas` because I need to collect data to feed into an ML model. I need a *Pandas dataframe*.
I have enough memory on my driver (turning off arrow makes the code work).
04-19-2022 01:23 PM
applyinpandas takes a function argument, which can be an ML model.
04-19-2022 01:29 PM
We train an ML model, not apply it. We need to fetch a batch of data as Pandas dataframe and feed it into a model for training.
04-19-2022 01:39 PM
Yes, the ml model training is done with a function such as model.fit().
04-20-2022 01:29 AM
I know that. Is my question not clear?
08-11-2022 11:28 PM
I have the similar situation.
09-19-2022 03:41 PM
This could be a Arrow version mismatch. Do you by chance try to install anything that could install a different arrow version? it can happen indirectly via other libs.
12-11-2022 06:36 AM
I am also facing the same issue, I have applied the config: `spark.sql.execution.arrow.pyspark.enabled` set to `false`, but still facing the same issue.
Any Idea, what's going on???. Please help me out....
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 39.0 failed 4 times, most recent failure: Lost task 0.3 in stage 39.0 (TID 3789) (10.132.234.41 executor 39): java.lang.IndexOutOfBoundsException: index: 2147483640, length: 174 (expected: range(0, 2147483648))
at org.apache.arrow.memory.ArrowBuf.checkIndex(ArrowBuf.java:699)
at org.apache.arrow.memory.ArrowBuf.setBytes(ArrowBuf.java:890)
at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1087)
at org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:287)
at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:151)
at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:105)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:110)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.writeIteratorToStream(ArrowPythonRunner.scala:132)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:521)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2241)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:313)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2873)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2820)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2814)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2814)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1350)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1350)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1350)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3081)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3022)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3010)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.IndexOutOfBoundsException: index: 2147483640, length: 174 (expected: range(0, 2147483648))
at org.apache.arrow.memory.ArrowBuf.checkIndex(ArrowBuf.java:699)
at org.apache.arrow.memory.ArrowBuf.setBytes(ArrowBuf.java:890)
at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1087)
at org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:287)
at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:151)
at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:105)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:110)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.writeIteratorToStream(ArrowPythonRunner.scala:132)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:521)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2241)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:313)
=== Streaming Query ===
Identifier: [id = 1f85f00f-6e6f-4b42-b178-0fe871f8ec02, runId = 46d257c6-3992-40bc-9353-7d8bb161925c]
Current Committed Offsets: {}
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group