K-means Clustering
Scenario
A large technology firm, RhinoTech, has recently been hacked. Luckily, its forensic engineers were able to capture metadata about each session the hackers used to connect to RhinoTech servers. This data includes information such as session time, location, words-per-minute typing speed, etc.
• You have been informed that there are three potential hackers who perpetrated the attack.
• The forensic team is certain that the first two hackers were involved, but wants to know whether the third hacker was involved as well.
• One last key piece of information from the forensic engineers: the hackers divide the hack instances equally among themselves.
In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
The dataset contains 334 attack instances, with the following information for each one:
• Session_Connection_Time (How long the session lasted in minutes)
• Bytes Transferred (Megabytes transferred during session)
• Kali_Trace_Used (Whether the hacker was using Kali Linux)
• Servers_Corrupted (Number of servers corrupted during the attack)
• Pages_Corrupted (Number of pages illegally accessed)
• Location (Location attack came from)
• WPM_Typing_Speed (Estimated typing speed based on session logs)
In [2]:
spark = SparkSession.builder.appName('hack-clustering').getOrCreate()
data = spark.read.csv('hack_data.csv', header=True, inferSchema=True)
In [3]:
data.printSchema()
root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
In [4]:
data.toPandas().head(8)
Out[4]:
   Session_Connection_Time  Bytes Transferred  Kali_Trace_Used  Servers_Corrupted  Pages_Corrupted                Location  WPM_Typing_Speed
0                      8.0             391.09                1               2.96              7.0                Slovenia             72.37
1                     20.0             720.99                0               3.04              9.0  British Virgin Islands             69.08
2                     31.0             356.32                1               3.71              8.0                 Tokelau             70.58
3                      2.0             228.08                1               2.48              8.0                 Bolivia             70.80
4                     20.0             408.50                0               3.57              8.0                    Iraq             71.28
5                      1.0             390.69                1               2.79              9.0        Marshall Islands             71.57
6                     18.0             342.97                1               5.10              7.0                 Georgia             72.32
7                     22.0             101.61                1               3.03              7.0             Timor-Leste             72.03
Here we put every numeric feature into one Python list (the string column Location is left out, since VectorAssembler only accepts numeric and vector columns).
In [5]:
cols = ['Session_Connection_Time',
        'Bytes Transferred',
        'Kali_Trace_Used',
        'Servers_Corrupted',
        'Pages_Corrupted',
        'WPM_Typing_Speed']
We use VectorAssembler to combine all of the above features into a single new column named 'features'.
In [6]:
assembler = VectorAssembler(inputCols=cols, outputCol='features')
Next, we will use the assembler that we have created to transform our data.
In [7]:
assembled_data = assembler.transform(data)
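As a quick sanity check (this cell is a sketch, not part of the original notebook), you can inspect the new column to confirm that the six numeric inputs were packed into a single vector per row:

# Each row of 'features' should be a 6-element vector, in the order given by cols
assembled_data.select('features').show(3, truncate=False)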
Most of the time, your dataset will contain features that vary widely in magnitude, units and range. However, common machine learning algorithms use the Euclidean distance between data points in their computations. Therefore, you need to scale your features so that they are all on the same scale and hence comparable to one another.
The StandardScaler transforms a dataset of rows, normalizing each feature to have unit standard deviation and/or zero mean. https://spark.apache.org/docs/latest/ml-features.html#standardscaler
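One detail worth noting: by default Spark's StandardScaler uses withStd=True and withMean=False, so it only divides each feature by its sample standard deviation and does not centre it (which is why the scaled values stay positive). The short sketch below is an addition, not part of the original notebook; with the default settings the result should be close to 0.5679, the first component of scaledFeatures printed further below.

from pyspark.sql.functions import stddev

# Sample standard deviation of the first feature column
std_time = data.select(stddev('Session_Connection_Time')).first()[0]

# Divide the first raw value (8.0) by the column's standard deviation;
# under the default withStd=True / withMean=False this reproduces the scaled value.
print(8.0 / std_time)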
In [8]:
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
Fitting the scaler and transforming the assembled data adds a new column named scaledFeatures.
In [9]:
scaler_model = scaler.fit(assembled_data)
scaled_data = scaler_model.transform(assembled_data)
scaled_data.printSchema()
root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)
Note that the scaled features (shown in the cell below) are within similar ranges and on comparable scales after the scaling process.
In [10]:
scaled_data.select('scaledFeatures').show(8, False)
+-------------------------------------------------------------------------------------------------------------------+
|scaledFeatures                                                                                                       |
+-------------------------------------------------------------------------------------------------------------------+
|[0.5678510846650524,1.3658432518957642,1.9975768336483841,1.2858903881191532,2.2849485348398866,5.396290958577967] |
|[1.419627711662631,2.517986463945197,0.0,1.320644182392644,2.9377909733655687,5.150971112595909] |
|[2.2004229530770782,1.2444124562517545,1.9975768336483841,1.611707209433128,2.6113697541027276,5.262819066691072] |
|[0.1419627711662631,0.7965469045293562,1.9975768336483841,1.0773676224782096,2.6113697541027276,5.279223433291696] |
|[1.419627711662631,1.4266459597520256,0.0,1.5508880694545193,2.6113697541027276,5.315014778602148] |
|[0.07098138558313155,1.3644462913476594,1.9975768336483841,1.2120385752879856,2.9377909733655687,5.336638716393879]|
|[1.277664940496368,1.197788897958757,1.9975768336483841,2.2155543849350274,2.2849485348398866,5.3925626934414606] |
|[1.561590482828894,0.35486290323232145,1.9975768336483841,1.3162999581084576,2.2849485348398866,5.370938755649729] |
+-------------------------------------------------------------------------------------------------------------------+
only showing top 8 rows
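If you prefer a numerical check over eyeballing the vectors above, pyspark.ml.stat.Summarizer (Spark 2.4+) can aggregate vector columns directly. This is an optional sketch, not part of the original notebook; with unit-standard-deviation scaling, the variance of every dimension should come out close to 1:

from pyspark.ml.stat import Summarizer

# Per-dimension min, max and variance of the scaled feature vectors
summarizer = Summarizer.metrics('min', 'max', 'variance')
scaled_data.select(summarizer.summary(scaled_data.scaledFeatures)).show(truncate=False)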
Next, we create two separate K-means clustering models with two different values of K. Reminder: K is the number of clusters (i.e., the number of centroids) that you want the resulting clustering model to have.
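Before committing to a particular K, it can also help to compare candidate values with an internal metric such as the silhouette score. The sketch below is an addition, not part of the original notebook, and the seed value is arbitrary; it uses Spark's ClusteringEvaluator:

from pyspark.ml.evaluation import ClusteringEvaluator

# Silhouette score (higher is better) for a few candidate values of K
evaluator = ClusteringEvaluator(featuresCol='scaledFeatures',
                                predictionCol='prediction',
                                metricName='silhouette')

for k in (2, 3, 4):
    preds = KMeans(featuresCol='scaledFeatures', k=k, seed=42).fit(scaled_data).transform(scaled_data)
    print(k, evaluator.evaluate(preds))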
In [11]:
k_means_2 = KMeans(featuresCol='scaledFeatures', k=2)
k_means_3 = KMeans(featuresCol='scaledFeatures', k=3)
In [12]:
%%time
model_k2 = k_means_2.fit(scaled_data)
CPU times: user 17.2 ms, sys: 8.62 ms, total: 25.9 ms
Wall time: 5.77 s
In [13]:
%%time
model_k3 = k_means_3.fit(scaled_data)
CPU times: user 9.52 ms, sys: 9.11 ms, total: 18.6 ms
Wall time: 2.69 s
Now that we have created our two K-means models, we use them to cluster the scaled data.
We observe that when k=3, the cluster sizes are unequal. Given the scenario we are dealing with (the hack), this is not in line with what the forensic team told us about the hackers splitting the work equally.
In [14]:
model_k3_data = model_k3.transform(scaled_data)
model_k3_data.groupBy('prediction').count().show()
+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         2|   79|
|         0|   88|
+----------+-----+
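As an optional aside (not part of the original notebook), the fitted centroids are available through the KMeansModel API and can help interpret what separates the three groups:

# Centroid coordinates in the scaled feature space, one array per cluster
for i, center in enumerate(model_k3.clusterCenters()):
    print(i, center)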
In [15]:
model_k3_data.toPandas().head()
Out[15]:
   Session_Connection_Time  Bytes Transferred  Kali_Trace_Used  Servers_Corrupted  Pages_Corrupted                Location  WPM_Typing_Speed                               features                                     scaledFeatures  prediction
0                      8.0             391.09                1               2.96              7.0                Slovenia             72.37   [8.0, 391.09, 1.0, 2.96, 7.0, 72.37]  [0.5678510846650524, 1.3658432518957642, 1.997...           1
1                     20.0             720.99                0               3.04              9.0  British Virgin Islands             69.08  [20.0, 720.99, 0.0, 3.04, 9.0, 69.08]   [1.419627711662631, 2.517986463945197, 0.0, 1....           1
2                     31.0             356.32                1               3.71              8.0                 Tokelau             70.58   [31.0, 356.32, 1.0, 3.71, 8.0, 70.58]  [2.2004229530770782, 1.2444124562517545, 1.997...           1
3                      2.0             228.08                1               2.48              8.0                 Bolivia             70.80     [2.0, 228.08, 1.0, 2.48, 8.0, 70.8]  [0.1419627711662631, 0.7965469045293562, 1.997...           1
4                     20.0             408.50                0               3.57              8.0                    Iraq             71.28    [20.0, 408.5, 0.0, 3.57, 8.0, 71.28]   [1.419627711662631, 1.4266459597520256, 0.0, 1...           1
We can see that there are two equal-sized clusters when k=2. Given the scenario we are trying to solve, we are more confident that k=2 is the correct model to use, since the prediction results indicate that the hackers shared the hacking instances equally among themselves.
In [16]:
model_k2_data = model_k2.transform(scaled_data)
model_k2_data.groupBy('prediction').count().show()
+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         0|  167|
+----------+-----+
In [17]:
model_k2_data.toPandas().head()
Out[17]:
   Session_Connection_Time  Bytes Transferred  Kali_Trace_Used  Servers_Corrupted  Pages_Corrupted                Location  WPM_Typing_Speed                               features                                     scaledFeatures  prediction
0                      8.0             391.09                1               2.96              7.0                Slovenia             72.37   [8.0, 391.09, 1.0, 2.96, 7.0, 72.37]  [0.5678510846650524, 1.3658432518957642, 1.997...           1
1                     20.0             720.99                0               3.04              9.0  British Virgin Islands             69.08  [20.0, 720.99, 0.0, 3.04, 9.0, 69.08]   [1.419627711662631, 2.517986463945197, 0.0, 1....           1
2                     31.0             356.32                1               3.71              8.0                 Tokelau             70.58   [31.0, 356.32, 1.0, 3.71, 8.0, 70.58]  [2.2004229530770782, 1.2444124562517545, 1.997...           1
3                      2.0             228.08                1               2.48              8.0                 Bolivia             70.80     [2.0, 228.08, 1.0, 2.48, 8.0, 70.8]  [0.1419627711662631, 0.7965469045293562, 1.997...           1
4                     20.0             408.50                0               3.57              8.0                    Iraq             71.28    [20.0, 408.5, 0.0, 3.57, 8.0, 71.28]   [1.419627711662631, 1.4266459597520256, 0.0, 1...           1
Therefore, we conclude that the k=2 K-means clustering model is the one we should use for this case: the data points to two hackers, so the third suspect was most likely not involved.
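As a closing design note, the same three stages could be organised as a single pyspark.ml.Pipeline, which keeps the assemble, scale and cluster steps together and makes the workflow easier to reuse. The sketch below is an alternative arrangement, not the notebook's original code, and the seed is arbitrary:

from pyspark.ml import Pipeline

# Chain the assembler, scaler and K-means stages used above into one pipeline
pipeline = Pipeline(stages=[
    VectorAssembler(inputCols=cols, outputCol='features'),
    StandardScaler(inputCol='features', outputCol='scaledFeatures'),
    KMeans(featuresCol='scaledFeatures', k=2, seed=42),
])

pipeline_model = pipeline.fit(data)
pipeline_model.transform(data).groupBy('prediction').count().show()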
In [18]:
spark.stop()