Chapter 4: MapReduce III
COMP9313: Big Data Management
Lecturer: Xin Cao
Course web site: http://www.cse.unsw.edu.au/~cs9313/
About the First Assignment
Problem setting
Example input and output are given
Number of reducers: 1
Make sure that each file can be compiled independently
Remove all debugging-related code
Submission
Two java files
Two ways to submit
Deadline: 01 Apr 2018, 09:59:59 pm
Review of Lab 2
Package a MapReduce job as a jar via command line
Eclipse + Hadoop plugin
Connect to HDFS and manage files
Create MapReduce project
Writing MapReduce program
Debugging MapReduce job
Eclipse debug perspective
Print debug info to stdout/stderr and Hadoop system logs
Package a MapReduce job as a jar
Check logs of a MapReduce job
Count the number of words that start with each letter
Letter Count
Identify the input and output for a given problem:
Input: (docid, doc)
Output: (letter, count)
Mapper design:
Input: (docid, doc)
Output: (letter, 1)
Map idea: for each word in doc, emit a pair in which the key is the starting letter, and the value is 1
Reducer design:
Input: (letter, (1,1,…,1))
Output: (letter, count)
Reduce idea: aggregate all the values for the same key “letter”
Combiner, Reducer and Main are the same as those in WordCount.java
Mapper
public static class TokenizerMapper
    extends Mapper<Object, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      // convert to lowercase
      char c = itr.nextToken().toLowerCase().charAt(0);
      // check whether the first character is a letter
      if (c >= 'a' && c <= 'z') {
        word.set(String.valueOf(c));
        context.write(word, one);
      }
    }
  }
}
MapReduce Algorithm Design Patterns
In-mapper combining, where the functionality of the combiner is moved into the mapper.
The related patterns “pairs” and “stripes” for keeping track of joint events from a large number of observations.
“Order inversion”, where the main idea is to convert the sequencing of computations into a sorting problem.
“Value-to-key conversion”, which provides a scalable solution for secondary sorting.
Chapter 4: MapReduce III
Design Pattern 4: Value-to-key Conversion
Secondary Sort
MapReduce sorts input to reducers by key
Values may be arbitrarily ordered
What if we want to sort the values as well?
E.g., k → (v1, r), (v3, r), (v4, r), (v8, r)…
Google’s MapReduce implementation provides built-in functionality
Unfortunately, Hadoop does not support this natively
Secondary Sort: sorting values associated with a key in the reduce phase, also called “value-to-key conversion”
Secondary Sort
Sensor data from a scientific experiment: there are m sensors, each taking readings on a continuous basis
(t1, m1, r80521)
(t1, m2, r14209)
(t1, m3, r76742)
…
(t2, m1, r21823)
(t2, m2, r66508)
(t2, m3, r98347)
We wish to reconstruct the activity at each individual sensor over time
In a MapReduce program, a mapper may emit the following pair as the intermediate result
m1 -> (t1, r80521)
We need to sort the value according to the timestamp
Secondary Sort
Solution 1:
Buffer values in memory, then sort
Why is this a bad idea?
Solution 2:
“Value-to-key conversion” design pattern: form composite intermediate key, (m1, t1)
The mapper emits (m1, t1) -> r80521
Let execution framework do the sorting
Preserve state across multiple key-value pairs to handle processing
Anything else we need to do?
Sensor readings are split across multiple keys. Reducers need to know when all readings of a sensor have been processed
All pairs associated with the same sensor are shuffled to the same reducer (use partitioner)
How to Implement Secondary Sort in MapReduce?
Secondary Sort: Another Example
Consider the temperature data from a scientific experiment. Columns are year, month, day, and daily temperature, respectively:
We want to output the temperature for every year-month with the values sorted in ascending order.
Solutions to the Secondary Sort Problem
Use the Value-to-Key Conversion design pattern:
form a composite intermediate key, (K, V), where V is the secondary key. Here, K is called a natural key. To inject a value (i.e., V) into a reducer key, simply create a composite key
K: year-month
V: temperature data
Let the MapReduce execution framework do the sorting (rather than sorting in memory, let the framework sort by using the cluster nodes).
Preserve state across multiple key-value pairs to handle processing. Write your own partitioner: partition the mapper’s output by the natural key (year-month).
Secondary Sorting Keys
Customize The Composite Key
public class DateTemperaturePair
    implements Writable, WritableComparable<DateTemperaturePair> {
  private Text yearMonth = new Text();                 // natural key
  private IntWritable temperature = new IntWritable(); // secondary key
… …
@Override
/**
* This comparator controls the sort order of the keys.
*/
public int compareTo(DateTemperaturePair pair) {
int compareValue = this.yearMonth.compareTo(pair.getYearMonth());
if (compareValue == 0) {
compareValue = temperature.compareTo(pair.getTemperature());
}
return compareValue; // sort ascending
}
… …
}
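The elided parts above would also need the Writable serialization methods and getters. A minimal sketch of what they typically look like, assuming only the two fields shown (requires java.io.DataInput/DataOutput imports):
@Override
public void write(DataOutput out) throws IOException {
  yearMonth.write(out);        // serialize natural key
  temperature.write(out);      // serialize secondary key
}
@Override
public void readFields(DataInput in) throws IOException {
  yearMonth.readFields(in);    // deserialize in the same order
  temperature.readFields(in);
}
public Text getYearMonth() { return yearMonth; }
public IntWritable getTemperature() { return temperature; }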
Customize The Partitioner
public class DateTemperaturePartitioner
    extends Partitioner<DateTemperaturePair, Text> {
  @Override
  public int getPartition(DateTemperaturePair pair, Text text, int numberOfPartitions) {
    // make sure that partitions are non-negative
    return Math.abs(pair.getYearMonth().hashCode() % numberOfPartitions);
  }
}
Utilize the natural key only for partitioning
Grouping Comparator
Controls which keys are grouped together for a single call to Reducer.reduce() function.
Configure the grouping comparator using Job object:
public class DateTemperatureGroupingComparator extends WritableComparator {
… …
protected DateTemperatureGroupingComparator(){
super(DateTemperaturePair.class, true);
}
@Override
/* This comparator controls which keys are grouped together into a single call to the reduce() method */
public int compare(WritableComparable wc1, WritableComparable wc2) {
DateTemperaturePair pair = (DateTemperaturePair) wc1;
DateTemperaturePair pair2 = (DateTemperaturePair) wc2;
return pair.getYearMonth().compareTo(pair2.getYearMonth());
}
}
Consider the natural key only for grouping:
job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);
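Putting the pieces together, a minimal driver sketch (the mapper/reducer class names are illustrative assumptions, imports omitted as in the other snippets):
Job job = Job.getInstance(new Configuration(), "secondary sort");
job.setJarByClass(SecondarySortDriver.class);             // hypothetical driver class

job.setMapperClass(SecondarySortMapper.class);            // emits (DateTemperaturePair, Text)
job.setReducerClass(SecondarySortReducer.class);

job.setMapOutputKeyClass(DateTemperaturePair.class);      // composite key: sorted by compareTo()
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setPartitionerClass(DateTemperaturePartitioner.class);               // partition by natural key
job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class); // group by natural key

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);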
MapReduce Algorithm Design
Aspects that are not under the control of the designer
Where a mapper or reducer will run
When a mapper or reducer begins or finishes
Which input key-value pairs are processed by a specific mapper
Which intermediate key-value pairs are processed by a specific reducer
Aspects that can be controlled
Construct data structures as keys and values
Execute user-specified initialization and termination code for mappers and reducers (pre-process and post-process)
Preserve state across multiple input and intermediate keys in mappers and reducers (in-mapper combining)
Control the sort order of intermediate keys, and therefore the order in which a reducer will encounter particular keys (order inversion)
Control the partitioning of the key space, and therefore the set of keys that will be encountered by a particular reducer (partitioner)
MapReduce in Real World: Search Engine
Information retrieval (IR)
Focus on textual information (= text/document retrieval)
Other possibilities include image, video, music, …
Boolean Text retrieval
Each document or query is treated as a “bag” of words or terms. Word sequence is not considered
Query terms are combined logically using the Boolean operators AND, OR, and NOT.
E.g., ((data AND mining) AND (NOT text))
Retrieval
Given a Boolean query, the system retrieves every document that makes the query logically true.
Called exact match
The retrieval results are usually quite poor because term frequency is not considered and results are not ranked
Boolean Text Retrieval: Inverted Index
The inverted index of a document collection is basically a data structure that
attaches each distinct term to a list of all documents that contain the term.
The documents containing a term are sorted in the list
Thus, in retrieval, it takes constant time to
find the documents that contain a query term.
multiple query terms are also easy to handle, as we will see soon.
Boolean Text Retrieval: Inverted Index
Search Using Inverted Index
Given a query q, search has the following steps:
Step 1 (vocabulary search): find each term/word in q in the inverted index.
Step 2 (results merging): Merge results to find documents that contain all or some of the words/terms in q.
Step 3 (Rank score computation): To rank the resulting documents/pages, using:
content-based ranking
link-based ranking
Not used in Boolean retrieval
Boolean Query Processing: AND
Consider processing the query: Brutus AND Caesar
Locate Brutus in the Dictionary;
Retrieve its postings.
Locate Caesar in the Dictionary;
Retrieve its postings.
“Merge” the two postings:
Walk through the two postings simultaneously, in time linear in the total number of postings entries
[Figure] Postings lists sorted by docID: Brutus → 2 4 8 16 32 64 128; Caesar → 1 2 3 5 8 13 21 34; intersection → 2 8
If the list lengths are x and y, the merge takes O(x+y) operations.
Crucial: postings sorted by docID.
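A compact sketch of this merge in Java (standalone illustration, not part of the Hadoop code):
import java.util.ArrayList;
import java.util.List;

public class Intersect {
  // Linear-time intersection of two docID-sorted postings lists
  static List<Integer> intersect(int[] p1, int[] p2) {
    List<Integer> answer = new ArrayList<>();
    int i = 0, j = 0;
    while (i < p1.length && j < p2.length) {
      if (p1[i] == p2[j]) { answer.add(p1[i]); i++; j++; }
      else if (p1[i] < p2[j]) i++;        // advance the list with the smaller docID
      else j++;
    }
    return answer;
  }
  public static void main(String[] args) {
    // Brutus AND Caesar from the figure above -> [2, 8]
    System.out.println(intersect(new int[]{2, 4, 8, 16, 32, 64, 128},
                                 new int[]{1, 2, 3, 5, 8, 13, 21, 34}));
  }
}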
MapReduce it?
The indexing problem
Scalability is critical
Must be relatively fast, but need not be real time
Fundamentally a batch operation
Incremental updates may or may not be important
For the web, crawling is a challenge in itself
The retrieval problem
Must have sub-second response time
For the web, only need relatively few results
Perfect for MapReduce!
Uh… not so good…
MapReduce: Index Construction
Input: documents: (docid, doc), ..
Output: (term, [docid, docid, …])
E.g., (long, [1, 23, 49, 127, …])
The docids are sorted! (used in the query phase)
docid is an internal document id, e.g., a unique integer. Not an external document id such as a URL
How to do it in MapReduce?
MapReduce: Index Construction
A simple approach:
Each Map task is a document parser
Input: A stream of documents
(1, long ago …), (2, once upon …)
Output: A stream of (term, docid) tuples
(long, 1) (ago, 1) … (once, 2) (upon, 2) …
Reducers convert streams of keys into streams of inverted lists
Input: (long, [1, 127, 49, 23, …])
The reducer sorts the values for a key and builds an inverted list
Longest inverted list must fit in memory
Output: (long, [1, 23, 49, 127, …])
Problems?
Inefficient
docids are sorted in reducers
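A hedged sketch of this simple indexer, assuming the input key is the docid and whitespace tokenization (imports omitted as in the other snippets):
public static class ParserMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  private final Text term = new Text();
  private final LongWritable docid = new LongWritable();
  public void map(LongWritable key, Text doc, Context context)
      throws IOException, InterruptedException {
    docid.set(key.get());                               // assume the input key is the docid
    for (String tok : doc.toString().toLowerCase().split("\\s+")) {
      if (tok.isEmpty()) continue;
      term.set(tok);
      context.write(term, docid);                       // emit (term, docid)
    }
  }
}

public static class IndexReducer extends Reducer<Text, LongWritable, Text, Text> {
  public void reduce(Text term, Iterable<LongWritable> docids, Context context)
      throws IOException, InterruptedException {
    List<Long> postings = new ArrayList<>();            // buffering + sorting: the weakness noted above
    for (LongWritable d : docids) postings.add(d.get());
    Collections.sort(postings);
    context.write(term, new Text(postings.toString())); // emit (term, sorted posting list)
  }
}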
Ranked Text Retrieval
Order documents by how likely they are to be relevant
Estimate relevance(q, di)
Sort documents by relevance
Display sorted results
User model
Present hits one screen at a time, best results first
At any point, users can decide to stop looking
How do we estimate relevance?
Assume document is relevant if it has a lot of query terms
Replace relevance(q, di) with sim(q, di)
Compute similarity of vector representations
Vector space model/cosine similarity, language models, …
Term Weighting
Term weights consist of two components
Local: how important is the term in this document?
Global: how important is the term in the collection?
Here’s the intuition:
Terms that appear often in a document should get high weights
Terms that appear in many documents should get low weights
How do we capture this mathematically?
TF: Term frequency (local)
IDF: Inverse document frequency (global)
TF.IDF Term Weighting
w_{i,j} = tf_{i,j} × log(N / n_i), where
w_{i,j}: weight assigned to term i in document j
tf_{i,j}: number of occurrences of term i in document j
N: number of documents in the entire collection
n_i: number of documents containing term i
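For instance (illustrative numbers): if term i occurs tf_{i,j} = 3 times in document j, the collection has N = 1,000,000 documents, and the term appears in n_i = 1,000 of them, then w_{i,j} = 3 × log(1,000,000 / 1,000) = 3 × log 1000 = 9 using base-10 logarithms.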
Retrieval in a Nutshell
Look up postings lists corresponding to query terms
Traverse postings for each query term
Store partial query-document scores in accumulators
Select top k results to return
MapReduce: Index Construction
Input: documents: (docid, doc), ..
Output: (t, [(docid, wt), (docid, wt), …])
wt represents the term weight of t in docid
E.g., (long, [(1, 0.5), (23, 0.2), (49, 0.3), (127, 0.4), …])
The docids are sorted! (used in the query phase)
How does this problem differ from the previous one?
TF computing
Easy. Can be done within the mapper
IDF computing
Known only after all documents containing a term t are processed
Input and output of map and reduce?
Inverted Index: TF-IDF
MapReduce: Index Construction
A simple approach:
Each Map task is a document parser
Input: A stream of documents
(1, long ago …), (2, once upon …)
Output: A stream of (term, [docid, tf]) tuples
(long, [1,1]) (ago, [1,1]) … (once, [2,1]) (upon, [2,1]) …
Reducers convert streams of keys into streams of inverted lists
Input: (long, {[1,1], [127,2], [49,1], [23,3] …})
The reducer sorts the values for a key and builds an inverted list
Compute TF and IDF in reducer!
Output: (long, [(1, 0.5), (23, 0.2), (49, 0.3), (127,0.4), …])
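A rough reducer sketch under these assumptions: each value arrives as a "docid,tf" string, N is passed via the job configuration, and df is the number of postings received (imports omitted):
public static class TfIdfReducer extends Reducer<Text, Text, Text, Text> {
  private long totalDocs;                                 // N
  public void setup(Context context) {
    totalDocs = context.getConfiguration().getLong("total.docs", 1L);
  }
  public void reduce(Text term, Iterable<Text> postings, Context context)
      throws IOException, InterruptedException {
    List<String[]> buffered = new ArrayList<>();          // must buffer: df unknown until all postings seen
    for (Text p : postings) buffered.add(p.toString().split(","));
    double idf = Math.log10((double) totalDocs / buffered.size());   // df = number of postings
    StringBuilder out = new StringBuilder();
    for (String[] p : buffered)                           // p = [docid, tf]
      out.append("(").append(p[0]).append(", ")
         .append(Integer.parseInt(p[1]) * idf).append(") ");
    context.write(term, new Text(out.toString().trim()));
  }
}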
MapReduce: Index Construction
MapReduce: Index Construction
Inefficient: terms as keys, postings as values
docids are sorted in reducers
IDF can be computed only after all relevant documents received
Reducers must buffer all postings associated with key (to sort)
What if we run out of memory to buffer postings?
Improvement?
The First Improvement
How to make Hadoop sort the docid, instead of doing it in reducers?
Design pattern: value-to-key conversion, secondary sort
The mapper outputs a stream of ([term, docid], tf) tuples
Remember: you must implement a partitioner on term!
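For example, a partitioner sketch keyed on the term alone (the composite key class TermDocIdPair and its getter are assumptions):
public static class TermPartitioner extends Partitioner<TermDocIdPair, IntWritable> {
  @Override
  public int getPartition(TermDocIdPair key, IntWritable tf, int numPartitions) {
    // ignore the docid part so every posting of a term reaches the same reducer
    return (key.getTerm().hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}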
The Second Improvement
How to avoid buffering all postings associated with key?
We’d like to store the DF at the front of the postings list
But we don’t know the DF until we’ve seen all postings!
Sound familiar?
Design pattern: Order inversion
The Second Improvement
Getting the DF
In the mapper:
Emit “special” key-value pairs to keep track of DF
In the reducer:
Make sure “special” key-value pairs come first: process them to determine DF
Remember: proper partitioning!
Emit normal key-value pairs…
Emit “special” key-value pairs to keep track of df…
Doc1: one fish, two fish
The Second Improvement
First, compute the DF by summing contributions from all “special” key-value pair…
Write the DF…
Important: properly define sort order to make sure “special” key-value pairs come first!
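A sketch of the idea in pseudo-Java; the "*" marker, the composite key class and its String-returning getters are assumptions:
// Mapper side, for term t appearing in document d with frequency tf:
//   context.write(new TermDocIdPair(t, "*"), new IntWritable(tf));  // "special" pair, one per (t, d)
//   context.write(new TermDocIdPair(t, d), new IntWritable(tf));    // normal posting
// With the sort order placing (t, "*") before every (t, docid) and the partitioner
// sending both to the same reducer, the reduce side looks roughly like:
private long df;                            // reducer state, preserved across reduce() calls
public void reduce(TermDocIdPair key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  if ("*".equals(key.getDocId())) {
    df = 0;
    for (IntWritable v : values) df++;      // one special pair per document containing the term
    context.write(new Text(key.getTerm()), new Text("df=" + df));   // write the DF first
  } else {
    for (IntWritable v : values)            // stream postings out; no buffering needed
      context.write(new Text(key.getTerm()),
                    new Text("(" + key.getDocId() + ", " + v.get() + ")"));
  }
}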
Retrieval with MapReduce?
MapReduce is fundamentally batch-oriented
Optimized for throughput, not latency
Startup of mappers and reducers is expensive
MapReduce is not suitable for real-time queries!
Use separate infrastructure for retrieval…
Real world search engines much more complex and sophisticated
Miscellaneous
MapReduce Counters
Instrument Job’s metrics
Gather statistics
Quality control – confirm what was expected.
E.g., count invalid records
Application level statistics.
Problem diagnostics
Try to use counters for gathering statistics instead of log files
Framework provides a set of built-in metrics
For example bytes processed for input and output
User can create new counters
Number of records consumed
Number of errors or warnings
Built-in Counters
Hadoop maintains some built-in counters for every job.
Several groups for built-in counters
File System Counters – number of bytes read and written
Job Counters – document the number of map and reduce tasks launched and the number of failed tasks
Map-Reduce Task Counters – mapper, reducer, combiner input and output record counts, time and memory statistics
User-Defined Counters
You can create your own counters
Counters are defined by a Java enum
serves to group related counters
E.g.,
enum Temperature {
MISSING,
MALFORMED
}
Increment counters in Reducer and/or Mapper classes
Counters are global: Framework accurately sums up counts across all maps and reduces to produce a grand total at the end of the job
Implement User-Defined Counters
Retrieve Counter from Context object
Framework injects Context object into map and reduce methods
Increment Counter’s value
Can increment by 1 or more
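For example, inside map(), using the Temperature enum defined earlier (the parsing helper is hypothetical):
String temperatureStr = parseTemperature(record);           // hypothetical helper
if (temperatureStr == null) {
  context.getCounter(Temperature.MISSING).increment(1);     // retrieve counter from Context, then increment
} else if (!temperatureStr.matches("[-+]?\\d+")) {
  context.getCounter(Temperature.MALFORMED).increment(1);
}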
Implement User-Defined Counters
Get Counters from a finished job in Java
Counters counters = job.getCounters();
Get the counter according to name
Counter c1 = counters.findCounter(Temperature.MISSING);
Enumerate all counters after job is completed
for (CounterGroup group : counters) {
  System.out.println("* Counter Group: " + group.getDisplayName() + " (" + group.getName() + ")");
  System.out.println("  number of counters in this group: " + group.size());
  for (Counter counter : group) {
    System.out.println("  - " + counter.getDisplayName() + ": " + counter.getName() + ": " + counter.getValue());
  }
}
MapReduce SequenceFile
File operations based on a binary format rather than a text format
The SequenceFile class provides a persistent data structure for binary key-value pairs, e.g.,
Key: timestamp represented by a LongWritable
Value: quantity being logged represented by a Writable
Use SequenceFile in MapReduce:
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
In MapReduce, the default input format is TextInputFormat
MapReduce Input Formats
InputSplit
A chunk of the input processed by a single map
Each split is divided into records
Split is just a reference to the data (doesn’t contain the input data)
RecordReader
Iterate over records
Used by the map task to generate record key-value pairs
As MapReduce application programmers, we do not need to deal with InputSplits directly, as they are created by the InputFormat
In MapReduce, by default TextInputFormat and LineRecordReader
MapReduce InputFormat
MapReduce OutputFormat
Detailed Hadoop MapReduce Data Flow
Creating Inverted Index
Given a large text file containing the contents of a huge number of webpages, in which each webpage starts with a specific tag
A sample file
Procedure:
Implement a custom RecordReader
Implement a custom InputFormat, and override the createRecordReader() method to return your self-defined RecordReader object
Configure the InputFormat class in the main function using job.setInputFormatClass()
Try to finish this task using the sample file
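A skeleton of steps 1-3 (class names are illustrative; the RecordReader body, which does the actual page splitting, is omitted here):
public class WebPageInputFormat extends FileInputFormat<LongWritable, Text> {
  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) {
    return new WebPageRecordReader();   // your RecordReader returning one webpage per record
  }
}
// In the driver:
// job.setInputFormatClass(WebPageInputFormat.class);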
Methods to Write MapReduce Jobs
Typical – usually written in Java
MapReduce 2.0 API
MapReduce 1.0 API
Streaming
Uses stdin and stdout
Can use any language to write Map and Reduce Functions
C#, Python, JavaScript, etc…
Pipes
Often used with C++
Abstraction libraries
Hive, Pig, etc… write in a higher level language, generate one or more MapReduce jobs
Number of Maps and Reduces
Maps
The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.
The right level of parallelism for maps seems to be around 10-100 maps per node, although it has been set up to 300 maps for very CPU-light map tasks.
If you expect 10TB of input data and have a blocksize of 128MB, you’ll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.
Reduces
The right number of reduces seems to be 0.95 or 1.75 multiplied by (number of nodes × number of maximum containers per node)
With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.
Use job.setNumReduceTasks(int) to set the number
MapReduce Advantages
Automatic Parallelization:
Depending on the size of the raw input data, multiple MAP tasks are instantiated
Similarly, multiple REDUCE tasks are instantiated depending on the number of intermediate key partitions
Run-time:
Data partitioning
Task scheduling
Handling machine failures
Managing inter-machine communication
Completely transparent to the programmer/analyst/user
EDBT 2011 Tutorial
The Need
Special-purpose programs to process large amounts of data: crawled documents, Web Query Logs, etc.
At Google and others (Yahoo!, Facebook):
Inverted index
Graph structure of the WEB documents
Summaries of #pages/host, set of frequent queries, etc.
Ad Optimization
Spam filtering
EDBT 2011 Tutorial
Map Reduce vs Parallel DBMS
Pavlo et al., SIGMOD 2009, Stonebraker et al., CACM 2010, …
Practice: Design MapReduce Algorithms
Counting total enrollments of two specified courses
Input Files: A list of students with their enrolled courses
Jamie: COMP9313, COMP9318
Tom: COMP9331, COMP9313
… …
Mapper selects records and outputs initial counts
Input: Key – student, value – a list of courses
Output: (COMP9313, 1), (COMP9318, 1), …
Reducer accumulates counts
Input: (COMP9313, [1, 1, …]), (COMP9318, [1, 1, …])
Output: (COMP9313, 16), (COMP9318, 35)
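A sketch of the mapper described above (the "student: course, course" line format is assumed; the reducer sums the counts exactly as in WordCount):
public static class CourseCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text course = new Text();
  public void map(LongWritable key, Text line, Context ctx)
      throws IOException, InterruptedException {
    String[] parts = line.toString().split(":");      // "Jamie: COMP9313, COMP9318"
    if (parts.length < 2) return;
    for (String c : parts[1].split(",")) {
      String name = c.trim();
      if (name.equals("COMP9313") || name.equals("COMP9318")) {
        course.set(name);
        ctx.write(course, ONE);                        // emit (course, 1)
      }
    }
  }
}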
Remove duplicate records
Input: a list of records
2013-11-01 aa
2013-11-02 bb
2013-11-03 cc
2013-11-01 aa
2013-11-03 dd
Mapper
Input (record_id, record)
Output (record, “”)
E.g., (2013-11-01 aa, “”), (2013-11-02 bb, “”), …
Reducer
Input (record, [“”, “”, “”, …])
E.g., (2013-11-01 aa, [“”, “”]), (2013-11-02 bb, [“”]), …
Output (record, “”)
Assume that in an online shopping system, a huge log file stores the information of each transaction. Each line of the log is in the format "userID\tproduct\tprice\ttime". Your task is to use MapReduce to find the top-5 most expensive products purchased by each user in 2016
Mapper:
Input(transaction_id, transaction)
setup(): initialize an associative array H mapping each userID to a priority queue Q of log records ordered by price
map(): get local top-5 for each user
cleanup(): emit the entries in H
Reducer:
Input(userID, list of queues[])
get top-5 products from the list of queues
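A sketch of the in-mapper combining idea described above, assuming the time field begins with the year and imports are omitted:
public static class Top5Mapper extends Mapper<LongWritable, Text, Text, Text> {
  private static final Comparator<String[]> BY_PRICE =
      Comparator.comparingDouble(r -> Double.parseDouble(r[2]));
  // userID -> min-heap of records, so the cheapest of the kept 5 is evicted first
  private final Map<String, PriorityQueue<String[]>> heaps = new HashMap<>();

  public void map(LongWritable key, Text line, Context ctx) {
    String[] f = line.toString().split("\t");             // userID, product, price, time
    if (f.length < 4 || !f[3].startsWith("2016")) return; // keep 2016 purchases only
    PriorityQueue<String[]> q = heaps.computeIfAbsent(f[0], k -> new PriorityQueue<>(BY_PRICE));
    q.offer(f);
    if (q.size() > 5) q.poll();                            // local top-5 per user
  }

  public void cleanup(Context ctx) throws IOException, InterruptedException {
    for (Map.Entry<String, PriorityQueue<String[]>> e : heaps.entrySet())
      for (String[] r : e.getValue())
        ctx.write(new Text(e.getKey()), new Text(r[1] + "\t" + r[2]));  // (userID, product+price)
  }
}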
Practice: Design MapReduce Algorithms
Reverse graph edge directions & output in node order
Input: adjacency list of a graph with 3 nodes and 4 edges: (3, [1, 2]), (1, [2, 3])
Output: (1, [3]), (2, [1, 3]), (3, [1])
Note, the node_ids in the output values are also sorted. But Hadoop only sorts on keys!
Solutions: Secondary sort
Map
Input: (3, [1, 2]), (1, [2, 3]).
Intermediate: (1, [3]), (2, [3]), (2, [1]), (3, [1]). (reverse direction)
Output: (<1, 3>, [3]), (<2, 3>, [3]), (<2, 1>, [1]), (<3, 1>, [1]).
Copy node_ids from value to key.
Partition on Key.field1, and Sort on whole Key (both fields)
Input: (<1, 3>, [3]), (<2, 3>, [3]), (<2, 1>, [1]), (<3, 1>, [1])
Output: (<1, 3>, [3]), (<2, 1>, [1]), (<2, 3>, [3]), (<3, 1>, [1])
Grouping comparator
Merge according to part of the key
Output: (<1, 3>, [3]), (<2, 1>, [1, 3]), (<3, 1>, [1])
this will be the reducer’s input
Reducer
Merge according to part of the key
Output: (1, [3]), (2, [1, 3]), (3, [1])
Calculate the common friends for each pair of users in Facebook. Assume the friends are stored in the format Person -> [List of Friends], e.g., A -> [B C D], B -> [A C D E], C -> [A B D E], D -> [A B C E], E -> [B C D]. Your result should look like:
(A B) -> (C D)
(A C) -> (B D)
(A D) -> (B C)
(B C) -> (A D E)
(B D) -> (A C E)
(B E) -> (C D)
(C D) -> (A B E)
(C E) -> (B D)
(D E) -> (B C)
Mapper:
Input(user u, List of Friends [f1, f2, …,])
map(): for each friend fi, emit (<u, fi> (pair ordered consistently), List of Friends [f1, f2, …])
Reducer:
Input(user u, list of friends lists[])
Get the intersection from all friends lists
Example: http://stevekrenzel.com/articles/finding-friends
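A sketch of the mapper under the stated format, assuming the brackets are stripped during parsing (imports omitted):
public static class CommonFriendsMapper extends Mapper<LongWritable, Text, Text, Text> {
  public void map(LongWritable key, Text line, Context ctx)
      throws IOException, InterruptedException {
    String[] parts = line.toString().split("->");
    String user = parts[0].trim();
    String friends = parts[1].replace("[", "").replace("]", "").trim();  // e.g. "B C D"
    for (String f : friends.split("\\s+")) {
      // order the pair so (A,B) and (B,A) meet at the same reducer
      String pair = user.compareTo(f) < 0 ? user + " " + f : f + " " + user;
      ctx.write(new Text(pair), new Text(friends));      // value: u's whole friend list
    }
  }
}
// The reducer receives the two friend lists for each pair and emits their intersection.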
References
Data-Intensive Text Processing with MapReduce. Jimmy Lin and Chris Dyer. University of Maryland, College Park.
Hadoop: The Definitive Guide. Chapters "Hadoop I/O" and "MapReduce Features".
End of Chapter 4