Unable to clear cache using a pyspark session

maartenvr
New Contributor III

Hi all,

I am using a persist call on a Spark DataFrame inside an application to speed up computations. The DataFrame is used throughout my application, and at the end of the application I try to clear the cache of the whole Spark session by calling clearCache() on it. However, I am unable to clear the cache.

So something along these lines happens:

# Python Code
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()  # builder config options omitted here
 
# Just an Example 
df = spark.read.csv("example.csv")
df.cache()
 
# Clearing Cache
spark.catalog.clearCache()

The clearCache() call doesn't do anything and the cache is still visible in the Spark UI (Databricks -> Spark UI -> Storage).

The following command also doesn't show any persisted RDDs, while in reality the Storage tab in the UI shows multiple cached RDDs.

# Python Code
from pyspark.sql import SQLContext

spark_context = spark.sparkContext        # the session's underlying SparkContext
sql_context = SQLContext(spark_context)   # legacy SQLContext wrapper (also tried)
spark._jsc.getPersistentRDDs()            # ask the JVM for persisted RDDs

# Results in:
{}

What is the correct way to clear the cache of the spark session / spark cluster?

Specs: I am on Databricks Runtime 10.4 LTS and, correspondingly, I am using databricks-connect==10.4.18.


9 REPLIES

Anonymous
Not applicable

@Maarten van Raaij: Please try the options below and see which one applies:

  1. Can you try the sparkContext().getOrCreate().getCache().clear() method? This should clear all RDDs (Resilient Distributed Datasets) and their associated metadata from the in-memory cache.
  2. If that doesn't work, the DataFrame may be too large to fit into memory and may have spilled to disk. In that case, increase the amount of memory available to Spark, or optimize your code to reduce the size of the DataFrame.
  3. Otherwise, the DataFrame may still be referenced by other DataFrames or objects that have not been unpersisted. You will need to unpersist all references to the DataFrame before you can clear its cache (see the sketch after this list).
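
As an illustrative sketch for point 3 (not something suggested in the thread), one way to make "unpersist all references" manageable is to route every persist call through a small helper that records the DataFrame, so everything can be released at the end of the application. The helper names persist_tracked and release_all are hypothetical, not part of PySpark:

# Python code (illustrative sketch; persist_tracked / release_all are made-up helpers)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

_persisted = []  # registry of every DataFrame the application has persisted

def persist_tracked(df):
    """Persist a DataFrame and remember it so it can be released later."""
    df.persist()
    _persisted.append(df)
    return df

def release_all():
    """Unpersist every registered DataFrame, e.g. at application shutdown."""
    for df in _persisted:
        df.unpersist(blocking=True)  # wait until the blocks are actually removed
    _persisted.clear()
    spark.catalog.clearCache()       # also drop any remaining cached tables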

Anonymous
Not applicable

Hi @Maarten van Raaij​ 

Hope everything is going great.

Just wanted to check in if you were able to resolve your issue. If yes, would you be happy to mark an answer as best so that other members can find the solution more quickly? If not, please tell us so we can help you. 

Cheers!

maartenvr
New Contributor III

No solution yet:

Hi @Suteja Kanuri​ ,

Thank you for thinking along and replying!

Unfortunately, I have not found a solution yet.

  1. I am getting an error that there is no .getCache() method on a SparkContext. Note that I have also tried something similar using sql_context.clearCache(), which didn't work properly either.
  2. This is not the case. All data is persisted in memory (according to the Spark UI).
  3. This might be the problem. The DataFrame is used throughout my application to calculate other DataFrames. Since the persisted DataFrame is used across the application and in different scopes, it is very difficult / cumbersome to unpersist all DataFrames that refer to the original persisted DataFrame. That is why I am trying to clear the complete cache of the cluster.

Besides these points, I am also wondering why the cache shows up in my Spark UI and is used when applying calculations on the data, but I cannot get the persisted RDDs when I use the sql_context (last code block in the original post).

Are there any other ideas I could try?

Kind regards,

Maarten

Anonymous
Not applicable

@Maarten van Raaij:

  • About the error on getCache() -> The error message suggests that the Spark context does not have a .getCache() method available. This may be because the method is deprecated. Instead, can you try the SparkSession.catalog.clearCache() method to clear the cached data?

Example:

from pyspark.sql import SparkSession
 
# create a Spark session
spark = SparkSession.builder.appName("ExampleApp").getOrCreate()
 
# cache a DataFrame
df = spark.read.csv("data.csv")
df.cache()
 
# clear the cache
spark.catalog.clearCache()
 
# unpersist the DataFrame from memory
df.unpersist()

Note that the cache() method on the DataFrame is used to cache the data in memory. The unpersist() method is used to remove the data from memory after it is no longer needed.

  • Why the cache is showing up in the Spark UI

It's possible that you are using the wrong Spark context to access the cached RDD. If you cache an RDD using the SparkContext object, you need to use the same object to retrieve the cached RDD later. Similarly, if you cache a DataFrame using the SparkSession object, you need to use the same object to retrieve the cached DataFrame later. If you are using the sql_context object to access the cached RDD, it may not be able to find the cached RDD because it was cached using a different Spark context.
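
As a quick sanity check on that point, here is a minimal sketch, assuming classic PySpark (not Spark Connect) where a single SparkContext is active per application, that verifies the different handles wrap the same underlying context:

# Python code (minimal sketch; assumes classic PySpark, not Spark Connect)
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder.getOrCreate()
sql_context = SQLContext(spark.sparkContext)   # legacy wrapper, deprecated in Spark 3.x

# With one active SparkContext per application, all handles should match.
print(SparkContext.getOrCreate() is spark.sparkContext)             # expected: True
print(sql_context.sparkSession.sparkContext is spark.sparkContext)  # expected: True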

maartenvr
New Contributor III

Hi Suteja,

Thanks for the quick reply.

I have already tried the spark.catalog.clearCache() method, but it doesn't work; that was actually the reason for posting the question.

In your code example, you call unpersist on the DataFrame after we have cleared the cache. Just for my information: why would we call unpersist on the DataFrame if we have already cleared the cache of the session (assuming that worked)?

For clarity, df.unpersist() does work, but it is cumbersome to implement in my application because the df is created in a local scope and is referred to from other scopes. I want to unpersist the DataFrame only at the end of my application, where I no longer have access to the df variable. Therefore I simply want to clear the cache of the whole Spark cluster at the end of my application.

On the last part: I am calling spark.catalog.clearCache() on the same Spark session in which I persist my data. The Spark context and SQL context are also derived from that same SparkSession.
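
For that end-of-application situation, a workaround that is sometimes used, shown here only as a hedged sketch, is to ask the JVM for every RDD it still tracks as persistent and unpersist them in a loop. It relies on the internal _jsc handle, so it is not a supported API, and under databricks-connect it may return nothing, just like the empty result in the original post:

# Python code (hedged sketch; _jsc is an internal handle, not a supported API)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.catalog.clearCache()  # drop cached tables/DataFrames known to the catalog

# getPersistentRDDs() returns a java.util.Map of RDD id -> JavaRDD; py4j exposes
# it like a dict, so the remaining RDDs can be unpersisted without needing the
# original DataFrame variables.
for rdd_id, java_rdd in spark.sparkContext._jsc.getPersistentRDDs().items():
    java_rdd.unpersist()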

Anonymous
Not applicable

@Maarten van Raaij:

Reason for calling unpersist() after clearCache() ->

When you call spark.catalog.clearCache(), it clears the cache of all cached tables and DataFrames in Spark. However, it's important to note that the clearCache() method only removes the metadata associated with the cached tables and DataFrames, and not the actual cached data itself. The actual cached data remains in memory until it is either evicted due to memory pressure or until it is explicitly unpersisted using the unpersist() method.
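
To make that ordering concrete, here is a minimal sketch using a toy DataFrame created via spark.range (an assumption, not the data from the thread); the blocking=True flag makes unpersist wait until the blocks are actually removed:

# Python code (minimal sketch with a toy DataFrame)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1_000_000)    # stand-in for the real DataFrame

df.cache()
df.count()                     # an action materializes the cached blocks
spark.catalog.clearCache()     # clear cached tables/DataFrames at session level
df.unpersist(blocking=True)    # then wait until this DataFrame's blocks are freed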

Anonymous
Not applicable

@Maarten van Raaij:

Re-answering your second question on why the UI shows multiple cached RDDs.

Some reasons:

It's possible that the getPersistentRDDs() method is not returning any cached RDDs because the RDDs are cached using Storage Level MEMORY_AND_DISK, which means that they can be evicted from memory and written to disk if memory pressure becomes too high. In this case, getPersistentRDDs() will only show RDDs that are stored entirely in memory (Storage Level MEMORY_ONLY or MEMORY_ONLY_SER) and not RDDs that are stored in memory and/or on disk (Storage Level MEMORY_AND_DISK or MEMORY_AND_DISK_SER).

To see all the cached RDDs, including those stored on disk, use the code below:

# Python Code
from pyspark import SparkContext
 
sc = SparkContext.getOrCreate()
rdd_storage_info = sc.getRDDStorageInfo()
 
for info in rdd_storage_info:
    print(info)

This will print out information about all the cached RDDs, including their ID, storage level, memory usage, and disk usage.
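
If getRDDStorageInfo() is not available on the Python SparkContext (it lives on the JVM-side SparkContext), one possible workaround, sketched here as an assumption rather than a supported approach, is to call it through the py4j gateway via the internal _jsc handle:

# Python code (hedged sketch; _jsc / .sc() are internal handles, not a public API)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

scala_sc = spark.sparkContext._jsc.sc()    # JVM-side org.apache.spark.SparkContext
for info in scala_sc.getRDDStorageInfo():  # array of RDDInfo entries for cached RDDs
    print(info.id(), info.name(), info.storageLevel().toString(),
          info.memSize(), info.diskSize())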

maartenvr
New Contributor III

That might explain it, since I do use the MEMORY_AND_DISK option.

The .getRDDStorageInfo() method is actually also not supported for me, but I have enough info to continue now. Thanks for the help!

Anonymous
Not applicable

@Maarten van Raaij: That's lovely! Could you upvote the answer that helped you the most?
