Chaining Custom PySpark DataFrame Transformations

PySpark code should generally be organized as single-purpose DataFrame transformations that can be chained together for production analyses (e.g. generating a datamart).

This blog post demonstrates how to monkey patch the DataFrame object with a transform method, how to define custom DataFrame transformations, and how to chain the function calls.

We’ll also demonstrate how to run multiple custom transformations with function composition using the cytoolz library.

If you’re using the Scala API, read this blog post on chaining DataFrame transformations with Scala.

Chaining DataFrame Transformations with lambda

Let’s monkey patch the DataFrame object with a transform method so we can chain DataFrame transformations.

from pyspark.sql.dataframe import DataFrame


def transform(self, f):
    return f(self)


DataFrame.transform = transform

This code snippet is from the quinn project.
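Under the hood, transform is nothing Spark-specific — it is just function application on self. Here's a minimal pure-Python sketch of the same monkey-patching pattern, using a toy Box class in place of a real DataFrame:

```python
class Box:
    """Toy stand-in for DataFrame, just to illustrate the pattern."""
    def __init__(self, value):
        self.value = value


def transform(self, f):
    # Apply f to the instance and return whatever f returns,
    # which enables chaining: box.transform(f).transform(g)
    return f(self)


Box.transform = transform  # monkey patch, just like DataFrame.transform


def double(box):
    return Box(box.value * 2)


def add_one(box):
    return Box(box.value + 1)


result = Box(10).transform(double).transform(add_one)
print(result.value)  # → 21
```

Each transformation takes a Box and returns a new Box, exactly like a custom DataFrame transformation takes and returns a DataFrame.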

Let’s define a couple of simple DataFrame transformations to test the transform method.

from pyspark.sql.functions import lit


def with_greeting(df):
    return df.withColumn("greeting", lit("hi"))


def with_something(df, something):
    return df.withColumn("something", lit(something))

Let’s create a DataFrame and then run the with_greeting and with_something DataFrame transformations.

data = [("jose", 1), ("li", 2), ("liz", 3)]
source_df = spark.createDataFrame(data, ["name", "age"])

actual_df = (source_df
    .transform(lambda df: with_greeting(df))
    .transform(lambda df: with_something(df, "crazy")))
actual_df.show()

+----+---+--------+---------+
|name|age|greeting|something|
+----+---+--------+---------+
|jose|  1|      hi|    crazy|
|  li|  2|      hi|    crazy|
| liz|  3|      hi|    crazy|
+----+---+--------+---------+

The lambda is optional for custom DataFrame transformations that only take a single DataFrame argument, so we can refactor the with_greeting line as follows:

actual_df = (source_df
    .transform(with_greeting)
    .transform(lambda df: with_something(df, "crazy")))

Without the DataFrame#transform method, we would have needed to write code like this:

df1 = with_greeting(source_df)
actual_df = with_something(df1, "crazy")

The transform method improves our code by helping us avoid multiple order-dependent variable assignments. Creating intermediate variables gets especially ugly when 5+ transformations need to be run - you don’t want df1, df2, df3, df4, and df5 😡

Let’s define a DataFrame transformation with an alternative method signature to allow for easier chaining 😅

Chaining DataFrame Transformations with functools.partial

Let’s define a with_jacket DataFrame transformation that appends a jacket column to a DataFrame.

def with_jacket(word, df):
    return df.withColumn("jacket", lit(word))

We’ll use the same source_df DataFrame and with_greeting function from before and chain the transformations with functools.partial.

from functools import partial

actual_df = (source_df
    .transform(with_greeting)
    .transform(partial(with_jacket, "warm")))
actual_df.show()

+----+---+--------+------+
|name|age|greeting|jacket|
+----+---+--------+------+
|jose|  1|      hi|  warm|
|  li|  2|      hi|  warm|
| liz|  3|      hi|  warm|
+----+---+--------+------+

functools.partial helps us get rid of the lambda functions, but we can do even better…
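Note that partial binds arguments from the left, which is why with_jacket takes word before df: the DataFrame argument has to come last so the chained call can supply it. A quick pure-Python illustration of that binding order (with_label and the dict records are just stand-ins for this demo):

```python
from functools import partial


def with_label(label, record):
    # label comes first so partial can bind it,
    # leaving record as the final argument supplied by the caller
    return {**record, "label": label}


tag_warm = partial(with_label, "warm")  # binds label="warm"
print(tag_warm({"name": "jose"}))  # → {'name': 'jose', 'label': 'warm'}
```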

Defining DataFrame transformations as nested functions

DataFrame transformations that are defined with nested functions have the most elegant interface for chaining. Let’s define a with_funny function that appends a funny column to a DataFrame.

def with_funny(word):
    def inner(df):
        return df.withColumn("funny", lit(word))
    return inner

We’ll use the same source_df DataFrame and with_greeting function from before.

actual_df = (source_df
     .transform(with_greeting)
     .transform(with_funny("haha")))
actual_df.show()

+----+---+--------+-----+
|name|age|greeting|funny|
+----+---+--------+-----+
|jose|  1|      hi| haha|
|  li|  2|      hi| haha|
| liz|  3|      hi| haha|
+----+---+--------+-----+

Notice that we don’t need the verbose lambda syntax when DataFrame transformations are defined with inner functions.

This is much better! 🎊
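This works because of closures: with_funny("haha") returns the inner function with word captured, and that returned function is exactly what transform expects — a function of a single DataFrame. A pure-Python demo of the same shape (with_key and the dicts are stand-ins for illustration):

```python
def with_key(key, value):
    # Returns a single-argument function; key and value are
    # captured by the inner closure
    def inner(d):
        return {**d, key: value}
    return inner


add_funny = with_key("funny", "haha")  # a function of one dict
print(add_funny({"name": "liz"}))  # → {'name': 'liz', 'funny': 'haha'}
```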

Function composition with cytoolz

We can define custom DataFrame transformations with the @curry decorator and run them with function composition provided by cytoolz.

from cytoolz import curry
from cytoolz.functoolz import compose

@curry
def with_stuff1(arg1, arg2, df):
    return df.withColumn("stuff1", lit(f"{arg1} {arg2}"))

@curry
def with_stuff2(arg, df):
    return df.withColumn("stuff2", lit(arg))

data = [("jose", 1), ("li", 2), ("liz", 3)]
source_df = spark.createDataFrame(data, ["name", "age"])

pipeline = compose(
    with_stuff1("nice", "person"),
    with_stuff2("yoyo")
)
actual_df = pipeline(source_df)
actual_df.show()

+----+---+------+-----------+
|name|age|stuff2|     stuff1|
+----+---+------+-----------+
|jose|  1|  yoyo|nice person|
|  li|  2|  yoyo|nice person|
| liz|  3|  yoyo|nice person|
+----+---+------+-----------+

The compose function applies transformations from right to left (bottom to top). We can reverse the list of transformations to apply them from left to right (top to bottom):

pipeline = compose(*reversed([
    with_stuff1("nice", "person"),
    with_stuff2("yoyo")
]))
actual_df = pipeline(source_df)
actual_df.show()

+----+---+-----------+------+
|name|age|     stuff1|stuff2|
+----+---+-----------+------+
|jose|  1|nice person|  yoyo|
|  li|  2|nice person|  yoyo|
| liz|  3|nice person|  yoyo|
+----+---+-----------+------+

Custom transformations are often order-dependent and running them from left to right may be required.
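If the reversed dance feels awkward, a small left-to-right pipeline helper can be built with functools.reduce. This is a sketch, not part of cytoolz (though cytoolz does ship a pipe function with similar left-to-right semantics); it's demonstrated here on plain dicts, but the pipeline function itself works unchanged on DataFrames since it only calls the transformations:

```python
from functools import reduce


def pipeline(*transformations):
    # Feed each result into the next transformation,
    # left to right (top to bottom) - the opposite of compose
    def run(df):
        return reduce(lambda acc, f: f(acc), transformations, df)
    return run


def add_greeting(d):
    return {**d, "greeting": "hi"}


def add_stuff(d):
    return {**d, "stuff": "yoyo"}


run = pipeline(add_greeting, add_stuff)
print(run({"name": "li"}))  # → {'name': 'li', 'greeting': 'hi', 'stuff': 'yoyo'}
```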

Chaining custom transformations with the Scala API

The Scala API defines a Dataset#transform method that makes it easy to chain custom transformations. The Scala programming language allows for multiple parameter lists, so you don’t need to define nested functions.

Chaining custom DataFrame transformations is easier with the Scala API, but it’s just as important when writing PySpark code!

This blog post explains how to chain DataFrame transformations with the Scala API.

Next steps

You should organize your code as single-purpose DataFrame transformations that are tested individually. Read this post on designing easily testable Spark code.

Use the transform method to chain your DataFrame transformations and run production analyses. Any DataFrame transformations that make assumptions about the underlying schema of a DataFrame should be validated with the quinn DataFrame validation helper methods.
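A schema-assumption check can be as simple as verifying required columns exist before a transformation runs. Here's a minimal sketch of the idea — quinn provides real helpers for this; the function below is illustrative, and the stub class stands in for a DataFrame (which only needs a .columns attribute for this check):

```python
class MissingColumnsError(ValueError):
    pass


def validate_presence_of_columns(df, required_columns):
    # df only needs a .columns attribute, like a Spark DataFrame
    missing = [c for c in required_columns if c not in df.columns]
    if missing:
        raise MissingColumnsError(f"missing columns: {missing}")


class StubDataFrame:
    def __init__(self, columns):
        self.columns = columns


df = StubDataFrame(["name", "age"])
validate_presence_of_columns(df, ["name"])  # passes silently
try:
    validate_presence_of_columns(df, ["name", "greeting"])
except MissingColumnsError as e:
    print(e)  # → missing columns: ['greeting']
```

Running a check like this at the top of a transformation turns a confusing mid-job failure into an immediate, readable error.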

If you’re writing PySpark code properly, you should be using the transform method quite frequently 😉
