Important Considerations when filtering in Spark with filter and where

This blog post explains how to filter in Spark and discusses the vital factors to consider when filtering.

Poorly executed filtering operations are a common bottleneck in Spark analyses.

You need to make sure your data is stored in a format that is efficient for Spark to query. You also need to make sure the number of memory partitions after filtering is appropriate for your dataset.

Executing a filtering query is easy… filtering well is difficult. Read the Beautiful Spark book if you want to learn how to create create data lakes that are optimized for performant filtering operations.

Read this blog post closely. Filtering properly will make your analyses run faster and save your company money. It’s the easiest way to become a better Spark programmer.

Filter basics

Let’s create a DataFrame and view the contents:

val df = Seq(
  ("famous amos", true),
  ("oreo", true),
  ("ginger snaps", false)
).toDF("cookie_type", "contains_chocolate")
df.show()

+------------+------------------+
| cookie_type|contains_chocolate|
+------------+------------------+
| famous amos|              true|
|        oreo|              true|
|ginger snaps|             false|
+------------+------------------+

Now let’s filter the DataFrame to only include the rows with contains_chocolate equal to true.

val filteredDF = df.where(col("contains_chocolate") === lit(true))

filteredDF.show()

+-----------+------------------+
|cookie_type|contains_chocolate|
+-----------+------------------+
|famous amos|              true|
|       oreo|              true|
+-----------+------------------+

