Cloud Computing INFS3208


Outline
• Apache Spark and its Characteristics
• Resilient Distributed Dataset (RDD)
• RDD Operations – Transformation & Action
• Lazy Evaluation and RDD Lineage Graph
• Terms in Spark
• Narrow and Wide Dependencies
• Shuffle
• How Spark Works

What is Apache Spark?
(Timeline: 2009 – Spark created as a research project at UC Berkeley's AMPLab; 2013 – donated to the Apache Software Foundation; 2014 – became an Apache top-level project.)

Limitations of MapReduce in Hadoop
1. Slow Processing Speed
• Reads and writes data to and from disk
• Batch-oriented
2. Difficult to Use
• Hand-code every operation
• No interactive mode; hard to debug
3. No Iterative Processing
• No support for cyclic data flow

Characteristics of Spark – Speed
Apache Spark achieves high processing speed by:
• processing data mainly in the memory of worker nodes;
• avoiding unnecessary I/O operations on disks.
Performance:
• Sort Benchmark (2014), sorting 100 TB of data:
– Spark: 206 nodes in 23 minutes
– MapReduce: 2,000 nodes in 72 minutes
– about 3× the speed using 1/10 the resources
• Machine learning:
– up to 100× faster than Hadoop for logistic regression
https://spark.apache.org

Characteristics of Spark – Ease of Use
Spark provides many operators that make it easy to build parallel apps (see the sketch after this list):
• map / reduce
• filter / groupByKey / join
• and more.
Highly accessible through its supported languages and interactive mode:
• Scala (interactive, fast)
• Python (interactive, slow)
• Java (non-interactive, fast)
• R and SQL shells (interactive, slow)
https://spark.apache.org
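A small sketch of composing these operators in the Scala shell, where sc (the SparkContext) is predefined; the numbers are arbitrary:
val result = sc.parallelize(1 to 100)   // distribute a local range
  .filter(x => x % 2 == 0)              // keep even numbers
  .map(x => (x % 10, x))                // key each number by its last digit
  .groupByKey()                         // group values per key
result.take(3)                          // action: returns 3 (key, values) pairs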

Characteristics of Spark – Iterative Processing
• Suitable for machine learning algorithms.
• Uses a Directed Acyclic Graph (DAG) of operations.
• Allows programs to load data once and query it repeatedly (see the sketch below).
https://spark.apache.org
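A minimal sketch of the iterative pattern (shell code; the data and number of iterations are arbitrary):
val data = sc.parallelize(1 to 1000000).cache()  // keep the RDD in memory across iterations
for (i <- 1 to 10) {
  // each pass reads the cached data from memory rather than re-reading from disk
  val sum = data.map(x => (x + i).toLong).reduce(_ + _)
  println(s"iteration $i: sum = $sum")
}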


Resilient Distributed Dataset (RDD)
• RDD is the fundamental data structure of Spark.
• An RDD is a read-only (i.e., immutable) distributed collection of objects/elements.
• Distributed: each dataset in an RDD is divided into logical partitions, which are computed by many worker nodes (computers) in the cluster.
• Resilient: an RDD can recover itself in case of failure (a partition can be rebuilt if it is destroyed).
• Datasets: JSON files, CSV files, text files, etc.
(Figure: input data is split into partitions 1–8 across two worker nodes; Spark creates the RDD, applies transformations, and runs actions.)

Resilient Distributed Dataset (RDD)
In Spark, all work is expressed as:
1. creating new RDDs,
2. transforming existing RDDs, or
3. running actions on RDDs to compute a result.
• Data manipulation in Spark is heavily based on RDDs. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.
• Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.
(Figure: the same partition diagram as on the previous slide.)

RDD Creation: the textFile() method
Two ways to create RDDs:
– load an external dataset, or
– parallelize a collection of objects (e.g., a list or set) in the driver program.
• External datasets: Spark uses the textFile(URI) method to load data from a filesystem into a newly created RDD.
The URI (Uniform Resource Identifier) can point to any storage source:
 your local file system (a local path on the machine),
 HDFS (hdfs://),
 Cassandra, HBase, Amazon S3, etc.
Question: can the RDD be declared var instead of val?
Once created, distFile can be acted on by dataset operations (transformations and actions); see the sketch below.
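A minimal sketch in the Scala shell, where sc is predefined and the path is hypothetical:
val distFile = sc.textFile("hdfs://namenode:9000/data/log.txt")
distFile.count()  // action: triggers the actual read and returns the line count
(On the question above: the reference may be a var, but the RDD it points to remains immutable either way.)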

RDD Creation: the textFile() method
• The external file must also be accessible at the same path on the worker nodes:
– either copy the file to all workers, or
– use a network-mounted shared file system.
• Supports directories, compressed files, and wildcards:
▪ textFile("/my/directory")
▪ textFile("/my/directory/*.txt")
▪ textFile("/my/directory/*.gz")
• An optional second argument controls the number of partitions of the file.
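For example, a sketch combining a wildcard with an explicit partition count (the path is hypothetical):
val logs = sc.textFile("/my/directory/*.gz", 10)  // second argument: a minPartitions hint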

RDD Creation: the parallelize() method
• Another simple way to create an RDD is to take an existing collection in your program and pass it to SparkContext's parallelize() method.
• This approach is very useful when you are learning Spark, since you can quickly create your own RDDs in the shell and perform operations on them.
(Figure: Array(1,2,3,4,5) is turned into a 5-partition RDD by sc.parallelize(array, 5).)
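The figure as runnable shell code (a sketch):
val array = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(array, 5)  // second argument: number of partitions
rdd.collect()                       // Array(1, 2, 3, 4, 5)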

RDD Partition
Partition policy:
• By default, the number of partitions equals the number of CPU cores in the cluster.
• For the different deploy modes of Spark, spark.default.parallelism defaults to:
– Local: the number of CPU cores, or N for local[N]
– Mesos: 8 partitions
– Standalone or Apache YARN: max(2, total number of CPU cores in the cluster)
Cluster modes for Spark: Local, Standalone, YARN, Mesos, Kubernetes.
Creating partitions:
• Set the number of partitions when creating an RDD:
– textFile(path, partitionNum) or parallelize(collection, partitionNum)

RDD Partition
Set the number of partitions with the repartition() method:
• repartition() actually creates a new RDD with the specified number of partitions (see the sketch below).
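A sketch of repartitioning in the shell:
val rdd = sc.parallelize(1 to 100, 4)
rdd.getNumPartitions             // 4
val rdd10 = rdd.repartition(10)  // returns a NEW RDD with 10 partitions
rdd10.getNumPartitions           // 10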


RDD Operations – Transformation
There are two types of RDD operations: transformations and actions.
Transformations:
• Transformations are operations on RDDs that return a new RDD.
• They introduce dependencies between RDDs, which form the lineage graph.
• Many transformations are element-wise (working on one element at a time).
• Transformed RDDs are computed lazily (only when you use them in an action).
(Figure: RDD1 is transformed into RDD2 by an Add(10) transformation that adds 10 points to each element.)

Common Transformations
Common transformations (15+) supported by Spark:
• map(func): Return a new distributed dataset formed by passing each element of the source through a function func.
• filter(func): Return a new dataset formed by selecting those elements of the source on which func returns true.
• flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
• union(otherDataset): Return a new dataset that contains the union of the elements in the source dataset and the argument.
• sortByKey([ascending], [numPartitions]): When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
• join(otherDataset, [numPartitions]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

Example: a huge log file (100 TB), log.txt, contains lines of normal, warning, and error messages. We want to display the warning and error messages.
• Use a filter() transformation from inputRDD to errorsRDD (keep only the error lines).
• Use a filter() transformation from inputRDD to warningsRDD (keep only the warning lines).
• Use a union() transformation to combine them for the final result.
This chain of operations forms a directed acyclic graph (DAG); see the sketch below.
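A minimal sketch of this pipeline (the file path and message markers are assumptions):
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("ERROR"))
val warningsRDD = inputRDD.filter(line => line.contains("WARNING"))
val badLinesRDD = errorsRDD.union(warningsRDD)  // transformations only; nothing has run yet
badLinesRDD.take(10).foreach(println)           // the action triggers execution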

RDD Operations – Transformation
Element-wise transformations:
• map(func): returns a new RDD formed by passing each element of the source through a function func.
• filter(func): returns a new RDD formed by selecting those elements of the source on which func returns true.
(Figure: Array(1,2,3,4,5) becomes rdd1 via sc.parallelize(array); rdd1.map(x => x + 10) yields rdd2 = {11, 12, 13, 14, 15}; rdd2.filter(x => x > 12) yields rdd3 = {13, 14, 15}.)
Note: (x) => (x + 10) is equivalent to func(x) { return x + 10 }.
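The same example as runnable shell code (a sketch):
val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5))
val rdd2 = rdd1.map(x => x + 10)    // 11, 12, 13, 14, 15
val rdd3 = rdd2.filter(x => x > 12) // 13, 14, 15
rdd3.collect()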

RDD Operations – Transformation
• Sometimes we want to produce multiple output elements for each input element. The operation that does this is called flatMap().
– flatMap(func): similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
– A simple use of flatMap() is splitting an input string into words.
(Figure: data.txt is loaded with sc.textFile() into RDD lines = {"MapReduce is good", "Spark is fast", "Spark is better than MapReduce"}; lines.flatMap(line => line.split(" ")) yields RDD words = {"MapReduce", "is", "good", "Spark", "is", "fast", "Spark", "is", "better", "than", "MapReduce"}.)
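As shell code (a sketch; data.txt is assumed to contain the three lines above):
val lines = sc.textFile("data.txt")
val words = lines.flatMap(line => line.split(" "))
words.collect()  // one flat RDD of words, not an RDD of arrays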

RDD Operations – Transformation
Pseudo-set transformations:
• distinct(): produces a new RDD with only distinct items, but is expensive because it requires a shuffle.
• union(): gives back an RDD consisting of the data from both sources. Unlike the mathematical union, if there are duplicates in the input RDDs, the result of Spark's union() will contain duplicates (which we can fix if desired with distinct()).
• intersection(): returns only elements present in both RDDs, and also removes all duplicates (including duplicates from a single RDD). Although intersection() and union() are similar concepts, the performance of intersection() is much worse, since it requires a shuffle over the network to identify the common elements.
• subtract(other): takes another RDD and returns an RDD that has only values present in the first RDD and not the second (also requires a shuffle).
Example: RDD1 = {coffee, coffee, panda, monkey, tea}, RDD2 = {coffee, monkey, kitty}
– RDD1.distinct() → {coffee, panda, monkey, tea}
– RDD1.union(RDD2) → {coffee, coffee, coffee, panda, monkey, monkey, tea, kitty}
– RDD1.intersection(RDD2) → {coffee, monkey}
– RDD1.subtract(RDD2) → {panda, tea}

RDD Operations – Transformation
• cartesian(other) transformation:
– returns all possible pairs (a, b),
– where a is in the source RDD and b is in the other RDD;
– similar to a cross join in SQL.
– The Cartesian product can be useful but is very expensive for large RDDs.
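A tiny sketch:
val letters = sc.parallelize(Seq("a", "b"))
val nums = sc.parallelize(Seq(1, 2))
letters.cartesian(nums).collect()  // all pairs: (a,1), (a,2), (b,1), (b,2)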

RDD Operations – Action
Actions:
• must return a final value;
• the results of actions are stored in the driver or in an external storage system;
• trigger job execution, which forces the evaluation of all the transformations;
• bring the laziness of RDDs into motion.
Examples:
• count() action – returns the count as a number;
• take(n) action – collects a number of elements from the RDD.
(Figure: RDD1 {65, 73, 84, 78} is transformed into RDD2 {75, 83, 94, 88} by Add(10); an action then returns results to the driver.)

RDD Operations – Action
To print some information about badLinesRDD (from the log-file example):
• count() action – returns the count as a number;
• take(n) action – collects a number of elements from the RDD.
(Figure: the same Add(10) transformation/action diagram as on the previous slide.)
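A minimal sketch, reusing badLinesRDD from the log-file example:
println("Input had " + badLinesRDD.count() + " concerning lines")
println("Here are 10 examples:")
badLinesRDD.take(10).foreach(println)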

Spark – Action
Common actions (10+) supported by Spark:
• collect(): Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
• count(): Return the number of elements in the dataset.
• reduce(func): Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
• first(): Return the first element of the dataset (similar to take(1)).
• take(n): Return an array with the first n elements of the dataset.
• foreach(func): Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.

RDD Operations – Action
• collect() action:
– returns the entire RDD's contents to the driver program;
– when an RDD is distributed over worker nodes, collect() gathers all of its data onto one node so the full result can be displayed.
• reduce(func) action:
– takes a function func that operates on two elements of the RDD's element type;
– returns a new element of the same type.
Example (see the sketch below):
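A minimal sketch:
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd.collect()               // Array(1, 2, 3, 4, 5): entire contents at the driver
rdd.reduce((x, y) => x + y) // 15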

RDD Operations – Action
• Given an RDD rdd = {1, 2, 3, 4, 5}, reduce((x, y) => x + y) combines elements pairwise:
f(1, 2) = 3; f(3, 3) = 6; f(6, 4) = 10; f(10, 5) = 15.
Result: 15.

RDD Operations – Action
• take(n):
– returns n elements from the RDD and attempts to minimize the number of accessed partitions;
– may therefore give a biased collection (it may not return the elements in the expected order);
– useful for unit tests and quick debugging.
• top(n): extracts the top n elements from an RDD (an ordering must be defined).
• takeSample(withReplacement, num, seed): lets us take a sample of the data.
• foreach(func): performs a computation on each element in the RDD.
• count(): returns the number of elements.
• countByValue(): returns a map of each unique value to its count.

RDD Operations – Action
Examples: given an rdd containing {1, 2, 3, 3}:
rdd.reduce((x, y) => x + y) -> 9
rdd.collect() -> {1, 2, 3, 3}
rdd.count() -> 4
rdd.take(2) -> {1, 2}
rdd.top(2) -> {3, 3}
rdd.countByValue() -> {(1,1), (2,1), (3,2)}
rdd.foreach(println) -> 1, 2, 3, 3

Working with Key/Value Pairs
• RDDs of key/value pairs are a common data type required for many operations in Spark, e.g. ("Spark", 2), ("is", 3), ("is", (1,1,1)), etc.
• Key/value RDDs are commonly used to perform aggregations:
– counting up reviews for each product,
– grouping together data with the same key,
– grouping together two different RDDs.
• Spark provides special operations on RDDs containing key/value pairs:
– the reduceByKey() method can aggregate data separately for each key;
– the join() method can merge two RDDs together by grouping elements with the same key.
• ._1 and ._2 stand for key and value, respectively.
– E.g. ._1 refers to "Spark" or "is", while ._2 refers to 2 or 3.

Creating Pair RDDs
Use the map() method: map(x => (x, 1))
– x is each element in the RDD words;
– => (the arrow, or "fat arrow") separates a function's parameters from its body.
 E.g. (x, y) => (x + y) is equivalent to func(x, y) { return x + y }.
 x => (x, 1) is thus equivalent to func(x) { return (x, 1) }, which returns a key/value pair.
 In this way, you can conveniently write anonymous functions.
(Figure: RDD words = {"MapReduce", "Spark", "Spark", "MapReduce", "is", "good", "is", "fast", "is", "better", "than"}; words.map(x => (x, 1)) yields RDD wordspair = {("MapReduce",1), ("Spark",1), ("Spark",1), ("MapReduce",1), ("is",1), ("is",1), ("is",1), ("good",1), ("fast",1), ("better",1), ("than",1)}.)

Transformations on Pair RDDs
• reduceByKey(func, [numTasks]) transformation:
– is quite similar to reduce(): both take a function and use it to combine values;
– runs several parallel reduce operations, one for each key in the dataset, where each operation combines values that have the same key;
– unlike reduce(), reduceByKey() is not implemented as an action that returns a value to the user program;
– returns a new RDD consisting of each key and the reduced value for that key.
• Examples:
– Given the grades of all the courses in ITEE, calculate the average GPA for each student;
– Given a text, count each word's occurrences.

RDD Operations – Transformation
• reduceByKey(func): called on a dataset of (K, V) pairs,
• returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func;
• func must be of type (V, V) => V, e.g. (a, b) => a + b.
(Figure: words.map(x => (x, 1)) produces wordspair; wordspair.reduceByKey((a, b) => a + b) produces {("MapReduce",2), ("Spark",2), ("is",3), ("good",1), ("fast",1), ("better",1), ("than",1)}.)
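Putting the pieces together, the full word count as shell code (a sketch; data.txt as in the flatMap slide):
val counts = sc.textFile("data.txt")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)
counts.collect().foreach(println)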

RDD Operations – Transformation
• reduceByKey(func):
– wordspair.reduceByKey((a, b) => a + b) on {("is", 1), ("is", 1), ("is", 1), ("is", 1), ("is", 1)}
– with f(a, b) { return a + b }, the five values (1, 1, 1, 1, 1) are combined step by step:
1 + 1 -> 2, 2 + 1 -> 3, 3 + 1 -> 4, 4 + 1 -> 5.
Result: ("is", 5).

Transformations on Pair RDDs
• reduceByKey(_ + _): elements must have the same key for their values to be aggregated with func:
– {("s123456", 78), ("s123456", 80), ("s123456", 65), ("s123456", 90),
– ("s654321", 80), ("s654321", 40), ("s654321", 50), ("s654321", 90), ("s654321", 80)}
– For the key "s123456", the values 78, 80, 65, 90 are aggregated with func.
– For the key "s654321", the values 80, 40, 50, 90, 80 are aggregated with func.
(Figure: the same pairwise f(a, b) = a + b folding diagram as before.)

Transformations on Pair RDDs
reduceByKey(func) vs reduce(func):
• Both reduce(func) and reduceByKey(func) are aggregating operations.
• reduceByKey(func) is a transformation on a pair RDD, while reduce(func) is an action on a regular RDD.
• reduce(func) can operate on non-pair RDDs.
• reduceByKey(func) must match keys first.
• The aggregation function func works very similarly in both: values are combined pairwise until a single result remains (per key for reduceByKey, per RDD for reduce).

RDD Operations – Transformation
• groupByKey(): called on a dataset of (K, V) pairs,
• returns a dataset of (K, Iterable) pairs.
• E.g. in the pair ("is", (1,1,1)), (1,1,1) is an Iterable.
(Figure: words.map(x => (x, 1)) produces wordspair; wordspair.groupByKey() produces {("MapReduce",(1,1)), ("Spark",(1,1)), ("is",(1,1,1)), ("good",1), ("fast",1), ("better",1), ("than",1)}.)

Transformations on Pair RDDs
Is groupByKey() an action or a transformation?
• reduceByKey(func) returns a pair RDD of (K, V) pairs where the values for each key are aggregated using the given reduce function func.
 E.g. {("MapReduce",2), ("Spark",2), ("is",3), ("better",1), ("than",1), ("fast",1), ("good",1)}
• groupByKey() returns a pair RDD of (K, Iterable) pairs (still key/value pairs).
 E.g. {("MapReduce",(1,1)), ("Spark",(1,1)), ("is",(1,1,1)), ("better",1), ("than",1), ("fast",1), ("good",1)}
Differences between reduceByKey(func) and groupByKey():
• groupByKey() takes no function, while reduceByKey(func) takes a reduce function.
• Their return types differ, as shown above.
• reduceByKey(func) performs grouping + aggregation; conceptually it is equivalent to grouping and then reducing each key's values, i.e. roughly groupByKey().mapValues(v => v.reduce(func)).

Transformations on Pair RDDs
.keys and .values transformations:
• Sometimes you may want to return only the keys in a key/value RDD.
– .keys returns a new RDD that contains all keys of the previous RDD. Example: rdd.keys
• Sometimes you may want to return only the values in a key/value RDD.
– .values returns a new RDD that contains all values of the previous RDD. Example: rdd.values
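A tiny sketch:
val rdd = sc.parallelize(Seq(("a", 1), ("b", 2)))
rdd.keys.collect()   // Array(a, b)
rdd.values.collect() // Array(1, 2)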

Transformations on Pair RDDs
sortByKey(bool):
• called on a dataset of (K, V) pairs where K implements Ordered,
• returns a dataset of (K, V) pairs sorted by keys in ascending (default) or descending order;
• specify false for descending order.
• Example:
– {("MapReduce",2), ("Spark",2), ("is",3), ("better",1), ("than",1), ("fast",1), ("good",1)}
– sortByKey(): {("MapReduce",2), ("Spark",2), ("better",1), ("fast",1), ("good",1), ("is",3), ("than",1)}
– sortByKey(false): {("than",1), ("is",3), ("good",1), ("fast",1), ("better",1), ("Spark",2), ("MapReduce",2)}
(Note: uppercase letters sort before lowercase ones in ASCII order.)
Question: how would you sort by value? (See the sketch below.)
https://www.cs.cmu.edu/~pattis/15-1XX/common/handouts/ascii.html
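One common answer to the sort-by-value question (a sketch): swap key and value, sort by key, then swap back; or use sortBy directly:
pairRDD.map(_.swap).sortByKey(false).map(_.swap)
pairRDD.sortBy(_._2, ascending = false)  // equivalent and more direct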

Transformations on Pair RDDs
mapValues(func):
• applies func to each value without changing the keys.
• Example:
– {("MapReduce",2), ("Spark",2), ("is",3), ("better",1), ("than",1), ("fast",1), ("good",1)}
– pairRDD.mapValues(x => x + 1).foreach(println)
– {("MapReduce",3), ("Spark",3), ("is",4), ("better",2), ("than",2), ("fast",2), ("good",2)}

Transformations on Pair RDDs
join():
• called on datasets of type (K, V) and (K, W),
• returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
• Example: given two datasets
– student(sNumber: String, prgName: String) and infs3208(sNumber: String, fMark: Double),
– join code: student.join(infs3208).
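A sketch with hypothetical records:
val student = sc.parallelize(Seq(("s123456", "MInfTech"), ("s654321", "MCompSc")))
val infs3208 = sc.parallelize(Seq(("s123456", 85.0), ("s654321", 72.5)))
student.join(infs3208).collect()
// Array((s123456,(MInfTech,85.0)), (s654321,(MCompSc,72.5)))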

Pair RDD Transformation Examples
Given one pair RDD: {(1,2), (3,4), (3,6)}
• reduceByKey(func) – Combine values with the same key. rdd.reduceByKey((x, y) => x + y) → {(1,2), (3,10)}
• groupByKey() – Group values with the same key. rdd.groupByKey() → {(1,[2]), (3,[4,6])}
• keys – Return an RDD of just the keys. rdd.keys → {1, 3, 3}
• values – Return an RDD of just the values. rdd.values → {2, 4, 6}
• mapValues(func) – Apply a function to each value of a pair RDD without changing the key. rdd.mapValues(x => x + 1) → {(1,3), (3,5), (3,7)}
• sortByKey() – Return an RDD sorted by the key. rdd.sortByKey() → {(1,2), (3,4), (3,6)}

Pair RDD Action Examples
Given one pair RDD: {(1,2), (3,4), (3,6)}
• countByKey() – Count the number of elements for each key. rdd.countByKey() → {(1, 1), (3, 2)}
• collectAsMap() – Collect the result as a map to provide easy lookup. rdd.collectAsMap() → Map{(1, 2), (3, 6)} (a map keeps only one value per key, so one of the key-3 entries is dropped)
• lookup(key) – Return all values associated with the provided key. rdd.lookup(3) → [4, 6]


Lazy Evaluation
Transformations on RDDs are evaluated (computed) in a lazy manner:
– Spark will not begin to execute until it sees an action.
• Lazy evaluation means that when a transformation on an RDD is called, the operation is not immediately performed.
• Spark internally records metadata to indicate that the operation has been requested.
• Spark can then decide on the best way to perform the series of recorded transformations.
• Spark uses lazy evaluation to reduce the number of passes over the data (and writes to disk):
– In Hadoop MapReduce, developers often have to spend a lot of time considering how to group operations together to minimize the number of MapReduce passes.
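A sketch showing laziness in the shell:
val lines = sc.textFile("log.txt")              // returns immediately; nothing is read yet
val errors = lines.filter(_.contains("ERROR"))  // still nothing executed; metadata recorded
errors.count()                                  // the action: the file is read and filtered now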

RDD Lineage Graph
• Because of the lazy nature of RDDs, the dependencies between RDDs are logged in a lineage graph (also called an RDD operator graph or RDD dependency graph).
• The lineage graph can be regarded as a logical execution plan of the RDD transformations.
• To get the lineage graph, use the method toDebugString: String (see the sketch below).
• When you run an action, this logical plan is submitted to an optimiser, which optimises it and turns it into a physical plan containing stages.
• Spark logs all transformations in a graph structure that can be optimized with graph-optimization techniques.
• The lineage graph can be used to rebuild RDDs after a failure.
(Figure: a small lineage graph over RDDs r00, r01, r10, r11, r12, r13.)
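A sketch of inspecting the lineage graph in the shell:
val counts = sc.textFile("data.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
println(counts.toDebugString)  // prints the RDD dependency (lineage) graph, including shuffle boundaries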


Terms in Spark
Jobs:
– A piece of code which reads some input from HDFS or the local filesystem, performs some computation on the data, and writes some output data.
Stages:
– Jobs are divided into stages.
– E.g. map or reduce stages (similar to Hadoop).
– Stages are divided based on computational boundaries.
Tasks:
– Each stage has some tasks, one task per partition. One task is executed on one partition of data on one executor.
(Figure: a job's input flows through Transformations 1–6, grouped into Stages 1–3, with one task per partition in each stage, and finally to the output.)

Terms in Spark
Driver (driver program):
• a separate process that executes the user application;
• creates the SparkContext to schedule job execution and negotiate with the cluster manager.
Executors:
• run tasks scheduled by the driver;
• store computation results in memory, on disk, or in off-heap memory;
• interact with storage systems.
Cluster Manager:
• Mesos (Apache)
• YARN (Hadoop)
• Spark Standalone
(Figure: the driver program with its SparkContext talks to the cluster manager on the master node; each worker node runs an executor with a cache and multiple tasks.)

Terms in Spark
SparkContext:
• The first step of an Apache Spark application is to create a SparkContext, which is the main entry point to Spark functionality.
• SparkContext has configurable parameters for applications; Spark uses some of them to allocate cluster resources to executors (e.g. memory size and cores).
• Once the SparkContext is created, it can be used to create RDDs (e.g. via the textFile method), broadcast variables, and accumulators, and to run jobs, until the SparkContext is stopped.
• Functionality:
– get the current status of the application and set its configuration;
– cancel a job or stage, perform closure cleaning for various services, and do programmatic dynamic allocation (request/kill executors);
– access persistent RDDs and unpersist RDDs;
– etc.
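A minimal sketch of creating a SparkContext in a standalone application (the app name and master URL are placeholders):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("INFS3208-Example")
  .setMaster("local[*]")  // or spark://host:7077, yarn, etc.
val sc = new SparkContext(conf)
// ... create RDDs, broadcast variables, accumulators; run jobs ...
sc.stop()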


Narrow and Wide Dependencies
There are two types of transformations:
• Narrow transformations (dependencies):
– each partition of the parent RDD is used by at most one partition of the child RDD;
– do not require a shuffle;
– failure recovery is more efficient, as only the lost parent partitions need to be recomputed;
– examples: map, flatMap, filter, sample, union, etc.
(Figure: for map/filter and union, each child partition depends on a single parent partition.)

Narrow and Wide Dependencies
• Wide transformations (dependencies):
– multiple child partitions may depend on one parent partition;
– require data from all parent partitions to be available and to be shuffled across the nodes;
– if some partition is lost from all the ancestors, a complete re-computation is needed;
– examples: groupByKey(), reduceByKey(), join(), distinct(), intersection().
(Figure: for groupByKey and reduceByKey, each child partition depends on many parent partitions.)

Recap: Directed Acyclic Graph (DAG)
• A Directed Acyclic Graph (DAG) is a set of vertices and edges:
– vertices represent the RDDs;
– edges represent the operations applied to the RDDs.
• When an action is called, the created DAG is submitted to the DAG Scheduler, which further splits the graph into the stages of the job.
• The DAGScheduler splits the DAG into stages based on the applied transformations.

How Stages Are Generated
• The DAG scheduler aims to optimize performance by dividing the transformation graph into stages.
• The DAG scheduler pipelines (narrow) transformation operations together to optimize the graph.
• E.g. many map operators can be scheduled into a single stage.
(Figure: a chain of 1:1 map/filter operations forms one stage; a union likewise stays within one stage.)

How Stages Are Generated
• When transformations (wide transformations) trigger a shuffle, the DAG Scheduler divides the transformations into different stages.
• In the following example, reduceByKey triggers a shuffle, so the DAG Scheduler divides the pipeline into two stages.
(Figure: Stage 1 contains map → filter → union and map → filter; Stage 2 contains reduceByKey.)

How Stages Are Generated
The DAG scheduler divides the operator graph into stages according to the types of dependencies:
• Narrow dependency – all the transformations can be pipelined into one stage;
• Wide dependency – the transformations are divided into different stages.
The final result of the DAG scheduler is a set of stages.
The stages are passed on to the Task Scheduler.
The task scheduler launches the tasks via the cluster manager (Spark Standalone, Hadoop YARN, or Apache Mesos).
The task scheduler doesn't know about the dependencies among stages.

How Stages Are Generated
Let's look at another, more complicated example:
• RDDs: A – G
• Partitions: 1 – 19
• Narrow transformations: map, union
• Wide transformations: groupByKey, join
(Figure: A feeds B through groupByKey (Stage 1); C maps to D, and D and E union into F (Stage 2); B and F join into G (Stage 3); partitions are numbered 1–19.)


How Spark Works
Based on the aforementioned RDD concepts, here is a summary of the RDD running process in the Spark architecture:
1. Create an RDD object;
2. The SparkContext is responsible for calculating the dependencies between RDDs and building the DAG;
3. The DAG Scheduler is responsible for decomposing the DAG into multiple stages, each stage containing multiple tasks;
4. The Task Scheduler launches tasks and distributes them across the worker nodes via the cluster manager (Standalone, Mesos, or YARN). The task scheduler does not know about the dependencies among stages.
(Figure taken from the web.)

Reading Materials
1. https://data-flair.training/blogs/rdd-lineage/
2. http://datastrophic.io/core-concepts-architecture-and-internals-of-apache-spark/
3. http://web.utk.edu/~wfeng1/spark/introduction.html
4. https://www.tutorialspoint.com/apache_spark/index.htm
5. https://techvidvan.com/tutorials/spark-tutorial/

Tutorial and Practical Sessions
Tutorial Questions:
1. What is Apache Spark, and how does it compare with Apache Hadoop?
2. What is a Resilient Distributed Dataset (RDD)?
3. Please discuss how Spark works.
4. Advanced topics in Spark.

Tutorial and Practical Sessions
Practical Activities:
A1. Build a simulated Spark cluster and a multi-language environment for Spark programming with Docker technology.
A2. Practise basic Scala programming.
A3. Practise RDD programming (Scala version) in Jupyter Notebook.
A4. Count the 100 most frequent words.
A5. [optional] Work through five RDD examples (in the lecture slides):
– I. Word count
– II. Average marks calculation
– III. Get top-5 values
– IV. File sorting
– V. Movie rating
A6. [optional] Practise advanced Scala programming.

Next (Week 11) Topic: