Creating and reusing the SparkSession with PySpark

This post explains how to create a SparkSession with getOrCreate and how to reuse the SparkSession with getActiveSession.

You need a SparkSession to read data stored in files, when manually creating DataFrames, and to run arbitrary SQL queries.

The SparkSession should be instantiated once and then reused throughout your application. Most applications should not create multiple sessions or shut down an existing session.

When you’re running Spark workflows locally, you’re responsible for instantiating the SparkSession yourself. Spark runtime providers build the SparkSession for you and you should reuse it. You need to write code that properly manages the SparkSession for both local and production workflows.

This post shows you how to build a resilient codebase that properly manages the SparkSession in the development, test, and production environments.

getOrCreate

Here’s an example of how to create a SparkSession with the builder:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
  .master("local")
  .appName("chispa")
  .getOrCreate())

getOrCreate will either create the SparkSession if one does not already exist or reuse an existing SparkSession.

Let’s look at a code snippet from the chispa test suite that uses this SparkSession.

import pytest

from spark import *
from chispa import *

def test_assert_df_equality():
    data1 = [(1, "jose"), (2, "li")]
    df1 = spark.createDataFrame(data1, ["num", "name"])
    data2 = [(2, "li"), (1, "jose")]
    df2 = spark.createDataFrame(data2, ["num", "name"])
    assert_df_equality(df1, df2, ignore_row_order=True)

from spark import * gives us access to the spark variable that contains the SparkSession used to create the DataFrames in this test.

Reusing the same SparkSession throughout your test suite is important for your test suite performance. Shutting down and recreating SparkSessions is expensive and causes test suites to run painfully slowly.

getActiveSession

Some functions can assume a SparkSession exists and should error out if the SparkSession does not exist.

You should only be using getOrCreate in functions that should actually be creating a SparkSession. getActiveSession is more appropriate for functions that should only reuse an existing SparkSession.

The show_output_to_df function in quinn is a good example of a function that uses getActiveSession. This function converts the string that’s outputted from DataFrame#show back into a DataFrame object. It’s useful when you only have the show output in a Stackoverflow question and want to quickly recreate a DataFrame.

Let’s take a look at the function in action:

import quinn

s = """+----+---+-----------+------+
|name|age|     stuff1|stuff2|
+----+---+-----------+------+
|jose|  1|nice person|  yoyo|
|  li|  2|nice person|  yoyo|
| liz|  3|nice person|  yoyo|
+----+---+-----------+------+"""

df = quinn.show_output_to_df(s)

show_output_to_df uses a SparkSession under the hood to create the DataFrame, but does not force the user to pass the SparkSession as a function argument because that’d be tedious.

Let’s look at the function implementation:

def show_output_to_df(show_output):
    l = show_output.split("\n")
    ugly_column_names = l[1]
    pretty_column_names = [i.strip() for i in ugly_column_names[1:-1].split("|")]
    pretty_data = []
    ugly_data = l[3:-1]
    for row in ugly_data:
        r = [i.strip() for i in row[1:-1].split("|")]
        pretty_data.append(tuple(r))
    return SparkSession.getActiveSession().createDataFrame(pretty_data, pretty_column_names)

show_output_to_df takes a String as an argument and returns a DataFrame. It’s a great example of a helper function that hides complexity and makes Spark easier to manage.

SparkSession from DataFrame

You can also grab the SparkSession that’s associated with a DataFrame.

data1 = [(1, "jose"), (2, "li")]
df1 = spark.createDataFrame(data1, ["num", "name"])
df1.sql_ctx.sparkSession

The SparkSession that’s associated with df1 is the same as the active SparkSession and can also be accessed as follows:

from pyspark.sql import SparkSession

SparkSession.getActiveSession()

If you have a DataFrame, you can use it to access the SparkSession, but it’s best to just grab the SparkSession with getActiveSession().

Let’s shut down the active SparkSession to demonstrate the getActiveSession() returns None when no session exists.

spark.stop()

SparkSession.getActiveSession() # None

Here’s the error you’ll get if you try to create a DataFrame now that the SparkSession was stopped.

spark.createDataFrame(data1, ["num", "name"])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/powers/spark/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/session.py", line 675, in createDataFrame
    return self._create_dataframe(data, schema, samplingRatio, verifySchema)
  File "/Users/powers/spark/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/session.py", line 700, in _create_dataframe
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/Users/powers/spark/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/session.py", line 526, in _createFromLocal
    return self._sc.parallelize(data), schema
  File "/Users/powers/spark/spark-3.1.2-bin-hadoop3.2/python/pyspark/context.py", line 530, in parallelize
    numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism
  File "/Users/powers/spark/spark-3.1.2-bin-hadoop3.2/python/pyspark/context.py", line 442, in defaultParallelism
    return self._jsc.sc().defaultParallelism()
AttributeError: 'NoneType' object has no attribute 'sc'

Next steps

You’ve learned how to effectively manage the SparkSession in your PySpark applications.

You can create a SparkSession that’s reused throughout your test suite and leverage SparkSessions created by third party Spark runtimes.

Registration

Comments are closed, but trackbacks and pingbacks are open.