Decision Trees and Random Forests¶
In this notebook, we will use Decision Trees and Random Forests for classification. Note that decision trees and random forests can also be used to predict numerical outcomes via regression. In either case, they are supervised learning algorithms.
In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
In [2]:
# Create a Spark Session
spark = SparkSession.builder.appName('Congressional Voting').getOrCreate()
Here, we define the Schema of the dataset since the inferSchema option is set to False and the CSV file does not contain column names. This is a good chance to see how Schema can be manually defined instead of relying on the automatic inferSchema option all the time.
Here is the original source of this dataset: https://archive.ics.uci.edu/ml/datasets/congressional+voting+records
In [3]:
# Load the Congressional Voting dataset (house-votes-84.csv) into a Spark DataFrame
my_schema = StructType([
    StructField("party", StringType()),
    StructField("handicapped_infants", StringType()),
    StructField("water_project_cost_sharing", StringType()),
    StructField("adoption_of_the_budget_resolution", StringType()),
    StructField("physician_fee_freeze", StringType()),
    StructField("el_salvador_aid", StringType()),
    StructField("religious_groups_in_schools", StringType()),
    StructField("anti_satellite_test_ban", StringType()),
    StructField("aid_to_nicaraguan_contras", StringType()),
    StructField("mx_missile", StringType()),
    StructField("immigration", StringType()),
    StructField("synfuels_corporation_cutback", StringType()),
    StructField("education_spending", StringType()),
    StructField("superfund_right_to_sue", StringType()),
    StructField("crime", StringType()),
    StructField("duty_free_exports", StringType()),
    StructField("export_administration_act_south_africa", StringType())
])
congressional_voting_df = spark.read.csv('house-votes-84.csv',
                                         inferSchema=False,
                                         header=False,
                                         schema=my_schema)
In the cell below, we temporarily convert the Spark DataFrame into a Pandas DataFrame using the toPandas() method. For display purposes, the Pandas DataFrame may be preferred because it renders more readably. The head() method belongs to the Pandas DataFrame and displays the first 5 rows of data by default.
Please scroll through the columns and observe the data types of each column. You will notice that many of them are categorical features rather than the numerical features we have seen previously. Categorical features require different treatment from numerical features. The machine learning Python libraries usually expect the contents of DataFrame columns to be numbers (floats or integers) instead of strings or text. Even when the categorical columns contain integers, we still need to pre-process them using the StringIndexer to be sure that the contents of these columns are read in correctly. The StringIndexer essentially converts each unique category in each column into a float value (usually starting from 0.0, followed by 1.0, 2.0, ...).
After converting the category names into float values, we need to further pre-process them by encoding them using the OneHotEncoderEstimator. Why do you think just converting the categories into float values is insufficient? In other words, why do you think we need one-hot encoding after using StringIndexer? Read more about this here: https://machinelearningmastery.com/why-one-hot-encode-data-in-machine-learning
After the one-hot encoding process has been completed, we can observe that a brand new column is generated for each unique category in each column. In such a new column, the rows contain only 1 or 0, indicating the presence or absence of that specific category. These new columns are sometimes called dummy variables.
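To see concretely why one-hot encoding is needed: StringIndexer alone would map 'y', 'n' and '?' to values such as 0.0, 1.0 and 2.0, which falsely implies an ordering and a distance between the categories. Below is a minimal sketch on a hypothetical toy column (not part of the main analysis), assuming the imports and Spark session created above:

toy_df = spark.createDataFrame([('y',), ('n',), ('?',), ('y',)], ['vote'])
toy_indexed = StringIndexer(inputCol='vote', outputCol='voteIndex').fit(toy_df).transform(toy_df)
# By default dropLast=True, so 3 categories become a 2-element dummy vector
toy_encoded = OneHotEncoderEstimator(inputCols=['voteIndex'],
                                     outputCols=['voteVec']).fit(toy_indexed).transform(toy_indexed)
toy_encoded.show()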
In [4]:
congressional_voting_df.toPandas().head()
Out[4]:
|   | party | handicapped_infants | water_project_cost_sharing | adoption_of_the_budget_resolution | physician_fee_freeze | el_salvador_aid | religious_groups_in_schools | anti_satellite_test_ban | aid_to_nicaraguan_contras | mx_missile | immigration | synfuels_corporation_cutback | education_spending | superfund_right_to_sue | crime | duty_free_exports | export_administration_act_south_africa |
|---|-------|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | republican | n | y | n | y | y | y | n | n | n | y | ? | y | y | y | n | y |
| 1 | republican | n | y | n | y | y | y | n | n | n | n | n | y | y | y | n | ? |
| 2 | democrat | ? | y | y | ? | y | y | n | n | n | n | y | n | y | y | n | n |
| 3 | democrat | n | y | y | n | ? | y | n | n | n | n | y | n | y | n | n | y |
| 4 | democrat | y | y | y | n | y | y | n | n | n | n | y | ? | y | y | y | y |
Here, we make use of the shape attribute of the Pandas DataFrame to find out that there are 435 rows of data and 17 columns (16 feature columns plus the party target).
In [5]:
congressional_voting_df.toPandas().shape
Out[5]:
(435, 17)
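As a side note, toPandas() collects the entire dataset onto the driver, which is fine here but can be costly for large data. A Spark-native way to get the same numbers would be:

congressional_voting_df.count(), len(congressional_voting_df.columns)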
The contents of the cell below may seem somewhat daunting at first glance, but they are not as bad as they seem. Let's go through them step by step. First, we create a Python list named categorical_columns containing the names of the categorical columns; we will use this list later, but for now it is merely a collection of column names. Next, we create an empty Python list named pipeline_stages and fill it up (using a for loop) with the pre-processing steps that we want to apply to each of the categorical columns. What are these pre-processing steps?
For each categorical column, we use the StringIndexer ( https://spark.apache.org/docs/2.4.0/ml-features.html#stringindexer ) to convert the text category names into index numbers (in the float data type). The new output column will have a suffix of 'Index' appended to the end of its original column name. For example, after indexing the 'crime' column, the new output column is named 'crimeIndex'. This is the first step/stage in the pre-processing pipeline. You can think of this pipeline as a workflow consisting of a sequence of steps executed in order.
Next, we take the output of the string_indexer and use it as input for the OneHotEncoderEstimator ( https://spark.apache.org/docs/latest/ml-features.html#onehotencoderestimator ). As described above, we have to convert the index numbers into dummy variables in separate new columns. This is the second step/stage in the pre-processing pipeline. The new output columns will have a suffix of 'classVec' appended to the end of their original column names.
Next, we add these two stages into the pipeline_stages list, using pipeline_stages += [string_indexer, encoder]. However, please note that these stages have not been executed. They are just instructions for the actual pipeline that will be executed later in the notebook, not in this cell. You can read more about Pipelines here: https://spark.apache.org/docs/latest/ml-pipeline.html#how-it-works
In [6]:
# Index the relevant categorical and label variables using a Pipeline of stages
categorical_columns = ['handicapped_infants',
                       'water_project_cost_sharing',
                       'adoption_of_the_budget_resolution',
                       'physician_fee_freeze',
                       'el_salvador_aid',
                       'religious_groups_in_schools',
                       'anti_satellite_test_ban',
                       'aid_to_nicaraguan_contras',
                       'mx_missile',
                       'immigration',
                       'synfuels_corporation_cutback',
                       'education_spending',
                       'superfund_right_to_sue',
                       'crime',
                       'duty_free_exports',
                       'export_administration_act_south_africa']
pipeline_stages = []
for categorical_column in categorical_columns:
    string_indexer = StringIndexer(inputCol=categorical_column,
                                   outputCol=categorical_column + 'Index')
    encoder = OneHotEncoderEstimator(
        inputCols=[string_indexer.getOutputCol()],
        outputCols=[categorical_column + 'classVec'])
    pipeline_stages += [string_indexer, encoder]
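If you want to sanity-check what the StringIndexer will do before running the whole pipeline, you can fit a single indexer on one column and inspect its labels. A minimal sketch, assuming congressional_voting_df from above; by default the labels are ordered by descending frequency, so the most common category receives index 0.0:

crime_indexer = StringIndexer(inputCol='crime', outputCol='crimeIndex').fit(congressional_voting_df)
print(crime_indexer.labels)  # the category at position i is encoded as float(i)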
Target and Problem Statement¶
In the previous cell, we were handling the feature columns. Clearly, we have not done anything to the target column yet. What is our target column? How do we decide which column to use as the target? Before we proceed, we need to figure out what problem we are trying to solve using this dataset.
This dataset includes votes for each of the U.S. House of Representatives Congressmen on the 16 key votes identified by the CQA (Congressional Quarterly Almanac). The CQA lists nine different types of votes: voted for, paired for, and announced for (these three simplified to yea); voted against, paired against, and announced against (these three simplified to nay); and voted present, voted present to avoid conflict of interest, and did not vote or otherwise make a position known (these three simplified to an unknown disposition, which appears as '?' in the data).
Problem Statement¶
Using these congressional voting records from the U.S. House of Representatives, we want to build a classification model that can predict whether a given congressman or congresswoman is a Republican or a Democrat (political party affiliation). Therefore, the column named party will be the target column for this problem statement.
Pre-processing the Target column¶
Next, we pre-process the target column to keep it consistent with the treatment received by the feature columns. Similar to what has been done to the feature columns, we need to convert the target party column into numerical index values using the StringIndexer. We add this step into the pipeline_stages list using pipeline_stages += [label_string_idx].
In [7]:
label_string_idx = StringIndexer(inputCol='party',
                                 outputCol='label')
pipeline_stages += [label_string_idx]
Next, we use the VectorAssembler to consolidate the feature columns into a single vector column named features instead of keeping them as separate columns. We add this step into the pipeline_stages list using pipeline_stages += [vector_assembler].
In [8]:
vector_assembler_inputs = [c + 'classVec' for c in categorical_columns]
vector_assembler = VectorAssembler(inputCols=vector_assembler_inputs,
                                   outputCol='features')
pipeline_stages += [vector_assembler]
In the cell below, we take a quick look at the stages that we have added to the pipeline. Once again, please be aware that the stages in the pipeline have not been executed yet; we will do that shortly.
In [9]:
print(pipeline_stages)
[StringIndexer_860bb715e485, OneHotEncoderEstimator_0fdc631ecbb1, StringIndexer_4c214be09066, OneHotEncoderEstimator_b76ea4d13504, StringIndexer_ae73e34e2b7f, OneHotEncoderEstimator_9856b02ccd15, StringIndexer_990a2d740555, OneHotEncoderEstimator_a46e9f316af3, StringIndexer_f62baa7836a0, OneHotEncoderEstimator_212c6d17a573, StringIndexer_a8ab119b57cf, OneHotEncoderEstimator_0e18f2532ec0, StringIndexer_bb125751a83e, OneHotEncoderEstimator_4fe8b9545299, StringIndexer_1f7f804ab94a, OneHotEncoderEstimator_b2b905aed10b, StringIndexer_1c783933effd, OneHotEncoderEstimator_729394d33b22, StringIndexer_8bda194fe130, OneHotEncoderEstimator_1db1d522a1a8, StringIndexer_54601c8387b2, OneHotEncoderEstimator_45a7ca8168db, StringIndexer_6cea1756eb1f, OneHotEncoderEstimator_f499f1c23b7f, StringIndexer_a406bc1ea6d6, OneHotEncoderEstimator_84cf76587ad3, StringIndexer_40e696d6b5de, OneHotEncoderEstimator_6c1919d15173, StringIndexer_1edcc0e11065, OneHotEncoderEstimator_eae4313a1514, StringIndexer_670bc4f6a962, OneHotEncoderEstimator_21f0fd3d2473, StringIndexer_f1551cbabcce, VectorAssembler_6baa5736b0da]
Next, we generate the consolidated input feature vectors from the Spark DataFrame by executing the previously constructed pipeline stages. In other words, this is the step that actually runs all the stages we have accumulated so far.
In [10]:
pipeline = Pipeline(stages = pipeline_stages)
pipeline_model = pipeline.fit(congressional_voting_df)
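As a side note, a fitted PipelineModel can be persisted and reloaded, which is handy if the same pre-processing has to be reapplied later. A minimal sketch, where 'voting_pipeline_model' is a hypothetical output path:

from pyspark.ml import PipelineModel
pipeline_model.write().overwrite().save('voting_pipeline_model')  # save the fitted stages
reloaded_pipeline_model = PipelineModel.load('voting_pipeline_model')  # reload later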
Here, we take a quick look at the transformed DataFrame that will serve as the model input.
In [11]:
label_column = 'label'
congressional_voting_features_df = pipeline_model \
    .transform(congressional_voting_df) \
    .select(['features', label_column, 'party'])
congressional_voting_features_df.toPandas().head(8)
Out[11]:
|   | features | label | party |
|---|----------|-------|-------|
| 0 | (1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, … | 1.0 | republican |
| 1 | (1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, … | 1.0 | republican |
| 2 | (0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, … | 0.0 | democrat |
| 3 | (1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, … | 0.0 | democrat |
| 4 | (0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, … | 0.0 | democrat |
| 5 | (1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, … | 0.0 | democrat |
| 6 | (1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, … | 0.0 | democrat |
| 7 | (1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, … | 1.0 | republican |
Here, we check the class balance. In other words, we want to know whether there are similar numbers of Democrats and Republicans. Obviously, for real-world data, we cannot expect the ratio between the different classes to be exactly equal. We are merely checking whether there is a majority class that is significantly larger than the other.
In [12]:
congressional_voting_features_df.groupBy('party').count().toPandas()
Out[12]:
|   | party | count |
|---|-------|-------|
| 0 | democrat | 267 |
| 1 | republican | 168 |
Next, we conduct the train-test split, in which the rows of the input DataFrame are randomly split into the Training Set and the Test Set. Typically, the Training Set receives 60% to 70% of the total number of rows; here we use a 70/30 split. The randomSplit() method does the randomization for you, so there is no need for any further shuffling of your own.
In [13]:
# Split the Raw Features and Labelled DataFrame into a Training DataFrame and a Test DataFrame
train_df, test_df = congressional_voting_features_df.randomSplit([0.7, 0.3], seed=98765)
train_df.count(), test_df.count()
Out[13]:
(314, 121)
Here, we are just doing a quick check for the class balance in the Training Set.
In [14]:
train_df.groupBy('party').count().toPandas()
Out[14]:
|   | party | count |
|---|-------|-------|
| 0 | democrat | 196 |
| 1 | republican | 118 |
Here, we are just doing a quick check for the class balance in the Test Set.
In [15]:
test_df.groupBy('party').count().toPandas()
Out[15]:
|   | party | count |
|---|-------|-------|
| 0 | democrat | 71 |
| 1 | republican | 50 |
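If a random split ever produced a badly skewed class balance, one option is an approximately stratified split using sampleBy(). The sketch below (assuming congressional_voting_features_df from above) samples 70% of each party for training and recovers the remaining rows with an anti-join on a temporary row id:

from pyspark.sql.functions import monotonically_increasing_id

df_with_id = congressional_voting_features_df.withColumn('row_id', monotonically_increasing_id())
fractions = {'democrat': 0.7, 'republican': 0.7}  # sampling fraction per class
strat_train_df = df_with_id.sampleBy('party', fractions, seed=98765)
strat_test_df = df_with_id.join(strat_train_df.select('row_id'), on='row_id', how='left_anti')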
Decision Tree¶
Next, we train the Decision Tree model on the Training DataFrame, specifically informing the model which column contains the features and which column contains the labels. In other words, this is the step in which the actual model training is carried out.
In [16]:
# Train a Classification Tree Model on the Training DataFrame
decision_tree = DecisionTreeClassifier(featuresCol='features',
                                       labelCol=label_column)
decision_tree_model = decision_tree.fit(train_df)
Once the training of the Decision Tree model is complete, we use the Test Set as input data for the model to carry out its predictions. We can also verify that everything is working as expected by checking the probabilities that the model has calculated: for each class, the model outputs the probability that a given data point belongs to that class. The prediction column contains the predicted class label derived from these probabilities.
In [17]:
# Apply the Trained Classification Tree Model to the Test DataFrame to make predictions
test_decision_tree_predictions_df = decision_tree_model.transform(test_df)
print("TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL: ")
test_decision_tree_predictions_df.select("probability", "prediction", label_column).toPandas().head(15)
TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL:
Out[17]:
|    | probability | prediction | label |
|----|-------------|------------|-------|
| 0  | [0.994475138121547, 0.0055248618784530384] | 0.0 | 0.0 |
| 1  | [0.994475138121547, 0.0055248618784530384] | 0.0 | 0.0 |
| 2  | [0.994475138121547, 0.0055248618784530384] | 0.0 | 0.0 |
| 3  | [0.994475138121547, 0.0055248618784530384] | 0.0 | 0.0 |
| 4  | [0.994475138121547, 0.0055248618784530384] | 0.0 | 0.0 |
| 5  | [0.994475138121547, 0.0055248618784530384] | 0.0 | 0.0 |
| 6  | [0.0, 1.0] | 1.0 | 1.0 |
| 7  | [0.994475138121547, 0.0055248618784530384] | 0.0 | 0.0 |
| 8  | [0.0, 1.0] | 1.0 | 1.0 |
| 9  | [0.0, 1.0] | 1.0 | 1.0 |
| 10 | [0.0, 1.0] | 1.0 | 1.0 |
| 11 | [0.0, 1.0] | 1.0 | 1.0 |
| 12 | [0.0, 1.0] | 1.0 | 1.0 |
| 13 | [0.0, 1.0] | 1.0 | 1.0 |
| 14 | [0.0, 1.0] | 1.0 | 1.0 |
AUC (Area Under the Curve) of the ROC (Receiver Operating Characteristic) curve¶
The AUC is one of the most important evaluation metrics for assessing a classification model's performance. It is also written as AUROC (Area Under the Receiver Operating Characteristic curve).
The higher the AUC of the ROC curve, the better the model performs, assuming the classes are reasonably balanced.
In [18]:
evaluator_roc_area = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                                   labelCol=label_column,
                                                   metricName="areaUnderROC")
print("Area Under ROC Curve on Test Data = %g" % evaluator_roc_area.evaluate(test_decision_tree_predictions_df))
Area Under ROC Curve on Test Data = 0.946338
In [19]:
# Visualise the Classification Tree
print(decision_tree_model.toDebugString)
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_067ce5622adb) of depth 5 with 33 nodes
  If (feature 7 in {0.0})
    If (feature 6 in {1.0})
      Predict: 0.0
    Else (feature 6 not in {1.0})
      If (feature 2 in {1.0})
        Predict: 0.0
      Else (feature 2 not in {1.0})
        If (feature 16 in {1.0})
          If (feature 9 in {0.0})
            Predict: 0.0
          Else (feature 9 not in {0.0})
            Predict: 1.0
        Else (feature 16 not in {1.0})
          Predict: 1.0
  Else (feature 7 not in {0.0})
    If (feature 21 in {1.0})
      If (feature 23 in {0.0})
        If (feature 30 in {0.0})
          Predict: 0.0
        Else (feature 30 not in {0.0})
          If (feature 4 in {0.0})
            Predict: 0.0
          Else (feature 4 not in {0.0})
            Predict: 1.0
      Else (feature 23 not in {0.0})
        If (feature 4 in {1.0})
          If (feature 2 in {1.0})
            Predict: 0.0
          Else (feature 2 not in {1.0})
            Predict: 1.0
        Else (feature 4 not in {1.0})
          Predict: 1.0
    Else (feature 21 not in {1.0})
      If (feature 29 in {1.0})
        If (feature 18 in {0.0})
          If (feature 16 in {1.0})
            Predict: 0.0
          Else (feature 16 not in {1.0})
            Predict: 1.0
        Else (feature 18 not in {0.0})
          Predict: 1.0
      Else (feature 29 not in {1.0})
        If (feature 4 in {1.0})
          If (feature 30 in {0.0})
            Predict: 0.0
          Else (feature 30 not in {0.0})
            Predict: 1.0
        Else (feature 4 not in {1.0})
          Predict: 1.0
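Reading the tree by feature index is tedious because the indices refer to positions in the assembled features vector. One way to recover readable names is via the column metadata written by the VectorAssembler; the following is a sketch, assuming the variables defined above (the exact metadata layout is an implementation detail of Spark ML):

attrs = train_df.schema['features'].metadata['ml_attr']['attrs']
feature_names = sorted([(attr['idx'], attr['name'])
                        for group in attrs.values() for attr in group])
importances = decision_tree_model.featureImportances
for idx, name in feature_names:
    if importances[idx] > 0:
        print(name, round(importances[idx], 4))  # only features the tree actually used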
Random Forest¶
After trying out the Decision Tree algorithm, we are curious to find out whether the Random Forest algorithm will improve the results.
Here, we will train the Random Forest model using the Training Set.
In [20]:
# Train a Random Forest Classifier Model on the Training DataFrame
random_forest = RandomForestClassifier(featuresCol='features',
                                       labelCol=label_column)
random_forest_model = random_forest.fit(train_df)
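The model above uses the default hyperparameters. If you want to tune settings such as the number of trees and the maximum depth, a cross-validation sketch (not executed in this notebook; the grid values are arbitrary examples) might look like this:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

param_grid = (ParamGridBuilder()
              .addGrid(random_forest.numTrees, [10, 20, 50])
              .addGrid(random_forest.maxDepth, [3, 5, 7])
              .build())
cross_validator = CrossValidator(estimator=random_forest,
                                 estimatorParamMaps=param_grid,
                                 evaluator=BinaryClassificationEvaluator(labelCol=label_column),
                                 numFolds=5,
                                 seed=98765)
# cv_model = cross_validator.fit(train_df)   # best model: cv_model.bestModel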
Once the training is complete, we will use the Test Set as the input data for the Random Forest model.
In [21]:
# Apply the Trained Random Forest Classifier Model to the Test DataFrame to make predictions
test_random_forest_predictions_df = random_forest_model.transform(test_df)
print("TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL: ")
test_random_forest_predictions_df.select("probability", "prediction", label_column).toPandas().head(15)
TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL:
Out[21]:
|    | probability | prediction | label |
|----|-------------|------------|-------|
| 0  | [0.7881692868534973, 0.21183071314650262] | 0.0 | 0.0 |
| 1  | [0.8869513799971547, 0.11304862000284535] | 0.0 | 0.0 |
| 2  | [0.907866256550467, 0.09213374344953293] | 0.0 | 0.0 |
| 3  | [0.9879667116509221, 0.012033288349077822] | 0.0 | 0.0 |
| 4  | [1.0, 0.0] | 0.0 | 0.0 |
| 5  | [1.0, 0.0] | 0.0 | 0.0 |
| 6  | [0.2495784923537217, 0.7504215076462784] | 1.0 | 1.0 |
| 7  | [0.9, 0.1] | 0.0 | 0.0 |
| 8  | [0.256763737663967, 0.7432362623360331] | 1.0 | 1.0 |
| 9  | [0.012687149758153764, 0.9873128502418462] | 1.0 | 1.0 |
| 10 | [0.07031716059238995, 0.9296828394076101] | 1.0 | 1.0 |
| 11 | [0.0626753934963975, 0.9373246065036025] | 1.0 | 1.0 |
| 12 | [0.012687149758153764, 0.9873128502418462] | 1.0 | 1.0 |
| 13 | [0.012687149758153764, 0.9873128502418462] | 1.0 | 1.0 |
| 14 | [0.03768714975815375, 0.9623128502418462] | 1.0 | 1.0 |
In [22]:
# Evaluate the performance of our Random Forest Classifier Model on the Test DataFrame
# using Area under a ROC curve
evaluator_rf_roc_area = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                                      labelCol=label_column,
                                                      metricName="areaUnderROC")
print("Area Under ROC Curve on Test Data = %g" % evaluator_rf_roc_area.evaluate(test_random_forest_predictions_df))
Area Under ROC Curve on Test Data = 0.990563
Based on the area under the ROC curve, the Random Forest model has outperformed the Decision Tree model on this Test Set. Keep in mind, however, that ROC-based evaluation can paint an overly optimistic picture on imbalanced datasets, i.e. datasets in which one class forms a significant majority of the data points. For imbalanced datasets, other evaluation metrics such as Precision and Recall should be used instead; a sketch of how to compute them follows.
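As a pointer, Spark ML exposes such metrics through the MulticlassClassificationEvaluator. A minimal sketch, applied to the Random Forest predictions above:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

for metric in ['weightedPrecision', 'weightedRecall', 'f1']:
    evaluator = MulticlassClassificationEvaluator(labelCol=label_column,
                                                  predictionCol='prediction',
                                                  metricName=metric)
    print(metric, '=', evaluator.evaluate(test_random_forest_predictions_df))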
In [23]:
# Stop the Spark Session
spark.stop()