
Did you do your pre-session reading?

Programming modes…
• “Traditional” programming is serial…
• … but we have many cores and many computers!


• So let’s parallelise!
• Ok, but how?

Brainstorming…

Brainstorming…
• Many problems to solve…
– How do we partition the input to workers?
– How do we distribute the work to workers?
– How do we aggregate the results from workers?
– How do we synchronise/orchestrate the workers?

Brainstorming…
• Parallel programming for many cores/threads on a single computer
• What’s the first thing that comes to mind?
– Break input/task into parts that can be processed concurrently
– Then execute them concurrently across cores/threads/…
• Possible problems/challenges?
– Identifying parts of the input/task that can be processed concurrently
– Exact input/task partitioning scheme
– Coordination
– Synchronisation
– Result merging
– …
– Identifying whether it actually makes sense to do this…
• Wait till we go distributed…

Brainstorming…
• What is the best way to parallelise a job on a single machine using multiple threads/cores?
a. All threads access the input and intermediate data at a shared memory location
b. Each thread is sent a full copy of the input data, uses own memory for intermediate data
c. Each thread is sent a full copy of part of the input data, uses own memory for intermediate data
d. Each thread chooses what part of the input data to process, uses a shared location for intermediate data

The easiest case…
Example: Lowercase all letters in a very large text file
• Why is this easy?
– Data can be partitioned in equal-sized partitions
Can precompute #partitions very easily
– No dependency within or across partitions
Each partition can be processed independently of others
– Amenable to a simple master-workers solution
• Master pre-splits input, assigns each split to a worker, gathers the results
• Worker receives a split, processes it, returns results to master
– What if a worker dies mid-processing?
– What if the master dies mid-processing?
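
A minimal single-machine sketch of this master-workers idea using a Java thread pool (the class name, chunking rule and keeping everything in memory are illustrative assumptions, not from the slides):

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LowercaseMaster {
    public static void main(String[] args) throws Exception {
        String text = new String(Files.readAllBytes(Paths.get(args[0])));
        int nWorkers = Runtime.getRuntime().availableProcessors();
        int chunk = Math.max(1, (text.length() + nWorkers - 1) / nWorkers); // equal-sized partitions

        ExecutorService pool = Executors.newFixedThreadPool(nWorkers);
        List<Future<String>> parts = new ArrayList<>();
        for (int i = 0; i < text.length(); i += chunk) {                    // master pre-splits the input
            String split = text.substring(i, Math.min(i + chunk, text.length()));
            Callable<String> worker = () -> split.toLowerCase();            // worker processes its split
            parts.add(pool.submit(worker));
        }
        StringBuilder result = new StringBuilder();
        for (Future<String> f : parts) result.append(f.get());              // master gathers results in order
        pool.shutdown();
        System.out.println(result);
    }
}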

The not so easy case…
Example: Lowercase only first letter of each word in a very large text file
• Why is this not as easy?
– Data can’t necessarily be partitioned in equal-sized partitions
Think of words on partition boundaries
• How can we “fix” this while maintaining the simplicity of the master-workers solution?
– How do we compute the partitions?
– Can we parallelise that step?
– Why would or wouldn’t we want to do so?
– What if a worker dies mid-processing?
– What if the master dies mid-processing?
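
One way to keep the simple master-workers scheme is for the master to nudge each split boundary forward to the next whitespace before handing splits out, so no word straddles two partitions. A sketch, assuming whitespace-delimited words (class and method names are hypothetical):

public class WordAlignedSplitter {
    // Compute nSplits+1 split boundaries over the text; each boundary is pushed
    // forward to the next whitespace so no word straddles two partitions.
    public static int[] boundaries(String text, int nSplits) {
        int[] bounds = new int[nSplits + 1];            // bounds[0] stays 0
        int target = Math.max(1, text.length() / nSplits);
        for (int i = 1; i < nSplits; i++) {
            int pos = Math.min(Math.max(bounds[i - 1], i * target), text.length());
            while (pos < text.length() && !Character.isWhitespace(text.charAt(pos)))
                pos++;                                   // slide forward to the next word boundary
            bounds[i] = pos;
        }
        bounds[nSplits] = text.length();
        return bounds;
    }
}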

The somewhat harder case…
Example: Given a large graph in the form of a series of text lines, each containing the ID of a unique node and the IDs of nodes to which it links (outlinks), compute how many inlinks each node has
• Why is this harder?
– Data can’t necessarily be partitioned in equal-sized partitions
– Data across partitions not necessarily independent
• Outlinks in one partition reference a node in another partition
• Outlinks from multiple partitions need to be aggregated to compute the final result
• How can we “fix” this while maintaining the simplicity of the master-workers solution?
– How do we partition the input?
– How do we distribute the work?
– How do we aggregate the results?
– What if a worker dies mid-processing?
– What if the master dies mid-processing?
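
A sketch of the extra aggregation this case needs (helper names and the exact line format are assumptions): each worker builds a partial nodeID → inlink-count map for its own partition, and the master merges the partial maps, since outlinks pointing at the same node can come from many partitions.

import java.util.HashMap;
import java.util.Map;

public class InlinkCounter {
    // Worker: count inlinks within one partition of "nodeID outlinkID outlinkID ..." lines.
    public static Map<String, Integer> partialCounts(Iterable<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            String[] tokens = line.trim().split("\\s+");
            for (int i = 1; i < tokens.length; i++)        // tokens[0] is the node itself
                counts.merge(tokens[i], 1, Integer::sum);  // one inlink per outlink target
        }
        return counts;
    }

    // Master: merge the per-partition partial maps into the final result.
    public static Map<String, Integer> merge(Iterable<Map<String, Integer>> partials) {
        Map<String, Integer> total = new HashMap<>();
        for (Map<String, Integer> p : partials)
            p.forEach((node, c) -> total.merge(node, c, Integer::sum));
        return total;
    }
}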

The even harder case…
Example: Given a large number of 2-d points, compute a simple clustering into k clusters
o Take the first k points as cluster heads; then:
1. Assign each point to the closest cluster head
2. Average all points per cluster head and make the average point the new head for that cluster
3. Compute distance of new and old cluster heads per cluster, for all clusters; if max distance below a threshold stop, otherwise loop to 1.
• Why is this harder?
– Data can’t necessarily be partitioned in equal-sized partitions
– Data across partitions not necessarily independent
– Algorithm requires multiple passes to terminate (converge)
• How can we “fix” this while maintaining the simplicity of the master-workers approach?
– How do we partition the input?
– How do we distribute the work?
– How do we aggregate the results?
– What if a worker dies mid-processing?
– What if the master dies mid-processing?
• Look ahead: What if we didn’t care for a master-workers architecture?
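
For reference, a compact sequential sketch of the loop described above; the per-point assignment (step 1) and the per-cluster averaging (step 2) are the parts a master could farm out to workers on every pass. Names and the stopping threshold are illustrative.

public class SimpleKMeans {
    // points: n x 2, heads: k x 2 (initialised to the first k points).
    public static double[][] cluster(double[][] points, double[][] heads, double threshold) {
        int k = heads.length;
        while (true) {
            double[][] sums = new double[k][2];
            int[] sizes = new int[k];
            for (double[] p : points) {                       // 1. assign each point to the closest head
                int best = 0;
                for (int c = 1; c < k; c++)
                    if (dist(p, heads[c]) < dist(p, heads[best])) best = c;
                sums[best][0] += p[0]; sums[best][1] += p[1]; sizes[best]++;
            }
            double maxMove = 0;                               // 2. average the points per cluster
            for (int c = 0; c < k; c++) {
                double[] newHead = sizes[c] == 0 ? heads[c]
                        : new double[] { sums[c][0] / sizes[c], sums[c][1] / sizes[c] };
                maxMove = Math.max(maxMove, dist(heads[c], newHead));
                heads[c] = newHead;
            }
            if (maxMove < threshold) return heads;            // 3. stop once heads barely move
        }
    }

    static double dist(double[] a, double[] b) {
        return Math.hypot(a[0] - b[0], a[1] - b[1]);
    }
}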

And now for something completely different

The actually hard case…
• Assume we now have many computers on a local network (cluster/datacentre)
– How do we partition the input?
– How do we distribute the work?
– How do we aggregate the results?
– What if a worker dies mid-processing?
– What if the master dies mid-processing?
– What if the network goes down between two workers?
– What if the network goes down between the master and one (or more) workers?
– What if…?
What if…?
What if…?

Many fish in the ocean…
• NFS + SSH
• Bulk Synchronous Parallel (BSP)
• Message Passing Interface (MPI)/OpenMP
• Parallel Virtual Machine (PVM)
• “Beowulf” clusters
• …
• What’s wrong with all these?
– i.e., how much is the fish?
• Fault tolerance in a large-scale environment
– i.e., so long and thanks for all the fish…

Enter MapReduce
• Why “Map” and “Reduce”?
• Map(function f, values v[]): apply f to all elements of v
• Reduce(function f, values v[]): combine all elements of v using f as an aggregate
• MapReduce used to denote:
– The programming paradigm
– Implementations of the model (e.g. Google MapReduce, Hadoop MapReduce, etc.)
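
To make the two primitives concrete, a small sketch using plain Java streams rather than any MapReduce implementation: map applies a function to every element, reduce folds all elements into a single value using an aggregation function.

import java.util.List;
import java.util.stream.Collectors;

public class MapReducePrimitives {
    public static void main(String[] args) {
        List<Integer> v = List.of(1, 2, 3, 4);

        // Map(f, v[]): apply f to all elements of v.
        List<Integer> squared = v.stream().map(x -> x * x).collect(Collectors.toList()); // [1, 4, 9, 16]

        // Reduce(f, v[]): combine all elements of v using f as the aggregation function.
        int sum = squared.stream().reduce(0, Integer::sum);                              // 30

        System.out.println(squared + " -> " + sum);
    }
}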

Short history
• 2003: GoogleFS paper published
• 2004: Google publishes MapReduce paper
• 2004: Doug Cutting (later of Yahoo!) starts incorporating the above into Nutch (later a Lucene subproject)
• 2006: GoogleFS/MapReduce parts factored out of Nutch and Apache Hadoop is born (Apache JIRA HADOOP-1)
• 2007: Only 3 companies on the “Powered by Hadoop” page
• 2008: YARN (aka Hadoop 2) is born
• 2008: Yahoo! uses Hadoop to build its web index; 20 companies on the “Powered by Hadoop” page
• 2008: Hadoop wins the Terasort challenge
• 2012: Hadoop 1.0 available
• 2013: YARN deployed in production at Yahoo!
• 2013: Hadoop 2.2 available
• 2017: Hadoop 3.0 available
• …

Why MapReduce?
• “Million Lyrics Dataset”
– One file per song
– Verses, one per line, accessed as key-value pairs (key: line no; value: line text)
Hear the rime of the Ancient Mariner, see his eye as he stops one of three.
Mesmerises one of the wedding guests. Stay here and listen to the nightmares of the Sea.
And the music plays on, as the bride passes by. Caught by his spell and the Mariner tells his tale. Driven south to the land of the snow and ice, to a place where nobody’s been.
Through the snow fog flies on the albatross. Hailed in God’s name, hoping good luck it brings. And the ship sails on, back to the North. Through the fog and ice and the albatross follows on. The mariner kills the bird of good omen. His shipmates cry against what he’s done,
but when the fog clears, they justify him and make themselves a part of the crime.
Q: Compute all unique words and their frequency of appearance (aka Word Count)

The MapReduce paradigm
map: (k1, v1) → [(k2, v2)]
reduce: (k3, [v3]) → [(k4, v4)]
void Map(int lineNo, String line) {
  foreach (Word w in line)
    emit(tolower(w), 1);
}
void Reduce(Word w, int[] counts) {
  int sum = 0;
  foreach (int i in counts) sum += i;
  emit(w, sum);
}
“And the ship sails on, back to the North, through the fog and ice and the albatross follows on.”
[Diagram: word count over the line above]
Map output: (and,1) (the,1) (ship,1) (sails,1) (on,1) (back,1) (to,1) (the,1) (north,1) (through,1) (the,1) (fog,1) (and,1) (ice,1) (and,1) (the,1) (albatross,1) (follows,1) (on,1)
After shuffle/sort: albatross [1], and [1,1,1], back [1], fog [1], follows [1], ice [1], north [1], on [1,1], sails [1], ship [1], the [1,1,1,1], through [1], to [1]
Reduce output: albatross 1, and 3, back 1, fog 1, follows 1, ice 1, north 1, on 2, sails 1, ship 1, the 4, through 1, to 1
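
The same word count expressed against the Hadoop MapReduce Java API, as a sketch of the standard pattern (class names are illustrative and the tokenisation is deliberately simplistic):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// map: (line offset, line text) -> [(word, 1)]
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer it = new StringTokenizer(value.toString());
        while (it.hasMoreTokens()) {
            word.set(it.nextToken().toLowerCase());
            context.write(word, ONE);
        }
    }
}

// reduce: (word, [counts]) -> (word, sum)
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) sum += v.get();
        context.write(key, new IntWritable(sum));
    }
}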

High-level view
• Key idea: Move the computation to the data, not the
data to the computation
– Why?
– How?
• Main entities:
– Master: JobTracker
– Worker “master”: TaskTracker
– Worker “worker”: Task
– Client: Client
• Input/output data storage: HDFS
• Intermediate data storage: Local FS

High-level view
[Diagram: master coordinating multiple workers]

Execution phases
1. Job submission
2. Job initialization
3. Map tasks scheduling
4. Map task execution
5. Intermediate output sorting
6. Reduce task scheduling
7. Reduce task execution
8. Job completion
o Progress and status monitoring

High-level view
• Input parsing/splitting
– InputFormat
• Map function execution
– Mapper
• Intermediate result partitioning/sorting
– Combiner, Partitioner, Group comparator
• Intermediate data transfer to reducers
• Reduce function execution
• Final output storage
– OutputFormat

Job execution anatomy
Source: Hadoop: The definitive guide (3rd ed.)

Task assignment
• The JobTracker assigns each task to a TaskTracker
• How does the JT know which TT is idle?
– Through JT-TT heartbeats
– Not in the Google paper; Why?
• How does the JT decide which task to run where?
– That’s a job for the scheduler
• FIFO / Fair / Capacity
• Data-local / rack-local / remote
– Plus speculative execution
• (a.k.a., backup task execution)

Input splitting
• Default strategy: split files into equally-sized parts
• Splits should (by default) be at most one HDFS block large (lower limit on # splits)
• Splits are precomputed & stored along with the data files on HDFS
• Splits are data agnostic
– What happens if a word is split in half?
– Let the InputFormat decide!

Aside: InputFormat
• abstract class org.apache.hadoop.mapreduce.InputFormat (subclasses under org.apache.hadoop.mapreduce.lib.input)
– FileInputFormat: Read data from files on HDFS; “utility” base class
• TextInputFormat: Treat input as text files; line-oriented operation
• KeyValueTextInputFormat: Treat input as files storing key-value pairs, one per line; key-values separated by a special character (default: tab)
• SequenceFileInputFormat: Treat input as binary files storing key-value pairs (i.e., org.apache.hadoop.io.SequenceFile’s)
– SequenceFileAsTextInputFormat: Converts keys/values to Strings by means of their toString() method
– SequenceFileAsBinaryInputFormat: Reads keys/values in their raw format
• CombineFileInputFormat: Process multiple small files as a single large file
– CombineTextInputFormat: Combine multiple text files
– CombineSequenceFileInputFormat: Combine multiple SequenceFile’s
• NLineInputFormat: Treat input as text files, create a split every N lines
– DBInputFormat: Read data from a SQL table (over JDBC)
– CompositeInputFormat: ‘‘Join’’ multiple data sources
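
The InputFormat (and, if needed, the split size) is picked in the driver. A brief sketch; the job name, paths and sizes are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class InputConfigExample {
    public static Job configureInput(Configuration conf) throws Exception {
        Job job = Job.getInstance(conf, "my-job");

        // Read tab-separated key-value lines instead of (offset, line) pairs.
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/data/in"));

        // Cap the split size (raising the number of map tasks), if desired.
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
        return job;
    }
}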

Aside: OutputFormat
• abstract class org.apache.hadoop.mapreduce.OutputFormat (subclasses under org.apache.hadoop.mapreduce.lib.output)
– FileOutputFormat: Write data to files on HDFS
• TextOutputFormat: Write plain text files
• MapFileOutputFormat: Write indexed key-value files
• SequenceFileOutputFormat: Write output in org.apache.hadoop.io.SequenceFile format
– SequenceFileAsBinaryOutputFormat: Write output in raw format
– NullOutputFormat: Send all output to oblivion (a.k.a. /dev/null)
– LazyOutputFormat: Write output lazily (output files are created only when the first record is written)
– MultipleOutputs: Writes data to multiple files, possibly using a different OutputFormat for each

Intermediate output and shuffling…
• How should the mappers’ output be transferred to reducers?
a. Mappers write their output locally, reducers fetch over the network
b. Mappers stream their output directly to reducers over the network
c. Mappers write their output on HDFS, reducers read it off of HDFS
d. Mappers store their output in main memory until map execution is complete, then copy it to the reducers’ memory over the network
• A typical design decision: To push or not to push?
– i.e., should mappers push data to reducers, or reducers pull data from mappers?
• How does each reducer know which mappers produced data relevant to the reducer’s task?
• How can the reducer know which part of each mapper’s output to fetch?
• How can the mapper know how to pre-partition its output files?
– Also, how are mapper output keys assigned to reducers? (see the Partitioner sketch below)
• And what about combiners?
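
The key-to-reducer assignment is the Partitioner’s job; by default keys are hashed modulo the number of reducers. A sketch of a custom one (the first-letter rule is purely illustrative):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Route words to reducers by their first letter, so each reducer's output
// file covers a contiguous alphabetical range.
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.getLength() == 0) return 0;
        char first = Character.toLowerCase(key.toString().charAt(0));
        int bucket = (first < 'a' || first > 'z') ? 0 : first - 'a';
        return bucket % numPartitions;     // must always fall in [0, numPartitions)
    }
}
// In the driver: job.setPartitionerClass(FirstLetterPartitioner.class);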

Intermediate output and shuffling

• Distributed cache
– Allows distributing (read-only) files to tasktrackers
• Can be used for jars, text, archives, conf files, etc.
• Compressed files unarchived automatically
• Jars/libs (can be) added to the task’s classpath
– Files are copied only once per job to each tasktracker
– Can be private or public
– Interesting ‘‘hack’’ to share state across tasks!
• Counters
– Built-in counters for:
• HDFS and FileInputFormat/FileOutputFormat profiling
• Map/reduce tasks’ progress
• Job progress
– Also support for user-defined counters
– All counters automatically aggregated at the jobtracker
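
A sketch showing both extras in use (the path, property and counter names are illustrative): the driver ships a side file via the distributed cache, and a mapper loads it in setup() and bumps a user-defined counter.

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class ExtrasExample {
    enum MyCounters { MALFORMED_LINES }                       // user-defined counter

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void setup(Context context) throws IOException {
            // Cached files are available locally on every node running a task.
            URI[] cached = context.getCacheFiles();
            // ... load e.g. a stop-word list from cached[0] ...
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) {
            if (value.getLength() == 0)
                context.getCounter(MyCounters.MALFORMED_LINES).increment(1);
            // ... normal map logic ...
        }
    }

    public static void configure(Job job) throws Exception {
        job.addCacheFile(new URI("/shared/stopwords.txt"));   // distributed cache entry
    }
}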

Aside: More extras
• Compression
– Input/(intermediate) output data can/should be compressed
• More data per HDFS block/partition → lower storage/network overhead
• Fewer I/Os across the dataset → higher throughput
• Higher CPU overhead…
• … but it pays off for I/O-intensive workloads
– Multiple codecs already available: ZLib, GZip, BZip2, LZO, Snappy, LZ4, …
– … but what about splitting???
• Only BZip2 is splittable out of the box; GZip, Snappy and LZ4 are not, and LZO needs a separate index
• Can be worked around, but not for the faint at heart (e.g., see https://github.com/kevinweil/hadoop-lzo)
• Pipes
– Use C++ for mapper/reducer implementations
• Streaming
– Use any language capable of I/O on stdin/stdout for mappers/reducers
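
A sketch of switching compression on from the driver (the codec choices are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressionConfigExample {
    public static Job withCompression(Configuration conf) throws Exception {
        // Compress intermediate (map) output to cut shuffle traffic.
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
                      SnappyCodec.class, CompressionCodec.class);

        Job job = Job.getInstance(conf, "compressed-job");

        // Compress the final (reducer) output as well.
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        return job;
    }
}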

Task failure
• Task failure detected by tasktracker
– Runtime exception/other user code bug
error reported to tasktracker by child JVM
– Child JVM dies
tasktracker informed by the OS
– Task is hung
detected through timeouts in reporting
• The tasktracker informs the jobtracker of the failure
• The jobtracker reschedules the failed task (to a different tasktracker, if possible)
Multiple attempts per task
• Jobs with many failed tasks are eventually terminated
• … or bad input records are marked and skipped

TaskTracker failure
• TaskTracker failure detected by JobTracker
– Crashed or slow tasktracker
missing heartbeats
– Failing or misconfigured tasktracker
multiple task failures
• Tasktracker removed from pool of ‘‘live’’ tasktrackers
• Tasks initially assigned to the failed tasktracker are rescheduled
• Failed/blacklisted tasktrackers reconsidered after some time – Why?

JobTracker failure
The death of all!!!
(… well not really, but close enough …)
… but wait, help is on the way!!!

• How would you fix this?
– Think: what is wrong with the current setup?
• Well, multiple things…
– JobTracker a SPoF and possible bottleneck
– JobTracker a “jack of all trades”:
• Does MR coordination, resource management, scheduling, etc.
• Across all running MR jobs
• But without knowing what else goes on in the cluster…
– Multiple jobs managed by the same JT
• What if one of them is malicious or “malicious”?
• Solution?

Source: T. White. Hadoop: The Definitive Guide. O’Reilly, 2012.

High-level view
• Input parsing/splitting
– InputFormat
• Map function execution
– Mapper
• Intermediate result partitioning/sorting
– Combiner, Partitioner, Group comparator
• Intermediate data transfer to reducers
• Reduce function execution
• Final output storage
– OutputFormat

Problem solving
• Need to define/consider:
– Input spec
– Desired output spec
– Aggregation function
– Grouping
– Partitioning
(more on these shortly)
• Caution: always keep scalability in mind!!!
– Will your design work if the data size is 1,000x its current size?
– How many passes does your design need to do over the input data?
– Do you really need that many jobs to accomplish your goal?
– Can you cut down on bytes read off of disk/sent over the network?
– What is the bottleneck in your design?

Problem solving
• Remember:
– Execution “stages”: Driver → InputFormat → Mapper → Partitioner → Sorting → Grouping → Reducer → OutputFormat
– Input splits defined by InputFormat; usually just contiguous regions of input file(s)
– Mapper output pre-sorted and pre-partitioned based on key/partitioner/group comparator
– Within each partition, key-values ordered by key/partitioner/group comparator
– … not by value! (if default settings used…)

Need to define/consider
• InputFormat:
– Input file(s) splitting logic (also defines number of mappers) — InputFormat.getSplits()
– Record boundaries — RecordReader
• Mappers:
– Possible pre-processing or state required for the duration of the mapper’s lifecycle —
Mapper.setup()
– Possible post-processing or tasks to be executed at the end of the mapper’s lifecycle — Mapper.cleanup()
– Input and output key/value data types and format/structure — note: key could be a composite key!
– Processing done for each input key/value — Mapper.map()
• Shuffle:
– Possibility to aggregate map output key-value pairs before they are sent to the reducer(s) — Combiner
– How many reducers we’ll need and how keys should be split across reducers — Partitioner
– How keys should be sorted at the input of the reducers (a.k.a. sorting comparator, often necessary for composite keys) — WritableComparator, Job.setSortComparatorClass()
– How key/values should be grouped together, in case we want to combine values for different keys (or only consider part of a composite key when forming the key:list-of-values pairs to be fed to reducers, a.k.a. grouping comparator) — WritableComparator, Job.setGroupingComparatorClass()

Need to define/consider
• Reducers:
– Possible pre-processing or state required for the duration of the reducer’s lifecycle —
Reducer.setup()
– Possible post-processing or tasks to be executed at the end of the reducer’s lifecycle — Reducer.cleanup()
– Input key/value data types and format/structure (generally matches the output spec of mappers but may be altered if using a custom grouping comparator or combiner)
– Output key/value data types and format/structure (this is what will be written to the final result file(s))
– Processing done for each input key/list-of-values pair — Reducer.reduce()
• OutputFormat:
– Output file format — may alter how the key-values emitted by reducers are stored (e.g., as text or binary, in files or in a database, etc.)
– Need/use for counters — enum Counters { … }
– Need/use for distributed cache — DistributedCache, usually resources added by job.run() and loaded by Mapper.setup() and/or Reducer.setup()
– Need/use for compression for mapper output (set “mapreduce.map.output.compress”, “mapreduce.map.output.compress.codec”) and/or reducer output (use FileOutputFormat.setCompressOutput() plus FileOutputFormat.setOutputCompressorClass(), SequenceFileOutputFormat.setOutputCompressionType(), …)
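
Tying the checklist together, a sketch of a complete driver for the word count example from earlier; WordCountMapper and WordCountReducer are the classes sketched above, and the input/output paths come from the command line:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCountDriver.class);

        job.setInputFormatClass(TextInputFormat.class);     // input spec
        job.setOutputFormatClass(TextOutputFormat.class);   // output spec

        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);       // safe here: a sum of partial sums
        job.setReducerClass(WordCountReducer.class);
        job.setNumReduceTasks(4);                           // keys hash-partitioned across 4 reducers

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}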

Example: Grep
• Problem: Output all lines containing a given pattern
• Solution: ???
– Input key-value format
– Desired output
– Aggregation function
– Sorting
– Grouping
– Partitioning

Example: Grep
• Considerations:
– Input key-value format:
• TextInputFormat (one key-value per line, key: file offset, value: text line)
– Desired output
• Lines containing given pattern; no specific ordering requested
– Sorting
• None needed; no specific output ordering requested
– Aggregation function/Grouping/Partitioning
• None needed; just need to print matching lines
• This is an example use case for map-only jobs: no need for aggregation means no need for the shuffle and reduce phases.
– Mapper:
• setup(): get pattern from job configuration, store it in a String object
• map(): search value for pattern; if found emit(null, value), else return
– Reducer:
• none
– Driver:
• Set input format, output format, input path, output path, mapper class
• Set number of reducers to 0
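
A sketch of the map-only grep mapper just described; the configuration property name grep.pattern is an assumption:

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Map-only job: emits every input line matching the pattern; no shuffle or reduce phase.
public class GrepMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private Pattern pattern;

    @Override
    protected void setup(Context context) {
        // setup(): read the pattern from the job configuration.
        pattern = Pattern.compile(context.getConfiguration().get("grep.pattern"));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // map(): emit the matching line, otherwise just return.
        if (pattern.matcher(value.toString()).find())
            context.write(NullWritable.get(), value);
    }
}
// Driver: job.setMapperClass(GrepMapper.class); job.setNumReduceTasks(0);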

Example: Sorting
• Problem: Totally sort input by key
– Assume input is a text file with two numbers per line
• Solution: ???
– Input key-value format
– Desired output
– Aggregation function
– Sorting
– Grouping
– Partitioning
