Writing Parquet Files in Python with Pandas, PySpark, and Koalas

This blog post shows how to convert a CSV file to Parquet with Pandas, Spark, PyArrow and Dask.

It discusses the pros and cons of each approach and explains how both approaches can happily coexist in the same ecosystem.

Parquet is a columnar file format whereas CSV is row based. Columnar file formats are more efficient for most analytical queries. You can speed up a lot of your Panda DataFrame queries by converting your CSV files and working off of Parquet files.

All the code used in this blog is in this GitHub repo.

Pandas approach

Suppose you have the following data/us_presidents.csv file:

full_name,birth_year
teddy roosevelt,1901
abe lincoln,1809

You can easily read this file into a Pandas DataFrame and write it out as a Parquet file as described in this Stackoverflow answer.

import pandas as pd

def write_parquet_file():
    df = pd.read_csv('data/us_presidents.csv')
    df.to_parquet('tmp/us_presidents.parquet')

write_parquet_file()

This code writes out the data to a tmp/us_presidents.parquet file.

Let’s read the Parquet data into a Pandas DataFrame and view the results.

df = pd.read_parquet('tmp/us_presidents.parquet')
print(df)

         full_name  birth_year
0  teddy roosevelt        1901
1      abe lincoln        1809

Pandas provides a beautiful Parquet interface. Pandas leverages the PyArrow library to write Parquet files, but you can also write Parquet files directly from PyArrow.

PyArrow

PyArrow lets you read a CSV file into a table and write out a Parquet file, as described in this blog post. The code is simple to understand:

import pyarrow.csv as pv
import pyarrow.parquet as pq

table = pv.read_csv('./data/people/people1.csv')
pq.write_table(table, './tmp/pyarrow_out/people1.parquet')

PyArrow is worth learning because it provides access to file schema and other metadata stored in the Parquet footer. Studying PyArrow will teach you more about Parquet.

Dask

Dask is a parallel computing framework that makes it easy to convert a lot of CSV files to Parquet files with a single operation as described in this post.

Here’s a code snippet, but you’ll need to read the blog post to fully understand it:

import dask.dataframe as dd

df = dd.read_csv('./data/people/*.csv')
df.to_parquet('./tmp/people_parquet2', write_index=False)

Dask is similar to Spark and easier to use for folks with a Python background. Spark is still worth investigating, especially because it’s so powerful for big data sets.

PySpark

Let’s read the CSV data to a PySpark DataFrame and write it out in the Parquet format.

We’ll start by creating a SparkSession that’ll provide us access to the Spark CSV reader.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .master("local") \
  .appName("parquet_example") \
  .getOrCreate()

df = spark.read.csv('data/us_presidents.csv', header = True)
df.repartition(1).write.mode('overwrite').parquet('tmp/pyspark_us_presidents')

We need to specify header = True when reading the CSV to indicate that the first row of data is column headers.

Spark normally writes data to a directory with many files. The directory only contains one file in this example because we used repartition(1). Spark can write out multiple files in parallel for big datasets and that’s one of the reasons Spark is such a powerful big data engine.

Let’s look at the contents of the tmp/pyspark_us_presidents directory:

pyspark_us_presidents/
  _SUCCESS
  part-00000-81610cf2-dc76-481e-b302-47b59e06d9b6-c000.snappy.parquet

The part-00000-81...snappy.parquet file contains the data. Spark uses the Snappy compression algorithm for Parquet files by default.

Let’s read tmp/pyspark_us_presidents Parquet data into a DataFrame and print it out.

df = spark.read.parquet('tmp/pyspark_us_presidents')
df.show()

+---------------+----------+
|      full_name|birth_year|
+---------------+----------+
|teddy roosevelt|      1901|
|    abe lincoln|      1809|
+---------------+----------+

Setting up a PySpark project on your local machine is surprisingly easy, see this blog post for details.

Koalas

koalas lets you use the Pandas API with the Apache Spark execution engine under the hood.

Let’s read the CSV and write it out to a Parquet folder (notice how the code looks like Pandas):

import databricks.koalas as ks

df = ks.read_csv('data/us_presidents.csv')
df.to_parquet('tmp/koala_us_presidents')

Read the Parquet output and display the contents:

df = ks.read_parquet('tmp/koala_us_presidents')
print(df)

         full_name  birth_year
0  teddy roosevelt        1901
1      abe lincoln        1809

Koalas outputs data to a directory, similar to Spark. Here’s what the tmp/koala_us_presidents directory contains:

koala_us_presidents/
  _SUCCESS
  part-00000-1943a0a6-951f-4274-a914-141014e8e3df-c000.snappy.parquet

Pandas and Spark can happily coexist

Pandas is great for reading relatively small datasets and writing out a single Parquet file.

Spark is great for reading and writing huge datasets and processing tons of files in parallel.

Suppose your data lake currently contains 10 terabytes of data and you’d like to update it every 15 minutes. You get 100 MB of data every 15 minutes. Maybe you setup a lightweight Pandas job to incrementally update the lake every 15 minutes. You can do the big extracts and data analytics on the whole lake with Spark.

Next steps

The Delta Lake project makes Parquet data lakes a lot more powerful by adding a transaction log. This makes it easier to perform operations like backwards compatible compaction, etc.

I am going to try to make an open source project that makes it easy to interact with Delta Lakes from Pandas. The Delta lake design philosophy should make it a lot easier for Pandas users to manage Parquet datasets. Stay tuned!

Registration

1 Comment


  1. I got an error while using pyspark.

Comments are closed, but trackbacks and pingbacks are open.