Introduction to Spark Broadcast Joins

Spark broadcast joins are perfect for joining a large DataFrame with a small DataFrame.

Broadcast joins cannot be used when joining two large DataFrames.

This post explains how to do a simple broadcast join and how the broadcast() function helps Spark optimize the execution plan.

Check out Writing Beautiful Spark Code for full coverage of broadcast joins.

Conceptual overview

Spark splits up data on different nodes in a cluster so multiple computers can process data in parallel. This makes traditional joins expensive: rows that need to be matched can live on different nodes, so both DataFrames must be shuffled across the network before the join can run.

Broadcast joins avoid most of this network traffic. Spark can “broadcast” a small DataFrame by sending all of its data to every node in the cluster. Once the small DataFrame has been broadcast, Spark can perform the join without shuffling any of the data in the large DataFrame.

Simple example

Let’s create a DataFrame with information about people and another DataFrame with information about cities. In this example, both DataFrames will be small, but let’s pretend that the peopleDF is huge and the citiesDF is tiny.

import org.apache.spark.sql.functions.broadcast
import spark.implicits._ // for toDF() and $; assumes a SparkSession named spark, as in spark-shell

val peopleDF = Seq(
  ("andrea", "medellin"),
  ("rodolfo", "medellin"),
  ("abdul", "bangalore")
).toDF("first_name", "city")

peopleDF.show()
+----------+---------+
|first_name|     city|
+----------+---------+
|    andrea| medellin|
|   rodolfo| medellin|
|     abdul|bangalore|
+----------+---------+

val citiesDF = Seq(
  ("medellin", "colombia", 2.5),
  ("bangalore", "india", 12.3)
).toDF("city", "country", "population")

citiesDF.show()
+---------+--------+----------+
|     city| country|population|
+---------+--------+----------+
| medellin|colombia|       2.5|
|bangalore|   india|      12.3|
+---------+--------+----------+

Let’s broadcast the citiesDF and join it with the peopleDF.

peopleDF.join(
  broadcast(citiesDF),
  peopleDF("city") <=> citiesDF("city")
).show()
+----------+---------+---------+--------+----------+
|first_name|     city|     city| country|population|
+----------+---------+---------+--------+----------+
|    andrea| medellin| medellin|colombia|       2.5|
|   rodolfo| medellin| medellin|colombia|       2.5|
|     abdul|bangalore|bangalore|   india|      12.3|
+----------+---------+---------+--------+----------+

This join uses Spark’s null-safe equality operator (<=>). Unlike ===, which returns null when either input is null, <=> returns true when both sides are null and false when only one side is null, so rows with null join keys can still be matched.
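
Here’s a minimal sketch (with a hypothetical nullDF) that shows the difference between the two operators:

val nullDF = Seq(
  (Some("medellin"), Some("medellin")),
  (None, None)
).toDF("a", "b")

nullDF.select($"a" === $"b", $"a" <=> $"b").show()
+-------+---------+
|(a = b)|(a <=> b)|
+-------+---------+
|   true|     true|
|   null|     true|
+-------+---------+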

Analyzing physical plans of joins

Let’s use the explain() method to analyze the physical plan of the broadcast join.

peopleDF.join(
  broadcast(citiesDF),
  peopleDF("city") <=> citiesDF("city")
).explain()
== Physical Plan ==
BroadcastHashJoin [coalesce(city#6, )], [coalesce(city#21, )], Inner, BuildRight, (city#6 <=> city#21)
:- LocalTableScan [first_name#5, city#6]
+- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, string, true], )))
   +- LocalTableScan [city#21, country#22, population#23]

In this example, Spark returns the same physical plan even when the broadcast() method isn’t used, because citiesDF is small enough to fall under the spark.sql.autoBroadcastJoinThreshold configuration property (10MB by default), so the optimizer broadcasts it automatically.

peopleDF.join(
  citiesDF,
  peopleDF("city") <=> citiesDF("city")
).explain()
== Physical Plan ==
BroadcastHashJoin [coalesce(city#6, )], [coalesce(city#21, )], Inner, BuildRight, (city#6 <=> city#21)
:- LocalTableScan [first_name#5, city#6]
+- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, string, true], )))
   +- LocalTableScan [city#21, country#22, population#23]

Spark isn’t always smart about optimally broadcasting DataFrames when the code is complex, so it’s best to use the broadcast() method explicitly and inspect the physical plan.
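
If you want to rule out surprise automatic broadcasts, you can inspect the threshold and turn the automatic behavior off. Here’s a sketch, assuming a SparkSession named spark, as in spark-shell:

// The threshold defaults to 10485760 bytes (10MB)
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Setting the threshold to -1 disables automatic broadcasting, so only
// DataFrames explicitly wrapped in broadcast() will be broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)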

Eliminating the duplicate city column

We can pass a sequence of column names to the shortcut join syntax to automatically eliminate the duplicate column.

peopleDF.join(
  broadcast(citiesDF),
  Seq("city")
).show()
+---------+----------+--------+----------+
|     city|first_name| country|population|
+---------+----------+--------+----------+
| medellin|    andrea|colombia|       2.5|
| medellin|   rodolfo|colombia|       2.5|
|bangalore|     abdul|   india|      12.3|
+---------+----------+--------+----------+

Let’s look at the physical plan that’s generated by this code.

peopleDF.join(
  broadcast(citiesDF),
  Seq("city")
).explain()
== Physical Plan ==
Project [city#6, first_name#5, country#22, population#23]
+- BroadcastHashJoin [city#6], [city#21], Inner, BuildRight
   :- Project [_1#2 AS first_name#5, _2#3 AS city#6]
   :  +- Filter isnotnull(_2#3)
   :     +- LocalTableScan [_1#2, _2#3]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- Project [_1#17 AS city#21, _2#18 AS country#22, _3#19 AS population#23]
         +- Filter isnotnull(_1#17)
            +- LocalTableScan [_1#17, _2#18, _3#19]

Code that returns the same result without relying on the sequence join generates an entirely different physical plan.

peopleDF.join(
  broadcast(citiesDF),
  peopleDF("city") <=> citiesDF("city")
)
  .drop(citiesDF("city"))
  .explain()
== Physical Plan ==
Project [first_name#5, city#6, country#22, population#23]
+- BroadcastHashJoin [coalesce(city#6, )], [coalesce(city#21, )], Inner, BuildRight, (city#6 <=> city#21)
   :- LocalTableScan [first_name#5, city#6]
   +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, string, true], )))
      +- LocalTableScan [city#21, country#22, population#23]

It’s best to avoid the shortcut join syntax so your physical plans stay as simple as possible.

Diving deeper into explain()

You can pass the explain() method a true argument to see the parsed logical plan, analyzed logical plan, and optimized logical plan in addition to the physical plan.

peopleDF.join(
  broadcast(citiesDF),
  peopleDF("city") <=> citiesDF("city")
)
  .drop(citiesDF("city"))
  .explain(true)
== Parsed Logical Plan ==
Project [first_name#5, city#6, country#22, population#23]
+- Join Inner, (city#6 <=> city#21)
   :- Project [_1#2 AS first_name#5, _2#3 AS city#6]
   :  +- LocalRelation [_1#2, _2#3]
   +- ResolvedHint isBroadcastable=true
      +- Project [_1#17 AS city#21, _2#18 AS country#22, _3#19 AS population#23]
         +- LocalRelation [_1#17, _2#18, _3#19]

== Analyzed Logical Plan ==
first_name: string, city: string, country: string, population: double
Project [first_name#5, city#6, country#22, population#23]
+- Join Inner, (city#6 <=> city#21)
   :- Project [_1#2 AS first_name#5, _2#3 AS city#6]
   :  +- LocalRelation [_1#2, _2#3]
   +- ResolvedHint isBroadcastable=true
      +- Project [_1#17 AS city#21, _2#18 AS country#22, _3#19 AS population#23]
         +- LocalRelation [_1#17, _2#18, _3#19]

== Optimized Logical Plan ==
Project [first_name#5, city#6, country#22, population#23]
+- Join Inner, (city#6 <=> city#21)
   :- LocalRelation [first_name#5, city#6]
   +- ResolvedHint isBroadcastable=true
      +- LocalRelation [city#21, country#22, population#23]

== Physical Plan ==
Project [first_name#5, city#6, country#22, population#23]
+- BroadcastHashJoin [coalesce(city#6, )], [coalesce(city#21, )], Inner, BuildRight, (city#6 <=> city#21)
   :- LocalTableScan [first_name#5, city#6]
   +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, string, true], )))
      +- LocalTableScan [city#21, country#22, population#23]

Notice how the parsed, analyzed, and optimized logical plans all contain ResolvedHint isBroadcastable=true because the broadcast() function was used. This hint isn’t included when the broadcast() function isn’t used.

peopleDF.join(
  citiesDF,
  peopleDF("city") <=> citiesDF("city")
)
  .drop(citiesDF("city"))
  .explain(true)
== Parsed Logical Plan ==
Project [first_name#5, city#6, country#22, population#23]
+- Join Inner, (city#6 <=> city#21)
   :- Project [_1#2 AS first_name#5, _2#3 AS city#6]
   :  +- LocalRelation [_1#2, _2#3]
   +- Project [_1#17 AS city#21, _2#18 AS country#22, _3#19 AS population#23]
      +- LocalRelation [_1#17, _2#18, _3#19]

== Analyzed Logical Plan ==
first_name: string, city: string, country: string, population: double
Project [first_name#5, city#6, country#22, population#23]
+- Join Inner, (city#6 <=> city#21)
   :- Project [_1#2 AS first_name#5, _2#3 AS city#6]
   :  +- LocalRelation [_1#2, _2#3]
   +- Project [_1#17 AS city#21, _2#18 AS country#22, _3#19 AS population#23]
      +- LocalRelation [_1#17, _2#18, _3#19]

== Optimized Logical Plan ==
Project [first_name#5, city#6, country#22, population#23]
+- Join Inner, (city#6 <=> city#21)
   :- LocalRelation [first_name#5, city#6]
   +- LocalRelation [city#21, country#22, population#23]

== Physical Plan ==
Project [first_name#5, city#6, country#22, population#23]
+- BroadcastHashJoin [coalesce(city#6, )], [coalesce(city#21, )], Inner, BuildRight, (city#6 <=> city#21)
   :- LocalTableScan [first_name#5, city#6]
   +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, string, true], )))
      +- LocalTableScan [city#21, country#22, population#23]
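
If you’re on Spark 3.0 or later, explain() also accepts a mode string, which can make big plans easier to read; earlier Spark versions only support the boolean flag shown above.

// "formatted" prints a condensed plan tree followed by per-node details;
// other modes include "simple", "extended", "codegen", and "cost"
peopleDF.join(
  broadcast(citiesDF),
  peopleDF("city") <=> citiesDF("city")
).explain("formatted")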

Next steps

Broadcast joins are a great way to append data stored in relatively small single-source-of-truth data files to large DataFrames. DataFrames up to 2GB can be broadcast, so a data file with tens or even hundreds of thousands of rows is a good broadcast candidate.
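
Here’s a sketch of that pattern, where country_codes.csv is a hypothetical lookup file and largeDF stands in for a big DataFrame you already have:

// Read the small single-source-of-truth lookup file (hypothetical path)
val countryCodesDF = spark
  .read
  .option("header", "true")
  .csv("/mnt/lookups/country_codes.csv")

// Broadcast the small lookup table so none of largeDF's data is shuffled
val enrichedDF = largeDF.join(broadcast(countryCodesDF), Seq("country"))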

Broadcast joins are a powerful technique to have in your Apache Spark toolkit.

Make sure to read up on broadcasting maps, another design pattern that’s great for solving problems in distributed systems.
