Tags Projects About License

Spark Tips. DataFrame API

Spark Tips. DataFrame API

There are many different tools in the world, each of which solves a range of problems. Many of them are judged by how well and correct they solve this or that problem, but there are tools that you just like, you want to use them. They are properly designed and fit well in your hand, you do not need to dig into the documentation and understand how to do this or that simple action. About one of these tools for me I will be writing this series of posts.

I will describe the optimization methods and tips that help me solve certain technical problems and achieve high efficiency using Apache Spark. This is my updated collection.

Many of the optimizations that I will describe will not affect the JVM languages ​​so much, but without these methods, many Python applications may simply not work.

Whole series:

Use DataFrame API

We know that RDD is a fault-tolerant collection of elements that can be processed in parallel. But RDDs actually kind of black box of data — we know that it holds some data but we do not know the type of the data or any other properties of the data. Hence it's data cannot be optimized as well as the operations on it.

Spark 1.3 introduced a new abstraction — a DataFrame, in Spark 1.6 the Project Tungsten was introduced, an initiative which seeks to improve the performance and scalability of Spark. DataFrame data is organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.

Using DataFrame API you get:

Efficient serialization/deserialization and memory usage


In RDDs Spark uses Java serialization when it is necessary to distribute data across a cluster. The serialization of individual Scala and Java objects is expensive. In Pyspark, it has become more expensive when all data is double-serialized/deserialized to Java/Scala and then to Python (using cloudpickle) and back.

The cost of double serialization is the most expensive part when working with Pyspark. For distributed systems such as Spark, most of our time is spent only on serialization of data. Actual computing is often not a big blocking part — most of it is just serializing things and shuffling them around. We want to avoid this double cost of serialization.

In RDD we have this double serialization, and we pay for it on every operation — wherever we need to distribute data into a cluster or call the python function. It also requires both data and structure to be transferred between nodes.

With the DataFrame API, everything is a bit different. Since Spark understands data structure and column types if they are represented as DataFrame, he can understand how to store and manage them more efficiently(Java objects have a large inherent memory overhead as well as JVM GC).

The DataFrame API does two things that help to do this (through the Tungsten project). First, using off-heap storage for data in binary format. Second, generating encoder code on the fly to work with this binary format for your specific objects.

In simple words, Spark says:

I'm going to generate these efficient encoders so that I can pull the data right out in this compact format and process it in that format right through the transformation chain. I'm going to minimize the amount of work I'm required to do in JVM (no, I'm not going to do anything in Python) and maximize the amount of data I can process in my compact format stored in off-heap memory. It turns out that you don't pay a penalty for serialization/deserialization — you work directly on this compact format, and using off-heap memory is a great way to reduce GC pauses because it is not in the scope of GC.

So it's great that we can store our data inside the JVM, but we can still write code in Python using a very clear API and it will still be very efficient!

Schema Projection

The RDD API explicitly uses schematic projection. Therefore the user needs to define the schema manually.

There's no need to specify the schema explicitly in DataFrame. As a rule, Spark can detect the schema automatically(inferSchema option). But the schema's resolution depends mainly on the data sources. If the source should contain structured data (i.e. relational database), the schema is extracted directly without any guesswork.

More complex operations apply to semi-structured data, such as JSON files. In these cases the schema is guesswork.


Spark dataframe

RDD cannot be optimized by Spark — they are fully lambda driven. RDD is rather a "black box" of data, which cannot be optimized because Spark can' t look inside what the data actually consists of. So Spark can't do any optimizations on your behalf. This feels more acute in non-JVM languages such as Python.

DataFrame has additional metadata due to its column format, which allows Spark to perform certain optimizations on a completed request. Before your query is run, a logical plan is created using Catalyst Optimizer and then it's executed using the Tungsten execution engine.

What is Catalyst?

Catalyst Optimizer — the name of the integrated query optimizer and execution scheduler for Spark Datasets/DataFrame.

Catalyst Optimizer is the place where most of the "magic" tends to improve the speed of your code execution by logically improving it. But in any complex system, unfortunately, "magic" is not enough to always guarantee optimal performance. As with relational databases, it is useful to learn a little bit about how the optimizer works to understand how it plans and customizes your applications.

In particular, Catalyst Optimizer can perform refactoring of complex queries. However, almost all of its optimizations are qualitative and rule-based rather than quantitative and statistical. For example, Spark knows how and when to do things like combine filters or move filters before joining. Spark 2.0 even allows you to define, add, and test your own additional optimization rules at runtime.

