
How do I avoid the "No space left on device" error where my disk is running out of space?

cfregly
Contributor
 
Accepted Solutions

cfregly
Contributor

This error indicates that the Worker's local disks are filling up.

Context of the Error

A Worker's local disk is used by Spark for the following:

  • Intermediate shuffle files
    • Contain the RDD's parent dependency data (lineage)
  • RDD persistence
    • StorageLevel.MEMORY_AND_DISK
    • StorageLevel.DISK_ONLY

You can inspect the amount of local disk space before and after your shuffle as follows (Scala):

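import scala.sys.process._

// Mount point of the Workers' local disk (on newer runtimes, e.g. Databricks 7.3, this is /local_disk0)
val localDisk = "/local_disk"

// Run many small tasks so that, ideally, every Worker executes at least one of them
val perNodeSpaceInGB = sc.parallelize(0 to 100).map { _ =>
  val hostname = "hostname".!!.trim
  // Token 9 of the " +"-split `df` output is the "Available" column, in 1K blocks
  val availableKB = s"df $localDisk".!!.split(" +")(9).toLong
  //System.gc()
  (hostname, availableKB / 1024.0 / 1024.0)
}.collect.toMap // keep one entry per hostname

println(f"There are ${perNodeSpaceInGB.size} nodes in this cluster. Per node free space (in GB):\n--------------------------------------")
perNodeSpaceInGB.foreach { case (host, gb) => println(f"$host\t\t$gb%2.2f") }
val totalSpaceInGB = perNodeSpaceInGB.values.sum
println(f"--------------------------------------\nTotal free space (in GB): $totalSpaceInGB%2.2f")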
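The Scala snippet relies on token 9 of the split `df` output being the "Available" column, which is easy to break. As a hedged alternative, here is a Python sketch that reads that column by counting from the end of the data line instead. It assumes the usual `df -k` layout (Filesystem, 1K-blocks, Used, Available, Use%, Mounted on) and a mount point without spaces; `available_gb_from_df` and `available_gb` are hypothetical helpers, not part of Spark or Databricks.

```python
import subprocess


def available_gb_from_df(df_output: str) -> float:
    """Convert the "Available" column of `df -k` output (1K blocks) to GB.

    Counting from the end of the last line still works when df wraps a long
    filesystem name onto its own line.
    """
    data_line = df_output.strip().splitlines()[-1]
    available_kb = int(data_line.split()[-3])  # last three columns: Available, Use%, Mounted on
    return available_kb / 1024 / 1024


def available_gb(path: str = "/local_disk0") -> float:
    """Run `df -k` on one mount point and return its free space in GB."""
    out = subprocess.run(["df", "-k", path], capture_output=True, text=True, check=True).stdout
    return available_gb_from_df(out)


sample = """Filesystem     1K-blocks     Used Available Use% Mounted on
/dev/nvme0n1   104857600 52428800  52428800  50% /local_disk0
"""
print(available_gb_from_df(sample))  # 50.0
```

To get per-Worker numbers in PySpark you would map `available_gb` (paired with `socket.gethostname()`) over something like `sc.parallelize(range(100))`, mirroring the Scala version; that part is untested here.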

Causes of the Error

Intermediate shuffle files that contain an RDD's parent dependency data (lineage) hang around on the Workers in case the RDD needs to be recovered from its parents.

If the intermediate shuffle files are not removed quickly enough, they can cause the "No space left on device" error to occur on a Worker.

Here is an example that might lead to intermediate shuffle files not being cleaned up (Python):

# Define an RDD which creates some shuffles
myRdd = sc.textFile(...).groupByKey(...).map(...) 
myRdd.count()

When this is run, the local myRdd variable will prevent the removal of the intermediate shuffle files on the Workers for as long as it remains in scope.

Another, more subtle, example of a dangling RDD reference is this: consider a notebook cell with a single unpersist call:

myRdd.unpersist()

RDD.unpersist() returns a reference to the RDD being unpersisted. The last value in a notebook cell is automatically assigned to an Out[someNumber] variable in the Python interpreter.

This hidden variable can keep the RDD alive and prevent the removal of intermediate shuffle files. This problem isn't specific to unpersist(), either: I think that any case where an RDD is the final expression of a notebook cell may leave behind a reference to the RDD that prevents the removal of intermediate shuffle files.

There might be a way to clear the Out variables to force them to be cleaned up, but I'm not sure offhand.

Consider using functions to limit the scope of RDD references.
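To illustrate the scoping advice without a cluster, here is a minimal, Spark-free sketch. FakeRdd is a hypothetical stand-in for an RDD handle on the driver, and weakref is used only to observe whether anything still holds it. It relies on CPython freeing an object as soon as its last reference disappears.

```python
import weakref


class FakeRdd:
    """Hypothetical stand-in for an RDD handle held by the driver (not PySpark)."""

    def count(self) -> int:
        return 42


def count_rows():
    """Build the stand-in RDD, run an 'action', and return only what the caller needs."""
    rdd = FakeRdd()  # lives only for the duration of this call
    return rdd.count(), weakref.ref(rdd)  # the weak reference does not keep rdd alive


my_rdd = FakeRdd()  # notebook-level variable, like myRdd above
global_ref = weakref.ref(my_rdd)

result, local_ref = count_rows()
print(global_ref() is not None)  # True: the global variable keeps its object alive
print(local_ref() is None)       # True: the function-local object was freed on return
```

In PySpark the same shape would be a function that builds myRdd, calls count(), and returns only the count, so neither a notebook variable nor an Out entry ends up pointing at the RDD.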

Workaround 1: Explicitly Remove Intermediate Shuffle Files

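The intermediate shuffle files on the Workers are removed from disk once the RDD whose lineage produced them has been freed, that is, once nothing on the driver references it any more.

RDD.unpersist() removes the RDD's persisted (cached) blocks, but it does not by itself make the RDD go out of scope: any variable that still points to the RDD keeps it alive. So, in addition, explicitly re-assign the RDD variable to None (Python) or null (Scala) when you're done using it.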

These mechanisms will flag the intermediate shuffle files for removal. (Note: this may not be desirable if you need to keep the RDD around for later processing.)

Upon a GC on the driver, Spark's ContextCleaner removes the flagged intermediate shuffle files on all Workers that contributed to the lineage of the freed RDD.

In other words, a GC, which is usually meant to free up memory, is also used by Spark to free up the intermediate shuffle files on Workers via the ContextCleaner.

If the GC required by the intermediate shuffle file cleaner is not happening fast enough on its own, you can explicitly call System.gc() in Scala or sc._jvm.System.gc() in Python to nudge the driver JVM into a GC and ultimately remove the intermediate shuffle files. While this technically isn't guaranteed to force a GC, it has proven effective for users in this situation. Spark itself only triggers such a GC periodically (spark.cleaner.periodicGC.interval, 30 minutes by default), so without a nudge the cleanup can lag well behind a shuffle-heavy job.
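The ContextCleaner's mechanism, weak references whose cleanup only fires after a GC, can be mimicked in plain Python. This is a toy analogy, not Spark code: ToyContextCleaner and FakeRdd are hypothetical, the reference cycle stands in for the JVM (which, unlike CPython's reference counting, only reclaims objects during a GC), and gc.collect() plays the role of System.gc().

```python
import gc
import weakref


class ToyContextCleaner:
    """Toy analogue of Spark's ContextCleaner: tracks objects weakly and
    records a cleanup for each one after the garbage collector reclaims it."""

    def __init__(self):
        self.cleaned_shuffles = []

    def register(self, obj, shuffle_id):
        # finalize holds only a weak reference to obj; the callback runs once obj is collected
        weakref.finalize(obj, self.cleaned_shuffles.append, shuffle_id)


class FakeRdd:
    def __init__(self):
        self.myself = self  # reference cycle: only the cyclic GC can reclaim this object


def demo():
    cleaner = ToyContextCleaner()
    rdd = FakeRdd()
    cleaner.register(rdd, shuffle_id=0)

    was_enabled = gc.isenabled()
    gc.disable()  # keep automatic collections out of the way so the demo is deterministic
    try:
        rdd = None                                 # drop the last strong reference
        before_gc = list(cleaner.cleaned_shuffles)
        gc.collect()                               # analogue of System.gc() / sc._jvm.System.gc()
        after_gc = list(cleaner.cleaned_shuffles)
    finally:
        if was_enabled:
            gc.enable()
    return before_gc, after_gc


print(demo())  # ([], [0])
```

Dropping the reference alone changes nothing until the collector runs; only then does the cleanup callback fire, which is the same ordering described above for the driver JVM.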

Workaround 2: Use More Workers

Assuming even distribution of partitions, adding more Workers will, on average, reduce the disk space required for the intermediate shuffle files on each Worker.
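The arithmetic behind this is a simple division. A hedged sketch, assuming shuffle output is spread evenly across Workers and ignoring skew; the helper names and the 400 GB / 60 GB figures are illustrative, not measurements.

```python
def shuffle_gb_per_worker(total_shuffle_gb: float, workers: int) -> float:
    """Average intermediate shuffle data each Worker holds on local disk,
    assuming partitions are evenly distributed across Workers."""
    if workers <= 0:
        raise ValueError("workers must be positive")
    return total_shuffle_gb / workers


def fits_on_local_disk(total_shuffle_gb: float, workers: int, free_gb_per_worker: float) -> bool:
    """True if the average per-Worker share fits in each Worker's free local disk."""
    return shuffle_gb_per_worker(total_shuffle_gb, workers) <= free_gb_per_worker


# Hypothetical job: 400 GB of shuffle output, 60 GB free on each Worker
for n in (4, 8, 10):
    print(n, shuffle_gb_per_worker(400, n), fits_on_local_disk(400, n, 60))
# 4 100.0 False
# 8 50.0 True
# 10 40.0 True
```

Skewed keys break the even-distribution assumption: a single oversized partition still lands on one Worker, however many Workers you add.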

Workaround 3: Checkpoint the RDD

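Another solution, used by Spark Streaming in particular, is to periodically call RDD.checkpoint(). This saves the current immutable state of the RDD to the checkpoint directory (on Databricks, DBFS, which is backed by S3), snips the RDD lineage, and allows the intermediate shuffle files to be removed. Note that checkpoint() must be called before any job has been executed on that RDD; the data is actually written when the next action runs, so persisting the RDD first avoids computing it twice.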

This requires a prior call to sc.setCheckpointDir() with something like /checkpoints. This will save the checkpoint data to DBFS/S3 in that location.

This is the best of both worlds: the RDD is still recoverable, but the intermediate shuffle files can be removed from the Workers.
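To see why cutting the lineage lets the shuffle files go, here is a toy lineage graph in plain Python. The Node class is hypothetical, not Spark's: each node holds strong references to its parents, a weakref.finalize on the "shuffle" node stands in for ContextCleaner tracking, and checkpoint() saves the data and drops the parent references, roughly what RDD.checkpoint() does once its data has been written.

```python
import weakref


class Node:
    """Toy lineage node: holds its parents and a function that computes its data."""

    def __init__(self, compute, parents=()):
        self.compute = compute
        self.parents = list(parents)
        self.checkpointed_data = None

    def collect(self):
        if self.checkpointed_data is not None:
            return self.checkpointed_data  # read the saved state, no recomputation
        return self.compute(*(p.collect() for p in self.parents))

    def checkpoint(self):
        self.checkpointed_data = self.collect()  # save the current state
        self.parents = []                        # snip the lineage


cleaned = []
source = Node(lambda: [("a", 1), ("b", 2), ("a", 3)])
shuffle = Node(lambda rows: sorted(rows), [source])  # stands in for a shuffle stage
weakref.finalize(shuffle, cleaned.append, "shuffle files")
result = Node(lambda rows: [k for k, _ in rows], [shuffle])
shuffle = None  # from now on only result.parents references the shuffle node

print(cleaned)           # []
result.checkpoint()
print(cleaned)           # ['shuffle files']
print(result.collect())  # ['a', 'a', 'b']
```

Before the checkpoint, the lineage itself keeps the shuffle node reachable; afterwards nothing does, so it is reclaimed while the result stays recoverable from its saved data.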

Workaround 4: [Spark SQL Only] Increase Shuffle Partitions

If you're seeing this with Spark SQL HiveQL commands, you can try increasing the number of Spark SQL shuffle partitions as follows:

SET spark.sql.shuffle.partitions=400; 

Wanglei
New Contributor II

It is helpful. So do you mean the suggested way in Python is like the following?

t = myRdd.unpersist()

t = None

Also, could you point me to any docs about the Python Out variable? I am new to Python and could not find any.

Any recommendations on how to work around this issue when using Spark SQL only?

We already SET spark.sql.shuffle.partitions=XXX; to a couple of different values, but it still keeps failing. Also, the cluster size / number of workers should be more than sufficient.

Could the enable_elastic_disk setting on the cluster (https://docs.azuredatabricks.net/api/latest/clusters.html) help with this?

regards,

-gerhard

Capemo
New Contributor II

Had to update this line

val spaceInGB = ("df /local_disk".!!).split(" +")(9).toInt / 1024 / 1024

to

val spaceInGB = ("df /local_disk0".!!).split(" +")(9).toInt / 1024 / 1024

In Databricks 7.3.
