732A54/TDDE31 Big Data Analytics
Lecture 11: Machine Learning with Spark
José M. Peña
IDA, Linköping University, Sweden
Contents
▸ Spark Framework
▸ Machine Learning with Spark
L K-Means
L Logistic Regression
L MLlib
L Experiments
▸ Lab with Spark
▸ Summary
Literature
▸ Main sources
L Zaharia, M. et al. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. In Proceedings of the 9th USENIX Symposium on Networked Systems Design and Implementation, 15-28, 2012.
L Meng, X. et al. MLlib: Machine Learning in Apache Spark. Journal of Machine Learning Research, 17(34):1-7, 2016.
▸ Additional sources
L Zaharia, M. et al. Apache Spark: A Unified Engine for Big Data Processing. Communications of the ACM, 59(11):56-65, 2016.
L Spark programming guide available at https://spark.apache.org/docs/latest/rdd-programming-guide.html
L MLlib manual available at http://spark.apache.org/docs/latest/ml-guide.html
L Slides for 732A99/TDDE01 Machine Learning.
Spark Framework
▸ Recall from the previous lecture that MapReduce can emulate any distributed computation, since any such computation can be divided into a sequence of MapReduce calls.
▸ However, the emulation may be inefficient since the message exchange relies on external storage, e.g. disk.
▸ This is a problem for iterative machine learning algorithms. Even worse: Each iteration (i.e., MapReduce call) loads the data anew from disk.
▸ Apache Spark is a framework to process large amounts of data by parallelizing computations across a cluster of nodes.
▸ It builds on MapReduce’s ability to emulate any distributed computation, but makes the emulation more efficient by sharing data in memory across MapReduce calls.
▸ It includes MLlib, a library for machine learning that uses linear algebra libraries on each node.
Spark Framework
▸ Data sharing is achieved via resilient distributed datasets (RDDs).
▸ An RDD is a read-only, partitioned collection of records that can only be defined through transformations applied to data in external storage or to other RDDs.
Spark Framework
▸ Note that some transformations and actions do not require RDDs of (key, value) pairs, i.e. so-called pair RDDs.
▸ Note also that the transformations and actions for non-pair RDDs work on pair RDDs too.
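▸ For instance, a minimal word-count sketch in Python (hypothetical input path), showing transformations on a non-pair RDD, a pair-RDD transformation, and an action:
from pyspark import SparkContext

sc = SparkContext(appName="WordCount")

lines = sc.textFile("hdfs://.../docs.txt")              # hypothetical input file
words = lines.flatMap(lambda line: line.split())        # transformation on a non-pair RDD
pairs = words.map(lambda w: (w, 1))                     # pair RDD of (key, value) records
counts = pairs.reduceByKey(lambda a, b: a + b)          # transformation that requires a pair RDD
print(counts.takeOrdered(10, key=lambda kv: -kv[1]))    # action: returns values to the driver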
Spark Framework
▸ The sequence of transformations that defines an RDD is called its lineage. It is used to rebuild the RDD in case of failure, i.e. there is no data replication, unlike in MapReduce.
▸ RDDs are created only when an action is executed. Why? E.g., pipelining [read + filter] is more memory efficient than materializing [read] and then applying [filter].
▸ Actually, RDDs are recreated each time an action is executed, unless the user persists them in memory and/or on disk.
▸ Actions write to disk or return values to the master/driver.
Spark Framework
▸ Example in Scala to find error lines in a log file:
1. lines = spark.textFile("hdfs://…")
2. errors = lines.filter(_.startsWith("ERROR"))
3. errors.persist() // Store in memory
4. errors.count()   // Materialize
5. errors.filter(_.contains("HDFS")).map(_.split('\t')(3)).collect()
▸ Note that:
L Line 3 indicates that the error lines should be stored in memory. Note that persist() = persist(MEMORY_ONLY) = cache() ≠ persist(MEMORY_AND_DISK) ≠ ...
L However, this does not happen until line 4, when the RDDs are computed.
L The rest of the RDDs (e.g., lines) are discarded after being used.
L Line 5 does not access the disk because the data are in memory.
L If any partition of the in-memory data has been lost, it can be rebuilt with the help of the lineage graph.
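L A small Python illustration of the storage levels mentioned above, assuming an errors RDD analogous to the one in the Scala example:
from pyspark import StorageLevel

errors.persist()                                 # default storage level, MEMORY_ONLY
# errors.cache()                                 # identical to persist(StorageLevel.MEMORY_ONLY)
# An RDD gets a single storage level; choose a different one instead of the default, e.g.:
# errors.persist(StorageLevel.MEMORY_AND_DISK)   # spill partitions to disk when memory is full
errors.count()                                   # nothing is stored until an action materializes the RDD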
Spark Framework
▸ When an action is executed, the lineage graph is used by the driver to schedule jobs similarly to MapReduce, with the difference that as many transformations as possible are pipelined and assigned to the same worker.
Machine Learning with Spark: K-Means
▸ K-Means in Python (data should have been persisted when read from file):
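▸ A minimal sketch of how the K-Means iterations can be written with map and reduceByKey (hypothetical input path; one point per line; the points RDD is persisted since it is reused in every iteration):
import numpy as np
from pyspark import SparkContext

sc = SparkContext(appName="KMeans")

def closest_center(p, centers):
    # Index of the center closest to point p (squared Euclidean distance).
    return min(range(len(centers)), key=lambda i: np.sum((p - centers[i]) ** 2))

# Hypothetical input file: one point per line, features separated by spaces.
points = sc.textFile("hdfs://.../points.txt") \
           .map(lambda line: np.array([float(v) for v in line.split()])) \
           .persist()

K = 2
centers = points.takeSample(False, K, seed=1)    # initial centers

for _ in range(10):
    # Assign each point to its closest center; sum the points and counts per center.
    stats = points.map(lambda p: (closest_center(p, centers), (p, 1))) \
                  .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                  .collect()
    # New center = mean of the points assigned to it.
    for i, (s, n) in stats:
        centers[i] = s / n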
Machine Learning with Spark: Logistic Regression
▸ Consider a binary classification problem, i.e. t ∈ {−1, +1}. Then,
$$p(t=+1\,|\,x) = \frac{p(x\,|\,t=+1)\,p(t=+1)}{p(x\,|\,t=+1)\,p(t=+1) + p(x\,|\,t=-1)\,p(t=-1)} = \sigma(s(x))$$
where $s(x) = \log \frac{p(x\,|\,t=+1)\,p(t=+1)}{p(x\,|\,t=-1)\,p(t=-1)}$ and $\sigma(a) = \frac{1}{1+\exp(-a)}$ is called the logistic sigmoid function.
▸ We assume that p(x|t) is a member of the exponential family (e.g. Gaussian, multinomial), which implies that $s(x) = w^T x$. The model $y(x) = p(t=+1\,|\,x) = \sigma(w^T x)$ is called logistic regression.
▸ Note that
$$p(t=+1\,|\,x) = \sigma(s(x)) = \frac{1}{1+\exp(-w^T x)}$$
$$p(t=-1\,|\,x) = 1 - p(t=+1\,|\,x) = 1 - \sigma(s(x)) = \sigma(-s(x)) = \frac{1}{1+\exp(w^T x)}$$
and hence, for a training point $(x_n, t_n)$,
$$p(t_n\,|\,x_n) = \frac{1}{1+\exp(-t_n w^T x_n)}.$$
▸ We determine the parameters w by minimizing the negative log-likelihood
$$L(w) = -\sum_n \log p(t_n\,|\,x_n) = \sum_n \log(1 + \exp(-t_n w^T x_n))$$
whose gradient is $-\sum_n t_n \left(1 - \frac{1}{1+\exp(-t_n w^T x_n)}\right) x_n$.
Machine Learning with Spark: Logistic Regression
▸ Logistic regression in Scala (note the use of persist, map and reduce):
▸ Logistic regression in Python:
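▸ A minimal Python sketch of this gradient-descent scheme (hypothetical input path; one example per line, with the features followed by a −1/+1 label; the points RDD is persisted because it is reused in every iteration):
import numpy as np
from pyspark import SparkContext

sc = SparkContext(appName="LogisticRegression")

def parse(line):
    values = [float(v) for v in line.split()]
    return np.array(values[:-1]), values[-1]       # (features, label)

points = sc.textFile("hdfs://.../data.txt").map(parse).persist()   # hypothetical input file

D = len(points.first()[0])     # number of features
w = np.zeros(D)                # initial weights
step = 1.0                     # fixed step size

for _ in range(100):
    # Gradient of the negative log-likelihood from the previous slide,
    # -sum_n t_n (1 - 1/(1 + exp(-t_n w^T x_n))) x_n, computed with map and reduce.
    gradient = points.map(
        lambda p: -p[1] * (1.0 - 1.0 / (1.0 + np.exp(-p[1] * w.dot(p[0])))) * p[0]
    ).reduce(lambda a, b: a + b)
    w -= step * gradient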
Machine Learning with Spark: MLlib
▸ Many machine learning methods are already implemented in MLlib, i.e. the user does not need to specify the map and reduce functions.
▸ Logistic regression in Python:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lrModel = lr.fit(training)
▸ SVMs in Python:
from pyspark.mllib.classification import SVMWithSGD
model = SVMWithSGD.train(parsedData, iterations=100)
▸ NNs in Python:
from pyspark.ml.classification import MultilayerPerceptronClassifier
layers = [4, 5, 4, 3]
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
model = trainer.fit(train)
▸ MMs in Python:
from pyspark.ml.clustering import GaussianMixture
gmm = GaussianMixture().setK(2)
model = gmm.fit(dataset)
▸ K-Means in Python:
from pyspark.ml.clustering import KMeans
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
Machine Learning with Spark: Experiments
Lab with Spark
▸ Implement a kernel model to predict the hourly temperatures for a date and place in Sweden. To do so, you are provided with the files stations.csv and temps.csv. These files contain information about weather stations and temperature measurements for the stations at different days and times. The data have been kindly provided by the Swedish Meteorological and Hydrological Institute (SMHI) and processed by Zlatan Dragisic.
▸ You are asked to provide a temperature forecast for a date and place in Sweden. The forecast should consist of the predicted temperatures from 04:00 to 24:00 in intervals of 2 hours. Use a kernel that is the sum of three Gaussian kernels:
L The first to account for the distance from a station to the point of interest.
L The second to account for the distance between the day a temperature measurement was made and the day of interest.
L The third to account for the distance between the hour of the day a temperature measurement was made and the hour of interest.
▸ Repeat the exercise, but multiply the three Gaussian kernels above instead of summing them.
Lab with Spark
▸ Consider regressing an unidimensional continuous random variable Y on a D-dimensional continuous random variable X.
▸ The best regression function under the squared error loss function is $y^*(x) = E[y\,|\,x]$.
▸ Since x may not appear in the finite training set $\{(x_n, y_n)\}$ available, we output a weighted average over all the training points. That is,
$$y(x) = \frac{\sum_n k\!\left(\frac{x - x_n}{h}\right) y_n}{\sum_n k\!\left(\frac{x - x_n}{h}\right)}$$
where $k : \mathbb{R}^D \to \mathbb{R}$ is a kernel function, which is usually non-negative and monotonically decreasing along rays starting from the origin. The parameter h is called the smoothing factor or width.
▸ Gaussian kernel: $k(u) = \exp(-\lVert u \rVert^2)$ where $\lVert \cdot \rVert$ is the Euclidean norm.
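▸ As a plain NumPy illustration of the formula above (hypothetical toy data; no Spark involved):
import numpy as np

def gaussian_kernel(u):
    # k(u) = exp(-||u||^2), with ||.|| the Euclidean norm.
    return np.exp(-np.sum(u ** 2, axis=-1))

def kernel_regression(x, X, y, h):
    # Weighted average of the training targets y, with weights k((x - x_n) / h).
    weights = gaussian_kernel((x - X) / h)
    return np.sum(weights * y) / np.sum(weights)

# Hypothetical toy data: 5 training points in R^2.
X = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [2.0, 2.0]])
y = np.array([0.0, 1.0, 1.0, 2.0, 4.0])
print(kernel_regression(np.array([0.5, 0.5]), X, y, h=1.0))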
Lab with Spark
▸ Bear in mind that a join operation may trigger a shuffle operation, which is time and memory consuming.
▸ Instead, broadcast one of the RDDs to join if it is small. This sends a read-only copy of its contents to each node, so the join can be performed locally (or even skipped).
small = rdd.collectAsMap()   # collect the small pair RDD to the driver as a dictionary
bc = sc.broadcast(small)     # ship a read-only copy to every node
bc.value[i]                  # local lookup by key, usable inside map/filter functions
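▸ For instance, a sketch replacing a join with a broadcast lookup (hypothetical stations and readings pair RDDs, both keyed by station number):
# stations: small pair RDD (station id, station info); readings: large pair RDD (station id, measurement).
stations_map = stations.collectAsMap()     # dictionary on the driver
bc = sc.broadcast(stations_map)            # read-only copy shipped to every node
# Instead of readings.join(stations), look up the station locally -- no shuffle:
joined = readings.map(lambda kv: (kv[0], (kv[1], bc.value[kv[0]])))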
Summary
▸ Spark is a framework to process large datasets by parallelizing computations.
▸ It is particularly suitable for iterative distributed computations, since data can be stored in memory.
▸ It includes MLlib, a machine learning library.