Registering Native Spark Functions

This post explains how Spark registers native functions internally and the public facing APIs for you to register your own functions.

Registering native functions is important if you want to access functions via the SQL API. You don’t need to register functions if you’re using the PySpark or Scala DSLs.

This post is organized as follows:

  • registering functions to an existing SparkSession
  • registering functions via SparkSessionExt in Databricks
  • command line workflow
  • how Spark core registers functions

This is a low level post for advanced Spark users and talks about code written by Spark core maintainers. You’ll need to grok it hard. Studying this code is highly recommended if you really want to understand how Spark works under the hood.

Registering functions to existing SparkSession

The itachi project provides access to Postgres / Presto SQL syntax in Spark. Let’s look at how to use itachi and then dig into the implementation details.

Suppose you have the following DataFrame:

+------+------+
|  arr1|  arr2|
+------+------+
|[1, 2]|    []|
|[1, 2]|[1, 3]|
+------+------+

Concatenate the two arrays with the array_cat function that’s defined in itachi:

yaooqinn.itachi.registerPostgresFunctions

spark
  .sql("select array_cat(arr1, arr2) as both_arrays from some_data")
  .show()
+------------+
| both_arrays|
+------------+
|      [1, 2]|
|[1, 2, 1, 3]|
+------------+

array_cat is a Postgres function and itachi provides syntax that Postgres developers are familiar with. This makes for an easy transition to Spark.

itachi also defines an age function similar to Postgres.

Here’s the Age Catalyst code:

case class Age(end: Expression, start: Expression)
  extends BinaryExpression with ImplicitCastInputTypes {
  override def left: Expression = end

  override def right: Expression = start

  override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, TimestampType)

  override def nullSafeEval(e: Any, s: Any): Any = {
    DateTimeUtils.age(e.asInstanceOf[Long], s.asInstanceOf[Long])
  }

  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val dtu = DateTimeUtils.getClass.getCanonicalName.stripSuffix("$")
    defineCodeGen(ctx, ev, (ed, st) => s"$dtu.age($ed, $st)")
  }

  override def dataType: DataType = CalendarIntervalType
}

We need to take this Catalyst code and package it properly for registerFunction in Spark.

registerFunction takes three arguments:

  • name: FunctionIdentifier
  • info: ExpressionInfo
  • builder: FunctionBuilder

itachi defines a package object that helps organize the required arguments:

package object extra {

  type FunctionDescription = (FunctionIdentifier, ExpressionInfo, FunctionBuilder)

  type Extensions = SparkSessionExtensions => Unit

}

Let’s define the FunctionDescription for Age:

object Age {
  val fd: FunctionDescription = (
    new FunctionIdentifier("age"),
    ExpressionUtils.getExpressionInfo(classOf[Age], "age"),
    (children: Seq[Expression]) => if ( children.size == 1) {
      Age(CurrentDate(), children.head)
    } else {
      Age(children.head, children.last)
    })
}

Now register the function with the current SparkSession:

spark.sessionState.functionRegistry.registerFunction(Age.fd._1, Age.fd._2, Age.fd._3)

Library users don’t need to concern themselves with any low level function registration of course. They can just attach itachi to their cluster and run the following line to access the Postgres-style SQL functions:

yaooqinn.itachi.registerPostgresFunctions

Big thanks to Alex Ott for showing me this solution!

registering functions via SparkSessionExt in Databricks

There is a SparkSessionExt#injectFunction method that also allows you to register functions. This approach isn’t as user friendly because it can’t register functions with an existing SparkSession. It can only register functions to a new SparkSession. Let’s look at the workflow and the usability issues will be apparent.

Create an init script in DBFS:

dbutils.fs.mkdirs("dbfs:/databricks/scripts/")

dbutils.fs.put("/databricks/scripts/itachi-install.sh","""
#!/bin/bash
wget --quiet -O /mnt/driver-daemon/jars/itachi_2.12-0.1.0.jar https://repo1.maven.org/maven2/com/github/yaooqinn/itachi_2.12/0.1.0/itachi_2.12-0.1.0.jar""", true)

Before starting the cluster, set the Spark config:

spark.sql.extensions org.apache.spark.sql.extra.PostgreSQLExtensions

Also set the DBFS file path to the init script before starting the cluster:

dbfs:/databricks/scripts/itachi-install.sh

You can now attach a notebook to the cluster using Postgres SQL syntax.

This workflow is nice in some circumstances because all notebooks attached to the cluster can access the Postgres syntax without doing any imports. In general, it’s much easier to register functions to an existing SparkSession, like we did with the first approach.

All you need to write to support the spark.sql.extensions approach is a class that extends Extensions:

class PostgreSQLExtensions extends Extensions {
  override def apply(ext: SparkSessionExtensions): Unit = {
    ext.injectFunction(Age.fd)
  }
}

Age.fd returns the tuple that injectFunction needs.

command line workflow

It’s work noting that the SparkSessionExt approach isn’t as bulky on the command line (compared to Databricks);

bin/spark-sql --packages com.github.yaooqinn:itachi_2.12:0.1.0 --conf spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions

SparkSessionExt may be easier for runtimes other than Databricks that don’t require init scripts.

How Spark registers native functions

Here’s a snippet of the Spark FunctionRegistry code:

object FunctionRegistry {

  type FunctionBuilder = Seq[Expression] => Expression

  val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
    expression[Abs]("abs"),
    expression[Coalesce]("coalesce"),
    expression[Explode]("explode"),
    // all the other Spark SQL functions
  )

  private def expression[T <: Expression : ClassTag](name: String, setAlias: Boolean = false)
      : (String, (ExpressionInfo, FunctionBuilder)) = {
    val (expressionInfo, builder) = FunctionRegistryBase.build[T](name)
    val newBuilder = (expressions: Seq[Expression]) => {
      val expr = builder(expressions)
      if (setAlias) expr.setTagValue(FUNC_ALIAS, name)
      expr
    }
    (name, (expressionInfo, newBuilder))
  }

  // bunch of other stuff

}

The expression method is passed an expression class name and a function name and returns a String, ExpressionInfo, and FunctionBuilder.

The code eventually loops over the expressions map and registers each function.

expressions.foreach {
  case (name, (info, builder)) => fr.registerFunction(FunctionIdentifier(name), info, builder)
}

We’ve made a full loop back to registerFunction in the original approach 😉

Thanks

Big shout out to cloud-fan for helping with my basic questions. Thanks to yaooqinn for making itachi and Sim for also talking about registering native functions.

The Spark open source community is a pleasure to work with.

Conclusion

Spark doesn’t make it easy to register native functions, but this isn’t a common task, so the existing interfaces are fine.

Registering Spark native functions is only for advanced Spark programmers that are comfortable writing Catalyst expressions and want to expose functionality via the SQL API (rather than the Python / Scala DSLs).

The SparkSessionExt#injectFunction approach is problematic because it’s difficult to use and init scripts are fragile.

spark.sessionState.functionRegistry.registerFunction is a better approach because it gives end users a smoother interface.

itachi shows how this design pattern can provide powerful functionality to end users.

A company with Spark experts and SQL power users could also benefit from this design pattern. Spark experts can register native SQL functions with for custom business use cases and hypercharge the productivity of the SQL analysts.

Registration

Comments are closed, but trackbacks and pingbacks are open.