Deduplicating and Collapsing Records in Spark DataFrames
This blog post explains how to filter duplicate records from Spark DataFrames with the dropDuplicates() and killDuplicates() methods. It also demonstrates how to collapse duplicate records into a single row with the collect_list() and collect_set() functions.
Make sure to read Writing Beautiful Spark Code for a detailed overview of how to deduplicate production datasets and for background information on the ArrayType columns that are returned when DataFrames are collapsed.
Deduplicating DataFrames
Let's create a DataFrame with letter1, letter2, and number1 columns.
import org.apache.spark.sql.functions._ // collect_list(), collect_set(), struct(), etc.
import spark.implicits._ // toDF() needs these; assumes a SparkSession named spark

val df = Seq(
("a", "b", 1),
("a", "b", 2),
("a", "b", 3),
("z", "b", 4),
("a", "x", 5)
).toDF("letter1", "letter2", "number1")
df.show()
+-------+-------+-------+
|letter1|letter2|number1|
+-------+-------+-------+
| a| b| 1|
| a| b| 2|
| a| b| 3|
| z| b| 4|
| a| x| 5|
+-------+-------+-------+
Some rows in the df DataFrame have the same letter1 and letter2 values. Let's use the Dataset#dropDuplicates() method to remove duplicates from the DataFrame.
df.dropDuplicates("letter1", "letter2").show()
+-------+-------+-------+
|letter1|letter2|number1|
+-------+-------+-------+
| a| x| 5|
| z| b| 4|
| a| b| 1|
+-------+-------+-------+
The dropDuplicates() method keeps one record from each set of duplicates and drops the rest. Which record survives is arbitrary, so don't rely on it. dropDuplicates() is useful for simple use cases, but collapsing records is better for analyses that can't afford to lose any valuable data.
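You can also call dropDuplicates() with no arguments, in which case every column is considered and only rows that are exact duplicates are removed:
df.dropDuplicates().show()
All five rows in df are unique once number1 is taken into account, so this particular call returns the DataFrame unchanged.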
Killing duplicates
We can use the spark-daria killDuplicates() method to completely remove all duplicates from a DataFrame.
import com.github.mrpowers.spark.daria.sql.DataFrameExt._
df.killDuplicates("letter1", "letter2").show()
+-------+-------+-------+
|letter1|letter2|number1|
+-------+-------+-------+
| a| x| 5|
| z| b| 4|
+-------+-------+-------+
Killing duplicates is similar to dropping duplicates, just a little more aggressive.
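If you'd rather not pull in the spark-daria dependency, the same effect can be approximated with a window function that counts rows per key and keeps only the singletons. This is a rough sketch of equivalent logic, not spark-daria's actual implementation:
import org.apache.spark.sql.expressions.Window

// count how many rows share each (letter1, letter2) combination
val byLetters = Window.partitionBy("letter1", "letter2")

df
  .withColumn("dup_count", count(lit(1)).over(byLetters))
  .filter(col("dup_count") === 1) // keep combinations that appear exactly once
  .drop("dup_count")
  .show()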
Collapsing records
Let's use the collect_list() function to eliminate all the rows with duplicate letter1 and letter2 values in the DataFrame and collect all the number1 entries as a list.
df
.groupBy("letter1", "letter2")
.agg(collect_list("number1") as "number1s")
.show()
+-------+-------+---------+
|letter1|letter2| number1s|
+-------+-------+---------+
| a| x| [5]|
| z| b| [4]|
| a| b|[1, 2, 3]|
+-------+-------+---------+
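One caveat: collect_list() makes no ordering guarantee, so the order of the elements in number1s depends on how rows arrive after the shuffle. If deterministic output matters, one option is to wrap the aggregation in sort_array():
df
  .groupBy("letter1", "letter2")
  .agg(sort_array(collect_list("number1")) as "number1s")
  .show()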
Let's create a more realistic example with credit card transactions and use collect_set() to aggregate unique records and eliminate pure duplicates.
val ccTransactionsDF = Seq(
("123", "20180102", 10.49),
("123", "20180102", 10.49),
("123", "20180102", 77.33),
("555", "20180214", 99.99),
("888", "20180214", 1.23)
).toDF("person_id", "transaction_date", "amount")
ccTransactionsDF.show()
+---------+----------------+------+
|person_id|transaction_date|amount|
+---------+----------------+------+
| 123| 20180102| 10.49|
| 123| 20180102| 10.49|
| 123| 20180102| 77.33|
| 555| 20180214| 99.99|
| 888| 20180214| 1.23|
+---------+----------------+------+
Let's eliminate the duplicates with collect_set().
ccTransactionsDF
.groupBy("person_id", "transaction_date")
.agg(collect_set("amount") as "amounts")
.show()
+---------+----------------+--------------+
|person_id|transaction_date| amounts|
+---------+----------------+--------------+
| 555| 20180214| [99.99]|
| 888| 20180214| [1.23]|
| 123| 20180102|[10.49, 77.33]|
+---------+----------------+--------------+
collect_set() lets us retain all the valuable information and delete the duplicates. The best of both worlds!
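If a downstream job needs flat rows again, the collapsed arrays are easy to unflatten with explode(). In this sketch, dedupedDF is a hypothetical name for the result of the groupBy above:
dedupedDF
  .select(
    col("person_id"),
    col("transaction_date"),
    explode(col("amounts")) as "amount" // one output row per array element
  )
  .show()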
Collapsing records to datamarts
Let's examine a DataFrame with data on hockey players and how many goals they've scored in each game.
val playersDF = Seq(
("123", 11, "20180102", 0),
("123", 11, "20180102", 0),
("123", 13, "20180105", 3),
("555", 11, "20180214", 1),
("888", 22, "20180214", 2)
).toDF("player_id", "game_id", "game_date", "goals_scored")
playersDF.show()
+---------+-------+---------+------------+
|player_id|game_id|game_date|goals_scored|
+---------+-------+---------+------------+
| 123| 11| 20180102| 0|
| 123| 11| 20180102| 0|
| 123| 13| 20180105| 3|
| 555| 11| 20180214| 1|
| 888| 22| 20180214| 2|
+---------+-------+---------+------------+
Let's create a StructType column that encapsulates all the columns in the DataFrame and then collapse all records on the player_id column to create a player datamart.
playersDF
.withColumn("as_struct", struct("game_id", "game_date", "goals_scored"))
.groupBy("player_id")
.agg(collect_set("as_struct") as "as_structs")
.show(false)
+---------+----------------------------------+
|player_id|as_structs |
+---------+----------------------------------+
|888 |[[22,20180214,2]] |
|555 |[[11,20180214,1]] |
|123 |[[11,20180102,0], [13,20180105,3]]|
+---------+----------------------------------+
A player datamart like this can simplify a lot of queries. We don't need to write window functions if all the data is already aggregated in a single row.
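For example, a career goal total per player is a single expression over the array, with no window functions in sight. This sketch assumes Spark 2.4+ (for the aggregate() higher-order function in SQL expressions) and uses playerDatamartDF as a hypothetical name for the collapsed DataFrame above:
playerDatamartDF
  .withColumn(
    "total_goals",
    // sum the goals_scored field across all the structs in the array
    expr("aggregate(as_structs, 0, (acc, x) -> acc + x.goals_scored)")
  )
  .show(false)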
Next steps
Deduplicating DataFrames is relatively straightforward. Collapsing records is more complicated, but worth the effort.
Data lakes are notoriously granular and programmers often write window functions to analyze historical results.
Collapsing records into datamarts is the best way to simplify your code logic.