Spark tips. DataFrame API

There are many different tools in the world, each of which solves a range of problems. Many of them are judged by how well and how correctly they solve this or that problem, but there are tools that you simply like and want to use. They are well designed and fit well in your hand; you do not need to dig into the documentation to figure out how to do a simple action. This series of posts is about one such tool for me.

I will describe the optimization methods and tips that help me solve certain technical problems and achieve high efficiency using Apache Spark. This is my updated collection.

Many of the optimizations that I will describe will not affect JVM languages as much, but without these methods many Python applications may simply not work.

Use DataFrame API

We know that an RDD is a fault-tolerant collection of elements that can be processed in parallel. But an RDD is essentially a black box of data: we know that it holds some data, but we do not know the type of the data or any of its other properties. Hence neither the data nor the operations on it can be optimized well.

Spark 1.3 introduced a new abstraction, the DataFrame, and Project Tungsten, an initiative to improve the performance and scalability of Spark, followed in the Spark 1.4 and 1.5 releases. DataFrame data is organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.

Using DataFrame API you get:

Efficient serialization/deserialization and memory usage


With RDDs, Spark uses Java serialization by default whenever it needs to distribute data over a cluster. Serializing individual Scala and Java objects is expensive. It gets even more expensive in PySpark, where all data goes through double serialization/deserialization: to Java/Scala, then to Python (using cloudpickle), and back again.

Double serialization is the most expensive part and the biggest takeaway for working with PySpark. In distributed systems like Spark, a lot of time is spent just serializing data. The actual computations are frequently not the main bottleneck; much of the time goes into serializing data and shuffling it around. We want to avoid this double serialization cost. With RDDs we pay it on a per-operation basis, whenever data has to be distributed across the cluster or a Python function has to be called. It also requires sending both the data and its structure between nodes.

Because Spark understands the structure of DataFrames and their column types, it can figure out how to store them more efficiently. Two features of the DataFrame API, both coming from Project Tungsten, make this possible. First, data is stored in a compact binary format, which can live off-heap. Second, Spark generates encoder code on the fly for working with this binary format for your specific objects.

In simple words, Spark says:

I've got this efficient encoder that I just generated to pull this stuff out for you, but I'm going to store it in my compact format. And I'm going to generate these encoders so that I can pull the data right out of that compact format and process it directly in that format over the whole chain of transformations. I'm going to minimize the amount of work I need to do in the JVM here (no, I'm not going to Python) and maximize the amount of data I process in my compact off-heap format. This way I decrease memory usage, because the Tungsten format is more compact than the on-heap object format. It turns out you're not paying the serialization/deserialization penalty: you're operating directly on this compact format, and off-heap memory is a great way to reduce GC pauses because it's outside the GC's scope.

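So that's wonderful: we can keep our data inside the JVM and still write Python code that is efficient, as long as we stick to built-in DataFrame operations rather than Python functions (UDFs), which still have to ship rows to a Python worker.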

Schema Projection

With the RDD API, schema projection is explicit, so the user has to define the schema manually.

With a DataFrame, there is no need to specify a schema explicitly; in general, Spark can discover the schema automatically. How the schema is resolved depends mostly on the data source. If the source contains structured data (e.g. a relational database), the schema is retrieved directly without guessing.

Semi-structured data, such as JSON files, requires more work: in these cases the schema is inferred from the data.

RDDs cannot be optimized by Spark because they are entirely lambda-driven. An RDD is a black box of data that Spark can't look inside, so it can't do any optimizations on your behalf. This is felt even more acutely in non-JVM languages like Python.

A DataFrame carries additional metadata (its schema of named, typed columns), which allows Spark to optimize the query before running it. DataFrame query plans are optimized by Catalyst and executed by the Tungsten execution engine.

What is Catalyst?

Catalyst is the name of Spark’s integral query optimizer and execution planner for Datasets/DataFrames.

Catalyst is where most of the “magic” happens to improve the execution speed of your code. But in any complex system, “magic” is unfortunately not good enough to always guarantee optimal performance. Just as with relational databases, it is valuable to learn a bit about exactly how the optimizer works to understand its planning and tune your applications.

In particular, Catalyst can perform sophisticated rewrites of complex queries. However, most of its optimizations are qualitative and rule-based rather than quantitative and statistics-based. For example, Spark knows how and when to combine filters or move filters before joins. Spark 2.0 even allows you to define, add, and test your own optimization rules at runtime.

Catalyst supports both rule-based and cost-based optimization (the cost-based optimizer arrived in Spark 2.2 and is still disabled by default).

  1. Cost-Based Optimizer (CBO): if a SQL query can be executed in two different ways (say, path 1 and path 2), the CBO estimates the cost of each path from table and column statistics, picks the cheaper one, and executes it.

  2. Rule-Based Optimizer (RBO): applies a fixed set of rules (for example, combining filters or pushing them down) to the query plan, without looking at statistics about the data.

As an example, imagine we have users (coming from a database) and their transactions (I will generate some random values, but they could come from the database as well):

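# Assumes an existing SparkSession `spark` and the PostgreSQL JDBC driver on the classpath.
from pyspark.sql import functions as F

transactions = spark.range(160000)\
	.select(F.col("id").alias("key"), F.rand(12).alias("value"))
users = spark.read.jdbc(
	table="users", url="jdbc:postgresql://localhost:5432/postgres")

# The filter is written after the join; explain() prints the physical plan.
users.join(transactions, "key")\
	.filter(F.col("key") > 100)\
	.explain()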

Here our developer, in a programming rush, wrote the filter on users after the join. It would be more reasonable to filter first and then join, because less data would be shuffled. But if we look at the physical plan, we see that Catalyst did this for us: the Filter sits below the join, before the Exchange. Moreover, it pushed part of the predicate down into the database (Spark tries to move data filtering as close to the source as possible to avoid loading unnecessary data into memory; see PushedFilters). In this plan only IsNotNull(key) is pushed, because the key > 100 comparison needs a cast of the string column and is evaluated by Spark right after the scan.

== Physical Plan ==
*(5) SortMergeJoin [cast(key#6 as bigint)], [key#2L], Inner
:- *(2) Sort [cast(key#6 as bigint) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(cast(key#6 as bigint), 200)
:     +- *(1) Filter (cast(key#6 as int) > 100)
:        +- *(1) Scan JDBCRelation(users) [numPartitions=1] [key#6,name#7] PushedFilters: [*IsNotNull(key)], ReadSchema: struct<key:string,name:string>
+- *(4) Sort [key#2L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key#2L, 200)
      +- *(3) Project [id#0L AS key#2L, rand(12) AS value#3]
         +- *(3) Range (0, 160000, step=1, splits=1)

Spark doesn't "own" any storage, so it does not build on-disk indexes, B-trees, etc. (although its Parquet support, if used well, can give you some related features). Spark has been optimized for the volume and variety of big data, so traditionally it has not been designed to maintain and use statistics about a dataset. For example, where an RDBMS might know that a specific filter eliminates most records and apply it early in the query, Spark does not know this and won't perform that optimization.

Spark also still can't do more complicated pushdowns, for example:

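# Assumes an existing SparkSession `spark` and the PostgreSQL JDBC driver on the classpath.
from pyspark.sql import functions as F

transactions = spark.range(160000) \
	.select(F.col("id").alias("key"), F.rand(12).alias("value"))
users = spark.read.jdbc(
	table="users", url="jdbc:postgresql://localhost:5432/postgres")

# Deduplication is written after the join; explain() prints the physical plan.
users.join(transactions, "key") \
	.drop_duplicates(["key"]) \
	.explain()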

This gives us one more Exchange (shuffle), and the distinct operation is not pushed down into the database. We have to rewrite the query ourselves to do the distinct first.

== Physical Plan ==
*(6) HashAggregate(keys=[key#6], functions=[first(key#2L, false), first(value#3, false)])
+- Exchange hashpartitioning(key#6, 200)
   +- *(5) HashAggregate(keys=[key#6], functions=[partial_first(key#2L, false), partial_first(value#3, false)])
      +- *(5) SortMergeJoin [cast(key#6 as bigint)], [key#2L], Inner
         :- *(2) Sort [cast(key#6 as bigint) ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(cast(key#6 as bigint), 200)
         :     +- *(1) Scan JDBCRelation(users) [numPartitions=1] [key#6] PushedFilters: [*IsNotNull(key)], ReadSchema: struct<key:string>
         +- *(4) Sort [key#2L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(key#2L, 200)
               +- *(3) Project [id#0L AS key#2L, rand(12) AS value#3]
                  +- *(3) Range (0, 160000, step=1, splits=1)

To sum up: use Spark DataFrames and unleash the power of the Catalyst optimizer and the Tungsten execution engine. You don't have to spend time tuning your RDDs; you can focus on the business problem.

Understandable code

RDD code expresses how to compute a solution rather than what the solution is, which makes RDD-based code a little difficult to read and understand.

The DataFrame API makes it easy to go one level up, to the business logic. Because the DataFrame API looks similar to SQL and DataFrames have a schema, it's easy to see what is actually being done with the data. With DataFrames you are closer to what you are trying to do rather than how you are trying to do it.

It is like working with an RDBMS: you don't worry about how it figures out whether to do a table scan or which indexes to use; all you want is the result. You express what you want and let the RDBMS figure out under the covers the most efficient way to do it.

DataFrame-based API is the primary API for MLlib

As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package.
