732A54/TDDE31 Big Data Analytics
Lecture 10: Machine Learning with MapReduce
Jose M. Peña
IDA, Linköping University, Sweden
1/24
Contents
▸ MapReduce Framework
▸ Machine Learning with MapReduce
▸ Neural Networks
▸ Linear Support Vector Machines
▸ K-Means Algorithm
▸ EM Algorithm
▸ Summary
2/24
Literature
▸ Main sources
▸ Dean, J. and Ghemawat, S. MapReduce: Simplified Data Processing on Large Clusters. Communications of the ACM, 51(1):107-113, 2008.
▸ Chu, C.-T. et al. Map-Reduce for Machine Learning on Multicore. In Proceedings of the 19th International Conference on Neural Information Processing Systems, 281-288, 2006.
▸ Additional sources
▸ Dean, J. and Ghemawat, S. MapReduce: Simplified Data Processing on Large Clusters. In Proceedings of the 6th Symposium on Operating Systems Design and Implementation, 2004.
▸ Gillick, D., Faria, A. and DeNero, J. MapReduce: Distributed Computing for Machine Learning. Technical Report, Berkeley, 2006.
▸ Yahoo tutorial at https://developer.yahoo.com/hadoop/tutorial/module4.html
▸ Slides for 732A99/TDDE01 Machine Learning.
3/24
MapReduce Framework
▸ Programming framework developed at Google to process large amounts of data by parallelizing computations across a cluster of computers.
▸ Easy to use, since the parallelization happens automatically.
▸ Easy to speed up by using/adding more computers to the cluster.
▸ Typical uses at Google:
▸ Large-scale machine learning problems, e.g. clustering documents from Google News.
▸ Extracting properties of web pages, e.g. web access log data.
▸ Large-scale graph computations, e.g. web link graph.
▸ Statistical machine translation.
▸ Processing satellite images.
▸ Production of the indexing system used for Google’s web search engine.
▸ Google replaced it with Cloud Dataflow/Dataproc/Platform, since MapReduce could not process the amounts of data they produce.
▸ However, it is still the processing core of Apache Hadoop, another framework for distributed storage and distributed processing of large datasets on computer clusters.
▸ Moreover, it is a straightforward way to adapt some machine learning algorithms to cope with big data.
▸ Apache Mahout is a project to produce distributed implementations of machine learning algorithms. Many available implementations build on Hadoop’s MapReduce. However, these implementations are deprecated.
4/24
MapReduce Framework
▸ The user only has to implement the following two functions:
▸ Map function:
▸ Input: A pair (in_key, in_value).
▸ Output: A list list(out_key, intermediate_value).
▸ Reduce function:
▸ Input: A pair (out_key, list(intermediate_value)).
▸ Output: A list list(out_value).
▸ All intermediate values associated with the same intermediate key are grouped together before passing them to the reduce function.
▸ Example for counting word occurrences in a collection of documents:
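A minimal Python sketch of this example (the function names and the driver that groups intermediate values by key are my own; in a real deployment the framework performs the grouping and distributes the tasks across the cluster):

```python
from collections import defaultdict

def map_fn(in_key, in_value):
    # in_key: document name, in_value: document contents
    return [(word, 1) for word in in_value.split()]

def reduce_fn(out_key, intermediate_values):
    # out_key: a word, intermediate_values: all counts emitted for that word
    return [sum(intermediate_values)]

def run(documents):
    # Driver that emulates the framework: run map, group by key, run reduce.
    grouped = defaultdict(list)
    for key, value in documents.items():
        for out_key, v in map_fn(key, value):
            grouped[out_key].append(v)
    return {k: reduce_fn(k, vs)[0] for k, vs in grouped.items()}

print(run({"doc1": "big data big models", "doc2": "big clusters"}))
# {'big': 3, 'data': 1, 'models': 1, 'clusters': 1}
```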
5/24
MapReduce Framework
6/24
MapReduce Framework
1. Split the input file in M pieces and store them on the local disks of the nodes of the cluster. Start up many copies of the user’s program on the nodes.
2. One copy (the master) assigns tasks to the rest of the copies (the workers). To reduce communication, it tries to assign map workers to nodes with input data.
7/24
MapReduce Framework
3. Each map worker processes a piece of input data by running the user’s map function on each pair (key, value). The results are buffered in memory.
4. The buffered results are written to local disk. The disk is partitioned into R pieces, e.g. by hash(out_key) mod R. The locations of the partitions on disk are passed back to the master so that they can be forwarded to the reduce workers.
8/24
MapReduce Framework
5. The reduce worker reads its partition remotely (a.k.a. shuffle) and sorts it by key.
6. The reduce worker processes each key using the user’s reduce function. The result is written to the global file system.
7. The output of a MapReduce call may be the input to another. Note that we have performed M map tasks and R reduce tasks.
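As an illustration of step 4, a minimal sketch of how intermediate keys could be routed to the R reduce partitions (the helper name is an assumption; real implementations use their own hash and partitioning functions):

```python
# Route an intermediate key to one of the R reduce partitions (step 4).
def partition(out_key: str, R: int) -> int:
    return hash(out_key) % R

# All pairs that share the same out_key land in the same partition and are
# therefore processed by the same reduce worker.
R = 5
print(partition("some_key", R))  # a value in 0..R-1
```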
9/24
MapReduce Framework
▸ MapReduce can emulate any distributed computation, since this consists of nodes that perform local computations and occasionally exchange messages.
▸ Therefore, any distributed computation can be divided into a sequence of MapReduce calls:
▸ First, nodes perform local computations (map), and ▸ then, they exchange messages (reduce).
▸ However, the emulation may be inefficient since the message exchange relies on external storage, e.g. disk.
10/24
MapReduce Framework
▸ Fault tolerance:
▸ Necessary since thousands of nodes may be used.
▸ The master pings the workers periodically. No answer means failure.
▸ If a worker fails then its completed and in-progress map tasks are re-executed, since its local disk is inaccessible.
▸ Note the importance of storing several copies (typically 3) of the input data on different nodes.
▸ If a worker fails then its in-progress reduce task is re-executed. The results of its completed reduce tasks are stored on the global file system and, thus, they are accessible.
▸ To be able to recover from the unlikely event of a master failure, the master periodically saves the state of the different tasks (idle, in-progress, completed) and the identity of the worker for the non-idle tasks.
▸ Task granularity:
▸ M and R are larger than the number of nodes available.
▸ Large M and R values benefit dynamic load balancing and fast failure recovery.
▸ Too large values may imply too many scheduling decisions and too many output files.
▸ For instance, M = 200000 and R = 5000 for 2000 available nodes.
11/24
Machine Learning with MapReduce: Neural Networks
[Figure: a two-layer feed-forward neural network with inputs x_0, ..., x_D, hidden units z_0, ..., z_M, outputs y_1, ..., y_K, and weights w^(1), w^(2).]
▸ Activations: a_j = ∑_i w_ji^(1) x_i + w_j0^(1)
▸ Hidden units and activation function: z_j = h(a_j)
▸ Output activations: a_k = ∑_j w_kj^(2) z_j + w_k0^(2)
▸ Output activation function for regression: y_k(x) = a_k
▸ Output activation function for classification: y_k(x) = σ(a_k)
▸ Sigmoid function: σ(a) = 1 / (1 + exp(−a))
▸ Two-layer NN: y_k(x) = σ(∑_j w_kj^(2) h(∑_i w_ji^(1) x_i + w_j0^(1)) + w_k0^(2))
▸ Evaluating the previous expression is known as forward propagation. The NN is said to have a feed-forward architecture.
▸ All the previous is, of course, generalizable to more layers.
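A minimal NumPy sketch of forward propagation in the two-layer network above (variable names and the choice h = tanh are assumptions; the bias vectors correspond to the w_j0^(1) and w_k0^(2) terms):

```python
import numpy as np

def sigmoid(a):
    return 1.0 / (1.0 + np.exp(-a))

def forward(x, W1, b1, W2, b2, h=np.tanh, classification=True):
    a_hidden = W1 @ x + b1          # a_j = sum_i w_ji^(1) x_i + w_j0^(1)
    z = h(a_hidden)                 # z_j = h(a_j)
    a_out = W2 @ z + b2             # a_k = sum_j w_kj^(2) z_j + w_k0^(2)
    return sigmoid(a_out) if classification else a_out

# Example with D = 3 inputs, M = 4 hidden units and K = 2 outputs.
rng = np.random.default_rng(0)
D, M, K = 3, 4, 2
W1, b1 = rng.normal(size=(M, D)), np.zeros(M)
W2, b2 = rng.normal(size=(K, M)), np.zeros(K)
print(forward(rng.normal(size=D), W1, b1, W2, b2))
```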
12/24
Machine Learning with MapReduce: Neural Networks
▸ Consider regressing a K-dimensional continuous random variable on a D-dimensional continuous random variable.
▸ Consider a training set {(xn,tn)} of size N. Consider minimizing the error function
E(w) = ∑_n E_n(w) = ∑_n ½ ∣∣y(x_n) − t_n∣∣² = ∑_n ∑_k ½ (y_k(x_n) − t_nk)²
▸ The weight space is highly multimodal and, thus, we have to resort to approximate iterative methods to minimize the previous expression.
▸ Batch gradient descent
w^(t+1) = w^t − η ∇E(w^t) = w^t − η ∑_n ∇E_n(w^t)
where η > 0 is the learning rate, and ∇E_n(w^t) can be computed efficiently thanks to the backpropagation algorithm.
▸ Each iteration of batch gradient descent can easily be cast into MapReduce terms, as sketched below:
▸ Map function: Compute the gradient for a training point. Note that this implies forward and backward propagation.
▸ Reduce function: Sum the partial gradients and update w accordingly.
▸ Note that 1 ≤ M ≤ N, whereas R = 1.
▸ What is the key and what is the value? What needs to be broadcast?
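A sketch of one such iteration in Python, under the assumption of a two-layer regression network with h = tanh (function names are my own; the current weights w must be broadcast to all map workers):

```python
import numpy as np

def point_gradient(w, x, t, h=np.tanh):
    """Map function: gradient of E_n for one training point (x, t).
    Forward propagation followed by backpropagation (regression case)."""
    W1, b1, W2, b2 = w
    a = W1 @ x + b1                               # hidden activations
    z = h(a)                                      # hidden units
    y = W2 @ z + b2                               # outputs (identity activation)
    delta_out = y - t                             # output errors
    delta_hid = (1 - z**2) * (W2.T @ delta_out)   # backpropagated errors (tanh')
    return (np.outer(delta_hid, x), delta_hid,    # dE_n/dW1, dE_n/db1
            np.outer(delta_out, z), delta_out)    # dE_n/dW2, dE_n/db2

def reduce_fn(key, partial_grads, w, eta):
    """Reduce function (R = 1): sum the partial gradients and update w."""
    sums = [sum(g) for g in zip(*partial_grads)]
    return tuple(wi - eta * gi for wi, gi in zip(w, sums))

# One batch iteration: N map tasks (a single constant key), one reduce task.
rng = np.random.default_rng(1)
D, M, K, N = 3, 4, 2, 10
w = (rng.normal(size=(M, D)), np.zeros(M), rng.normal(size=(K, M)), np.zeros(K))
X, T = rng.normal(size=(N, D)), rng.normal(size=(N, K))
grads = [point_gradient(w, x, t) for x, t in zip(X, T)]   # map phase
w = reduce_fn("grad", grads, w, eta=0.01)                 # reduce phase
```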
13/24
Machine Learning with MapReduce: Linear Support Vector Machines
▸ Consider binary classification with input space RD . Consider a training set {(xn,tn)} where tn ∈ {−1,+1}. Consider using the linear model
y(x) = w^T x + b
so that a new point x is classified according to the sign of y(x).
▸ If the training data is linearly separable, the separating hyperplane with the largest margin (i.e. the largest minimum perpendicular distance from any point to the hyperplane) is given by
argmin_{w,b} ½ ∣∣w∣∣²  subject to  t_n(w^T x_n + b) ≥ 1 for all n
[Figure: maximum-margin separating hyperplane with decision boundary y = 0, margin boundaries y = ±1, and the margin indicated.]
▸ The motivation is that the larger the margin, the smaller the generalization error.
14/24
Machine Learning with MapReduce: Linear Support Vector Machines
▸ Without the assumption of linear separability and with a quadratic penalty for (almost-)misclassified points, the optimal separating hyperplane is given by
argmin_w ½ ∣∣w∣∣² + C ∑_{n∈E} (w^T x_n − t_n)²
[Figure: decision boundary y = 0, margin boundaries y = ±1, and slack variables ξ_n with ξ = 0, ξ < 1 and ξ > 1.]
where C is a user-defined parameter, and n ∈ E if and only if t_n y(x_n) < 1.
▸ Note that the previous expression is a quadratic function and, thus, convex (concave up) and, thus, “easy” to minimize. For instance, we can again use batch gradient descent.
▸ The gradient is given by
w + 2C ∑_{n∈E} (w^T x_n − t_n) x_n
▸ Each iteration of batch gradient descent can easily be cast into MapReduce terms, as sketched below:
▸ Map function: Compute the gradient for a training point.
▸ Reduce function: Sum the partial gradients and update w accordingly.
▸ Note that 1 ≤ M ≤ N, whereas R = 1. What is the key and what is the value? What needs to be broadcast?
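A sketch of one iteration in Python (names are my own; here the bias b is assumed to be absorbed into w by appending a constant 1 to each input, and w must be broadcast to the map workers):

```python
import numpy as np

def map_fn(n, xn, tn, w):
    """Map function: gradient contribution of one training point."""
    if tn * (w @ xn) < 1:                          # n belongs to the error set E
        return [("grad", 2.0 * (w @ xn - tn) * xn)]
    return [("grad", np.zeros_like(w))]

def reduce_fn(key, partial_grads, w, eta, C):
    """Reduce function (R = 1): assemble the gradient and update w."""
    grad = w + C * np.sum(partial_grads, axis=0)
    return w - eta * grad

# One batch gradient-descent iteration on a toy dataset.
rng = np.random.default_rng(2)
N, D = 20, 3
X = np.hstack([rng.normal(size=(N, D)), np.ones((N, 1))])   # bias feature
t = np.where(X[:, 0] > 0, 1, -1)
w = np.zeros(D + 1)
partials = [v for n in range(N) for _, v in map_fn(n, X[n], t[n], w)]  # map phase
w = reduce_fn("grad", partials, w, eta=0.1, C=1.0)                     # reduce phase
```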
15/24
Machine Learning with MapReduce: K-Means Algorithm
▸ Consider data clustering (a.k.a. unsupervised learning) via the K-means algorithm.
1. Assign each point to a cluster (a.k.a. subpopulation) at random
2. Compute the cluster centroids as the averages of the points assigned to each cluster
3. Repeat until the centroids do not change:
4.   Assign each point to the cluster with the closest centroid
5.   Update the cluster centroids as the averages of the points assigned to each cluster
16/24
Machine Learning with MapReduce: K-Means Algorithm
1. Assign each point to a cluster (a.k.a. subpopulation) at random
2. Compute the cluster centroids as the averages of the points assigned to each cluster
3. Repeat until the centroids do not change:
4.   Assign each point to the cluster with the closest centroid
5.   Update the cluster centroids as the averages of the points assigned to each cluster
▸ Each iteration of the K-means algorithm can easily be cast into MapReduce terms, as sketched below:
▸ Map function: Assign a training point to the population with the closest mean.
▸ Reduce function: Recalculate the population means from the assignments of the map tasks.
▸ Note that 1 ≤ M ≤ N, whereas R = 1 or R = K depending on whether we decide to use the population assignment as intermediate key or not.
▸ What is the key and what is the value? What needs to be broadcast?
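A sketch of one iteration in Python, using the cluster index as intermediate key so that R can equal K (names and the driver that emulates the grouping are my own; the current centroids must be broadcast to the map workers):

```python
import numpy as np
from collections import defaultdict

def map_fn(n, xn, centroids):
    """Map function: assign point n to the cluster with the closest centroid."""
    k = int(np.argmin(np.linalg.norm(centroids - xn, axis=1)))
    return [(k, xn)]                      # cluster index as intermediate key

def reduce_fn(k, points):
    """Reduce function: recompute the centroid of cluster k."""
    return k, np.mean(points, axis=0)

# One K-means iteration on a toy dataset (grouping emulated in the driver).
rng = np.random.default_rng(3)
X = rng.normal(size=(100, 2))
centroids = X[:3].copy()                  # K = 3 initial centroids
grouped = defaultdict(list)
for n, x in enumerate(X):                                  # map phase
    for k, v in map_fn(n, x, centroids):
        grouped[k].append(v)
centroids = np.array([reduce_fn(k, pts)[1]                 # reduce phase
                      for k, pts in sorted(grouped.items())])
```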
17/24
Machine Learning with MapReduce: EM Algorithm
▸ The K-means algorithm partitions the data, i.e. it hard-assigns instances to subpopulations. Model-based clustering, on the other hand, aims to soft-assign instances to the subpopulations by applying Bayes' theorem as follows:
p(k∣x, θ, π) = π_k p(x∣θ_k) / ∑_k π_k p(x∣θ_k)
where p(x∣θk ) are called mixture components, and πk = p(k) are called mixing coefficients. A component models the data distribution for a chosen subpopulation, and a coefficient represents the probability of a subpopulation being chosen.
▸ More specifically, for components modeled as multivariate Gaussian distributions, we have that
p(x∣θ_k) = N(x∣μ_k, Σ_k) = 1 / ((2π)^(D/2) ∣Σ_k∣^(1/2)) exp(−½ (x − μ_k)^T Σ_k^(−1) (x − μ_k)).
▸ To solve model-based clustering, we have to estimate the model parameters (θ, π) from data. To this end, we use the EM algorithm.
18/24
Machine Learning with MapReduce: EM Algorithm
▸ Given a sample {x_n} of size N from a mixture of multivariate Gaussian distributions, the expected log likelihood function is maximized when
π_k^ML = ∑_n p(z_nk∣x_n, π, μ, Σ) / N
μ_k^ML = ∑_n x_n p(z_nk∣x_n, π, μ, Σ) / ∑_n p(z_nk∣x_n, π, μ, Σ)
Σ_k^ML = ∑_n (x_n − μ_k^ML)(x_n − μ_k^ML)^T p(z_nk∣x_n, π, μ, Σ) / ∑_n p(z_nk∣x_n, π, μ, Σ)
where z_n is a K-dimensional binary vector indicating component membership (one-hot encoding):
p(z_nk∣x_n, π, μ, Σ) = p(x_n∣z_nk, π, μ, Σ) p(z_nk∣π, μ, Σ) / ∑_k p(x_n∣z_nk, π, μ, Σ) p(z_nk∣π, μ, Σ) = π_k p(x_n∣μ_k, Σ_k) / ∑_k π_k p(x_n∣μ_k, Σ_k)
▸ This is not a closed form solution, but it suggests the following algorithm.
EM algorithm
Set π, μ and Σ to some initial values
Repeat until π, μ and Σ do not change
    Compute p(z_nk∣x_n, π, μ, Σ) for all n    /* E step */
    Set π_k to π_k^ML, μ_k to μ_k^ML, and Σ_k to Σ_k^ML for all k    /* M step */
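A minimal NumPy/SciPy sketch of the two steps for multivariate Gaussian components (function and variable names are my own):

```python
import numpy as np
from scipy.stats import multivariate_normal

def e_step(X, pi, mu, Sigma):
    """E step: responsibilities p(z_nk | x_n, pi, mu, Sigma), shape (N, K)."""
    dens = np.column_stack([pi[k] * multivariate_normal.pdf(X, mu[k], Sigma[k])
                            for k in range(len(pi))])
    return dens / dens.sum(axis=1, keepdims=True)

def m_step(X, resp):
    """M step: ML estimates of pi, mu and Sigma given the responsibilities."""
    N, D = X.shape
    Nk = resp.sum(axis=0)                        # effective counts per component
    pi = Nk / N
    mu = (resp.T @ X) / Nk[:, None]
    Sigma = np.stack([((X - mu[k]).T * resp[:, k]) @ (X - mu[k]) / Nk[k]
                      for k in range(len(Nk))])
    return pi, mu, Sigma
```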
19/24
Machine Learning with MapReduce: EM Algorithm
[Figures: clustering results of the K-means algorithm and the EM algorithm.]
20/24
Machine Learning with MapReduce: EM Algorithm
▸ Each iteration of the EM algorithm can easily be cast into two chained MapReduce jobs, as sketched below:
▸ Map function I: For the n-th training point, compute
p(z_nk∣x_n, π, μ, Σ) (1)
and
x_n p(z_nk∣x_n, π, μ, Σ). (2)
▸ Reduce function I: Sum up the results (1) of the map tasks and divide by N. Sum up the results (2) of the map tasks and divide by the sum of the results (1). This gives π_k^ML and μ_k^ML.
▸ Map function II: For the n-th training point, compute
p(z_nk∣x_n, π, μ, Σ) (3)
and
(x_n − μ_k^ML)(x_n − μ_k^ML)^T p(z_nk∣x_n, π, μ, Σ). (4)
▸ Reduce function II: Sum up the results (4) of the map tasks and divide by the sum of the results (3). This gives Σ_k^ML.
▸ Note that 1 ≤ M ≤ N, whereas R = 1 or R = K in both jobs, depending on whether we decide to use the component index as intermediate key or not. What is the key and what is the value? What needs to be broadcast?
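A sketch of the two chained jobs in Python, using the component index k as intermediate key so that R can equal K (function names are my own; π, μ, Σ must be broadcast to the map workers of both jobs, and the μ_k^ML produced by job I to those of job II):

```python
import numpy as np
from scipy.stats import multivariate_normal

def responsibilities(xn, pi, mu, Sigma):
    """p(z_nk | x_n, pi, mu, Sigma) for a single point, shape (K,)."""
    dens = np.array([pi[k] * multivariate_normal.pdf(xn, mu[k], Sigma[k])
                     for k in range(len(pi))])
    return dens / dens.sum()

def map_fn_I(n, xn, pi, mu, Sigma):
    r = responsibilities(xn, pi, mu, Sigma)                  # results (1)
    return [(k, (r[k], r[k] * xn)) for k in range(len(pi))]  # and (2), keyed by k

def reduce_fn_I(k, values, N):
    r_sum = sum(r for r, _ in values)
    rx_sum = sum(rx for _, rx in values)
    return k, r_sum / N, rx_sum / r_sum                      # pi_k^ML, mu_k^ML

def map_fn_II(n, xn, pi, mu, Sigma, mu_ML):
    r = responsibilities(xn, pi, mu, Sigma)                  # results (3)
    return [(k, (r[k], r[k] * np.outer(xn - mu_ML[k], xn - mu_ML[k])))
            for k in range(len(pi))]                         # and (4), keyed by k

def reduce_fn_II(k, values):
    r_sum = sum(r for r, _ in values)
    S_sum = sum(S for _, S in values)
    return k, S_sum / r_sum                                  # Sigma_k^ML
```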
21/24
Machine Learning with MapReduce
22/24
Machine Learning with MapReduce
23/24
Summary
▸ MapReduce is a framework to process large datasets by parallelizing computations.
▸ The user only has to specify the map and reduce functions, and parallelization happens automatically.
▸ Many machine learning algorithms (e.g. SVMs, NNs, MMs, K-means and EM algorithms) can easily be reformulated in terms of such functions.
▸ This does not apply to algorithms based on stochastic gradient descent.
▸ Moreover, MapReduce is inefficient for iterative tasks on the same dataset: Each iteration is a MapReduce call that loads the data anew from disk.
▸ Such iterative tasks are common in many machine learning algorithms, e.g. gradient descent, K-means and EM algorithms.
▸ Solution: Spark framework, in the next lecture.
24/24