
School of Computer Science Dr.
COMP5349: Cloud Computing Sem. 1/2022
Week 4: Basic MapReduce
Objectives


In this lab, we will practice basic Hadoop commands and coding in a pseudo cluster using AWS EMR (Elastic MapReduce). The objectives are:
• Learn to request a single-node AWS EMR (Elastic MapReduce) cluster (pseudo-cluster mode) and configure access to it
• Learn to use basic HDFS commands
• Run sample programs on EMR using either the local file system or HDFS for input/output
• Understand map/(combiner)/reduce data pipelines in MapReduce applications
• Inspect execution on vCPUs
Lab Exercise
Question 1: Start Single Node EMR
a) Request EMR cluster
Log in to the AWS Management Console from the AWS Academy Learner Lab. Find the EMR service under the Analytics heading and click EMR to enter the EMR dashboard. The EMR dashboard looks like the EC2 dashboard but with fewer items on the left panel. Click the Create cluster button to open the cluster settings window. The default screen shows a few quick options; click the link "Go to advanced options" (see Figure 1).
The advanced options are grouped into four steps.
• In Step 1: Software and steps configuration, select Release: emr-6.2.0. For the software component, check ONLY Hadoop 3.2.1. Uncheck all other preselected components.

Figure 1: Cluster Creation Options
• In Step 2: Hardware configuration, under the Cluster Nodes and Instances heading, specify 1 Master Node, 0 Core Nodes, and 0 Task Nodes. Select c4.xlarge instance for Master Node.
• In Step 3: General cluster settings, update the cluster name to something more indicative, e.g. “comp5349-week4”; use the default settings for everything else.
• In Step 4: Security, select the key that you have created in a previous lab in the EC2 key pair selection box (if you do not have any valid keys, then you will need to create another one).
Figure 2: Software Configuration
Once you have updated all necessary options, click the Start Cluster button to create the cluster. Cluster creation may take a few minutes: it involves instance provisioning, software installation and configuration. Once the instance is provisioned and configured, the cluster's state will change to Waiting (Figure 3).
The Master Node's public DNS address is shown on the summary screen once the node is provisioned. In this lab we will interact with the cluster via the Master Node's command line interface, so we need to manage the security settings of this node by allowing incoming traffic to at least port 22. Scroll down to the bottom left corner of the summary page to view the Security and access section (see Figure 4). Click the link next to Security groups for Master to inspect and, if necessary, edit the rules.
Some rules have already been defined in this group to allow internal communication among cluster nodes. You will need to add one rule to allow external SSH connections, which opens port 22 for incoming TCP traffic. Do not change or delete any other rules.

b) Connect to Master Node
Figure 3: Master Public DNS
Figure 4: Security Group
A sample connection command can be found by clicking the link Connect to Master Node Using SSH on the summary page of the cluster (see Figure 3, bottom right). Note that the sample command for macOS/Linux assumes the security key is stored under your home directory; update the path accordingly.
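The command has the general form shown below; the key path and DNS name here are placeholders, so copy the exact command from the console (the login user on an EMR Master Node is hadoop):
ssh -i ~/path/to/your-key.pem hadoop@<master-public-DNS>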
Question 2: Run Sample Program
In this exercise, we will run a sample word count program using the data file place.txt as input. The data file is available in the python-resources repository under the week4/data directory.
Git is not installed by default on the Master Node; install it by running the command sudo yum install git. Once installed, run the following command to clone the python-resources repository onto the Master Node.
git clone \
https://github.sydney.edu.au/COMP5349-Cloud-Computing-2022/python-resources.git

The user name is your UniKey and the password is your UniKey password. Assuming that the repository is cloned into the hadoop user's home directory, you can invoke the following command to run the sample wordcount program using the input file place.txt.
hadoop-mapreduce-examples wordcount \
file:///home/hadoop/python-resources/week4/data/place.txt \
count_out
hadoop-mapreduce-examples is a script that conveniently invokes the sample programs bundled with every Hadoop release. The above command runs the wordcount program: the argument immediately after the script name indicates the program to run. The remaining arguments are program-specific; here they are an input path and an output path, specified as follows:
file:///home/hadoop/python-resources/week4/data/place.txt
This input path argument has the protocol prefix file://, which instructs Hadoop to read the input from the local file system. Hadoop can read and write data on a number of file systems. The default file system in cluster mode (including pseudo-cluster mode) is HDFS, and a protocol prefix is needed for any other file system.
The output path argument in the command above has no protocol prefix, which indicates HDFS; this means the output will be saved in the count_out folder under the current user's home directory on HDFS. We will see how to inspect files on HDFS in the next exercise.
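For example, assuming the default HDFS home directory of the hadoop user, the same output location could be written with an explicit prefix as hdfs:///user/hadoop/count_out.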
After the application is submitted, progress information will be printed on the console as the job starts to execute. A summary will be printed when the job terminates. The number of map and reduce tasks can be found under the “Job Counters” heading. For this particular application, you may see counter information as follows:
Launched map tasks=1
Launched reduce tasks=1
This means only one map task and one reduce task were launched, so no parallelism is observed.
Hadoop always creates a new output directory each time an application is submitted; it never overwrites an existing directory. If you specify an existing directory as the output directory, the application will exit with an error. When you need to run the same program multiple times, either remove the output directory before submitting the job or change the name of the output directory for each execution. The command to remove a directory in HDFS is hdfs dfs -rm -r <directory>, e.g. hdfs dfs -rm -r count_out.

Question 3: HDFS basic commands
HDFS provides a set of commands for normal file system operations such as listing directory contents, creating directories and copying files. All HDFS commands are prefixed with hdfs dfs; the actual commands are otherwise similar in name and format to Linux file system commands. It is worth noting that the HDFS command line has a default present working directory that always points to the current user's HDFS home directory; any other location must be specified using an absolute path (starting with the root directory /). There is no HDFS equivalent of cd to change the present working directory. Figure 5 shows the results of a few ls command variations:
Figure 5: HDFS Commands and Outputs
The first one is an ls command without any parameters. It displays the contents of the current user's home directory in HDFS, which was initially empty but now contains the output directory count_out created by the sample program in the previous step.
The second one shows the contents of the root directory of HDFS /. It contains a few directories like app, tmp, user, and var.
The third one shows the contents of the directory /user. This is the parent directory of all users' home directories, including that of the user hadoop.
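For reference, the three commands shown in Figure 5 are:
hdfs dfs -ls
hdfs dfs -ls /
hdfs dfs -ls /user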
Question 4: Inspect contents of files in HDFS
There are two ways to inspect the contents of an HDFS file. You may inspect it in place using commands like cat, head or tail. Below is a sample command that uses head to print the beginning of the output file part-r-00000.
hdfs dfs -head count_out/part-r-00000
A more convenient way is to download the file to the local file system and view it using either command line or graphical user interface tools. The following sample command downloads the output file part-r-00000 and saves it as count_out.txt in the present working directory:

hdfs dfs -get count_out/part-r-00000 count_out.txt
Question 5: Run another Sample Program with input/output both on HDFS
Next, upload place.txt to HDFS and run another sample program with the following commands:
hdfs dfs -put ~/python-resources/week4/data/place.txt
hadoop-mapreduce-examples grep \
place.txt \
placeout \
"/Australia[\d\w\s\+/]+"
The first command uploads the local file place.txt from the specified directory to the HDFS home directory of the user hadoop, keeping the name place.txt.
The second command runs the grep sample program to find all records containing the word "Australia". The grep program requires three parameters: an input location, an output location and a regular expression to search for. In the above command, place.txt specifies the input location, placeout specifies the output directory, and the last argument is the regular expression. Both the input and output paths use the default file system, HDFS.
Try it yourself: Use the correct HDFS command to inspect the first few lines in the output file of this program.
Question 6: A Customized Sample Program
In this exercise, you will see the complete Python source code of a MapReduce application. The input of the program is a CSV file. Each line represents a photo metadata record with the following format:
photo-id owner tags date-taken place-id accuracy
The tags are represented as a list of words (tags) separated by white space. A sample input file partial.txt is stored under the week4/data directory.
The MapReduce application builds a tag-owner index for each tag appearing in the data set. The index key is the tag itself; each entry records the number of times an owner has used this tag in the data set. For example, if a data set contains only these three records:
509657344 protest 2007-02-21 02:20:03 xbxI9VGYA5oZH8tLJA 14
520345662 winter protest 2009-02-21 02:20:03 cvx093MoY15ow 14
520345799 winter protest 2009-02-21 02:35:03 cvx093MoY15ow 14

The program would generate an index with two keys: "protest" and "winter". The index entry for "protest" would list every owner that used the tag together with the number of times they used it; the entry for "winter" would do the same for the owners of the two "winter" records.
a) Python Source Code
The Python source code contains two versions: a naive implementation and a version with a combiner function. They are stored under different directories. Each version consists of two Python scripts that implement the map and reduce functions respectively.
In the naive implementation, the map function processes one line of the input file at a time. The key is the byte offset of the line, which we do not use; the value is the actual line content as a string. It outputs a list of (tag, owner) pairs.
The reduce function is designed to process all information related to a particular tag. The input key is the tag and the input value is a list of owners that have used this tag; if an owner has used this tag three times, the same (tag, owner) pair appears three times in the value list. The output key is still the tag, while the output value is a summary of the owner data with respect to that tag.
Below is a summary of the map and reduce functions highlighting the theoretical input/output key-value pairs:
map: (fileOffset, line) => {(tag, owner1), (tag, owner2), …}
reduce: (tag, {owner1, owner1, owner2, …}) => (tag, {owner1=x, owner2=y, …})
The actual input to Python’s reduce function is sorted but not grouped. It looks like this:
tag1, owner1
tag1, owner1
tag1, owner2
tag2, owner3
Developers need to implement their own program logic to identify the key change.
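To make the pipeline concrete, here is a minimal sketch of how the naive version could be written as Hadoop streaming scripts. It is illustrative only and not the repository code: the tab-separated field layout, the script name and the single-file layout (one script acting as mapper or reducer depending on a command line argument) are assumptions.

#!/usr/bin/env python3
# tag_owner_naive.py -- illustrative sketch only, not the code in python-resources.
# Run with "map" as the mapper and "reduce" as the reducer in a streaming job.
import sys

def do_map():
    # Assumes tab-separated fields: photo-id, owner, tags, date-taken, place-id, accuracy.
    for line in sys.stdin:
        fields = line.rstrip("\n").split("\t")
        if len(fields) < 3:
            continue
        owner, tags = fields[1], fields[2]
        for tag in tags.split():
            print(f"{tag}\t{owner}")     # one (tag, owner) pair per tag occurrence

def emit(tag, counts):
    summary = ", ".join(f"{o}={n}" for o, n in sorted(counts.items()))
    print(f"{tag}\t{summary}")

def do_reduce():
    # Streaming input is sorted by tag but not grouped, so detect the key change manually.
    current_tag, counts = None, {}
    for line in sys.stdin:
        tag, owner = line.rstrip("\n").split("\t")
        if tag != current_tag:
            if current_tag is not None:
                emit(current_tag, counts)
            current_tag, counts = tag, {}
        counts[owner] = counts.get(owner, 0) + 1
    if current_tag is not None:
        emit(current_tag, counts)

if __name__ == "__main__":
    do_map() if sys.argv[1:] == ["map"] else do_reduce()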
In the combiner implementation, a combiner function is applied to perform local aggregation after each map. The combiner function does exactly the same operations as the reduce function. To make it work, we change the format of the input/output key-value pairs as follows:
map: (fileOffset, line) => {(tag, owner1=1), (tag, owner2=1), …}
combine: (tag, {owner1=a, owner2=b, …}) => (tag, {owner1=x, owner2=y, …})
reduce: (tag, {owner1=a, owner2=b, …}) => (tag, {owner1=x, owner2=y, …})
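To make this concrete, here is a minimal sketch of how the combiner version could be written as Hadoop streaming scripts, under the same assumptions as the naive sketch above (illustrative only, not the repository code). The mapper emits an initial count of 1 with every owner, and one aggregation routine can serve as both the combiner and the reducer because it simply sums owner=count pairs.

#!/usr/bin/env python3
# tag_owner_combiner.py -- illustrative sketch only, not the code in python-resources.
import sys

def do_map():
    # Assumes tab-separated fields: photo-id, owner, tags, date-taken, place-id, accuracy.
    for line in sys.stdin:
        fields = line.rstrip("\n").split("\t")
        if len(fields) < 3:
            continue
        owner, tags = fields[1], fields[2]
        for tag in tags.split():
            print(f"{tag}\t{owner}=1")   # emit an initial count of 1 per tag occurrence

def do_aggregate():
    # Used as both combiner and reducer: sums the counts per owner for each tag.
    # Output stays in key<TAB>value form so combiner output can still be shuffled.
    current_tag, counts = None, {}
    def flush():
        for o, n in sorted(counts.items()):
            print(f"{current_tag}\t{o}={n}")
    for line in sys.stdin:
        tag, pair = line.rstrip("\n").split("\t")
        owner, _, count = pair.partition("=")
        if tag != current_tag:
            if current_tag is not None:
                flush()
            current_tag, counts = tag, {}
        counts[owner] = counts.get(owner, 0) + int(count)
    if current_tag is not None:
        flush()

if __name__ == "__main__":
    do_map() if sys.argv[1:] == ["map"] else do_aggregate()

When this routine runs as the final reducer its output lists one owner per line rather than the single-line summary used in the naive sketch; the actual scripts in the repository may format the final index differently.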
The actual input to the Python reduce function is slightly different from the theoretical form above. Below are examples of grouped and ungrouped reduce input, using the combiner version (owner names and counts are placeholders in the style of the previous examples).
Two records in grouped format, the theoretical input of the reduce function as it would be seen if implemented in Java:
(tag1, {owner1=2, owner2=1})
(tag2, {owner3=1})
Sorted but not grouped format, the typical input of the reduce function in the Python code:
tag1, owner1=2
tag1, owner2=1
tag2, owner3=1
b) Run naive implementation
Python scripts are invoked via the streaming JAR that comes with the Hadoop installation. We need to provide a number of arguments to indicate the scripts, the input and other properties. A shell script is provided for each version: tag_driver.sh for the naive version and tag_driver_combiner.sh for the combiner version. Each shell script contains all the arguments needed to run the appropriate Python scripts with the streaming JAR.
The following commands will upload the sample input file into HDFS and run the shell script to start the MapReduce application without a combiner.
hdfs dfs -put ~/python-resources/week4/data/partial.txt
./tag_driver.sh partial.txt naive_out
If the application completes successfully, you will see from the output that it has started 4 map tasks and 3 reduce tasks. The number of reduce tasks is specified in the shell script; the number of map tasks is determined by the framework based on various properties that can be specified in configuration files.
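To see roughly how such a driver script wires the pieces together, here is a hedged sketch of the kind of hadoop streaming invocation tag_driver.sh might contain. The streaming JAR path and the mapper/reducer script names are assumptions (the actual repository script may differ); only the -D mapreduce.job.reduces=3 setting is confirmed by this lab (see Question 7).

hadoop jar /usr/lib/hadoop/hadoop-streaming.jar \
    -D mapreduce.job.reduces=3 \
    -files tag_mapper.py,tag_reducer.py \
    -mapper tag_mapper.py \
    -reducer tag_reducer.py \
    -input "$1" \
    -output "$2"

The combiner version would additionally pass a -combiner argument pointing at the combiner script.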
The output also contains summary information about the input and output data of the map, combiner and reduce functions:
Map input records=73
Map output records=331

Map output bytes=6801
Map output materialized bytes=4344
Input split bytes=468
Combine input records=0
Combine output records=0
Reduce input groups=233
Reduce shuffle bytes=4344
Reduce input records=331
Reduce output records=233
Spilled Records=662
These counters describe the data pipeline. In the naive implementation, the output of the map function becomes the input of the reduce function: the 331 map output records are fed directly as the input of the reduce function by Hadoop streaming. The output contains 233 unique keys, hence the reduce input groups value is 233. Each unique key generates one reduce output record, so the reduce output contains 233 records. No combiner is used, so the combiner input and output record counters are 0.
c) Run combiner version
Assuming that the present working directory is “python-resources/week4”, run the combiner version of the application as follows:
cd combiner
./tag_driver_combiner.sh partial.txt combiner_out
The application will start the same number of map and reduce tasks. You may notice slightly different summary information about the input and output data of the map, combiner and reduce functions:
Map input records=73
Map output records=331
Map output bytes=7463
Map output materialized bytes=4285
Input split bytes=468
Combine input records=331
Combine output records=266
Reduce input groups=233
Reduce shuffle bytes=4285
Reduce input records=266
Reduce output records=233
Spilled Records=532
Using a combiner adds a stage to the original data pipeline: the map output becomes the input of the combiner, and the combiner output becomes the input of the reduce function. Because the combiner runs locally on the output of each individual map task and we have 4 map tasks, the combiner output contains some duplicate keys; hence we see 266 combiner output records but only 233 reduce input groups. These are further aggregated in the reduce stage to produce one record for each unique key.
d) Test Your Understanding
Consider the three-row sample data:
509657344 protest 2007-02-21 02:20:03 xbxI9VGYA5oZH8tLJA 14
520345662 winter protest 2009-02-21 02:20:03 cvx093MoY15ow 14
520345799 winter protest 2009-02-21 02:35:03 cvx093MoY15ow 14
Assuming that there is only one reducer, answer the following questions:
• What is the output of the map phase in the naive version?
• What is the input of the reduce phase in the naive version?
• What is the output of the map functions in the combiner version?
• What is the input of the combiner functions in the combiner version?
• What is the output of the combiner functions in the combiner version?
• What is the input of the reduce function in the combiner version?
Question 7: vCPU usage
The c4.xlarge instance we use in the EMR cluster has 4 vCPUs by default, configured as 2 CPU cores each with 2 hyperthreads (CPU instance optimization). This allows up to 4 threads to run concurrently on the virtual machine. In this section, we use a few sample applications to see how tasks are allocated to each vCPU.
Start another SSH connection to the Master Node. This shell window will run the top command to monitor CPU usage while a map/reduce application executes in the pseudo cluster.
Run the top command and press the 1 key on your keyboard to show individual vCPU usage statistics. At the top of the output you should see four lines, one per vCPU, showing statistics such as the percentage of time spent executing user-space processes, system processes, and idling. There are other display options; for example, you can press the t key twice to switch to a simple graph block view.
In the original SSH window, rerun the word count application. Remember to either use a different output directory name or remove the output directory created in the previous run. This application has 1 map task followed by 1 reduce task, both of which run for a short period of time; you are likely to see the map task and the reduce task each run on a single vCPU. Figure 6 shows a sample top output when the map task is complete (map 100% shown in the job output). It is clear that the vCPU (cpu3) with a high utilization value for user-space processes is assigned to run the single map task of this application.

Figure 6: Word Count Application map phase vCPU usage
Rerun the sample Python application and observe the CPU statistics during the execution. You may notice that nearly all vCPUs are used while the application is executing: the application starts 4 map tasks and 3 reduce tasks, and all map tasks and all reduce tasks can run concurrently on the Master Node. Figure 7 shows a sample top output during the map stage.
Figure 7: Sample Python application map phase vCPU usage
Now we will see whether a long-running task is scheduled onto a different vCPU during its lifetime. We will need a slightly larger input file and will set the number of reduce tasks to 1.
Use the following commands to download a larger input file n06.txt from a public S3 bucket and upload it to HDFS:
mkdir ~/data
aws s3 cp s3://comp5349-data/week6/n06.txt ~/data/ --no-sign-request
hdfs dfs -put ~/data/n06.txt n06.txt
Remove the line "-D mapreduce.job.reduces=3" from tag_driver.sh. This makes the job use the default value of 1 for the number of reduce tasks.
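For example, after removing the line you could rerun the naive version on the larger file (the output directory name here is arbitrary) while watching top in the second SSH window:
./tag_driver.sh n06.txt n06_out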
