4/16/22, 9:51 PM EECS 485 Project 4: Map Reduce | p4-mapreduce
p4-mapreduce 
EECS 485 Project 4: Map Reduce
Due 11:59pm ET March 27, 2022. This is a group project to be completed in groups of two to three.

Initial Release for W22
3/21: Added public test case test_manager_05
3/22: Added time.sleep() to integration tests to give Workers more time for finishing reduce tasks
Introduction
In this project, you will implement a MapReduce server in Python. This will be a single-machine, multi-process, multi-threaded server that will execute user-submitted MapReduce jobs. It will run each job to completion, handling failures along the way, and write the output of the job to a given directory. Once you have completed this project, you will be able to run any MapReduce job on your machine, using a MapReduce implementation that you wrote!
There are two primary modules in this project: the Manager, which will listen for MapReduce jobs, distribute work amongst the Workers, and handle faults; and the Worker. Worker modules register themselves with the Manager, and then await commands, performing map or reduce tasks based on instructions given by the Manager.
You will not write MapReduce programs, but rather the MapReduce server. We have provided several sample MapReduce programs that you can use to test your MapReduce server.
Refer to the Threads and Sockets Tutorial for background and examples.
Setup
Group registration
Register your group on the Autograder.
Project folder
Create a folder for this project (instructions). Your folder location might be different.
    $ pwd
    /Users/awdeorio/src/eecs485/p4-mapreduce
Version control
Set up version control using the Version control tutorial. You might also take a second look at the Version control for a team tutorial.
After you’re done, you should have a local repository with a “clean” status and your local repository should be connected to a remote GitLab repository.
    $ pwd
    /Users/awdeorio/src/eecs485/p4-mapreduce
    $ git status
    On branch main
    Your branch is up-to-date with 'origin/main'.

    nothing to commit, working tree clean
    $ git remote -v
    origin https://gitlab.eecs.umich.edu/awdeorio/p4-mapreduce.git (fetch)
    origin https://gitlab.eecs.umich.edu/awdeorio/p4-mapreduce.git (push)
You should have a .gitignore file (instructions).
    $ pwd
    /Users/awdeorio/src/eecs485/p4-mapreduce
    $ head .gitignore
    This is a sample .gitignore file that's useful for EECS 485 projects.
Python virtual environment
Create a Python virtual environment using the Project 1 Python Virtual Environment Tutorial.
Check that you have a Python virtual environment, and that it’s activated (remember source env/bin/activate ).
    $ pwd
    /Users/awdeorio/src/eecs485/p4-mapreduce
    $ ls -d env
    env
    $ echo $VIRTUAL_ENV
    /Users/awdeorio/src/eecs485/p4-mapreduce/env
Starter files
Download and unpack the starter files.
    $ pwd
    /Users/awdeorio/src/eecs485/p4-mapreduce
    $ wget https://eecs485staff.github.io/p4-mapreduce/starter_files.tar.gz
    $ tar -xvzf starter_files.tar.gz
Move the starter files to your project directory and remove the original starter_files/ directory and tarball.
    $ pwd
    /Users/awdeorio/src/eecs485/p4-mapreduce
    $ mv starter_files/* .
    $ rm -rf starter_files starter_files.tar.gz
You should see these files.
    $ tree
    .
    ├── mapreduce
    │   ├── __init__.py
    │   ├── manager
    │   │   ├── __init__.py
    │   │   └── __main__.py
    │   ├── submit.py
    │   ├── utils.py
    │   └── worker
    │       ├── __init__.py
    │       └── __main__.py
    ├── requirements.txt
    ├── setup.py
    ├── tests
    │   ├── testdata
    │   │   ├── correct
    │   │   │   ├── grep_correct.txt
    │   │   │   └── word_count_correct.txt
    │   │   ├── exec
    │   │   │   ├── grep_map.py
    │   │   │   ├── grep_reduce.py
    │   │   │   ├── wc_map.sh
    │   │   │   ├── wc_map_slow.sh
    │   │   │   ├── wc_reduce.sh
    │   │   │   └── wc_reduce_slow.sh
    │   │   ├── input
    │   │   │   ├── file01
    │   │   │   ...
    │   │   │   └── file08
    │   │   ├── input_small
    │   │   │   ├── file01
    │   │   │   └── file02
    │   │   ├── input_large
    │   │   │   ├── file01
    │   │   │   ├── file02
    │   │   │   ├── file03
    │   │   │   └── file04
    │   ...
    │   ├── test_worker_08.py
    │   ...
Activate the virtual environment and install packages.
    $ source env/bin/activate
    $ pip install -r requirements.txt
    $ pip install -e .
Here’s a brief description of each of the starter files.
    mapreduce/                    MapReduce Python package skeleton files
    mapreduce/manager/            MapReduce Manager skeleton module, implement this
    mapreduce/worker/             MapReduce Worker skeleton module, implement this
    mapreduce/submit.py           Provided code to submit a new MapReduce job
    mapreduce/utils.py            Code shared between Manager and Worker
    requirements.txt              Python package dependencies matching autograder
    setup.py                      MapReduce Python package configuration
    tests/                        Public unit tests
    tests/testdata/exec/          Sample MapReduce programs, all use stdin and stdout
    tests/testdata/correct/       Sample MapReduce program correct output
    tests/testdata/input/         Sample MapReduce program input
    tests/testdata/input_small/   Sample MapReduce program input for fast testing
    tests/testdata/input_large/   Sample MapReduce program input for testing on large input
    tests/testdata/               Files used by our public tests
Before making any changes to the clean starter files, it’s a good idea to make a commit to your Git repository.
Complete the Threads and Sockets Tutorial.
Here are some quick links to the libraries we used in our instructor implementation.
Python Subprocess
Python Multithreading
Python Sockets
Python JSON Library
Python heapq Library
Python Pathlib Library (Object-oriented filesystem paths)
Python Logging facility
We’ve also provided sample logging code.
Run the MapReduce server
You will write a mapreduce Python package that includes manager and worker modules. Launch a Manager with the command line entry point mapreduce-manager and a Worker with mapreduce-worker . We’ve also provided mapreduce-submit to send a new job to the Manager.
Start a Manager and Workers
The starter code will run out of the box; it just won’t do anything. The Manager and the Worker run as separate processes, so you will have to start them up separately. First we start up a Manager which will listen on localhost TCP port 6000 and UDP port 5999. Then we start up two Workers, and tell them that they should communicate with the Manager on localhost ports 6000 and 5999. The first Worker will listen for messages on localhost port 6001 and the second on port 6002. The ampersands ( & ) will cause each process to start in the background.
    $ mapreduce-manager --host localhost --port 6000 --hb-port 5999 &
    $ mapreduce-worker --host localhost --port 6001 --manager-host localhost --manager-por
    $ mapreduce-worker --host localhost --port 6002 --manager-host localhost --manager-por
By default, --host and --manager-host are set to localhost , the Manager’s port is set to 6000, the Worker’s port is set to 6001, and both --hb-port and --manager-hb-port are set to 5999, so the following commands would be equivalent:
    $ mapreduce-manager &
    $ mapreduce-worker &
    $ mapreduce-worker --port 6002 &
See your processes running in the background.
    $ pgrep -lf mapreduce-worker  # macOS
    $ pgrep -af mapreduce-worker  # Linux/WSL
    15364 /usr/local/Cellar/python3/3.6.3/Frameworks/Python.framework/Versions/3.6/Resourc
    15365 /usr/local/Cellar/python3/3.6.3/Frameworks/Python.framework/Versions/3.6/Resourc
    $ pgrep -lf mapreduce-manager  # macOS
    $ pgrep -af mapreduce-manager  # Linux/WSL
    15353 /usr/local/Cellar/python3/3.6.3/Frameworks/Python.framework/Versions/3.6/Resourc
Stop your processes.
    $ pkill -f mapreduce-manager
    $ pkill -f mapreduce-worker
    $ pgrep -lf mapreduce-worker  # no output, because no processes
    $ pgrep -lf mapreduce-manager  # no output, because no processes
Submit a MapReduce job
Lastly, we have also provided mapreduce/submit.py which sends a new MapReduce job to the Manager. You can specify the job using command line arguments.
    $ mapreduce-submit --help
    Usage: mapreduce-submit [OPTIONS]

      Top level command line interface.

    Options:
      -h, --host TEXT         Manager host, default=localhost
      -p, --port INTEGER      Manager port number, default=6000
      -i, --input DIRECTORY   Input directory, default=tests/testdata/input
      -o, --output DIRECTORY  Output directory, default=output
      -m, --mapper FILE       Mapper executable, default=tests/testdata/exec/wc_map.sh
      -r, --reducer FILE      Reducer executable,
                              default=tests/testdata/exec/wc_reduce.sh
      --nmappers INTEGER      Number of mappers, default=4
      --nreducers INTEGER     Number of reducers, default=1
      --help                  Show this message and exit.
Here’s how to run a job. Later, we’ll simplify starting the server using a shell script. Right now we expect the job to fail because Manager and Worker are not implemented.
    $ pgrep -f mapreduce-manager  # check if you already started it
    $ pgrep -f mapreduce-worker  # check if you already started it
    $ mapreduce-manager &
    $ mapreduce-worker &
    $ mapreduce-worker --port 6002 &
    $ mapreduce-submit --mapper tests/testdata/exec/wc_map.sh --reducer tests/testdata/exe
Init script
The MapReduce server is an example of a service (or daemon), a program that runs in the background. We’ll write an init script to start, stop and check on the MapReduce Manager and Worker processes. It should be a shell script named bin/mapreduce . Print the messages in the following examples.
Pro-tip: Debugging is easier when you manually start a Manager, start Workers, and submit a job from the command line. We also recommend verifying that your code matches the walk-through example.
Be sure to follow the shell script best practices (Tutorial).
Start server
Exit 1 if a Manager or Worker is already running. Otherwise, execute the following commands.
    mapreduce-manager --host localhost --port 6000 --hb-port 5999 &
    mapreduce-worker --host localhost --port 6001 --manager-host localhost --manager-port
    mapreduce-worker --host localhost --port 6002 --manager-host localhost --manager-port
Example: accidentally start server when it’s already running.
    $ ./bin/mapreduce start
    Error: mapreduce-manager is already running
Stop server
Execute the following commands. Notice that || true will prevent a failed “nice” shutdown message from causing the script to exit early. Also notice that we automatically figure out the correct option for Netcat ( nc ).
    # Detect GNU vs BSD netcat. We need netcat to close the connection after
    # sending a message, which requires different options.
    set +o pipefail  # Avoid erroneous failures due to grep returning non-zero
    if nc -h 2>&1 | grep -q "\-c"; then
      NC="nc -c"
    elif nc -h 2>&1 | grep -q "\-N"; then
      NC="nc -N"
    elif nc -h 2>&1 | grep -q "\-C"; then
      NC="nc -C"
    else
      echo "Error detecting netcat version."
      exit 1
    fi
    set -o pipefail

    echo '{"message_type": "shutdown"}' | $NC localhost 6000 || true
    sleep 2  # give the manager time to receive signal and send to workers
Check if the Manager and Workers have shut down. If not, kill the processes.
    if pgrep -f mapreduce-manager &> /dev/null; then
      echo "killing mapreduce manager ..."
      pkill -f mapreduce-manager || true
    fi
    if pgrep -f mapreduce-worker &> /dev/null; then
      echo "killing mapreduce worker ..."
      pkill -f mapreduce-worker || true
    fi
Example 1, server responds to shutdown message.
    $ ./bin/mapreduce start
    starting mapreduce ...
    $ ./bin/mapreduce stop
    stopping mapreduce ...
Example 2, server doesn’t respond to shutdown message and process is killed.
    $ ./bin/mapreduce stop
    stopping mapreduce ...
    killing mapreduce manager ...
    killing mapreduce worker ...
Server status
Print whether or not the Manager is running, as well as whether or not any Workers are running. Exit 0 if both the Manager and at least one Worker are running. Exit 1 otherwise. Example:
    $ ./bin/mapreduce start
    starting mapreduce ...
    $ ./bin/mapreduce status
    manager running
    workers running
    $ echo $?
    0
    $ ./bin/mapreduce stop
    stopping mapreduce ...
    killing mapreduce manager ...
    killing mapreduce worker ...
    $ ./bin/mapreduce status
    manager not running
    workers not running
    $ echo $?
    1
Restart server
    $ ./bin/mapreduce restart
    stopping mapreduce ...
    killing mapreduce manager ...
    killing mapreduce worker ...
    starting mapreduce ...
MapReduce server specification
Here we describe the functionality of the MapReduce server. The fun part is that we are only defining the functionality and the communication protocol: the implementation is entirely up to you. You must follow our exact specifications below, and the Manager and Worker should work independently (i.e. do not add any more data or dependencies between the two classes). Remember that the Manager/Workers are listening on TCP/UDP sockets for all incoming messages. Note: To test your server, we will only be checking for the messages listed below. You should not rely on any communication other than the messages listed below.
As soon as the Manager/Worker receives a message on its main TCP socket, it should handle that message to completion before continuing to listen on the TCP socket. For example, let’s say every message is handled in a function called handle_msg . When handle_msg returns, the Manager will continue listening in an infinite while loop for new messages. Each TCP message should be communicated using a new TCP connection. Note: All communication in this project will be strings formatted using JSON; sockets receive bytes, which your thread must decode and parse as JSON.
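The one-connection-per-message rule and the "handle to completion" loop can be sketched as follows. This is a minimal illustration, not the instructor implementation: the helper names send_message , receive_message and serve are hypothetical, and the convention that handle_msg returns False to stop the loop is an assumption made only so the sketch can terminate.

```python
import json
import socket


def send_message(host, port, message):
    """Send one dict as JSON over a brand-new TCP connection."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        sock.sendall(json.dumps(message).encode("utf-8"))


def receive_message(listen_sock):
    """Accept one connection; return the decoded dict, or None if invalid."""
    conn, _addr = listen_sock.accept()
    with conn:
        chunks = []
        while True:
            data = conn.recv(4096)
            if not data:
                break  # sender closed the connection, so the message is complete
            chunks.append(data)
    try:
        msg = json.loads(b"".join(chunks).decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None
    return msg if isinstance(msg, dict) else None


def serve(listen_sock, handle_msg):
    """Handle messages one at a time; stop when handle_msg returns False."""
    while True:
        msg = receive_message(listen_sock)
        if msg is None:
            continue  # ignore invalid messages
        # The next accept() happens only after handle_msg has returned.
        if handle_msg(msg) is False:
            break
```

Because the sender closes its socket after each message, the receiver can treat an empty recv() as the end of that message, which avoids inventing a framing format.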
We put [Manager/Worker] before the subsections below to identify which class should handle the given functionality.
Code organization
Your code will go inside the mapreduce/manager and mapreduce/worker packages, where you will define the two classes (we got you started in mapreduce/manager/__main__.py and mapreduce/worker/__main__.py ). Since we are using Python packages, you may create new files as you see fit inside each package. We have also provided a utils.py inside mapreduce/ which you can use to house code common to both Worker and Manager. We will only define the communication protocol for the Manager and the Worker: the actual implementation of the classes is entirely up to you.
A note about paths
This project will use a lot of file paths. First, using Pathlib will help a lot with file path manipulation. Second, a program shouldn’t assume that paths will be absolute (start with / ) or relative. For example, it should work if the mapper executable is supplied as tests/testdata/exec/wc_map.sh or /Users/awdeorio/src/eecs485/p4-mapreduce/tests/testdata/exec/wc_map.sh .
A note about sockets
Use context managers to automatically close sockets (explanation). Good example:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        ...
Bad example:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    ...
    sock.close()
Manager overview
The Manager should accept three command-line options:
host : host address to listen for messages
port : TCP port to listen for messages
hb-port : UDP port to listen for heartbeats
On startup, the Manager should do the following:
Create a new folder tmp . This is where we will store all intermediate files used by the MapReduce server. If tmp already exists, keep it. Hint: Pathlib mkdir. Hint: use the Pathlib slash operator to “glue together” different parts of a file path.
Delete any old mapreduce job folders in tmp . Hint: see Pathlib glob and use “job-*” , which matches a directory that you’ll create during the New job request section.
Create a new thread, which will listen for UDP heartbeat messages from the Workers.
Create any additional threads or setup you think you may need. Another thread for fault tolerance could be helpful.
Create a new TCP socket on the given port and call the listen() function. Note: only one listen() thread should remain open for the whole lifetime of the Manager.
Wait for incoming messages! Ignore invalid messages, including those that fail JSON decoding. To ignore these messages, use a try/except when you try to load the message, as shown below.
    try:
        msg = json.loads(msg)
    except json.JSONDecodeError:
        continue
Wait to return from the Manager constructor until all Manager threads have exited.
Worker overview
The Worker should accept five command-line options:
host : host address to listen for messages
port : TCP port to listen for messages
manager-host : address at which to send messages to the Manager
manager-port : Manager TCP port to send messages
manager-hb-port : Manager UDP port to send heartbeat messages
On initialization, each Worker should do a similar sequence of actions as the Manager:
Create a new TCP socket on the given port and call the listen() function. Note: only one listen() thread should remain open for the whole lifetime of the Worker. Ignore invalid messages, including those that fail JSON decoding.
Send the register message to the Manager. Make sure you are listening before sending this message.
Upon receiving the register_ack message, create a new thread which will be responsible for sending heartbeat messages to the Manager.
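As a rough illustration of the last step, here is one way a heartbeat thread could be structured. Everything specific here is an assumption rather than part of the specification above: the function names, the 2-second default interval, and the payload, which carries only a "message_type" of "heartbeat" (the real message may require additional fields).

```python
import json
import socket
import threading


def send_heartbeats(manager_host, manager_hb_port, stop_event, interval=2.0):
    """Send a UDP heartbeat every `interval` seconds until stop_event is set."""
    # Assumed payload: the real heartbeat message may need more fields.
    payload = json.dumps({"message_type": "heartbeat"}).encode("utf-8")
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        while not stop_event.is_set():
            sock.sendto(payload, (manager_host, manager_hb_port))
            stop_event.wait(interval)  # sleeps, but wakes early on shutdown


def start_heartbeat_thread(manager_host, manager_hb_port, interval=2.0):
    """Start the heartbeat thread; return it with the event that stops it."""
    stop_event = threading.Event()
    thread = threading.Thread(
        target=send_heartbeats,
        args=(manager_host, manager_hb_port, stop_event, interval),
    )
    thread.start()
    return thread, stop_event
```

Waiting on a threading.Event instead of calling time.sleep() lets the Worker stop the thread promptly when a shutdown message arrives: set the event, then join the thread.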
Shutdown [Manager + Worker]
Because all of our tests require shutdown to function properly, it should be implemented first. The Manager can receive a special message to initiate server shutdown. The shutdown message will be of the following form and will be received on the main TCP socket:
    {
      "message_type": "shutdown"
    }
The Manager should forward this message to all of the living Workers that have registered with it. The Workers, upon receiving the shutdown message, should terminate as soon as possible. If the Worker is already in the middle of executing a task (as described below), it is okay for it to complete that task before handling the shutdown message, as both of these happen inside a single thread.
After forwarding the message to all Workers, the Manager should terminate itself.
At this point, you should be able to pass test_manager_00 ’s first part, and test_worker_00 completely. Another shutdown test is test_integration_00 , but you’ll need to implement Worker registration first.
NOTE: The Manager should safely ignore any heartbeat messages from a Worker before that Worker successfully registers with the Manager.
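The forwarding step in this section could look roughly like the sketch below. It assumes the Manager keeps its registered Workers as a list of (host, port) pairs, which is a hypothetical representation, and it chooses to skip Workers that cannot be reached rather than fail; the function name forward_shutdown is likewise illustrative.

```python
import json
import socket


def forward_shutdown(workers):
    """Send the shutdown message to each (host, port); return those reached."""
    payload = json.dumps({"message_type": "shutdown"}).encode("utf-8")
    reached = []
    for host, port in workers:
        try:
            # One new TCP connection per message, closed by the context manager.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.connect((host, port))
                sock.sendall(payload)
        except OSError:
            continue  # Worker is unreachable (assumed already dead): skip it
        reached.append((host, port))
    return reached
```

After this function returns, the Manager can stop its own threads and exit.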
Worker registration [Ma
