CIS 455/555: Internet and Web Systems
MapReduce October 28, 2020
© 2020 A. Haeberlen, Z. Ives, V. Liu University of Pennsylvania
Plan for today
n XFilter
n Google File System (also Hadoop DFS)
n GFS/HDFS and MapReduce
n Key assumptions
n Architecture and operation NEXT
n Introduction to MapReduce
n Programming model
n Data flow
n A few simple example tasks
Primary/backup (in general)
[Diagram: clients concurrently send X←7, X?, and X←2 to the primary, which forwards the ordered requests X←2, X?, X←7 to the backups]
n Scenario: Multiple replicas, concurrent requests from different clients
n How to ensure consistency?
n Idea: Designate one replica as the primary (see the sketch below)
n Accepts all requests, orders them, and then forwards the ordered requests to the backups
n When should the primary report success to a client?
n What has to happen when the primary fails?
n What kind of consistency does this provide?
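To make the idea concrete, here is a minimal sketch of primary-based ordering; the Backup interface and all names are illustrative, not taken from any particular system:

import java.util.List;

// Sketch: the primary assigns each request a sequence number, forwards it to
// every backup in that order, waits for all of them, and only then replies.
class OrderingPrimary {
  interface Backup { void apply(long seq, String request); }  // assumed to block until the backup has stored the request

  private final List<Backup> backups;
  private long nextSeq = 0;

  OrderingPrimary(List<Backup> backups) { this.backups = backups; }

  synchronized String handle(String request) {
    long seq = nextSeq++;                 // single point of ordering
    for (Backup b : backups)
      b.apply(seq, request);              // forward in the assigned order
    return "OK(" + seq + ")";             // report success only after all backups have it
  }
}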
Chunkservers
n Each holds replicas of some of the chunks
n For write operations, one of the owners of the chunk gets a lease – it becomes the primary and all others become secondaries
n Receives requests for mutations
n Assigns an order
n Notifies the secondary nodes
n Waits for all to say they received the message
n Responds with a write-succeeded message
n Failures during writes can result in inconsistency!!
n How bad is this?
Step-by-step: A write operation
1. Client asks the Master which chunkserver holds the lease for the chunk
2. Master returns the IDs of the primary and the secondary chunkservers; the client caches them
3. Client sends its data to all replicas, in any order (in a chain; why?)
4. Once the client gets ACKs, it asks the primary to perform the write of those data items; the primary assigns serial numbers to these operations
5. Primary forwards the write to the secondaries
6. Secondaries reply “SUCCESS”
7. Primary replies to the client
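A hypothetical client-side sketch of these seven steps; every interface here is an illustrative stand-in, not the real GFS client API:

import java.util.*;

interface ChunkMaster  { Locations findLeaseHolder(String chunkHandle); }       // steps 1-2
interface Replica      { void push(String dataId, byte[] data); }               // step 3
interface LeasePrimary { boolean commit(String chunkHandle, String dataId); }   // steps 4-7

record Locations(LeasePrimary primary, List<Replica> chain) {}  // the primary is also one of the replicas in the chain

class GfsWriteClient {
  private final ChunkMaster master;
  private final Map<String, Locations> leaseCache = new HashMap<>();

  GfsWriteClient(ChunkMaster master) { this.master = master; }

  boolean write(String chunkHandle, byte[] data) {
    // 1-2: ask the master who holds the lease, and cache the answer
    Locations loc = leaseCache.computeIfAbsent(chunkHandle, master::findLeaseHolder);
    // 3: push the data itself to all replicas (in GFS, daisy-chained along the network path)
    String dataId = UUID.randomUUID().toString();
    for (Replica r : loc.chain()) r.push(dataId, data);
    // 4-7: ask the primary to commit; it assigns serial numbers, forwards the
    //      write to the secondaries, and replies once all of them report SUCCESS
    return loc.primary().commit(chunkHandle, dataId);
  }
}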
Append
n GFS supports an atomic record append that multiple machines can use at the same time
n Primary will interleave the requests in any order
n Will be written “at least once”!
n GFS may insert padding or duplicate records!
n Primary determines a position for the write, forwards this to the secondaries
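A hypothetical view of what “at least once” means on the client side; the recordAppend parameter stands in for the GFS append call and is not a real API:

import java.util.function.Function;

// Sketch: the client retries the record append until it succeeds. A failed
// attempt may already have written the record on some replicas, so retrying
// is exactly what produces padding and duplicate records.
class AppendingClient {
  long appendAtLeastOnce(Function<byte[], Long> recordAppend, byte[] record) {
    while (true) {
      try {
        return recordAppend.apply(record);   // the primary chooses the offset
      } catch (RuntimeException failure) {
        // fall through and retry
      }
    }
  }
}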
Failures and the client
n If there is a failure in a record write or append, the client will generally retry
n If there was “partial success” in a previous append, there might be more than one copy on some replicas – and inconsistency
n Client must handle this through checksums, record IDs, and periodic checkpointing (a reader-side sketch follows after this list)
n Why does GFS choose this weak consistency model? Wouldn’t strong consistency be better?
n What are the costs and benefits of strong consistency?
n Do Google’s applications need the main benefits?
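One way an application-level reader can cope with duplicates and padding, as a hedged sketch; it assumes each application record carries its own ID and checksum, and the class name is ours:

import java.util.HashSet;
import java.util.Set;
import java.util.zip.CRC32;

// Sketch: skip records whose checksum does not match (padding / corruption)
// and records whose ID has been seen before (duplicates from retried appends).
class DedupReader {
  private final Set<String> seenIds = new HashSet<>();

  // Returns true if the record should be handed to the application.
  boolean accept(String recordId, byte[] payload, long storedChecksum) {
    CRC32 crc = new CRC32();
    crc.update(payload);
    if (crc.getValue() != storedChecksum)
      return false;                 // padding or corrupted region: skip
    return seenIds.add(recordId);   // false means we already processed this record
  }
}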
Interesting details (see paper)
n Where to create new chunks?
n Try to place replicas in different racks
n Try to balance disk utilization on the chunk servers
n Try to avoid too many recent creations on any individual server
n Re-replication
n Prioritizes chunks with lower replication level
n Periodic re-balancing
n Garbage collection
n Undelete
n Shadow masters
n Checksumming and scanning while idle
GFS performance
n Many performance numbers in the paper
n Not enough context here to discuss them in much detail – would need to see how they compare with other approaches!
n But: they validate high scalability in terms of concurrent reads and concurrent appends, with data partitioned and replicated across many machines
n Also show fast recovery from failed nodes
n Not the only approach to many of these problems, but one shown to work at industrial-strength!
A Google Retrospective
https://cloud.google.com/files/storage_architecture_and_challenges.pdf
n Ultimately, GFS scaled to about 10 PB and 50 M files before reaching its limits
n Master(s) didn’t scale enough, and data sometimes got corrupted
n Replaced by Colossus
n Here data was automatically sharded: file.0, file.1, …
n Reed-Solomon error-correcting codes
https://en.wikipedia.org/wiki/Reed%E2%80%93Solomon_error_correction
n Got rid of centralized metadata – sharded instead
n Client-side caching, wide-area
Recap: Google File System (GFS)
n An interesting reduced-consistency model
n Google uses it to stage data in processing / analysis
n A record-oriented file storage system, where the records can sometimes be corrupted or duplicated
n Relaxed consistency model
n Later: We’ll revisit consistency in the context of replication
Plan for today
n Google File System
n Introduction to MapReduce NEXT
n Programming model
n Data flow
n A few simple example tasks
Analogy: National census
n Suppose we have 10,000 employees, whose job is to collate census forms and to determine how many people live in each city
n How would you organize this task?
https://www.census.gov/programs-surveys/decennial-census/technical-documentation/questionnaires/2020.html
A lot of tricky aspects!
n What is the main challenge?
n Are the individual tasks complicated?
n If not, what makes this so challenging?
n Suppose people take vacations, get sick, work at different rates
n Suppose some forms are incorrectly filled out and require corrections or need to be thrown away
n How big should the stacks be?
n How do we monitor progress?
I don’t want to deal with all this!!!
n Wouldn’t it be nice if there were some system that took care of all these details for you?
n But every task is different!
n Or is it? The details are different (what to compute, etc.),
but the data flow is often the same!
n Maybe we can have a ‘generic’ solution?
n Ideally, you’d just tell the system what needs to be done
n That’s the MapReduce framework.
What is MapReduce?
n A famous distributed programming model
n In many circles, considered the key building block for much of Google’s data analysis
n A programming language built on it: Sawzall, http://labs.google.com/papers/sawzall.html
n … Sawzall has become one of the most widely used programming languages at Google. … [O]n one dedicated Workqueue cluster with 1500 Xeon CPUs, there were 32,580 Sawzall jobs launched, using an average of 220 machines each. While running those jobs, 18,636 failures occurred (application failure, network outage, system crash, etc.) that triggered rerunning some portion of the job. The jobs read a total of 3.2×10^15 bytes of data (2.8 PB) and wrote 9.9×10^12 bytes (9.3 TB).
n Other similar languages: Yahoo’s Pig Latin and Pig; Microsoft’s Dryad
n Cloned in open source: Hadoop, http://hadoop.apache.org/core/
The MapReduce programming model
n Simple distributed functional programming primitives
n Modeled after Lisp primitives:
map (apply function to all items in a collection) and reduce (apply function to set of items with a common key)
n We start with:
n A user-defined function to be applied to all data,
map: (item_key, value) → (stack_key, value’)
n Another user-specified operation,
reduce: (stack_key, {set of value’}) → result
n A set of n nodes, each with data
n All nodes run map on all of their data, producing new data with keys
n This data is collected by key, then shuffled, reduced
n Dataflow is through temp files on GFS
Simple example: Word count
n Goal: Given a set of documents, count how often each word occurs
n Input: Key-value pairs (document:lineNumber, text)
n Output: Key-value pairs (word, #occurrences)
n What should be the intermediate key-value pairs? Key design question!

map(String key, String value) {
  // key: document name, line no
  // value: contents of line
  for each word w in value:
    emit(w, “1”)
}

reduce(String key, Iterator values) {
  // key: a word
  // values: a list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  emit(key, result)
}
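For comparison, here is roughly how the same program looks in Hadoop's Java MapReduce API; this is a sketch modeled on Hadoop's standard WordCount example, not code from these slides:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);          // emit (word, 1)
      }
    }
  }

  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) sum += val.get();
      result.set(sum);
      context.write(key, result);          // emit (word, total count)
    }
  }
}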
Simple example: Word count
[Diagram: the word-count dataflow; each mapper and reducer is labeled with the key range it is responsible for]
Input KV-pairs:
(1, the apple) (2, is an apple) (3, not an orange) (4, because the)
(5, orange) (6, unlike the apple) (7, is orange) (8, not green)
Mappers: (1-2), (3-4), (5-6), (7-8)    Reducers: (A-G), (H-N), (O-U), (V-Z)
1 Each mapper receives some of the KV-pairs as input
2 The mappers process the KV-pairs one by one
3 Each KV-pair output by the mapper is sent to the reducer that is responsible for it
4 The reducers sort their input by key and group it
5 The reducers process their input one group at a time
Grouped intermediate pairs:
Reducer (A-G): (an, {1, 1}) (apple, {1, 1, 1}) (because, {1}) (green, {1})
Reducer (H-N): (is, {1, 1}) (not, {1, 1})
Reducer (O-U): (orange, {1, 1, 1}) (the, {1, 1, 1}) (unlike, {1})
Reducer (V-Z): –
Final output:
(apple, 3) (an, 2) (because, 1) (green, 1) (is, 2) (not, 2) (orange, 3) (the, 3) (unlike, 1)
MapReduce dataflow
In practice, mappers and reducers usually run on the same set of machines!
[Diagram: four Mappers emit intermediate (key, value) pairs, which are routed to four Reducers]
What makes this so scalable?
“The Shuffle”
[Diagram: input data → mappers → shuffle → reducers → output data]
MapReduce system components
n To make this work, we need a few more parts…
n The file system (distributed across all nodes):
n Stores the inputs, outputs, and temporary results
n The driver program (executes on one node; see the sketch after this list):
n Specifies where to find the inputs, the outputs
n Specifies what mapper and reducer to use
n Can customize behavior of the execution
n The runtime system (controls nodes):
n Supervises the execution of tasks
n Esp. the JobTracker in Hadoop
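As an illustration of such a driver, here is a sketch in the style of Hadoop's WordCount tutorial; it assumes the WordCount.TokenizerMapper and WordCount.IntSumReducer classes sketched a few slides back:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCount.TokenizerMapper.class);    // which mapper to use
    job.setCombinerClass(WordCount.IntSumReducer.class);    // optional local pre-aggregation
    job.setReducerClass(WordCount.IntSumReducer.class);     // which reducer to use
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // where to find the inputs
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // where to put the outputs
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}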
The Underlying MapReduce data flow
[Diagram: data partitions (by key) feed the map computation partitions; their output is redistributed by the output’s key (“shuffle”) into the reduce computation partitions, under the control of a coordinator. Default MapReduce uses the file system for this dataflow.]
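How the redistribution decides which reducer gets a key: the common default is to hash the intermediate key modulo the number of reduce tasks. The method below is a simplified sketch that mirrors Hadoop's default HashPartitioner:

// Simplified sketch of default hash partitioning in the shuffle.
class HashPartitionSketch {
  int getPartition(Object key, int numReduceTasks) {
    // mask off the sign bit so the result is never negative
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}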
What if a node crashes?
n How will we know?
n Master pings every worker periodically
n What to do when a worker crashes?
n Failed map task on node A: Re-execute on another node B, and notify all the workers executing reduce tasks
n If a reduce task has not read all the data from A yet, it will read it from B
n Failed reduce task: If not complete yet, re-execute on another node
n Intermediate outputs from map tasks are stored locally on the mapper, whereas outputs from reduce tasks are in the distributed file system
n What to do when the master crashes?
n Could periodically checkpoint state & restart from there
n Or just abort the computation – is this a good idea?
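A hypothetical sketch of the master's ping-and-re-execute loop described above; all of the types are illustrative stand-ins, not Hadoop's or Google's actual code:

import java.util.List;

interface Worker    { boolean ping(); List<Task> assignedTasks(); }
interface Task      { boolean isMap(); boolean isComplete(); }
interface Scheduler { void reexecute(Task t); void notifyReducers(Task mapTask); }

class MasterMonitor {
  private final List<Worker> workers;
  private final Scheduler scheduler;

  MasterMonitor(List<Worker> workers, Scheduler scheduler) {
    this.workers = workers;
    this.scheduler = scheduler;
  }

  void checkWorkers() {
    for (Worker w : workers) {
      if (w.ping()) continue;                         // worker is alive
      for (Task t : w.assignedTasks()) {
        // Even *completed* map tasks must be redone: their intermediate output
        // was on the failed worker's local disk. Completed reduce output is
        // already safe in the distributed file system.
        if (t.isMap() || !t.isComplete()) {
          scheduler.reexecute(t);
          if (t.isMap()) scheduler.notifyReducers(t);  // reducers fetch from the new node
        }
      }
    }
  }
}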
Other challenges
n Locality
n Try to schedule the map task on a machine that already has the data
n Task granularity
n How many map tasks? How many reduce tasks?
n Dealing with stragglers
n Schedule some backup tasks
n Saving bandwidth
n E.g., with combiners (see the snippet after this list)
n Handling bad records
n “Last gasp” packet with the current sequence number, so the offending record can be skipped on re-execution
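About combiners: a combiner is a “mini-reduce” that runs on each mapper's local output before the shuffle, shrinking the data sent over the network. In Hadoop, when the reduce function is associative and commutative (like summing counts), the reducer class itself can often be reused, as already shown in the driver sketch earlier:

// Reusing the reducer as a combiner (valid here because summing counts is
// associative and commutative); same call as in the driver sketch above.
job.setCombinerClass(WordCount.IntSumReducer.class);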