CS 162 HW 5
TABLE OF CONTENTS
1 Receiving tasks
2 Executing tasks
a Map tasks
b Reduce tasks
3 Finishing tasks
4 Autograder
In this part, you will implement the remainder of the basic MapReduce system. Specifically, you will be implementing distributing map and reduce tasks to workers as well as executing those tasks.
Note: This task must be completed by the checkpoint 1 deadline to receive full credit (more information can be found on Piazza).
Receiving tasks
After a worker registers with the coordinator, it should be assigned a task. You can do this either by having the coordinator tell the worker to complete a task, or by having the worker request a task from the coordinator. Either method is fine, but our reference solution implements the second option, and we’ll assume you do the same.
Design and implement an RPC to request a task from the coordinator. You’ll need to think about what information the worker needs to execute a task. Use the type signatures of the map and reduce functions (defined in src/lib.rs) to guide you. You may want to take a look at the KeyValue struct definition in lib.rs and the bytes::Bytes documentation.
For map tasks, the key passed to the map function should be the input file name, and the value should be the contents of the file. You may read the entire file into memory.
Workers that are not running a task should poll the coordinator for a task every WAIT_TIME_MS milliseconds. The coordinator should assign tasks in the correct queue order. It should not assign any reduce tasks until all map tasks are complete, and it should never assign a task to more than one worker at a time.
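The assignment rules above can be sketched for a single job as follows. This is a minimal illustration, not the reference solution: the Task enum, the JobState struct, and its method names are all hypothetical, and a real coordinator would also need to handle multiple jobs in queue order.

```rust
use std::collections::VecDeque;

// Hypothetical task description; the real RPC reply needs more fields.
#[derive(Debug, Clone, PartialEq)]
enum Task {
    Map(usize),
    Reduce(usize),
}

// Hypothetical per-job bookkeeping kept by the coordinator.
struct JobState {
    pending_map: VecDeque<usize>,
    pending_reduce: VecDeque<usize>,
    maps_remaining: usize, // map tasks that have not finished yet
}

impl JobState {
    fn new(n_map: usize, n_reduce: usize) -> Self {
        JobState {
            pending_map: (0..n_map).collect(),
            pending_reduce: (0..n_reduce).collect(),
            maps_remaining: n_map,
        }
    }

    // Called when a worker polls. Popping from the queue means a task is
    // never handed to two workers at once.
    fn next_task(&mut self) -> Option<Task> {
        if let Some(i) = self.pending_map.pop_front() {
            return Some(Task::Map(i));
        }
        // Reduce tasks are released only after every map task has finished.
        if self.maps_remaining == 0 {
            return self.pending_reduce.pop_front().map(Task::Reduce);
        }
        None
    }

    // Called when a worker reports that a map task finished.
    fn finish_map(&mut self) {
        self.maps_remaining = self.maps_remaining.saturating_sub(1);
    }
}
```

A worker that receives no task here would simply wait WAIT_TIME_MS milliseconds and poll again.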
Once you are done, sanity-check that tasks are being assigned correctly by inserting logging statements.
Executing tasks
Once the worker receives a task, it should execute the task by calling the map or reduce functions appropriately. The coordinator itself should never execute any tasks.
We recommend taking a look at the documentation in codec to help with serializing data.
Map tasks
In addition to reading in the file and calling the map function on its contents, you will need to separate the resulting keys into buckets (remember that the bucket should be computed using ihash(key) % n_reduce) and store them in memory. This will allow you to send the necessary keys when a reduce worker requests map results for its specific reduce task.
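The bucketing step might look roughly like the sketch below. It uses plain byte vectors instead of KeyValue and Bytes so that it stands alone, and ihash_stand_in is only a placeholder built on the standard library hasher; your code must use the ihash function provided with the assignment so that buckets match what the autograder expects.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// A key/value pair as raw bytes (stand-in for the assignment's KeyValue).
type Pair = (Vec<u8>, Vec<u8>);

// Placeholder hash, an assumption for illustration only.
// Use the assignment's own `ihash` in real code.
fn ihash_stand_in(key: &[u8]) -> u32 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() as u32
}

// Splits the output of a map task into `n_reduce` in-memory buckets.
// Every pair with the same key lands in the same bucket.
fn partition(pairs: Vec<Pair>, n_reduce: u32) -> Vec<Vec<Pair>> {
    let mut buckets: Vec<Vec<Pair>> = vec![Vec::new(); n_reduce as usize];
    for (key, value) in pairs {
        let bucket = (ihash_stand_in(&key) % n_reduce) as usize;
        buckets[bucket].push((key, value));
    }
    buckets
}
```

Keeping one buffer per bucket means a later request for bucket i of this map task can be answered without rescanning the other buckets.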
To read files, we recommend reading the documentation on tokio::fs::File and tokio::io::AsyncReadExt. To serialize key/value pairs for storage and communication purposes, use codec::LengthDelimitedWriter; write the key, then the value.
Here is a code snippet from our reference solution that you may find helpful (you may need to add appropriate use statements). It reads the contents of a file (file is assumed to be a String) into a buffer and creates a KeyValue pair that you should pass to an application map function.
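// Read the whole input file into memory.
let mut content = Vec::new();
// Use a separate name for the handle so that `file` (the path String) stays usable below.
let mut input = tokio::fs::File::open(&file).await?;
input.read_to_end(&mut content).await?;
let content = Bytes::from(content);
// The key is the file name and the value is the file contents.
let kv = KeyValue {
    key: Bytes::from(file),
    value: content,
};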
You should then call the application map function on kv and the aux arguments provided in the SubmitJobRequest:
(app.map_fn)(kv, aux)
Reduce tasks
Once all map tasks for a job are done, the coordinator will start assigning reduce tasks. In order to get the key/value pairs needed for its reduce task, a worker must know who to ask to retrieve the results for each map task. As such, when the coordinator assigns a reduce task, it should tell the reduce worker where to find map task results. You would likely include something of the form “Worker ID X has the results of map task Y”.
A reduce worker should then contact the appropriate worker to get map task results. You have freedom to implement any interface you choose, but you probably should have the requester include:
• The map task number it is requesting data from
• The reduce task/bucket number it was assigned
• The ID of the job
You will need to define an RPC for requesting data in proto/worker.proto and implement the RPC stub in src/worker/mod.rs. To connect to another worker’s gRPC server, you can use the worker::connect function.
To recover key/value pairs from the serialized data stored by the map worker, use codec::LengthDelimitedReader; read the key first, then the value.
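On the serving side, one possible way for a map worker to answer such a request is a lookup keyed by job and map task. The sketch below is only an illustration of that idea: FetchRequest, MapOutputStore, and their fields are hypothetical names, not part of the provided code, and the actual request and reply types would come from your definitions in proto/worker.proto.

```rust
use std::collections::HashMap;

// Hypothetical request carrying the three items listed above.
struct FetchRequest {
    job_id: u32,
    map_task: usize,
    bucket: usize, // the reduce task number assigned to the requester
}

// Hypothetical in-memory store kept by a worker that ran map tasks.
struct MapOutputStore {
    // (job_id, map_task) -> one serialized buffer per reduce bucket
    outputs: HashMap<(u32, usize), Vec<Vec<u8>>>,
}

impl MapOutputStore {
    fn new() -> Self {
        MapOutputStore { outputs: HashMap::new() }
    }

    // Records the serialized buckets produced by one map task.
    fn store(&mut self, job_id: u32, map_task: usize, buckets: Vec<Vec<u8>>) {
        self.outputs.insert((job_id, map_task), buckets);
    }

    // Returns the serialized data for one bucket, or None if this worker
    // does not hold results for that job, map task, or bucket.
    fn fetch(&self, req: &FetchRequest) -> Option<&[u8]> {
        self.outputs
            .get(&(req.job_id, req.map_task))?
            .get(req.bucket)
            .map(|buf| buf.as_slice())
    }
}
```

Returning None rather than panicking lets the RPC handler turn a missing entry into an error reply of its choosing.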
Once all of the necessary key/value pairs have been retrieved, the reduce worker needs to split the pairs up by key. It should then call the reduce function on the key and a list of all values associated with that key, writing pairs consisting of a key and the result of calling reduce on its values to an output file. Once again, use codec::LengthDelimitedWriter to serialize data to the output file, writing the key and then the value. The documentation for tokio::io::AsyncWriteExt may also be useful here.
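The grouping step described above can be sketched with a sorted map. This is an illustration under simplifying assumptions: it uses plain byte vectors rather than Bytes, group_by_key and reduce_all are hypothetical helper names, and count_reduce is a made-up stand-in for an application reduce function with a simplified signature.

```rust
use std::collections::BTreeMap;

type Pair = (Vec<u8>, Vec<u8>);

// Collects every value that shares a key into one Vec.
fn group_by_key(pairs: Vec<Pair>) -> BTreeMap<Vec<u8>, Vec<Vec<u8>>> {
    let mut grouped: BTreeMap<Vec<u8>, Vec<Vec<u8>>> = BTreeMap::new();
    for (key, value) in pairs {
        grouped.entry(key).or_default().push(value);
    }
    grouped
}

// Made-up reduce function: outputs the number of values as ASCII digits.
fn count_reduce(_key: &[u8], values: Box<dyn Iterator<Item = Vec<u8>>>) -> Vec<u8> {
    values.count().to_string().into_bytes()
}

// Runs the reduce function once per key and pairs each key with its output.
fn reduce_all(pairs: Vec<Pair>) -> Vec<Pair> {
    group_by_key(pairs)
        .into_iter()
        .map(|(key, values)| {
            let output = count_reduce(&key, Box::new(values.into_iter()));
            (key, output)
        })
        .collect()
}
```

A BTreeMap is used here only so that keys come out in a deterministic order; a HashMap would group the values just as well.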
As a reminder, your output files must be named mr-out-i, where i is the reduce task number (0 <= i < n_reduce), in order for your code to pass the autograder. You must also use codec::LengthDelimitedWriter for serialization, writing the key before the value.
You may find that you have a Vec of values (that should correspond to the same key) that you want to pass to an application reduce function. You can convert the Vec into the boxed iterator the reduce function expects like this:
let iter = Box::new(vec.into_iter());
Here is a code snippet from our reference solution that loops through each key and its corresponding values, calls reduce on them, then writes them to a buffer (you may need to add appropriate use statements):
let mut writer = LengthDelimitedWriter::new();
for (key, values) in &kv_grouped {
    // `values` is a vector holding every value associated with `key`.
    let output = reduce_fn(key.clone(), Box::new(values.into_iter()), aux.clone())?;
    // Write the key first, then the reduce output.
    writer.send(key);
    writer.send(output);
}
let buf = writer.finish().freeze();
Here is a snippet of code that writes a buffer of Bytes to a MapReduce output file:
let name = format!("{}/mr-out-{}", output_dir, task);
let mut out_file = tokio::fs::File::create(name).await?;
out_file.write_all(&buf).await?;
Finishing tasks
If a task completes successfully, the worker should notify the coordinator. Design an RPC to do this.
Once the coordinator learns that a task is complete, it should update its data structures. Once all map tasks for a job are complete, the coordinator should begin assigning reduce tasks. Once all map and reduce tasks for a job are complete, the coordinator should mark the job complete. After that, calls to the PollJob RPC for that job should return done = true.
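One way to track completion is sketched below. It is only an illustration: JobProgress and its methods are hypothetical names, and the sketch uses sets so that a completion reported twice for the same task is not counted twice.

```rust
use std::collections::HashSet;

// Hypothetical completion bookkeeping for one job.
struct JobProgress {
    n_map: usize,
    n_reduce: usize,
    done_map: HashSet<usize>,
    done_reduce: HashSet<usize>,
}

impl JobProgress {
    fn new(n_map: usize, n_reduce: usize) -> Self {
        JobProgress {
            n_map,
            n_reduce,
            done_map: HashSet::new(),
            done_reduce: HashSet::new(),
        }
    }

    // Records a finished map task; out-of-range task numbers are ignored.
    fn finish_map(&mut self, task: usize) {
        if task < self.n_map {
            self.done_map.insert(task);
        }
    }

    // Records a finished reduce task; out-of-range task numbers are ignored.
    fn finish_reduce(&mut self, task: usize) {
        if task < self.n_reduce {
            self.done_reduce.insert(task);
        }
    }

    // True once reduce tasks may start being assigned.
    fn maps_complete(&self) -> bool {
        self.done_map.len() == self.n_map
    }

    // The value a PollJob reply would report in its `done` field.
    fn done(&self) -> bool {
        self.maps_complete() && self.done_reduce.len() == self.n_reduce
    }
}
```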
Autograder
Once you complete this portion of the assignment, you should be passing the autograder tests up to and including mr-no-duplicates.
Copyright © 2022 CS 162 staff.