SparkRuntimeException: Sent message larger than max (10701549 vs. 10485760)
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-21-2024 06:14 AM
Hey Databricks team,
I have been facing a weird error when i upgrade to use Unity Catalog. Actually where is the limit 10485760 (10MB) coming from?
I have spark.sql.autoBroadcastJoinThreshold set to -1 already, and I can't find out any other spark configs that have that number (10485760)
Things that i have tried:
- upgrade the worker type and driver type to rd-fleet.24large , max worker: 128
- optimize the memory usage of the UDFs
Configs that i have tried:
- spark.driver.maxResultSize 32000000000
- spark.network.timeout 300
- spark.default.parallelism 4096
- spark.databricks.hive.metastore.client.pool.size 3
- spark.driver.memory 32g
- spark.databricks.safespark.externalUDF.plan.limit 1000000
- spark.rpc.message.maxSize 500
- spark.databricks.execution.grpc.maxMessageSize 268435456
- spark.connect.grpc.maxInboundMessageSize 268435456
Full error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 94.0 failed 4 times, most recent failure: Lost task 0.3 in stage 94.0 (TID 99255) (10.48.124.146 executor 140): org.apache.spark.SparkRuntimeException: [UDF_ERROR.INTERNAL] Execution of function line_string_linear_interp(coordinates#1601) failed with an internal error: RESOURCE_EXHAUSTED: Sent message larger than max (10701549 vs. 10485760) == Stacktrace == at com.databricks.sql.execution.safespark.SafesparkErrorMessages$.createSparkRuntimeException(SafesparkErrorMessages.scala:134) at com.databricks.sql.execution.safespark.SafesparkErrorMessages$.convertToSparkRuntimeException(SafesparkErrorMessages.scala:88) at com.databricks.sql.execution.safespark.ExternalUDFRunner$$anon$2.onError(ExternalUDFRunner.scala:431) at com.databricks.spark.safespark.udf.UDFSession$_TransformerProxy.onError(UDFSession.scala:134) at grpc_shaded.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:478) at grpc_shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562) at grpc_shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) at grpc_shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743) at grpc_shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722) at grpc_shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at grpc_shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3694) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3616) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3603) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3603) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1548) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1548) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1548) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3939) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3851) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3839) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51) at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1272) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1260) at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2961) at org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:338) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:282) at org.apache.spark.sql.execution.collect.Collector.$anonfun$collect$1(Collector.scala:366) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:363) at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:117) at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:124) at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:126) at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:114) at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:94) at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:553) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:545) at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:565) at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:426) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:419) at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:313) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:519) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:516) at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3670) at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:3661) at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4586) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:965) at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4584) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:303) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:533) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:226) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1148) at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:155) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:482) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4584) at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3660) at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:267) at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:101) at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:787) at com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1112) at com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:260) at sun.reflect.GeneratedMethodAccessor635.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397) at py4j.Gateway.invoke(Gateway.java:306) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199) at py4j.ClientServerConnection.run(ClientServerConnection.java:119) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.spark.SparkRuntimeException: [UDF_ERROR.INTERNAL] Execution of function line_string_linear_interp(coordinates#1601) failed with an internal error: RESOURCE_EXHAUSTED: Sent message larger than max (10701549 vs. 10485760) == Stacktrace == at com.databricks.sql.execution.safespark.SafesparkErrorMessages$.createSparkRuntimeException(SafesparkErrorMessages.scala:134) at com.databricks.sql.execution.safespark.SafesparkErrorMessages$.convertToSparkRuntimeException(SafesparkErrorMessages.scala:88) at com.databricks.sql.execution.safespark.ExternalUDFRunner$$anon$2.onError(ExternalUDFRunner.scala:431) at com.databricks.spark.safespark.udf.UDFSession$_TransformerProxy.onError(UDFSession.scala:134) at grpc_shaded.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:478) at grpc_shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562) at grpc_shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) at grpc_shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743) at grpc_shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722) at grpc_shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at grpc_shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-21-2024 07:00 AM
Hi @calvinchan_iot ,
I think that you have a problem with udf function -> line_string_linear_interp.
I suspect the limit you've encountered is due to serialization issues of that udf. You exceeded max size of rpc message.
You can try to get current rpc memory size using below command:
spark.conf.get("spark.rpc.message.maxSize")
And then try to increase the value:
spark.conf.set("spark.rpc.message.maxSize", "500")
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-21-2024 08:43 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-21-2024 02:18 PM
This 500 was just for an example. The first code should return your current setting and then you should set maxSize accordingly.
Anyway, is it possible to attach code of this UDF?