Catalyst Optimizer supports both rule-based and cost-based optimization.

  1. Cost-Based Optimizer(CBO): If an SQL query can be executed in two different ways (e.g. #1 and #2 for the same original query), then CBO essentially calculates the cost of each path and analyzes which path is cheaper, and then executes that path to improve the query execution.

  2. Rule-Based optimizer(RBO): follows different optimization rules that apply depending on the query. These include constant folding, predicate pushdown, projection pruning, null propagation, Boolean simplification, and other rules.

In fact, there is no separation inside the Spark, the two approaches work together, cost-based optimization is performed by generating multiple plans using rules, and then computing their costs.

As an example, imagine that we have users (they come from the database) and their transactions (I will generate some random values, but it could be a database as well):

transactions = spark.range(160000)\
    .select(F.col('id').alias('key'), F.rand(12).alias('value'))
users = spark.read.jdbc(
    table='users', url='jdbc:postgresql://localhost:5432/postgres')

users.join(transactions, 'key')\
    .filter(F.col('key') > 100)\

We see here that our developer in a programming rush wrote user filtering after join. It would be more reasonable to do the filter first and then join because the shuffle data will be reduced. But if we see the physical plan, we see that Catalyst Optimizer did it for us. What's more, it did a predicate pushdown of our filter to the database (Spark will try to move the filtering data as close to the source as possible to avoid loading unnecessary data into memory, see PushedFilters).

== Physical Plan ==
*(5) SortMergeJoin [cast(key#6 as bigint)], [key#2L], Inner
:- *(2) Sort [cast(key#6 as bigint) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(cast(key#6 as bigint), 200)
:     +- *(1) Filter (cast(key#6 as int) > 100)
:        +- *(1) Scan JDBCRelation(users) [numPartitions=1] [key#6,name#7] PushedFilters: [*IsNotNull(key)], ReadSchema: struct<key:string,name:string>
+- *(4) Sort [key#2L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key#2L, 200)
      +- *(3) Project [id#0L AS key#2L, rand(12) AS value#3]
         +- *(3) Range (0, 160000, step=1, splits=1)

Spark does not "own" any storage, so it does not build indexes, B-trees, etc. on the disk. (although support for parquet files, if used well, may give you some related features). Spark has been optimized for the volume, variety, etc. of big data — so, traditionally, it is not designed to maintain and use statistics. about the dataset. For example, in cases where the DBMS may know that a particular filter will remove most records and apply it early in the query, Spark does not know this fact and will not perform such optimization.

transactions = spark.range(160000) \
    .select(F.col('id').alias('key'), F.rand(12).alias('value'))
users = spark.read.jdbc(
    table='users', url='jdbc:postgresql://localhost:5432/postgres')

users.join(transactions, 'key') \
    .drop_duplicates(['key']) \

This gives us one more Exchange(shuffle) but not a predicate pushdown of distinct operation into the database. We need to rewrite it ourselves to do distinct first.

== Physical Plan ==
*(6) HashAggregate(keys=[key#6], functions=[first(key#2L, false), first(value#3, false)])
+- Exchange hashpartitioning(key#6, 200)
   +- *(5) HashAggregate(keys=[key#6], functions=[partial_first(key#2L, false), partial_first(value#3, false)])
      +- *(5) SortMergeJoin [cast(key#6 as bigint)], [key#2L], Inner
         :- *(2) Sort [cast(key#6 as bigint) ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(cast(key#6 as bigint), 200)
         :     +- *(1) Scan JDBCRelation(users) [numPartitions=1] [key#6] PushedFilters: [*IsNotNull(key)], ReadSchema: struct<key:string>
         +- *(4) Sort [key#2L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(key#2L, 200)
               +- *(3) Project [id#0L AS key#2L, rand(12) AS value#3]
                  +- *(3) Range (0, 160000, step=1, splits=1)

To sum up, use Spark DataFrames, release the power of [Catalyst Optimizer] (https://databricks.com/glossary/catalyst-optimizer) and [Tungsten execution engine] (https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html). You do not have to waste time tuning your RDDs, you can focus on the business problem.

Understandable code

The code in RDD expresses more how of a solution better than what. It is a bit difficult to read the code based on RDD and understand what is going on.

Using DataFrame API it's easy to go one level up — to business logic. Since DataFrame API looks like SQL and there is a schema, it's easy to see the real work with data. So, using DataFrame API, you are closer to what you are trying to achieve and not to how you are doing it.

You don't have to worry about how the DBMS find out if you need to perform a table scan or which indexes to use — all you need is a result. You express what you want, and you let Spark under the cover find the most effective way to do it.

DataFrame-based API is the primary API for MLlib

As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package.

Recommended books

Buy me a coffee
Next post

More? Well, there you go:

Spark tips. Don't collect data on driver

Explaining the mechanics of Spark caching

Spark Tips. Caching