Apache Spark is regarded as a powerful complement to Hadoop, big data's original technology. Spark offers a more accessible, powerful, and versatile solution for tackling diverse big data challenges and is the most in-demand big data framework across major industries. Since Hadoop 2.0, Spark has been integrated into the Hadoop ecosystem, becoming one of the essential tools for Python Big Data Engineers.
This series of posts provides a comprehensive Spark architecture overview, ideal for those seeking to master Apache Spark.
Whole series:
- Hadoop and Yarn for Apache Spark Developer
- Spark Core Concepts Explained
- Anatomy of Apache Spark Application
- Spark Partitions
- Deep Dive into Spark Memory Management
- Explaining the mechanics of Spark caching
The DataFrame and Dataset APIs are built on top of RDDs, so I will only mention RDDs in this post, but you can substitute DataFrame or Dataset wherever RDD appears.
Recap
Under the hood, Spark stores and transforms data as Resilient Distributed Datasets (RDDs): read-only collections of objects partitioned across multiple machines.
An Apache Spark application consists of two types of operations: transformations (such as map or filter) and actions (such as count or take). Transformations create new RDDs, and actions return the actual results of those transformations. When the action completes, the RDDs used in the transformations are discarded from working memory. Any RDD partition removed from memory has to be rebuilt from the source (e.g. HDFS or the network) the next time it is needed, which is costly.
Caching
RDDs can sometimes be expensive to materialize. Even if they aren't, you don't want to repeat the same computations over and over. To prevent that, Apache Spark can cache RDDs in memory (or on disk) and reuse them without recomputing them. In Spark, an RDD that is neither cached nor checkpointed is recomputed every time an action is called on it.
In Apache Spark, there are two API calls for caching: cache() and persist(). The difference between them is that cache() always uses the default storage level, while persist(level) can store data in memory, on disk, or off-heap, in serialized or deserialized format, according to the caching strategy specified by level. For DataFrames and Datasets, cache() is an alias for persist(StorageLevel.MEMORY_AND_DISK): data is kept in each node's RAM if there is space for it and is otherwise stored on disk. For plain RDDs, cache() is an alias for persist(StorageLevel.MEMORY_ONLY).
RDDs are lazily evaluated in Spark: an RDD is not evaluated until an action is called, and neither cache() nor persist() is an action. Calling them only marks the RDD for caching; the data is actually stored when the first action runs.
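The laziness described above can be illustrated with a small plain-Python model. This is only a sketch, not Spark code: the LazyDataset class, its evaluations counter, and the rest of its names are hypothetical and exist purely to show that cache() computes nothing by itself and that the first action fills the cache.

```python
class LazyDataset:
    """Toy stand-in for an RDD: a source plus a chain of transformations."""

    def __init__(self, source, transforms=()):
        self._source = source          # callable that "reads" the input
        self._transforms = transforms  # lineage: functions applied in order
        self._cache_requested = False
        self._cached = None            # materialized rows, filled by an action
        self.evaluations = 0           # how many times the lineage was run

    def map(self, fn):
        # Transformation: returns a new dataset, computes nothing.
        return LazyDataset(self._source, self._transforms + (fn,))

    def cache(self):
        # Not an action: only marks the dataset for caching.
        self._cache_requested = True
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached        # reuse the cached rows
        self.evaluations += 1
        rows = list(self._source())
        for fn in self._transforms:
            rows = [fn(r) for r in rows]
        if self._cache_requested:
            self._cached = rows
        return rows

    def count(self):
        # Action: triggers evaluation of the lineage.
        return len(self._compute())


uncached = LazyDataset(lambda: range(5)).map(lambda x: x * 2)
uncached.count()
uncached.count()

cached = LazyDataset(lambda: range(5)).map(lambda x: x * 2).cache()
evaluations_after_cache_call = cached.evaluations
cached.count()
cached.count()

print(uncached.evaluations, evaluations_after_cache_call, cached.evaluations)
# prints: 2 0 1
```

The uncached dataset runs its lineage once per action, while the cached one runs it once in total, and only when the first count() is called.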
Caching strategies
There are several levels of data persistence in Apache Spark:
- MEMORY_ONLY. Data is cached in memory only, in unserialized format.
- MEMORY_AND_DISK. Data is cached in memory. If memory is insufficient, the evicted blocks from memory are serialized to disk. This mode is recommended when re-evaluation is expensive and memory resources are limited.
- DISK_ONLY. Data is cached on disk only, in serialized format.
- OFF_HEAP. Blocks are cached off-heap.
Once an RDD has been assigned a storage level, that level cannot be changed; to use a different one, you have to unpersist the RDD first.
Serialization
The caching strategies above can also store data in a serialized format. Serialization increases the processing cost but reduces the memory occupied by large datasets: it can cut the space used by a factor of 2-4, at the price of an additional serialization/deserialization penalty. And because the data is stored as serialized arrays of bytes, fewer Java objects are created, which means less GC pressure.
These options add the "_SER" suffix to the levels above: MEMORY_ONLY_SER, MEMORY_AND_DISK_SER.
It is worth noting that DISK_ONLY and OFF_HEAP always write data in serialized format.
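The space-versus-CPU trade-off of serialization can be felt even outside the JVM. The snippet below is a rough plain-Python analogy, not a measurement of Spark itself: it compares a list of integer objects with the same values packed into one byte buffer. The exact ratio depends on the Python build and says nothing about the 2-4x figure for Spark.

```python
import sys
from array import array

values = list(range(100_000, 110_000))  # 10,000 Python int objects

# "Deserialized" form: a list holding one Python object per value.
object_bytes = sys.getsizeof(values) + sum(sys.getsizeof(v) for v in values)

# "Serialized" form: one contiguous byte buffer (8 bytes per value).
buffer = array("q", values).tobytes()
serialized_bytes = len(buffer)

# Reading the serialized form back costs an extra decoding step.
restored = array("q")
restored.frombytes(buffer)

print(object_bytes, serialized_bytes, list(restored) == values)
```

The byte buffer is a single object instead of ten thousand, which is the same reason serialized caching puts less pressure on the garbage collector.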
Replication
Data can also be replicated to another node by adding a "_2" suffix to the storage level: MEMORY_ONLY_2, MEMORY_AND_DISK_SER_2. Replication is useful for speeding up recovery when a single cluster node (or executor) fails: lost partitions can be restored from the replica on a neighboring node instead of being recomputed.
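One way to keep the level names straight is to view each storage level as a tuple of five flags: disk, memory, off-heap, deserialized, replication. The sketch below models that in plain Python. The flag values mirror the JVM-side definitions as I understand them (PySpark's own constants differ in places), and the replicated() and describe() helpers are hypothetical, written only for this illustration.

```python
from collections import namedtuple

# Field order follows the arguments of Spark's StorageLevel.
StorageLevel = namedtuple(
    "StorageLevel",
    ["use_disk", "use_memory", "use_off_heap", "deserialized", "replication"],
)

# Assumed flag values for the base levels discussed in this post.
LEVELS = {
    "MEMORY_ONLY": StorageLevel(False, True, False, True, 1),
    "MEMORY_ONLY_SER": StorageLevel(False, True, False, False, 1),
    "MEMORY_AND_DISK": StorageLevel(True, True, False, True, 1),
    "MEMORY_AND_DISK_SER": StorageLevel(True, True, False, False, 1),
    "DISK_ONLY": StorageLevel(True, False, False, False, 1),
    "OFF_HEAP": StorageLevel(True, True, True, False, 1),
}


def replicated(name, copies=2):
    """Return (name, level) for the "_2"-style variant of a base level."""
    return f"{name}_{copies}", LEVELS[name]._replace(replication=copies)


def describe(level):
    """Render the flags of a level as a short human-readable string."""
    places = [
        place
        for place, used in (
            ("memory", level.use_memory),
            ("disk", level.use_disk),
            ("off-heap", level.use_off_heap),
        )
        if used
    ]
    fmt = "deserialized" if level.deserialized else "serialized"
    return f"{' + '.join(places)}, {fmt}, {level.replication} replica(s)"


print(describe(LEVELS["MEMORY_AND_DISK"]))
# prints: memory + disk, deserialized, 1 replica(s)
name, level = replicated("MEMORY_AND_DISK_SER")
print(name, describe(level))
# prints: MEMORY_AND_DISK_SER_2 memory + disk, serialized, 2 replica(s)
```

Read this way, the "_SER" suffix flips the deserialized flag and the "_2" suffix raises the replication count; everything else stays the same.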
Example
>>> df = spark.read.parquet(data_path)
>>> df.cache() # Cache the data
>>> df.count() # Materialize the cache, command took 5.11 seconds
>>> df.is_cached # Determining whether a dataframe is cached
True
>>> df.count() # Now get it from the cache, command took 0.44 seconds
>>> df.storageLevel # Determine the storage level (disk, memory, off-heap, deserialized, replication)
StorageLevel(True, True, False, True, 1)
>>> df.unpersist()
Several actions are performed on this DataFrame. The data is cached the first time an action is called, and further actions use the cached data. Without cache(), each action would execute the entire RDD DAG, processing the intermediate steps to generate the data. In this example, caching speeds up execution by avoiding RDD re-evaluation.
The same can be done with a temporary table:
>>> df.createOrReplaceTempView('df')
>>> spark.catalog.cacheTable('df')
>>> spark.catalog.isCached(tableName='df')
True
>>> spark.catalog.uncacheTable('df')
>>> spark.catalog.clearCache() # Clear all the tables from cache
Caching behavior
We will look at the Spark source code.
CacheManager
persist() uses the CacheManager for an in-memory cache of structured queries. The CacheManager in turn updates the query plan by adding a new operator, InMemoryRelation, which carries information about the cache plan; the cached plan itself is stored in cachedData. InMemoryRelation is used later, at query execution time, when an action is called: in the physical planning step it is turned into a physical operator, InMemoryTableScan.
The CacheManager phase is part of logical planning and occurs after the analyzer and before the optimizer. If the analyzed plan of a new query coincides with the analyzed plan of a cached query, Spark can reuse the corresponding InMemoryRelation. If the data is not found in the caching layer, Spark is responsible for retrieving it and uses it immediately afterward.
It seems simple, so why talk about it?
It turns out that it's not all that obvious. For example, let's look at the following transformations:
>>> from pyspark.sql.functions import col
>>> df = spark.read.parquet(data_path)
>>> df.select('col1', 'col2').filter(col('col2') > 0).cache()
>>> df.filter(col('col2') > 0).select('col1', 'col2')
For this query, you might think that the analyzed plans of the cached DataFrame and of the next transformation are identical, because the optimizer pushes the filter down in both cases. But again, the CacheManager phase happens BEFORE the optimizer, so in this case only the optimized plans are the same, not the analyzed plans. Therefore, the second query will not use the cache, simply because the analyzed plans are different.
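The mismatch can be reproduced with a toy model in plain Python. Everything here is a deliberate simplification and an assumption on my part: plans are just tuples of operator names, the optimizer is a single reordering rule, and ToyCacheManager is a hypothetical class. Spark's real plan comparison is more involved.

```python
def analyzed_plan(*operators):
    # Analyzed plan: operators kept in the order the user wrote them.
    return ("Scan",) + operators


def optimize(plan):
    # Toy optimizer rule: push Filter below Project.
    rank = {"Scan": 0, "Filter": 1, "Project": 2}
    return tuple(sorted(plan, key=lambda op: rank[op]))


class ToyCacheManager:
    def __init__(self):
        self.cached_data = []  # analyzed plans registered by cache()

    def cache_query(self, plan):
        self.cached_data.append(plan)

    def lookup(self, plan):
        # Matching is done on the analyzed plan, before optimization.
        return plan in self.cached_data


manager = ToyCacheManager()
cached = analyzed_plan("Project", "Filter")  # select(...).filter(...).cache()
query = analyzed_plan("Filter", "Project")   # filter(...).select(...)
manager.cache_query(cached)

same_after_optimizer = optimize(cached) == optimize(query)
cache_hit = manager.lookup(query)
print(same_after_optimizer, cache_hit)
# prints: True False
```

The two plans converge only after optimization, which is too late for the lookup, so the second query misses the cache.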
Block-level caching
Under the hood, each RDD consists of several blocks (I wrote about that in this post), and internally each block is cached independently of the others. Each executor in Spark has an associated BlockManager, which is used to cache the RDD blocks. The memory the BlockManager can use for caching is governed by spark.memory.storageFraction, which assigns it a share of the memory pool allocated to the Spark engine itself (the pool sized by spark.memory.fraction). The caching itself is performed on the node that created the particular RDD block.
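As a back-of-the-envelope illustration of how the two settings combine, here is a small calculation. The default values (0.6 and 0.5) and the fixed 300 MB reserve are assumptions based on my understanding of recent Spark versions and are not stated in this post; the function name is hypothetical. Treat the result as an estimate, since storage and execution can also borrow memory from each other at runtime.

```python
RESERVED_MB = 300  # assumption: fixed reserve kept outside the unified pool


def storage_memory_mb(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Estimate the storage share of an executor's unified memory pool.

    memory_fraction stands for spark.memory.fraction and
    storage_fraction for spark.memory.storageFraction.
    """
    unified_pool = (heap_mb - RESERVED_MB) * memory_fraction
    return unified_pool * storage_fraction


# A 4 GB executor heap with the assumed defaults.
print(round(storage_memory_mb(4096), 1))
```

With these assumptions, a 4 GB heap leaves a little over 1.1 GB as the storage share of the pool.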
Because each block is cached independently, an RDD on which you call cache() or persist() is not fully cached until you run an action that looks at every entry (like count()). If you use a take(1) type action, only one partition will be cached, because Spark understands that it does not need to compute all partitions just to get one entry.
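The partial-caching effect is easy to see in a toy model where partitions are computed and cached one at a time. This is a plain-Python sketch under my own assumptions, with a hypothetical PartitionedDataset class; it is not how Spark schedules tasks.

```python
class PartitionedDataset:
    """Toy dataset whose partitions are computed and cached one by one."""

    def __init__(self, partitions):
        self._partitions = partitions  # lists standing in for input splits
        self.block_cache = {}          # partition index -> computed rows

    def _partition(self, index):
        if index not in self.block_cache:
            # Stand-in for running the lineage on a single partition.
            self.block_cache[index] = [row * 10 for row in self._partitions[index]]
        return self.block_cache[index]

    def take(self, n):
        # Visit partitions in order and stop as soon as n rows are available.
        rows = []
        for index in range(len(self._partitions)):
            if len(rows) >= n:
                break
            rows.extend(self._partition(index))
        return rows[:n]

    def count(self):
        # Touches every partition, so every block ends up cached.
        return sum(len(self._partition(i)) for i in range(len(self._partitions)))


data = PartitionedDataset([[1, 2], [3, 4], [5, 6]])
first = data.take(1)
cached_after_take = sorted(data.block_cache)
total = data.count()
cached_after_count = sorted(data.block_cache)
print(first, cached_after_take, total, cached_after_count)
# prints: [10] [0] 6 [0, 1, 2]
```

After take(1) only block 0 is in the cache; it takes a full pass such as count() to populate the rest.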
Block eviction
When the storage memory fills up, a Least Recently Used (LRU) policy is used to clear space for new blocks.
WARN org.apache.spark.sql.execution.datasources.SharedInMemoryCache:
Evicting cached table partition metadata from memory due to size constraints
(spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may
impact query planning performance.
It is worth noting that cached partitions can be evicted before they are actually reused. Depending on the caching strategy adopted, the evicted blocks can be written to disk.
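The eviction behavior can be sketched with an ordered dictionary. This is a simplified plain-Python model, not Spark's implementation: ToyBlockStore is hypothetical, capacity is counted in blocks rather than bytes, and the block ids are only illustrative.

```python
from collections import OrderedDict


class ToyBlockStore:
    """Toy block store with LRU eviction and an optional disk tier."""

    def __init__(self, capacity, spill_to_disk=False):
        self.capacity = capacity      # assumption: measured in blocks, not bytes
        self.spill_to_disk = spill_to_disk
        self.memory = OrderedDict()   # block id -> data, least recently used first
        self.disk = {}

    def put(self, block_id, data):
        self.memory[block_id] = data
        self.memory.move_to_end(block_id)
        while len(self.memory) > self.capacity:
            # Evict the least recently used block.
            victim, victim_data = self.memory.popitem(last=False)
            if self.spill_to_disk:
                self.disk[victim] = victim_data

    def get(self, block_id):
        if block_id in self.memory:
            self.memory.move_to_end(block_id)  # mark as recently used
            return self.memory[block_id]
        return self.disk.get(block_id)  # None means the block must be recomputed


memory_only = ToyBlockStore(capacity=2)
memory_and_disk = ToyBlockStore(capacity=2, spill_to_disk=True)
for store in (memory_only, memory_and_disk):
    store.put("rdd_0_0", "a")
    store.put("rdd_0_1", "b")
    store.get("rdd_0_0")       # touch block 0, so block 1 becomes the LRU one
    store.put("rdd_0_2", "c")  # exceeds capacity and evicts rdd_0_1

print(memory_only.get("rdd_0_1"), memory_and_disk.get("rdd_0_1"))
# prints: None b
```

With a memory-only strategy the evicted block is simply gone and has to be recomputed, while a memory-and-disk strategy can still serve it from the disk tier.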
Additional materials
- Spark: The Definitive Guide by Bill Chambers, Matei Zaharia
- Learning Spark by Jules S. Damji, Brooke Wenig, Tathagata Das, Denny Lee
- Mastering Apache Spark 2.0 by Jacek Laskowski