COMP5349 – Cloud Computing
Week 7: Spark Distributed Execution
Dr. Ying Zhou, School of Computer Science
Last Week
- Last week we covered distributed execution of MapReduce jobs
  - GFS and its open source counterpart HDFS
  - YARN resource management system
- All systems we have used so far have a master/slave architecture
  - One node (process) takes the coordinator role with global knowledge
  - This is different from the master/slave replication structure you may have encountered in databases
- Potential issues with a single-master design
  - Single point of failure
  - Master as bottleneck
- Common ways to deal with such issues
  - The master maintains only a small amount of data, and the important data it maintains is replicated on other external nodes
  - Minimize the master's involvement in the cluster's usual business
Last Week
- YARN basic components
  - Resource Manager, Node Manager, Application Master
- MapReduce execution on YARN
  - Each mapper/reducer occupies a container with configurable resources allocated
  - The Application Master works out the number of containers required and requests them through the YARN Resource Manager
  - The YARN RM grants the AM which nodes to use and what amount of resources
  - The AM launches containers through the Node Managers and decides which mapper/reducer to run in which container
    - Each container runs a JVM executing the mapper/reducer logic
    - Data locality preference is applied at this level
Outline
- Spark Execution View
  - Application, Job, Stage, Task
- Impact of Shuffling
- Cluster Resource Specification
How Spark Executes an Application
Together, the driver and the executors represent a Spark application.
“The driver is the process that is in charge of the high-level control flow of work that needs to be done. The executor processes are responsible for executing this work, in the form of tasks, as well as for storing any data that the user chooses to cache. Both the driver and the executors typically stick around for the entire time the application is running”
Based on http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/ and Section 5 of the original Spark paper, published at NSDI’12 by Zaharia, Matei, et al.
Spark Driver and Executor
[Figure: a driver program coordinating multiple tasks; each task runs in an executor, possibly on a different machine]
The driver collects back the results and saves them to a file.
Spark Lazy Evaluation Principle
- All transformations in Spark are lazy: they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program.
  - When you call count() on an RDD, all transformations leading to the construction of this RDD are executed
  - This includes the very first file-reading operation
- Benefit: better optimization by analysing the whole sequence of transformations
  - Transformations are usually grouped together as stages (later slides) to avoid unnecessary data shuffling
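A minimal PySpark sketch of lazy evaluation, assuming a working local or cluster setup; the input path and parsing are illustrative, not from the unit's code.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

lines = sc.textFile("movies/ratings.csv")            # nothing is read yet
pairs = lines.map(lambda l: (l.split(",")[1], 1))    # still not executed
non_empty = pairs.filter(lambda kv: kv[0] != "")     # only lineage bookkeeping so far

# Only the action triggers execution: the file read, map and filter all run now, as one job.
print(non_empty.count())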
Spark Lazy Evaluation Consequence
- Because each action has its own data flow graph (lineage graph), unnecessary re-computation is possible
  - What if I call count() on every intermediate RDD?
    - This will indeed cause a lot of re-computation!
    - Using count(), take(), first(), … for debugging purposes is appropriate
- If the computation branches out, e.g. a common RDD is used in two or more different sequences of transformations, each ending with an action:
  - Call cache() or persist() on the common RDD to keep it in memory or at another storage level
  - cache() is a special case of persist() that keeps the RDD in memory
  - persist() accepts an argument for different storage levels, but the default is to keep the RDD in memory
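A minimal PySpark sketch of the branching case, assuming a working Spark setup; the data and thresholds are illustrative.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

common = sc.parallelize(range(1_000_000)).map(lambda x: x * x)

# Keep the computed partitions in memory; cache() uses the default in-memory
# storage level, while persist() would let us choose another level.
common.cache()

large = common.filter(lambda x: x > 100).count()   # job 1: computes and caches `common`
small = common.filter(lambda x: x <= 100).count()  # job 2: reuses the cached partitions

print(large, small)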
Spark Application in Execution
Job, Stage and Task
- A job is triggered by actions such as count, save, etc.
  - Our application has one job, because there is only one action: saveAsTextFile
  - Adding a count() in the middle of the sequence would create another job
  - Jobs can have overlapping transformation sequences
    - If an RDD is persisted by an early job, all transformations leading to it are skipped in later jobs

[Figure: lineage graph of the (genre, avg-rating) RDD]
  ratings RDD --map--> (mid, rating) paired RDD
  movies RDD --flatMap--> (mid, genre) paired RDD
  --join--> (mid, (genre, rating)) paired RDD
  --values()--> (genre, rating) paired RDD
  --aggregateByKey--> (genre, (rateSum, rateCount)) paired RDD
  --map--> (genre, avg-rating) paired RDD
  --saveAsTextFile action--> result file
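As a concrete illustration, here is a minimal PySpark sketch of a pipeline with the same lineage shape. The file paths, CSV layout and parsing are assumptions for illustration only; this is not the actual AverageRatingPerGenre.py source.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Assumed layout: ratings.csv lines are "userId,movieId,rating,timestamp"
ratings = sc.textFile("movies/ratings.csv")
mid_rating = ratings.map(
    lambda line: (line.split(",")[1], float(line.split(",")[2])))

# Assumed layout: movies.csv lines are "movieId,title,genre1|genre2|..."
movies = sc.textFile("movies/movies.csv")
mid_genre = movies.flatMap(
    lambda line: [(line.split(",")[0], g)
                  for g in line.split(",")[2].split("|")])

# Wide dependency: join on movie id -> (mid, (genre, rating))
joined = mid_genre.join(mid_rating)

# Narrow dependency: drop the key -> (genre, rating)
genre_rating = joined.values()

# Wide dependency: sum and count the ratings per genre,
# asking for a single partition in the result RDD
genre_sum_count = genre_rating.aggregateByKey(
    (0.0, 0),
    lambda acc, r: (acc[0] + r, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
    numPartitions=1)

# Narrow dependency: compute the average rating per genre
genre_avg = genre_sum_count.map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))

# The only action: triggers the single job
genre_avg.saveAsTextFile("genre-avg-python/")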
Narrow and Wide Dependencies
- A transformation causes either a narrow or a wide dependency between the parent and child RDDs
- Narrow dependency means one partition of the child RDD can be computed from only one, or a limited number of, partitions of the parent RDD
  - All map-like transformations (map, flatMap, filter, etc.) have narrow dependencies
- Wide dependency means one partition of the child RDD may need data from all partitions of the parent RDD
  - All reduce-like transformations (reduceByKey, groupByKey, join, etc.) have wide dependencies in general
  - In certain circumstances they may have narrow dependencies (see later slides)
Job, Stage and Task
- A job can have many stages, forming a DAG based on the RDDs' lineage
- A stage consists of a series of narrow-dependency transformations

[Figure: the same lineage graph divided into stages]
  ratings --map--> (mid, rating)
  movies --flatMap--> (mid, genre)
  --join--> (mid, (genre, rating)) --values--> (genre, rating)
  --aggregateByKey--> (genre, (rateSum, rateCount)) --map--> (genre, avg-rating) --saveAsTextFile--> result file
Job, Stage and Task
- Transformations inside a stage can execute in a pipelined style to improve efficiency
  - There is no global data shuffle inside a stage
- There will be data shuffling across stages
  - Similar to the shuffle in the MapReduce framework
- The pipelined transformations applied to one partition inside a stage form a task
  - A task is the schedulable execution unit of an application
    - Analogous to the map and reduce tasks in MapReduce
  - The number of tasks in each stage depends on the number of partitions the parent RDD has
  - Wide-dependency transformations can take a number-of-partitions parameter
Job, Stage and Task
[Figure: tasks per stage, for an input where the movies data has one block and the ratings data has five blocks]
  - 5 map tasks over ratings p0–p4, producing (mid, rating) p0–p4
  - 1 flatMap task over movies, producing (mid, genre)
  - 5 (join + values) tasks, producing (genre, rating) p0–p4; join does not specify a partition number, so it keeps the 5 partitions
  - 1 (aggregateByKey + map) task; aggregateByKey specifies one partition in the result RDD, producing (genre, (rateSum, rateCount)) and then (genre, avgRating)
Comparison between MR and Spark
- A MapReduce job consists of M mappers (map tasks) and/or R reducers (reduce tasks)
- A Spark job is not the equivalent of a MapReduce job
  - It can be smaller or larger than a MapReduce job
  - The previous Spark application would need two MapReduce jobs
- A Spark stage is comparable with MapReduce's map/reduce phase, in particular with respect to shuffling
- A Spark task is comparable with a MapReduce task
- Spark executors stick around throughout the application's execution time
  - Each typically runs multiple tasks of different types
    - E.g. a join task might run in the same executor as a map task
  - This allows much better data sharing within an executor
Outline
- Spark Execution View
- Impact of Shuffling
- Cluster Resource Specification
Shuffle and its impact
- Shuffles are fairly expensive: all shuffle data must be written to disk and then transferred over the network
- Design and choose your transformations carefully to avoid shuffling too much data
- Transformations causing a shuffle also stress memory if not designed properly
  - Any join or *ByKey operation involves holding objects in hashmaps or in-memory buffers to group or sort
  - It is preferable to have more, smaller tasks to reduce memory stress on individual nodes (see the sketch below)
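A hedged sketch of the last point: giving a wide transformation an explicit, larger partition count yields more, smaller tasks, so each task holds a smaller hashmap/buffer during the shuffle. The pair RDD and the partition count of 200 are illustrative.

from operator import add
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Illustrative pair RDD; in a real job this would come from earlier transformations.
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)] * 1000)

# Default parallelism: fewer, larger reduce tasks.
sums_default = pairs.reduceByKey(add)

# Explicit partition count: more, smaller tasks in the shuffle stage.
sums_spread = pairs.reduceByKey(add, numPartitions=200)

print(sums_default.getNumPartitions(), sums_spread.getNumPartitions())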
When shuffles happen

[Figure: shuffle boundaries in the example job]
  - ratings p0–p4 --map--> (mid, rating) p0–p4: no shuffle, a pipeline
  - movies --flatMap--> (mid, genre): no shuffle, a pipeline
  - shuffle write after the map/flatMap tasks, shuffle read before the join: (mid, (genre, rating)) p0–p4
  - join --values()--> (genre, rating) p0–p4: no shuffle here, because each child partition can be computed from one parent partition
  - shuffle write after values(), shuffle read before aggregateByKey
  - aggregateByKey --> (genre, (rateSum, rateCount)) --map--> (genre, avgRating): no shuffle, a pipeline
When shuffles do not happen
- There are cases where a join or *ByKey operation does not involve a shuffle and does not introduce a stage boundary
  - This happens when the RDDs being joined are partitioned by the same partitioner and have the same number of partitions

[Figure: join with input co-partitioned]
  rdd1 = someRdd.reduceByKey(…)        # shuffle: result has 3 partitions
  rdd2 = someOtherRdd.reduceByKey(…)   # shuffle: result has 3 partitions
  rdd3 = rdd1.join(rdd2)               # 3 partitions, no further shuffle
  - The two reduceByKey operations use the same key and both end up with 3 partitions
  - rdd1's partition 0 and rdd2's partition 0 contain the same set of keys and can be joined to produce rdd3's partition 0: this is a narrow dependency (see the sketch below)
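A minimal PySpark sketch of the co-partitioned case above; the input data is illustrative, and the exact partitioner bookkeeping can differ between Spark's Scala and Python APIs.

from operator import add
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

someRdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("c", 4)])
someOtherRdd = sc.parallelize([("a", 10), ("b", 20)])

# Both parents are reduced with the default hash partitioner into the
# same number of partitions, so they end up co-partitioned.
rdd1 = someRdd.reduceByKey(add, numPartitions=3)
rdd2 = someOtherRdd.reduceByKey(add, numPartitions=3)

# Partition i of rdd1 and partition i of rdd2 hold the same keys, so the
# join can be computed with a narrow dependency, as described above.
rdd3 = rdd1.join(rdd2)

print(rdd3.getNumPartitions())   # 3
print(sorted(rdd3.collect()))    # [('a', (4, 10)), ('b', (2, 20))]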
When shuffles do not happen (cont’d)
- If the parent RDDs use the same partitioner but result in different numbers of partitions, only one parent RDD needs to be re-shuffled for the join operation

[Figure: join where only one parent is re-shuffled]
  rdd1 = someRdd.reduceByKey(…)        # shuffle: this reduceByKey produces two partitions
  rdd2 = someOtherRdd.reduceByKey(…)   # shuffle: this reduceByKey produces three partitions
  rdd3 = rdd1.join(rdd2)               # only rdd1 is re-shuffled; rdd3 has three partitions
  - The join operation, by default, adopts the larger partition number from its parents
Outline
- Spark Execution View
- Impact of Shuffling
- Cluster Resource Specification
  - Specifying available resources for YARN
  - Specifying default resource requirements for Spark applications
  - Specifying individual application resource requirements
Cluster Deployment Modes
- Spark can run under YARN, Mesos, or its standalone cluster manager
- Depending on where the driver runs, a Spark application can be submitted in either cluster or client mode
  - With YARN in cluster mode, the driver runs in a container together with the AM, and each executor runs in its own container
  - With YARN in client mode, the driver runs in a separate process on the client machine, the AM runs in a container, and each executor runs in its own container
How to Submit a Spark Application
- Spark applications are usually submitted using the spark-submit script
  - A few important parameters are specified on the command line
- For debugging on a local installation, you can set all important parameters in a configuration object and pass it to the SparkContext; the program then runs as a regular Python application (see the sketch below)

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 3 \
  --py-files ml_utils.py \
  AverageRatingPerGenre.py \
  --input movies/ \
  --output genre-avg-python/
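For the local-debugging route mentioned above, here is a minimal sketch of building the configuration object in code rather than on spark-submit; the master URL, application name and memory value are illustrative.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local[2]")                      # run locally with 2 worker threads
        .setAppName("AverageRatingPerGenre-local")  # illustrative name
        .set("spark.executor.memory", "1g"))

sc = SparkContext(conf=conf)
# ... build the RDD pipeline here and call an action ...

The script is then started as an ordinary Python program, e.g. python AverageRatingPerGenre.py, without going through spark-submit.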
System Wide Resource Specification
- Main resource types
  - Memory size
  - Number of cores
    - Controls the level of parallelism: one thread per core
- YARN usually knows the system-wide available resources
  - Total memory per host: yarn.nodemanager.resource.memory-mb
  - Total cores per host: yarn.nodemanager.resource.cpu-vcores
- These values are set in a configuration file
  - In EMR: /etc/hadoop/conf/yarn-site.xml
Sample System Resources
3-node Hadoop cluster of m4.xlarge instances, each with 8 vCPU and 32 GB memory
Spark Application Resource Specification
- Driver resource specification
  - spark.driver.memory: amount of memory to use for the driver process (default: 1G)
  - spark.driver.cores: number of cores to use for the driver process, cluster mode only (default: 1)
- Executor resource specification
  - spark.executor.memory: amount of memory to use per executor process (default: 1G)
  - spark.executor.cores: number of cores to use on each executor (default: 1)
  - spark.default.parallelism: default number of partitions in RDDs returned by transformations like join and reduceByKey (default: 20)
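A small sketch, assuming an already-created SparkContext, of checking which of these values are actually in effect for an application; the fallback defaults mirror the list above.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
conf = sc.getConf()

# Fall back to the documented defaults when a property is not set
# in spark-defaults.conf or on spark-submit.
print(conf.get("spark.executor.memory", "1g"))
print(conf.get("spark.executor.cores", "1"))
print(conf.get("spark.driver.memory", "1g"))
print(conf.get("spark.default.parallelism", "not explicitly set"))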
Spark Application Resource Specification
- Many properties have default values set in a configuration file
  - In EMR: /etc/spark/conf/spark-defaults.conf
    spark.executor.memory            9486M (~9.26 GB; I made a mistake in the recording)
    spark.executor.cores             4
    spark.driver.memory              2048M
    spark.dynamicAllocation.enabled  true
- Per-application settings can be specified on the spark-submit command line:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 3 \
  --executor-cores 8 \
  --executor-memory 12G
Application History Screen Shot
Application History Screenshot: stages
- Spark starts two tasks for the movies file, even though it has only one block

[Figure: first two stages from the application history]
  ratings p0–p4 --map--> (mid, rating) p0–p4
  movies p0, p1 --flatMap--> (mid, genre) p0, p1
Stage Task Execution Details
- At any point in time, 4 tasks run in parallel in one executor, because the executor is configured with 4 cores (spark.executor.cores = 4)
- 5 tasks run over the ratings data (542 MB in 5 blocks)
- 2 tasks run over the movies data (1.7 MB in one block, read as 2 splits)
Executor View
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 3 \
  --py-files ml_utils.py \
  AverageRatingPerGenre.py \
  --input movies/ \
  --output genre-avg-python/

Settings in effect:
  spark.dynamicAllocation.enabled          true
  spark.executor.memory                    9486M
  spark.driver.memory                      2048M
  spark.yarn.executor.memoryOverheadFactor 0.1875
Another Execution Plan
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 4G \
  --py-files ml_utils.py \
  AverageRatingPerGenre.py \
  --input movies/ \
  --output output_2/
Executors Requested Then Removed
Stage 0 Task Execution in Two Executors
2 executors on the same node
Data Locality
- Data locality means how close data is to the code processing it
  - Both MapReduce and Spark work on the principle of shipping code to the data instead of shipping data to the code
- Spark data locality levels
  - PROCESS_LOCAL: data is in the same JVM as the running code. This is the best locality possible
  - NODE_LOCAL: data is on the same node, for example in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes
  - NO_PREF: data is accessed equally quickly from anywhere and has no locality preference
  - RACK_LOCAL: data is on the same rack of servers, i.e. on a different server on the same rack, so it needs to be sent over the network, typically through a single switch
  - ANY: data is elsewhere on the network and not in the same rack
Data Locality Information
References
- Spark Documentation: https://spark.apache.org/docs/latest/
- Sandy Ryza, "How-to: Tune Your Apache Spark Jobs (Part 1)", Cloudera Engineering Blog, March 9, 2015. http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/
- Sandy Ryza, "How-to: Tune Your Apache Spark Jobs (Part 2)", Cloudera Engineering Blog, March 30, 2015. http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/