02-24-2015 03:51 PM
02-24-2015 04:01 PM
This error indicates that the Worker's local disks are filling up.
Context of the Error
A Worker's local disk is used by Spark primarily for intermediate shuffle files (and for persisted RDD partitions that spill to disk).
You can inspect the amount of local disk space before and after your shuffle as follows (Scala):
import scala.sys.process._

val perNodeSpaceInGB = sc.parallelize(0 to 100).map { _ =>
  val hostname = ("hostname".!!).trim
  // Parse the "Available" column of `df` output; the path is Databricks-specific
  // (newer runtimes use /local_disk0 instead of /local_disk).
  val spaceInGB = ("df /local_disk".!!).split(" +")(9).toDouble / 1024 / 1024
  // System.gc() // optionally nudge a GC on each Worker first
  (hostname, spaceInGB)
}.collect.distinct

println(f"There are ${perNodeSpaceInGB.size} nodes in this cluster. Per node free space (in GB):\n--------------------------------------")
perNodeSpaceInGB.foreach { case (host, gb) => println(f"$host\t\t$gb%2.2f") }

val totalSpaceInGB = perNodeSpaceInGB.map(_._2).sum
println(f"--------------------------------------\nTotal free space: $totalSpaceInGB%2.2f GB")
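If you prefer Python, here is a rough stdlib-only sketch of the same probe. It runs on a single machine; on a cluster you would call it inside a Spark map as above. The /local_disk path is an assumption that varies by Databricks runtime:

```python
import shutil

def free_space_gb(path="/"):
    """Return free disk space at `path` in gigabytes."""
    return shutil.disk_usage(path).free / 1024 ** 3

# On a Databricks worker you would probe the shuffle directory instead,
# e.g. free_space_gb("/local_disk") -- the exact path is deployment-specific.
print(f"Free space at /: {free_space_gb():.2f} GB")
```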
Causes of the Error
Intermediate shuffle files that contain an RDD's parent dependency data (lineage) hang around on the Workers in case the RDD needs to be recovered from its parents.
If the intermediate shuffle files are not removed quickly enough, they can cause the "No space left on device" error to occur on a Worker.
Here is an example that might lead to intermediate shuffle files not being cleaned up (Python):
# Define an RDD which creates some shuffles
myRdd = sc.textFile(...).groupByKey(...).map(...)
myRdd.count()
When this is run, the local myRdd variable will prevent the removal of the intermediate shuffle files on the Workers.
Another, more subtle, example of a dangling RDD reference is this: consider a notebook cell with a single unpersist call:
myRdd.unpersist()
RDD.unpersist() returns a reference to the RDD being unpersisted. The last value in a notebook cell is automatically assigned to an Out[someNumber] variable in the Python interpreter. This hidden variable can keep the RDD alive and prevent the removal of intermediate shuffle files. The problem isn't specific to unpersist(), either: I think that any case where you have an RDD as the final expression of a notebook cell may leave behind a reference that prevents the removal of intermediate shuffle files.
There might be a way to clear the Out variables to force them to be cleaned up, but I'm not sure offhand. Consider using functions to limit the scope of RDD references.
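The hidden-reference effect can be demonstrated without Spark. In this sketch, `out_like` plays the role of the notebook's `Out[n]` entry: the object cannot be collected until that reference is also cleared. (In IPython, `%xdel` is one way to clear such hidden references, though I haven't verified it against Databricks notebooks.)

```python
import gc
import weakref

class FakeRdd:
    """Stand-in for an RDD; only its lifetime matters here."""

rdd = FakeRdd()
probe = weakref.ref(rdd)    # lets us observe whether the object is still alive

out_like = rdd              # simulates the hidden Out[someNumber] reference
rdd = None                  # the "obvious" reference is gone...
gc.collect()
assert probe() is not None  # ...but the hidden one keeps the object alive

out_like = None             # clear the hidden reference too
gc.collect()
assert probe() is None      # now the object (and its shuffle files) can go
```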
Workaround 1: Explicitly Remove Intermediate Shuffle Files
The intermediate shuffle files on the Workers are removed from disk when the RDD is freed and goes out of scope. Calling RDD.unpersist() is one way for the RDD to go out of scope; you can also explicitly re-assign the RDD variable to None (Python) or null (Scala) when you're done with it. These mechanisms flag the intermediate shuffle files for removal. (Note: this may not be desirable if you need to keep the RDD around for later processing.)
Upon GC, the Spark ContextCleaner will remove the flagged intermediate shuffle files on all Workers that contributed to the lineage of the freed RDD. In other words, a GC - which is usually meant to free up memory - is also used by Spark, via the ContextCleaner, to free up the intermediate shuffle files on the Workers.
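Here is a minimal Python analogy of this flag-then-collect behavior. The ContextCleaner tracks RDDs with JVM weak references; `weakref.finalize` below is just a stand-in for its cleanup hook, and FakeRdd is an illustrative name:

```python
import gc
import weakref

removed_shuffles = []

class FakeRdd:
    """Stand-in for an RDD with associated shuffle files."""
    def __init__(self, rdd_id):
        self.rdd_id = rdd_id

def make_rdd(rdd_id):
    rdd = FakeRdd(rdd_id)
    # Analogous to the ContextCleaner: when the RDD object is collected,
    # "remove" its shuffle files.
    weakref.finalize(rdd, removed_shuffles.append, rdd_id)
    return rdd

my_rdd = make_rdd(42)
gc.collect()
assert removed_shuffles == []    # still referenced: shuffle files stay on disk

my_rdd = None                    # drop the last reference...
gc.collect()                     # ...and nudge a GC, like System.gc() in Spark
assert removed_shuffles == [42]  # cleanup hook fired
```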
If the GC required by the shuffle-file cleaner is not happening fast enough on its own, you can explicitly call System.gc() in Scala or sc._jvm.System.gc() in Python to nudge the JVM into a GC and ultimately remove the intermediate shuffle files. While this technically isn't guaranteed to force a GC, it has proven effective for users in this situation.
Workaround 2: Use More Workers
Assuming even distribution of partitions, adding more Workers will - on average - reduce the disk space required for the intermediate shuffle files on each Worker.
Workaround 3: Checkpoint the RDD
Another solution, used by Spark Streaming in particular, is to periodically call RDD.checkpoint(). This saves the current immutable state of the RDD to S3, snips the RDD lineage, and allows the intermediate shuffle files to be removed. This requires a prior call to sc.setCheckpointDir() with a path such as /checkpoints, which saves the checkpoint data to DBFS/S3 at that location. This is the best of both worlds: the RDD is still recoverable, but the intermediate shuffle files can be removed from the Workers.
Workaround 4: [Spark SQL Only] Increase Shuffle Partitions
If you're seeing this with Spark SQL HiveQL commands, you can try increasing the number of Spark SQL shuffle partitions as follows:
SET spark.sql.shuffle.partitions=400;
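One reason this can help (a back-of-the-envelope sketch with made-up numbers): the total shuffle volume stays roughly the same, but each shuffle partition - and therefore each spill file a task writes at once - gets smaller, which can keep a single Worker's disk from being overwhelmed by a few huge partitions:

```python
shuffle_bytes = 400 * 1024**3  # hypothetical 400 GB total shuffle volume

def per_partition_mb(partitions):
    """Average size of one shuffle partition, in MB."""
    return shuffle_bytes / partitions / 1024**2

for n in (200, 400, 1600):
    print(f"{n:>5} partitions -> {per_partition_mb(n):.0f} MB each")
```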
11-25-2016 11:40 PM
It is helpful. So do you mean the suggested way in Python is like the following?
t = myRdd.unpersist()
t = None
Also, could you point me to any docs about the Python Out variable? I am new to Python and could not find it.
06-11-2019 12:27 AM
Any recommendation on how to work around this issue when using Spark SQL only?
We already SET spark.sql.shuffle.partitions=XXX; to a couple of different values, but it still keeps failing. Also, the cluster size / number of workers should be more than sufficient. Could the enable_elastic_disk setting on the cluster (https://docs.azuredatabricks.net/api/latest/clusters.html) help with this?
regards,
-gerhard

11-30-2020 03:03 PM
Had to update this line
val spaceInGB = ("df /local_disk".!!).split(" +")(9).toInt / 1024 / 1024
to
val spaceInGB = ("df /local_disk0".!!).split(" +")(9).toInt / 1024 / 1024
in Databricks 7.3.
04-25-2016 12:26 AM
@cfregly
Great post! I am getting the above error with Spark SQL / HiveQL commands. Can you please explain how increasing the "spark.sql.shuffle.partitions" property helps? What else can be done to avoid the space issue in Spark SQL?
Thank you.
07-26-2017 12:31 AM
Yeah, not sure how that really helps in this case. Any explanation?
08-08-2017 08:04 AM
The 1st Workaround (Explicitly Remove Intermediate Shuffle Files) worked for me. Thank you.
09-07-2017 01:48 AM
This is a generic problem.
The cheap solution is to increase the number of shuffle partitions (in case loads are skewed) or to restart the cluster.
The safe solution is to increase the cluster size or the node sizes (SSD, RAM, …).
Eventually, you have to make sure your code is efficient: read and write as you go (do not keep things in memory; instead, process like a streaming pipeline from source to sink). Things like repartition can break this efficiency.
Also make sure that you are not overwriting a cached variable. For example, the code below is wrong:
df = ….cache()
df = df.withColumn(…).cache()
Instead, put an unpersist between both lines; otherwise there is an orphaned reference to cached data.
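The orphaned-reference problem above can be sketched without Spark. Here a plain list stands in for the cluster's cache manager (a toy model, not Spark's API): rebinding `df` without unpersisting leaves the first cached object pinned, with no variable left to release it:

```python
cache_registry = []  # stands in for the cluster's cache manager (toy model)

class Frame:
    """Toy stand-in for a cached DataFrame."""
    def __init__(self, name):
        self.name = name
    def cache(self):
        cache_registry.append(self)   # pin this object in "cluster memory"
        return self
    def unpersist(self):
        cache_registry.remove(self)   # release it
        return self

# Wrong: rebinding df orphans the first cached Frame.
df = Frame("v1").cache()
df = Frame("v2").cache()
assert [f.name for f in cache_registry] == ["v1", "v2"]

cache_registry.clear()

# Right: unpersist before rebinding.
df = Frame("v1").cache()
df.unpersist()
df = Frame("v2").cache()
assert [f.name for f in cache_registry] == ["v2"]
```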