
Should I always cache my RDDs and DataFrames?

cfregly
Contributor
 
6 REPLIES

cfregly
Contributor

You should definitely cache() RDDs and DataFrames in the following cases:

  • Reusing them in an iterative loop (e.g. ML algorithms).
  • Reusing the RDD multiple times in a single application, job, or notebook.
  • When regenerating the RDD partitions is expensive (e.g. reading from HDFS, or after a complex chain of map(), filter(), etc.). Caching also speeds up recovery if a Worker node dies.

Keep in mind that Spark will automatically evict RDD partitions from Workers in an LRU manner. The LRU eviction happens independently on each Worker and depends on the available memory in the Worker.

During the lifecycle of an RDD, RDD partitions may exist in memory or on disk across the cluster depending on available memory.

The Storage tab on the Spark UI shows where partitions exist (memory or disk) across the cluster at any given point in time.

Note that cache() is an alias for persist(StorageLevel.MEMORY_ONLY), which may not be ideal for datasets larger than the available cluster memory. Each RDD partition that is evicted from memory must be rebuilt from source (e.g. HDFS, the network), which is expensive.

A better solution would be to use persist(StorageLevel.MEMORY_AND_DISK), which will spill the RDD partitions to the Worker's local disk if they're evicted from memory. In this case, rebuilding a partition only requires pulling data from the Worker's local disk, which is relatively fast.
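
To make the difference between rebuilding and spilling concrete, here is a toy, pure-Python model of a single Worker's cache. It is not Spark's implementation: the class ToyWorkerCache, the memory_slots parameter, and the workload (three passes over four partitions with room for only two in memory) are invented purely for illustration.

```python
from collections import OrderedDict


class ToyWorkerCache:
    """Toy model of one Worker's partition cache (not Spark's real code)."""

    def __init__(self, memory_slots, spill_to_disk):
        self.memory_slots = memory_slots    # how many partitions fit in memory
        self.spill_to_disk = spill_to_disk  # True ~ MEMORY_AND_DISK, False ~ MEMORY_ONLY
        self.memory = OrderedDict()         # partition id -> data, least recently used first
        self.disk = {}                      # partitions spilled to local disk
        self.recomputed = 0                 # rebuilds from source (expensive)
        self.disk_reads = 0                 # reads from local disk (cheaper)

    def get(self, pid):
        if pid in self.memory:
            self.memory.move_to_end(pid)    # mark as most recently used
            return self.memory[pid]
        if pid in self.disk:
            self.disk_reads += 1
            data = self.disk[pid]
        else:
            self.recomputed += 1
            data = f"partition-{pid}"       # stand-in for rebuilding from source
        self._put(pid, data)
        return data

    def _put(self, pid, data):
        self.memory[pid] = data
        while len(self.memory) > self.memory_slots:
            # Evict the least recently used partition.
            old_pid, old_data = self.memory.popitem(last=False)
            if self.spill_to_disk:
                self.disk[old_pid] = old_data


def run(spill_to_disk):
    cache = ToyWorkerCache(memory_slots=2, spill_to_disk=spill_to_disk)
    for _ in range(3):          # three passes over the dataset
        for pid in range(4):    # four partitions, only two fit in memory
            cache.get(pid)
    return cache.recomputed, cache.disk_reads


memory_only = run(spill_to_disk=False)
memory_and_disk = run(spill_to_disk=True)
print(memory_only, memory_and_disk)  # prints (12, 0) (4, 8)
```

In this workload the memory-only cache rebuilds a partition from source on every access, while the spilling cache builds each partition once and serves later misses from local disk.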

You also have the choice of persisting the data as a serialized byte array by appending _SER as follows: MEMORY_ONLY_SER and MEMORY_AND_DISK_SER. This can save space, but incurs an extra serialization/deserialization penalty. Because the data is stored as serialized byte arrays, fewer Java objects are created and GC pressure is therefore reduced.

You can also choose to replicate the data to another node by appending _2 to the StorageLevel (either serialized or not serialized) as follows: MEMORY_ONLY_SER_2 and MEMORY_AND_DISK_2. This enables fast partition recovery in the case of a node failure, as data can be rebuilt from a rack-local, neighboring node through the same network switch, for example.

You can see the full list here: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.storage.StorageLevel$

Note that cache() is now an alias for persist(StorageLevel.MEMORY_AND_DISK) according to the docs.

Hello Mefryar,

I still see that cache() is an alias for persist(StorageLevel.MEMORY_ONLY). Doc links attached.

Official doc

Official Pyspark doc

Hi, @Sivagangireddy Singam. I see that the RDD programming guide does say that the default storage level is MEMORY_ONLY, but the latest PySpark docs (2.4.4) state "The default storage level has changed to MEMORY_AND_DISK." (The PySpark docs you linked to were 2.1.2.)

ScottKinman
New Contributor II

Thanks for this clarification on deserialization penalty. I always wanted to know when this penalty is imposed.


ThomasDecaux
New Contributor II

Hello,

Which is more efficient to cache, an RDD or a DataFrame? (I mean which is the better choice for caching and consumes less memory.)

Thank you,
