
Spark. Anatomy of Spark application

Apache Spark is considered a powerful complement to Hadoop, big data's original technology. Spark is a more accessible, powerful, and capable tool for tackling various big data challenges. It has become mainstream and the most in-demand big data framework across all major industries. Spark has been part of the Hadoop ecosystem since Hadoop 2.0, and it is one of the most useful technologies for Python Big Data Engineers.

This series of posts is a single-stop resource that gives an overview of Spark architecture; it is a good fit for people looking to learn Spark.

Whole series:


Architecture

Spark YARN architecture

The components of a Spark application are:

  • Driver
  • Application Master
  • Spark Context
  • Cluster Resource Manager (aka Cluster Manager)
  • Executors

Spark uses a master/slave architecture with a central coordinator called the Driver and a set of distributed worker processes called Executors that run on various nodes in the cluster.

Driver

The Driver (aka driver program) is responsible for converting a user application into smaller execution units called tasks and for scheduling them, with the help of the cluster manager, to run on executors. The driver is also responsible for executing the Spark application and returning the status/results to the user.

The Spark Driver contains various components – DAGScheduler, TaskScheduler, SchedulerBackend and BlockManager – which are responsible for translating user code into actual Spark jobs executed on the cluster.

Other Driver properties:

  • can run as an independent process (client mode) or on one of the worker nodes for High Availability (HA);
  • stores metadata about all Resilient Distributed Datasets (RDDs) and their partitions;
  • is created after the user sends the Spark application to the cluster manager (YARN in our case);
  • runs in its own JVM;
  • optimizes logical DAG transformations and, if possible, combines them into stages, and determines the best locations to execute this DAG;
  • creates Spark WebUI with detailed information about the application;
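
As a rough illustration (not part of the original example), the driver properties above map to ordinary Spark settings. A minimal sketch with assumed values, keeping in mind that in client mode spark.driver.memory must be set before the driver JVM starts (e.g. on the spark-submit command line):

from pyspark import SparkConf

# assumed, purely illustrative values for driver-side resources and the WebUI
conf = (SparkConf()
        .setAppName("driver-config-sketch")
        .set("spark.driver.memory", "2g")   # heap size of the driver JVM
        .set("spark.driver.cores", "2")     # cores for the driver (cluster mode)
        .set("spark.ui.port", "4040"))      # port of the Spark WebUI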

Application Master

As we described in the first post, the Application Master is a framework-specific entity charged with negotiating resources with the ResourceManager(s) and working with the NodeManager(s) to execute and monitor application tasks. Each application running on the cluster has its own, dedicated Application Master instance.

The Application Master is created together with the Driver on the same node (in cluster mode) when a user submits a Spark application using spark-submit.

The Driver informs the Application Master of the application's executor needs, and the Application Master negotiates resources with the Resource Manager to host these executors.
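
To make this concrete, the executor needs that the Driver communicates are expressed as plain Spark settings. A minimal sketch with assumed, purely illustrative values:

from pyspark import SparkConf

# illustrative values: request 4 executors with 2 cores and 4 GB of heap each
# from the Resource Manager (via the Application Master)
conf = (SparkConf()
        .set("spark.executor.instances", "4")
        .set("spark.executor.cores", "2")
        .set("spark.executor.memory", "4g"))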

In standalone mode, the Spark Master acts as the Cluster Manager.

Spark Context

Spark Context is the main entry point into Spark functionality, and therefore the heart of any Spark application. It allows the Spark Driver to access the cluster through a Cluster Resource Manager, and it can be used to create RDDs, accumulators and broadcast variables on the cluster. Spark Context also tracks executors in real time via regular heartbeat messages.

Spark Context is created by Spark Driver for each Spark application when it is first submitted by the user. It exists throughout the lifetime of the Spark application.

Spark Context stops working after the Spark application finishes. Only one Spark Context can be active per JVM: you must stop() the active Spark Context before creating a new one.
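
A minimal sketch of this lifecycle (the application name and values are illustrative): obtaining the Spark Context from a Spark Session, using it to create an RDD, a broadcast variable and an accumulator, and stopping it at the end.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("context-sketch").getOrCreate()
sc = spark.sparkContext                    # the single active Spark Context of this JVM

rdd = sc.parallelize(range(10))            # RDD created through the context
lookup = sc.broadcast({"a": 1, "b": 2})    # read-only variable shipped to executors
counter = sc.accumulator(0)                # counter that tasks can only add to

rdd.foreach(lambda x: counter.add(1))      # runs on executors, updates the accumulator
print(counter.value, lookup.value)         # values are read back on the driver

spark.stop()    # stop the active Spark Context before creating a new one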

Cluster Resource Manager

The Cluster Manager in a distributed Spark application is a process that controls, governs, and reserves computing resources in the form of containers on the cluster. These containers are reserved at the request of the Application Master and are allocated to it when they become available or are released.

Once the containers are allocated by the Cluster Manager, the Application Master hands the container resources over to the Spark Driver, which is responsible for executing the various stages and tasks of the Spark application.

SparkContext can connect to several types of Cluster Managers. The most popular these days are YARN, Mesos, Kubernetes or even Nomad, and there is also Spark's own standalone cluster manager.
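
Which Cluster Manager is used is determined by the master URL. An illustrative sketch (host names and ports are placeholders):

from pyspark.sql import SparkSession

# pick exactly one master URL depending on the cluster manager in use
spark = (SparkSession.builder
         .appName("cluster-manager-sketch")
         .master("yarn")                           # YARN
         # .master("spark://master-host:7077")     # Spark standalone
         # .master("mesos://mesos-host:5050")      # Mesos
         # .master("k8s://https://k8s-host:6443")  # Kubernetes
         # .master("local[*]")                     # local mode, no cluster manager
         .getOrCreate())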

Fun fact: Mesos was also developed by the creator of Spark.

Executors

Executors are processes on the worker nodes whose job is to complete the assigned tasks. They execute those tasks on the worker nodes and return the results to the Spark Driver.

Executors are started once at the beginning of a Spark application and then run for the entire lifetime of the application; this is known as "Static Allocation of Executors". However, users can also opt for dynamic allocation, where executors are added or removed dynamically to match the overall workload (although this can affect other applications running on the cluster). Even if a Spark executor crashes, the Spark application can continue to work.
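
Dynamic allocation is switched on purely through configuration. A minimal sketch with assumed bounds (on YARN it typically also requires the external shuffle service):

from pyspark import SparkConf

# illustrative settings: executors are added and removed between the
# min/max bounds depending on the pending workload
conf = (SparkConf()
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.minExecutors", "1")
        .set("spark.dynamicAllocation.maxExecutors", "20")
        .set("spark.shuffle.service.enabled", "true"))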

Executors provide in-memory storage for RDD partitions that are cached (locally) in Spark applications (via the BlockManager), or storage on disk when localCheckpoint is used.
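
A small sketch of both storage options (the RDD names are illustrative and a live SparkContext sc is assumed):

from pyspark import StorageLevel

# cached partitions live in the executors' JVM heap (via the BlockManager)
cachedRDD = sc.parallelize(range(1000)).persist(StorageLevel.MEMORY_ONLY)
print(cachedRDD.count())          # the first action materializes and caches the partitions

# localCheckpoint truncates the lineage and keeps the partitions
# on the executors' local storage
checkpointedRDD = sc.parallelize(range(1000))
checkpointedRDD.localCheckpoint()
print(checkpointedRDD.count())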

Other executor properties:

  • stores data in a cache in a JVM heap or on disk
  • reads data from external sources
  • writes data to external sources
  • performs all data processing

Spark Application running steps

At this level of understanding, let's create and break down one of the simplest Spark applications.

from pyspark import SparkConf
from pyspark.sql import SparkSession

# initialization of the Spark session (the master, "yarn" in our case,
# is normally supplied by spark-submit)
conf = SparkConf().setAppName("PythonWordCount")
spark = SparkSession\
        .builder\
        .config(conf=conf)\
        .getOrCreate()
sc = spark.sparkContext

# read data from HDFS, as a result we get an RDD of lines
linesRDD = sc.textFile("hdfs://...")

# from the RDD of lines create an RDD of words
wordsRDD = linesRDD.flatMap(lambda line: line.split(" "))

# from the RDD of words make an RDD of (word, count) tuples where
# the first element is a word and the second is a counter, at the
# beginning it is set to 1
wordCountRDD = wordsRDD.map(lambda word: (word, 1))

# combine elements with the same word value
resultRDD = wordCountRDD.reduceByKey(lambda a, b: a + b)

# write it back to HDFS
resultRDD.saveAsTextFile("hdfs://...")
spark.stop()

Spark word count example

  1. When we submit the Spark application in cluster mode, the spark-submit utility communicates with the Cluster Resource Manager to start the Application Master.

  2. The Resource Manager is then responsible for selecting a suitable container in which to run the Application Master, and it tells a specific Node Manager to launch it.

  3. The Application Master registers with the Resource Manager. Registration allows the client program to request information from the Resource Manager; that information allows the client program to communicate directly with its own Application Master.

  4. The Spark Driver then runs on the Application Master container (in case of cluster mode).

  5. The driver implicitly converts user code containing transformations and actions into a logical plan called a DAG. All RDDs are created in the driver and do nothing until an action is called (see the sketch after this list). At this stage, the driver also performs optimizations such as pipelining narrow transformations.

  6. The driver then converts the DAG into a physical execution plan and creates physical execution units called tasks for each stage.

  7. The Application Master now communicates with the Cluster Manager and negotiates resources. The Cluster Manager allocates containers and asks the appropriate Node Managers to run the executors in all the selected containers. As the executors start, they register with the Driver, so the Driver has a complete view of all its executors.

  8. At this point, the Driver sends tasks to the executors based on data placement.

  9. The code of the user application is launched inside the container. It provides information (stage of execution, status) to the Application Master.

  10. At this stage, we will start to execute our code. Our first RDD will be created by reading data in parallel from HDFS to different partitions on different nodes based on HDFS InputFormat. Thus, each node will have a subset of data.

  11. After reading the data we have two map transformations which will be executed in parallel on each partition.

  12. Next, we have the reduceByKey transformation. It is not a narrow transformation like map, so it creates an additional stage. It combines records with the same key locally and then shuffles data between nodes and partitions so that records with the same key end up together.

  13. We then perform an action (writing the result back to HDFS), which triggers the execution of the entire DAG.

  14. During the execution of the user application, the client communicates with the Application Master to obtain the application status.

  15. When the application finishes executing and all of the necessary work is done, the Application Master disconnects itself from the Resource Manager and stops, freeing up its container for other purposes.
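
As a minimal sketch of steps 5, 6 and 13 (reusing the SparkContext sc from the word count example and a tiny in-memory dataset instead of HDFS, both assumptions for illustration): transformations only build up the lineage, toDebugString() reveals the additional stage introduced by reduceByKey, and nothing runs until an action is called.

# tiny in-memory stand-in for the HDFS input; assumes a live SparkContext `sc`
linesRDD = sc.parallelize(["to be or not to be"])

# transformations only build up the DAG, nothing is executed yet
wordsRDD = linesRDD.flatMap(lambda line: line.split(" "))
wordCountRDD = wordsRDD.map(lambda word: (word, 1))
resultRDD = wordCountRDD.reduceByKey(lambda a, b: a + b)

# print the RDD lineage; the indentation change marks the shuffle
# boundary introduced by reduceByKey, i.e. the additional stage
print(resultRDD.toDebugString())

# only an action triggers the actual job execution
print(resultRDD.collect())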


More? Well, there you go:

Spark History Server and monitoring jobs performance

Spark core concepts explained

Things you need to know about Hadoop and YARN being a Spark developer