Partitioning on Disk with partitionBy

Spark writers allow for data to be partitioned on disk with partitionBy. Some queries can run 50 to 100 times faster on a partitioned data lake, so partitioning is vital for certain queries.

Creating and maintaining partitioned data lake is hard.

This blog post discusses how to use partitionBy and explains the challenges of partitioning production-sized datasets on disk. Different memory partitioning tactics will be discussed that let partitionBy operate more efficiently.

You’ll need to master the concepts covered in this blog to create partitioned data lakes on large datasets, especially if you’re dealing with a high cardinality or high skew partition key.

Make sure to read Writing Beautiful Spark Code for a detailed overview of how to create production grade partitioned lakes.

Memory partitioning vs. disk partitioning

coalesce() and repartition() change the memory partitions for a DataFrame.

partitionBy() is a DataFrameWriter method that specifies if the data should be written to disk in folders. By default, Spark does not write data to disk in nested folders.

Memory partitioning is often important independent of disk partitioning. In order to write data on disk properly, you’ll almost always need to repartition the data in memory first.

Simple example

Suppose we have the following CSV file with first_name, last_name, and country columns:

first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China

Let’s partition this data on disk with country as the partition key. Let’s create one file per partition.

val path = new java.io.File("./src/main/resources/ss_europe/").getCanonicalPath
val df = spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(path)

val outputPath = new java.io.File("./tmp/partitioned_lake1/").getCanonicalPath
df
  .repartition(col("country"))
  .write
  .partitionBy("country")
  .parquet(outputPath)

Here’s what the data will look like on disk:

partitioned_lake1/
  country=Argentina/
    part-00044-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  country=China/
    part-00059-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  country=Russia/
    part-00002-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet

Creating one file per disk partition is not going to work for production sized datasets. Suppose the China partition contains 100GB of data – we won’t be able to write out all of that data in a single file.

partitionBy with repartition(5)

Let’s run repartition(5) to get each row of data in a separate memory partition before running partitionBy and see how that impacts how the files get written to disk.

val outputPath = new java.io.File("./tmp/partitioned_lake2/").getCanonicalPath
df
  .repartition(5)
  .write
  .partitionBy("country")
  .parquet(outputPath)

Here’s what the files look like on disk:

partitioned_lake2/
  country=Argentina/
    part-00003-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
  country=China/
    part-00000-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
    part-00004-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
  country=Russia/
    part-00001-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
    part-00002-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet

The partitionBy writer will write out files on disk for each memory partition. The maximum number of files written out is the number of unique countries multiplied by the number of memory partitions.

In this example, we have 3 unique countries * 5 memory partitions, so up to 15 files could get written out (if each memory partition had one Argentinian, one Chinese, and one Russian person). We only have 5 rows of data, so only 5 files are written in this example.

partitionBy with repartition(1)

If we repartition the data to one memory partition before partitioning on disk with partitionBy, then we’ll write out a maximum of three files. numMemoryPartitions * numUniqueCountries = maxNumFiles. 1 * 3 = 3.

Let’s take a look at the code.

val outputPath = new java.io.File("./tmp/partitioned_lake2/").getCanonicalPath
df
  .repartition(1)
  .write
  .partitionBy("country")
  .parquet(outputPath)

Here’s what the files look like on disk:

partitioned_lake3/
  country=Argentina/
    part-00000-bc6ce757-d39f-489e-9677-0a7105b29e66.c000.snappy.parquet
  country=China/
    part-00000-bc6ce757-d39f-489e-9677-0a7105b29e66.c000.snappy.parquet
  country=Russia/
    part-00000-bc6ce757-d39f-489e-9677-0a7105b29e66.c000.snappy.parquet

Partitioning datasets with a max number of files per partition

Let’s use a dataset with 80 people from China, 15 people from France, and 5 people from Cuba. Here’s a link to the data.

Here’s what the data looks like:

person_name,person_country
a,China
b,China
c,China
...77 more China rows
a,France
b,France
c,France
...12 more France rows
a,Cuba
b,Cuba
c,Cuba
...2 more Cuba rows

Let’s create 8 memory partitions and scatter the data randomly across the memory partitions (we’ll write out the data to disk, so we can inspect the contents of a memory partition).

val outputPath = new java.io.File("./tmp/repartition_for_lake4/").getCanonicalPath
df
  .repartition(8, col("person_country"), rand)
  .write
  .csv(outputPath)

Let’s look at one of the CSV files that is outputted:

p,China
f1,China
n1,China
a2,China
b2,China
d2,China
e2,China
f,France
c,Cuba

This technique helps us set a maximum number of files per partition when creating a partitioned lake. Let’s write out the data to disk and observe the output.

val outputPath = new java.io.File("./tmp/partitioned_lake4/").getCanonicalPath
df
  .repartition(8, col("person_country"), rand)
  .write
  .partitionBy("person_country")
  .csv(outputPath)

Here’s what the files look like on disk:

partitioned_lake4/
  person_country=China/
    part-00000-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    part-00001-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    ... 6 more files
  person_country=Cuba/
    part-00002-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    part-00003-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    ... 2 more files
  person_country=France/
    part-00000-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    part-00001-0887fbd2-4d9f-454a-bd2a-de42cf7e7d9e.c000.csv
    ... 5 more files

