Working with dates and times in Spark

Spark supports DateType and TimestampType columns and defines a rich API of functions to make working with dates and times easy. This blog post will demonstrates how to make DataFrames with DateType / TimestampType columns and how to leverage Spark’s functions for working with these columns.

Creating DateType columns

Import the java.sql.Date library to create a DataFrame with a DateType column.

import java.sql.Date
import org.apache.spark.sql.types.{DateType, IntegerType}

val sourceDF = spark.createDF(
  List(
    (1, Date.valueOf("2016-09-30")),
    (2, Date.valueOf("2016-12-14"))
  ), List(
    ("person_id", IntegerType, true),
    ("birth_date", DateType, true)
  )
)
sourceDF.show()

+---------+----------+
|person_id|birth_date|
+---------+----------+
|        1|2016-09-30|
|        2|2016-12-14|
+---------+----------+

sourceDF.printSchema()

root
 |-- person_id: integer (nullable = true)
 |-- birth_date: date (nullable = true)

The cast() method can create a DateType column by converting a StringType column into a date.

val sourceDF = spark.createDF(
  List(
    (1, "2013-01-30"),
    (2, "2012-01-01")
  ), List(
    ("person_id", IntegerType, true),
    ("birth_date", StringType, true)
  )
).withColumn(
  "birth_date",
  col("birth_date").cast("date")
)
sourceDF.show()

+---------+----------+
|person_id|birth_date|
+---------+----------+
|        1|2013-01-30|
|        2|2012-01-01|
+---------+----------+

sourceDF.printSchema()

root
 |-- person_id: integer (nullable = true)
 |-- birth_date: date (nullable = true)

year(), month(), dayofmonth()

Let’s create a DataFrame with a DateType column and use built in Spark functions to extract the year, month, and day from the date.

val sourceDF = spark.createDF(
  List(
    (1, Date.valueOf("2016-09-30")),
    (2, Date.valueOf("2016-12-14"))
  ), List(
    ("person_id", IntegerType, true),
    ("birth_date", DateType, true)
  )
)

sourceDF.withColumn(
  "birth_year",
  year(col("birth_date"))
).withColumn(
  "birth_month",
  month(col("birth_date"))
).withColumn(
  "birth_day",
  dayofmonth(col("birth_date"))
).show()
+---------+----------+----------+-----------+---------+
|person_id|birth_date|birth_year|birth_month|birth_day|
+---------+----------+----------+-----------+---------+
|        1|2016-09-30|      2016|          9|       30|
|        2|2016-12-14|      2016|         12|       14|
+---------+----------+----------+-----------+---------+

minute(), second()

Let’s create a DataFrame with a TimestampType column and use built in Spark functions to extract the minute and second from the timestamp.

import java.sql.Timestamp

val sourceDF = spark.createDF(
  List(
    (1, Timestamp.valueOf("2017-12-02 03:04:00")),
    (2, Timestamp.valueOf("1999-01-01 01:45:20"))
  ), List(
    ("person_id", IntegerType, true),
    ("fun_time", TimestampType, true)
  )
)

sourceDF.withColumn(
  "fun_minute",
  minute(col("fun_time"))
).withColumn(
  "fun_second",
  second(col("fun_time"))
).show()
+---------+-------------------+----------+----------+
|person_id|           fun_time|fun_minute|fun_second|
+---------+-------------------+----------+----------+
|        1|2017-12-02 03:04:00|         4|         0|
|        2|1999-01-01 01:45:20|        45|        20|
+---------+-------------------+----------+----------+

datediff()

The datediff() and current_date() functions can be used to calculate the number of days between today and a date in a DateType column. Let’s use these functions to calculate someone’s age in days.

val sourceDF = spark.createDF(
  List(
    (1, Date.valueOf("1990-09-30")),
    (2, Date.valueOf("2001-12-14"))
  ), List(
    ("person_id", IntegerType, true),
    ("birth_date", DateType, true)
  )
)

sourceDF.withColumn(
  "age_in_days",
  datediff(current_timestamp(), col("birth_date"))
).show()
+---------+----------+-----------+
|person_id|birth_date|age_in_days|
+---------+----------+-----------+
|        1|1990-09-30|       9946|
|        2|2001-12-14|       5853|
+---------+----------+-----------+

date_add()

The date_add() function can be used to add days to a date. Let’s add 15 days to a date column.

val sourceDF = spark.createDF(
  List(
    (1, Date.valueOf("1990-09-30")),
    (2, Date.valueOf("2001-12-14"))
  ), List(
    ("person_id", IntegerType, true),
    ("birth_date", DateType, true)
  )
)

sourceDF.withColumn(
  "15_days_old",
  date_add(col("birth_date"), 15)
).show()
+---------+----------+-----------+
|person_id|birth_date|15_days_old|
+---------+----------+-----------+
|        1|1990-09-30| 1990-10-15|
|        2|2001-12-14| 2001-12-29|
+---------+----------+-----------+

Next steps

Look at the Spark SQL functions for the full list of methods available for working with dates and times in Spark.

The Spark date functions aren’t comprehensive and Java / Scala datetime libraries are notoriously difficult to work with. We should think about filling in the gaps in the native Spark datetime libraries by adding functions to spark-daria.

Leave a Reply

Your email address will not be published. Required fields are marked *