School of Computer Science Dr. Ying Zhou
COMP5349: Cloud Computing Sem. 1/2020
Week 6: Distributed Execution Engine
Introduction
02.04.2020
In this lab, you will practice executing MapReduce and Spark applications in a distributed EMR cluster. The objectives of this lab are:
• Understand replication in HDFS
• Understand the various execution stages of a MapReduce application, including the level of parallelism in the map and reduce phases and the cost of the shuffle stage
• Get an initial idea of Spark execution on YARN as preparation for the week 7 lecture
This lab runs entirely on EMR; only a few shell and browser windows are needed on the client side.
Question 1: Prepare the Environment
To observe various effects of distributed execution, a five-node EMR cluster will be started and used in the lab. We have also prepared a few relatively large data sets for use in the lab. To minimize network latency, all data files used in this lab are stored in an AWS S3 bucket and made public.
a) Start 5-node EMR Cluster with Both MapReduce and Spark
Log in to the AWS console, go to the EMR dashboard and click the 'Create Cluster' button to start creating a cluster. You will be presented with a "quick options" page; we used this page in the week 4 lab to create a single-node cluster. Now click the 'Go to advanced options' link right next to the 'Quick Options' title at the top of the page to enter the advanced options selection page.
• In Step 1: Software and steps configuration, select Spark: Spark 2.4.4 on Hadoop 2.8.5 YARN with Ganglia 3.7.2 and Zeppelin 0.8.2
• In Step 2: Hardware configuration, select to use 1 master node and 4 core nodes, all of which are of m4.xlarge instance type.
• In Step 3: General cluster settings, update the cluster name to something more indicative, e.g. “comp5349-week6”; keep everything else as default
• In Step 4: Security, select the key you created in earlier labs in the EC2 key pair selection box. You may also edit the security group here to open port 22.
Note that cluster provisioning may take a while.
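If you prefer the command line, roughly the same cluster can be created with the AWS CLI. This is only a sketch: the release label and key name are assumptions, so check that the release you pick matches the software versions listed above (Spark 2.4.4, Hadoop 2.8.5) and substitute your own key pair name.

aws emr create-cluster \
  --name "comp5349-week6" \
  --release-label emr-5.29.0 \
  --applications Name=Hadoop Name=Spark Name=Ganglia Name=Zeppelin \
  --instance-type m4.xlarge \
  --instance-count 5 \
  --use-default-roles \
  --ec2-attributes KeyName=your-key-name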
b) Set Up Dynamic Port Forwarding
We used local port forwarding in the week 4 lab to enable access to the HDFS and YARN web UIs. If you have tried to click various links from the YARN web UI, you may have noticed that some links result in errors because they use internal IP addresses and port numbers that are not covered by the local port forwarding. To ensure every link is accessible, we need to use dynamic port forwarding and a browser proxy. AWS provides detailed two-step setup instructions (the Part 1 command is sketched below the list):
• Part 1: Set Up an SSH Tunnel to the Master Node Using Dynamic Port Forwarding
• Part 2: Proxy Configuration
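For reference, the Part 1 tunnel is a single SSH command similar to the one below (a sketch: the key file name and the master node's public DNS are placeholders, and 8157 is the unprivileged port AWS uses in its examples, which must match the port configured in your browser proxy):

ssh -i your-key.pem -N -D 8157 hadoop@ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com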
c) Access Data Used in Lab
For the Hadoop application, two text files with a format similar to partial.txt are provided: n06.txt and n08.txt.
They can be downloaded to any EC2 instance with AWS CLI support using the following commands (replace the last part with the directory you want to save the files in; the commands assume a data directory in the current user's home directory):
aws s3 cp s3://comp5349-data/week6/n06.txt ~/data/ --no-sign-request
aws s3 cp s3://comp5349-data/week6/n08.txt ~/data/ --no-sign-request
For the Spark application, two CSV files are provided: movies.csv and ratings.csv. They are larger than the two files used in the week 5 lab. They can be downloaded to an EC2 instance with the following commands (again, replace the last part with the directory you want to save the files in):
aws s3 cp s3://comp5349-data/week6/ratings.csv ~/data/ --no-sign-request
aws s3 cp s3://comp5349-data/week6/movies.csv ~/data/ --no-sign-request
Both the MapReduce and Spark applications in this lab read their input from HDFS and write their output to HDFS. You need to put the data files on HDFS using the put command. Below is a sample command for putting the file n06.txt under the current user's home directory in HDFS:
hdfs dfs -put n06.txt n06.txt
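The remaining files can be uploaded in the same way, and the upload can be verified with a listing (the commands assume the files sit in the current directory and that you are logged in as the hadoop user):

hdfs dfs -put n08.txt n08.txt
hdfs dfs -ls /user/hadoop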
Question 2: HDFS
After configuring the browser proxy and setting up the SSH tunnel, you can access the HDFS web UI on port 50070 of the master node's DNS, e.g. http://ec2-xxx-xxx-xxx.compute-1.amazonaws.com:50070, from a browser.
The web UI looks similar to the web UI you have seen in pseudo-distributed mode. The "Overview" tab shows general information about the cluster software and configuration as well as the file system information. In pseudo-distributed mode, you always see a single live data node, which is the master node itself. With a cluster of five nodes, there will be four live data nodes.
HDFS uses replication to achieve durability and high availability. The replication factor is a configurable parameter. The default configuration of your EMR cluster sets the replication factor to 2, which means two copies of each file are stored on HDFS, and the copies are stored on different nodes. HDFS is designed to store very large files. It does not store a large file as a single entity; instead, files are partitioned into fixed-size blocks. The default block size is 128MB. For instance, a file of 400MB will be stored as 4 blocks: the first three blocks are 128MB each and the last block is 16MB. Each block has two copies, so in total this file is stored as eight blocks. HDFS ensures the blocks are distributed relatively evenly across the cluster.
The storage distribution information can easily be found using the HDFS web UI. From the HDFS web UI, click the menu Utilities, then the menu item Browse the file system, and navigate to the folder /user/hadoop. This is the home directory of the user hadoop on HDFS; hadoop should be the user you used to log on to the master node. It is an HDFS convention to put all users' home directories under /user.
Assuming you have put all data files under /user/hadoop in HDFS, you will see the four files, and all file names are clickable. Now click n06.txt to inspect its block distribution. By default, it shows block 0's information. The basic information includes the block ID, which is an internal ID HDFS uses to identify each block, and availability, which shows the nodes that hold a copy of this block.
The size of n06.txt is 386.86MB, so it is stored as 4 blocks in HDFS. You can use the drop-down list to select blocks 1, 2 and 3 and display their respective storage distribution information. Try to find out the location of the replicas of each block and answer the following questions (a command-line alternative is sketched after the questions):
• Are replicas of the same block distributed on different nodes?
• Which node has the metadata information, such as the number of blocks and their replica locations?
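If you prefer the command line to the web UI, the same block and replica information can be printed with the HDFS fsck tool (a sketch, assuming the file was uploaded to /user/hadoop). Each block line lists the block ID, its size and the data nodes holding a replica:

hdfs fsck /user/hadoop/n06.txt -files -blocks -locations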
Similar to a local installation, you can interact with the distributed HDFS using HDFS shell commands. You may issue the commands from a remote client, but it is more convenient to log in to one of the slave nodes and use it as the client to issue commands.
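For example, you could look up a core node's DNS name in the EC2 console and issue HDFS commands from there (a sketch; the key file and DNS name are placeholders):

ssh -i your-key.pem hadoop@ec2-yyy-yyy-yyy-yyy.compute-1.amazonaws.com
hdfs dfs -ls /user/hadoop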
Question 3: MapReduce Execution on YARN
We will use the week 4 application again, but run it on a larger dataset to inspect various statistics. The easiest way is to clone the python-resources repo on the master node.
Below is a sequence of commands to:
• install git;
• clone the repo;
• change the present working directory to where the script is located;
• change the mode of tag_driver.sh to executable; and
• execute the program by supplying an input file n08.txt and an output directory n08_out
sudo yum install git
git clone \
https://github.sydney.edu.au/COMP5349-Cloud-Computing-2020/python-resources.git
cd python-resources/week4/naive
chmod 764 tag_driver.sh
./tag_driver.sh n08.txt n08_out
a) Job Progress Information
After the job is submitted successfully, the MapReduce framework starts to run it and prints a lot of information on the console. Below is the job progress output after running the above submit command:
INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-88-150.ec2.internal/172.31.
INFO input.FileInputFormat: Total input files to process : 1
INFO net.NetworkTopology: Adding a new node: /default-rack/172.30.2.79:50010
…
INFO mapreduce.JobSubmitter: number of splits:16
INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1585649694747_0001
INFO impl.YarnClientImpl: Submitted application application_1585649694747_0001
INFO mapreduce.Job: The url to track the job: http://ip-172-30-2-174.ec2.internal:20888/pro
INFO mapreduce.Job: Running job: job_1585649694747_0001
INFO mapreduce.Job: map 0% reduce 0%
INFO mapreduce.Job: map 13% reduce 0%
…
INFO mapreduce.Job: map 100% reduce 0%
INFO mapreduce.Job: map 100% reduce 33%
INFO mapreduce.Job: map 100% reduce 92%
INFO mapreduce.Job: map 100% reduce 100%
INFO mapreduce.Job: Job job_1585649694747_0001 completed successfully
You should have seen similar output when running jobs in week 4 lab. The progress report might be much shorter for small input data.
The sample output shows that the reduce phase starts only after the map phase is 100% complete. This could be unique to Python MapReduce applications. In Java applications, it is quite common to see reduce tasks start before the mappers finish in large MapReduce jobs. The first step any reducer does is to obtain intermediate results from mappers. This is called shuffle in MapReduce terminology. To improve execution time, reducers start shuffling data before the mappers finish execution when running in native (Java) mode. This is possible because a mapper emits results continuously for each input key-value pair. The mapper calls the user-supplied map function on every line of the input file, and each call generates some key-value pairs as the result. The results are buffered in memory and spilled to disk. Reducers establish RPC calls to the mapper nodes to obtain the intermediate results on disk. Theoretically, reducers can start to request intermediate results after the first batch of intermediate results is spilled to disk by a mapper.
The next part of the output includes important execution statistics. These execution statistics are logged and can be viewed from the history server web UI afterwards. The first section of the execution statistics contains the File System counters. Below is the sample output of application application_1554164887016_0029:
File System Counters
FILE: Number of bytes read=79590893
FILE: Number of bytes written=163271106
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=422493302
HDFS: Number of bytes written=53832814
HDFS: Number of read operations=21
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
This section reports statistics about local and HDFS reads and writes. The program's input is located on HDFS at /user/hadoop/n08.txt. The file is about 402MB, roughly the same as the reported HDFS bytes read: 422493302. The program's output is written to HDFS and is about 52MB in total, roughly the same as the reported HDFS bytes written: 53832814. The local file system reads and writes happen during the shuffle period.
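You can cross-check these numbers against the sizes reported by HDFS itself, for example (assuming the input and output paths used above):

hdfs dfs -du -s -h /user/hadoop/n08.txt
hdfs dfs -du -s -h /user/hadoop/n08_out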
b) Overall Job Statistics
The next section reports statistics about the overall job. Below is the sample output from the same application:
Job Counters
Launched map tasks=16
Killed reduce tasks=1
Launched reduce tasks=4
Data-local map tasks=11
Rack-local map tasks=5
Total time spent by all maps in occupied slots (ms)=21335616
Total time spent by all reduces in occupied slots (ms)=10233600
Total time spent by all map tasks (ms)=222246
Total time spent by all reduce tasks (ms)=53300
…
In total, sixteen map tasks were launched, and four reduce tasks were launched with one killed. Our input file n08.txt is about 402MB in size and is stored as four blocks in HDFS. By default, there should be four map tasks, each working on a block of data. The streaming jar started 16 tasks based on the property mapreduce.job.maps set in the Hadoop configuration file, which has a value of 16. We have specified three reducers in the code, hence three reduce tasks completed. The killed reduce task is a speculative reduce task.
For map tasks, it also reports the number of data-local tasks. A data-local task means the mapper is running on a node where a copy of its data is stored. This is the most efficient case, involving no network transfer. A rack-local task means the map task is running on a node with no copy of the data, but the data is stored on a node in the same rack as the task's node. A rack-local task involves transferring data between nodes, but the transfer happens within the rack.
The history server web UI provides more detailed information on speculative tasks as well as data-local and rack-local tasks. The history server can be accessed from the YARN UI at port 8088, or from its own port 19888. Figure 1 shows the home page of the history server, which lists all completed applications and a brief summary of their map and reduce tasks.
Figure 1: History Server Entry Page
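With the browser proxy from Question 1 in place, both UIs can be opened directly using the master node's public DNS (placeholders shown below):

http://ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:8088   (YARN resource manager)
http://ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:19888  (MapReduce job history server)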
A more detailed view can be obtained by clicking the job ID. Figure 2 shows an individual job's history overview.
Clicking either 'Map' or 'Reduce' brings up the summary screen for the map or reduce tasks. Figure 3 shows the summary screen for all map tasks. It shows the start and end time of each task and the total elapsed time. The total elapsed time of the first three tasks is around 20 seconds. You will notice that the last task, task_1554164887016_0029_m_000003, takes much less time than the rest: its total elapsed time is only 6 seconds. A map task's execution time depends a lot on the input data size. Each of the first three blocks of the input file n08.txt is 128MB, while the last block is around 19MB. This explains the shorter execution time.
Figure 2: Individual Job History View
Figure 3: Summary screen of reduce tasks
Clicking on a task brings up the detail screen for that task. The first reduce task has two attempts. Figure 4 shows the detail screen of the first reduce task. The detail screen shows the node executing the task and whether multiple attempts have been started. In this example, two attempts were started. The first one, attempt_1585649694747_0001_r_000000_0, started on node ip-172-30-2-80.ec2.internal at Tue Mar 31 22:07:36 +1100 2020. The attempt's progress was considered slower than average, and at Tue Mar 31 22:07:53 +1100 2020 another attempt was started on ip-172-30-2-79.ec2.internal to try to catch up. The second attempt was speculative and was killed because the first attempt finished first.
Now inspect the details of all individual map tasks and identify some data-local tasks. You will need to check the HDFS web UI to figure out the location of each block, and double-check the log file of each task to see which block it is supposed to process.
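The per-task logs can also be pulled from the command line with YARN's log aggregation tool (a sketch; substitute the application ID of your own run):

yarn logs -applicationId application_1585649694747_0001 | less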
Figure 4: Detail screen of individual reduce task
The rest of the statistics in this section include execution times measured at various scales. The history server web UI provides a better view of the time spent. Figure 3 shows the summary screen of the reduce tasks. There are in total three reduce tasks with similar execution times. The execution time of a reducer also depends a lot on the data it needs to process. Similar execution times indicate that the default partitioner partitions the map results evenly into three partitions. The actual execution time of a reducer is divided into three stages: shuffle, merge and reduce. The last stage, reduce, measures the time taken to run the user-defined reduce function. The shuffle stage measures the time taken to transfer the map results to the reducer nodes. The merge stage measures the time taken by the reducer node to merge the results from different mappers and group them for the reducer input. It is clear that in this application, where the reduce function does simple processing, the majority of the reducer time is spent obtaining and preparing the data.
c) I/O Statistics and Comparison with Combiner Version Output
The next output section reports statistics about I/O and execution time for the map and reduce stages. Again, the same information can be found on the history server's Counters page. Below is a subset of the sample output, highlighting the size of the shuffled data. This application does not use a combiner, so all mapper output is transferred to the reducers. You can see that Map output materialized bytes is exactly the same as Reduce shuffle bytes, and Map output records is exactly the same as Reduce input records.
Map-Reduce Framework
Map output records=27339121
Map output bytes=598469997
Map output materialized bytes=80391737
…
Combine input records=0
Combine output records=0
…
Reduce shuffle bytes=80391737
Reduce input records=27339121
Reduce output records=828616
…
You can run the combiner version of the same application on the same data set. The combiner version can be found in the folder combiner. The script is called tag_driver_combiner.sh. Run it like:
./tag_driver_combiner.sh n08.txt n08_comb_out
The detailed execution statistics of this application can be viewed in the history server web UI. Below is the console output highlighting the difference in shuffle size:
Map-Reduce Framework
Map input records=2723874
Map output records=27339121
Map output bytes=647564515
Map output materialized bytes=31869623
…
Combine input records=27339121
Combine output records=1451612
Reduce shuffle bytes=31869623
Reduce input records=1451612
Reduce output records=828616
In the combiner version, Reduce shuffle bytes=31869623, which is much smaller than in the naive version with Reduce shuffle bytes=80391737. Reduce input records=1451612 is also much smaller than Map output records=27339121. The combiner decreases the number of records by nearly 20-fold: Combine input records=27339121 becomes Combine output records=1451612. As a result, the overall reduce tasks take only 6 seconds (see Figure 5).
Figure 5: Summary screen of reduce tasks in Combiner version
d) The Output
All applications write their output to HDFS. If you check the output directory, you will usually find three files named like part-r-00000, part-r-00001 and part-r-00002. They are the output of the reduce tasks, one per task. We configured three reduce tasks, hence three output files. The output files of reduce tasks are named using the pattern part-r-xxxxx, where the number is the ID of the reduce task, starting from 0. All Hadoop output files are named in this style. The letter indicates whether the file was produced by a mapper or a reducer: output produced by a map task is named part-m-xxxxx. The number indicates the ID of the mapper/reducer that produced the output.
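A quick way to confirm this is to list the output directory and peek at one of the part files (assuming the n08_out directory produced earlier):

hdfs dfs -ls n08_out
hdfs dfs -cat n08_out/part-r-00000 | head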
e) Run the Sample Application on Different Input
Now try to run the two versions of the sample application on a different input (e.g. n06.txt) and inspect the execution statistics.
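For example, from the naive folder (the relative path to the combiner folder is an assumption; adjust it to match the repository layout):

./tag_driver.sh n06.txt n06_out
cd ../combiner
chmod 764 tag_driver_combiner.sh
./tag_driver_combiner.sh n06.txt n06_comb_out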
Question 4: Spark Execution on YARN
A Spark program can be deployed and managed by YARN. The Spark driver program can run inside the cluster (cluster deploy mode) or as an external client program (client deploy mode). Client mode is the default option. The deploy mode can be specified explicitly in the spark-submit script. In this section, we will run the Spark sample program from the week 5 lab on a larger data set consisting of two files: movies.csv and ratings.csv. You need to put these two files on HDFS as well. All scripts assume the two files are placed in a directory movies under the home directory of the current user on HDFS. The following sequence of commands creates a directory movies on HDFS under the hadoop user's home directory and uploads the two CSV files there.
hdfs dfs -mkdir movies
hdfs dfs -put movies.csv movies/movies.csv
hdfs dfs -put ratings.csv movies/ratings.csv
We have prepared a script version of the sample application used in week 5 together with a submit script. You can submit the application to the cluster by executing the submit script python_genre_avg.sh.
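Running the submit script should be as simple as the command below. Internally, such a script typically wraps a spark-submit call; the second line is only an illustrative sketch (the .py file name and the deploy-mode choice are assumptions, not the exact content of the provided script):

./python_genre_avg.sh
spark-submit --master yarn --deploy-mode client genre_avg.py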
Spark by default is not configured to print out a lot of running statistics on the console. Instead, it provides a history server to visually view the statistics. The history server runs on port 18080 of the master node. Figure 6 shows the history server’s home page.
The history server provides a lot of execution information. You can explore it by clicking the various tabs; some will be covered in next week's lecture. One thing worth checking out is the data flow of the program. This can be found from the Jobs tab and each job's DAG Visualization link. Figure 7 shows the theoretical execution DAG of the week 5 sample application. The version you see will be slightly different, as shown in Figure 8, because PySpark rewrites the implementation of a few operations.
Figure 6: Spark History Server Home Page
Notice
Warning:
• Always terminate (or, in some rare cases, stop) all your AWS resources after you finish with them. Otherwise, MONEY WILL BE LOST!
Figure 7: Average Rating Per Genre Theoretic Data Flow
Figure 8: Average Rating Per Genre Python Data Flow