Apache Spark is a distributed computing engine whose defining feature is the ability to perform computations in memory. It follows that memory management plays a very important role in the whole system.
Memory management is a complex topic in its own right. It's a bit off-topic, but I still remember the concepts of Pool and Arena from Python's memory management. Memory management in Spark is arguably even more confusing, which is why many engineers, myself included, do not fully understand it. To use Spark well and achieve high performance, however, a deep understanding of its memory management model is essential.
Hell yeah, let's dig into Spark memory management!
There are several levels of memory management — the Spark level, the Yarn level, the JVM level, and the OS level. We will jump between them rather than discuss each one directly, but it's worth knowing which levels exist so as not to get lost.
Let's start at a high level, with the process of launching a Spark application.
When the Spark application is launched, the Spark cluster will start two processes — Driver and Executor.
The driver is a master process responsible for creating the Spark context, submission of Spark jobs, and translation of the whole Spark pipeline into computational units — tasks. It also coordinates task scheduling and orchestration on each Executor.
Driver memory management is not much different from the typical JVM process and therefore will not be discussed further.
The executor is responsible for performing specific computational tasks on the worker nodes and returning the results to the driver, as well as providing storage for RDDs. Its internal memory management is where things get interesting.
When submitting a Spark job in a cluster with Yarn, Yarn allocates Executor containers to perform the job on different nodes.
The ResourceManager handles memory requests and allocates executor containers up to the maximum allocation size set by the yarn.scheduler.maximum-allocation-mb configuration. Memory requests higher than this value will not take effect.
On a single node, allocation is done by the NodeManager. The NodeManager has an upper limit on the resources available to it, because it is bounded by the resources of its node of the cluster. In Yarn this is set by the yarn.nodemanager.resource.memory-mb configuration: the amount of physical memory per NodeManager, in MB, that can be allocated for Yarn containers.
One ExecutorContainer is just one JVM process, and the entire ExecutorContainer memory area is divided into three sections:
The first is the heap. This is the memory that should be familiar to most Spark developers: the size specified with
--executor-memory when submitting a Spark application, or by setting
spark.executor.memory. It is the maximum JVM heap size (-Xmx), and objects here are managed by the garbage collector (GC). From here on, I will call this part of memory Executor Memory.
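As a quick illustration, here is what that looks like at submission time. This is a sketch, not a real job: the master, class name, jar, and sizes are all hypothetical.

```shell
# Give each executor an 8 GiB JVM heap; equivalent to setting
# spark.executor.memory=8g. Application class and jar are made up.
spark-submit \
  --master yarn \
  --executor-memory 8g \
  --class com.example.MyApp \
  my-app.jar
```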
The second is off-heap memory. Spark makes it possible to use off-heap storage for certain operations, and you can specify the size of the off-heap memory that your application will use.
The third is overhead memory, a seatbelt for the Spark execution pipelines that is used for the various internal Spark overheads.
Let's walk through each of them, and start with Executor Memory.
Dive into the heap
We will look at the Spark source code, specifically this part of it: org/apache/spark/memory.
Let's go deeper into the Executor Memory.
Spark provides an interface for memory management via
MemoryManager. It implements the policies for dividing the available memory across tasks and for allocating memory between storage and execution.
MemoryManager has two implementations —
StaticMemoryManager was used before 1.6; it is still supported and can be enabled with the
spark.memory.useLegacyMode parameter. It is legacy and will not be described further.
UnifiedMemoryManager is the default
MemoryManager in Spark since 1.6, and we are going to dive into its high-level implementation.
As we have already mentioned, the amount of memory available to each executor is controlled by
spark.executor.memory configuration. To use and manage this part of memory more efficiently, Spark divides it both logically and physically.
The bulk of the data living in Spark applications is physically grouped into blocks. Blocks are transferable objects: they are used as inputs to Spark tasks and returned as outputs, they serve as intermediate results in the shuffle process, and they back temporary files.
The BlockManager is the key-value store for blocks in Spark. It works as a local cache that runs on every node of the Spark application, i.e. the driver and the executors. Blocks can be stored on disk or in memory (on-heap or off-heap), either locally or remotely, for some time. After that they are evicted, which we will discuss in more detail later.
What is the basic idea behind blocks? Among other things, blocks help achieve more concurrency. For example, if a given block can be extracted from 4 different executor nodes, it would be faster to extract it from them in parallel.
The most boring part of the memory is Reserved Memory. Spark reserves this memory to store internal objects, guaranteeing sufficient memory for the system even with small JVM heaps.
Reserved Memory is hardcoded and equal to 300 MB (the value of
RESERVED_SYSTEM_MEMORY_BYTES in the source code). In a test environment (when
spark.testing is set) we can modify it with the spark.testing.reservedMemory configuration.
Storage Memory is used for caching and broadcasting data.
Storage Memory size can be found by:
Storage Memory = usableMemory * spark.memory.fraction * spark.memory.storageFraction
Storage Memory is 30% of usable memory by default
(1 * 0.6 * 0.5 = 0.3).
Execution Memory is mainly used to store intermediate data for shuffles, joins, sorts, aggregations, and so on.
Most likely, if your pipeline runs too long, the problem lies in the lack of space here.
Execution Memory = usableMemory * spark.memory.fraction * (1 - spark.memory.storageFraction)
Like Storage Memory, Execution Memory is also equal to 30% of usable memory by default
(1 * 0.6 * (1 - 0.5) = 0.3).
In the implementation of UnifiedMemory, these two parts of memory can be borrowed from each other. The specific borrowing mechanism will be discussed in detail under the Dynamic occupancy mechanism section.
User Memory is mainly used to store data needed for RDD transformations, such as lineage information. You can also keep your own data structures there for use inside transformations. What is stored in this memory, and how, is entirely up to you: Spark does no accounting of what you do there or whether you respect this boundary.
User Memory = usableMemory * (1 - spark.memory.fraction)
1 * (1 - 0.6) = 0.4 — 40% of available memory by default.
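Putting the three formulas together, the sketch below (my own helper, not Spark's actual code) computes the on-heap split for a given heap size, first subtracting the 300 MB Reserved Memory to get usableMemory:

```python
# Sketch of how Spark splits on-heap Executor Memory with default settings.
# Mirrors the formulas above; this is NOT the actual Spark implementation.

RESERVED_MB = 300  # hardcoded RESERVED_SYSTEM_MEMORY_BYTES, in MB

def split_heap(executor_memory_mb,
               memory_fraction=0.6,     # spark.memory.fraction default
               storage_fraction=0.5):   # spark.memory.storageFraction default
    usable = executor_memory_mb - RESERVED_MB
    return {
        "reserved": RESERVED_MB,
        "storage": usable * memory_fraction * storage_fraction,
        "execution": usable * memory_fraction * (1 - storage_fraction),
        "user": usable * (1 - memory_fraction),
    }

# For a 4096 MB heap: usable = 3796 MB,
# storage ≈ execution ≈ 1138.8 MB, user ≈ 1518.4 MB.
print(split_heap(4096))
```

Note how, for small heaps, the fixed 300 MB reservation eats a noticeably larger share of the total.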
Dynamic occupancy mechanism
Execution and Storage have a shared memory. They can borrow it from each other. This process is called the Dynamic occupancy mechanism.
There are two parts of the shared memory: the Storage side and the Execution side. Storage can use the shared memory up to a certain threshold, called
onHeapStorageRegionSize in the code. Beyond that, Storage may use Execution memory, but only if it is not occupied; otherwise, Storage has to wait for the memory to be released by the execution processes. By default,
onHeapStorageRegionSize covers all of Storage Memory.
When Execution memory is not used, Storage can borrow as much Execution memory as available until execution reclaims its space. When this happens, cached blocks will be evicted from memory until sufficient borrowed memory is released to satisfy the Execution memory request.
The creators of this mechanism decided that Execution memory has priority over Storage memory, and they had reasons to do so: executing tasks is more important than keeping cached data, since the whole job can crash on an OOM during execution.
If space on both sides is insufficient, cached blocks are evicted according to their respective storage levels using the LRU algorithm.
The eviction process has its overhead. The cost of memory eviction depends on the storage level.
MEMORY_ONLY may be the most expensive, because evicted data has to be recomputed from scratch.
MEMORY_AND_DISK_SER is the opposite: data is kept in a compact serialized format at runtime, the serialization overhead is low, and eviction only costs disk I/O.
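To make the borrowing and eviction rules concrete, here is a toy model of the dynamic occupancy mechanism. It is a deliberate simplification of UnifiedMemoryManager (no tasks, no disk; eviction is modeled as just shrinking the storage counter), and all names are mine:

```python
# Toy model of the dynamic occupancy mechanism: Storage can borrow idle
# Execution memory, while Execution can reclaim its space by evicting
# borrowed Storage blocks. Heavily simplified sketch, not Spark's code.

class ToyUnifiedMemory:
    def __init__(self, total, storage_region):
        self.total = total                    # shared storage+execution pool
        self.storage_region = storage_region  # onHeapStorageRegionSize analogue
        self.storage_used = 0
        self.execution_used = 0

    def acquire_storage(self, amount):
        free = self.total - self.storage_used - self.execution_used
        if amount <= free:                    # may borrow idle execution memory
            self.storage_used += amount
            return True
        return False                          # storage never evicts execution

    def acquire_execution(self, amount):
        free = self.total - self.storage_used - self.execution_used
        if amount > free:
            # evict cached blocks, but only the part borrowed from execution
            evictable = max(0, self.storage_used - self.storage_region)
            reclaimed = min(evictable, amount - free)
            self.storage_used -= reclaimed
            free += reclaimed
        if amount <= free:
            self.execution_used += amount
            return True
        return False

m = ToyUnifiedMemory(total=100, storage_region=50)
m.acquire_storage(80)     # caches 80: 50 in its region + 30 borrowed
m.acquire_execution(40)   # needs 40, evicts 20 of the borrowed storage
print(m.storage_used, m.execution_used)  # → 60 40
```

The asymmetry between the two acquire methods is exactly the priority rule described above: execution reclaims, storage only waits.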
Although most operations happen entirely in on-heap memory with the mighty help of the GC, Spark also makes it possible to use off-heap storage for certain operations.
This memory is not bound to the GC; instead, Spark calls a Java API (
sun.misc.Unsafe) for unsafe operations, much like C uses
malloc() to request memory from the operating system. This way, Spark can operate on off-heap memory directly, reducing unnecessary memory overhead, frequent GC scanning, and GC collection, and improving processing performance.
With knowledge of the application logic, direct memory handling can provide significant performance benefits, but it also requires careful management of these pieces of memory. That may not be desirable, or even possible, in some deployment scenarios: it is common practice to restrict unsafe operations in the Java security manager configuration.
If off-heap memory is enabled, Executor will have both on-heap and off-heap memory. They are complementary — the Execution memory is the sum of on-heap Execution memory and off-heap Execution memory. The same applies to the Storage memory.
Mostly, off-heap memory was introduced for project Tungsten, to perform bookkeeping for the memory that Tungsten may use.
Speaking of Tungsten…
One of the goals of the project Tungsten is to enhance memory management and binary processing. Tungsten uses custom Encoders/Decoders to represent JVM objects in a compact format to ensure high performance and low memory footprint.
Thus, even when working in the default on-heap mode, Tungsten tries to manage memory explicitly and eliminate the overhead of the JVM object model and garbage collection. In this mode Tungsten really does allocate objects on the heap for its internal purposes, and those allocations can be huge, but they happen much less frequently and pass through the GC's young generation smoothly.
If Tungsten is configured to use off-heap execution memory for allocating data, then all data page allocations must fit within this off-heap size limit. Allocating memory outside the JVM process is something that has to be carefully designed.
This can cause some difficulties in container managers when you need to allow and plan for additional pieces of memory besides the JVM process configuration.
Off-heap memory is disabled by default, but you can enable it with the
spark.memory.offHeap.enabled parameter and set the memory size with the
spark.memory.offHeap.size parameter. It has no effect on heap memory usage, but make sure not to exceed your executor's total memory limits.
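For example, in spark-defaults.conf (the 2 GB size here is just an illustration):

```
spark.memory.offHeap.enabled  true
spark.memory.offHeap.size     2g
```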
Compared to on-heap memory, the off-heap memory model is relatively simple: it includes only Storage memory and Execution memory, each accounting for about 50% of the off-heap memory by default. Under UnifiedMemory management, these two parts can borrow from each other, as described earlier.
When allocating ExecutorContainer in cluster mode, additional memory is also allocated for things like VM overheads, interned strings, other native overheads, etc. This memory is set using
spark.executor.memoryOverhead configuration (or deprecated
spark.yarn.executor.memoryOverhead). The default size is 10% of Executor memory with a minimum of 384 MB.
This additional memory also covers PySpark executor memory (when
spark.executor.pyspark.memory is not configured) and memory used by other non-JVM processes running in the same container.
Since Spark 3.0, this memory does not include off-heap memory.
The overall memory is calculated using the following formula:
val totalMemMiB = (executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
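A sketch of that arithmetic (function and constant names are mine, not Spark's):

```python
# Sketch of the executor container sizing arithmetic, not Spark's code.
MIN_OVERHEAD_MIB = 384      # floor for the overhead allocation
OVERHEAD_FACTOR = 0.10      # default fraction of executor memory

def container_size_mib(executor_mib, offheap_mib=0, pyspark_mib=0,
                       overhead_mib=None):
    if overhead_mib is None:  # spark.executor.memoryOverhead not set explicitly
        overhead_mib = max(int(OVERHEAD_FACTOR * executor_mib),
                           MIN_OVERHEAD_MIB)
    return executor_mib + overhead_mib + offheap_mib + pyspark_mib

print(container_size_mib(4096))  # 4096 + 409 = 4505
print(container_size_mib(1024))  # small heap: the 384 MiB floor kicks in, 1408
```

Note that with off-heap enabled, the container request grows by the full spark.memory.offHeap.size on top of the heap and overhead.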
Spark tasks never interact directly with the MemoryManager. Instead, they go through the
TaskMemoryManager, which manages the memory of individual tasks: it acquires and releases memory and calculates the allocation requested from on-heap or off-heap memory.
Without going into details of the implementation of this class, let us describe one of the problems it solves.
Tasks in an Executor are executed in threads, and each thread shares the JVM's resources, i.e. Execution memory. There is no strong isolation of memory resources between tasks, because every
TaskMemoryManager shares the memory managed by the single
MemoryManager. Therefore, a task that arrives first may take up a lot of memory, and the next task may hang due to lack of memory.
TaskMemoryManager, via memory pools, limits the memory that can be allocated to each task to a range from
1 / (2 * n) to
1 / n of the pool, where
n is the number of currently running tasks. So the more tasks running concurrently, the less memory is available to each of them.
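These bounds are easy to sketch (a simplification of the fairness logic in Spark's execution memory pool; the helper name is mine):

```python
# Per-task share of the execution pool: each active task is guaranteed
# at least 1/(2n) of the pool and capped at 1/n. Simplified sketch.

def task_memory_bounds(pool_bytes, n_active_tasks):
    n = n_active_tasks
    return pool_bytes / (2 * n), pool_bytes / n

# With a 1 GiB pool and 4 running tasks, each task may hold
# between 128 MiB and 256 MiB.
lo, hi = task_memory_bounds(1 << 30, 4)
print(lo / 2**20, hi / 2**20)  # → 128.0 256.0
```

Since n changes as tasks start and finish, these bounds are recomputed on the fly; a task whose request cannot be granted within its cap simply blocks until memory frees up.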
Although performance is good by default, you can tailor Spark to your specific use case: it has a lot of configuration parameters that can be changed to process your data on your cluster more efficiently.
For Spark, efficient memory usage is critical to good performance, and its internal memory management model copes well with its work under the default configuration.
Simply enabling dynamic resource allocation is not sufficient. Data engineers also have to understand how executor memory is laid out and used by Spark so that executors are not starved of memory or troubled by JVM garbage collection. To use Spark at its full potential, try tuning your Spark configuration with an automatic tool I made for you — the Spark configuration optimizer.
Spark 2.x introduced the second-generation Tungsten engine, featuring whole-stage code generation and a vectorized, column-based memory layout. Built on ideas and techniques from modern compilers, this new version also capitalizes on modern CPUs and cache architectures for fast parallel data access.