School of Computer Science
COMP5349: Cloud Computing Sem. 1/2022
Week 7: Distributed Execution Engine
Introduction
07.04.2022
In this lab, we will practice executing MapReduce and Spark applications in a distributed EMR cluster. The objectives of this lab are:
• Understanding replication in HDFS
• Understanding various execution stages in a MapReduce application, including the level of parallelism in the map and reduce phases and the performance cost of the shuffle stage
• Understanding Spark execution on YARN
This lab runs entirely on EMR. Only a few shell windows and browser windows are needed on the client side.
Question 1: Prepare the Environment
To observe various effects of distributed execution, a five-node EMR cluster will be started and used in the lab. We have also prepared a few relatively large data sets for this lab. To minimize network latency, all data files used in this lab are stored in an AWS S3 bucket and made public.
a) Start a 5-node EMR Cluster with Both MapReduce and Spark
Log in to the AWS Academy Learner Lab course and access the AWS console as you did in the week 4 lab. Go to the EMR dashboard and click the ’Create Cluster’ button to create a cluster. Click the go to advanced options link to enter the advanced options selection page.
• In Step 1: Software and steps configuration, select Release: emr-6.5.0. For the software component, check Hadoop 3.2.1 and Spark 3.1.2
• In Step 2: Hardware configuration, specify 1 master node and 4 core nodes, all of which are of c4.xlarge instance type.
• In Step 3: General cluster settings, update the cluster name to something more indicative, e.g. “comp5349-week7”; keep everything else at the default settings.
• In Step 4: Security, select the key that you have created in earlier labs in the EC2 key pair selection box.
Note that the cluster provision and configuration may take around 5 to 6 minutes.
b) Set Up Dynamic Port Forwarding
We need to access a number of Web UIs in this lab, including the HDFS NameNode, the YARN ResourceManager and the Spark history server. Dynamic port forwarding is required to access the HDFS and YARN Web UIs.
Follow the AWS instructions to set up the browser proxy and the SSH tunnel. It is recommended to set up the browser proxy (Part 2) while waiting for the cluster to start. Once the master node is provisioned with a public DNS, Part 1 can be performed; Part 1 is needed for every new cluster, as each has a different master node. A sample tunnel command is sketched after the two parts below.
• Part 1: Set Up an SSH Tunnel to the Master Node Using Dynamic Port Forwarding
• Part 2: Proxy Configuration. This guide contains instructions for the Firefox browser using FoxyProxy and for the Chrome browser using SwitchyOmega.
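For reference, a dynamic port forwarding tunnel is typically opened with a single ssh command similar to the sketch below. The key file name, the master node DNS and the local port 8157 are placeholders; use your own key, your cluster's master public DNS, and whichever local port your proxy configuration expects.
# placeholders: replace the key path, local port and master DNS with your own values
ssh -i ~/your-key.pem -N -D 8157 hadoop@ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com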
c) Update Security Group Rules
EMR creates default security groups for the master and slave nodes. The Web UIs run on the master node but are only accessible from within the cluster; the only port on the master node with public access is 22. We need to add a few inbound rules to the master's security group to enable public access to the Web UIs. It is recommended to wait until the cluster is ready before you update the security group. The security groups are listed on the cluster's summary page in the section Security and access (figure 1).
Figure 1: Master Security Group
Click the link to the ElasticMapReduce-master security group and add the following rules as indicated in figure 2
Figure 2: Additional Incoming Rules for Master Node
d) Access Data Used in Lab
For the Hadoop application, two text files with a format similar to partial.txt are provided: n06.txt and n08.txt.
They can be downloaded to any EC2 instance with AWS CLI support using the following commands (replace the last part with the directory where you want to save the files; the commands assume that you wish to store the downloaded files in a subdirectory called data within the current user's home directory):
aws s3 cp s3://comp5349-data/week6/n06.txt ~/data/ --no-sign-request
aws s3 cp s3://comp5349-data/week6/n08.txt ~/data/ --no-sign-request
For the Spark application, two csv files are provided: movies.csv and ratings.csv. They are larger than the two files used in the week 5 lab. They can be downloaded to an EC2 instance with the following commands (again, replace the last part with the directory where you want to save the files):
aws s3 cp s3://comp5349-data/week6/ratings.csv ~/data/ --no-sign-request
aws s3 cp s3://comp5349-data/week6/movies.csv ~/data/ --no-sign-request
Both the MapReduce and Spark applications in this lab will read input from HDFS and write output to HDFS. You need to put the data files on HDFS using the put command. Below is a sample command that puts the file n06.txt under the current user's home directory in HDFS:
hdfs dfs -put data/n06.txt n06.txt
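The other files can be uploaded in the same way. For example, the following commands (assuming the files were downloaded to ~/data as above) upload n08.txt and then list the HDFS home directory to verify the uploads:
hdfs dfs -put data/n08.txt n08.txt
hdfs dfs -ls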
Question 2: HDFS
After configuring the browser proxy and setting up the SSH tunnel, you can access the HDFS Web UI on port 9870 of the master node by accessing http://ec2-xxx-xxx-xxx.compute-1.amazonaws.com:9870 in a Web browser.
This Web UI looks similar to the Web UI in pseudo-distributed mode. The “Overview” tab shows general information about the cluster software, configuration and file system. In pseudo-distributed mode, you always see a single live data node, which is the master node itself. With a cluster of five nodes, there will be four live data nodes.
HDFS uses replication to achieve durability and high availability. The replication factor is a configurable parameter. The default configuration of your EMR cluster sets the replication factor to 2, which means two copies of every block are stored on HDFS, on different nodes. HDFS is designed to store very large files. It does not store a large file as a single entity. Instead, files are partitioned into fixed-size blocks. The default block size is 128 MB. For instance, a 400 MB file will be stored as 4 blocks; the size of the first three blocks is 128 MB each and the size of the last block is 16 MB. Each block will have two copies, so in total eight block replicas of this file are stored in the cluster. HDFS ensures the blocks are distributed relatively evenly in the cluster.
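If you want to confirm the replication factor and block size configured on your own cluster, the hdfs getconf tool can print them (the block size is reported in bytes):
hdfs getconf -confKey dfs.replication
hdfs getconf -confKey dfs.blocksize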
The storage distribution information can be easily found using the HDFS Web UI. From the HDFS Web UI, click the menu Utilities, then click the menu item Browse the file system and navigate to the folder /user/hadoop. This is the home directory of the user hadoop on HDFS; hadoop should be the user you used to log in to the master node. It is an HDFS convention to put all users' home directories under /user.
Assuming you have put all data files under /user/hadoop in HDFS, you will see the four files; all file names are clickable. Now click n06.txt to inspect its block distribution. By default, the page shows block 0's information. Basic information includes the block ID, an internal ID HDFS uses to identify each block, and the availability, which shows the nodes that have a copy of this block.
The size of n06.txt is 386.86 MB, so it is stored as 4 blocks in HDFS. You can use the drop-down list to select blocks 0, 1, 2 and 3 and display their respective storage distribution information. Try to find out the location of the replicas of each block and answer the following questions (a command-line alternative using hdfs fsck is sketched after the questions):
• Are replicas of the same block distributed on different nodes?
• Which node has the metadata such as the number of blocks and their replica locations?
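As a command-line alternative to the Web UI, the command below uses hdfs fsck to print the blocks of n06.txt together with the data nodes holding each replica (assuming the file sits in /user/hadoop):
hdfs fsck /user/hadoop/n06.txt -files -blocks -locations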
Question 3: MapReduce Execution on YARN
We will run the week 4 application again using a larger data set to inspect various statistics. The easiest way is to clone the python-resources repository on the master node.
Below is a sequence of commands to:
• Install git;
• Clone the repository;
• Change the present working directory to where the script is located;
• Enable the execute permission bit for the file owner of tag_driver.sh; and
• Execute the program by supplying an input file n08.txt and an output directory n08_out.
sudo yum install git
git clone \
https://github.sydney.edu.au/COMP5349-Cloud-Computing-2022/python-resources.git
cd python-resources/week4/naive
chmod 700 tag_driver.sh
./tag_driver.sh n08.txt n08_out
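If you are curious what a submission script such as tag_driver.sh roughly does, the sketch below shows a typical Hadoop streaming invocation with three reducers. The streaming jar path and the mapper/reducer file names (tag_mapper.py, tag_reducer.py) are assumptions for illustration only; the actual script in the repository may differ.
# Sketch only: the jar path and mapper/reducer names are assumptions, not the repository's actual contents
hadoop jar /usr/lib/hadoop/hadoop-streaming.jar \
  -D mapreduce.job.reduces=3 \
  -files tag_mapper.py,tag_reducer.py \
  -mapper tag_mapper.py \
  -reducer tag_reducer.py \
  -input n08.txt \
  -output n08_out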
a) Job Progress Information
Once the job is submitted successfully, the MapReduce framework runs the job and prints a copious amount of information to the console. Below is sample job progress output after running a submit command like the one above (this particular sample was captured from the Java version of the application):
INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-93-194.ec2.internal/172.31.
INFO input.FileInputFormat: Total input files to process : 1
INFO net.NetworkTopology: INFO net.NetworkTopology: Adding a new node: /default-rack/172.31
INFO mapreduce.JobSubmitter: number of splits:16
INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1618877612882_0001
INFO impl.YarnClientImpl: Submitted application application_1618877612882_0001
INFO mapreduce.Job: The url to track the job: http://ip-172-30-2-174.ec2.internal:20888/pro
INFO mapreduce.Job: Running job: job_1618877612882_0001
INFO mapreduce.Job: map 0% reduce 0%
INFO mapreduce.Job: map 13% reduce 0%
INFO mapreduce.Job: map 100% reduce 0%
INFO mapreduce.Job: map 100% reduce 67%
INFO mapreduce.Job: map 100% reduce 100%
INFO mapreduce.Job: job_1618877612882_0001 completed successfully
You should have seen similar output while you were running jobs in the week 4 lab.
The sample output seems to suggest that the reduce phase starts only when the map phase is 100% complete. This is not the case, and we can tell when the reduce phase actually starts from the application master log. The first step any reducer performs is to obtain intermediate results from the map tasks; this is called shuffle in MapReduce terminology. To improve execution time, a reducer starts shuffling data as soon as some map tasks finish.
The next part of the output includes important execution statistics. Those execution statistics are logged and can be viewed from the history server Web UI afterwards (default port is 19888). The first section of the execution statistics contains the File System Counters we have seen in the week 4 lab. Below is the sample output of application application_1618877612882_0001:
File System Counters
FILE: Number of bytes read=79590893
FILE: Number of bytes written=163271106
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=422493302
HDFS: Number of bytes written=53832814
HDFS: Number of read operations=21
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
This section reports statistics about local and HDFS reads/writes. The program's input is located on HDFS at /user/hadoop/n08.txt. The file's size is about 402 MB, which is roughly the same as the reported HDFS bytes read: 422493302. The program's output is written to HDFS and its size is about 52 MB, which is roughly the same as the reported HDFS bytes written: 53832814. The local file system reads and writes happen during the shuffle phase.
b) Overall Job Statistics
The next section reports statistics about the overall job. Below is the sample output from the same application:
Job Counters
Killed map tasks=1
Launched map tasks=16
Launched reduce tasks=3
Data-local map tasks=12
Rack-local map tasks=4
Total time spent by all maps in occupied slots (ms)=21335616
Total time spent by all reduces in occupied slots (ms)=10233600
Total time spent by all map tasks (ms)=222246
Total time spent by all reduce tasks (ms)=53300
In total, sixteen map tasks are launched; there is also one killed map task. Three reduce tasks are launched. Our input file n08.txt is about 402 MB in size and is stored as four blocks in HDFS, so by default there should be four map tasks, each working on one block of the file. The streaming jar, however, started 16 map tasks based on the property mapreduce.job.maps as set in the Hadoop configuration file; this property has a value of 16. In the code, we have specified that three reducers be used; hence, three reduce tasks are launched. The killed map task is a speculative map task. You may see different output in terms of killed task(s), as the speculative mechanism is triggered at execution time.
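To see where the value 16 comes from on your own cluster, you can look the property up in the cluster's MapReduce configuration. The path below is the usual EMR configuration directory and is an assumption; if the property is not listed there, a site default applies.
# /etc/hadoop/conf is assumed to be the active Hadoop configuration directory on EMR
grep -A 1 "mapreduce.job.maps" /etc/hadoop/conf/mapred-site.xml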
The output differentiates two types of map tasks: data-local tasks and rack-local tasks. A data-local map task runs on a node where a copy of its input data is stored; this is the most efficient case, involving no network transfer. A rack-local map task runs on a node with no copy of the data, but the data is stored on a node in the same rack as the task node; a rack-local task involves transferring data between nodes, but the transfer stays within the rack. Depending on how tasks are scheduled, the number of data-local and rack-local tasks may differ between runs.
More detailed information on speculative task(s) as well as data-local or rack-local tasks can be found from the YARN Web UI by inspecting the individual application's history. The YARN home page contains a list of all applications with a history link for each application (see Figure 3).
Figure 3: All applications
Figure 4 shows an individual application’s execution overview.
Figure 4: Individual Job History View
The killed attempt does not appear here because it was killed before it could be assigned a container. Again, different executions may exhibit different behaviour with respect to speculative tasks. Detailed information on application execution is provided in its application master's log. Below are snippets of AM log entries:
Before Scheduling: PendingReds:3 ScheduledMaps:16 ScheduledReds:0 AssignedMaps:0 AssignedReds:0
Got allocated containers 11
Assigned container container_1618877612882_0001_01_000002 to attempt_1618877612882_0001_m_000001_0
Assigned container container_1618877612882_0001_01_000003 to attempt_1618877612882_0001_m_000002_0
Assigned container container_1618877612882_0001_01_000004 to attempt_1618877612882_0001_m_000000_0
…
Assigned container container_1618877612882_0001_01_000008 to attempt_1618877612882_0001_m_000010_0
…
Got allocated containers 1
Assigned container container_1618877612882_0001_01_000013 to attempt_1618877612882_0001_m_000011_0
Got allocated containers 1
Assigned container container_1618877612882_0001_01_000014 to attempt_1618877612882_0001_m_000012_0
Got allocated containers 1
Assigned container container_1618877612882_0001_01_000015 to attempt_1618877612882_0001_m_000013_0
…
Task succeeded with attempt attempt_1618877612882_0001_m_000001_0
Num completed Tasks: 1
Progress of TaskAttempt attempt_1618877612882_0001_m_000002_0 is : 1.0
…
Task succeeded with attempt attempt_1618877612882_0001_m_000002_0
Num completed Tasks: 2
Before Scheduling: PendingReds:3 ScheduledMaps:2 ScheduledReds:0 AssignedMaps:14 AssignedReds:0 CompletedMaps:2 CompletedReds:0
Reduce slow start threshold reached. Scheduling reduces.
Received completed container container_1618877612882_0001_01_000003
Received completed container container_1618877612882_0001_01_000002
Got allocated containers 2
Assigned container container_1618877612882_0001_01_000016 to attempt_1618877612882_0001_m_000014_0
Assigned container container_1618877612882_0001_01_000017 to attempt_1618877612882_0001_m_000015_0
…
All maps assigned. Ramping up all remaining reduces:3
DefaultSpeculator.addSpeculativeAttempt — we are speculating task_1618877612882_0001_m_000000
We launched 1 speculations. Sleeping 15000 milliseconds.
Scheduling a redundant attempt for task task_1618877612882_0001_m_000000
attempt_1618877612882_0001_m_000000_1 TaskAttempt Transitioned from NEW to UNASSIGNED
Before Scheduling: PendingReds:0 ScheduledMaps:1 ScheduledReds:3 AssignedMaps:14 AssignedReds:0 CompletedMaps:2
…
task_1618877612882_0001_m_000000 Task Transitioned from RUNNING to SUCCEEDED
attempt_1618877612882_0001_m_000000_1 TaskAttempt Transitioned from UNASSIGNED to KILLED
Got allocated containers 1
Assigned container container_1618877612882_0001_01_000018 to attempt_1618877612882_0001_r_000000_0
The log snippets highlight a few important scheduling and allocation behaviours:
• Map tasks are scheduled and assigned before reduce tasks. The first reduce task is assigned a container after all map tasks have been assigned.
• Reduce tasks are assigned containers before all map tasks finish executing. A reduce slow start threshold is used to control when reduce tasks will be scheduled. In the above example, after two map tasks had finished executing, the AM started to schedule reduce tasks.
• Scheduling means waiting in a queue for resources, and tasks can be killed while waiting; the speculative attempt attempt_1618877612882_0001_m_000000_1 was killed because the original task task_1618877612882_0001_m_000000 finished executing while the speculative attempt was still waiting for resources.
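To read the full application master log yourself after the application finishes, you can fetch the aggregated logs from YARN; replace the application id below (taken from this example) with your own:
yarn logs -applicationId application_1618877612882_0001 | less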
c) The Output
All applications write their outputs to HDFS. If you check the output directory, you will usually find three files with names like part-r-00000, part-r-00001 and part-r-00002.
They are the outputs of the reduce tasks, one for each. We have specified three reduce tasks; hence, there are three output files. The output files of reduce tasks are named using the pattern part-r-xxxxx, where the number is the id of the reduce task, starting from 0. All Hadoop output files are named in this style: the letter indicates whether the file was produced by a mapper or a reducer, so output produced by a map task is named part-m-xxxxx, and the number indicates the id of the mapper/reducer that produced it.
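For example, you can list the output directory and peek at the first few lines of one reducer's output with commands like:
hdfs dfs -ls n08_out
hdfs dfs -cat n08_out/part-r-00000 | head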
d) Run the Sample Application on Different Input
Now try to run the two versions of the sample application using different input (e.g. n06.txt) and inspect the execution statistics.
Question 4: Spark on YARN
A Spark program can be deployed and managed by YARN. The Spark driver program can run inside the cluster (cluster deploy mode) or as an external client program (client deploy mode). The deploy mode option can be specified explicitly in the spark-submit script. In this section, we will run the Spark sample program from the week 6 lab on a large data set consisting of two files with the same names: movies.csv and ratings.csv. You need to put these two files on HDFS.
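For illustration, the deploy mode is chosen with the --deploy-mode flag of spark-submit, as in the sketch below; genre_avg.py is a placeholder name standing in for whatever Python script the provided submission script actually invokes.
# Sketch only: genre_avg.py is an assumed name, not necessarily the real script
spark-submit --master yarn --deploy-mode cluster genre_avg.py spark-out
spark-submit --master yarn --deploy-mode client genre_avg.py spark-out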
We have prepared a script version of the sample application used in week 6. The script specifies neither a file protocol nor an absolute path for the location of the input files. In a local Spark installation, this is interpreted as the present working directory on the local file system; in a Hadoop cluster, it is interpreted as the current user's HDFS home directory. The following two commands upload the two csv files to HDFS under the “hadoop” user's home directory:
hdfs dfs -put movies.csv
hdfs dfs -put ratings.csv
A submission script, genre_avg.sh, is also provided. The following command submits the application to the cluster:
./genre_avg.sh spark-out
The argument specifies an output directory on HDFS.
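After the application finishes, you can verify the result on HDFS. The exact part-file names depend on how the script writes its output, so a glob is used here:
hdfs dfs -ls spark-out
hdfs dfs -cat 'spark-out/part-*' | head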
By default, Spark is not configured to print out a lot of running statistics on the console. Instead, it provides a history server to display various statistics. EMR starts the history server by default. The history server runs on port 18080 of the master node. It can be accessed directly or through YARN.
Figure 5: Server Homepage
Figure 5 shows the home page of the history server when accessed directly on port 18080. Completed applications appear on the homepage with a link to their detailed execution statistics.
The application execution statistics screen has several tabs. The default one is the Jobs tab, as shown in figure 6. It shows all the jobs of the application; this particular application started only one job. Click on the link for this job to reveal the Stages view.