Apache Spark is considered as a powerful complement to Hadoop, big data’s original technology. Spark is a more accessible, powerful and capable big data tool for tackling various big data challenges. It has become mainstream and the most in-demand big data framework across all major industries. Spark has become part of the Hadoop since 2.0. And is one of the most useful technologies for Python Big Data Engineers.
This series of posts is a single-stop resource that gives spark architecture overview and it's good for people looking to learn spark.
- Things you need to know about Hadoop and YARN being a Spark developer
- Spark core concepts explained
- Spark. Anatomy of Spark application
Apache Spark Architecture is based on two main abstractions:
- Resilient Distributed Dataset (RDD)
- Directed Acyclic Graph (DAG)
Let's dive in these concepts
RDD — the Spark basic concept
The key to understanding Apache Spark is RDD — the Resilient Distributed Dataset. RDD contains an arbitrary collection of objects. Each data set in RDD is logically distributed across the cluster nodes so that they can be processed in parallel.
Physically, RDD is stored as an object in driver JVM and it refers to the data stored either in a persisted store (HDFS, Cassandra, HBase, etc.) or the cache (memory, memory+disks, disk only, etc.) or another RDD.
RDD stores the following metadata:
▪ Partitions – set of data splits associated with this RDD. They are located on the cluster nodes. One partition is a minimal data batch which will be processed by each cluster node;
▪ Dependencies – list of parent RDDs involved in the computation aka lineage graph;
▪ Computation – function to compute child RDD given the parent RDD from the Dependencies;
▪ Preferred Locations – where is the best place to put computations on partitions (data locality);
▪ Partitioner – how the data is split into Partitions(by default they split by
RDD can be recreated as well as data that it refers to because every RDD knows how it was created (by storing the lineage graph). RDD also can be materialized, in memory or on disk.
>>> rdd = sc.parallelize(range(20)) # create RDD >>> rdd ParallelCollectionRDD at parallelize at PythonRDD.scala:195 >>> rdd.collect() # collect data on driver and show [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
RDD can also be cached and manually partitioned. Caching is beneficial when we use RDD several times(and may slow down our computations otherwise). And manual partitioning is important to correctly balance data between partitions. Generally, smaller partitions allow distributing data more equally, among more executors. Hence, fewer partitions may boost tasks with a lot of repartitions(data reorganizations during computations).
Let's check the number of partitions and data on them:
>>> rdd.getNumPartitions() # get current number of paritions 4 >>> rdd.glom().collect() # collect data on driver based on partitions [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19]]
All interesting that happens in Spark happens through RDD operations. That is, usually Spark applications look like the following — we create RDD (for example, we set data source as file on HDFS), we transform it (
reduce, ...), do something with the result (for example, we throw it back into HDFS).
Over RDD, you can do two types of operations (and, accordingly, all the work with the data is in the sequence of these two types): transformations and actions.
The result of applying this operation to RDD is a new RDD. As a rule, these are operations that in some way convert the elements of a given data.
Transformations are lazy in nature meaning when we call some operation in RDD, it does not execute immediately. Spark maintains the record of which operation is being called(through DAG, we will talk about it later).
We can think Spark RDD as the data, that we built up through transformation. Because of transformations laziness, we can execute operation any time by calling an action on data. Hence, data is not loaded until it is necessary. It gives plenty of opportunities to induce low-level optimizations.
At a high level, two groups of transformations can be applied onto the RDDs, namely narrow transformations, and wide transformations.
Narrow transformation doesn't require the data to be shuffled or reorganized across the partitions. For example,
filter, etc. The narrow transformations will be grouped (or pipe-lined) together into a single stage.
A shuffle occurs when data is rearranged between partitions. This is required when a transformation requires information from other partitions, such as summing all the values in a column. Spark will gather the required data from each partition and combine it into a new partition, likely on a different executor.
But there are exceptions, operations like
coalesce may cause the task to work with multiple input partitions, but the transformation will still be considered narrow because the input records used to compute any output record can still be found only in a limited subset of partitions.
Let's use filter transformation on our data:
>>> filteredRDD = rdd.filter(lambda x: x > 10) >>> print(filteredRDD.toDebugString()) # to see the execution graph; only one stage is created (4) PythonRDD at RDD at PythonRDD.scala:53  | ParallelCollectionRDD at parallelize at PythonRDD.scala:195  >>> filteredRDD.collect() [11, 12, 13, 14, 15, 16, 17, 18, 19]
In this example we don't need to shuffle the data, each partition can be processed independently.
However, Spark also supports transformations with wide dependencies(namely wide transformations), such as
reduceByKey, etc. Within such dependencies, the data required for calculation may be located in several partitions of the parent RDD. All data with the same key must be in the same partition, processed by a single task. To implement these operations, Spark must perform the shuffling, moving data across the cluster and forming a new stage with a new set of partitions as in the example below:
>>> groupedRDD = filteredRDD.groupBy(lambda x: x % 2) # group data based on mod >>> print(groupedRDD.toDebugString()) # two separate stages are created, because of the shuffle (4) PythonRDD at RDD at PythonRDD.scala:53  | MapPartitionsRDD at mapPartitions at PythonRDD.scala:133  | ShuffledRDD at partitionBy at NativeMethodAccessorImpl.java:0  +-(4) PairwiseRDD at groupBy at <ipython-input-5-a92aa13dcb83>:1  | PythonRDD at groupBy at <ipython-input-5-a92aa13dcb83>:1  | ParallelCollectionRDD at parallelize at PythonRDD.scala:195 
Actions are applied when it is necessary to materialize the result — save the data to disk, or output part of the data to the console.
collect operation we used so far is also an action — it collects data.
Actions are not lazy — they actually will trigger the data processing. Actions are RDD operations that produce non-RDD values.
For example, to get the sum of our filtered data we can use
>> filteredRDD.reduce(lambda a, b: a + b) 135
Unlike Hadoop where the user has to break down the whole job into smaller jobs and chain them together to go along with MapReduce, Spark identifies the tasks that can be computed in parallel with partitioned data on the cluster. With these identified tasks, Spark builds a logical flow of operations that can be represented in a graph that is directed and acyclic, also known as DAG (Directed Acyclic Graph), where a node is RDD partition and the edge is transformation on the data. Thus Spark builds its plan of executions implicitly from the provided spark application.
The DAGScheduler divides operators into stages of tasks. A stage is consists of tasks based on the input data partitions. The DAGScheduler pipelines some transformations together. E.g. many map operators can be squash into a single stage. The final result of a DAGScheduler is a set of stages. The stages are passed on to the TaskScheduler. The number of tasks submitted depends on the number of partitions. The TaskScheduler launches tasks via the cluster manager. The TaskScheduler doesn't know about dependencies of the stages.
RDDs are capable of defining location preference to compute partitions. Location preference refers to information about the RDD location. The DAGScheduler places the partitions in such a way that the task is close to data as much as possible(data locality).
Daily dose of