The 5-minute guide to using bucketing in Pyspark

There are many tools in the world, each solving its own range of problems. Most are judged by how well and how correctly they solve a given problem, but some tools you simply enjoy using. They are well designed and fit comfortably in your hand, and you do not need to dig through the documentation to figure out how to do some simple action. This series of posts is about one such tool for me.

I will describe the optimization methods and tips that help me solve certain technical problems and achieve high efficiency. This is my updated collection.

Many of the optimizations that I will describe will not affect the JVM languages so much, but without these methods, many Python applications may simply not work.

Let's start with the problem.

We've got two tables and we do one simple inner join by one column:

t1 = spark.table("unbucketed1")
t2 = spark.table("unbucketed2")

t1.join(t2, "key").explain()

In the physical plan, what you will get is something like the following:

== Physical Plan ==                                                             
*(5) Project [key#10L, value#11, value#15]
+- *(5) SortMergeJoin [key#10L], [key#14L], Inner
   :- *(2) Sort [key#10L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#10L, 200)
   :     +- *(1) Project [key#10L, value#11]
   :        +- *(1) Filter isnotnull(key#10L)
   :           +- *(1) FileScan parquet default.unbucketed1[key#10L,value#11] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/unbucketed1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
   +- *(4) Sort [key#14L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#14L, 200)
         +- *(3) Project [key#14L, value#15]
            +- *(3) Filter isnotnull(key#14L)
               +- *(3) FileScan parquet default.unbucketed2[key#14L,value#15] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/unbucketed2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>

SortMergeJoin is Spark's default join strategy when neither side is small enough to broadcast, but here we are more concerned with two other things in the execution plan: the two Exchange operations. Exchanges always deserve attention because they shuffle data across the cluster, which is expensive, so we want to avoid them unless there is no other choice. But...

There must be a better way

We know that we join on the key column, so we can use this information to get rid of these two exchanges.

How?

Use bucketing

Bucketing

Bucketing is an optimization technique that splits data into a fixed number of more manageable parts (buckets) based on a hash of one or more columns, and records that layout so Spark knows how the data is partitioned. The motivation is to speed up join queries by avoiding shuffles (aka exchanges) of the tables participating in the join. Bucketing results in fewer exchanges (and hence fewer stages), because the shuffle may not be necessary: matching keys from both DataFrames are already located in the corresponding buckets.
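
To make the idea concrete, here is a minimal pure-Python sketch, not Spark code. It assumes integer keys and uses `key % num_buckets` as a stand-in for Spark's real hash function; the names `bucket_of`, `bucketize`, and `bucketed_join` are made up for illustration. Because both tables place a given key in the same bucket number, the join can proceed bucket by bucket without moving rows between buckets.

```python
from collections import defaultdict

NUM_BUCKETS = 4  # illustrative only; the table in this post uses 16


def bucket_of(key, num_buckets=NUM_BUCKETS):
    # Stand-in for Spark's hash function: the same key always maps to the same bucket id.
    return key % num_buckets


def bucketize(rows, num_buckets=NUM_BUCKETS):
    # Group (key, value) rows by bucket id, roughly what a bucketed write does.
    buckets = defaultdict(list)
    for key, value in rows:
        buckets[bucket_of(key, num_buckets)].append((key, value))
    return buckets


def bucketed_join(left, right):
    # Join bucket i of the left side only with bucket i of the right side.
    out = []
    for bucket_id, left_rows in left.items():
        for lk, lv in left_rows:
            for rk, rv in right.get(bucket_id, []):
                if lk == rk:
                    out.append((lk, lv, rv))
    return sorted(out)


t1 = bucketize([(1, "a"), (2, "b"), (5, "c")])
t2 = bucketize([(1, "x"), (5, "y"), (6, "z")])
print(bucketed_join(t1, t2))  # [(1, 'a', 'x'), (5, 'c', 'y')]
```

No row ever has to be compared with a row from a different bucket, which is the property that lets Spark skip the exchange.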

Bucketing is enabled by default. Spark SQL uses spark.sql.sources.bucketing.enabled configuration property to control whether it should be enabled and used for query optimization or not.
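
If you want to confirm the setting on your own cluster, a sketch like the following can help. It assumes `spark` is an existing SparkSession, as in the other snippets in this post; the helper name `bucketing_enabled` is mine, not part of Spark.

```python
def bucketing_enabled(spark):
    # Runtime SQL configs come back as strings such as "true" or "false".
    value = spark.conf.get("spark.sql.sources.bucketing.enabled")
    return str(value).strip().lower() == "true"


# Usage, assuming an active SparkSession named `spark`:
#   bucketing_enabled(spark)
#   spark.conf.set("spark.sql.sources.bucketing.enabled", "false")  # disable for this session
```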

Bucketing specifies physical data placement, so we pre-shuffle our data at write time because we want to avoid the shuffle at query time.

Oookay, do I really need to do an additional step if the shuffle will be executed anyway?

If you're joining multiple times, then yes. The more joins you run against the bucketed tables, the bigger the performance gain, because the shuffle cost is paid once at write time.

An example of how to create a bucketed table:

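# df is any DataFrame that has "key" and "value" columns.
# Hash rows into 16 buckets by "key" and sort the rows within each bucket by "value".
df.write \
    .bucketBy(16, "key") \
    .sortBy("value") \
    .saveAsTable("bucketed", format="parquet")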

Here, bucketBy distributes the data across a fixed number of buckets (16 in our case) and is a good fit when the number of unique values is unbounded. If the number of unique values is small and limited, it's better to use partitioning instead of bucketing.
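
A rough way to see the difference, in plain Python rather than Spark: partitioning produces one group per distinct value, while bucketing caps the number of groups at the bucket count. The modulo below is only a stand-in for Spark's hash function, and both helper names are hypothetical.

```python
def partition_groups(keys):
    # Partitioning: one group (directory) per distinct value of the column.
    return len(set(keys))


def bucket_groups(keys, num_buckets):
    # Bucketing: at most num_buckets groups, however many distinct keys exist.
    return len({key % num_buckets for key in keys})


user_ids = range(100_000)            # a high-cardinality column
print(partition_groups(user_ids))    # 100000
print(bucket_groups(user_ids, 16))   # 16
```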

t2 = spark.table("bucketed")
t3 = spark.table("bucketed")

# bucketed - bucketed join. 
# Both sides have the same bucketing, and no shuffles are needed.
t3.join(t2, "key").explain()

And the resulting physical plan:

== Physical Plan ==
*(3) Project [key#14L, value#15, value#30]
+- *(3) SortMergeJoin [key#14L], [key#29L], Inner
   :- *(1) Sort [key#14L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [key#14L, value#15]
   :     +- *(1) Filter isnotnull(key#14L)
   :        +- *(1) FileScan parquet default.bucketed[key#14L,value#15] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
   +- *(2) Sort [key#29L ASC NULLS FIRST], false, 0
      +- *(2) Project [key#29L, value#30]
         +- *(2) Filter isnotnull(key#29L)
            +- *(2) FileScan parquet default.bucketed[key#29L,value#30] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16

Here we have not only fewer code-gen stages but also no exchanges.
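
For larger plans it is easy to miss an Exchange by eye. The helpers below are a small sketch: `capture_plan` relies on PySpark's `explain()` printing the plan from Python, an assumption worth checking on your version, and `count_exchanges` just searches the text. Both names are made up for this post. The pattern deliberately skips operators such as BroadcastExchange.

```python
import contextlib
import io
import re


def capture_plan(df):
    # df.explain() prints the plan and returns None, so capture stdout instead.
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


def count_exchanges(plan_text):
    # Count standalone "Exchange" operators (shuffles) in a plan string.
    return len(re.findall(r"\bExchange\b", plan_text))


# Usage, assuming t2 and t3 are the DataFrames read above:
#   count_exchanges(capture_plan(t3.join(t2, "key")))
```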

As of Spark 2.4, Spark supports bucket pruning to optimize filtering on the bucketed column (by reducing the number of bucket files to scan).
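
The idea behind bucket pruning can be sketched in plain Python. This is not Spark's implementation: modulo stands in for the real hash function and `buckets_to_scan` is a hypothetical helper. An equality or IN filter on the bucketed column pins down which buckets can contain matching rows, so the rest can be skipped.

```python
def buckets_to_scan(filter_values, num_buckets):
    # Only buckets that a filtered key hashes into can hold matching rows.
    return sorted({value % num_buckets for value in filter_values})


print(buckets_to_scan([42], 16))         # [10]
print(buckets_to_scan([1, 17, 33], 16))  # [1]
print(buckets_to_scan([3, 4], 16))       # [3, 4]
```

In a real plan, pruning should show up in the FileScan node as a SelectedBucketsCount lower than the total, for example 1 out of 16 rather than the 16 out of 16 seen above.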

Summary

Overall, bucketing is a relatively new technique that in some cases might be a great improvement both in stability and performance. However, I found that using it is not trivial and has many gotchas.

Bucketing works well when the number of unique values is unbounded. Columns that are used often in queries and provide high selectivity are good choices for bucketing. Spark tables that are bucketed store metadata about how they are bucketed and sorted, which helps optimize joins, aggregations, and queries on the bucketed columns.
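
To look at that metadata yourself, one option is `DESCRIBE EXTENDED`. The sketch below assumes an active SparkSession passed in as `spark`, and that the output rows carry `col_name` and `data_type` fields with labels such as "Num Buckets", "Bucket Columns", and "Sort Columns"; treat the exact labels as an assumption to verify on your Spark version. `bucketing_metadata` is a name I made up.

```python
# Labels assumed to appear in the DESCRIBE EXTENDED output for a bucketed table.
BUCKET_FIELDS = ("Num Buckets", "Bucket Columns", "Sort Columns")


def bucketing_metadata(spark, table_name):
    # DESCRIBE EXTENDED returns one row per column or table property.
    rows = spark.sql(f"DESCRIBE EXTENDED {table_name}").collect()
    return {
        row["col_name"]: row["data_type"]
        for row in rows
        if row["col_name"] in BUCKET_FIELDS
    }


# Usage, assuming the table written earlier in this post:
#   bucketing_metadata(spark, "bucketed")
```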
