The DataFrame and Dataset APIs are built on top of RDDs, so I will only mention RDDs in this post, but you can easily substitute DataFrame or Dataset wherever RDD appears.
Under the hood, Spark stores and transforms data as a Resilient Distributed Dataset (RDD): a read-only collection of objects partitioned across multiple machines.
An Apache Spark application consists of two types of operations: transformations (such as filter) and actions (such as take). Transformations are used to create RDDs, and actions are used to obtain the actual results of transformations on the RDDs. When the action is complete, all RDDs used in the transformations are discarded from working memory. Each RDD partition removed from memory has to be rebuilt from the source (e.g. HDFS, the network, etc.) the next time it is needed, which is costly.
RDDs can sometimes be expensive to materialize. Even if they aren't, you don't want to do the same computations over and over again. To prevent that, Apache Spark can cache RDDs in memory (or on disk) and reuse them without recomputing them. In Spark, an RDD that is neither cached nor checkpointed is recomputed every time an action is called on it.
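To make the recomputation cost concrete, here is a minimal sketch in plain Python. It is a toy model of lazy evaluation, not Spark itself: `ToyDataset`, `from_source`, and the `stats` counter are hypothetical names invented for this illustration. It counts how often the source is scanned with and without caching.

```python
stats = {"source_reads": 0}  # how many times the "source" was scanned


class ToyDataset:
    """Toy stand-in for an RDD: a lazy recipe, not Spark's implementation."""

    def __init__(self, compute):
        self._compute = compute    # function that produces the rows
        self._use_cache = False    # set by cache(), which does no work itself
        self._cached = None        # filled in by the first action if caching is on

    @classmethod
    def from_source(cls, rows):
        def read():
            stats["source_reads"] += 1
            return list(rows)
        return cls(read)

    def filter(self, predicate):
        # Transformation: only records what to do, runs nothing.
        return ToyDataset(lambda: [r for r in self._rows() if predicate(r)])

    def cache(self):
        # Only marks the dataset for caching; the data is stored on first use.
        self._use_cache = True
        return self

    def _rows(self):
        if self._cached is not None:
            return self._cached
        rows = self._compute()
        if self._use_cache:
            self._cached = rows
        return rows

    def count(self):
        # Action: triggers the computation.
        return len(self._rows())


# Without caching, every action walks back to the source.
evens = ToyDataset.from_source(range(10)).filter(lambda x: x % 2 == 0)
evens.count()
evens.count()
uncached_reads = stats["source_reads"]

# With caching, only the first action reads the source.
stats["source_reads"] = 0
cached = ToyDataset.from_source(range(10)).filter(lambda x: x % 2 == 0).cache()
reads_after_cache_call = stats["source_reads"]
cached.count()
cached.count()
cached_reads = stats["source_reads"]
```

In this toy, two actions on the uncached dataset scan the source twice, while the cached one scans it once, and calling `cache()` by itself reads nothing.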
In Apache Spark, there are two API calls for caching: cache() and persist(). The difference between them is that cache() always uses the default storage level, while persist(level) can keep data in memory, on disk, or off-heap, in serialized or deserialized format, according to the caching strategy specified by level. For DataFrames and Datasets, the default level keeps data in each node's RAM if there is space for it and otherwise stores it on disk; for plain RDDs, the default is memory only.
cache() is an alias for persist() with the default storage level.
RDDs are lazily evaluated in Spark. An RDD is not evaluated until an action is called, and neither cache() nor persist() is an action.
There are several levels of data persistence in Apache Spark:
MEMORY_ONLY. Data is cached in memory in unserialized format only.
MEMORY_AND_DISK. Data is cached in memory. If memory is insufficient, the evicted blocks from memory are serialized to disk. This mode is recommended when re-evaluation is expensive and memory resources are limited.
DISK_ONLY. Data is cached on disk only in serialized format.
OFF_HEAP. Blocks are cached off-heap.
Once an RDD's storage level has been set, it cannot be changed without first calling unpersist().
The caching strategies above can also store data in serialized form. Serialization increases the processing cost but reduces the memory occupied by large datasets: it can cut the space used by a factor of 2-4, at the price of an additional serialization/deserialization penalty. And because the data is stored as serialized byte arrays, fewer Java objects are created and there is less GC pressure.
These options add the "_SER" suffix to the levels above, for example MEMORY_ONLY_SER and MEMORY_AND_DISK_SER.
It is worth noting that DISK_ONLY and OFF_HEAP always write data in serialized format.
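The space-versus-CPU trade-off of serialized storage can be felt even outside the JVM. The sketch below is only an analogy in plain Python using `pickle`; Spark itself serializes with Java or Kryo serialization, and the sizes here are rough estimates of Python objects, not of Spark blocks.

```python
import pickle
import sys

# 100,000 small records held as individual Python objects ("deserialized").
records = [(i, float(i)) for i in range(100_000)]

# Rough size of the object graph: the list, each tuple, and each field inside it.
deserialized_bytes = sys.getsizeof(records) + sum(
    sys.getsizeof(rec) + sum(sys.getsizeof(field) for field in rec)
    for rec in records
)

# The same data as one contiguous byte string ("serialized").
blob = pickle.dumps(records, protocol=pickle.HIGHEST_PROTOCOL)
serialized_bytes = len(blob)

# Reading the data back requires a full deserialization pass:
# this is the CPU price paid for the smaller footprint.
restored = pickle.loads(blob)

print(f"deserialized: ~{deserialized_bytes} bytes, serialized: {serialized_bytes} bytes")
```

The serialized form is a single object instead of hundreds of thousands of small ones, which is the same reason serialized storage levels put less pressure on the garbage collector.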
Data can also be replicated to another node by adding a "_2" suffix to the storage level, for example MEMORY_AND_DISK_SER_2. Replication speeds up recovery when a single cluster node (or executor) fails: lost partitions can be read from the replica on a neighboring node instead of being recomputed.
>>> df = spark.read.parquet(data_path)
>>> df.cache()  # Cache the data
>>> df.count()  # Materialize the cache, command took 5.11 seconds
>>> df.is_cached  # Determine whether the DataFrame is cached
True
>>> df.count()  # Now get it from the cache, command took 0.44 seconds
>>> df.storageLevel  # Determine the persistence type (memory, deserialized, # replicas)
StorageLevel(True, True, False, True, 1)
>>> df.unpersist()
Several actions are performed on this DataFrame. The data is cached the first time an action is called, and further actions use the cached data. Without cache(), each action would execute the entire RDD DAG, processing the intermediate steps to generate the data. In this example, caching speeds up execution by avoiding RDD re-evaluation.
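The StorageLevel(True, True, False, True, 1) printed in the example above is easier to read once the positions are named. PySpark's StorageLevel takes its arguments in the order useDisk, useMemory, useOffHeap, deserialized, replication. The small helper below mirrors that order in plain Python so it runs without a cluster; `Flags` and `describe` are hypothetical names for this sketch, not part of PySpark.

```python
from collections import namedtuple

# Field order mirrors the positional arguments of PySpark's StorageLevel.
Flags = namedtuple(
    "Flags", "use_disk use_memory use_off_heap deserialized replication"
)


def describe(flags):
    """Turn the five flags into a human-readable summary."""
    places = [
        name
        for name, enabled in (
            ("memory", flags.use_memory),
            ("disk", flags.use_disk),
            ("off-heap", flags.use_off_heap),
        )
        if enabled
    ]
    where = " and ".join(places) or "nowhere"
    fmt = "deserialized" if flags.deserialized else "serialized"
    return f"{where}, {fmt}, {flags.replication} replica(s)"


summary = describe(Flags(True, True, False, True, 1))
print(summary)  # memory and disk, deserialized, 1 replica(s)
```

So the cached DataFrame in the example is kept in memory with a disk fallback, as deserialized objects, with a single copy.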
The same can be done with a temporary view:
>>> df.createOrReplaceTempView('df')
>>> spark.catalog.cacheTable('df')
>>> spark.catalog.isCached(tableName='df')
True
>>> spark.catalog.uncacheTable('df')
>>> spark.catalog.clearCache()  # Clear all the tables from cache
Let's look at the Spark source code.
persist() uses the CacheManager, which maintains an in-memory cache of structured queries. The CacheManager in turn updates the query plan by adding a new operator, InMemoryRelation, which carries information about the cached plan; the cached plan itself is stored in cachedData. InMemoryRelation is used later, at query execution time, when an action is called: in the physical planning step it is turned into a physical operator, InMemoryTableScanExec.
The CacheManager phase is part of logical planning and occurs after the analyzer and before the optimizer. If the analyzed plan of a new query matches the analyzed plan of a cached query, Spark can reuse the corresponding InMemoryRelation. If the data is not yet in the caching layer, Spark computes it, stores it, and uses it immediately afterward.
This seems simple, so why go into so much detail?
It turns out that it's not all that obvious. For example, let's look at the following transformations:
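>>> from pyspark.sql.functions import col
>>> df = spark.read.parquet(data_path)
>>> df.select("col1", "col2").filter(col("col2") > 0).cache()  # select, then filter
>>> df.filter(col("col2") > 0).select("col1", "col2")  # filter, then select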
For this query, you might think that the analyzed plans of the cached query and of the next transformation are identical, because the filter will be pushed down by the optimizer in both cases. But again, the CacheManager phase happens BEFORE the optimizer, so in this case only the optimized plans are the same, not the analyzed plans. Therefore, the second query will not use the cache, simply because the analyzed plans differ.
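The argument can be replayed with a toy model in plain Python. This is not how the CacheManager actually compares plans; it only illustrates the ordering issue described here. Plans are tuples of operator strings, `toy_optimize` is a hypothetical stand-in for the optimizer, and the cache is a dictionary keyed by the analyzed plan.

```python
# Toy "analyzed" plans: operators in the order the user wrote them.
cached_analyzed = ("scan", "select col1, col2", "filter col2 > 0")
query_analyzed = ("scan", "filter col2 > 0", "select col1, col2")


def toy_optimize(plan):
    """Hypothetical optimizer stand-in: push filters to sit right after the scan."""
    scans = [op for op in plan if op == "scan"]
    filters = [op for op in plan if op.startswith("filter")]
    rest = [op for op in plan if op != "scan" and not op.startswith("filter")]
    return tuple(scans + filters + rest)


# Toy cache looked up by analyzed plan, i.e. before any optimization happens.
cache = {cached_analyzed: "cached blocks"}

hit_before_optimizer = query_analyzed in cache
same_after_optimizer = toy_optimize(cached_analyzed) == toy_optimize(query_analyzed)

print(hit_before_optimizer)   # False: the lookup sees two different plans
print(same_after_optimizer)   # True: they only become equal after optimization
```

The practical takeaway is to keep a reference to the DataFrame you cached and build later queries from it, rather than re-deriving an equivalent expression in a different order.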
Under the hood, each RDD consists of several blocks (I wrote about that in this post), and internally each block is cached independently of the others. Each executor in Spark has an associated BlockManager, which is used to cache the RDD blocks. The memory available to the BlockManager for caching is controlled by spark.memory.storageFraction, which sets aside a share of the memory pool allocated to the Spark engine itself (which in turn is specified by spark.memory.fraction). The caching itself is performed on the node that computed the particular RDD block.
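As a rough back-of-the-envelope check, the two settings can be combined into an estimate of how much memory is protected for cached blocks. The sketch below assumes the unified memory manager of recent Spark versions, with 300 MB of reserved memory and the default values spark.memory.fraction = 0.6 and spark.memory.storageFraction = 0.5; `storage_region_mb` is a hypothetical helper, and the heap size is an example value rather than a measurement.

```python
RESERVED_MB = 300  # memory Spark sets aside before splitting the rest


def storage_region_mb(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Estimate the unified pool and the protected storage region, in MB."""
    unified = (heap_mb - RESERVED_MB) * memory_fraction
    storage = unified * storage_fraction
    return unified, storage


unified_mb, storage_mb = storage_region_mb(4096)
print(f"unified pool: {unified_mb:.1f} MB, storage region: {storage_mb:.1f} MB")
```

With a 4096 MB heap and the defaults, this gives roughly 2278 MB for the unified pool and about 1139 MB for the storage region. That region is where cached blocks are safe from being evicted by execution; storage can borrow more than this when execution memory is idle.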
Because of this, when you use cache() or persist(), the RDD is not fully cached until you call an action that touches every entry (like count()). If you use an action like take(1), only one partition will be cached, because Spark understands that it does not need to compute all partitions just to return one entry.
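The effect is easy to reproduce with a toy model in plain Python, where each partition is cached only as a side effect of being computed. `ToyPartitionedDataset` is a hypothetical class for this illustration and greatly simplifies what Spark does.

```python
class ToyPartitionedDataset:
    """Toy model: a dataset split into partitions, each cached independently."""

    def __init__(self, partitions):
        self._partitions = partitions  # list of lists of rows
        self.cached = {}               # partition index -> computed rows

    def _compute(self, index):
        # A partition lands in the cache only when something computes it.
        if index not in self.cached:
            self.cached[index] = list(self._partitions[index])
        return self.cached[index]

    def take(self, n):
        # Scan partitions in order and stop as soon as n rows are collected.
        rows = []
        for index in range(len(self._partitions)):
            if len(rows) >= n:
                break
            rows.extend(self._compute(index))
        return rows[:n]

    def count(self):
        # Touches every partition, so every partition ends up cached.
        return sum(len(self._compute(i)) for i in range(len(self._partitions)))


data = ToyPartitionedDataset([[1, 2], [3, 4], [5, 6], [7, 8]])
first = data.take(1)
cached_after_take = len(data.cached)    # only the first partition was needed
total = data.count()
cached_after_count = len(data.cached)   # now all four partitions are cached
```

This is why a count() right after cache() is a common way to warm the cache completely before the queries that matter.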
When the storage memory fills up, a Least Recently Used (LRU) policy is used to clear space for new blocks.
WARN org.apache.spark.sql.execution.datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
It is worth noting that cached partitions can be evicted before they are actually reused. Depending on the caching strategy adopted, the evicted blocks can be cached on disk.
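To see how a block can be evicted before it is reused, here is a simplified LRU store in plain Python. It is a sketch of the policy only: `ToyLruStore` is a hypothetical class, the block ids are made up for the example, and Spark's real memory store has additional rules that are not modeled here.

```python
from collections import OrderedDict


class ToyLruStore:
    """Toy block store with a size budget and least-recently-used eviction."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.used = 0
        self.blocks = OrderedDict()  # block id -> size, least recently used first
        self.evicted = []

    def get(self, block_id):
        if block_id in self.blocks:
            self.blocks.move_to_end(block_id)  # mark as most recently used
            return True
        return False

    def put(self, block_id, size):
        # Replacing an existing block first releases the space it held.
        if block_id in self.blocks:
            self.used -= self.blocks.pop(block_id)
        # Evict least recently used blocks until the new one fits.
        while self.blocks and self.used + size > self.capacity:
            old_id, old_size = self.blocks.popitem(last=False)
            self.used -= old_size
            self.evicted.append(old_id)
        if self.used + size <= self.capacity:
            self.blocks[block_id] = size
            self.used += size
            return True
        return False  # the block is larger than the whole store


store = ToyLruStore(capacity=3)
for block in ("rdd_0_0", "rdd_0_1", "rdd_0_2"):
    store.put(block, 1)
store.get("rdd_0_0")     # touch the oldest block so it becomes the newest
store.put("rdd_1_0", 1)  # needs space, so the least recently used block goes

print(store.evicted)       # ['rdd_0_1']
print(list(store.blocks))  # ['rdd_0_2', 'rdd_0_0', 'rdd_1_0']
```

Here rdd_0_1 is dropped even though it was never read after being cached, simply because newer and more recently touched blocks needed the space.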