There are many different tools in the world, each of which solves a range of problems. Many of them are judged by how well and correct they solve this or that problem, but there are tools that you just like, you want to use them. They are properly designed and fit well in your hand, you do not need to dig into the documentation and understand how to do this or that simple action. About one of these tools for me I will be writing this series of posts.
I will describe the optimization methods and tips that help me solve certain technical problems and achieve high efficiency using Apache Spark. This is my updated collection.
Many of the optimizations that I will describe will not affect the JVM languages so much, but without these methods, many Python applications may simply not work.
- Spark tips. DataFrame API
- Spark Tips. Don't collect data on driver
- Spark Tips. Partition Tuning
- Spark tips. Caching
DataFrame and DataSet APIs are based on RDD so I will only be mentioning RDD in this post, but it can easily be replaced with Dataframe or Dataset.
Caching, as trivial as it may seem, is a difficult task for engineers.
Apache Spark relies on engineers to execute caching decisions. Engineers need to be clear about what RDDs should be cached, when, where, and how RDDs should be cached and when they should be removed from the cache.
This is becoming a bit more complicated with the lazy nature of Apache Spark. The evaluation of the DAG in Spark is happening only when an action is called, and only then the transformations on which the action depends are evaluated, and the intermediate RDDs get materialized. When DAG evaluation is completed, these intermediate RDDs will be cleaned up by the Spark engine. If some of the intermediate RDDs will be re-used by the following actions, it will be recomputed once again, which degrades overall pipeline performance. To improve the performance, engineers should cache these RDDs which can be reused multiple times, Spark cannot do that itself right now. For example, if the RDD used by two actions is not cached, the RDD will be evaluated twice, which will degrade the performance of the application. If the list of RDDs that are used multiple times is large then deciding which ones to cache can be tricky.
It is essential to cache RDDs in the following cases:
- repeated use of them in an iterative loop (for example in ML algorithms)
- repeated use of RDD multiple times in the same application or task.
- if cost of recovering RDD partitions is expensive (i.e. HDFS, after a complex set of
filter(), etc). Caching helps in the recovery process if the worker node dies.
Use appropriate caching strategy
In the last post we have been describing several different caching strategies. And of course, each of them has its pros and cons, otherwise, there would be only one strategy, wouldn't there?
The default strategy in Apache Spark is
MEMORY_AND_DISK and it is fine for the majority of pipelines and uses all the available memory in the cluster and thus speeds up the operations. If there is not enough memory for caching then Spark in this strategy saves the data on disk — reading blocks from disk is usually faster than re-evaluating.
DISK_ONLY only make sense for some special use-cases, but don't worry you will know exactly when you need them.
If you don't want to lose newly cached data for some reason, maybe replicating data costs less than losing this cached data completely. In this case, it's better to use *_2 caching strategies, but be aware that cached data will take twice as much space.
Another tradeoff you should think about is data volume vs pipeline speed. If you need high speed to access cached data and you have extra memory, it is better to use deserialized strategies. Overwise, if you are short on memory and speed is not a priority, then you should use serialized strategies.
Do not forget to unpersist
If an RDD is stored, but not used, the RDD will remain in memory until the application completes, or it is eliminated by another cached RDD. Thus, cached RDDs can take up precious memory and affect application execution, resulting in poor performance of the whole application.
Memory is not infinite, and when the cache fills up, Apache Spark will start preempting data that hasn't been used for a long time by LRU (Least Recently Used) strategy.
unpersist() will allow you to control what should be preempted. Don't forget that the more memory there is, the more efficient the Spark applications will be.
Clean up after yourself especially on shared clusters — don't forget to unpersist.
An example of the rather famous real-world error SPARK-3918 in MLlib leads to OOM in its application.
Avoid unnecessary caching
There is no universal answer to what should be cached. Caching an intermediate RDD can dramatically improve performance and it’s tempting to cache everything. However, that's not always a good idea, for example:
- Due to Spark’s default caching strategy (in-memory then swap to disk) the data can end up in slower storage
- Using that memory for caching purposes means that it’s not available for processing. In the end, caching might cost more than simply reading the RDD.
- Once the dataset is cached by Spark, the Catalyst optimizer's ability to optimize transformations may be limited. For example, it will no longer be able to do push filters to the source system because it now works on the in-memory data, not on the source database.
Thus, caching data is recommended only if it will be used repeatedly in the future, e.g., when applying transformations on the same RDDs or iteratively exploring a dataset for example during tuning ML models.
RDD checkpointing is a different concept than Spark Streaming checkpointing. The former concept is designed to solve the linearity problem, while the latter is designed to ensure reliable streaming and fault tolerance.
Caching is not the only way to reuse some computations. There is also a checkpointing mechanism. Checkpointing saves the data on disk and will break the RDD's lineage. Spark construct a new plan for the next transformations. Checkpointing is related to
localCheckpoint() API, which differ in the data storage.
The difference between checkpointing and
persist(DISK_ONLY) is that persisting materializes and stores the RDD in memory or on disk, depending on your configuration, and stores lineage graph. A lineage will keep track of what transformations has to be applied on that RDD, including the location from where it has to read the data. Checkpointing does not store lineage graph and only writes the contents of the RDD to disk so if the same data will be used next time it will be read from the checkpoint location without any processing.
Checkpointing is useful when you need to trim a query plan because it is too big. A large request plan can be a bottleneck in the driver where it is processed. Sometimes it can be the cause of OOM — for example, ML code often creates a large lineage.
Improper caching decisions in RDD can lead to duplicate computations or loss of precious memory space, resulting in serious performance degradation of Apache Spark applications.
There is no universal answer when choosing what should be cached. However, caching is very useful for applications that reuse RDD multiple times. For example, iterative machine learning algorithms include such RDDs that are reused in each iteration.
Caching keeps the lineage graph of RDDs. Checkpointing does not keep the lineage graph and saves the content of the RDD to disk.