Running Logistic Regressions with Spark
Logistic regression models are a powerful way to predict binary outcomes (e.g. winning a game or surviving a shipwreck).
Multiple explanatory variables (aka "features") are used to train the model that predicts the outcome.
This episode shows how to train a Spark logistic regression model with the Titanic dataset and use the model to predict if a passenger survived or died.
We'll run our model on a test dataset and show that it scores 0.83 when evaluated with Spark's BinaryClassificationEvaluator.
Titanic Dataset
The train.csv file contains 891 rows of data with the following schema:
PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Timothy J",male,54,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. Gosta Leonard",male,2,3,1,349909,21.075,,S
- PassengerId is a unique identifier for each passenger
- Survived is 0 for passengers that died and 1 for passengers that survived
- Pclass is the ticket class from 1 - 3 (1 is the highest class, 2 is the middle class, and 3 is the lowest class)
- Name is the passenger's full name
- Sex is male or female
- Age is the passenger's age in years
- SibSp is the number of siblings / spouses on board
- Parch is the number of parents / children on board
- Ticket is the ticket number
- Fare is the passenger fare
- Cabin is the cabin number
- Embarked is the port where the passenger got on the ship
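To get a feel for the data, here's a minimal sketch that reads the file and prints the inferred schema. The code later in this episode gets its spark session from a SparkSessionWrapper trait; this sketch builds one directly, and assumes the file lives in the ./src/test/resources/titanic/ directory used throughout this episode.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local")
  .appName("titanic")
  .getOrCreate()

// every column is read back as a string; we'll cast to doubles later
val rawDF = spark
  .read
  .option("header", "true")
  .csv("./src/test/resources/titanic/train.csv")

rawDF.printSchema()
rawDF.show(5)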
Selecting Features
We need to select the explanatory variables that will be able to predict if passengers survived or died. It's always good to have a "plain English" reason why your explanatory variables are capable of predicting the outcome.
- Gender, because females were more likely to be put on lifeboats
- Age, because children were more likely to be saved
- SibSp, because families were more likely to be saved together
- Parch, because parent / child combinations were more likely to be saved
- Fare, because the richer passengers were more likely to be saved
Explanatory variables are referred to as "features" in machine learning.
Prepping the Training Dataset
We will create a trainingDF() method that returns a DataFrame with all of the features converted to floating point numbers, so they can be plugged into the machine learning model.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, when}

object TitanicData extends SparkSessionWrapper {

  def trainingDF(
    titanicDataDirName: String = "./src/test/resources/titanic/"
  ): DataFrame = {
    spark
      .read
      .option("header", "true")
      .csv(titanicDataDirName + "train.csv")
      // convert the Sex column into a numeric Gender column
      .withColumn(
        "Gender",
        when(col("Sex").equalTo("male"), 0)
          .when(col("Sex").equalTo("female"), 1)
          .otherwise(null)
      )
      // cast every column to double so it can be fed to the model
      .select(
        col("Gender").cast("double"),
        col("Survived").cast("double"),
        col("Pclass").cast("double"),
        col("Age").cast("double"),
        col("SibSp").cast("double"),
        col("Parch").cast("double"),
        col("Fare").cast("double")
      )
      // drop rows with missing values
      .filter(
        col("Gender").isNotNull &&
          col("Survived").isNotNull &&
          col("Pclass").isNotNull &&
          col("Age").isNotNull &&
          col("SibSp").isNotNull &&
          col("Parch").isNotNull &&
          col("Fare").isNotNull
      )
  }

}
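A quick sanity check that the method returns what we expect:

val trainingData = TitanicData.trainingDF()
trainingData.printSchema() // all seven columns are doubles
println(trainingData.count()) // the rows that survived the null filter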
Train the Model
Let's write a function that will convert all of the features into a single vector.
import org.apache.spark.ml.feature.VectorAssembler

def withVectorizedFeatures(
    featureColNames: Array[String] = Array("Gender", "Age", "SibSp", "Parch", "Fare"),
    outputColName: String = "features"
)(df: DataFrame): DataFrame = {
  // VectorAssembler packs the feature columns into a single vector column
  val assembler: VectorAssembler = new VectorAssembler()
    .setInputCols(featureColNames)
    .setOutputCol(outputColName)
  assembler.transform(df)
}
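withVectorizedFeatures is written as a custom transformation, so it chains cleanly with the DataFrame transform method. A quick sketch of what it produces:

val vectorizedDF = TitanicData
  .trainingDF()
  .transform(withVectorizedFeatures())
vectorizedDF.select("features").show(3, truncate = false) // each row holds a 5-element vector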
Now let's write a function that converts the Survived column into a label.
import org.apache.spark.ml.feature.StringIndexer

def withLabel(
    inputColName: String = "Survived",
    outputColName: String = "label"
)(df: DataFrame): DataFrame = {
  // StringIndexer maps the Survived values to label indices
  val labelIndexer: StringIndexer = new StringIndexer()
    .setInputCol(inputColName)
    .setOutputCol(outputColName)
  labelIndexer
    .fit(df)
    .transform(df)
}
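One caveat: StringIndexer assigns indices by descending frequency, so the most common value maps to 0.0. More passengers died than survived in the training data, so Survived = 0 happens to map to label 0.0 here, but it's worth verifying with the fitted indexer. A quick sketch:

val indexerModel = new StringIndexer()
  .setInputCol("Survived")
  .setOutputCol("label")
  .fit(TitanicData.trainingDF())
println(indexerModel.labels.mkString(", ")) // the value at position 0 maps to label 0.0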
We can train the model by vectorizing the features, adding a label, and fitting a logistic regression model on a DataFrame that has features and label columns.
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}

def model(df: DataFrame = TitanicData.trainingDF()): LogisticRegressionModel = {
  val trainFeatures: DataFrame = df
    .transform(withVectorizedFeatures())
    .transform(withLabel())
    .select("features", "label")
  // fit the logistic regression model on the features / label DataFrame
  new LogisticRegression()
    .fit(trainFeatures)
}
The trained model has one coefficient per feature and a y-intercept.
println(model().coefficients) // [2.5334201606150444,-0.021514292982670942,-0.40830426011779103,-0.23251735366607038,0.017246642519055992]
println(model().intercept) // -0.9761016366658759
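A prediction is just the sigmoid of the intercept plus the dot product of the coefficients and the feature values. Here's a sketch that computes the survival probability for a hypothetical passenger (a 30-year-old female travelling alone on a 50 unit fare; the feature order matches withVectorizedFeatures):

// hypothetical feature values in (Gender, Age, SibSp, Parch, Fare) order
val passenger = Array(1.0, 30.0, 0.0, 0.0, 50.0)
val m = model()
val margin = m.intercept +
  m.coefficients.toArray.zip(passenger).map { case (c, x) => c * x }.sum
val survivalProbability = 1.0 / (1.0 + math.exp(-margin)) // sigmoid
println(survivalProbability) // probability that the label is 1.0 (survived)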
Evaluating Model Accuracy
Let's create a method that does all the data munging and returns a properly formatted test dataset so we can run our logistic regression model.
object TitanicData extends SparkSessionWrapper {

  def testDF(
    titanicDataDirName: String = "./src/test/resources/titanic/"
  ): DataFrame = {
    val rawTestDF = spark
      .read
      .option("header", "true")
      .csv(titanicDataDirName + "test.csv")
    // gender_submission.csv supplies the Survived column for the test data
    val genderSubmissionDF = spark
      .read
      .option("header", "true")
      .csv(titanicDataDirName + "gender_submission.csv")
    rawTestDF
      .join(
        genderSubmissionDF,
        Seq("PassengerId")
      )
      .withColumn(
        "Gender",
        when(col("Sex").equalTo("male"), 0)
          .when(col("Sex").equalTo("female"), 1)
          .otherwise(null)
      )
      .select(
        col("Gender").cast("double"),
        col("Survived").cast("double"),
        col("Pclass").cast("double"),
        col("Age").cast("double"),
        col("SibSp").cast("double"),
        col("Parch").cast("double"),
        col("Fare").cast("double")
      )
      .filter(
        col("Gender").isNotNull &&
          col("Pclass").isNotNull &&
          col("Age").isNotNull &&
          col("SibSp").isNotNull &&
          col("Parch").isNotNull &&
          col("Fare").isNotNull
      )
  }

}
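gender_submission.csv is the sample submission file that ships with the Kaggle Titanic dataset; joining it on PassengerId gives us Survived values to evaluate against. A quick sanity check that the test data lines up with the training schema:

val testData = TitanicData.testDF()
testData.printSchema() // should match trainingDF: seven double columns
println(testData.count()) // rows remaining after the missing value filter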
Just like we did with the training dataset, let's add one column with the vectorized features and another column with the label.
val testDF: DataFrame = TitanicData
.testDF()
.transform(withVectorizedFeatures())
.transform(withLabel())
.select("features", "label")
Now we're ready to run the logistic regression model (the model() function defined above lives in a TitanicLogisticRegression object).
val predictions: DataFrame = TitanicLogisticRegression
.model()
.transform(testDF)
.select(
col("label"),
col("rawPrediction"),
col("prediction")
)
We can use the BinaryClassificationEvaluator class to score the model. Note that its default metric is the area under the ROC curve rather than plain accuracy, so the 0.83 below is an AUC score.
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

new BinaryClassificationEvaluator().evaluate(predictions) // 0.83
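The metric can also be requested explicitly, and plain accuracy (the fraction of correct predictions) is easy to compute by hand. A sketch:

// explicitly request the default metric
val auc = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .evaluate(predictions)

// plain accuracy: the fraction of rows where the prediction matches the label
val accuracy = predictions
  .filter(col("label") === col("prediction"))
  .count()
  .toDouble / predictions.count()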
Persisting the model
We can write the logistic regression model to disk with this command.
model().save("./tmp/titanic_model/")
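save will throw an error if the target path already exists. To overwrite a previously saved model, go through the write interface:

model().write.overwrite().save("./tmp/titanic_model/")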
Predictions can also be generated with a model that's been persisted to the filesystem.
val predictions: DataFrame = LogisticRegressionModel
.load("./tmp/titanic_model/")
.transform(testDF)
.select(
col("label"),
col("rawPrediction"),
col("prediction")
)
Training a model can be expensive, so you'll want to persist your model rather than regenerating it on the fly every time it's needed.
Next steps
Spark makes it easy to run logistic regression analyses at scale.
From a code organization standpoint, it's best to keep the data munging code and the machine learning code in separate objects.
You'll need to understand how Spark executes programs to performance-tune your models. Training a logistic regression model once and persisting the results will obviously be a lot faster than retraining the model every time it's run.