COMP9313: Big Data Management
Course web site: http://www.cse.unsw.edu.au/~cs9313/
Chapter 3: MapReduce III
Design Pattern 3: Order Inversion
Computing Relative Frequencies
❖ “Relative” Co-occurrence matrix construction
➢ Similar problem as before, same matrix
➢ Instead of absolute counts, we take into consideration the fact that some words appear more frequently than others
Word wi may co-occur frequently with word wj simply because one of the two is very common
➢ We need to convert absolute counts to relative frequencies f(wj|wi)
What proportion of the time does wj appear in the context of wi?
❖ Formally, we compute:
f(wj|wi) = N(wi, wj) / Σw′ N(wi, w′)
➢ N(·, ·) is the number of times a co-occurring word pair is observed
➢ The denominator is called the marginal
❖ In the reducer, the counts of all words that co-occur with the conditioning variable (wi) are available in the associative array
❖ Hence, the sum of all those counts gives the marginal
❖ Then we divide the joint counts by the marginal and we’re done
a → {b1:3, b2 :12, b3 :7, b4 :1, … }
f(b1|a) = 3 / (3 + 12 + 7 + 1 + …)
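As a sketch of this computation (plain Java maps stand in for the stripe; the class and method names are illustrative, not from the course code):

import java.util.HashMap;
import java.util.Map;

public class StripeRelativeFrequencies {
    // Given the stripe a -> {b1:3, b2:12, ...}, the marginal is the sum of
    // all counts, and each f(bj|a) is the joint count divided by it.
    static Map<String, Double> relativeFrequencies(Map<String, Integer> stripe) {
        long marginal = 0;
        for (int count : stripe.values()) marginal += count;
        Map<String, Double> rf = new HashMap<>();
        for (Map.Entry<String, Integer> e : stripe.entrySet())
            rf.put(e.getKey(), (double) e.getValue() / marginal);
        return rf;
    }
}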
❖ Problems?
➢ Memory
Computing Relative Frequencies: “Stripes”
❖ The reducer receives the pair (wi, wj) and the count
❖ From this information alone it is not possible to compute f(wj|wi)
➢ Computing relative frequencies requires marginal counts
➢ But the marginal cannot be computed until you see all counts
((a, b1), {1, 1, 1, …})
No way to compute f(b1|a) because the marginal is unknown
❖ Solution 1: Fortunately, like the mapper, the reducer can preserve state across multiple keys
➢ We can buffer in memory all the words that co-occur with wi and their counts
➢ This is basically building the associative array in the stripes method
a → {b1:3, b2 :12, b3 :7, b4 :1, … } is now buffered on the reducer side
➢ Problems?
If reducers receive pairs that are not sorted:
((a, b1), {1, 1, 1, …})
((c, d1), {1, 1, 1, …})
((a, b2), {1, 1, 1, …})
……
When can we compute the marginal?
❖ We must define the sort order of the pair!!
➢ In this way, the keys are first sorted by the left word, and then by the right word (in the pair)
➢ Hence, we could detect if all pairs associated with the word we are conditioning on (wi) have been seen
➢ At this point, we can use the in-memory buffer, compute the relative frequencies and emit
((a, b1), {1, 1, 1, …}) and ((a, b2), {1, 1, 1, …}) may be assigned to different reducers!
The default partitioner computes the partition based on the whole key.
❖ We must define an appropriate partitioner
➢ The default partitioner is based on the hash value of the intermediate key, modulo the number of reducers
➢ For a complex key, the raw byte representation is used to compute the hash value
Hence, there is no guarantee that the pairs (dog, aardvark) and (dog, zebra) are sent to the same reducer
➢ What we want is that all pairs with the same left word are sent to the same reducer
❖ Still suffers from the memory problem!
❖ Better solutions?
(a, *) → 32
Reducer holds this value in memory, rather than the stripe
(a, b1) → 3
(a, b2) → 12
(a, b3) → 7
(a, b4) → 1 ……
(a, b1) → 3 / 32
(a, b2) → 12 / 32
(a, b3) → 7 / 32
(a, b4) → 1 / 32
❖ The key is to properly sequence data presented to reducers
➢ If it were possible to compute the marginal in the reducer before processing the joint counts, the reducer could simply divide the joint counts received from mappers by the marginal
➢ The notion of “before” and “after” can be captured in the ordering of key-value pairs
➢ The programmer can define the sort order of keys so that data needed earlier is presented to the reducer before data that is needed later
❖ A better solution based on order inversion
❖ The mapper:
➢ additionally emits a “special” key of the form (wi, ∗)
➢ The value associated with the special key is one; it represents the contribution of the word pair to the marginal
➢ Using combiners, these partial marginal counts will be aggregated before being sent to the reducers
❖ The reducer:
➢ We must make sure that the special key-value pairs are processed before any other key-value pairs where the left word is wi (define sort order)
➢ We also need to guarantee that all pairs associated with the same word are sent to the same reducer (use partitioner)
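A minimal sketch of this mapper/reducer pair, assuming a hypothetical TextPair writable whose sort order places (wi, ∗) first (the co-occurrence window of 2 and the tokenization are illustrative):

// Mapper: for every co-occurring pair, emit the joint count and a
// contribution to the marginal under the special key (wi, *)
public static class PairsRFMapper extends Mapper<LongWritable, Text, TextPair, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] w = value.toString().split("\\s+");
        for (int i = 0; i < w.length; i++)
            for (int j = Math.max(0, i - 2); j <= Math.min(w.length - 1, i + 2); j++) {
                if (i == j) continue;
                context.write(new TextPair(w[i], w[j]), ONE); // joint count
                context.write(new TextPair(w[i], "*"), ONE);  // contribution to the marginal
            }
    }
}

// Reducer: the special key (wi, *) arrives first, so only the marginal
// (a single long) is held in memory, never a whole stripe
public static class PairsRFReducer extends Reducer<TextPair, IntWritable, TextPair, DoubleWritable> {
    private long marginal = 0;
    @Override
    protected void reduce(TextPair key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (IntWritable v : values) sum += v.get();
        if ("*".equals(key.getSecond())) {
            marginal = sum;  // (wi, *) is processed before any (wi, wj)
        } else {
            context.write(key, new DoubleWritable((double) sum / marginal));
        }
    }
}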
Order Inversion
❖ Example:
➢ The reducer finally receives:
Order Inversion
➢ The pairs come in order, and thus we can compute the relative frequency immediately.
❖ Memory requirements:
➢ Minimal, because only the marginal (an integer) needs to be stored
➢ No buffering of individual co-occurring word counts
➢ No scalability bottleneck
❖ Key ingredients for order inversion
➢ Emit a special key-value pair to capture the marginal
➢ Control the sort order of the intermediate key, so that the special key-value pair is processed first
➢ Define a custom partitioner for routing intermediate key-value pairs
Order Inversion
❖ Common design pattern
➢ Computing relative frequencies requires marginal counts
➢ But marginal cannot be computed until you see all counts
➢ Buffering is a bad idea!
➢ Trick: getting the marginal counts to arrive at the reducer before the joint counts
❖ Optimizations
➢ Apply in-memory combining pattern to accumulate marginal counts
Synchronization: Pairs vs. Stripes
❖ Approach 1: turn synchronization into an ordering problem
➢ Sort keys into correct order of computation
➢ Partition key space so that each reducer gets the appropriate set of partial results
➢ Hold state in reducer across multiple key-value pairs to perform computation
➢ Illustrated by the “pairs” approach
❖ Approach 2: construct data structures that bring partial results together
➢ Each reducer receives all the data it needs to complete the computation
➢ Illustrated by the “stripes” approach
How to Implement Order Inversion in MapReduce?
Implement a Custom Partitioner
❖ You need to implement a “pair” class first as the key data type
❖ A customized partitioner extends the Partitioner class
public static class YourPartitioner extends Partitioner<KEY, VALUE>
➢ The key and value are the intermediate key and value produced by the map function
➢ In the relative frequencies computing problem
public static class FirstPartitioner extends Partitioner<StringPair, IntWritable>
❖ It overrides the getPartition function, which has three parameters:
public int getPartition(WritableComparable key, Writable value, int numPartitions)
➢ The numPartitions is the number of reducers used in the MapReduce program and it is specified in the driver program (by default 1)
➢ In the relative frequencies computing problem
Implement a Custom Partitioner
public int getPartition(StringPair key, IntWritable value, int numPartitions) {
    return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
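The partitioner alone is not enough for order inversion: the pair class’s compareTo must also place the special key (wi, ∗) before all ordinary (wi, wj) keys. A sketch, assuming the StringPair class exposes getFirst() and getSecond():

// Hypothetical StringPair.compareTo: "*" sorts before any real word,
// so (wi, *) reaches the reducer before every (wi, wj)
public int compareTo(StringPair other) {
    int cmp = this.getFirst().compareTo(other.getFirst());
    if (cmp != 0) return cmp;
    boolean thisStar = "*".equals(this.getSecond());
    boolean otherStar = "*".equals(other.getSecond());
    if (thisStar || otherStar) return thisStar == otherStar ? 0 : (thisStar ? -1 : 1);
    return this.getSecond().compareTo(other.getSecond());
}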
Partitioner in Hadoop Streaming
❖ Hadoop has a library class, KeyFieldBasedPartitioner, that is useful for many applications. This class allows the Map/Reduce framework to partition the map outputs based on certain key fields, not the whole keys.
➢ “-D stream.map.output.field.separator=.” specifies “.” as the field separator for the map outputs. By default, the separator is ‘\t’
➢ “-D stream.num.map.output.key.fields=4” means the prefix up to the fourth “.” in a line will be the key and the rest of the line (excluding the fourth “.”) will be the value.
Partitioner in Hadoop Streaming
➢ “-D map.output.key.field.separator=.” means the separator for the key is also “.”
➢ “-D mapreduce.partition.keypartitioner.options=-k1,2” means MapReduce will partition the map outputs by the first two fields of the keys
➢ This guarantees that all the key/value pairs with the same first two fields in the keys will be partitioned into the same reducer.
Partitioner in Hadoop Streaming
❖ For the relative frequency computation task, you can do something like:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -D stream.map.output.field.separator=\\t \
    -D stream.num.map.output.key.fields=1 \
    -D map.output.key.field.separator=, \
    -D mapreduce.partition.keypartitioner.options=-k1,1 \
    -D mapreduce.job.reduces=2 \
    -input input \
    -output output \
    -mapper mapper.py \
    -reducer reducer.py \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -file mapper.py \
    -file reducer.py
Partitioner in MRJob
❖ In your class, configure JOBCONF, like:
JOBCONF = {
    'map.output.key.field.separator': ',',
    'mapred.reduce.tasks': 2,
    'mapreduce.partition.keypartitioner.options': '-k1,1',
    'partitioner': 'org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner'
}
➢ You also need to add one line “SORT_VALUES = True” into your code.
➢ Assume each key is a pair of strings separated by “,”, like “term,102”. Hadoop performs the sorting based on the whole key. However, the above configuration lets Hadoop know that partitioning is based only on the first field of the key (i.e., “term”).
Design Pattern 4: Value-to-Key Conversion
Secondary Sort
❖ MapReduce sorts input to reducers by key
➢ Values may be arbitrarily ordered
❖ What if we want to sort values as well?
➢ E.g., k → (v1, r), (v3, r), (v4, r), (v8, r)…
➢ Google’s MapReduce implementation provides built-in functionality
➢ Unfortunately, Hadoop does not support it
❖ Secondary Sort: sorting values associated with a key in the reduce phase, also called “value-to-key conversion”
Secondary Sort
❖ Sensor data from a scientific experiment: there are m sensors, each taking readings on a continuous basis
(t1, m1, r80521) (t1, m2, r14209) (t1, m3, r76742) …
(t2, m1, r21823) (t2, m2, r66508) (t2, m3, r98347)
❖ We wish to reconstruct the activity at each individual sensor over time
❖ In a MapReduce program, a mapper may emit the following pair as the intermediate result:
m1 -> (t1, r80521)
➢ We need to sort the values according to the timestamp
Secondary Sort
❖ Solution 1:
➢ Buffer values in memory, then sort
➢ Why is this a bad idea?
❖ Solution 2:
➢ “Value-to-key conversion” design pattern: form a composite intermediate key, (m1, t1)
The mapper emits (m1, t1) -> r80521
➢ Let execution framework do the sorting
➢ Preserve state across multiple key-value pairs to handle processing
➢ Anything else we need to do?
Sensor readings are split across multiple keys. Reducers need to know when all readings of a sensor have been processed
All pairs associated with the same sensor are shuffled to the same reducer (use partitioner)
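A sketch of the mapper side of this conversion for the sensor data (SensorTimePair is a hypothetical WritableComparable that sorts by sensor id and then timestamp; the input line format “t1,m1,r80521” is assumed):

// Value-to-key conversion: move the timestamp from the value into the key
@Override
protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
    String[] f = line.toString().split(",");      // [timestamp, sensor, reading]
    context.write(new SensorTimePair(f[1], f[0]), // key: (sensor, timestamp)
                  new Text(f[2]));                // value: the reading only
}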
How to Implement Secondary Sort in MapReduce?
Secondary Sort: Another Example
❖ Consider the temperature data from a scientific experiment. Columns are year, month, day, and daily temperature, respectively:
❖ We want to output the temperature for every year-month with the values sorted in ascending order.
Solutions to the Secondary Sort Problem
❖ Use the Value-to-Key Conversion design pattern:
➢ form a composite intermediate key, (K, V), where V is the secondary key. Here, K is called a natural key. To inject a value (i.e., V) into a reducer key, simply create a composite key
K: year-month
V: temperature data
Let the MapReduce execution framework do the sorting (rather than sorting in memory, let the framework sort by using the cluster nodes).
❖ Preserve state across multiple key-value pairs to handle processing.
❖ Write your own partitioner: partition the mapper’s output by the natural key (year-month).
Secondary Sorting Keys
Customize The Composite Key
public class DateTemperaturePair
        implements Writable, WritableComparable<DateTemperaturePair> {
    private Text yearMonth = new Text();                  // natural key
    private IntWritable temperature = new IntWritable();  // secondary key
    ......
    /** This comparator controls the sort order of the keys. */
    public int compareTo(DateTemperaturePair pair) {
        int compareValue = this.yearMonth.compareTo(pair.getYearMonth());
        if (compareValue == 0) {
            compareValue = temperature.compareTo(pair.getTemperature());
        }
        return compareValue; // sort ascending
    }
}
Customize The Partitioner
public class DateTemperaturePartitioner
        extends Partitioner<DateTemperaturePair, Text> {
    @Override
    public int getPartition(DateTemperaturePair pair, Text text, int numberOfPartitions) {
        // make sure that partitions are non-negative
        return Math.abs(pair.getYearMonth().hashCode() % numberOfPartitions);
    }
}
Utilize the natural key only for partitioning
Grouping Comparator
❖ Controls which keys are grouped together for a single call to the Reducer.reduce() function.
public class DateTemperatureGroupingComparator extends WritableComparator {
    ......
    protected DateTemperatureGroupingComparator() {
        super(DateTemperaturePair.class, true);
    }
    /* This comparator controls which keys are grouped together into a
       single call to the reduce() method */
    @Override
    public int compare(WritableComparable wc1, WritableComparable wc2) {
        DateTemperaturePair pair = (DateTemperaturePair) wc1;
        DateTemperaturePair pair2 = (DateTemperaturePair) wc2;
        return pair.getYearMonth().compareTo(pair2.getYearMonth());
    }
}
❖ Configure the grouping comparator using the Job object:
job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);
Consider the natural key only for grouping
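Wiring these classes together in the driver might look like the following sketch (the Job setter calls are standard Hadoop; the surrounding mapper, reducer, and I/O configuration are assumed to exist):

// Driver sketch: the composite key drives the sort, the custom classes
// drive partitioning and grouping
Job job = Job.getInstance(new Configuration(), "secondary sort");
job.setMapOutputKeyClass(DateTemperaturePair.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(DateTemperaturePartitioner.class);
job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);
// DateTemperaturePair.compareTo() sorts by (year-month, temperature), so
// each reduce() call sees temperatures in ascending order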
Secondary Sort by Hadoop Streaming
❖ Hadoop has a library class, KeyFieldBasedComparator, that is useful for secondary sort.
➢ The map output keys of the above Map/Reduce job have four fields separated by “.”
➢ MapReduce will sort the outputs by the second field of the keys using the -D mapreduce.partition.keycomparator.options=-k2,2nr option
-n specifies that the sorting is numerical sorting
-r specifies that the result should be reversed
MapReduce Algorithm Design
❖ Aspects that are not under the control of the designer
➢ Where a mapper or reducer will run
➢ When a mapper or reducer begins or finishes
➢ Which input key-value pairs are processed by a specific mapper
➢ Which intermediate key-value pairs are processed by a specific reducer
❖ Aspects that can be controlled
➢ Construct data structures as keys and values
➢ Execute user-specified initialization and termination code for mappers and reducers (pre-process and post-process)
➢ Preserve state across multiple input and intermediate keys in mappers and reducers (in-mapper combining)
➢ Control the sort order of intermediate keys, and therefore the order in which a reducer will encounter particular keys (order inversion)
➢ Control the partitioning of the key space, and therefore the set of keys that will be encountered by a particular reducer (partitioner)
Application: Building Inverted Index
MapReduce in Real World: Search Engine
❖ Information retrieval (IR)
➢ Focus on textual information (= text/document retrieval)
➢ Other possibilities include image, video, music, …
❖ Boolean Text retrieval
➢ Each document or query is treated as a “bag” of words or terms.
Word sequence is not considered
➢ Query terms are combined logically using the Boolean operators AND, OR, and NOT.
E.g., ((data AND mining) AND (NOT text))
➢ Retrieval
Given a Boolean query, the system retrieves every document that makes the query logically true.
Called exact match
➢ The retrieval results are usually quite poor because term frequency is not considered and results are not ranked
Boolean Text Retrieval: Inverted Index
❖ The inverted index of a document collection is basically a data structure that
➢ associates each distinctive term with a list of all documents that contain the term.
➢ The documents containing a term are sorted in the list
❖ Thus, in retrieval, it takes constant time to
➢ find the documents that contain a query term.
➢ multiple query terms are also easy to handle, as we will see soon.
Boolean Text Retrieval: Inverted Index
Search Using Inverted Index
❖ Given a query q, search has the following steps:
➢ Step 1 (vocabulary search): find each term/word in q in the inverted index.
➢ Step 2 (results merging): Merge results to find documents that contain all or some of the words/terms in q.
➢ Step 3 (Rank score computation): To rank the resulting documents/pages, using:
content-based ranking
link-based ranking
Not used in Boolean retrieval
Boolean Query Processing: AND
Consider processing the query: Brutus AND Caesar
➢ Locate Brutus in the Dictionary;
Retrieve its postings.
➢ Locate Caesar in the Dictionary;
Retrieve its postings.
➢ “Merge” the two postings:
Walk through the two postings simultaneously, in time linear in the total number of postings entries
If the list lengths are x and y, the merge takes O(x+y) operations.
Crucial: postings sorted by docID.
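A sketch of that merge in Java (docIDs held as sorted int arrays; illustrative code, not from the slides):

import java.util.ArrayList;
import java.util.List;

public class PostingsMerge {
    // Intersect two postings lists sorted by docID in O(x + y) time
    static List<Integer> intersect(int[] p1, int[] p2) {
        List<Integer> answer = new ArrayList<>();
        int i = 0, j = 0;
        while (i < p1.length && j < p2.length) {
            if (p1[i] == p2[j]) { answer.add(p1[i]); i++; j++; } // docID in both lists
            else if (p1[i] < p2[j]) i++;                         // advance the smaller pointer
            else j++;
        }
        return answer;
    }
}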
MapReduce it?
❖ The indexing problem
➢ Scalability is critical
➢ Must be relatively fast, but need not be real time
➢ Fundamentally a batch operation
➢ Incremental updates may or may not be important
➢ For the web, crawling is a challenge in itself
❖ The retrieval problem
➢ Must have sub-second response time
➢ For the web, only need relatively few results
MapReduce: Index Construction
❖ Input: documents: (docid, doc), ..
❖ Output: (term, [docid, docid, …])
➢ E.g., (long, [1, 23, 49, 127, …])
The docids are sorted!! (used in query phase)
➢ docid is an internal document id, e.g., a unique integer. Not an external document id such as a URL
❖ How to do it in MapReduce?
MapReduce: Index Construction
❖ A simple approach:
➢ Each Map task is a document parser
Input: A stream of documents
– (1, long ago …), (2, once upon …)
Output: A stream of (term, docid) tuples
– (long, 1) (ago, 1) … (once, 2) (upon, 2) …
➢ Reducers convert streams of keys into streams of inverted lists
Input: (long, [1, 127, 49, 23, …])
The reducer sorts the values for a key and builds an inverted list
– Longest inverted list must fit in memory
Output: (long, [1, 23, 49, 127, …])
❖ Problems?
➢ Inefficient
➢ docids are sorted in reducers
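A minimal sketch of this simple approach (the “docid<TAB>text” input format and the tokenization are assumptions, not the course’s exact code):

// Mapper: parse each document into a stream of (term, docid) tuples
public static class ParseMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] parts = line.toString().split("\t", 2);
        long docid = Long.parseLong(parts[0]);
        for (String term : parts[1].toLowerCase().split("\\W+"))
            if (!term.isEmpty()) context.write(new Text(term), new LongWritable(docid));
    }
}

// Reducer: turn the stream of docids for a term into a sorted inverted list
public static class InvertedListReducer extends Reducer<Text, LongWritable, Text, Text> {
    @Override
    protected void reduce(Text term, Iterable<LongWritable> docids, Context context)
            throws IOException, InterruptedException {
        // Buffer and sort the postings: the whole list must fit in memory,
        // which is exactly the scalability problem noted above
        java.util.TreeSet<Long> postings = new java.util.TreeSet<>();
        for (LongWritable d : docids) postings.add(d.get());
        context.write(term, new Text(postings.toString()));
    }
}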
Ranked Text Retrieval
❖ Order documents by how likely they are to be relevant
➢ Estimate relevance(q, di)
➢ Sort documents by relevance
➢ Display sorted results
❖ User model
➢ Present hits one screen at a time, best results first
➢ At any point, users can decide to stop looking
❖ How do we estimate relevance?
➢ Assume document is relevant if it has a lot of query terms
➢ Replace relevance(q, di) with sim(q, di)
➢ Compute similarity of vector representations
❖ Vector space model/cosine similarity, language models, …
Term Weighting
❖ Term weights consist of two components
➢ Local: how important is the term in this document?
➢ Global: how important is the term in the collection?
❖ Here’s the intuition:
➢ Terms that appear often in a document should get high weights
➢ Terms that appear in many documents should get low weights
❖ How do we capture this mathematically?
➢ TF: Term frequency (local)
➢ IDF: Inverse document frequency (global)
TF.IDF Term Weighting
wi,j = tfi,j × log(N / ni)
➢ wi,j: weight assigned to term i in document j
➢ tfi,j: number of occurrences of term i in document j
➢ N: number of documents in the entire collection
➢ ni: number of documents with term i
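For instance (illustrative numbers, base-10 logarithm assumed): a term occurring tfi,j = 3 times in a document, in a collection of N = 10,000 documents of which ni = 10 contain the term, gets weight
wi,j = 3 × log(10,000 / 10) = 3 × 3 = 9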
Retrieval in a Nutshell
❖ Look up postings lists corresponding to query terms
❖ Traverse postings for each query term
❖ Store partial query-document scores in accumulators
❖ Select top k results to return
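A sketch of this term-at-a-time scoring with accumulators (in-memory maps stand in for postings with precomputed weights; all names are illustrative):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AccumulatorScorer {
    // postings: term -> (docid -> term weight); the accumulator holds
    // docid -> partial score, updated one query term at a time
    static Map<Integer, Double> score(List<String> query,
                                      Map<String, Map<Integer, Double>> postings) {
        Map<Integer, Double> acc = new HashMap<>();
        for (String term : query)
            for (Map.Entry<Integer, Double> e :
                    postings.getOrDefault(term, Map.of()).entrySet())
                acc.merge(e.getKey(), e.getValue(), Double::sum);
        return acc; // rank docids by accumulated score and keep the top k
    }
}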
MapReduce: Index Construction
❖ Input: documents: (docid, doc), ..
❖ Output: (t, [(docid, wt), (docid, wt), …])
➢ wt represents the term weight of t in docid
➢ E.g., (long, [(1, 0.5), (23, 0.2), (49, 0.3), (127,0.4), …])
The docids are sorted!! (used in query phase)