CIS 455/555: Internet and Web Systems
Spring 2022
Homework 3: MapReduce, Parallelism, and Stream Processing with Punctuation Due April 11, 2022, at 10:00pm ET
1. Introduction
In this assignment, you will extend StormLite into a distributed framework, which essentially emulates Apache Storm and, in the process, also emulates MapReduce. As we hinted at in the lectures, MapReduce is really doing parallel (sharded) processing of streams of tuples, and Storm does the same thing with slightly different abstractions (e.g., streaming vs. batched). Your solution will make use of REST services built with the provided framework (or your framework from Homework 1). You will test your framework by implementing a simple WordCount MapReduce program, which would normally run on Hadoop.
As usual, you should start by forking, then cloning the framework / skeleton code (project at https://bitbucket.org/upenn‐cis555/555‐hw3) from bitbucket. This includes a slightly expanded version of StormLite from HW2, with some MapReduce components.
2. Overview
At the end of this assignment, you will have a system consisting of two types of nodes: some number of workers and a single master. These nodes can be distributed, which means that the workers can run on different computers than the master and other workers. Coordination and data will be handled through the REST interfaces mentioned above.
As for emulating MapReduce on top of StormLite, note that StormLite gives you a basic abstraction for thinking about computation over units of data in streams. We will run map and reduce functions within StormLite bolts (with a spout that can read key-value data). Workers run spouts and bolts, and they store the data that your MapReduce framework is working on; the master coordinates the workers and provides a user interface. The shuffle, which is typically done along with a sort in MapReduce after all data has been mapped, is instead done incrementally by the StormLite framework using hashing. (This will allow us to dispense key-value pairs in a streaming fashion and is useful for the vast majority of MapReduce algorithms, but of course it doesn’t allow us to directly emulate MapReduce algorithms that rely on sorting.)
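To make the hash-based shuffle concrete, the sketch below shows one way a grouping could route a tuple to a downstream executor by hashing its key, so that all tuples with the same key reach the same executor without any global sort. The class and method names here (HashRouter, route) are illustrative only and are not part of the provided StormLite API.

    import java.util.List;

    // Illustrative only: route a key to one of the downstream executors by hashing it,
    // so every tuple with the same key lands on the same destination.
    public class HashRouter {
        private final List<String> destinationExecutorIds;   // downstream bolt executors

        public HashRouter(List<String> destinationExecutorIds) {
            this.destinationExecutorIds = destinationExecutorIds;
        }

        /** Chooses the executor that should receive the tuple with the given key. */
        public String route(String key) {
            int bucket = Math.floorMod(key.hashCode(), destinationExecutorIds.size());
            return destinationExecutorIds.get(bucket);
        }
    }

Math.floorMod keeps the bucket index non-negative even when hashCode() is negative, which is why it is used here instead of the % operator alone.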
A key issue is that, in a MapReduce job, the inputs end and the reduce can only start once the reducer has read all inputs. This requires that we implement a very simple distributed consensus algorithm (see 4.5.1 for details).
For our StormLite MapReduce executor, there are some key classes you will need to note. Much as we modeled StormLite off Apache Storm, these classes are heavily based on Apache Hadoop MapReduce. Recall that we spent some time in lecture discussing the different aspects of Hadoop MapReduce. Other than sort-based algorithms, you should be able to drop in Hadoop MapReduce code with minimal changes.
● Config (a set of key-value pairs) will be used to specify various parameters on how your MapReduce engine’s topology is created and how it executes. This includes the names of the classes to instantiate.
● Job is the basic class specifying what the map and reduce functions are. Think of it as a simplified Hadoop MapReduce job (a hypothetical sketch of this interface appears after this list).
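To make the Job abstraction concrete, here is a hypothetical sketch of the map/reduce contract. The exact method signatures in the skeleton code may differ, so treat the names below (Job, Context, write) as assumptions for illustration, not as the provided interface.

    // Hypothetical sketch of a Job-style interface; the skeleton's actual signatures may differ.
    // map() is called once per input key/value pair; reduce() is called once per key with all
    // of the values collected for that key. Output is emitted through a collector.
    public interface Job {
        void map(String key, String value, Context context);
        void reduce(String key, Iterable<String> values, Context context);

        // Minimal collector abstraction used by both phases in this sketch.
        interface Context {
            void write(String key, String value);
        }
    }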
Additionally, we give you some “starter” spouts and bolts that will form the core of your implementation:
● FileSpout is an abstract class for a file reader that reads a sharded file. If you have workers 0 and 1, FileSpout will read file.0 and file.1 on those machines, respectively. [Note: this depends on you setting the Config variable workerIndex to the worker's index, as required below.]
● MapBolt runs map() from an instance of Job (as specified in the Config parameters).
● ReduceBolt runs reduce() from an instance of Job (as specified in the Config parameters). ReduceBolt buffers tuples until it knows that there is no more input (i.e., all potential senders have marked “end of stream”); see the sketch below for an illustration of this buffering pattern.
● PrintBolt simply prints the output to the console.
You will need to modify and expand some of these, but do not change the existing interfaces as we will be dropping in our own map/reduce classes to test your system.
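To illustrate the buffering behavior described for ReduceBolt above, the following sketch shows one possible way a reducer could buffer values per key and only invoke reduce() once it has seen end-of-stream markers from every upstream sender. This is not the provided ReduceBolt: the class name, the isEndOfStream flag, and the expected-marker count are assumptions, and Job/Job.Context refer to the hypothetical interface sketched earlier.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch only; the provided ReduceBolt and StormLite interfaces differ.
    public class BufferingReducerSketch {
        private final Map<String, List<String>> buffer = new HashMap<>();
        private final int expectedEndOfStreamMarkers;   // one per upstream map executor
        private int endOfStreamMarkersSeen = 0;
        private final Job job;                          // hypothetical Job from the earlier sketch

        public BufferingReducerSketch(Job job, int expectedEndOfStreamMarkers) {
            this.job = job;
            this.expectedEndOfStreamMarkers = expectedEndOfStreamMarkers;
        }

        /** Called once per incoming (key, value) pair, or once per end-of-stream marker. */
        public void consume(String key, String value, boolean isEndOfStream, Job.Context out) {
            if (isEndOfStream) {
                if (++endOfStreamMarkersSeen == expectedEndOfStreamMarkers) {
                    // All senders are done: it is now safe to run reduce over each buffered key.
                    for (Map.Entry<String, List<String>> entry : buffer.entrySet()) {
                        job.reduce(entry.getKey(), entry.getValue(), out);
                    }
                }
            } else {
                buffer.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
            }
        }
    }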
Both the worker and master nodes will communicate with one another via route handlers, registered with the provided framework or your own HW1 framework. That means that each worker and master will have a single WorkerServer or MasterServer running on separate ports, optionally on multiple machines.
We dive into their operation in more detail in later sections, but at a high level, each worker node will have a separate storage directory in which it keeps its local share of the data – as temp files and also, as relevant, as BerkeleyDB environments/tables. For instance, a worker might have a storage directory called storage/node1. Each worker may have multiple operators and threads, each with a bolt executor ID (such as 1234); you can use this to generate filenames or BerkeleyDB table names unique to the executor. Keep in mind that there will usually be many workers, and that the data will be distributed among them; for instance, you might have a system with 10 workers and 100,000 web pages total, but each worker might only store 10,000 of them. You should also keep in mind that there might be multiple executor threads running on a worker when architecting your storage strategy.
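Since each worker has its own storage directory and each executor needs its own files or BerkeleyDB tables, one simple approach is to derive a unique path from the worker's storage directory and the executor ID. The sketch below is one possible convention, not a requirement; the directory layout is entirely up to you.

    import java.io.File;

    // One possible (not required) convention for giving each executor its own storage area
    // inside the worker's storage directory, e.g. storage/node1/executor-1234/.
    public class ExecutorStorage {
        public static File directoryFor(String workerStorageDir, String executorId) {
            File dir = new File(workerStorageDir, "executor-" + executorId);
            if (!dir.exists() && !dir.mkdirs()) {
                throw new RuntimeException("Could not create storage directory " + dir);
            }
            return dir;
        }
    }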
The actual MapReduce Jobs will simply be classes that implement a special interface (which contains a map and a reduce function). In a MapReduce framework like Hadoop, these classes would be sent from the master to the workers; to simplify the assignment, we will assume that these classes are already in the classpath on each worker. We will also assume that only one job is being run at a time.
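Because the job classes are assumed to be on each worker's classpath, a worker can instantiate the job by name with reflection once it learns the class name (for example, from the Config). The sketch below assumes the job class has a public no-argument constructor and implements the hypothetical Job interface sketched earlier.

    // Sketch: instantiate the job class named in the configuration via reflection.
    public class JobLoader {
        public static Job load(String jobClassName) throws ReflectiveOperationException {
            Class<?> clazz = Class.forName(jobClassName);
            return (Job) clazz.getDeclaredConstructor().newInstance();
        }
    }

For example, JobLoader.load("edu.upenn.cis455.mapreduce.job.MyJob") would return a fresh instance of that job class, provided it is on the worker's classpath.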
Changes to the StormLite API. We’ve made some important, relatively minor, changes to StormLite to make it easier (1) for you to debug your distributed implementation, and (2) for you to look at extending StormLite to a full multithreaded execution model. Specifically, (1) the Tuple now includes an (optional) field for you to store the ID of the executor that produced it, so you can trace where it came from (all IStreamSources have a getExecutorId); (2) all methods for sending or writing take an (optional) source ID; (3) the spout nextTuple and bolt execute methods return a Boolean indicating they still may produce more stream elements (if true) or they have reached end-of-stream (if false).
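The boolean returned by nextTuple and execute is what lets downstream components detect end-of-stream. As a rough illustration of the convention (using an invented iterator-based source and a plain Consumer as the collector, not the actual FileSpout or StormLite collector types):

    import java.util.Iterator;
    import java.util.function.Consumer;

    // Rough illustration of the end-of-stream convention: return true while more data may
    // follow, false once this source is exhausted. Not the actual FileSpout implementation.
    public class LineSourceSketch {
        private final Iterator<String> lines;   // e.g., lines of the worker's input shard

        public LineSourceSketch(Iterator<String> lines) {
            this.lines = lines;
        }

        /** Emits at most one value; the return value signals whether more may follow. */
        public boolean nextTuple(Consumer<String> emit) {
            if (!lines.hasNext()) {
                return false;    // end of stream: this source will produce nothing further
            }
            emit.accept(lines.next());
            return true;         // more tuples may still follow
        }
    }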
3. The master
A MasterServer will control and monitor job progress from the Web. The master presents a status page (/status) on which it displays the list of workers that are currently online, as well as some information about each (e.g., the worker’s IP address, and what the worker is doing). To keep this list up to date, the workers will periodically send some information about themselves to the master. The status page will also have an input form that allows the administrator to specify a MapReduce job to run, as well as some parameters (such as a subdirectory of the storage directory to read data from, a subdirectory to write the output to, etc.). When the administrator submits this form, the master forwards this information to each of the workers, which then begin processing the data.
3.1 Status updates from the workers
The MasterServer should provide a way for the workers to report their status. Specifically, it should accept GET requests for the URL /workerstatus, with the following parameters in the query string (a sketch of such a request appears after this list):
● port: the port number on which the worker is listening for HTTP requests (e.g., port=4711)
● status: INIT (initialization), MAPPING, REDUCING or IDLE, depending on what the worker is
doing (e.g., status=IDLE). (You’ll see an enum for this in TopologyContext.)
● job: the name of the class that is currently being run (for instance,
job=edu.upenn.cis455.mapreduce.job.MyJob)
● keysRead: the number of keys that have been read so far (if the status is MAPPING or REDUCING), the number of keys that were read by the last map (if the status is IDLE after a job), or zero if the node has never run a job.
● keysWritten: the number of keys that have been written so far (if the status is MAPPING or REDUCING), or the number of keys that were written by the last reduce (if the status is IDLE). If the node has never run any jobs, return 0.
● results: the set of results (up to the first 100) output by each worker. This will of course be
empty until the ReduceBolt has actually produced output and fed it to the next stream.
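For reference, here is a hedged sketch of how a worker might assemble and send this GET request. The masterHost and masterPort arguments and the field values are placeholders; only the /workerstatus path and the query-string parameter names come from the specification above.

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.net.URLEncoder;
    import java.nio.charset.StandardCharsets;

    // Sketch of a worker's periodic status report to the master.
    public class StatusReporter {
        public static void report(String masterHost, int masterPort, int myPort, String status,
                                  String job, int keysRead, int keysWritten, String results)
                throws IOException {
            String query = "port=" + myPort
                    + "&status=" + status
                    + "&job=" + URLEncoder.encode(job, StandardCharsets.UTF_8.name())
                    + "&keysRead=" + keysRead
                    + "&keysWritten=" + keysWritten
                    + "&results=" + URLEncoder.encode(results, StandardCharsets.UTF_8.name());
            URL url = new URL("http://" + masterHost + ":" + masterPort + "/workerstatus?" + query);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.getResponseCode();   // issue the request; the response body is not needed
            conn.disconnect();
        }
    }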
Note that the worker is not reporting its IP address because the master can get it from the request. (The port number is needed because the worker will usually use different ports for listening and for sending requests.) The master should keep this information, along with the time it has last received a /workerstatus request from a given IP:port combination.
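One straightforward way for the master to keep this bookkeeping is a map keyed by the worker's IP:port combination, storing the most recently reported fields plus the time the report arrived. The sketch below is one possible shape for that record; the class and field names are illustrative, not prescribed.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative bookkeeping for the master: the latest report from each worker,
    // keyed by "ip:port", plus the time the report was received.
    public class WorkerDirectory {
        public static class WorkerStatus {
            public String status;        // INIT, MAPPING, REDUCING, or IDLE
            public String job;
            public int keysRead;
            public int keysWritten;
            public String results;
            public long lastReportMillis;
        }

        private final Map<String, WorkerStatus> workers = new ConcurrentHashMap<>();

        public void record(String ip, int port, WorkerStatus latest) {
            latest.lastReportMillis = System.currentTimeMillis();
            workers.put(ip + ":" + port, latest);
        }

        public Map<String, WorkerStatus> snapshot() {
            return workers;
        }
    }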
3.2 The user‐facing status (and job‐launch) page
When a user requests the URL /status from the master server, the route should return a dynamic HTML document that contains (1) a section with status information about the workers, and (2) a web form for submitting jobs.
The status section should include one line for each active worker (a worker is considered active if it has posted a /workerstatus update within the last 30 seconds) and should be in the following comma-separated form. Make sure that your output matches exactly to be compatible with the autograder. Again, for simplicity, you can assume we are only going to execute a single job at a time.
0: port=8001, status=IDLE, job=None, keysRead=0, keysWritten=0, results=[]
1: port=8002, status=IDLE, job=Foo, keysRead=2, keysWritten=2, results=[(a, 1),(b,1)]
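A hedged sketch of producing one such line is shown below. The method and parameter names are illustrative, the results argument is passed through as an already formatted string, and you should verify the exact output (including spacing) against the examples above and the autograder.

    // Sketch: format one status line in the comma-separated form shown above.
    public class StatusLineFormatter {
        public static String format(int index, int port, String status, String job,
                                    int keysRead, int keysWritten, String results) {
            return index + ": port=" + port
                    + ", status=" + status
                    + ", job=" + job
                    + ", keysRead=" + keysRead
                    + ", keysWritten=" + keysWritten
                    + ", results=[" + results + "]";
        }
    }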
The web form for submitting jobs should use the following form code: