School of Information Technologies Dr. Ying Zhou
COMP5349: Cloud Computing Sem. 1/2020
Week 9: Spark DataFrame API and Performance Observation
Overview
0.1 Objectives
The objectives of this lab are:
• Learn to use EMR Notebook
• Get familiar with common operations in DataFrame API
• Understand how DataFrame operations are executed by the Spark framework
• Experiment with and observe a Spark application’s performance
0.2 Running Environment
This lab runs entirely on AWS EMR; only a few shell windows and browser windows are needed on the client side. You may access EMR from any client side OS you prefer.
Two EMR clusters will be started in this lab; most of the questions are practiced on a single node cluster. Question 5 uses a multi node cluster. Make sure you terminate both before leaving the lab! Please note that using a single node cluster is mainly a cost consideration. You may use a multi-node cluster throughout the lab if cost is not an issue.
0.3 Data Set
The movie rating data set will be used again to demonstrate the basic data frame operations. The data set contains two csv files: a movies file (movie id, title, genres) and a ratings file (user id, movie id, rating, timestamp).
A copy of the data set is put on AWS S3 with public read permission to minimize network latency.
0.4 Brief Intro to EMR Notebook
Jupyter notebook is a very useful tool for developing interactive data analytic applications. It is also possible to use Jupyter notebooks on an EMR cluster. EMR notebook is a “serverless” Jupyter notebook. It supports Sparkmagic kernels, a set of tools that allows Jupyter notebooks to interact with a Spark cluster through Apache Livy, a REST server for Spark. This allows developers to interactively run Spark applications on EMR clusters. All EMR notebooks run on the master node, and there is a limit on the number of notebooks that can run simultaneously, based on the master node’s capacity. The actual tasks are sent to execute on worker nodes. It is possible to use an EMR notebook to run applications on very large data sets.
EMR notebook has the following requirements for the clusters:
• The cluster must have the Hadoop, Spark and Livy components.
• The cluster must be launched within an EC2-VPC, not the EC2-Classic network.
• The cluster must be created using Amazon EMR release version 5.18.0 or later.
Question 1: Create Single Node Notebook Compatible Cluster
In this step we are going to create a single node cluster with the settings required to run Jupyter Notebook. We also configure the cluster to allow executors to utilize the maximum resources available on each node.
Login to the AWS console and proceed to the S3 dashboard. Create a bucket to store materials belonging to this course; the bucket name should be unique. We assume it is called “comp5349-uniKey”. Create a folder in the bucket to store this week’s content. You may call it “week9”.
Now proceed to the EMR dashboard and click the ‘Create Cluster’ button to start creating a cluster. You will be presented with a “quick options” page; click the go to advanced options link right next to the Quick Options title at the top of the page to enter the advanced options selection page. Do the following when you are in advanced options:
• Step 1: Software and steps configuration: select EMR release 5.29 from the drop down list. Then click Hadoop 2.8.5, Spark 2.4 and Livy 0.6.0 to include the three components. In the Edit Software Settings section, paste the following configuration into the provided text box. This is the configuration that enables maximum resource allocation:
[
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  }
]

Figure 1: Cluster Software Configuration

• Step 2: Hardware configuration: in the network drop down list, choose a VPC instead of the default EC2 classic network. Also select 1 master node and 0 core nodes. You may use the m4.xlarge instance type for an AWS Educate account. See Figure 2 as an example.

Figure 2: Cluster Hardware Configuration
• Step 3: General cluster settings: change the cluster name to something more indicative, e.g. “comp5349-week9”; keep everything else as default
• Step 4: Security: select the key you have created in earlier labs in the EC2 key pair selection box.
Click launch cluster after all necessary configuration has been set.
Question 2: Create an EMR Notebook
An EMR notebook is considered an AWS component with its own ID. It should be created from the AWS console and stored in S3. The location in S3 can be specified by the user, but AWS always adds a subfolder named after the notebook ID and saves the notebook in that subfolder. All notebooks created in this way can be seen from the EMR dashboard of the AWS console, and they can be attached to different clusters.
The week9 folder in course repo contains a template of the notebook to be used. You need to create a new notebook from scratch on AWS console, then copy and paste code from the template.
EMR notebooks are created from the EMR dashboard. After creating a cluster, go back to the dashboard and click Notebooks in the left hand side menu. This will bring you to the notebook page. Click the “Create notebook” button to start the creation process. See Figure 3.
On the Create notebook page:
• Enter a notebook name such as “MovieData Summary”. Note that the notebook name cannot contain spaces.
• In the “clusters” selection, click “Choose an existing cluster”, then click the choose button. This will bring up a pop-up window showing all eligible clusters. The cluster you have created should be listed there. Select it, then click the “Choose cluster” button to choose it. See Figure 4.
Figure 3: Create notebook
Figure 4: Choose existing clusters
• In the “Notebook location” selection, click “Choose an existing S3 location in us-east-1”. This will bring up a window showing your S3 buckets. Choose the bucket and folder you just created, which may have a URI like s3://comp5349-unikey/week9.
• click Create notebook button to create it.
• This will bring you to the notebook detail page. You will see the notebook status change from “Starting” to “Pending” to “Ready”. When it is ready, the “open” button is enabled and you can click it to open the notebook (see Figure 5). The notebook will open in a new tab.
Now open the notebook template from the github repo, and copy and paste its content into your newly created notebook. Remember to save the notebook for later use. You can use an EMR notebook in the same way as a Jupyter notebook. EMR notebook is configured to access many kernels. Check that the associated kernel is PySpark; if it is not, change the kernel to PySpark.
Figure 5: Notebook Ready
Question 3: SparkSQL Dataframe Basic Operations
The sample code in the notebook has been covered partially in the lecture to illustrate basic DataFrame operations and the generated execution plan with optimization.
The complete example can be obtained from the course repo. It is an EMR notebook with the input file locations hard coded for the EMR environment.
The CSV files do not contain a header row. A schema is specified while loading each csv into a DataFrame. Each schema is represented by a StructType matching the structure of the csv file (Specifying Schema). Simple filtering, joining, grouping and window operations are used to obtain various summary statistics.
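As an illustration, the following is a minimal sketch of how the ratings file might be loaded with an explicit schema. The column names follow the ones used later in the notebook (uid, mid, rate, ts); the S3 path is a placeholder, not the actual location used in the template.

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, LongType

# Schema matching the structure of the ratings csv: user id, movie id, rating, timestamp
rating_schema = StructType([
    StructField("uid", IntegerType(), True),
    StructField("mid", IntegerType(), True),
    StructField("rate", FloatType(), True),
    StructField("ts", LongType(), True)
])

# Placeholder path; the notebook template uses the actual public S3 location
rating_data = "s3://some-public-bucket/ratings.csv"
ratings = spark.read.csv(rating_data, header=False, schema=rating_schema)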
The example also illustrates different I/O options. The input files are read from S3; each file is referenced with a full path including the protocol prefix (s3://). The last statement merges all partial results into a single file and writes it to the folder wk9-results. The default storage for an EMR cluster is HDFS, so the output folder will be created under the HDFS home directory of the current user. If the code is run as an EMR notebook, the current user is “livy” and you can find the output on HDFS at /user/livy/wk9-results. If you need to run the last cell multiple times, make sure you either change the output location name or log in to the master node to remove the directory from HDFS.
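For reference, the merge-and-write step typically looks like the following sketch; the DataFrame name final_summary is a placeholder for whatever result the last cell produces:

# coalesce(1) merges all partitions into a single partition so that only one
# output file is written to the wk9-results folder on HDFS
final_summary.coalesce(1).write.csv("wk9-results", header=True)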
Question 4: Execution Observation
a) Effect of Caching
In the sample, the original ratings DataFrame is used in a few jobs. Each job would build its own computation graph starting from reading the input csv to create the ratings DataFrame. This is not I/O efficient. We can use caching to store the created DataFrame in memory for later use. Update the sample code by adding .cache() to the csv reading statement, i.e. update the following statement
ratings = spark.read.csv(rating_data, header=False, schema=rating_schema)

to

ratings = spark.read.csv(rating_data, header=False, schema=rating_schema).cache()
Figure 6: Spark History Home page
Re-run the code to observe the execution differences in Spark’s history server. For notebook applications, you may find the history under “incomplete applications”. See Figure 6.
You may notice that the execution plan for this particular query now starts from InMemoryTableScan (see Figure 7), in contrast to reading the csv file (as seen in the lecture slides). The overall size of input/output will be different as well.
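You can also inspect the plan directly from the notebook; a small sketch, assuming the cached ratings DataFrame from above:

# Trigger any action so that the cache is actually materialised
ratings.count()
# explain() prints the physical plan; with caching in place the input
# appears as an InMemoryTableScan node instead of a csv file scan
ratings.filter("mid<=5").groupBy('mid').avg('rate').explain()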
b) Query and Job
The sample code seems to suggest that there is a one to one mapping between queries and jobs. This is because we put a show() or collect() at nearly every query to inspect the results. Each call to show() or collect() indicates the end of a job, and Spark will start executing the computation graph to produce the result.
A query could be ignored entirely if it does not return any result to the console and its result is not used by a following query that needs to return a result. To test this, remove the call to collect() from the following statement and re-run the code or cell.
ratings.filter("mid<=5").groupBy('mid').avg('rate').collect()
You may find that this query is not executed at all. This is an example of Spark’s lazy execution feature. Now try to comment out the statement romance_1939.show():
romance_1939=movies.filter(movies.title.contains("1939") \
& movies.genres.contains("Romance"))
# romance_1939.show()
romance_1939.drop('genres').join(ratings,'mid','inner') \
.drop('ts','uid').groupBy('title').avg('rate').collect()
In the original version, the preceding filter query is executed as one job and the following join query is executed as 2 jobs, for a total of 3 jobs. When this statement is commented out, the filter query is no longer converted to a job. The join query is still converted to two jobs as before.
c) Task Number and Shuffling Property
You may have noticed that in some stages, in particular stages involving join or groupBy type operations, there are 200 tasks. For instance, the second stage of the job executing the following statement has 200 tasks.
ratings.filter("mid<=5").groupBy('mid').avg('rate').collect()
This is controlled by a property called spark.sql.shuffle.partitions. The property specifies the number of partitions to use in join and groupBy like operations; its default value is 200. The property only applies to the Spark SQL API. In the RDD based API, we can set the partition number in each operation call. The default value can be overridden in code using a statement like:
spark.conf.set("spark.sql.shuffle.partitions", 20)
This statement sets the partition number to 20. Now re-run the code and see if the task number changes to 20.
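You can also confirm the effect from within the notebook; a small sketch, assuming the ratings DataFrame from the earlier cells:

# Lower the shuffle partition number, then check the partition count of a
# DataFrame produced by a groupBy (i.e. after a shuffle)
spark.conf.set("spark.sql.shuffle.partitions", 20)
grouped = ratings.filter("mid<=5").groupBy('mid').avg('rate')
print(grouped.rdd.getNumPartitions())  # expected to print 20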
Question 5: Multi Node Cluster
Now stop the single node cluster and start a 3 or 4 node cluster with the same settings. The notebook you have created can be attached to this cluster using the following steps:
• From the notebook page, find the notebook you just created in the list.
• Click the notebook to enter its detail page.
• click change cluster button to enter change cluster page
• choose an existing cluster
• click Change cluster and start notebook button.
This may take a little while; you can see the notebook’s status change from “Starting” to “Pending” to “Ready”. When it is ready, the “open” button is enabled and you can click it to open the notebook. The notebook will open in a new tab. Run the notebook again and check the executors tab in the Spark History server. You will see that the driver is running on the master node while there are many other executors running on the worker nodes. This is the client deployment mode. See Figure 8 as an example. You may also notice that the driver memory is much less than the executor memory.
Question 6: Write your own code
Use the DataFrame API to re-implement the average rating per genre application in week 5 lecture and lab.
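If you want a hint, the following is one possible starting point, not the required solution; it assumes the genres column is a pipe-separated string and uses the column names from the schemas earlier in the notebook:

from pyspark.sql.functions import split, explode, avg

# Explode the pipe-separated genres column so each (movie, genre) pair gets
# its own row, then join with ratings and average per genre
genre_avg = (movies
    .withColumn('genre', explode(split('genres', '\\|')))
    .join(ratings, 'mid', 'inner')
    .groupBy('genre')
    .agg(avg('rate').alias('avg_rating')))
genre_avg.show()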
Figure 7: Cache Effect
Figure 8: Client Deployment Mode Driver and Executor