Each disk partition will have up to 8 files. The data is split randomly in the 8 memory partitions. There won’t be any output files for a given disk partition if the memory partition doesn’t have any data for the country.

This is better, but still not ideal. We have 4 files for Cuba and seven files for France, so too many small files are being created.

Let’s review the contents of our memory partition from earlier:

p,China
f1,China
n1,China
a2,China
b2,China
d2,China
e2,China
f,France
c,Cuba

partitionBy will split up this particular memory partition into three files: one China file with 7 rows of data, one France file with one row of data, and one Cuba file with one row of data.

Partitioning dataset with max rows per file

Let’s write some code that’ll create partitions with ten rows of data per file. We’d like our data to be stored in 8 files for China, one file for Cuba, and two files for France.

We can use the maxRecordsPerFile option to output files with 10 rows.

val outputPath = new java.io.File("./tmp/partitioned_lake5/").getCanonicalPath
df
  .repartition(col("person_country"))
  .write
  .option("maxRecordsPerFile", 10)
  .partitionBy("person_country")
  .csv(outputPath)

This technique is particularity important for partition keys that are highly skewed. The number of inhabitants by country is a good example of a partition key with high skew. For example Jamaica has 3 million people and China has 1.4 billion people – we’ll want ~467 times more files in the China partition than the Jamaica partition.

Partitioning dataset with max rows per file pre Spark 2.2

The maxRecordsPerFile option was added in Spark 2.2, so you’ll need to write your own custom solution if you’re using an earlier version of Spark.

val countDF = df.groupBy("person_country").count()

val desiredRowsPerPartition = 10

val joinedDF = df
  .join(countDF, Seq("person_country"))
  .withColumn(
    "my_secret_partition_key",
    (rand(10) * col("count") / desiredRowsPerPartition).cast(IntegerType)
  )

val outputPath = new java.io.File("./tmp/partitioned_lake6/").getCanonicalPath
joinedDF
  .repartition(col("person_country"), col("my_secret_partition_key"))
  .drop("count", "my_secret_partition_key")
  .write
  .partitionBy("person_country")
  .csv(outputPath)

We calculate the total number of records per partition key and then create a my_secret_partition_key column rather than relying on a fixed number of partitions.

You should choose the desiredRowsPerPartition based on what will give you ~1 GB files. If you have a 500 GB dataset with 750 million rows, set desiredRowsPerPartition to 1,500,000.

Small file problem

Partitioned data lakes can quickly develop a small file problem when they’re updated incrementally. It’s hard to compact partitioned data lakes. As we’ve seen, it’s even hard to make a partitioned data lake!

Use the tactics outlined in this blog post to build your partitioned data lakes and start them off without the small file problem!

Conclusion

Partitioned data lakes can be much faster to query (when filtering on the partition keys) because they allow for a massive data skipping.

Creating and maintaining partitioned data lakes is challenging, but the performance gains make them a worthwhile effort.

Registration

7 Comments


  1. Great article, really very helpful. If I want to use bucketBy on another column (since we can’t use the same column for partitionBy and bucketBy and doesn’t make sense), how do I avoid having m (partitions) * n (buckets) number of files in each folder.

    Reply

  2. Hi Author. Interesting post. I do have a doubt.

    what would be the output for below code.

    val outputPath = new java.io.File(“./tmp/partitioned_lake5/”).getCanonicalPath
    df
    .repartition(col(“Different_col”))
    .write
    .option(“maxRecordsPerFile”, 10)
    .partitionBy(“person_country”)
    .csv(outputPath)

    Reply

    1. Hi Srini,

      It would have the following O/P :

      China – 8 Partitions with each partition having exactly 10 records
      France – 2 Partitions with (10+5 records)
      Cuba – 1 Partition with (5 records)

      Reply

  3. Suppose we have the following schema on disk
    partitioned_lake2/
    country=Argentina/
    part-00003-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
    country=China/
    part-00000-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
    part-00004-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
    country=Russia/
    part-00001-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet
    part-00002-c2d1b76a-aa61-437f-affc-a6b322f1cf42.c000.snappy.parquet

    Now I want to load only the files in partition 00001 from each country. Is there a way to do that?

    Thanks in advance

    Reply

  4. This is a great article and very helpful, thank you for this write up! One important finding for me is that “maxRecordsPerFile” seems to be sequentially executed and actually shows an imbalance like problem (last few jobs will take a very long time to complete), for skewed data sets this will result in a fairly big issue.

    Using the old way (pre Spark 2.2) performs 4-5x better even tho the code will look a bit uglier.

    Reply

  5. Hi, Beautiful post

    I am new to spark…

    I get this error:
    val path = new java.io.File(“testfile.csv”).getCanonicalPath
    ^
    SyntaxError: invalid syntax

    code:
    val path = new java.io.File(“testfile.csv”).getCanonicalPath
    val df = spark
    .read
    .option(“header”, “true”)
    .option(“charset”, “UTF8”)
    .csv(path)

    val outputPath = new java.io.File(“./tmp/partitioned_lake1/”).getCanonicalPath
    df
    .repartition(col(“country”))
    .write
    .partitionBy(“country”)
    .parquet(outputPath)

    What is the solution to fix this?

    Thank you

    Reply

  6. If i want only one country in one partition what to do then.?

    Reply

Leave a Reply

Your email address will not be published. Required fields are marked *