There are various alternate syntaxes that give you the same result and same performance.

  • df.where("contains_chocolate = true")
  • df.where($"contains_chocolate" === true)
  • df.where('contains_chocolate === true)

A separate section towards the end of this blog post demonstrates that all of these syntaxes generate the same execution plan, so they’ll all perform equally.

where is an alias for filter, so all these work as well:

  • df.filter(col("contains_chocolate") === lit(true))
  • df.filter("contains_chocolate = true")
  • df.filter($"contains_chocolate" === true)
  • df.filter('contains_chocolate === true)

Empty partition problem

A filtering operation does not change the number of memory partitions in a DataFrame.

Suppose you have a data lake with 25 billion rows of data and 60,000 memory partitions. Suppose you run a filtering operation that results in a DataFrame with 10 million rows. After filtering, you’ll still have 60,000 memory partitions, many of which will be empty. You’ll need to run repartition() or coalesce() to spread the data on an appropriate number of memory partitions.

Let’s look at some pseudocode:

val df = spark.read.parquet("/some/path") // 60,000 memory partitions
val filteredDF = df.filter(col("age") > 98) // still 60,000 memory partitions
// at this point, any operations performed on filteredDF will be super inefficient
val repartitionedDF = filtereDF.repartition(200) // down to 200 memory partitions

Let’s use the person_data.csv file that contains 100 rows of data and person_name and person_country columns to demonstrate this on a real dataset.

80 people are from China, 15 people are from France, and 5 people are from Cuba.

This code reads in the person_data.csv file and repartitions the data into 200 memory partitions.

val path = new java.io.File("./src/test/resources/person_data.csv").getCanonicalPath
val df = spark
  .read
  .option("header", "true")
  .csv(path)
  .repartition(200)

println(df.rdd.partitions.size) // 200

Let’s filter the DataFrame and verify that the number of memory partitions does not change:

val filteredDF = df.filter(col("person_country") === "Cuba")
println(filteredDF.rdd.partitions.size) // 200

There are only 5 rows of Cuba data and 200 memory partitions, so we know that at least 195 memory partitions are empty.

Having a lot of empty memory partitions significantly slows down analyses on production-sized datasets.

Selecting an appropriate number of memory partitions

Choosing the right number of memory partitions after filtering is difficult.

You can follow the 1GB per memory partition rule of thumb to estimate the number of memory partitions that’ll be appropriate for a filtered dataset.

Suppose you have 25 billion rows of data, which is 10 terabytes on disk (10,000 GB).

An extract with 500 million rows (2% of the total data) is probably around 200 GB of data (0.02 * 10,000), so 200 memory partitions should work well.

Underlying data stores

Filtering operations execute completely differently depending on the underlying data store.

Spark attempts to “push down” filtering operations to the database layer whenever possible because databases are optimized for filtering. This is called predicate pushdown filtering.

An operation like df.filter(col("person_country") === "Cuba") is executed differently depending on if the data store supports predicate pushdown filtering.

  • A parquet lake will send all the data to the Spark cluster, and perform the filtering operation on the Spark cluster
  • A Postgres database table will perform the filtering operation in Postgres, and then send the resulting data to the Spark cluster.

N.B. using a data lake that doesn’t allow for query pushdown is a common, and potentially massive bottleneck.

Column pruning

Spark will use the minimal number of columns possible to execute a query.

The df.select("person_country").distinct() query will be executed differently depending on the file format:

  • A Postgres database will perform the filter at the database level and only send a subset of the person_country column to the cluster
  • A Parquet data store will send the entire person_country column to the cluster and perform the filtering on the cluster (it doesn’t send the person_name column – that column is “pruned”)
  • A CSV data store will send the entire dataset to the cluster. CSV is a row based file format and row based file formats don’t support column pruning.

You almost always want to work with a file format or database that supports column pruning for your Spark analyses.

Cluster sizing after filtering

Depending on the data store, the cluster size needs might be completely different before and after performing a filtering operation.

Let’s say your 25 billion row dataset is stored in a parquet data lake and you need to perform a big filter and then do some advanced NLP on 1 million rows. You’ll need a big cluster to perform the initial filtering operation and a smaller cluster to perform the NLP analysis on the comparatively tiny dataset. For workflows like these, it’s often better to perform the filtering operation on a big cluster, repartition the data, write it to disk, and then perform the detailed analysis with a separate, smaller cluster on the extract.

Transferring big datasets from cloud storage to a cloud cluster and performing a big filtering operation is slow and expensive. You will generate a huge cloud compute bill with these types of workflows.

The pre / post filtering cluster requirements don’t change when you’re using a data storage that allows for query pushdown. The filtering operation is not performed in the Spark cluster. So you only need to use a cluster that can handle the size of the filtered dataset.

Partition filters

Data lakes can be partitioned on disk with partitionBy.

If the data lake is partitioned, Spark can use PartitionFilters, as long as the filter is using the partition key.

In our example, we could make a partitioned data lake with the person_country partition key as follows:

val path = new java.io.File("./src/test/resources/person_data.csv").getCanonicalPath
val df = spark
  .read
  .option("header", "true")
  .csv(path)
  .repartition(col("person_country"))

df
  .write
  .partitionBy("person_country")
  .option("header", "true")
  .csv("tmp/person_data_partitioned")

This’ll write out the data as follows:

person_data_partitioned/
  person_country=China/
    part-00059-dd8849eb-4e7d-4b6c-9536-59f94ea56412.c000.csv
  person_country=Cuba/
    part-00086-dd8849eb-4e7d-4b6c-9536-59f94ea56412.c000.csv
  person_country=France/
    part-00030-dd8849eb-4e7d-4b6c-9536-59f94ea56412.c000.csv

The “partition key” is person_country. Let’s use explain to verify that PartitionFilters are used when filtering on the partition key.

val partitionedPath = new java.io.File("tmp/person_data_partitioned").getCanonicalPath
spark
  .read
  .csv(partitionedPath)
  .filter(col("person_country") === "Cuba")
  .explain()
FileScan csv [_c0#132,person_country#133]
  Batched: false,
  Format: CSV,
  Location: InMemoryFileIndex[file:/Users/matthewpowers/Documents/code/my_apps/mungingdata/spark2/tmp/person_...,
  PartitionCount: 1,
  PartitionFilters: [isnotnull(person_country#133), (person_country#133 = Cuba)],
  PushedFilters: [],
  ReadSchema: struct<_c0:string>

Check out Beautiful Spark Code for a full description on how to build, update, and filter partitioned data lakes.

Explain with different filter syntax

filter and where are executed the same, regardless of whether column arguments or SQL strings are used.

Let’s verify that all the different filter syntaxes generate the same physical plan.

All of these code snippets generate the same physical plan:

df.where("person_country = 'Cuba'").explain()
df.where($"person_country" === "Cuba").explain()
df.where('person_country === "Cuba").explain()
df.filter("person_country = 'Cuba'").explain()

Here’s the generated physical plan:

== Physical Plan ==
(1) Project [person_name#152, person_country#153]
+- (1) Filter (isnotnull(person_country#153) && (person_country#153 = Cuba))
   +- (1) FileScan csv [person_name#152,person_country#153]
          Batched: false,
      Format: CSV,
          Location: InMemoryFileIndex[file:/Users/matthewpowers/Documents/code/my_apps/mungingdata/spark2/src/test/re...,
          PartitionFilters: [],
          PushedFilters: [IsNotNull(person_country),
          EqualTo(person_country,Cuba)],
          ReadSchema: struct<person_name:string,person_country:string>

Incremental updates with filter

Some filtering operations are easy to incrementally update with Structured Streaming + Trigger.Once.

See this blog post for more details.

Incrementally updating a dataset is often 100 times faster than rerunning the query on the entire dataset.

Conclusion

There are different syntaxes for filtering Spark DataFrames that are executed the same under the hood.

Optimizing filtering operations depends on the underlying data store. Your queries will be a lot more performant if the data store supports predicate pushdown filters.

If you’re working with a data storage format that doesn’t support predicate pushdown filters, try to create a partitioned data lake and leverages partition filters.

Transferring large datasets to the Spark cluster and performing the filtering in Spark is generally the slowest and most costly option. Avoid this query pattern whenever possible.

Filtering a Spark dataset is easy, but filtering in a performant, cost efficient manner is surprisingly hard. Filtering is a common bottleneck in Spark analyses.

Registration

8 Comments


  1. crystal clear explanation.


  2. I don’t get the point when you say “A parquet lake will send all the data to the Spark cluster, and perform the filtering operation on the Spark cluster”.
    I think, parquet does support predicate push down and it wont send the whole data to spark cluster. Please correct me if I am wrong.


    1. You raise a good point and I need to rephrase the sentence you quoted. Parquet performs some column pruning based on min/max statistics in the Parquet metadata, but it doesn’t typically allow for any predicate pushdown filters. I will write another blog post to discuss this in detail. Other data stores like Postgres & Snowflake allow for a lot more predicate pushdown filtering opportunities.


  3. Excellent Article. Thanks you so much 🙂


  4. Nicely explained!! In your opinion which of the presented data stores is the best ? (CSV,Parquet or Postgres) For an industrial size database.


  5. Thank you, I’m facing the same issue now, I only have 50 millions records, and when i try to filter, it hang without any response
    What I did, is after many transformation I saved the data to hive and then re-importing into spark cluster, then i used the filter and it works fine! Do you have any explanation?


  6. In fact, even CSV supports predicate pushdown, as shown in the plan you shared:

    “`
    Format: CSV,
    Location: InMemoryFileIndex[file:/Users/matthewpowers/Documents/code/my_apps/mungingdata/spark2/src/test/re…,
    PartitionFilters: [],
    PushedFilters: [IsNotNull(person_country),
    EqualTo(person_country,Cuba)],
    ReadSchema: struct
    “`

    Note the value of `PushedFilters`.

    What this does is apply the filter as Spark is reading the source data files, so non-matching rows don’t get shipped to Spark.

    I think what may be confusing the discussion here about predicate pushdown is that Spark will still have to scan through all the rows. It’s a plain CSV, after all. So the filter was pushed down, but that won’t save Spark from scanning the whole file.

    With an indexed Postgres table, by contrast, a pushed down filter will not only filter out non-matching rows at the source, but assuming the table has the right indexes, the non-matching rows will never be scanned to begin with.

    But note that a plain, unindexed Postgres table will have to be scanned all the way through, just like a CSV file would. Postgres has no way to skip over non-matching rows without an index, even if the filter was pushed down.

Comments are closed, but trackbacks and pingbacks are open.