COMP9313: Big Data Management
Course web site: http://www.cse.unsw.edu.au/~cs9313/
Chapter 2.1: MapReduce
MapReduce Example
❖ Hadoop MapReduce is an implementation of MapReduce
➢ MapReduce is a computing paradigm (Google)
➢ Hadoop MapReduce is open-source software
Data Structures in MapReduce
❖ Key-value pairs are the basic data structure in MapReduce
➢ Keys and values can be: integers, floats, strings, raw bytes
➢ They can also be arbitrary data structures, but must be comparable (for sorting)
❖ The design of MapReduce algorithms involves:
➢ Imposing the key-value structure on arbitrary datasets
E.g.: for a collection of Web pages, input keys may be URLs and values may be the HTML content
➢ In some algorithms, input keys are not used (e.g., wordcount), in others they uniquely identify a record
➢ Keys can be combined in complex ways to design various algorithms
Map and Reduce Functions
❖ Programmers specify two functions:
➢ map (k1, v1) → list [(k2, v2)]
Map transforms the input into key-value pairs to process
➢ reduce (k2, list [v2]) → list [(k3, v3)]
Reduce aggregates the list of values for each key
All values with the same key are sent to the same reducer
❖ The MapReduce environment takes charge of everything else…
❖ A complex program can be decomposed as a succession of Map and Reduce tasks
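❖ As a rough plain-Python illustration of this decomposition (this is not Hadoop code; the function names below are only illustrative), word count can be written as a map step, a grouping ("shuffle") step, and a reduce step:

from collections import defaultdict

def map_fn(doc_id, doc):
    # map(k1, v1) -> list[(k2, v2)]: emit (word, 1) for every token; the input key is unused
    return [(word, 1) for word in doc.split()]

def shuffle(mapped_pairs):
    # Group all values with the same key, as the framework does between map and reduce
    groups = defaultdict(list)
    for key, value in mapped_pairs:
        groups[key].append(value)
    return groups

def reduce_fn(word, counts):
    # reduce(k2, list[v2]) -> list[(k3, v3)]: sum the counts for each word
    return [(word, sum(counts))]

docs = {"d1": "Hello World Bye World", "d2": "Hello Hadoop Bye Hadoop"}
mapped = [pair for doc_id, doc in docs.items() for pair in map_fn(doc_id, doc)]
for word, counts in shuffle(mapped).items():
    print(reduce_fn(word, counts))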
Understanding MapReduce
❖ Map: (K1, V1) → list (K2, V2)
➢ Input: the key/value info in the input split
➢ Output: a list of intermediate key/value pairs
➢ One list per local node
➢ Can implement a local Reducer (or Combiner)
❖ Shuffle / Sort: the Shuffle/Sort phase precedes the Reduce phase and combines the Map output into a list per key
❖ Reduce: (K2, list(V2)) → list (K3, V3)
➢ Usually aggregates the intermediate values
❖ Let’s count the number of each word in documents (e.g., Tweets/Blogs)
➢ The mapper reads input pairs in the format of <docid, doc>
➢ The mapper outputs pairs in the format of <word, 1>
➢ After shuffle and sort, the reducer receives pairs in the format of <word, [1, 1, …]>
➢ The reducer outputs pairs in the format of <word, count>
A Brief View of MapReduce
Shuffle and Sort
➢ Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.
➢ The framework groups Reducer inputs by keys (since different Mappers may have output the same key) in this stage.
❖ The Hadoop framework handles the Shuffle and Sort step.
“Hello World” in MapReduce
❖ Input:
➢ Key-value pairs: (docid, doc) of a file stored on the distributed filesystem
➢ docid: a unique identifier of a document
➢ doc: the text of the document itself
❖ The mapper:
➢ Takes an input key-value pair and tokenizes the line
➢ Emits intermediate key-value pairs: the word is the key, and the integer is the value
❖ The framework:
➢ Guarantees that all values associated with the same key (the word) are brought to the same reducer
❖ The reducer:
➢ Receives all values associated with some keys
➢ Sums the values and writes output key-value pairs: the key is the word, and the value is the number of occurrences
Write Your Own MapReduce Program
❖ A MapReduce program consists of the following 3 parts:
➢ Mapper
➢ Reducer
➢ Driver → main (triggers the map and reduce methods)
➢ It is better to include the map, reduce, and main methods in 3 different classes
❖ Check detailed information of all classes at: https://hadoop.apache.org/docs/r3.3.1/api/allclasses-noframe.html
public class TokenizerMapper
     extends Mapper<Object, Text, Text, IntWritable> {
Mapper Explanation
❖ Maps input key/value pairs to a set of intermediate key/value pairs.
//Map class header
public class TokenizerMapper
     extends Mapper<Object, Text, Text, IntWritable> {
What is Writable?
❖ Hadoop defines its own “box” classes for strings (Text), integers (IntWritable), etc.
❖ All values must implement interface Writable
❖ All keys must implement interface WritableComparable
❖ Writable is a serializable object which implements a simple, efficient serialization protocol
Mapper Explanation (Cont.)
//Map method header
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
➢ Object key/Text value: Data type of the input Key and Value to the mapper
➢ Context: An inner class of Mapper, used to store the context of a running task. Here it is used to collect data output by either the Mapper or the Reducer, i.e. intermediate outputs or the output of the job
➢ Exceptions: IOException, InterruptedException
➢ This function is called once for each key/value pair in the input split. Your application should override this to do your job.
Mapper Explanation (Cont.)
//Use a string tokenizer to split the document into words
StringTokenizer itr = new StringTokenizer(value.toString());
//Iterate through each word and form key-value pairs
while (itr.hasMoreTokens()) {
  //Assign each word from the tokenizer (of String type) to the Text object 'word'
  word.set(itr.nextToken());
  //Form a key-value pair for each word as <word, 1>
  context.write(word, one);
}
❖ The map function emits its output via the Mapper.Context object
➢ context.write() takes (k, v) elements
❖ Any (WritableComparable, Writable) can be used
public class IntSumReducer
     extends Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable result = new IntWritable();
  public void reduce(Text key, Iterable<IntWritable> values,
                     Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}
Reducer Explanation
//Reducer header, similar to the one in map but with different key/value data types
public class IntSumReducer
     extends Reducer<Text, IntWritable, Text, IntWritable> {
  //Data from map will be <"word", {1, 1, ...}>, so we get an Iterable and can go through the set of values
  public void reduce(Text key, Iterable<IntWritable> values,
                     Context context) throws IOException, InterruptedException {
    //Initialize a variable 'sum' as 0
    int sum = 0;
    //Iterate through all the values with respect to a key and sum them up
    for (IntWritable val : values) {
      sum += val.get();
    }
    //Form the final key/value pair result for each word using context
    result.set(sum);
    context.write(key, result);
  }
}
Main (Driver)
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Main (The Driver)
❖ Given the Mapper and Reducer code, the short main() starts the MapReduce running
❖ The Hadoop system picks up a bunch of values from the command line on its own
❖ Then the main() also specifies a few key parameters of the problem in the Job object
❖ Job is the primary interface for a user to describe a map-reduce job to the Hadoop framework for execution (such as what Map and Reduce classes to use and the format of the input and output files)
❖ Other parameters, e.g., the number of machines to use, are optional, and the system will determine good values for them if not specified
❖ Then the framework tries to faithfully execute the job as described by the Job object
Main Explanation
//Creating a Configuration object and a Job object, assigning a job name for identification purposes
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
➢ Job Class: It allows the user to configure the job, submit it, control its execution, and query the state. Normally the user creates the application, describes various facets of the job via Job, and then submits the job and monitors its progress.
//Setting the job’s jar file by finding the provided class location
job.setJarByClass(WordCount.class);
//Providing the mapper and reducer class names
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
//Setting configuration object with the Data Type of output Key and Value for map and reduce
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
Main Explanation (Cont.)
//The hdfs input and output directory to be fetched from the command line
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//Submit the job to the cluster and wait for it to finish
System.exit(job.waitForCompletion(true) ? 0 : 1);
Make It Running
❖ Configure environment variables
export JAVA_HOME=…
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
❖ Compile WordCount.java and create a jar:
$ hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class
❖ Put files to HDFS
$ hdfs dfs -put YOURFILES input
❖ Run the application
$ hadoop jar wc.jar WordCount input output
❖ Check the results
$ hdfs dfs -cat output/*
Combiners
❖ Often a Map task will produce many pairs of the form (k,v1), (k,v2), … for the same key k
➢ E.g., popular words in the word count example
❖ Combiners are a general mechanism to reduce the amount of intermediate data, thus saving network time
➢ They could be thought of as “mini-reducers”
❖ Warning!
➢ The use of combiners must be considered carefully
Optional in Hadoop: the correctness of the algorithm cannot depend on computation (or even execution) of the combiners
A combiner operates on each map output key. It must have the same output key-value types as the Mapper class.
A combiner can produce summary information from a large dataset because it replaces the original Map output
➢ Works only if reduce function is commutative and associative (explained later)
In general, reducer and combiner are not interchangeable
❖ Combiner combines the values of all keys of a single mapper node (single machine):
❖ Much less data needs to be copied and shuffled!
❖ If combiners take advantage of all opportunities for local aggregation, we have at most m × V intermediate key-value pairs
➢ m: number of mappers
➢ V: number of unique terms in the collection
❖ Note: not all mappers will see all terms
❖ In WordCount.java, you only need to add the following line to Main:
job.setCombinerClass(IntSumReducer.class);
➢ This is because in this example, Reducer and Combiner do the same thing
➢ Note: In most cases this is not true!
➢ You need to write an extra combiner class
❖ Given two files:
➢ file1: Hello World Bye World
➢ file2: Hello Hadoop Bye Hadoop
❖ The first map emits (after the combiner is applied):
➢ < Hello, 1> < World, 2> < Bye, 1>
❖ The second map emits (after the combiner is applied):
➢ < Hello, 1> < Hadoop, 2> < Bye, 1>
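❖ To make the effect of local aggregation concrete, here is a small plain-Python sketch (not Hadoop code; the helper names are only illustrative) of what a combiner does to one mapper’s output:

from collections import defaultdict

def map_words(doc):
    # Raw map output: one (word, 1) pair per token
    return [(word, 1) for word in doc.split()]

def combine(pairs):
    # Combiner: locally sum the counts for each key on a single mapper
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return list(counts.items())

raw = map_words("Hello World Bye World")  # file1
print(raw)           # [('Hello', 1), ('World', 1), ('Bye', 1), ('World', 1)]
print(combine(raw))  # [('Hello', 1), ('World', 2), ('Bye', 1)]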
Partitioner
❖ Partitioner controls the partitioning of the keys of the intermediate map-outputs.
➢ The key (or a subset of the key) is used to derive the partition, typically by a hash function.
➢ The total number of partitions is the same as the number of reduce tasks for the job.
This controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.
❖ System uses HashPartitioner by default:
➢ hash(key) mod R
❖ Sometimes useful to override the hash function:
➢ E.g., hash(hostname(URL)) mod R ensures URLs from a host end up in the same output file
E.g., https://www.unsw.edu.au/faculties and https://www.unsw.edu.au/about-us will be stored in one file
❖ Job sets the Partitioner implementation (in Main)
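❖ A plain-Python sketch of the hash(hostname(URL)) mod R idea (not a Hadoop Partitioner class; the function name and the number of reduce tasks below are assumptions for illustration), using a stable hash so the result is reproducible:

import hashlib
from urllib.parse import urlparse

def partition_for(url, num_reduce_tasks):
    # Route all URLs from the same host to the same reduce partition
    host = urlparse(url).netloc  # e.g. 'www.unsw.edu.au'
    digest = hashlib.md5(host.encode('utf-8')).hexdigest()
    return int(digest, 16) % num_reduce_tasks  # hash(hostname(URL)) mod R

R = 4  # assumed number of reduce tasks
print(partition_for("https://www.unsw.edu.au/faculties", R))
print(partition_for("https://www.unsw.edu.au/about-us", R))  # same partition as the line above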
MapReduce: Recap
❖ Programmers must specify:
➢ map (k1, v1) → [(k2, v2)]
➢ reduce (k2, [v2]) → [(k3, v3)]
➢ All values with the same key are reduced together
❖ Optionally, also:
➢ combine (k2, [v2]) → [(k2, v2)]
Mini-reducers that run in memory after the map phase
Used as an optimization to reduce network traffic
➢ partition (k2, number of partitions) → partition for k2
Often a simple hash of the key, e.g., hash(k2) mod n
Divides up the key space for parallel reduce operations
❖ The execution framework handles everything else…
Write Your Own MapReduce Program in Python?
Hadoop Streaming
❖ Hadoop streaming allows us to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer. For example:
➢ -input: specify the input folder
➢ -output: specify the output folder
➢ -mapper: specify the mapper script/executable
➢ -reducer: specify the reducer script/executable
➢ The mapper and reducer read the input from stdin (line by line) and emit the output to stdout
❖ Thus, you can use other languages such as C++ or Python to write MapReduce programs
Hadoop Streaming
❖ When an executable is specified for mappers, each mapper task will launch the executable as a separate process when the mapper is initialized.
❖ As the mapper task runs, it converts its inputs into lines and feeds the lines to the stdin of the process.
❖ In the meantime, the mapper collects the line-oriented outputs from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the mapper.
❖ By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) will be the value.
❖ If there is no tab character in the line, then the entire line is considered as the key and the value is null. However, this can be customized by setting the -inputformat command option, as discussed later.
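❖ A minimal streaming mapper along these lines (a sketch only; the course’s actual mapper.py may differ) reads lines from stdin and emits one tab-separated <word, 1> pair per token:

#!/usr/bin/env python
# Streaming word-count mapper (sketch): read lines from stdin, emit "<word>\t1" per token
import sys

for line in sys.stdin:
    for word in line.strip().split():
        # key and value are separated by a tab, as Hadoop streaming expects by default
        print('%s\t%s' % (word, 1))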
Hadoop Streaming
❖ When an executable is specified for reducers, each reducer task will launch the executable as a separate process when the reducer is initialized.
❖ As the reducer task runs, it converts its input key/value pairs into lines and feeds the lines to the stdin of the process.
❖ In the meantime, the reducer collects the line-oriented outputs from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the reducer.
❖ By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value. However, this can be customized by setting the -outputformat command option, as discussed later.
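❖ A matching streaming reducer sketch (again an assumption of what the course’s reducer.py looks like): it relies on the shuffle/sort having placed equal keys on consecutive lines:

#!/usr/bin/env python
# Streaming word-count reducer (sketch): input lines "<word>\t<count>", sorted by word
import sys

current_word, current_count = None, 0

for line in sys.stdin:
    word, count = line.strip().split('\t', 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print('%s\t%d' % (current_word, current_count))
        current_word, current_count = word, int(count)

# emit the last key
if current_word is not None:
    print('%s\t%d' % (current_word, current_count))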
❖ Let’s test locally whether our mapper.py works correctly.
➢ Make it executable by “chmod +x mapper.py”
➢ cat inputText | python mapper.py
➢ The output of the mapper is a list of tab-separated <word, 1> lines
➢ Let’s store it in a temporary file “intermediateResult”: cat inputText | python mapper.py > intermediateResult
❖ Let’s test locally whether our reducer.py works correctly.
➢ Make it executable by “chmod +x reducer.py”
➢ Run “cat intermediateResult | sort -k1,1 | python reducer.py”
➢ sort is a Linux command, used to sort a file, arranging the records in a particular order
-k[n,m] Option: sorting the records on the basis of columns n to m. Here, “sort -k1,1” means sorting the key-value pairs based on the keys (the first column)
Run On Hadoop
❖ Start HDFS and YARN
❖ Store your input files into a folder in HDFS
❖ Utilize the hadoop-streaming jar file to run MapReduce streaming jobs:
➢ -input: The input folder in HDFS
➢ -output: the output folder in HDFS storing the results
➢ -mapper: the mapper script/executable
➢ -reducer: the reducer script/executable
❖ Check your result on HDFS: hdfs dfs -cat output/part*
hadoop jar hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar \
  -input input \
  -output output \
  -mapper /home/comp9313/mapper.py \
  -reducer /home/comp9313/reducer.py
Run On Hadoop
❖ The Python files do not need to pre-exist on the machines in the cluster; however, if they don’t, you will need to use the “-file” option to tell the framework to pack them as a part of job submission. For example:
➢ The option “-file /home/comp9313/mapper.py” causes the Python executable to be shipped to the cluster machines as a part of job submission.
❖ Using a combiner: add the “-combiner” option
➢ -combiner /home/comp9313/combiner.py
hadoop jar hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar \
  -input input \
  -output output \
  -mapper /home/comp9313/mapper.py \
  -reducer /home/comp9313/reducer.py \
  -file /home/comp9313/mapper.py \
  -file /home/comp9313/reducer.py
MRJob
❖ MRJob is the easiest route to writing Python programs that run on Hadoop. If you just need to run local MapReduce jobs, you do not even need to install Hadoop.
➢ You can test your code locally without installing Hadoop
➢ You can run it on a cluster of your choice.
➢ MRJob has extensive integration with AWS EMR and Google Dataproc. Once you’re set up, it’s as easy to run your job in the cloud as it is to run it on your laptop.
❖ MRJob has a number of features that make writing MapReduce jobs easier. In MRJob, you can:
➢ Keep all MapReduce code for one job in a single class.
➢ Easily upload and install code and data dependencies at runtime.
➢ Switch input and output formats with a single line of code.
➢ Automatically download and parse error logs for Python tracebacks.
➢ Put command line filters before or after your Python code.
❖ Open a file called mr_word_count.py and type this into it:
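➢ The code from the slide is not reproduced in this text; a version consistent with the description on the following slides (a single-step job with mapper(), combiner() and reducer() methods; the class name is assumed) would look like:

from mrjob.job import MRJob

class MRWordCount(MRJob):  # class name assumed; the slide may use a different one

    def mapper(self, _, line):
        # the input key is ignored; each value is one line of text
        for word in line.split():
            yield word, 1

    def combiner(self, word, counts):
        # optional local aggregation of the mapper output
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordCount.run()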
❖ Run the code locally: python mr_word_count.py inputText
❖ A job is defined by a class that inherits from MRJob. This class contains methods that define the steps of your job.
❖ A step consists of a mapper, a combiner and a reducer. All of these are optional, though you must have at least one. So you could have a step that’s just a mapper, or just a combiner and a reducer.
❖ When you only have one step, all you have to do is write methods called mapper(), combiner() and reducer().
❖ The mapper() method takes a key and a value as args and yields as many key-value pairs as it likes.
❖ The reducer() method takes a key and an iterator of values, and also yields as many key-value pairs as it likes.
❖ The final required component of a job file is to include the following two lines at the end of the file, every time:
➢ These lines pass control over the command line arguments and execution to mrjob. Without them, your job will not work.
if __name__ == '__main__':
MRWordCounter.run() # where MRWordCounter is your job class
Run in Different Ways
❖ The most basic way to run your job is on the command line, using:
➢ python my_job.py input.txt
➢ By default, the output will be written to stdout.