
Important Digression
Data management techniques depend heavily on the size of data relative to storage capacity.
Does it fit in memory (fast)? SSD (pretty fast)? Disk (slow)? Multiple disks (very slow)?
Memorize this now! Why?


Range for this course

Data Management in the Cloud Module 2: Map Reduce
Module Outline
1. Unstructured data
●What is it? Why is it common?
●What is Map Reduce? Why is it important?
2. How does Map Reduce manage data?
●What is a distributed file system?
●Adding hardware to improve throughput
●Writing better software to improve throughput
Most slides are adapted from the University of Waterloo and from Elmasri/Navathe (Chap. 26)

Unstructured Data
● Structured data
● Each entry is a record
● Formatted for access from a database
● Tabular, relational
● Limitation
– Not all data collected is structured
● Unstructured data
– Some data is formatted for a database, but:
● Some data is irrelevant or not well formatted for database access
● Multiple formats for each type of record
● Data for a single record is not stored near each other (either physically or logically)
● Attributes vary across records
– Limited indication of data type; must “look” at records to distinguish types
– E.g., a simple text file

Unstructured Data cont.
Great example of unstructured data: Files on your laptop
These files contain records about music interests. But, the records are in multiple formats
Can you think of other examples?

Unstructured Data cont.
https://www.dubber.net/unlocking-unstructured-data-voice-processing-power-zoe/

Unstructured Data Example cont.
1 website, many types of data
– Opinion pieces, box scores, summaries, ads, user comments, etc.
Business goals set by league president
– Does the content reflect business goals?

Unstructured Data Example cont.
Business goal: Each team should have a popular player
Business goal: Highlight principles shared across the fan base, e.g., winning, hard work, practice, etc.
Hire a data scientist to:
Download each web page (or frame) and store it as a file: content1 → “……”; content2 → “….”
Find out which players’ names appear most frequently
Find correlations between names and words reflecting key principles
How would you solve these problems?

Typical Big Data Problem
Iterate over a large number of records
Extract something of interest from each (Map)
Shuffle and sort intermediate results
Aggregate intermediate results (Reduce)
Generate final output
Key idea: provide a functional abstraction for these two operations (extract and aggregate)

Map Reduce
Programmers specify two functions:
map (k1, v1) → [(k2, v2)]
reduce (k2, [v2]) → [(k3, v3)]
 All values with the same key are sent to the same reducer
Typically, for the input to map:
k1 → file name
v1 → file contents
or, if the configuration is different:
k1 → file name & line number
v1 → contents on the specified line
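To make the signatures concrete, here is a minimal, non-Hadoop sketch (Java 16+) of the two user-supplied functions. Pair is a hypothetical helper type introduced only for illustration; real frameworks use their own key/value abstractions.

    import java.util.List;

    // Hypothetical helper type for illustration only.
    record Pair<A, B>(A key, B value) {}

    // map(k1, v1) -> [(k2, v2)]
    interface MapFunction<K1, V1, K2, V2> {
        List<Pair<K2, V2>> map(K1 k1, V1 v1);
    }

    // reduce(k2, [v2]) -> [(k3, v3)]
    interface ReduceFunction<K2, V2, K3, V3> {
        List<Pair<K3, V3>> reduce(K2 k2, List<V2> values);
    }

For the word-count example that follows, K1 would be a document id, V1 the document contents, K2 a word, and V2 an integer count.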

Basic Map Reduce: One Map and One Reduce
[Diagram: input pairs (k1, v1) … (k6, v6) flow through a single map task, which emits intermediate pairs a 1, b 2, c 3, c 6, a 5, c 2, b 7, c 8; Shuffle and Sort aggregates values by key before a single reduce task produces the final output.]

Real Map Reduce: Many Map and Reduce
[Diagram: input pairs (k1, v1) … (k6, v6) are split across four map tasks, which emit a 1, b 2 | c 3, c 6 | a 5, c 2 | b 7, c 8; Shuffle and Sort aggregates values by key (a → 1, 5; b → 2, 7; c → 2, 3, 6, 8) and three reduce tasks emit r1 s1, r2 s2, r3 s3.]

Map Reduce Example – The Components
Web Crawler Collects Data:
  Elena scores 25
  Elena and Candace are All Stars
  Watch Candace. Candace Works
  Brittany guides Mercury past Candace

Programmer Writes Code:
Class Mapper
  Method Map (docid k, contents v) {
    For each String word in v {   // tokenize v into String tokens
      Emit (word, 1)
    }
  }

Class Reducer
  Method Reduce (key k, Counts c[]) {
    sum = 0
    For each int cnt in c {
      sum += cnt
    }
    if (isName(k))
      Emit (String(k + ”-output”), sum)
  }
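A hedged, runnable version of this pseudocode using the standard Hadoop Java API (the usual WordCount pattern), with the line-oriented input Hadoop provides by default (byte offset, line) standing in for (docid, contents). isName() is a placeholder for whatever name-detection logic the slide assumes.

    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class NameCount {

        // Map: for each word in the input, emit (word, 1).
        public static class NameMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            private static final IntWritable ONE = new IntWritable(1);
            private final Text word = new Text();

            @Override
            protected void map(LongWritable docOffset, Text contents, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer tokens = new StringTokenizer(contents.toString());
                while (tokens.hasMoreTokens()) {
                    word.set(tokens.nextToken());
                    context.write(word, ONE);
                }
            }
        }

        // Reduce: sum the counts for each word; keep only player names.
        public static class NameReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> counts, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable cnt : counts) {
                    sum += cnt.get();
                }
                if (isName(key.toString())) {
                    context.write(new Text(key.toString() + "-output"), new IntWritable(sum));
                }
            }

            // Placeholder: the slide assumes some way of recognizing player names.
            private boolean isName(String s) {
                return !s.isEmpty() && Character.isUpperCase(s.charAt(0));
            }
        }
    }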

Map Reduce Example – The Execution
[Diagram: Map instance 1 processes “Elena scores 25” and “Elena and Candace are All Stars”; Map instance 2 processes “Watch Candace. Candace Works” and “Brittany guides Mercury past Candace”. Each map task emits (word, 1) pairs; Shuffle and Sort then aggregates the 1s by key, e.g., Elena → [1, 1], Candace → [1, 1, 1, 1], Brittany → [1].]

Map Reduce Example – The Execution cont.

[Diagram: the reducers emit only the name keys, e.g., Candace-output → 4, Elena-output → 2, Brittany-output → 1.]
Candace is the most frequently mentioned player on the website, followed by Elena.

Map Reduce
Programmers specify two functions: map (k1, v1) → [(k2, v2)]
reduce (k2, [v2]) → [(k3, v3)]
 All values with the same key are sent to the same reducer
The execution framework handles everything else…
Getting the Lingo Right:
Map and Reduce are functions, that is, code designed to take inputs and produce outputs.
A function’s signature describes the number and data types of its inputs/outputs
The invocation of a function (the execution of the code) is a task. A map task is the running execution of the map function. A job is the collection of map tasks needed to process a set of inputs. If the job is parallelized, multiple tasks run concurrently.
The machine that runs a set of map tasks is called a map instance

[Diagram: four map tasks emit a 1, b 2 | c 3, c 6 | a 5, c 2 | b 7, c 8; a combiner runs after each map task (e.g., collapsing c 3, c 6 into c 9); Shuffle and Sort aggregates values by key (a → 1, 5; b → 2, 7; c → 2, 9, 8); three reduce tasks emit r1 s1, r2 s2, r3 s3.]

Tabular viewpoint
[Diagram: the same combiner dataflow as before, viewed as a table. Each unstructured-data record (k, v) also carries platform attributes, such as mNum, the number of the map instance that processes it (e.g., mNum = 1754).]
How is mNum defined?
Option (1): Random number between 0 and the maximum number of servers supported (e.g., 2048). Then MR uses modulo to send keys.
Option (2): Quasi round robin (default in Hadoop)
Option (3): Based on a numeric interpretation of the key
Spark allows the programmer to specify “partitions”
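If mNum is read as “which reducer a key is routed to,” Hadoop exposes this choice through its Partitioner class. The sketch below is a hypothetical partitioner illustrating Option (3)-style routing: the reducer number is derived from a numeric interpretation (here, the hash) of the key; the class name is illustrative.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Hypothetical partitioner: route each key to a reducer based on a numeric
    // interpretation of the key (its hash code), modulo the number of reducers.
    public class NumericKeyPartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            // Mask the sign bit so the modulo result is always non-negative.
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

It would be enabled on a job with job.setPartitionerClass(NumericKeyPartitioner.class); Spark offers the analogous control through partitionBy.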

Map Reduce Example – The Components
Web Crawler Collects Data:
  Elena scores 25
  Elena and Candace are All Stars
  Watch Candace. Candace Works
  Brittany guides Mercury past Candace

Programmer Writes Code:
Class Mapper
  Method Map (docid k, contents v) {
    For each String word in v {   // tokenize v into String tokens
      Emit (“wnbaWords”, word)
    }
  }

Class Reducer
  Method Reduce (key k, Counts c[]) {
    sum{} = 0   // define hashtable
    For each String member in c {
      sum{member} += 1
    }
    for each word in getKeys(sum) {
      if (isName(word))
        Emit (String(word + ”-output”), sum{word})
    }
  }
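A hedged Hadoop-style sketch of this second reducer: because every word arrives under the single key “wnbaWords”, one reduce call sees the entire collection and must build its own hashtable (revisited below when we classify aggregates). isName() is again a placeholder; the class name is illustrative.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // All words share the key "wnbaWords", so a single reduce call receives every
    // word in the collection and counts them in an in-memory hashtable.
    public class SingleKeyReducer extends Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> words, Context context)
                throws IOException, InterruptedException {
            Map<String, Integer> sum = new HashMap<>();
            for (Text w : words) {
                sum.merge(w.toString(), 1, Integer::sum);
            }
            for (Map.Entry<String, Integer> e : sum.entrySet()) {
                if (isName(e.getKey())) {
                    context.write(new Text(e.getKey() + "-output"), new IntWritable(e.getValue()));
                }
            }
        }

        // Placeholder name check, as in the previous example.
        private boolean isName(String s) {
            return !s.isEmpty() && Character.isUpperCase(s.charAt(0));
        }
    }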

Map Reduce Example – The Execution
[Diagram: Map instance 1 processes “Elena scores 25” and “Elena and Candace are All Stars”; Map instance 2 processes “Watch Candace. Candace Works” and “Brittany guides Mercury past Candace”. Both map instances emit every word under the single key wnbaWords, e.g., (wnbaWords, Elena), (wnbaWords, scores), (wnbaWords, 25), (wnbaWords, Mercury), … After Shuffle and Sort, the single key wnbaWords carries the entire word list, so one reducer receives all of the data.]

Combiners in detail
Same input signature as the reduce function:
Combine (k2, [v2]) → [(k2, v2)]   (output types must match the map output so the result can still feed the reducer)
Runs on the same node as the map tasks
Combiners may or may not run; they are purely an optimization
The platform decides when to execute combiners
Essentially a map-side reduce
Pushes the aggregation operation earlier
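A hedged sketch of how a combiner is wired into a Hadoop job. For a plain word count whose reduce is a pure SUM (and which does not rewrite keys), the reducer class itself can double as the combiner; WordCount.TokenizerMapper and WordCount.SumReducer are hypothetical classes following the usual Hadoop WordCount pattern.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "word count with combiner");
            job.setJarByClass(WordCountDriver.class);
            job.setMapperClass(WordCount.TokenizerMapper.class);  // hypothetical mapper
            job.setCombinerClass(WordCount.SumReducer.class);     // combiner: may or may not run
            job.setReducerClass(WordCount.SumReducer.class);      // same class as the combiner
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }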

Classification of aggregates
In our Map-Reduce examples, Reduce takes the values grouped by a key: F(x1, x2, x3, …, xn), where F is the reduce function and the xi are the values in [v]
Distributive: Function F() is distributive if there is a function G() such that F({X}) = G(F({X1}), F({X2}), …), where each Xi is a subset of X
 Example: SUM (F = G = SUM)
Algebraic: Function F() is algebraic if there is a function G() that uses O(1) memory and a function H() such that F({X}) = H({G({X1}), G({X2}), …})
 Example: AVERAGE, with G = {SUM, COUNT} and H = add the sums, divide by the sum of the counts
Holistic: Aggregate function F() is holistic if there is no constant bound on the size of the memory needed to describe a sub-aggregate.
 Example: MEDIAN

Classification of aggregates cont.
Distributive: A Combiner can execute F() as well as the final Reducer. Alternatively, multiple iterations of Map-Reduce can produce two Reduces that execute F().
Algebraic: The Combiner executes G() while the Reduce executes H(). Because G() uses O(1) memory, for any non-trivial subset Xi there is less (or equal) compute and transfer burden compared to omitting G() and computing F(X) directly.
Holistic: Combiners are not generally preferred because there is little opportunity to combine.
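A hedged Hadoop-style sketch of an algebraic aggregate (AVERAGE). The mapper is assumed to emit, for each key, a partial Text value of the form "value,1"; the combiner plays the role of G(), emitting O(1)-sized partial {SUM, COUNT} pairs, and the reducer plays the role of H(), adding the partial sums and dividing by the total count. Encoding the partials as comma-separated Text is only to keep the sketch short.

    import java.io.IOException;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class AverageAggregate {

        // G(): combine partial "sum,count" pairs into one partial pair per key.
        public static class PartialSumCombiner extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> partials, Context context)
                    throws IOException, InterruptedException {
                long sum = 0, count = 0;
                for (Text p : partials) {
                    String[] sc = p.toString().split(",");
                    sum += Long.parseLong(sc[0]);
                    count += Long.parseLong(sc[1]);
                }
                context.write(key, new Text(sum + "," + count));  // O(1) state per key
            }
        }

        // H(): add the partial sums, divide by the sum of the counts.
        public static class AverageReducer extends Reducer<Text, Text, Text, DoubleWritable> {
            @Override
            protected void reduce(Text key, Iterable<Text> partials, Context context)
                    throws IOException, InterruptedException {
                long sum = 0, count = 0;
                for (Text p : partials) {
                    String[] sc = p.toString().split(",");
                    sum += Long.parseLong(sc[0]);
                    count += Long.parseLong(sc[1]);
                }
                context.write(key, new DoubleWritable((double) sum / count));
            }
        }
    }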

Unstructured Data Example cont.
Oh, excuse me. Did I say WNBA? I meant NBA. No, I meant all professional sports? No, I meant all corporate entities?
The same problem is relevant across many domains
At some point, data doesn’t fit on your laptop
How do mappers & reducers find the files they need?
 A distributed file system is the answer, e.g., Hadoop Distributed File System (HDFS)

HDFS Architecture
[Diagram: an HDFS namenode holds the file namespace and the file-to-block mapping. An HDFS client asks the namenode “where is the file/block?” with (file name, block id) and gets back (block id, block location). The client then sends (block id, byte range) directly to an HDFS datanode, which stores blocks (e.g., block 3df2) in its local Linux file system and returns the block data. The namenode also handles “create new file” requests and sends instructions to the datanodes, and the Map Reduce scheduler uses datanode state to launch map tasks near the data.]
Adapted from ( et al., SOSP 2003; a West Lafayette native (Purdue))

Namenode Responsibilities
Managing the file system namespace:
 Holds file/directory structure, metadata, file-to-block mapping, access permissions, etc.
Coordinating file operations:
 Directs clients to datanodes for reads and writes
 No data is moved through the namenode
Maintaining overall health:
 Periodic communication with the datanodes
 Block re-replication and rebalancing
 Garbage collection
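A hedged sketch of an HDFS client read using the Hadoop FileSystem API. The namenode URI and file path are hypothetical placeholders; note that only metadata (block locations) comes from the namenode, while the block data is streamed directly from the datanodes.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsReadExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Hypothetical namenode address; in practice it comes from core-site.xml.
            FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
            // open() asks the namenode for block locations; read() then pulls the
            // block data directly from the datanodes.
            try (FSDataInputStream in = fs.open(new Path("/wnba/content1.txt"))) {
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) > 0) {
                    System.out.write(buf, 0, n);
                }
            }
            System.out.flush();
        }
    }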

Distributed File Systems
Companies like Google, Apple and Facebook run map reduce jobs over 10,000+ machines at multiple locations (called datacenters) hourly!
Challenge: Improve throughput for distributed file systems (and hence map reduce)

Data Management in the Cloud Module 2: Map Reduce
Module Outline
1. Unstructured data
●What is it? Why is it common?
●What is Map Reduce? Why is it important?
2. How does Map Reduce manage data?
●What is a distributed file system?
●Adding hardware to improve throughput
●Writing better software to improve throughput

Google (Data Center in Oklahoma)
The datacenter is the computer!

Anatomy of a Datacenter
Source: Barroso and Urs Hölzle (2009)

Building Blocks
Rack (32 – 84 blades)
Blade or 1U server
Rack switch
End-of-row switch covers (128 – 1600 blades)
A switch is a simple networking device that transmits messages between any two of its ports at a fixed rate.
Source: Barroso and Urs Hölzle (2009)

Datacenter Characterization & Map
Source: Barroso and Urs Hölzle (2009)
MEM: 8 GB, 100 ns, 20 GB/s
Disk: 2 TB, 10 ms, 200 MB/s

Bottleneck Analysis
Simple models to (1) purchase machines, (2) guide software design and (3) target systems problems
Map Reduce is bound by storage
Use peak throughput of each storage device
Model components involved for types of Map Reduce workload
Data center characterization: bandwidth across nodes, racks, etc.
Data center map: nodes per rack, racks per ToR switch, etc.
Throughput-Capacity Curve depicts throughput across various Map Reduce workloads (configurations)
Answers to key questions
Source: Barroso and Urs Hölzle (2009)

Map Reduce Workloads
Recall: Map tasks read a list of key value pairs, process them, and push results to reducers
Assume: Reading data is the slowest aspect of a job (i.e., the bottleneck)
Key observation: The hardware/software components involved in a Map task depend on where the data is stored.
Source: Barroso and Urs Hölzle (2009)

Map Reduce Workloads
Key observation: The hardware/software components involved in a Map task depend on where the data is stored.

Local Disk:
(1) issue a system call
(2) operating system invokes the disk controller
(3) controller reads the data and stores it in memory
(4) OS returns a pointer to the memory

Rack Memory:
(1) issue a system call
(2) OS invokes the NIC
(3) send the message through the rack switch
(4) remote OS stores the message in memory
(5) remote app sends the message back; syscall to NIC to local memory

Rack Disk:
(1) issue a system call
(2) OS invokes the NIC
(3) send the message through the rack switch
(4) remote OS stores the message in memory
(5) app reads the message; new syscall
(6) remote OS invokes the disk controller, which reads the data into memory
(7) remote app sends the message back; syscall to NIC to local memory

Bottleneck Analysis
Modeling storage capacity
Storage capacity is the range of data a Map task could access:
C = N * S(t, n); N = number of nodes that can be accessed; S(t, n) = storage on node n for device type t
If map tasks can access all disks on a rack?
C = 80 * 2 TB = 160 TB
If map tasks can access only local memory?
C = 1 * 8 GB = 8 GB

Bottleneck Analysis
Modeling throughput
The slowest component determines throughput: model the components involved
Throughput = min(c1, c2, c3, …)
If map tasks can access all disks on a rack?
min(rack switch, local disk, local memory) = min(100 MB/s, 200 MB/s, 20 GB/s) = 100 MB/s
If map tasks can access only local memory?
min(local memory) = 20 GB/s
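A toy calculator for the capacity and throughput models on these slides. The device numbers (8 GB memory at 20 GB/s, 2 TB disk at 200 MB/s, a 100 MB/s rack switch, 80 nodes per rack) are taken from the datacenter characterization used here; treat them as illustrative assumptions rather than fixed constants.

    public class BottleneckModel {

        // Throughput of a workload is bounded by its slowest component.
        static double minOf(double... componentsMBs) {
            double m = Double.MAX_VALUE;
            for (double c : componentsMBs) m = Math.min(m, c);
            return m;
        }

        public static void main(String[] args) {
            // Per-node storage (GB) and peak throughput (MB/s) from the characterization slides.
            double memGB = 8, diskGB = 2_000;
            double memMBs = 20_000, diskMBs = 200, rackSwitchMBs = 100;
            int nodesPerRack = 80;

            // Local memory only: C = 1 * 8 GB, T = 20 GB/s.
            System.out.printf("Local mem : C = %8.0f GB, T = %6.0f MB/s%n", memGB, minOf(memMBs));

            // Local disk only: C = 1 * 2 TB, T = min(disk) = 200 MB/s.
            System.out.printf("Local disk: C = %8.0f GB, T = %6.0f MB/s%n", diskGB, minOf(diskMBs));

            // All disks on the rack: C = 80 * 2 TB = 160 TB,
            // T = min(rack switch, local disk, local memory) = 100 MB/s.
            System.out.printf("Rack disk : C = %8.0f GB, T = %6.0f MB/s%n",
                    nodesPerRack * diskGB, minOf(rackSwitchMBs, diskMBs, memMBs));
        }
    }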

Throughput-Capacity Curve
[Log-scale chart: throughput (MB/s) versus capacity (GB) for each Map Reduce workload, from local memory and local disk out to rack and datacenter storage.]
Source: Barroso and Urs Hölzle (2009)

Throughput-Capacity Walk Through
MEM: 8 GB, 100 ns, 20 GB/s
Disk: 2 TB, 10 ms, 200 MB/s
[Two log-scale charts, capacity (GB) and throughput (MB/s), with one bar per workload: Local Mem, Local Disk, Rack Mem, Rack Disk, DC Mem, DC Disk.]
Source: Barroso and Urs Hölzle (2009)

System Management #1
Assume data is stored parsimoniously on exactly 1 server and each server executes map tasks
Which Map Reduce workload can achieve throughput of 20 GB/s?
If throughput of 100 MB/s is acceptable, where can map tasks get data from?
Which achieves higher throughput: rack memory or local disk?
The answer varies if we change the data center characterization

Numbers Everyone Should Know*
L1 cache reference 0.5 ns
Branch mispredict 5 ns
L2 cache reference 7 ns
Mutex lock/unlock 25 ns
Main memory reference 100 ns
Send 2K bytes over 1 Gbps network 20,000 ns
Read 1 MB sequentially from memory 250,000 ns
Round trip within same datacenter 500,000 ns
Disk seek 10,000,000 ns
Read 1 MB sequentially from disk 20,000,000 ns
Send packet CA → Netherlands → CA 150,000,000 ns
* According to (LADIS 2009 keynote)

Data Management in the Cloud Module 2: Map Reduce
Module Outline
1. Unstructured data
●What is it? Why is it common?
●What is Map Reduce? Why is it important?
2. How does Map Reduce manage data?
●What is a distributed file system?
●Adding hardware to improve throughput
●Writing better software to improve throughput

Map Reduce
Programmers specify two functions: map (k1, v1) → [(k2, v2)]
reduce (k2, [v2]) → [(k3, v3)]
 All values with the same key are sent to the same reducer
The execution framework handles everything else…

[Diagram: four map tasks emit a 1, b 2 | c 3, c 6 | a 5, c 2 | b 7, c 8; a combiner runs after each map task (e.g., collapsing c 3, c 6 into c 9); Shuffle and Sort aggregates values by key (a → 1, 5; b → 2, 7; c → 2, 9, 8); three reduce tasks emit r1 s1, r2 s2, r3 s3.]

“Everything Else”
The execution framework handles everything else…
 Scheduling: assigns workers to map and reduce tasks
 “Data distribution”: moves processes to data
 Synchronization: gathers, sorts, and shuffles intermediate data
 Errors and faults: detects worker failures and restarts
You don’t know:
 Where mappers and reducers run
 When a mapper or reducer begins or finishes
 Which input a particular mapper is processing
 Which intermediate key a particular reducer is processing
What can you do?
 Cleverly structure intermediate data to reduce network traffic

Structure of Intermediate Data
Term co-occurrence matrix for a text collection
 M = N x N matrix (N = vocabulary size)
 Mij: number of times i and j co-occur in some context
(for concreteness, let’s say context = sentence) Why?
 Distributional profiles as a way of measuring semantic distance
 Semantic distance useful for many language processing tasks

Structure of Intermediate Data cont.
Term co-occurrence matrix for a text collection = specific instance of a large counting problem
 A large event space (number of terms)
 A large number of observations (the collection itself)
 Goal: keep track of interesting statistics about the events
Basic approach
 Mappers generate partial counts
 Reducers aggregate partial counts
How do we aggregate partial counts efficiently?

First Try: “Pairs”
Each mapper takes a sentence:
 Generate all co-occurring term pairs
 For all pairs, emit (a, b) → count
Reducers sum up counts associated with these pairs
Use combiners!

Pairs: Pseudo-Code
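The pseudo-code image on this slide is not in the transcript, so here is a hedged Hadoop Java sketch of the “pairs” pattern, assuming each input value is a single sentence and that any two terms in the same sentence co-occur. Class and variable names are illustrative.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class Pairs {

        // Map: emit one (pair, 1) record per co-occurring term pair in the sentence.
        public static class PairsMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            private static final IntWritable ONE = new IntWritable(1);
            private final Text pair = new Text();

            @Override
            protected void map(LongWritable offset, Text sentence, Context context)
                    throws IOException, InterruptedException {
                String[] terms = sentence.toString().split("\\s+");
                for (int i = 0; i < terms.length; i++) {
                    for (int j = i + 1; j < terms.length; j++) {
                        pair.set(terms[i] + "," + terms[j]);
                        context.write(pair, ONE);
                    }
                }
            }
        }

        // Reduce (also usable as a combiner): sum the counts for each pair.
        public static class PairsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            protected void reduce(Text pair, Iterable<IntWritable> counts, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable c : counts) sum += c.get();
                context.write(pair, new IntWritable(sum));
            }
        }
    }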

Pairs: Data Flow
[Diagram: the four crawled sentences are shown. The outputs drawn are from Map instance 1 processing “Elena scores 25”, which emits (Elena, scores) → 1, (Elena, 25) → 1, (scores, 25) → 1, and Map instance 2 processing “Brittany guides Mercury past Candace”, which emits the ten co-occurring pairs, e.g., (guides, Mercury) → 1, (guides, past) → 1, (Mercury, past) → 1, (Mercury, Candace) → 1, (guides, Candace) → 1, (Candace, past) → 1, …]
Map Stage emits 13 intermediate values from 8 input words
What happens in the reduce stage? All of the keys are unique, producing 13 group-by sums
Is this implementation holistic, algebraic or distributive aggregation?
With N = 13, if we add a combiner at the end of the map stage, it could simply sum recurring word pairs (F({X}) = G(F({X1}), F({X2}), …) with F = G). Yes, this is distributive.
Will this implementation complete quickly? Probably not; there is a lot of network transfer.

“Pairs” Analysis
Advantages:
 Easy to implement, easy to understand
Disadvantages:
 Lots of pairs to sort and shuffle around (upper bound?)
 Not many opportunities for combiners to work

Another Try: “Stripes”
Idea: group together pairs into an associative array
(a, b) → 1 (a, c) → 2 (a, d) → 5 (a, e) → 3 (a, f) → 2
a → { b: 1, c: 2, d: 5, e: 3, f: 2 }
Each mapper takes a sentence:
 Generate all co-occurring term pairs
 For each term, emit a → { b: count_b, c: count_c, d: count_d, … }
Reducers perform an element-wise sum of the associative arrays:
 a → { b: 1, d: 5, e: 3 }
 a → { b: 1, c: 2, d: 2, f: 2 }
 a → { b: 2, c: 2, d: 7, e: 3, f: 2 }
Key idea: cleverly-constructed data structure brings together partial results

Stripes: Pseudo-Code
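Again the pseudo-code image is not in the transcript; this is a hedged Hadoop Java sketch of the “stripes” pattern using Hadoop’s MapWritable as the associative array, under the same one-sentence-per-value assumption. Class names are illustrative.

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class Stripes {

        // Map: for each term, emit one stripe of neighbor counts from this sentence.
        public static class StripesMapper extends Mapper<LongWritable, Text, Text, MapWritable> {
            @Override
            protected void map(LongWritable offset, Text sentence, Context context)
                    throws IOException, InterruptedException {
                String[] terms = sentence.toString().split("\\s+");
                for (int i = 0; i < terms.length; i++) {
                    MapWritable stripe = new MapWritable();
                    for (int j = 0; j < terms.length; j++) {
                        if (j == i) continue;
                        Text neighbor = new Text(terms[j]);
                        IntWritable prev = (IntWritable) stripe.get(neighbor);
                        stripe.put(neighbor, new IntWritable(prev == null ? 1 : prev.get() + 1));
                    }
                    context.write(new Text(terms[i]), stripe);
                }
            }
        }

        // Reduce: element-wise sum of all stripes that share the same term.
        public static class StripesReducer extends Reducer<Text, MapWritable, Text, MapWritable> {
            @Override
            protected void reduce(Text term, Iterable<MapWritable> stripes, Context context)
                    throws IOException, InterruptedException {
                MapWritable total = new MapWritable();
                for (MapWritable stripe : stripes) {
                    for (Map.Entry<Writable, Writable> e : stripe.entrySet()) {
                        IntWritable prev = (IntWritable) total.get(e.getKey());
                        int add = ((IntWritable) e.getValue()).get();
                        total.put(e.getKey(), new IntWritable(prev == null ? add : prev.get() + add));
                    }
                }
                context.write(term, total);
            }
        }
    }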

Stripes: Data Flow
[Diagram: Map instance 1 processes “Elena scores 25”; Map instance 2 processes “Brittany guides Mercury past Candace” (the other two crawled sentences are also shown). Each map task emits one stripe of neighbor counts per term, e.g., Elena → { scores: 1, 25: 1 }, scores → { 25: 1 }, and similar stripes keyed by Brittany, guides, Mercury, past, and Candace. After Shuffle and Sort, stripes sharing the same term key are grouped for element-wise summation.]
Map Stage emits 6 intermediate values from 8 input words
What happens in the reduce stage? Element-wise sum
Is this implementation holistic, algebraic or distributive aggregation?
