Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Spark tasks getting stuck on one executor

paranoid_jvm
New Contributor II

Hi All,

I am running a Spark job on a cluster with 8 executors with 8 cores each. The job involves executing a UDF and processes a few hundred thousand rows. When I run the job, each executor is assigned 8 tasks. Usually the job succeeds in less than 10 minutes, but intermittently it gets stuck for up to two hours. When I analyzed further, I found that 7 out of 8 executors complete their tasks in a few minutes, but all 8 tasks assigned to one executor get stuck. All 8 threads are in the TIMED_WAITING state. They wait for almost 8 hours and then fail. They then get restarted, and the next attempt completes in minutes. During this time, CPU utilization is close to zero, which means the executor is not doing anything else. Other cluster metrics such as memory utilization and shuffle read/write are normal, with no spikes in any of them. I have also tried various repartitioning schemes to rule out data skew.

Below are the logs from executor:

privateLog: "Executor task launch worker for task 1.0 in stage 16015.0 (TID 77452)" #4774 TIMED_WAITING holding [Lock(java.util.concurrent.ThreadPoolExecutor$Worker@921636861})]
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1332)
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:248)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:48)
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:462)
com.databricks.sql.execution.safespark.FutureUtils$.cooperativeAwaitResult(FutureUtils.scala:44)
com.databricks.sql.execution.safespark.EvalExternalUDFExec.$anonfun$awaitBatchResult$2(EvalExternalUDFExec.scala:231)
com.databricks.sql.execution.safespark.EvalExternalUDFExec$$Lambda$4071/1987843549.apply(Unknown Source)
com.databricks.sql.execution.safespark.SafesparkErrorMessages$.scrollToLocalizedException(SafesparkErrorMessages.scala:39)
com.databricks.sql.execution.safespark.EvalExternalUDFExec.$anonfun$awaitBatchResult$1(EvalExternalUDFExec.scala:231)
com.databricks.sql.execution.safespark.EvalExternalUDFExec$$Lambda$4070/1485419994.apply(Unknown Source)
com.databricks.sql.execution.safespark.EvalExternalUDFExec.withDurationMs(EvalExternalUDFExec.scala:245)
com.databricks.sql.execution.safespark.EvalExternalUDFExec.awaitBatchResult(EvalExternalUDFExec.scala:230)
com.databricks.sql.execution.safespark.EvalExternalUDFExec.$anonfun$doExecute$12(EvalExternalUDFExec.scala:199)
com.databricks.sql.execution.safespark.EvalExternalUDFExec$$Lambda$3888/679008648.apply(Unknown Source)
scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
com.databricks.sql.execution.safespark.VectorSchemaRootConverter$$anon$1.hasNext(VectorSchemaRootConverter.scala:77)
scala.collection.Iterator$$anon$20.hasNext(Iterator.scala:886)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:120)
org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$2.hasNext(InMemoryRelation.scala:287)
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:227)
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:312)
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1654)
org.apache.spark.storage.BlockManager$$Lambda$1050/461739670.apply(Unknown Source)
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1581)
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1645)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1438)
org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1392)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:422)
org.apache.spark.rdd.RDD.iterator(RDD.scala:372)
org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
org.apache.spark.scheduler.ResultTask$$Lambda$1227/1122518805.apply(Unknown Source)
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
org.apache.spark.scheduler.ResultTask$$Lambda$1203/848194627.apply(Unknown Source)
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
org.apache.spark.scheduler.Task$$Lambda$1175/1027670073.apply(Unknown Source)
com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
com.databricks.unity.HandleImpl$$Lambda$1176/1761608791.apply(Unknown Source)
scala.util.Using$.resource(Using.scala:269)
com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
org.apache.spark.scheduler.Task$$Lambda$1129/1931909925.apply(Unknown Source)
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
org.apache.spark.scheduler.Task.run(Task.scala:99)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:900)
org.apache.spark.executor.Executor$TaskRunner$$Lambda$1124/516512895.apply(Unknown Source)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1709)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:903)
org.apache.spark.executor.Executor$TaskRunner$$Lambda$1089/1829229717.apply$mcV$sp(Unknown Source)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:798)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:750)
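
In the trace above, the task thread appears to be parked in `FutureUtils$.cooperativeAwaitResult`, called from `EvalExternalUDFExec.awaitBatchResult`, which suggests it is waiting for a UDF batch result rather than doing work in the JVM. To check whether all 8 stuck threads share that frame, a saved thread dump can be summarized with a small script. This is only a sketch: the header pattern is an assumption based on the single header line shown above, and `parse_thread_dump` and `count_waiting_on` are hypothetical helper names, not part of Spark or Databricks.

```python
import re
from collections import Counter

# Header shape assumed from the dump above: "<thread name>" #<id> <STATE> ...
HEADER = re.compile(r'"(?P<name>[^"]+)"\s+#(?P<tid>\d+)\s+(?P<state>[A-Z_]+)')
# A stack frame looks like package.Class.method(File.scala:123).
FRAME = re.compile(r'^[\w.$/]+\(.*\)$')


def parse_thread_dump(text):
    """Split a text thread dump into threads with a name, state and frames."""
    threads = []
    current = None
    for raw in text.splitlines():
        line = raw.strip()
        header = HEADER.search(line)
        if header:
            # A new thread starts; frames that follow belong to it.
            current = {
                "name": header["name"],
                "state": header["state"],
                "frames": [],
            }
            threads.append(current)
        elif current is not None and FRAME.match(line):
            current["frames"].append(line)
    return threads


def count_waiting_on(threads, marker):
    """Count threads per state whose stack has a frame containing marker."""
    return Counter(
        t["state"]
        for t in threads
        if any(marker in frame for frame in t["frames"])
    )
```

Calling `count_waiting_on(parse_thread_dump(dump_text), "EvalExternalUDFExec.awaitBatchResult")` on the full executor dump would show how many threads, and in which states, are blocked on the UDF result.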

1 REPLY

Kaniz_Fatma
Community Manager

Hi @paranoid_jvm

  • Timeout exceptions can occur when the executor is under memory constraint or facing out-of-memory (OOM) issues while processing data. This can impact the garbage collection process, causing further delays.
  • Consider increasing the executor memory to provide more resources for your UDF execution. Additionally, fine-tune the balance between executor memory and driver memory.
  • You might also want to reduce the number of executors and allocate less memory initially (e.g., start with 4GB per executor) to see if that improves stability.
  • If your Spark application has dynamic allocation enabled, it requests additional executors when there are pending tasks waiting to be scheduled.
  • Ensure that the existing set of executors is sufficient to simultaneously handle all submitted but u...
  • Since you’ve already ruled out data skew issues through various repartitioning attempts, focus on shuffling.
  • Check if any specific partitions are causing excessive shuffling. You can use Spark UI to analyze shuffle read/write metrics.
  • Consider optimizing your UDF to minimize shuffling, especially if it involves large data transfers between partitions.
  • You mentioned that 7 out of 8 executors complete tasks quickly, while the remaining executor gets stuck.
  • Verify the distribution of tasks across executors. It’s possible that the stuck executor is handling more tasks than others.
  • Also, check if the stuck executor is handling partitions that require shuffling. Adjust the number of cores per executor if needed.
  • Monitor the Spark UI during job execution to identify any anomalies or bottlenecks.
  • Look for patterns in the stuck executor’s behaviour, such as specific stages or tasks.
  • Collect additional logs and metrics to diagnose the issue further.
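
To verify how tasks are distributed across executors and spot the one that lags, the per-task records for a stage can be grouped by executor. The following is a minimal sketch, not a definitive diagnostic: it assumes each task record is a dict with `executorId` and `duration` (in milliseconds) keys, for example as exported from the Spark UI or monitoring API, and `summarize_tasks_by_executor` and its `slow_factor` threshold are hypothetical choices made for illustration.

```python
from collections import defaultdict
from statistics import median


def summarize_tasks_by_executor(tasks, slow_factor=5.0):
    """Group task records by executor and flag executors whose median
    task duration is far above the median over all tasks.

    The record keys "executorId" and "duration" are assumed field names.
    """
    by_executor = defaultdict(list)
    for task in tasks:
        by_executor[task["executorId"]].append(task["duration"])

    # Median over every task in the stage, used as the baseline.
    overall = median(d for durations in by_executor.values() for d in durations)

    summary = {}
    for executor_id, durations in by_executor.items():
        executor_median = median(durations)
        summary[executor_id] = {
            "tasks": len(durations),
            "median_duration": executor_median,
            # slow_factor is an arbitrary threshold; tune it for your job.
            "suspect": executor_median > slow_factor * overall,
        }
    return summary
```

If every executor reports the same task count but one is flagged as `suspect`, uneven task assignment is unlikely to be the cause and the stuck executor itself deserves a closer look.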

Feel free to explore these suggestions, and let me know if you need further assistance! 
