COMP9313: Big Data Management
Lecturer: Xin Cao
Course web site: http://www.cse.unsw.edu.au/~cs9313/
Chapter 3: MapReduce II
Overview of Previous Lecture
Motivation of MapReduce
Data Structures in MapReduce: (key, value) pairs
Map and Reduce Functions
Hadoop MapReduce Programming
Mapper
Reducer
Combiner
Partitioner
Driver
Combiner Function
To minimize the data transferred between map and reduce tasks
Combiner function is run on the map output
Both input and output data types must be consistent with the output of the mapper (i.e., the input of the reducer)
But Hadoop does not guarantee how many times it will call the combiner function for a particular map output record
It is just an optimization
The number of calls (even zero) must not affect the output of the reducers
Applicable to problems whose aggregation operation is commutative and associative
Commutative: max(a, b) = max(b, a)
Associative: max (max(a, b), c) = max(a, max(b, c))
max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
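A minimal plain-Java sketch of why max qualifies (no Hadoop types; the class and method names are hypothetical): whether or not a max "combiner" pre-aggregates each chunk of map output, the reducer computes the same result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration class: max() is commutative and associative,
// so pre-aggregating any chunk with a combiner cannot change the result.
class MaxCombinerDemo {

    // Used both as the reducer and as the combiner.
    static int max(List<Integer> values) {
        return Collections.max(values);
    }

    // Each inner list models the output of one map task.
    // runCombiner == true: ship one local maximum per chunk.
    // runCombiner == false: ship every raw value.
    static int reduceWithCombiner(List<List<Integer>> chunks, boolean runCombiner) {
        List<Integer> shuffled = new ArrayList<>();
        for (List<Integer> chunk : chunks) {
            if (runCombiner) {
                shuffled.add(max(chunk));
            } else {
                shuffled.addAll(chunk);
            }
        }
        return max(shuffled);
    }
}
```

With the chunks (0, 20, 10) and (25, 15), both calls return 25, matching the grouping shown above.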
MapReduce Algorithm Design Patterns
Design Pattern 1: In-mapper Combining
Importance of Local Aggregation
Ideal scaling characteristics:
Twice the data, twice the running time
Twice the resources, half the running time
Why can’t we achieve this?
Data synchronization requires communication
Communication kills performance
Thus… avoid communication!
Reduce intermediate data via local aggregation
Combiners can help
WordCount Baseline
What’s the impact of combiners?
Word Count: Version 1
Are combiners still needed?
Word Count: Version 2
Key: preserve state across input key-value pairs!
Design Pattern for Local Aggregation
“In-mapper combining”
Fold the functionality of the combiner into the mapper by preserving state across multiple map calls
Advantages
Speed
Why is this faster than actual combiners?
Disadvantages
Explicit memory management required
Potential for order-dependent bugs
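The gain from preserving state can be seen in a plain-Java simulation (hypothetical class and method names, no Hadoop APIs) that collects the pairs each word-count version would emit for the same input split. Each String in the split stands for one document, i.e., one map() call; Version 1 aggregates only within a call, Version 2 keeps its map across calls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of two word-count mappers over one input split.
class WordCountVersions {

    // Version 1: local aggregation inside a single map() call only.
    // Emits one (word, count) pair per distinct word per document.
    static List<Map.Entry<String, Integer>> version1(List<String> split) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String doc : split) {
            Map<String, Integer> counts = new HashMap<>();
            for (String w : doc.split("\\s+")) {
                counts.merge(w, 1, Integer::sum);
            }
            emitted.addAll(counts.entrySet());
        }
        return emitted;
    }

    // Version 2 (in-mapper combining): the map survives across map() calls
    // and is emitted once, when the task ends.
    static List<Map.Entry<String, Integer>> version2(List<String> split) {
        Map<String, Integer> counts = new HashMap<>();   // state kept across calls
        for (String doc : split) {
            for (String w : doc.split("\\s+")) {
                counts.merge(w, 1, Integer::sum);
            }
        }
        return new ArrayList<>(counts.entrySet());       // emitted at task end
    }
}
```

For the split ["a b a", "b c"], Version 1 emits 4 pairs and Version 2 emits 3, while the total count (5) is the same; the saving grows with the number of documents per task.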
Combiner Design
Combiners and reducers share the same method signature
Sometimes, reducers can serve as combiners
Often, not…
Remember: combiners are optional optimizations
Should not affect algorithm correctness
May be run 0, 1, or multiple times
Example: find the average of all integers associated with the same key
Computing the Mean: Version 1
Why can’t we use the reducer as the combiner?
Mean(1, 2, 3, 4, 5) != Mean(Mean(1, 2), Mean(3, 4, 5))
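A worked check of the inequality: averaging partial means gives every partial group the same weight regardless of how many values it holds, whereas adding partial sums and counts before dividing does not.

```latex
\mathrm{Mean}(1,2,3,4,5) = \frac{15}{5} = 3,
\qquad
\mathrm{Mean}\bigl(\mathrm{Mean}(1,2),\,\mathrm{Mean}(3,4,5)\bigr) = \mathrm{Mean}(1.5,\,4) = 2.75 .
% Carrying (sum, count) pairs instead combines correctly:
\frac{(1+2) + (3+4+5)}{2 + 3} = \frac{3 + 12}{5} = 3 .
```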
Computing the Mean: Version 2
Why doesn’t this work?
Combiners must have the same input and output types, consistent with the input of the reducers (the output of the mappers)
Computing the Mean: Version 3
Fixed?
Check the correctness by removing the combiner
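A plain-Java sketch of the idea behind the fixed versions, under the assumption that the mapper emits (r, 1) for each value r: carrying (sum, count) pairs makes the combiner's input and output types match, and element-wise addition of pairs is commutative and associative. SumCount and MeanDemo are hypothetical names, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical (sum, count) value type; in Hadoop this would be a custom Writable.
final class SumCount {
    final long sum;
    final long count;

    SumCount(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    // Combining partial results is element-wise addition.
    SumCount plus(SumCount other) {
        return new SumCount(sum + other.sum, count + other.count);
    }
}

// Hypothetical driver for the three phases of one key.
class MeanDemo {

    // Mapper output for one key: (r, 1) per value r.
    static List<SumCount> map(int... values) {
        List<SumCount> out = new ArrayList<>();
        for (int v : values) {
            out.add(new SumCount(v, 1));
        }
        return out;
    }

    // Combiner: consumes and produces SumCount, like the mapper output.
    static SumCount combine(List<SumCount> partials) {
        SumCount acc = new SumCount(0, 0);
        for (SumCount p : partials) {
            acc = acc.plus(p);
        }
        return acc;
    }

    // Reducer: add all partial pairs, then divide exactly once.
    static double reduce(List<SumCount> partials) {
        SumCount total = combine(partials);
        return (double) total.sum / total.count;
    }
}
```

Removing the combiner (reducing the raw mapper output directly) and running it on two chunks give the same mean, which is the correctness check suggested above.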
Computing the Mean: Version 4
How to Implement In-mapper Combiner in MapReduce?
Lifecycle of Mapper/Reducer
Lifecycle: setup -> map -> cleanup
setup(): called once at the beginning of the task
map(): called once for each input key-value pair
cleanup(): called once at the end of the task
We do not invoke these functions ourselves; the framework calls them
In-mapper Combining:
Use setup() to initialize the state-preserving data structure
Use cleanup() to emit the final key-value pairs
Word Count: Version 2
setup()
cleanup()
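The lifecycle can be simulated in plain Java to show where the in-mapper state lives. In this sketch the class, the BiConsumer "emit" callback and the FLUSH_THRESHOLD bound are hypothetical; the threshold illustrates the "explicit memory management" caveat by emitting partial counts early when the map grows too large. In a real Hadoop Mapper, setup() and cleanup() take a Context and are called by the framework.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical simulation of a mapper following setup -> map* -> cleanup.
class InMapperWordCount {
    static final int FLUSH_THRESHOLD = 2;       // hypothetical bound on distinct words held

    private Map<String, Integer> counts;        // state preserved across map() calls
    private final BiConsumer<String, Integer> emit;

    InMapperWordCount(BiConsumer<String, Integer> emit) {
        this.emit = emit;
    }

    // Called once per task: initialise the state.
    void setup() {
        counts = new HashMap<>();
    }

    // Called once per input record: only update the in-memory map.
    void map(String line) {
        for (String w : line.split("\\s+")) {
            if (w.isEmpty()) continue;
            counts.merge(w, 1, Integer::sum);
        }
        if (counts.size() > FLUSH_THRESHOLD) {
            flush();                            // emit early to bound memory use
        }
    }

    // Called once per task: emit whatever is still buffered.
    void cleanup() {
        flush();
    }

    private void flush() {
        counts.forEach(emit);
        counts.clear();
    }
}
```

Because early flushes can emit the same word more than once per task, the reducer must still sum the counts it receives.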
Design Pattern 2: Pairs vs Stripes
Term Co-occurrence Computation
Term co-occurrence matrix for a text collection
M = N × N matrix (N = vocabulary size)
M_ij: number of times i and j co-occur in some context
(for concreteness, let’s say context = sentence)
A specific instance of a large counting problem
A large event space (number of terms)
A large number of observations (the collection itself)
Goal: keep track of interesting statistics about the events
Basic approach
Mappers generate partial counts
Reducers aggregate partial counts
How do we aggregate partial counts efficiently?
First Try: “Pairs”
Each mapper takes a sentence
Generate all co-occurring term pairs
For all pairs, emit (a, b) → count
Reducers sum up counts associated with these pairs
Use combiners!
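A plain-Java sketch of the pairs mapper and reducer for context = sentence. Assumptions: whitespace tokenisation, every ordered pair of distinct positions counts as one co-occurrence, and a pair is encoded as the hypothetical string "a,b" (a real job would use a custom WritableComparable key, discussed later).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the "pairs" pattern (no Hadoop types).
class PairsDemo {

    // Mapper: emit ((a, b), 1) for every ordered pair of positions i != j.
    static List<String> map(String sentence) {
        String[] terms = sentence.split("\\s+");
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < terms.length; i++) {
            for (int j = 0; j < terms.length; j++) {
                if (i != j) {
                    emitted.add(terms[i] + "," + terms[j]);   // value is implicitly 1
                }
            }
        }
        return emitted;
    }

    // Reducer (also usable as combiner): sum the counts for each pair.
    static Map<String, Integer> reduce(List<String> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String p : pairs) {
            counts.merge(p, 1, Integer::sum);
        }
        return counts;
    }
}
```

A sentence of n tokens makes the mapper emit n(n-1) pairs, which is one way to answer the "upper bound?" question below.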
“Pairs” Analysis
Advantages
Easy to implement, easy to understand
Disadvantages
Lots of pairs to sort and shuffle around (upper bound?)
Not many opportunities for combiners to work
Another Try: “Stripes”
Idea: group together pairs into an associative array
Each mapper takes a sentence:
Generate all co-occurring term pairs
For each term, emit a → { b: count_b, c: count_c, d: count_d … }
Reducers perform element-wise sum of associative arrays
E.g., the pairs (a, b) → 1, (a, c) → 2, (a, d) → 5, (a, e) → 3, (a, f) → 2 become the single stripe a → { b: 1, c: 2, d: 5, e: 3, f: 2 }
Element-wise sum: a → { b: 1, d: 5, e: 3 } + a → { b: 1, c: 2, d: 2, f: 2 } = a → { b: 2, c: 2, d: 7, e: 3, f: 2 }
Key: a cleverly constructed data structure that brings together partial results
Stripes: Pseudo-Code
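A plain-Java sketch of the stripes mapper and the element-wise-sum reducer, using java.util.Map for the associative array (a Hadoop job would use MapWritable). The class name is hypothetical, and this mapper merges the stripes of a term that repeats within one sentence, a small local aggregation that a per-occurrence pseudo-code mapper would not perform.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the "stripes" pattern (no Hadoop types).
class StripesDemo {

    // Mapper for one sentence: for each term a, build a -> {b: count_b, ...}
    // over all other positions in the sentence.
    static Map<String, Map<String, Integer>> map(String sentence) {
        String[] terms = sentence.split("\\s+");
        Map<String, Map<String, Integer>> stripes = new HashMap<>();
        for (int i = 0; i < terms.length; i++) {
            Map<String, Integer> stripe =
                    stripes.computeIfAbsent(terms[i], k -> new HashMap<>());
            for (int j = 0; j < terms.length; j++) {
                if (i != j) {
                    stripe.merge(terms[j], 1, Integer::sum);
                }
            }
        }
        return stripes;
    }

    // Reducer (also usable as combiner): element-wise sum of the stripes
    // received for one key.
    static Map<String, Integer> sum(List<Map<String, Integer>> stripes) {
        Map<String, Integer> total = new TreeMap<>();
        for (Map<String, Integer> stripe : stripes) {
            stripe.forEach((term, c) -> total.merge(term, c, Integer::sum));
        }
        return total;
    }
}
```

Summing { b: 1, d: 5, e: 3 } and { b: 1, c: 2, d: 2, f: 2 } yields { b: 2, c: 2, d: 7, e: 3, f: 2 }, the same stripe as in the example above.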
“Stripes” Analysis
Advantages
Far less sorting and shuffling of key-value pairs
Can make better use of combiners
Disadvantages
More difficult to implement
Underlying object more heavyweight
Fundamental limitation in terms of the size of the event space
Compare “Pairs” and “Stripes”
Cluster size: 38 cores
Data Source: Associated Press Worldstream (APW) of the English Gigaword Corpus (v3), which contains 2.27 million documents (1.8 GB compressed, 5.7 GB uncompressed)
Pairs vs. Stripes
The pairs approach
Keep track of each term co-occurrence separately
Generates a large number of (intermediate) key-value pairs
The benefit from combiners is limited, as it is less likely for a mapper to process multiple occurrences of the same word pair
The stripes approach
Keep track of all terms that co-occur with the same term
Generates fewer and shorter intermediate keys
The framework has less sorting to do
Greatly benefits from combiners, as the key space is the vocabulary
More efficient, but may suffer from memory problems
These two design patterns are broadly useful and frequently observed in a variety of applications
Text processing, data mining, and bioinformatics
How to Implement “Pairs” and “Stripes” in MapReduce?
Serialization
Process of turning structured objects into a byte stream for transmission over a network or for writing to persistent storage
Deserialization is the reverse process of serialization
Requirements
Compact
To make efficient use of storage space
Fast
The overhead in reading and writing of data is minimal
Extensible
We can transparently read data written in an older format
Interoperable
We can read or write persistent data using different languages
Writable Interface
Hadoop defines its own “box” classes for strings (Text), integers (IntWritable), etc.
Writable is a serializable object which implements a simple, efficient, serialization protocol
All values must implement interface Writable
All keys must implement interface WritableComparable
context.write(WritableComparable, Writable)
You cannot use Java primitives here!!
public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}
Writable Wrappers for Java Primitives
There are Writable wrappers for all the Java primitive types except short and char (both of which can be stored in an IntWritable)
get() for retrieving and set() for storing the wrapped value
Variable-length formats (VIntWritable, VLongWritable)
If a value is between -112 and 127 inclusive, use only a single byte
Otherwise, use the first byte to indicate whether the value is positive or negative and how many bytes follow
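A plain-Java sketch of this variable-length scheme, re-implemented so it runs without Hadoop; it is modelled on Hadoop's WritableUtils.writeVLong/readVLong, but consult the WritableUtils source before relying on byte-level details. The class name VLongCodec and its helper methods are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical re-implementation of the variable-length long encoding.
class VLongCodec {

    static void writeVLong(DataOutput out, long i) throws IOException {
        if (i >= -112 && i <= 127) {              // small values: one byte
            out.writeByte((byte) i);
            return;
        }
        int len = -112;                            // header base for positives
        if (i < 0) {
            i ^= -1L;                              // store the one's complement
            len = -120;                            // header base for negatives
        }
        for (long tmp = i; tmp != 0; tmp >>= 8) {  // one step per payload byte
            len--;
        }
        out.writeByte((byte) len);                 // header: sign + byte count
        int payload = (len < -120) ? -(len + 120) : -(len + 112);
        for (int idx = payload; idx != 0; idx--) { // big-endian payload
            int shift = (idx - 1) * 8;
            out.writeByte((byte) ((i >> shift) & 0xFF));
        }
    }

    static long readVLong(DataInput in) throws IOException {
        byte first = in.readByte();
        if (first >= -112) {
            return first;                          // single-byte value
        }
        boolean negative = first < -120;
        int payload = negative ? (-120 - first) : (-112 - first);
        long i = 0;
        for (int k = 0; k < payload; k++) {
            i = (i << 8) | (in.readByte() & 0xFF);
        }
        return negative ? (i ^ -1L) : i;
    }

    // Number of bytes used to encode v.
    static int encodedSize(long v) {
        return encode(v).length;
    }

    // Encode, then decode again.
    static long roundTrip(long v) {
        try {
            return readVLong(new DataInputStream(new ByteArrayInputStream(encode(v))));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static byte[] encode(long v) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeVLong(new DataOutputStream(bytes), v);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, 127 and -112 take one byte, 128 and -113 take two (header plus one payload byte), and 300 takes three.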
Writable Examples
Text
Writable for UTF-8 sequences
Can be thought of as the Writable equivalent of java.lang.String
Maximum size is 2GB
Uses standard UTF-8
Text is mutable (like all Writable implementations, except NullWritable)
Different from java.lang.String
You can reuse a Text instance by calling one of the set() methods
NullWritable
Zero-length serialization
Used as a placeholder
A key or a value can be declared as a NullWritable when you don’t need to use that position
Stripes Implementation
A stripe key-value pair a → { b: 1, c: 2, d: 5, e: 3, f: 2 }:
Key: the term a
Value: the stripe { b: 1, c: 2, d: 5, e: 3, f: 2 }
In Java this is easy: use a Map (e.g., a HashMap)
How to represent this stripe in MapReduce?
MapWritable: Hadoop's Writable implementation of a Java Map
put(Writable key, Writable value)
get(Object key)
containsKey(Object key)
containsValue(Object value)
entrySet(), returns Set
For more details, refer to https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/io/MapWritable.html
Pairs Implementation
Key-value pair (a, b) → count
Value: count
Key: (a, b)
In Java this is easy: implement a pair class
How to store the key in MapReduce?
You must customize your own key, which must implement interface WritableComparable!
First, start from an easier task: when the value is a pair, it must implement interface Writable
Multiple Output Values
If we are to output multiple values for each key
E.g., a pair of String objects, or a pair of int
How do we do that?
WordCount outputs a single number as the value
Remember, our object containing the values needs to implement the Writable interface
We could use Text
Value is a string of comma separated values
Have to convert the values to strings, build the full string
Have to parse the string on input (not hard) to get the values
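A minimal plain-Java sketch of this workaround; in a Hadoop job the resulting String would be wrapped in a Text. The class and method names are hypothetical, and the sketch assumes the values never contain a comma themselves.

```java
// Hypothetical helper: pack two ints into one comma-separated string
// on output and parse them back on input.
class CsvValue {

    static String encode(int first, int second) {
        return first + "," + second;
    }

    static int[] decode(String text) {
        String[] parts = text.split(",");
        return new int[] { Integer.parseInt(parts[0].trim()), Integer.parseInt(parts[1].trim()) };
    }
}
```

Every write and read pays for number-to-text conversion and parsing, which a custom Writable avoids.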
Implement a Custom Writable
Suppose we wanted to implement a custom class containing a pair of integers. Call it IntPair.
How would we implement this class?
Needs to implement the Writable interface
Instance variables to hold the values
Constructors
A method to set the values (two integers)
A method to get the values (two integers)
write() method: serialize the member variables (two integers) in turn to the output stream
readFields() method: deserialize the member variables (two integers) in turn from the input stream
As in Java: hashCode(), equals(), toString()
Implement a Custom Writable
Implement the Writable interface
Instance variables to hold the values
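Constructors
set() method
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class IntPair implements Writable {
private int first, second;
// no-arg constructor: Hadoop creates Writable instances by reflection
public IntPair() {
}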
public IntPair(int first, int second) {
set(first, second);
}
public void set(int left, int right) {
first = left;
second = right;
}
Implement a Custom Writable
get() method
write() method
Write the two integers to the output stream in turn
readFields() method
Read the two integers from the input stream in turn
public int getFirst() {
return first;
}
public int getSecond() {
return second;
}
public void write(DataOutput out) throws IOException {
out.writeInt(first);
out.writeInt(second);
}
public void readFields(DataInput in) throws IOException {
first = in.readInt();
second = in.readInt();
}
} // end of IntPair
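The slides list hashCode(), equals() and toString() without showing them. The sketch below completes IntPair in plain Java and round-trips it through a byte stream. To run without Hadoop it declares a hypothetical SimpleWritable interface with the same two methods as Hadoop's Writable; in a real job, implement org.apache.hadoop.io.Writable instead. The hashCode() formula and toString() format are illustrative choices.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in with the same two methods as Hadoop's Writable.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

class IntPair implements SimpleWritable {
    private int first, second;

    IntPair() { }                                  // used when deserializing

    IntPair(int first, int second) { set(first, second); }

    void set(int left, int right) { first = left; second = right; }
    int getFirst() { return first; }
    int getSecond() { return second; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);                       // 4 bytes
        out.writeInt(second);                      // 4 bytes
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();                      // same order as write()
        second = in.readInt();
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof IntPair)) return false;
        IntPair p = (IntPair) o;
        return first == p.first && second == p.second;
    }

    @Override
    public int hashCode() { return 31 * first + second; }

    @Override
    public String toString() { return "(" + first + ", " + second + ")"; }

    // Serialize, then deserialize into a fresh object.
    static IntPair roundTrip(IntPair p) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            p.write(new DataOutputStream(bytes));
            IntPair copy = new IntPair();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If readFields() read the fields in a different order from write(), the round trip would silently swap them, which is why both methods must process the members "in turn".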
Complex Key
If the key is not a single value
E.g., a pair of String objects, or a pair of int
How do we do that?
The co-occurrence matrix problem, a pair of terms as the key
Our object containing the values needs to implement the WritableComparable interface
Why is Writable not sufficient? Keys are sorted during the shuffle, so they must also be comparable
We could use Text again
The key is a string of comma-separated values
Have to convert the values to strings, build the full string
Have to parse the string on input (not hard) to get the values
Objects are compared according to the full string!!
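A quick plain-Java illustration of the last point: when a pair of integers is packed into a string key, the sort order is lexicographic, not numeric. The sketch uses String.compareTo; Text compares UTF-8 bytes, which gives the same order for these ASCII strings. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: sort "x,y" keys the way a string-typed key is sorted.
class StringKeyOrder {

    static List<String> sortAsStrings(String... keys) {
        List<String> sorted = new ArrayList<>(Arrays.asList(keys));
        Collections.sort(sorted);
        return sorted;
    }
}
```

Sorting "9,1", "10,1" and "2,5" gives ["10,1", "2,5", "9,1"], because '1' < '2' < '9' character by character.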
Implement a Custom WritableComparable
Suppose we wanted to implement a custom class containing a pair of String objects. Call it StringPair.
How would we implement this class?
Needs to implement the WritableComparable interface
Instance variables to hold the values
Constructors
A method to set the values (two String objects)
A method to get the values (two String objects)
write() method: serialize the member variables (i.e., two String objects) in turn to the output stream
readFields() method: deserialize the member variables (i.e., two String objects) in turn from the input stream
As in Java: hashCode(), equals(), toString()
compareTo() method: specify how to compare two objects of the self-defined class
Implement a Custom WritableComparable
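Implement the WritableComparable interface
Instance variables to hold the values
Constructors
set() method
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
public class StringPair implements WritableComparable<StringPair> {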
private String first, second;
public StringPair() {
}
public StringPair(String first, String second) {
set(first, second);
}
public void set(String left, String right) {
first = left;
second = right;
}
get() method
write() method
Use WritableUtils.writeStringArray() and WritableUtils.readStringArray()
readFields() method
public String getFirst() {
return first;
}
public String getSecond() {
return second;
}
public void write(DataOutput out) throws IOException {
String[] strings = new String[] { first, second };
WritableUtils.writeStringArray(out, strings);
}
public void readFields(DataInput in) throws IOException {
String[] strings = WritableUtils.readStringArray(in);
first = strings[0];
second = strings[1];
}
compareTo() method:
public int compareTo(StringPair o) {
int cmp = compare(first, o.getFirst());
if(cmp != 0){
return cmp;
}
return compare(second, o.getSecond());
}
private int compare(String s1, String s2){
if (s1 == null && s2 != null) {
return -1;
} else if (s1 != null && s2 == null) {
return 1;
} else if (s1 == null && s2 == null) {
return 0;
} else {
return s1.compareTo(s2);
}
}
} // end of StringPair
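A plain-Java check of the ordering that compareTo() defines above: sort by the first string, break ties on the second, and place null before any non-null value. The hypothetical StringPairKey class copies that logic but implements java.lang.Comparable instead of Hadoop's WritableComparable, so it runs without Hadoop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical standalone copy of the StringPair comparison logic.
class StringPairKey implements Comparable<StringPairKey> {
    final String first, second;

    StringPairKey(String first, String second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public int compareTo(StringPairKey o) {
        int cmp = compare(first, o.first);                  // primary: left word
        return cmp != 0 ? cmp : compare(second, o.second);  // tie-break: right word
    }

    // null sorts before any non-null string, as in the slide's compare()
    private static int compare(String s1, String s2) {
        if (s1 == null) return (s2 == null) ? 0 : -1;
        if (s2 == null) return 1;
        return s1.compareTo(s2);
    }

    @Override
    public String toString() {
        return "(" + first + ", " + second + ")";
    }

    // Sort a copy of the keys and render them for inspection.
    static List<String> sorted(List<StringPairKey> keys) {
        List<StringPairKey> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        List<String> out = new ArrayList<>();
        for (StringPairKey k : copy) out.add(k.toString());
        return out;
    }
}
```

Sorting (b, a), (a, c), (a, null) and (a, b) yields (a, null), (a, b), (a, c), (b, a): all keys with the same left word end up adjacent, which the order-inversion pattern later relies on.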
You can also make the member variables Writable objects (e.g., Text)
Instance variables to hold the values
Constructors
set() method
private Text first, second;
public StringPair() {
set(new Text(), new Text());
}
public StringPair(Text first, Text second) {
set(first, second);
}
public void set(Text left, Text right) {
first = left;
second = right;
}
get() method
write() method
Delegated to Text
readFields() method
Delegated to Text
public Text getFirst() {
return first;
}
public Text getSecond() {
return second;
}
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
In some cases such as secondary sort, we also need to override the hashCode() method.
This ensures that all key-value pairs sharing the first part of the key are sent to the same reducer!
With this override, the default hash partitioner uses only the hash code of the first part.
You can also write a partitioner to do this job
@Override
public int hashCode() {
return first.hashCode();
}
Design Pattern 3: Order Inversion
Computing Relative Frequencies
“Relative” Co-occurrence matrix construction
Similar problem as before, same matrix
Instead of absolute counts, we take into consideration the fact that some words appear more frequently than others
Word wi may co-occur frequently with word wj simply because one of the two is very common
We need to convert absolute counts to relative frequencies f(wj|wi)
What proportion of the time does wj appear in the context of wi?
Formally, we compute: $f(w_j \mid w_i) = \frac{N(w_i, w_j)}{\sum_{w'} N(w_i, w')}$
N(·, ·) is the number of times a co-occurring word pair is observed
The denominator is called the marginal
f(wj|wi) : “Stripes”
In the reducer, the counts of all words that co-occur with the conditioning variable (wi) are available in the associative array
Hence, the sum of all those counts gives the marginal
Then we divide the joint counts by the marginal and we’re done
Problems?
Memory
a → {b1: 3, b2: 12, b3: 7, b4: 1, … }
f(b1|a) = 3 / (3 + 12 + 7 + 1 + …)
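A plain-Java sketch of this reducer-side computation for one summed stripe (the class name is hypothetical). It needs the complete stripe in memory before it can divide, which is exactly the memory problem noted above.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: relative frequencies from one summed stripe.
class StripeRelativeFrequency {

    // Given the summed stripe for w_i, return f(w_j | w_i) for every w_j.
    static Map<String, Double> relativeFrequencies(Map<String, Integer> stripe) {
        long marginal = 0;
        for (int c : stripe.values()) {
            marginal += c;                        // sum of the stripe = marginal
        }
        Map<String, Double> f = new TreeMap<>();
        for (Map.Entry<String, Integer> e : stripe.entrySet()) {
            f.put(e.getKey(), e.getValue() / (double) marginal);
        }
        return f;
    }
}
```

Using only the four listed entries (b1: 3, b2: 12, b3: 7, b4: 1), the marginal is 23, so f(b1|a) = 3/23, and the frequencies sum to 1.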
f(wj|wi) : “Pairs”
The reducer receives the pair (wi, wj) and the count
From this information alone it is not possible to compute f(wj|wi)
Computing relative frequencies requires marginal counts
But the marginal cannot be computed until you see all counts
((a, b1), {1, 1, 1, …})
No way to compute f(b1|a) because the marginal is unknown
f(wj|wi) : “Pairs”
Solution 1: Fortunately, like the mapper, the reducer can also preserve state across multiple keys
We can buffer in memory all the words that co-occur with wi and their counts
This is basically building the associative array in the stripes method
Problems?
a → {b1: 3, b2: 12, b3: 7, b4: 1, … }
is now buffered on the reducer side
f(wj|wi) : “Pairs”
We must define the sort order of the pair!!
In this way, the keys are first sorted by the left word, and then by the right word (in the pair)
Hence, we can detect when all pairs associated with the word we are conditioning on (wi) have been seen
At this point, we can use the in-memory buffer, compute the relative frequencies and emit
If reducers receive the pairs unsorted:
((a, b1), {1, 1, 1, …})
((c, d1), {1, 1, 1, …})
((a, b2), {1, 1, 1, …})
… …
When can we compute the marginal?
f(wj|wi) : “Pairs”
We must define an appropriate partitioner
The default partitioner is based on the hash value of the intermediate key, modulo the number of reducers
For a complex key, the hash value is computed over the whole key
Hence, there is no guarantee that the pairs (dog, aardvark) and (dog, zebra) are sent to the same reducer
What we want is that all pairs with the same left word are sent to the same reducer
This approach still suffers from the memory problem!
((a, b1), {1, 1, 1, …}) and ((a, b2), {1, 1, 1, …}) may be assigned to different reducers!
The default partitioner computes the partition from the whole key.
f(wj|wi) : “Pairs”
Better solutions?
The key is to properly sequence data presented to reducers
If it were possible to compute the marginal in the reducer before processing the joint counts, the reducer could simply divide the joint counts received from mappers by the marginal
The notion of “before” and “after” can be captured in the ordering of key-value pairs
The programmer can define the sort order of keys so that data needed earlier is presented to the reducer before data that is needed later
(a, *) → 32 (the marginal: the reducer holds this value in memory, rather than the stripe)
(a, b1) → 3 ⇒ (a, b1) → 3 / 32
(a, b2) → 12 ⇒ (a, b2) → 12 / 32
(a, b3) → 7 ⇒ (a, b3) → 7 / 32
(a, b4) → 1 ⇒ (a, b4) → 1 / 32
…
f(wj|wi) : “Pairs” – Order Inversion
A better solution based on order inversion
The mapper:
additionally emits a “special” key of the form (wi, ∗)
The value associated with the special key is one, representing the contribution of the word pair to the marginal
Using combiners, these partial marginal counts will be aggregated before being sent to the reducers
The reducer:
We must make sure that the special key-value pairs are processed before any other key-value pairs where the left word is wi (define sort order)
We also need to guarantee that all pairs associated with the same word are sent to the same reducer (use partitioner)
f(wj|wi) : “Pairs” – Order Inversion
Example:
The reducer finally receives:
The pairs come in order, and thus we can compute the relative frequency immediately.
f(wj|wi) : “Pairs” – Order Inversion
Memory requirements:
Minimal, because only the marginal (an integer) needs to be stored
No buffering of individual co-occurring words
No scalability bottleneck
Key ingredients for order inversion
Emit a special key-value pair to capture the marginal
Control the sort order of the intermediate key, so that the special key-value pair is processed first
Define a custom partitioner for routing intermediate key-value pairs
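A plain-Java simulation of the three ingredients, under these assumptions: context = sentence with all ordered pairs of positions, the special right-hand word is the literal string "*", and the comparator places "*" before every real word explicitly rather than relying on character codes. The shuffle is modelled by a TreeMap with that comparator plus a hash partitioner on the left word; all class and method names are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical end-to-end simulation of order inversion (no Hadoop types).
class OrderInversionDemo {
    static final String MARGINAL = "*";             // special right-hand word

    // Sort order: left word first, then "*" before any real right word.
    static final Comparator<String[]> KEY_ORDER = (x, y) -> {
        int cmp = x[0].compareTo(y[0]);
        if (cmp != 0) return cmp;
        boolean xm = x[1].equals(MARGINAL), ym = y[1].equals(MARGINAL);
        if (xm || ym) return xm == ym ? 0 : (xm ? -1 : 1);
        return x[1].compareTo(y[1]);
    };

    // Partitioner: route on the left word only.
    static int partition(String[] key, int numReducers) {
        return (key[0].hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Mapper + shuffle for one reducer: emit ((a, b), 1) and ((a, *), 1)
    // per co-occurrence, keep the keys routed to this reducer, sort them.
    static TreeMap<String[], Integer> mapAndShuffle(List<String> sentences, int reducer, int numReducers) {
        TreeMap<String[], Integer> sorted = new TreeMap<>(KEY_ORDER);
        for (String s : sentences) {
            String[] t = s.split("\\s+");
            for (int i = 0; i < t.length; i++) {
                for (int j = 0; j < t.length; j++) {
                    if (i == j) continue;
                    for (String[] key : new String[][] { { t[i], t[j] }, { t[i], MARGINAL } }) {
                        if (partition(key, numReducers) == reducer) {
                            sorted.merge(key, 1, Integer::sum);
                        }
                    }
                }
            }
        }
        return sorted;
    }

    // Reducer: holds only the current marginal (one integer) in memory.
    static Map<String, Double> reduce(TreeMap<String[], Integer> sorted) {
        Map<String, Double> out = new TreeMap<>();
        int marginal = 0;
        for (Map.Entry<String[], Integer> e : sorted.entrySet()) {
            String[] key = e.getKey();
            if (key[1].equals(MARGINAL)) {
                marginal = e.getValue();             // arrives first for each left word
            } else {
                out.put(key[0] + "," + key[1], e.getValue() / (double) marginal);
            }
        }
        return out;
    }
}
```

For the sentences "a b c" and "a b", the reducer sees (a, *) → 3 before (a, b) → 2 and emits f(b|a) = 2/3; running with one reducer or two gives the same frequencies because every key with the same left word, including its "*" key, reaches the same reducer.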
Order Inversion
Common design pattern
Computing relative frequencies requires marginal counts
But marginal cannot be computed until you see all counts
Buffering is a bad idea!
Trick: getting the marginal counts to arrive at the reducer before the joint counts
Optimizations
Apply the in-mapper combining pattern to accumulate marginal counts
Synchronization: Pairs vs. Stripes
Approach 1: turn synchronization into an ordering problem
Sort keys into correct order of computation
Partition key space so that each reducer gets the appropriate set of partial results
Hold state in reducer across multiple key-value pairs to perform computation
Illustrated by the “pairs” approach
Approach 2: construct data structures that bring partial results together
Each reducer receives all the data it needs to complete the computation
Illustrated by the “stripes” approach
How to Implement Order Inversion in MapReduce?
Implement a Custom Partitioner
You need to implement a “pair” class first as the key data type
A customized partitioner extends the Partitioner class
public static class YourPartitioner extends Partitioner<KEY, VALUE>
The key and value are the intermediate key and value produced by the map function
It overrides the getPartition function, which has three parameters
public int getPartition(KEY key, VALUE value, int numPartitions)
numPartitions is the number of reducers used in the MapReduce program; it is specified in the driver program with job.setNumReduceTasks() (by default 1)
In the relative frequencies computing problem, partition on the left word only:
public static class FirstPartitioner extends Partitioner<StringPair, IntWritable> {
@Override
public int getPartition(StringPair key, IntWritable value, int numPartitions) {
// mask the sign bit so the partition index is never negative
return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
Register it in the driver with job.setPartitionerClass(FirstPartitioner.class)
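A plain-Java check of the partitioning arithmetic: it is the same formula as Hadoop's default HashPartitioner, applied to the left word only. The hypothetical two-string function stands in for getPartition() so the sketch runs without Hadoop; the right word is deliberately ignored.

```java
// Hypothetical stand-in for FirstPartitioner.getPartition().
class FirstWordPartition {

    // Only the left word influences the partition; & Integer.MAX_VALUE clears
    // the sign bit, so even Integer.MIN_VALUE hash codes map into [0, n).
    static int getPartition(String left, String right, int numPartitions) {
        return (left.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

Using Math.abs(hashCode()) instead would fail for a hash code of Integer.MIN_VALUE, whose absolute value is still negative.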
References
Chapters 3.3, 3.4, 4.2, 4.3, and 4.4. Data-Intensive Text Processing with MapReduce. Jimmy Lin and Chris Dyer. University of Maryland, College Park.
Chapter 5: Hadoop I/O. Hadoop: The Definitive Guide. Tom White.
End of Chapter 3