School of Computer Science Dr. Ying Zhou
COMP5349: Cloud Computing Sem. 1/2020
Week 8: Spark On YARN
23.04.2020
In this lab, you will inspect the execution statistics of a few Spark applications running in a distributed EMR cluster. The objectives of this lab are:
• Understand job, stage and tasks in Spark execution
• Understand the relationship between executors and tasks
• Understand the level of parallelism controlled by various properties
Question 1: Prepare the Environment
Follow the week 6 instructions to start a 5-node EMR cluster with both MapReduce and Spark configured.
Log in to the master node to:
• download the lab code from the course git repository, and
• download movies.csv and ratings.csv from S3 (a minimal download sketch follows this list).
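If you prefer to script the data download, the snippet below is a rough sketch using boto3, which is assumed to be available on the master node; the bucket name is a placeholder for the one given in the lab instructions, and the AWS CLI (aws s3 cp) works equally well.

    # Minimal download sketch: assumes boto3 is available on the master node and
    # that "comp5349-data" is replaced with the bucket named in the lab instructions.
    import boto3

    s3 = boto3.client("s3")
    for key in ["movies.csv", "ratings.csv"]:
        s3.download_file("comp5349-data", key, key)  # bucket, S3 key, local file name
        print("downloaded", key)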
We mainly use the Spark history server UI to inspect Spark execution statistics in this week’s lab. EMR provides an easy way to access an alternative Spark history server without SSH tunneling. The link to that alternative Spark history server can be found on your newly created cluster’s entry page. Figure 1 highlights the link.
Figure 1: Direct Link to Spark History Server
If you configured the browser proxy and SSH tunneling last week, you can access the Spark history server from the master node’s port 18080. This option enables you to access the YARN and HDFS web UIs as well. Otherwise, the link provided by EMR is sufficient.
Question 2: Spark Execution on YARN
Follow the instructions of week 6 question 4 to execute the application that computes the average rating per genre. After the application completes successfully, access the Spark history server using either the EMR-provided link or the master node’s port 18080. Figure 2 shows the entry page of the Spark history server accessed from the EMR-provided link. If the page shows “No completed applications found!”, click the link “Show incomplete applications” at the bottom of the page to find your application.
Figure 2: Spark History Server Entry

Figure 3 shows the application list.
Figure 3: Spark Application List
Clicking the application ID will bring you to the application’s history page. The history server displays an application’s history at various levels: Jobs, Stages, Storage, Environment and Executors. The default view is the Jobs level, as shown in Figure 4. The entry point of the Jobs view shows a list of jobs created for the application. This particular application has only one job.
Figure 4: Spark Application History Job View
Each job in the application has a clickable link to show details of that job. The individual job view includes a summary of stages and the number of tasks in each stage, as shown in Figure 5. Note that PySpark merges the stages handling different inputs into a single stage; in total we see three stages for the application instead of the four discussed in the lecture.

Figure 5: Summary of stages and tasks in a job
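To relate these stages to code, the sketch below mirrors the structure of such an average-rating-per-genre job; the file paths, column positions and output location are assumptions rather than the course’s exact implementation. Reading and mapping the two inputs happens in the single merged stage, the join starts the second stage, and the per-genre aggregation starts the third.

    # Hypothetical sketch of an average-rating-per-genre job; paths, column
    # positions and names are assumptions, not the course's exact code.
    from pyspark import SparkContext

    sc = SparkContext(appName="genre-average-rating-sketch")

    # Both inputs are read and mapped in what PySpark schedules as one merged stage.
    ratings = (sc.textFile("ratings.csv")
               .filter(lambda line: not line.startswith("userId"))    # drop CSV header
               .map(lambda line: (line.split(",")[1],
                                  float(line.split(",")[2]))))        # (movieId, rating)
    movies = (sc.textFile("movies.csv")
              .filter(lambda line: not line.startswith("movieId"))    # drop CSV header
              .map(lambda line: (line.split(",")[0],
                                 line.split(",")[-1])))               # (movieId, genres)

    # The join shuffles both inputs: a new stage begins here.
    genre_ratings = (ratings.join(movies)
                     .flatMap(lambda kv: [(g, kv[1][0])
                                          for g in kv[1][1].split("|")]))

    # The per-genre aggregation shuffles again: the final stage begins here.
    averages = (genre_ratings
                .aggregateByKey((0.0, 0),
                                lambda acc, r: (acc[0] + r, acc[1] + 1),
                                lambda a, b: (a[0] + b[0], a[1] + b[1]))
                .mapValues(lambda s: s[0] / s[1]))

    averages.saveAsTextFile("genre_avg_output")   # hypothetical output path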
The Spark framework builds a Directed Acyclic Graph (DAG) of stages for each job. This helps in understanding the dependencies between the RDDs in the job. The stage DAG can be inspected in the individual job view by expanding the “DAG Visualization” link. An example of the DAG of this job can be found in the week 6 lab handout. The basic execution element in a Spark application is the task. Details of how long and where each task executes can be seen in the Stages view. The Stages view is accessible by clicking the Stages tab, or by clicking an individual stage link on a job’s stage summary page. Figure 6 shows detailed information on all tasks in stage 1. Here we expanded the “Event Timeline” to visualize each task’s execution statistics. We can see that the 7 tasks of this stage are executed in parallel by two executors running on different nodes. The first executor runs 4 tasks and the second runs 3 tasks.
Figure 6: Stage 1 task details
The whole application has only two executors. Each has run multiple tasks in parallel and also in batches, depending on the level of parallelism supported by the executor. Each executor is configured to run up to 4 tasks in parallel; this is controlled by the default spark.executor.cores value of 4. The first two stages each have 7 tasks, so the two executors are able to run each stage’s tasks in parallel in a single batch, while tasks of different stages run in different batches. Figure 7 shows that in stage 0, executor 0 runs 3 tasks while the other executor runs 4 tasks, all in parallel. The Stages view also shows many other details of the tasks in tabular format, which you can explore yourself; in particular, you may want to check the data locality level of each task and the shuffled data size. The statistics grouped by executor can be seen in the Executors view.
Figure 7: Stage 0 task details
We specified in the submit script that 3 executors should be used to run the application. This is not observed because the spark.dynamicAllocation.enabled property is set to true.
Now run the application with different resource configurations for the executors and see if there is any difference in the execution statistics. The week 7 slides provide a few examples of specifying resource configurations, such as the memory limit. You can also change the default executor core number. Remember to remove the output directory before executing the application a second time.
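As a reference point, the sketch below shows one way of setting such properties from inside the driver program instead of passing them to spark-submit with --conf; the values are examples only, and a fixed executor count only takes effect when dynamic allocation is switched off.

    # Example resource configuration set in the driver; the values are
    # illustrative only, and equivalent --conf options on spark-submit work too.
    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("genre-average-rating")
            .set("spark.executor.memory", "2g")                 # memory per executor
            .set("spark.executor.cores", "2")                   # concurrent tasks per executor
            .set("spark.executor.instances", "3")               # requested number of executors
            .set("spark.dynamicAllocation.enabled", "false"))   # otherwise the count may change at runtime

    sc = SparkContext(conf=conf)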
Question 3: Understand DAG, job, stage and task
We provide another application to help you understand the important Spark execution concepts: job, stage and task. This application finds the top movies per genre based on the number of ratings each movie receives. It runs on the same movie rating data set.
The Python source code and submission scripts can be found in the week 8 folder of the course repository. The Python source code is in a single script: genre_top_movies.py. You may inspect the source code in your favorite text editor or on GitHub directly.
The overall processing is similar to the previous application. It uses two reduce-like operations: reduceByKey and groupByKey. Both operations have wide dependencies between the parent and child RDDs; they will therefore trigger stage boundaries.
The reduceByKey operation is applied to the rating data to compute the total number of ratings per movie. The mapToPair and reduceByKey operation sequence closely mimics the classic word count MapReduce program.
The rating count and movie data are joined on movie id to associate the genre information with the rating.
The groupByKey operation is used to group all movies, together with their respective rating counts, for each genre.
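A condensed, hypothetical sketch of this pipeline is shown below; the paths, column positions and top-N logic are assumptions, so refer to genre_top_movies.py for the actual implementation. The reduceByKey, join and groupByKey calls are the operations that introduce shuffle (stage) boundaries.

    # Condensed sketch of the top-movies-per-genre pipeline; not the course's
    # exact code. Paths, column positions and the top-N logic are assumptions.
    from pyspark import SparkContext

    sc = SparkContext(appName="genre-top-movies-sketch")

    ratings = (sc.textFile("ratings.csv")
               .filter(lambda line: not line.startswith("userId")))   # drop CSV header
    movies = (sc.textFile("movies.csv")
              .filter(lambda line: not line.startswith("movieId")))   # drop CSV header

    # Word-count style: one record per rating, reduced to a count per movie.
    rating_counts = (ratings
                     .map(lambda line: (line.split(",")[1], 1))       # (movieId, 1)
                     .reduceByKey(lambda a, b: a + b))                # shuffle -> stage boundary

    # (movieId, genres) pairs from the movie metadata.
    movie_genres = movies.map(lambda line: (line.split(",")[0], line.split(",")[-1]))

    # Join the counts with the genre information on movie id, then key by genre.
    genre_counts = (rating_counts
                    .join(movie_genres)                               # shuffle -> stage boundary
                    .flatMap(lambda kv: [(g, (kv[0], kv[1][0]))
                                         for g in kv[1][1].split("|")]))

    # Group each genre's (movie, count) pairs and keep the five most rated.
    top5 = (genre_counts
            .groupByKey()                                             # shuffle -> stage boundary
            .mapValues(lambda mcs: sorted(mcs, key=lambda mc: mc[1], reverse=True)[:5]))

    top5.saveAsTextFile("genre_top5_output")                          # hypothetical output path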
Figure 8 shows the DAG involved in computing the final RDD.
Figure 8: Spark DAG of Genre Top 5 Program

Figure 9 shows the task summaries of this application’s only job.
Figure 9: Genre Top 5 Task Summary
Inspect the history of this application to see how many jobs, stages and tasks there are. Use different resource allocations to observe changes in the execution statistics. Remember to remove the output directory before executing the application a second time.