This blog post explains how to create a PySpark project with Poetry, the best Python dependency management system. It’ll also explain how to package PySpark projects as wheel files, so you can build libraries and easily access the code on Spark clusters.
Poetry is beloved by the co-creator of Django and other bloggers. It’s even part of the hypermodern Python stack.
This post will demonstrate how Poetry is great for PySpark projects.
Creating a project
Let’s create a project called angelou after the poet Maya Angelou. Check out this repo for all the code in this blog.
Create the project with poetry new angelou.
This will create the angelou project with these contents:
angelou
  angelou
    __init__.py
  pyproject.toml
  README.rst
  tests
    __init__.py
    test_angelou.py
Here are the contents of the pyproject.toml file:
[tool.poetry]
name = "angelou"
version = "0.1.0"
description = ""
authors = ["MrPowers <matthewkevinpowers@gmail.com>"]

[tool.poetry.dependencies]
python = "^3.7"

[tool.poetry.dev-dependencies]
pytest = "^5.2"

[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"
The pyproject.toml file specifies the Python version and the project dependencies.
Add PySpark to project
Add PySpark to the project with the poetry add pyspark command.
Here’s the console output when the command is run:
Creating virtualenv angelou--6rG3Bgg-py3.7 in /Users/matthewpowers/Library/Caches/pypoetry/virtualenvs
Using version ^2.4.5 for pyspark

Updating dependencies
Resolving dependencies... (2.1s)

Writing lock file

Package operations: 13 installs, 0 updates, 0 removals

  - Installing zipp (3.1.0)
  - Installing importlib-metadata (1.6.0)
  - Installing pyparsing (2.4.7)
  - Installing six (1.15.0)
  - Installing attrs (19.3.0)
  - Installing more-itertools (8.3.0)
  - Installing packaging (20.4)
  - Installing pluggy (0.13.1)
  - Installing py (1.8.1)
  - Installing py4j (0.10.7)
  - Installing wcwidth (0.1.9)
  - Installing pyspark (2.4.5)
  - Installing pytest (5.4.2)
Poetry automatically created a virtual environment, and all the project dependencies are stored in the /Users/matthewpowers/Library/Caches/pypoetry/virtualenvs/angelou--6rG3Bgg-py3.7/ directory.
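If you don’t want to copy the path out of the console output, recent versions of Poetry can also print the location of the project’s virtualenv directly (a small aside; the rest of the post doesn’t depend on it):

poetry env info --path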
You might wonder why 13 dependencies were added to the project – didn’t we just install PySpark? Why wasn’t just one dependency added?
PySpark depends on other libraries like py4j, as you can see with this search. Poetry needs to add everything PySpark depends on to the project as well.
pytest requires py, importlib-metadata, and pluggy, so those dependencies need to be added to our project as well.
Poetry makes sure your virtual environment contains all your explicit project dependencies and all dependencies of your explicit dependencies.
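If you’re curious which top-level dependency pulled in which transitive package, Poetry can print the dependency tree:

poetry show --tree

Each dependency you added explicitly (like pyspark and pytest) is listed with its own dependencies (like py4j) nested underneath it.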
Poetry lock file
The poetry add pyspark command also creates a poetry.lock file, as hinted by the “Writing lock file” console output when poetry add pyspark is run.
Here’s what the Poetry website says about the Lock file:
For your library, you may commit the poetry.lock file if you want to. This can help your team to always test against the same dependency versions. However, this lock file will not have any effect on other projects that depend on it. It only has an effect on the main project. If you do not want to commit the lock file and you are using git, add it to the .gitignore.
PySpark DataFrame transformation
Let’s create a PySpark DataFrame transformation that’ll append a greeting column to a DataFrame.
Create a transformations.py file and add this code:
import pyspark.sql.functions as F

def with_greeting(df):
    return df.withColumn("greeting", F.lit("hello!"))
Let’s verify that the with_greeting function appends a greeting column as expected.
Testing the DataFrame transformation
Here’s how we’ll test the with_greeting function:

- Create a DataFrame and run the with_greeting function (actual_df)
- Create another DataFrame with the anticipated results (expected_df)
- Compare the DataFrames and make sure the actual result is the same as what’s expected
We need to create a SparkSession to create the DataFrames that’ll be used in the test.
Create a sparksession.py file with these contents:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .master("local")
    .appName("angelou")
    .getOrCreate())
Create a test_transformations.py file in the tests/ directory and add this code:
import pytest

import angelou.sparksession as S
import angelou.transformations as T


class TestTransformations(object):

    def test_with_greeting(self):
        source_data = [
            ("jose", 1),
            ("li", 2)
        ]
        source_df = S.spark.createDataFrame(
            source_data,
            ["name", "age"]
        )

        actual_df = T.with_greeting(source_df)

        expected_data = [
            ("jose", 1, "hello!"),
            ("li", 2, "hello!")
        ]
        expected_df = S.spark.createDataFrame(
            expected_data,
            ["name", "age", "greeting"]
        )

        assert(expected_df.collect() == actual_df.collect())
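You can run the tests inside Poetry’s virtual environment, without activating it first, by using poetry run:

poetry run pytest tests/

pytest will pick up test_transformations.py along with the test_angelou.py file that poetry new generated.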
Console workflow
Run poetry shell to spawn a shell within the virtual environment. Then run python to start a REPL.

We can copy and paste code from the test file to run the with_greeting function in the shell.
>>> import angelou.sparksession as S
>>> import angelou.transformations as T
>>> source_data = [
...     ("jose", 1),
...     ("li", 2)
... ]
>>> source_df = S.spark.createDataFrame(
...     source_data,
...     ["name", "age"]
... )
>>> source_df.show()
+----+---+
|name|age|
+----+---+
|jose|  1|
|  li|  2|
+----+---+

>>> actual_df = T.with_greeting(source_df)
>>> actual_df.show()
+----+---+--------+
|name|age|greeting|
+----+---+--------+
|jose|  1|  hello!|
|  li|  2|  hello!|
+----+---+--------+
The console REPL workflow can be useful when you’re experimenting with code.
It’s generally best to keep your console workflow to a minimum and devote your development efforts to building a great test suite.
Adding quinn dependency
quinn contains useful PySpark helper functions.
Add quinn to the project with poetry add quinn.

The pyproject.toml file will be updated as follows when quinn is added:
[tool.poetry.dependencies]
python = "^3.7"
pyspark = "^2.4.5"
quinn = "^0.4.0"
quinn is also added to the lock file in two places:
[[package]]
category = "main"
description = "Pyspark helper methods to maximize developer efficiency."
name = "quinn"
optional = false
python-versions = ">=2.7"
version = "0.4.0"

quinn = [
    {file = "quinn-0.4.0-py3-none-any.whl", hash = "sha256:4f2f1dd0086a4195ee1ec4420351001ee5687e8183f80bcbc93d4e724510d114"}
]
Using quinn
Create a with_clean_first_name function that’ll remove all the non-word characters in the first_name column of a DataFrame. The quinn remove_non_word_characters function will help.
import pyspark.sql.functions as F
import quinn

def with_clean_first_name(df):
    return df.withColumn(
        "clean_first_name",
        quinn.remove_non_word_characters(F.col("first_name"))
    )
Let’s create a test to verify that with_clean_first_name removes all the non-word characters in the first_name field of this DataFrame:
+----------+------+
|first_name|letter|
+----------+------+
|    jo&&se|     a|
|      ##li|     b|
|   !!sam**|     c|
+----------+------+
This test will use the create_df SparkSession extension defined in quinn that makes it easier to create DataFrames.
from pyspark.sql.types import *
from quinn.extensions import *

import angelou.sparksession as S
import angelou.transformations as T


class TestTransformations(object):

    def test_with_clean_first_name(self):
        source_df = S.spark.create_df(
            [("jo&&se", "a"), ("##li", "b"), ("!!sam**", "c")],
            [("first_name", StringType(), True), ("letter", StringType(), True)]
        )

        actual_df = T.with_clean_first_name(source_df)

        expected_df = S.spark.create_df(
            [("jo&&se", "a", "jose"), ("##li", "b", "li"), ("!!sam**", "c", "sam")],
            [("first_name", StringType(), True), ("letter", StringType(), True), ("clean_first_name", StringType(), True)]
        )

        assert(expected_df.collect() == actual_df.collect())
Packaging wheel file
You can build a wheel file with the poetry build command.
This will output the following files:
dist/
  angelou-0.1.0-py3-none-any.whl
  angelou-0.1.0.tar.gz
You can run poetry publish to publish the package to PyPI.
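If you’d rather keep the package private and run it on a Spark cluster instead of publishing it, one common approach (a sketch, not something covered by the repo for this post) is to hand the wheel to spark-submit with --py-files so the angelou code is shipped to the executors. Wheel files are zip archives, so they can generally be distributed this way; job.py below is a hypothetical driver script that imports angelou, and the master URL depends on your cluster:

# A minimal sketch: job.py is a hypothetical driver script that imports angelou.
# Adjust --master and the file paths for your environment.
spark-submit \
  --master yarn \
  --py-files dist/angelou-0.1.0-py3-none-any.whl \
  job.py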
Conclusion
Dependency management has been a pain point for Python developers for years and the debate on how to solve the issue goes on.
Thankfully, Python tooling has come a long way and Poetry makes it easy to manage project dependencies.
Poetry is a great option for PySpark projects. It makes it easy to build public libraries that are uploaded to PyPI or to build private wheel files so you can run your private projects on Spark clusters.
Hi,
Your posts are very good; my PySpark code is much better thanks to you.
I read some articles about applying SOLID principles in Spark programming, but there are few examples on the internet. Could you help me with this topic?
I usually write my Spark jobs with many small functions, but I don’t use classes and abstractions. Should I?
Thank you
I generally try to avoid classes in Spark. Small functions are typically good enough. The main pain point I’ve struggled with when there are a lot of small functions is order dependencies. It’s hard to use a codebase if you need to run 20 functions in a specific order to get the desired result. Classes might help with order dependencies but I think a DAG data structure would be even better. I need to make an open source project with a good solution here.
Have you attempted to publish this to Azure DevOps rather than PyPI? And after you’ve published it, are you able to just from angelou import *?
Hi there. Great article. I’m curious about the support for wheel files in Spark; reading some of the JIRAs, it seems this is still a work in progress. Are they now fully supported? Thanks, Brian
Thanks for sharing.
May I ask how this packaging makes things easier for PySpark?
I failed to make the connection.
With this setup, what is the correct spark-submit command we should use?