Working with Spark MapType Columns

Spark DataFrame columns support maps, which are great for storing an arbitrary number of key / value pairs.

This blog post describes how to create MapType columns, demonstrates built-in functions to manipulate MapType columns, and explains when to use maps in your analyses.

Make sure to read Writing Beautiful Spark Code for a detailed overview of how to use MapType columns in production applications.

Scala maps

Let’s begin with a little refresher on Scala maps.

Create a Scala map that connects some English and Spanish words.

val wordMapping = Map("one" -> "uno", "dog" -> "perro")

Fetch the value associated with the dog key:

wordMapping("dog") // "perro"
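
Looking up a key that is not in the map with wordMapping("cat") throws a NoSuchElementException. As a small sketch of the safer alternatives in the Scala standard library (the "cat" key and the "unknown" default are made up for illustration):

```scala
val wordMapping = Map("one" -> "uno", "dog" -> "perro")

// get() wraps the result in an Option instead of throwing on a missing key
val dog: Option[String] = wordMapping.get("dog") // Some("perro")
val cat: Option[String] = wordMapping.get("cat") // None

// getOrElse() returns the supplied default when the key is missing
val catOrDefault: String = wordMapping.getOrElse("cat", "unknown") // "unknown"
```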

Creating MapType columns

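Let’s create a DataFrame with a MapType column. The createDF() method used in this post’s examples comes from the spark-daria library, not from Spark itself.

import org.apache.spark.sql.types._
import com.github.mrpowers.spark.daria.sql.SparkSessionExt._

val singersDF = spark.createDF(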
  List(
    ("sublime", Map(
      "good_song" -> "santeria",
      "bad_song" -> "doesn't exist")
    ),
    ("prince_royce", Map(
      "good_song" -> "darte un beso",
      "bad_song" -> "back it up")
    )
  ), List(
    ("name", StringType, true),
    ("songs", MapType(StringType, StringType, true), true)
  )
)
singersDF.show(false)

+------------+----------------------------------------------------+
|name        |songs                                               |
+------------+----------------------------------------------------+
|sublime     |[good_song -> santeria, bad_song -> doesn't exist]  |
|prince_royce|[good_song -> darte un beso, bad_song -> back it up]|
+------------+----------------------------------------------------+

Let’s examine the DataFrame schema and verify that the songs column has a MapType:

singersDF.printSchema()

root
 |-- name: string (nullable = true)
 |-- songs: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

We can see that songs is a MapType column.

Let’s explore some built-in Spark methods that make it easy to work with MapType columns.

Fetching values from maps with element_at()

Let’s use the singersDF DataFrame and append song_to_love as a column.

singersDF
  .withColumn("song_to_love", element_at(col("songs"), "good_song"))
  .show(false)

+------------+----------------------------------------------------+-------------+
|name        |songs                                               |song_to_love |
+------------+----------------------------------------------------+-------------+
|sublime     |[good_song -> santeria, bad_song -> doesn't exist]  |santeria     |
|prince_royce|[good_song -> darte un beso, bad_song -> back it up]|darte un beso|
+------------+----------------------------------------------------+-------------+

The element_at() function fetches a value from a MapType column.
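
element_at() is not the only way to read a key from a map column. As a minimal sketch, the getItem() and apply() methods on Column can perform the same lookup. The one-row singers DataFrame is rebuilt here with toDF() only so the snippet is self-contained, and the local SparkSession settings are assumptions for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, element_at}

// Local session for illustration only; reuse your own session in a real job
val spark = SparkSession.builder().master("local[*]").appName("map-lookup").getOrCreate()
import spark.implicits._

val singers = Seq(
  ("sublime", Map("good_song" -> "santeria", "bad_song" -> "doesn't exist"))
).toDF("name", "songs")

// Three equivalent ways to fetch the value stored under the good_song key
val lookups = singers.select(
  element_at(col("songs"), "good_song").as("via_element_at"),
  col("songs").getItem("good_song").as("via_get_item"),
  col("songs")("good_song").as("via_apply")
)

val row = lookups.head()
```

Which form to use is mostly a matter of taste; element_at() reads well next to the other map functions, while the apply() form is the shortest.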

Appending MapType columns

We can use the map() method defined in org.apache.spark.sql.functions to append a MapType column to a DataFrame.

val countriesDF = spark.createDF(
  List(
    ("costa_rica", "sloth"),
    ("nepal", "red_panda")
  ), List(
    ("country_name", StringType, true),
    ("cute_animal", StringType, true)
  )
).withColumn(
  "some_map",
  map(col("country_name"), col("cute_animal"))
)
countriesDF.show(false)

+------------+-----------+---------------------+
|country_name|cute_animal|some_map             |
+------------+-----------+---------------------+
|costa_rica  |sloth      |[costa_rica -> sloth]|
|nepal       |red_panda  |[nepal -> red_panda] |
+------------+-----------+---------------------+

Let’s verify that some_map is a MapType column:

countriesDF.printSchema()

root
 |-- country_name: string (nullable = true)
 |-- cute_animal: string (nullable = true)
 |-- some_map: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
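
map() is not limited to a single pair: its arguments alternate between key columns and value columns. The sketch below builds a two-entry map with literal keys created by lit(). The details column name and the "country" / "animal" keys are made up for this example, and the DataFrame is built with toDF() to keep the snippet self-contained.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, map}

// Local session for illustration only
val spark = SparkSession.builder().master("local[*]").appName("map-pairs").getOrCreate()
import spark.implicits._

val countries = Seq(("costa_rica", "sloth")).toDF("country_name", "cute_animal")

// Arguments are read as key1, value1, key2, value2, ...
val withDetails = countries.withColumn(
  "details",
  map(lit("country"), col("country_name"), lit("animal"), col("cute_animal"))
)

// details is the third column (index 2)
val details = withDetails.head().getMap[String, String](2).toMap
```

All keys must share one type and all values must share one type, which is why both values here are strings.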

Creating MapType columns from two ArrayType columns

We can create a MapType column from two ArrayType columns.

val df = spark.createDF(
  List(
    (Array("a", "b"), Array(1, 2)),
    (Array("x", "y"), Array(33, 44))
  ), List(
    ("letters", ArrayType(StringType, true), true),
    ("numbers", ArrayType(IntegerType, true), true)
  )
).withColumn(
  "strange_map",
  map_from_arrays(col("letters"), col("numbers"))
)
df.show(false)

+-------+--------+------------------+
|letters|numbers |strange_map       |
+-------+--------+------------------+
|[a, b] |[1, 2]  |[a -> 1, b -> 2]  |
|[x, y] |[33, 44]|[x -> 33, y -> 44]|
+-------+--------+------------------+

Let’s take a look at the df schema and verify strange_map is a MapType column:

df.printSchema()

root
 |-- letters: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- strange_map: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)

The Spark way of converting two arrays to a map is different from the “regular Scala” way of converting two arrays to a map.

Converting Arrays to Maps with Scala

Here’s how you’d convert two collections to a map with Scala.

val list1 = List("a", "b")
val list2 = List(1, 2)

list1.zip(list2).toMap // Map(a -> 1, b -> 2)
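
Two details of the zip / toMap approach are worth knowing before comparing it with a Spark function. This short sketch uses made-up inputs to show them:

```scala
// zip stops at the shorter collection, so the unmatched "c" is silently dropped
val truncated = List("a", "b", "c").zip(List(1, 2)).toMap

// when a key repeats, toMap keeps the last value seen for that key
val lastWins = List("a", "a").zip(List(1, 2)).toMap
```

A Spark function is free to handle mismatched lengths and duplicate keys differently, so it is worth checking the API docs rather than assuming the Scala behavior carries over.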

We could wrap this code in a User Defined Function and define our own map_from_arrays function if we wanted.
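
For illustration only, here is a rough sketch of what such a UDF could look like. The zipToMap name is hypothetical, and the function is hard-coded to string keys and integer values, which already shows one limitation compared with the built-in map_from_arrays().

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

// Local session for illustration only
val spark = SparkSession.builder().master("local[*]").appName("zip-udf").getOrCreate()
import spark.implicits._

// Hypothetical UDF: zips an array of keys with an array of values.
// scala.collection.Seq is spelled out so the signature matches what Spark
// passes for array columns on both Scala 2.12 and 2.13.
val zipToMap = udf(
  (keys: scala.collection.Seq[String], values: scala.collection.Seq[Int]) =>
    keys.zip(values).toMap
)

val lettersDF = Seq((Seq("a", "b"), Seq(1, 2))).toDF("letters", "numbers")

val withMap = lettersDF.withColumn(
  "strange_map",
  zipToMap(col("letters"), col("numbers"))
)

// strange_map is the third column (index 2)
val strangeMap = withMap.head().getMap[String, Int](2).toMap
```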

In general, it’s best to rely on the standard Spark library instead of defining our own UDFs.

The key takeaway is that the Spark way of solving a problem is often different from the Scala way. Read the API docs and always try to solve your problems the Spark way.

Merging maps with map_concat()

map_concat() can be used to combine multiple MapType columns into a single MapType column.

val df = spark.createDF(
  List(
    (Map("a" -> "aaa", "b" -> "bbb"), Map("c" -> "ccc", "d" -> "ddd"))
  ), List(
    ("some_data", MapType(StringType, StringType, true), true),
    ("more_data", MapType(StringType, StringType, true), true)
  )
)

df
  .withColumn("all_data", map_concat(col("some_data"), col("more_data")))
  .show(false)

+--------------------+--------------------+----------------------------------------+
|some_data           |more_data           |all_data                                |
+--------------------+--------------------+----------------------------------------+
|[a -> aaa, b -> bbb]|[c -> ccc, d -> ddd]|[a -> aaa, b -> bbb, c -> ccc, d -> ddd]|
+--------------------+--------------------+----------------------------------------+
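
The example above merges maps with distinct keys. When the same key appears in more than one input map, the outcome depends on the Spark version. The sketch below assumes Spark 3.0 or later, where the spark.sql.mapKeyDedupPolicy setting controls this (the default, EXCEPTION, raises an error on duplicate keys). The overlapping "b" key is made up for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, map_concat}

// Local session for illustration only
val spark = SparkSession.builder().master("local[*]").appName("map-concat-dups").getOrCreate()
import spark.implicits._

// Assumption: Spark 3.0+. LAST_WIN keeps the value from the last map that has the key.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")

val overlapping = Seq(
  (Map("a" -> "aaa", "b" -> "bbb"), Map("b" -> "BBB", "c" -> "ccc"))
).toDF("some_data", "more_data")

val allData = overlapping
  .select(map_concat(col("some_data"), col("more_data")).as("all_data"))
  .head()
  .getMap[String, String](0)
  .toMap
```

If duplicate keys would indicate bad data in your pipeline, leaving the policy at its default so that the job fails loudly may be the better choice.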

Using StructType columns instead of MapType columns

Let’s create a DataFrame that stores information about athletes.

val athletesDF = spark.createDF(
  List(
    ("lebron",
      Map(
        "height" -> "6.67",
        "units" -> "feet"
      )
    ),
    ("messi",
      Map(
        "height" -> "1.7",
        "units" -> "meters"
      )
    )
  ), List(
    ("name", StringType, true),
    ("stature", MapType(StringType, StringType, true), true)
  )
)

athletesDF.show(false)

+------+--------------------------------+
|name  |stature                         |
+------+--------------------------------+
|lebron|[height -> 6.67, units -> feet] |
|messi |[height -> 1.7, units -> meters]|
+------+--------------------------------+

athletesDF.printSchema()

root
 |-- name: string (nullable = true)
 |-- stature: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

stature is a MapType column, but we can also store stature as a StructType column.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val data = Seq(
  Row("lebron", Row("6.67", "feet")),
  Row("messi", Row("1.7", "meters"))
)

val schema = StructType(
  List(
    StructField("player_name", StringType, true),
    StructField(
      "stature",
      StructType(
        List(
          StructField("height", StringType, true),
          StructField("unit", StringType, true)
        )
      ),
      true
    )
  )
)

val athletesDF = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  schema
)
athletesDF.show(false)

+-----------+-------------+
|player_name|stature      |
+-----------+-------------+
|lebron     |[6.67, feet] |
|messi      |[1.7, meters]|
+-----------+-------------+

athletesDF.printSchema()

root
 |-- player_name: string (nullable = true)
 |-- stature: struct (nullable = true)
 |    |-- height: string (nullable = true)
 |    |-- unit: string (nullable = true)

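Sometimes both StructType and MapType columns can solve the same problem and you can choose between the two. A StructType column has a fixed set of named fields that are part of the schema, and each field can have its own type. A MapType column can hold different keys in every row, but all keys must share one type and all values must share one type.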
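
One practical difference shows up when reading values. The sketch below stores the same stature data both ways in a single made-up DataFrame (the stature_map and stature_struct column names are assumptions for this example) and reads the height from each.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, map, struct}
import scala.util.Try

// Local session for illustration only
val spark = SparkSession.builder().master("local[*]").appName("map-vs-struct").getOrCreate()
import spark.implicits._

val athletes = Seq("lebron").toDF("name")
  .withColumn("stature_map", map(lit("height"), lit("6.67"), lit("units"), lit("feet")))
  .withColumn("stature_struct", struct(lit("6.67").as("height"), lit("feet").as("units")))

// Map values are looked up by key when the query runs
val fromMap = athletes.select(col("stature_map")("height")).head().getString(0)

// Struct fields are part of the schema and are addressed with dot notation
val fromStruct = athletes.select(col("stature_struct.height")).head().getString(0)

// A misspelled struct field is rejected when the query is analyzed,
// before any data is processed
val misspelled = Try(athletes.select(col("stature_struct.hieght")))
```

That early failure is one reason to prefer a struct when the set of fields is known up front, and a map when the keys vary from row to row.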

Writing MapType columns to disk

The CSV file format cannot handle MapType columns.

This code will error out.

val outputPath = new java.io.File("./tmp/csv_with_map/").getCanonicalPath

spark.createDF(
  List(
    (Map("a" -> "aaa", "b" -> "bbb"))
  ), List(
    ("some_data", MapType(StringType, StringType, true), true)
  )
).write.csv(outputPath)

Here’s the error message:

writing to disk
- cannot write maps to disk with the CSV format *** FAILED ***
  org.apache.spark.sql.AnalysisException: CSV data source does not support map<string,string> data type.;
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:69)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:67)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifySchema(DataSourceUtils.scala:67)
  at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifyWriteSchema(DataSourceUtils.scala:34)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:100)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)

MapType columns can be written out with the Parquet file format. This code runs just fine:

val outputPath = new java.io.File("./tmp/parquet_with_map/").getCanonicalPath

spark.createDF(
  List(
    (Map("a" -> "aaa", "b" -> "bbb"))
  ), List(
    ("some_data", MapType(StringType, StringType, true), true)
  )
).write.parquet(outputPath)
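
If the output has to be CSV, one possible workaround is to serialize the map to a JSON string with to_json() before writing. This is a sketch under assumptions, not a recommendation from the original benchmarks: the id column, the temporary directory, and the toDF()-based setup are all made up so the snippet is self-contained.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_json}

// Local session for illustration only
val spark = SparkSession.builder().master("local[*]").appName("map-to-csv").getOrCreate()
import spark.implicits._

// Write into a fresh subdirectory of a temp directory so the path does not exist yet
val csvPath = Files.createTempDirectory("csv_with_json_map").resolve("out").toString

Seq((1, Map("a" -> "aaa", "b" -> "bbb")))
  .toDF("id", "some_data")
  .withColumn("some_data", to_json(col("some_data"))) // map -> JSON string
  .write
  .csv(csvPath)

// Read the file back: the map is now an ordinary string column (index 1)
val jsonText = spark.read.csv(csvPath).head().getString(1)
```

The column comes back as a plain string, so downstream readers need to parse the JSON again (for example with from_json()) to recover a MapType column.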

Conclusion

MapType columns are a great way to store key / value pairs of arbitrary lengths in a DataFrame column.

Spark 2.4 added a lot of native functions that make it easier to work with MapType columns. Prior to Spark 2.4, developers were overly reliant on UDFs for manipulating MapType columns.

StructType columns can often be used instead of a MapType column. Study both of these column types closely so you can understand the tradeoffs and intelligently select the best column type for your analysis.
