The DataFrame and Dataset APIs are built on top of RDDs, so I will only mention RDDs in this post, but in most places you can read DataFrame or Dataset instead.

Recap

Under the hood, Spark stores and transforms data as Resilient Distributed Datasets (RDDs): read-only collections of objects partitioned across multiple machines.

An Apache Spark application consists of two types of operations: transformations (such as map or filter) and actions (such as count or take). Transformations create new RDDs from existing ones, and actions return the actual results of those transformations. When an action completes, all RDDs used in its transformations are discarded from working memory. Each RDD partition removed from memory then has to be rebuilt from the source (e.g. HDFS, the network, etc.), which is costly.

Caching

RDDs can sometimes be expensive to materialize. Even if they aren't, you don't want to repeat the same computations over and over again. To prevent that, Apache Spark can cache RDDs in memory (or on disk) and reuse them without recomputing them. In Spark, an RDD that is neither cached nor checkpointed is recomputed every time an action is called on it.

In Apache Spark, there are two API calls for caching: cache() and persist(). cache() uses the default storage level, while persist(level) lets you choose where the data is kept (in memory, on disk, or off-heap) and whether it is stored in serialized or deserialized format, according to the caching strategy specified by level. For RDDs, cache() is an alias for persist(StorageLevel.MEMORY_ONLY). For DataFrames and Datasets, cache() is an alias for persist(StorageLevel.MEMORY_AND_DISK): data is kept in each executor's memory if there is space for it, and otherwise it is stored on disk.

RDDs are lazily evaluated in Spark. An RDD is therefore not evaluated until an action is called, and neither cache() nor persist() is an action: they only mark the RDD to be cached when it is first computed.
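
To make the interplay between laziness and caching concrete, here is a minimal pure-Python sketch. ToyRDD and read_source are hypothetical names invented for this illustration; this is a toy model of the behaviour described above, not how Spark implements it.

```python
class ToyRDD:
    """Toy stand-in for an RDD: lazy, and recomputed unless marked as cached."""

    def __init__(self, compute):
        self._compute = compute    # zero-argument function that produces a list
        self._use_cache = False    # set by cache(); nothing is stored yet
        self._cached = None        # filled in by the first action

    def map(self, fn):
        # Transformation: returns a new lazy ToyRDD and runs nothing.
        return ToyRDD(lambda: [fn(x) for x in self._materialize()])

    def cache(self):
        # Not an action: only marks the dataset for caching.
        self._use_cache = True
        return self

    def _materialize(self):
        if self._cached is not None:
            return self._cached
        data = self._compute()
        if self._use_cache:
            self._cached = data
        return data

    def count(self):
        # Action: triggers the actual computation.
        return len(self._materialize())


reads = {"n": 0}  # how many times the "source" was read


def read_source():
    reads["n"] += 1
    return [1, 2, 3, 4]


# Without cache(): every action goes back to the source.
uncached = ToyRDD(read_source).map(lambda x: x * 2)
uncached.count()
uncached.count()
reads_without_cache = reads["n"]

# With cache(): the first action fills the cache, later actions reuse it.
reads["n"] = 0
cached = ToyRDD(read_source).map(lambda x: x * 2).cache()
reads_after_cache_call = reads["n"]  # cache() alone has read nothing
cached.count()
cached.count()
reads_with_cache = reads["n"]

print(reads_without_cache, reads_after_cache_call, reads_with_cache)
```

In this toy, calling cache() reads nothing from the source. The source is read once per action without caching, and only once in total with it.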

Caching strategies

There are several levels of data persistence in Apache Spark:

  • MEMORY_ONLY. Data is cached in memory only, as deserialized objects. Partitions that do not fit in memory are not cached and are recomputed when they are needed.
  • MEMORY_AND_DISK. Data is cached in memory. If memory is insufficient, the blocks that do not fit are serialized and written to disk. This mode is recommended when re-evaluation is expensive and memory resources are limited.
  • DISK_ONLY. Data is cached on disk only in serialized format.
  • OFF_HEAP. Blocks are cached off-heap.

Once an RDD has been assigned a storage level, that level cannot be changed. To use a different level, you have to call unpersist() first.

Serialization

The above caching strategies can also store data in a serialized format. Serialization increases the processing cost but reduces the amount of memory occupied by large data sets. It can reduce the space used by a factor of 2-4, at the price of an additional serialization/deserialization penalty. And because the data is stored as serialized byte arrays, fewer Java objects are created, so there is less GC pressure.

These options add the "_SER" suffix to the above levels: MEMORY_ONLY_SER, MEMORY_AND_DISK_SER.

It is worth noting that DISK_ONLY and OFF_HEAP always write data in serialized format.

Replication

Data can also be replicated to another node by adding a "_2" suffix to the storage level: MEMORY_ONLY_2, MEMORY_AND_DISK_SER_2. Replication speeds up recovery when a single cluster node (or executor) fails: instead of being recomputed, the lost partitions can be restored from the copy held on another node.
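
The suffix rules above can be summarized in a small pure-Python sketch. Level and describe are hypothetical helpers written for this post, not part of Spark. The flag order (disk, memory, off-heap, deserialized, replication) and the base values mirror the JVM-side storage level constants as I understand them, so treat the table as an assumption to check against your Spark version.

```python
from typing import NamedTuple


class Level(NamedTuple):
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool
    replication: int = 1


# Base levels: (use_disk, use_memory, use_off_heap). Assumed to match the JVM constants.
BASES = {
    "MEMORY_ONLY": (False, True, False),
    "MEMORY_AND_DISK": (True, True, False),
    "DISK_ONLY": (True, False, False),
    "OFF_HEAP": (True, True, True),
}


def describe(name: str) -> Level:
    """Derive the flags from a storage level name (hypothetical helper)."""
    replication = 1
    if name.endswith("_2"):       # "_2" suffix: keep a second replica
        replication = 2
        name = name[: -len("_2")]
    serialized = name.endswith("_SER")
    if serialized:                # "_SER" suffix: store serialized bytes
        name = name[: -len("_SER")]
    use_disk, use_memory, use_off_heap = BASES[name]
    # Only on-heap memory levels without "_SER" keep deserialized objects;
    # DISK_ONLY and OFF_HEAP are always serialized.
    deserialized = use_memory and not use_off_heap and not serialized
    return Level(use_disk, use_memory, use_off_heap, deserialized, replication)


for level_name in ("MEMORY_ONLY", "MEMORY_AND_DISK", "MEMORY_AND_DISK_SER_2",
                   "DISK_ONLY", "OFF_HEAP"):
    print(level_name, describe(level_name))
```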

Example

>>> df = spark.read.parquet(data_path)
>>> df.cache() # Cache the data
>>> df.count() # Materialize the cache, command took 5.11 seconds
>>> df.is_cached # Determining whether a dataframe is cached
True
>>> df.count() # Now get it from the cache, command took 0.44 seconds
>>> df.storageLevel # Determine the storage level (disk, memory, off-heap, deserialized, # replicas)
StorageLevel(True, True, False, True, 1) 
>>> df.unpersist()

Several actions are performed on this DataFrame. The data is cached the first time an action is called, and further actions use the cached data. Without cache(), each action would execute the entire RDD DAG, processing all the intermediate steps to generate the data. In this example, caching speeds up execution by avoiding RDD re-evaluation.

The same can be done with a temporary view:

>>> df.createOrReplaceTempView('df')
>>> spark.catalog.cacheTable('df')
>>> spark.catalog.isCached(tableName='df')
True
>>> spark.catalog.uncacheTable('df')
>>> spark.catalog.clearCache() # Clear all the tables from cache

Caching behavior

We will look at the Spark source code.

CacheManager

persist() uses CacheManager, which maintains an in-memory cache of structured queries. CacheManager in turn updates the query plan by adding a new operator, InMemoryRelation, which carries information about the cached plan. The cached plan itself is stored in cachedData. InMemoryRelation is used later, at query execution time, when an action is called: in the physical planning step it is turned into a physical operator, InMemoryTableScan.

The CacheManager phase is part of logical planning and occurs after the analyzer and before the optimizer. If the analyzed plan of a query (or of part of it) matches the analyzed plan of a cached query, Spark can reuse the corresponding InMemoryRelation. If no match is found in the caching layer, Spark computes the data itself and uses it immediately afterward.

This seems simple, so why go into so much detail?

It turns out that it's not all that obvious. For example, let's look at the following transformations:

>>> from pyspark.sql.functions import col
>>> df = spark.read.parquet(data_path)
>>> df.select("col1", "col2").filter(col("col2") > 0).cache()
>>> df.filter(col("col2") > 0).select("col1", "col2")

For this query, you might think that the cached plan and the plan of the second query are identical, because the optimizer pushes the filter down in both cases. But again, the CacheManager phase happens BEFORE the optimizer. Only the optimized plans are the same here, not the analyzed plans, so the second query does not use the cache.
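
The reasoning can be reproduced with a toy model in plain Python. Plans are represented as tuples of operators, the cache is a dictionary keyed by the analyzed plan, and toy_optimize is a hypothetical stand-in for filter pushdown. None of this is Spark code; it is only a sketch of why a lookup done before optimization misses.

```python
def toy_optimize(plan):
    """Toy optimizer: move every filter directly after the scan (filter pushdown)."""
    scan, *ops = plan
    filters = [op for op in ops if op[0] == "filter"]
    others = [op for op in ops if op[0] != "filter"]
    return (scan, *filters, *others)


# Analyzed plans keep the operators in the order the user wrote them.
cached_analyzed = (
    ("scan", "data_path"),
    ("select", "col1", "col2"),
    ("filter", "col2 > 0"),
)
query_analyzed = (
    ("scan", "data_path"),
    ("filter", "col2 > 0"),
    ("select", "col1", "col2"),
)

# The cache lookup happens on the analyzed plan, before any optimization.
cache = {cached_analyzed: "cached blocks"}

hit_before_optimizer = query_analyzed in cache
same_after_optimizer = toy_optimize(cached_analyzed) == toy_optimize(query_analyzed)
hit_with_same_order = cached_analyzed in cache

print(hit_before_optimizer, same_after_optimizer, hit_with_same_order)
```

In the toy, as in the example above, the practical takeaway is to write the query with the same order of transformations that was used when caching, or to build on the cached DataFrame variable itself.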

Block-level caching

Under the hood, each RDD consists of several blocks (I wrote about that in this post), and internally each block is cached independently of the others. Each executor in Spark has an associated BlockManager, which is used to cache the RDD blocks. The memory available to the BlockManager for caching is governed by spark.memory.storageFraction: the share of the unified memory pool (whose size is set by spark.memory.fraction) that is set aside for storage and protected from eviction by execution. The caching itself is performed on the node that computed the particular RDD block.

Because of the above, when you use cache() or persist(), the RDD is not fully cached until you call an action that touches every partition (like count()). If you use an action such as take(1), only the partitions that were actually computed are cached (often just one), because Spark does not need to compute all partitions to return a single entry.
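
As a rough worked example of how these two settings combine, the sketch below estimates the storage region for a given executor heap. It assumes the unified memory manager with its defaults (spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5) and a fixed 300 MiB reservation. The function name is hypothetical, and the real numbers on a cluster will differ somewhat because the JVM reports slightly less usable heap than configured.

```python
RESERVED_BYTES = 300 * 1024 * 1024  # memory Spark reserves before applying the fractions


def storage_region_bytes(heap_bytes, memory_fraction=0.6, storage_fraction=0.5):
    """Estimate the unified pool and the storage share protected from eviction."""
    unified = (heap_bytes - RESERVED_BYTES) * memory_fraction
    storage = unified * storage_fraction
    return unified, storage


heap = 4 * 1024 ** 3  # a 4 GiB executor heap, chosen only for illustration
unified, storage = storage_region_bytes(heap)

mib = 1024 ** 2
print(f"unified pool: {unified / mib:.1f} MiB")
print(f"storage share: {storage / mib:.1f} MiB")
```

For a 4 GiB heap this gives a unified pool of roughly 2278 MiB, of which about 1139 MiB is protected for cached blocks. Storage may use more than that while execution is not claiming the rest of the pool.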

Block eviction

When the storage memory fills up, a Least Recently Used (LRU) policy is used to free space for new blocks.

WARN org.apache.spark.sql.execution.datasources.SharedInMemoryCache: 
Evicting cached table partition metadata from memory due to size constraints 
(spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may 
impact query planning performance.

Note that cached partitions can be evicted before they are ever reused. Depending on the caching strategy, evicted blocks are either written to disk (for example with MEMORY_AND_DISK) or dropped and recomputed when they are needed again (for example with MEMORY_ONLY).
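
The eviction behaviour can be sketched with a small LRU store in plain Python. ToyBlockStore and the block names are hypothetical, and the model is deliberately simplified: Spark's memory store applies further rules that are ignored here.

```python
from collections import OrderedDict


class ToyBlockStore:
    """Toy LRU block store; evicted blocks go to disk only if spill_to_disk is set."""

    def __init__(self, capacity_bytes, spill_to_disk):
        self.capacity = capacity_bytes
        self.spill_to_disk = spill_to_disk
        self.memory = OrderedDict()  # block_id -> size, least recently used first
        self.disk = {}
        self.used = 0

    def get(self, block_id):
        if block_id in self.memory:
            self.memory.move_to_end(block_id)  # mark as most recently used
            return "memory"
        if block_id in self.disk:
            return "disk"
        return None  # the block is gone and would have to be recomputed

    def put(self, block_id, size):
        if size > self.capacity:
            # Too large for memory: spill directly or skip caching.
            if self.spill_to_disk:
                self.disk[block_id] = size
            return
        # Evict least recently used blocks until the new block fits.
        while self.used + size > self.capacity:
            victim, victim_size = self.memory.popitem(last=False)
            self.used -= victim_size
            if self.spill_to_disk:
                self.disk[victim] = victim_size
        self.memory[block_id] = size
        self.used += size


def run(spill_to_disk):
    store = ToyBlockStore(capacity_bytes=100, spill_to_disk=spill_to_disk)
    store.put("rdd_0_0", 60)
    store.put("rdd_0_1", 40)
    store.get("rdd_0_0")      # touch block 0, so block 1 becomes the LRU block
    store.put("rdd_0_2", 30)  # does not fit: block 1 is evicted
    return store.get("rdd_0_0"), store.get("rdd_0_1"), store.get("rdd_0_2")


memory_and_disk_like = run(spill_to_disk=True)
memory_only_like = run(spill_to_disk=False)
print(memory_and_disk_like)
print(memory_only_like)
```

In both runs the block that was touched most recently survives. The evicted block is still readable from disk in the first run and has to be recomputed in the second.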

Last updated Tue May 11 2021