cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

SparkRuntimeException: Sent message larger than max (10701549 vs. 10485760)

calvinchan_iot
Visitor

Hey Databricks team,

I have been facing a weird error when i upgrade to use Unity Catalog. Actually where is the limit 10485760 (10MB) coming from?

I have spark.sql.autoBroadcastJoinThreshold set to -1 already, and I can't find out any other spark configs that have that number (10485760)

Things that i have tried:

  • upgrade the worker type and driver type to rd-fleet.24large , max worker: 128
  • optimize the memory usage of the UDFs

Configs that i have tried:

  • spark.driver.maxResultSize 32000000000
  • spark.network.timeout 300
  • spark.default.parallelism 4096
  • spark.databricks.hive.metastore.client.pool.size 3
  • spark.driver.memory 32g
  • spark.databricks.safespark.externalUDF.plan.limit 1000000
  • spark.rpc.message.maxSize 500
  • spark.databricks.execution.grpc.maxMessageSize 268435456
  • spark.connect.grpc.maxInboundMessageSize 268435456

Full error

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 94.0 failed 4 times, most recent failure: Lost task 0.3 in stage 94.0 (TID 99255) (10.48.124.146 executor 140): org.apache.spark.SparkRuntimeException: [UDF_ERROR.INTERNAL] Execution of function line_string_linear_interp(coordinates#1601) failed with an internal error: RESOURCE_EXHAUSTED: Sent message larger than max (10701549 vs. 10485760) == Stacktrace == at com.databricks.sql.execution.safespark.SafesparkErrorMessages$.createSparkRuntimeException(SafesparkErrorMessages.scala:134) at com.databricks.sql.execution.safespark.SafesparkErrorMessages$.convertToSparkRuntimeException(SafesparkErrorMessages.scala:88) at com.databricks.sql.execution.safespark.ExternalUDFRunner$$anon$2.onError(ExternalUDFRunner.scala:431) at com.databricks.spark.safespark.udf.UDFSession$_TransformerProxy.onError(UDFSession.scala:134) at grpc_shaded.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:478) at grpc_shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562) at grpc_shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) at grpc_shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743) at grpc_shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722) at grpc_shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at grpc_shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3694) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3616) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3603) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3603) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1548) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1548) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1548) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3939) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3851) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3839) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51) at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1272) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1260) at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2961) at org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:338) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:282) at org.apache.spark.sql.execution.collect.Collector.$anonfun$collect$1(Collector.scala:366) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:363) at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:117) at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:124) at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:126) at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:114) at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:94) at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:553) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:545) at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:565) at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:426) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:419) at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:313) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:519) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:516) at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3670) at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:3661) at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4586) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:965) at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4584) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:303) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:533) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:226) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1148) at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:155) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:482) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4584) at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3660) at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:267) at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:101) at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:787) at com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1112) at com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:260) at sun.reflect.GeneratedMethodAccessor635.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397) at py4j.Gateway.invoke(Gateway.java:306) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199) at py4j.ClientServerConnection.run(ClientServerConnection.java:119) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.spark.SparkRuntimeException: [UDF_ERROR.INTERNAL] Execution of function line_string_linear_interp(coordinates#1601) failed with an internal error: RESOURCE_EXHAUSTED: Sent message larger than max (10701549 vs. 10485760) == Stacktrace == at com.databricks.sql.execution.safespark.SafesparkErrorMessages$.createSparkRuntimeException(SafesparkErrorMessages.scala:134) at com.databricks.sql.execution.safespark.SafesparkErrorMessages$.convertToSparkRuntimeException(SafesparkErrorMessages.scala:88) at com.databricks.sql.execution.safespark.ExternalUDFRunner$$anon$2.onError(ExternalUDFRunner.scala:431) at com.databricks.spark.safespark.udf.UDFSession$_TransformerProxy.onError(UDFSession.scala:134) at grpc_shaded.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:478) at grpc_shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562) at grpc_shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) at grpc_shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743) at grpc_shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722) at grpc_shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at grpc_shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more

 

 

 

3 REPLIES 3

szymon_dybczak
Contributor III

Hi @calvinchan_iot ,

I think that you have a problem with udf function -> line_string_linear_interp.
I suspect the limit you've encountered is due to serialization issues of that udf. You exceeded max size of rpc message. 

You can try to get current rpc memory size using below command:

spark.conf.get("spark.rpc.message.maxSize")

And then try to increase the value:

spark.conf.set("spark.rpc.message.maxSize", "500")



calvinchan_iot
Visitor

Hi @szymon_dybczak , i did but the problem persisits

This 500 was just for an example. The first code should return your current setting and then you should set maxSize accordingly. 

Anyway, is it possible to attach code of this UDF?

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group