Fast Filtering with Spark PartitionFilters and PushedFilters

Spark can use the disk partitioning of files to greatly speed up certain filtering operations.

This post explains the difference between memory and disk partitioning, describes how to analyze physical plans to see when filters are applied, and gives a conceptual overview of why this design pattern can provide massive performace gains.

Normal DataFrame filter

Let’s create a CSV file (/Users/powers/Documents/tmp/blog_data/people.csv) with the following data:

first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China

Let’s read in the CSV data into a DataFrame:

val df = spark
  .read
  .option("header", "true")
  .csv("/Users/powers/Documents/tmp/blog_data/people.csv")

Let’s write a query to fetch all the Russians in the CSV file with a first_name that starts with M.

df
  .where($"country" === "Russia" && $"first_name".startsWith("M"))
  .show()
+----------+---------+-------+
|first_name|last_name|country|
+----------+---------+-------+
|     Maria|Sharapova| Russia|
+----------+---------+-------+

Let’s use explain() to see how the query is executed.

df
  .where($"country" === "Russia" && $"first_name".startsWith("M"))
  .explain()
== Physical Plan ==
Project [first_name#12, last_name#13, country#14]
+- Filter (((isnotnull(country#14) && isnotnull(first_name#12)) && (country#14 = Russia)) && StartsWith(first_name#12, M))
   +- FileScan csv [first_name#12,last_name#13,country#14]
        Batched: false,
        Format: CSV,
        Location: InMemoryFileIndex[file:/Users/powers/Documents/tmp/blog_data/people.csv],
        PartitionFilters: [],
        PushedFilters: [IsNotNull(country), IsNotNull(first_name), EqualTo(country,Russia), StringStartsWith(first_name,M)],
        ReadSchema: struct

Take note that there are no PartitionFilters in the physical plan.

partitionBy()

The repartition() method partitions the data in memory and the partitionBy() method partitions data in folders when it’s written out to disk.

Let’s write out the data in partitioned CSV files.

df
  .repartition($"country")
  .write
  .option("header", "true")
  .partitionBy("country")
  .csv("/Users/powers/Documents/tmp/blog_data/partitioned_lake")

Here’s what the directory structure looks like:

partitioned_lake/
  country=Argentina/
    part-00044-c5d2f540-e89b-40c1-869d-f9871b48c617.c000.csv
  country=China/
    part-00059-c5d2f540-e89b-40c1-869d-f9871b48c617.c000.csv
  country=Russia/
    part-00002-c5d2f540-e89b-40c1-869d-f9871b48c617.c000.csv

Here are the contents of the CSV file in the country=Russia directory.

first_name,last_name
Vladimir,Putin
Maria,Sharapova

Notice that the country column is not included in the CSV file anymore. Spark has abstracted a column from the CSV file to the directory name.

PartitionFilters

Let’s read from the partitioned data folder, run the same filters, and see how the physical plan changes.

Let’s run the same filter as before, but on the partitioned lake, and examine the physical plan.

val partitionedDF = spark
  .read
  .option("header", "true")
  .csv("/Users/powers/Documents/tmp/blog_data/partitioned_lake")

partitionedDF
  .where($"country" === "Russia" && $"first_name".startsWith("M"))
  .explain()
== Physical Plan ==
Project [first_name#74, last_name#75, country#76]
+- Filter (isnotnull(first_name#74) && StartsWith(first_name#74, M))
   +- FileScan csv [first_name#74,last_name#75,country#76]
        Batched: false,
        Format: CSV,
        Location: InMemoryFileIndex[file:/Users/powers/Documents/tmp/blog_data/partitioned_lake],
        PartitionCount: 1,
        PartitionFilters: [isnotnull(country#76), (country#76 = Russia)],
        PushedFilters: [IsNotNull(first_name), StringStartsWith(first_name,M)],
        ReadSchema: struct

You need to examine the physical plans carefully to identify the differences.

When filtering on df we have PartitionFilters: [] whereas when filtering on partitionedDF we have PartitionFilters: [isnotnull(country#76), (country#76 = Russia)].

Spark only grabs data from certain partitions and skips all of the irrelevant partitions. Data skipping allows for a big performance boost.

PushedFilters

When we filter off of df, the pushed filters are [IsNotNull(country), IsNotNull(first_name), EqualTo(country,Russia), StringStartsWith(first_name,M)].

When we filter off of partitionedDf, the pushed filters are [IsNotNull(first_name), StringStartsWith(first_name,M)].

Spark doesn’t need to push the country filter when working off of partitionedDF because it can use a partition filter that is a lot faster.

Partitioning in memory vs. partitioning on disk

repartition() and coalesce() change how data is partitioned in memory.

partitionBy() changes how data is partitioned when it’s written out to disk.

Use repartition() before writing out partitioned data to disk with partitionBy() because it’ll execute a lot faster and write out fewer files.

Partitioning in memory and paritioning on disk are related, but completely different concepts that expert Spark programmers must master.

Disk partitioning with skewed columns

Suppose you have a data lake with information on all 7.6 billion people in the world. The country column is skewed because a lot of people live in countries like China and India and compatively few people live in countries like Montenegro.

This code is problematic because it will write out the data in each partition as a single file.

df
  .repartition($"country")
  .write
  .option("header", "true")
  .partitionBy("country")
  .csv("/Users/powers/Documents/tmp/blog_data/partitioned_lake")

We don’t our data lake to contain some massive files because that’ll make Spark reads / writes unnecessarily slow.

If we don’t do any in memory reparitioning, Spark will write out a ton of files for each partition and our data lake will contain way too many small files.

df
  .write
  .option("header", "true")
  .partitionBy("country")
  .csv("/Users/powers/Documents/tmp/blog_data/partitioned_lake")

This answer explains how to intelligently repartition in memory before writing out to disk with partitionBy().

Here’s how we can limit each partition to a maximum of 100 files.

import org.apache.spark.sql.functions.rand

df
  .repartition(100, $"country", rand)
  .write
  .option("header", "true")
  .partitionBy("country")
  .csv("/Users/powers/Documents/tmp/blog_data/partitioned_lake")

Next steps

I recommend rereading this blog post and running all the code on your local machine with the Spark shell.

Effective disk partitioning can greatly speed up filter operations.

Registration

Leave a Reply

Your email address will not be published. Required fields are marked *