cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

ExecutorLostFailure: Remote RPC Client Disassociated

McKayHarris
New Contributor II

This is an expensive and long-running job that gets about halfway done before failing. The stack trace is included below, but here is the salient part:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4881 in stage 1.0 failed 4 times, most recent failure: Lost task 4881.3 in stage 1.0 (TID 7305, 10.37.129.129): ExecutorLostFailure (executor 116 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

This job has been running fine for months up to this point. I have tried increasing the node size, but still receiving this error. I saw from a Google search that this might be YARN not having enough provisioned memory, but I was under the impression that it was all configured under the hood with Databricks. Should I try tweaking these values? If so which?

Here is the full stack-trace:

---------------------------------------------------------------------------Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-b42178413d3f> in <module>()    254     )
    255--> 256final_data.write         .format('com.databricks.spark.redshift').option('preactions', delete_stmt).option('url', REDSHIFT_URL).option('dbtable', load_table).option('tempdir', REDSHIFT_TEMPDIR +'/courses_monthly').mode("append").save()/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)    528             self.format(format)    529if path is None:--> 530self._jwrite.save()    531else:    532             self._jwrite.save(path)/databricks/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)    931         answer = self.gateway_client.send_command(command)    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934    935for temp_arg in temp_args:/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)     61def deco(*a,**kw):     62try:---> 63return f(*a,**kw)     64except py4j.protocol.Py4JJavaError as e:     65             s = e.java_exception.toString()/databricks/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)    310                 raise Py4JJavaError(
    311"An error occurred while calling {0}{1}{2}.\n".--> 312                     format(target_id, ".", name), value)
    313else:    314                 raise Py4JError(

 

Py4JJavaError: An error occurred while calling o305.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:149)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:511)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
    at com.databricks.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:278)
    at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:346)
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:443)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4881 in stage 1.0 failed 4 times, most recent failure: Lost task 4881.3 in stage 1.0 (TID 7305, 10.37.129.129): ExecutorLostFailure (executor 116 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1452)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1440)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1439)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1439)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1665)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1620)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1609)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1881)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1901)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:143)
    ... 34 more 

17 REPLIES 17

Bill_Chambers
Contributor II

Hey McKay, there could be bad import data. Databricks would not have changed anything about this job, is it possible someone changed the settings or that there was a spot price spike during the last run?

@Bill Chambers​  Hey, the only thing I changed was improving some of the date time logic used to specify which directory to pull the data from. And it has failed the same way across 6-7 runs so it's not spot price spikes, any suggestions on what to try?

niravshah3
New Contributor II

I am seeing similar failure for my job. Its frustating no solution so far. Seems like some bug with Databricks as everything works with EMR.

I took a look at your other post about a "similar" problem, and the other issues appears to be library related and incompatible scala versions being used. I'd recommend looking over your configurations and understanding the versions of libraries in use.

Braj259
New Contributor II

I am getting the same Error as well. My job run for very long and then fails throwing the same error. I am using spark 2.0.0 and EMR 5.0.0. I looked everywhere no solution.

NithinAP
New Contributor II

I got this resolved removing the caching of dataframes at various stages @McKay Harris​ 

AntonBaranau
New Contributor II

Got similar with Runtime 5.5 LTS with spark 2.4.3 when calling some spacy nlp model

Have you tried enabling Apache Arrow in your job? This may improve memory utilization for your job. You can do that by adding this snippet to the top of your script or setting it as part of the Spark config for your job.

# Enable Arrow-based columnar data transfers

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

See the docs here: https://docs.databricks.com/spark/latest/spark-sql/spark-pandas.html#optimizing-conversion-between-s...

shibirajar
New Contributor II

We are still facing this issue. Any solution ???

Hello, did you ever find a resolution to this issue?

TheodoreVadpey
New Contributor III

I'm having this error too. After months of the model working, I tweaked the data and now I get this "RPC client disconnected probably due to containers exceeding thresholds, bla bla" issue

fisheep
New Contributor II

I got the same error when I save my dataframe to S3,

Although other DataFrame can save successfully.

I found a method to avoid the problem in my case.

.

Define a function to save the DataFrame to hdfs first,

and then use the saved parquet file create a new DataFrame.

After this the new DataFrame can save to S3 successfully.

.

def save_to_hdfs_first(id, df_save):

  df_save.write.mode('overwrite').parquet('/tmp/' + id + '.parquet')

  df_new = spark.read.parquet('/tmp/' + id + '.parquet')

  return df_new

.

I don't know if the memory or the partition problem,

But this method can indeed solved my problem.

We did this same "chopping off the DAG" approach some time ago amid S3 writing problems, and apparently I need to revive it again. Have you tried .checkpoint() instead?

SatyaD
New Contributor II

I ran into the same exception using the Data Frame and changing the cluster configuration didn't do any help. I tried some of the suggestions from the above but those didn't either.

The only way I could get around this use was to create a temporary view from the Data Frame and do a select with only a limited number of results on it. After this I was able to use the entire temporary view without any issues. If I don't do a LIMIT on the number of results I ran into the same issue again even the with the view. Hope this helps someone.