Spark
Apache Spark
DSCI 551
Wensheng Wu
1
Roadmap
• Spark
– History, features, RDD, and installation
• RDD operations
– Creating initial RDDs
– Actions
– Transformations
• Examples
• Shuffling in Spark
• Persistence in Spark
2
History
3
Apache took over Hadoop
Characteristics of Hadoop
• Acyclic data flow model
– Data loaded from stable storage (e.g., HDFS)
– Processed through a sequence of steps
– Results written to disk
• Batch processing
– No interactions permitted during processing
4
Problems
• Ill-suited for iterative algorithms that require
repeated reuse of data
– E.g., machine learning and data mining algorithms
such as k-means, PageRank, logistic regression
• Ill-suited for interactive exploration of data
– E.g., OLAP on big data
5
Spark
• Supports working sets (of data) through RDDs
– Enabling reuse & fault-tolerance
• 10x faster than Hadoop in iterative jobs
• Interactively explore 39GB (Wikipedia dump)
with sub-second response time
– Data were distributed over 15 EC2 instances
6
Spark
• Provides libraries to support
– embedded use of SQL
– stream data processing
– machine learning algorithms
– processing of graph data
7
Spark
• Supports diverse data sources including HDFS,
Cassandra, HBase, and Amazon S3
8
RDD: Resilient Distributed Dataset
• RDD
– Read-only, partitioned collection of records
– Operations performed on partitions in parallel
– Maintain lineage for efficient fault-tolerance
• Methods of creating an RDD
– from an existing collection (e.g., Python list/tuple)
– from an external file
9
RDD: Resilient Distributed Dataset
• Distributed
– Data are divided into a number of partitions
– & distributed across nodes of a cluster to be
processed in parallel
• Resilient
– Spark keeps track of transformations to dataset
– Enable efficient recovery on failure (no need to
replicate large amount of data across network)
10
Architecture
• SparkContext (SC) object coordinates the
execution of an application across multiple nodes
– Similar to the JobTracker in Hadoop MapReduce
11
[Architecture figure: the SparkContext sends tasks to executors, the cluster manager acquires resources, and executors send responses back]
Components
• Cluster manager
– Allocate resources across applications
– Can be Spark's own standalone cluster manager or
Apache YARN (Yet Another Resource Negotiator)
• Executors
– Run tasks & store data
12
Spark installation
• http://spark.apache.org/downloads.html
– Choose "pre-built for Hadoop 3.2 and later"
• Direct link (choose version 3.0.2):
– https://downloads.apache.org/spark/spark-
3.0.2/spark-3.0.2-bin-hadoop3.2.tgz
13
Spark installation
• tar xvf spark-3.0.2-bin-hadoop3.2.tgz
– This will create the "spark-3.0.2-bin-hadoop3.2" folder
– Containing all Spark files (scripts, programs,
libraries, examples, data)
14
Prerequisites
• Make sure Java is installed & JAVA_HOME is
set
15
Accessing Spark from Python
• Interactive shell:
– bin/pyspark
– A SparkContext object sc will be automatically
created
• bin/pyspark --master local[4]
– This starts Spark on the local host with 4 threads
– "--master" specifies the location of the Spark master
node
16
Accessing Spark from Python
• Standalone program
– Executed using spark-submit script
– E.g., bin/spark-submit wc.py
• You may find many Python Spark examples
under
– examples/src/main/python
17
wc.py
from pyspark import SparkContext
from operator import add
sc = SparkContext(appName="dsci351")
lines = sc.textFile('hello.txt')
counts = lines.flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add)
output = counts.collect()
for v in output:
    print(v[0], v[1])
18
hello.txt
hello world
hello this world
Make sure you have this file
under the same directory
where wc.py is located
19
Suppress verbose log messages
• cd conf
• cp log4j.properties.template log4j.properties
• edit log4j.properties
– change first line to:
• log4j.rootCategory=ERROR, console
– Or to:
• log4j.rootCategory=WARN, console
20
Roadmap
• Spark
– History, features, RDD, and installation
• RDD operations
– Creating initial RDDs
– Actions
– Transformations
• Examples
• Shuffling in Spark
• Persistence in Spark
21
Creating an initial RDD
• From an external file
– textFile(name, minPartitions=None)
– lines = sc.textFile("hello.txt", 2)
• From an existing Python collection (e.g., list,
tuple, and dictionary)
– data = sc.parallelize([1, 2, 3, 4, 5], 2)
– creates two partitions from the given list
22
Creating RDD from an external file
• lines = sc.textFile("hello.txt") # lines is an RDD
– Return a collection of lines
– Spark does not check if file exists right away
– Nor does it read from the file now
23
Action
• Perform a computation on an RDD
– Return a final value (not an RDD) to client
• Usually the last operation on an RDD
• E.g., reduce(func)
– aggregates all elements in the RDD using func
– returns aggregated value to client
24
Actions
• getNumPartitions()
• foreachPartition(func)
• collect()
• take(n)
• count(), sum(), max(), min(), mean()
• reduce(func)
• aggregate(zeroVal, seqOp, combOp)
• takeSample(withReplacement, num, [seed])
• countByKey()
25
getNumPartitions()
• How many partitions does an RDD have?
• E.g., lines.getNumPartitions()
=> 1
• E.g., data.getNumPartitions()
=> 2
26
foreachPartition(func)
• What are in each partition?
• def printf(iterator):
      par = list(iterator)
      print('partition:', par)
• sc.parallelize([1, 2, 3, 4, 5], 2).foreachPartition(printf)
=> partition: [3, 4, 5]
partition: [1, 2]
27
Note: func receives an iterator over the list of elements
in the partition
collect()
• Show the entire content of an RDD
• sc.parallelize([1, 2, 3, 4, 5], 2).collect()
• collect()
– Fetch the entire RDD as a Python list
– RDD may be partitioned among multiple nodes
– collect() brings all partitions to the client’s node
• Problem:
– may run out of memory when the data set is large
28
take(n)
• take(n): collect first n elements from an RDD
• l = [1,2,3,4,5]
• rdd = sc.parallelize(l, 2)
• rdd.take(3)
=>
[1,2,3]
29
count()
• Return the number of elements in the dataset
– It first counts within each partition
– Then sums the counts up at the client
• l = [1,2,3,4,5]
• rdd = sc.parallelize(l, 2)
• rdd.count()
=> 5
30
sum()
• Add up the elements in the dataset
• l = [1,2,3,4,5]
• rdd = sc.parallelize(l)
• rdd.sum()
=> 15
31
reduce(func)
• Use func to aggregate the elements in RDD
• func(a,b):
– Takes two input arguments, e.g., a and b
– Outputs a value, e.g., a + b
• func should be commutative and associative
– Applied to each partition (like a combiner)
32
reduce(func)
• func is continually applied to elements in RDD
– [1, 2, 3]
– First, compute func(1, 2) => x
– Then, compute func(x, 3)
• If RDD has only one element x, it outputs x
• Similar to reduce() in Python
33
Recall Python example
• from functools import reduce  # needed in Python 3
• def add(a, b): return a + b
• reduce(add, [1, 2, 3])
6
Or simply reduce(lambda a, b: a + b, [1, 2, 3])
34
Spark example
• def add(a, b): return a + b
• data = sc.parallelize([1, 2, 3], 2)
• data.reduce(add)
6
Or simply: data.reduce(lambda a, b: a + b)
35
Implementation of reduce(func)
• Suppose [1, 2, 3, 4, 5] => two partitions:
– [1, 2] and [3, 4, 5]
• rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
• Consider reduce(add)
36
Local reduction
• Apply add to reduce each partition locally
– Using mapPartitions(func) (see transformations)
• func: apply the 'add' function to reduce a
partition
– E.g., using Python's reduce function
– reduce(add, [1, 2]) => 3
– reduce(add, [3, 4, 5]) => 12
37
Global reduction
• Collect all local results
– using collect()
=> res = [3, 12]
• Use Python reduce to obtain final result
– reduce(add, res) => reduce(add, [3, 12]) => 15
38
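The two stages above can be reproduced directly in the pyspark shell. A minimal sketch (assuming an active SparkContext sc, as in the examples above):

from functools import reduce
from operator import add

rdd = sc.parallelize([1, 2, 3, 4, 5], 2)

def reduce_partition(iterator):
    vals = list(iterator)
    return [reduce(add, vals)] if vals else []    # skip empty partitions

local = rdd.mapPartitions(reduce_partition).collect()   # [3, 12]
print(reduce(add, local))                               # 15
print(rdd.reduce(add))                                  # 15 (same result)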
Example: finding largest integers
• data = [5, 4, 4, 1, 2, 3, 3, 1, 2, 5, 4, 5]
• pdata = sc.parallelize(data)
• pdata.reduce(lambda x, y: max(x, y))
5
• Or simply: pdata.reduce(max)
39
aggregate(zeroValue, seqOp, combOp)
• For each partition p (values in the partition),
– “reduce”(seqOp, p, zeroValue)
– Note if p is empty, it will return zeroValue
• For a list of values, vals, from all partitions,
execute:
– reduce(combOp, vals, zeroValue)
40
But note reduce here is different from that in Python:
zeroValue can have a different type than the values in p
seqOp and combOp
• seqOp(U, v):
– how to aggregate values v’s in the partition into U
– U: accumulator, initially U = zeroValue
– Note: U and v may be of different data type
• combOp(U, p):
– how to combine results from multiple partitions
– U: accumulator, initially U = zeroValue
– p: result from a partition
41
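As an illustration of seqOp and combOp with an accumulator of a different type than the RDD values, here is a small sketch (assuming sc) that computes a (sum, count) pair in one pass:

rdd = sc.parallelize([1, 2, 3, 4, 5], 2)

seqOp  = lambda U, v: (U[0] + v, U[1] + 1)                # fold a value into (sum, count)
combOp = lambda U1, U2: (U1[0] + U2[0], U1[1] + U2[1])    # merge partition results

print(rdd.aggregate((0, 0), seqOp, combOp))               # (15, 5)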
Python reduce() w/o initial value
• reduce(func, list)
• If list is empty => ERROR
• Else if list contains a single element v, return v
• Otherwise, set accumulator x = list[0]
– for each of remaining element list[i]
• x = func(x, list[i])
– Return final value of x
42
Python reduce() with initial value
• reduce(func, list, initialValue)
• Same as:
– reduce(func, [initialValue] + list)
• Note: list can be empty now
– reduce() will return initialValue when list is empty
43
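A quick check of both variants (in Python 3, reduce lives in functools):

from functools import reduce
from operator import add

print(reduce(add, [1, 2, 3]))       # 6
print(reduce(add, [1, 2, 3], 10))   # 16, same as reduce(add, [10, 1, 2, 3])
print(reduce(add, [], 10))          # 10: initial value returned for an empty list
# reduce(add, []) without an initial value raises a TypeError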
reduce(f) vs aggregate(z, f1, f2)
• func in reduce(func) needs to be commutative
and associative
– While f1 and f2 in aggregate(z, f1, f2) do not need
to be
– f1: similar to the combiner function in Hadoop
• Need to specify initial value for aggregate()
– & it can be of different type than values in RDD
44
Example
• data = sc.parallelize([1], 2)
• data.foreachPartition(printf)
– P1: []
– P2: [1]
• data.aggregate(1, add, add)
– P1 => [1] => after reduction => 1
– P2 => [1] + [1] = [1, 1] => 2
– final: [1] + [1, 2] => [1, 1, 2] => 4
45
Example
• data.aggregate(2, add, lambda U, v: U * v)
– P1 => 2
– P2 => 3
– Final: [2] + [2, 3] => 2 * 2 * 3 = 12
(where [2] is zeroValue, [2,3] is the list of values
from partitions)
46
Implementing count() using
aggregate()
• data = sc.parallelize([1, 2, 3, 4, 5])
• …
47
Implementing mean() using
aggregate()
• data = sc.parallelize([1, 2, 3, 4, 5])
• …
48
takeSample(withReplacement, num,
[seed])
• Take a random sample of elements in rdd
• withReplacement: True if with replacement
• num: sample size
• optional seed: for random number generator
• Useful in many applications, e.g., k-means
clustering
49
Example
• data = sc.parallelize(range(10))
• data.takeSample(False, 2, 1)
– [8, 0]
50
countByKey()
• Only available on RDDs of type (K, V)
– i.e., RDD that contains a list of key-value pairs,
e.g., ('hello', 3)
• Return a hashmap (dictionary in Python) of (K,
Int) pairs with count for each unique key in
RDD
– Count for key k = # of tuples whose key is k
51
Example
• d = [('hello', 1), ('world', 1), ('hello', 2), ('this',
1), ('world', 0)]
• data = sc.parallelize(d)
• data.countByKey()
=> {'this': 1, 'world': 2, 'hello': 2}
52
Roadmap
• Spark
– History, features, RDD, and installation
• RDD operations
– Creating initial RDDs
– Actions
– Transformations
• Examples
• Shuffling in Spark
• Persistence in Spark
53
Transformation
• Create a new RDD from an existing one
• E.g., map(func)
– Applies func to each element of an RDD
– Returns a new RDD representing mapped result
54
Lazy transformations
• Spark does not apply them to the RDD right away
– It just remembers what needs to be done
– Transformations are performed only when an action is applied
• Advantage
– Results of transformations pipelined to the action
– No need to return intermediate results to clients
=> more efficient
55
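A tiny sketch (assuming sc, running in local mode) that makes the laziness visible: the side effect in the mapped function only shows up once an action runs.

def tag(x):
    print('processing', x)    # executor-side side effect; visible in local mode
    return x * 2

rdd = sc.parallelize([1, 2, 3]).map(tag)   # nothing printed yet: map is lazy
print(rdd.count())                         # action triggers the map; prints 3 messages, then 3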
Avoid re-computation
• However, this means that the same RDD may
be recomputed multiple times if it is used in
multiple actions
=> All transformations need to be redone
=> Consequence: costly
• Solution: allow caching of RDDs in memory
– May also persist them on disk
56
Transformations
• map(func)
• filter(func)
• flatMap(func)
• reduceByKey(func)
• groupByKey()
• sortByKey(True/False)
• distinct()
• mapPartitions(func)
57
Full signatures:
reduceByKey(func, numPartitions=None, partitionFunc=…)
groupByKey(numPartitions=None, partitionFunc=…)
sortByKey(ascending=True, numPartitions=None, keyfunc=…)
distinct(numPartitions=None)
Transformations
• join(rdd, [numTasks])
– leftOuterJoin
– rightOuterJoin
– fullOuterJoin
• aggregateByKey(zeroValue, seqOp, combOp,
[numTasks])
• mapValues(func)
• flatMapValues(func)
• union/intersection/subtract
• subtractByKey
58
Transformations
• groupBy(f)
– f is a function that produces the group key
59
Full signature: groupBy(f, numPartitions=None, partitionFunc=…)
map(func)
• map(func): Apply a function func to each
element in input RDD
– func returns a value (could be a list)
• Output the new RDD containing the
transformed values produced by func
60
Example
• lines = sc.textFile("hello.txt")
• lineSplit = lines.map(lambda s: s.split())
=> [['hello', 'world'], ['hello', 'this', 'world']]
• lineLengths = lines.map(lambda s: len(s))
=> [11, 16]
61
filter(func)
• filter(func): return a new RDD with elements
of existing RDD for which func returns true
• func should be a boolean function
• lines1 = lines.filter(lambda line: "this" in line)
['hello this world']
• What about: lines.filter(lambda s: len(s) > 11)?
62
Notes
• data = sc.parallelize([1, 2, 3, 4, 5, 1, 3, 5], 2)
• data.map(lambda x: x if x % 2 == 0 else None).collect()
=> [None, 2, None, 4, None, None, None, None]
• def f(x):
      if x % 2 == 0:
          return x
      else:
          pass  # same as "return None"
• data.map(f).collect()
=> produces the same result as above
63
Python filter
• l = [1, 2, 3, 4, 5, 1, 3, 5]
• list(filter(lambda x: x % 2 == 0, l))
– [2, 4]
64
Spark implementation of filter
• def even(x): return x % 2 == 0
• data.filter(even)
Implemented as follows:
• def processPartition(iterator):
return filter(even, iterator)
• data.mapPartitions(processPartition)
65
mapPartitions(func)
• Apply transformation to a partition
– input to func is an iterator (over the elements in
the partition)
– func must return an iterable (a list or use yield to
return a generator)
• Different from map(func)
– func in map(func) applies to an element
66
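A brief sketch (assuming sc) contrasting the two: map's function sees one element at a time, while mapPartitions' function sees an iterator and must return an iterable.

rdd = sc.parallelize([1, 2, 3, 4, 5], 2)

print(rdd.map(lambda x: x + 1).collect())                  # [2, 3, 4, 5, 6]
print(rdd.mapPartitions(lambda it: [sum(it)]).collect())   # [3, 12], one sum per partition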
Implementing aggregate()
• rdd.aggregate((0,0), combFunc, reduFunc)
• def combFunc(U, x): return (U[0] + x, U[1] + 1)
• def reduFunc(U, V): return (U[0] + V[0], U[1] + V[1])
Equivalent implementation (Python 3: from functools import reduce):
• def sumf(iterator):
      return [reduce(combFunc, iterator, (0, 0))]
• rdd.mapPartitions(sumf).reduce(reduFunc)
67
Exercise
• Implement count() using mapPartitions() and
reduce() only
– rdd = sc.parallelize([1, 1, 2, 3, 3, 3], 2)
– rdd.count() => 6
68
flatMap(func)
• flatMap(func):
– similar to map
– But func here must return a list (or generator) of
elements
– & flatMap merges these lists into a single list
• lines.flatMap(lambda x: x.split())
=> rdd: ['hello', 'world', 'hello', 'this', 'world']
69
reduceByKey()
• reduceByKey(func)
– Input: a collection of (k, v) pairs
– Output: a collection of (k, v’) pairs
• v’: aggregated value of v’s in all (k, v) pairs
with the same key k by applying func
• func is the aggregation function
– Similar to func in the reduce(func, list) in Python
70
reduceByKey(func)
• It first performs partition-side reduction & then
global reduction
– By executing the same reduce function
• In other words, func needs to be commutative
and associative
• More details:
– http://spark.apache.org/docs/latest/api/python/pyspark.html
71
Example
• rddp = sc.parallelize([(1,2), (1,3), (2,2), (1,4),
(3,5), (2, 4), (1, 5), (2, 6)], 2)
• def printf(part):
      print(list(part))
• rddp.foreachPartition(printf)
– Partition 1: [(1, 2), (1, 3), (2, 2), (1, 4)]
– Partition 2: [(3, 5), (2, 4), (1, 5), (2, 6)]
72
Example
• from operator import add
• rddp.reduceByKey(add)
• It will first execute local reduce:
– Partition 1: [(1, 2), (1, 3), (2, 2), (1, 4)] => (1, 9),
(2,2)
– Partition 2: [(3, 5), (2, 4), (1, 5), (2, 6)] => (3, 5), (1,
5), (2, 10)
73
Example
• Final reduce at reducer side
– (1, 9), (1, 5) => (1, 14)
– (2, 2), (2, 10) => (2, 12)
– (3, 5) => (3, 5)
• Note that if there are two reducers, then:
– Some keys, e.g., 1, may be reduced by one reducer
– Others, e.g., 2 and 3, by the other
74
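Putting the local and global reductions together (assuming sc and operator.add), the per-key sums match the hand computation above:

from operator import add

rddp = sc.parallelize([(1,2), (1,3), (2,2), (1,4),
                       (3,5), (2,4), (1,5), (2,6)], 2)
print(rddp.reduceByKey(add).collect())   # [(2, 12), (1, 14), (3, 5)] (order may vary)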
reduceByKey() vs. reduce()
• reduceByKey() returns an RDD
– Reduce values per key
• reduce() returns a non-RDD value
– Reduce all values!
75
Exercise
• Implement countByKey using reduceByKey
– rddp = sc.parallelize([(1,2), (1,3), (2,2), (1,4), (3,5),
(2, 4), (1, 5), (2, 6)], 2)
– rddp.countByKey() => {1: 4, 2: 3, 3: 1}
76
aggregateByKey
• aggregateByKey(zeroValue, combOp, reduOp)
– Input RDD: a list of (k, v) pairs
– Aggregate values for each key
• Return a value U for each key
– Note that U may be a tuple
– zeroValue: initial value for U
– combOp(U, v): (function for) local reduction
– reduOp(U1, U2): global reduction
77
Computing group averages
• rdd1 = rddp.aggregateByKey((0,0), lambda U, v: (U[0]
+ v, U[1] + 1), lambda U1, U2: (U1[0] + U2[0], U1[1]
+ U2[1]))
– [(2, (12, 3)), (1, (14, 4)), (3, (5, 1))]
• rdd1.map(lambda kv: (kv[0], float(kv[1][0]) / kv[1][1]))
– [(2, 4.0), (1, 3.5), (3, 5.0)]
78
Example: aggregateByKey
• data = sc.parallelize([(1, 1), (1,2), (1,3)], 2)
• data.foreachPartition(printf)
– [(1, 1)]
– [(1, 2), (1, 3)]
• data.aggregateByKey(1, add, add).collect()
– [(1, 8)]
79
Compared with aggregate()
• data = sc.parallelize([1, 2, 3], 2)
• data.foreachPartition(printf)
– [1]
– [2, 3]
• data.aggregate(1, add, add)
– 9
80
aggregateByKey vs. aggregate
• zeroValue in aggregateByKey
– Used only in combOp (i.e., the reduction within a
partition)
• zeroValue in aggregate
– Used in both combOp and reduOp
– E.g., data.aggregate(1, add, add) => 9
81
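A small check of the difference (assuming sc and operator.add): aggregate applies zeroValue in both phases, while aggregateByKey applies it only within each partition.

from operator import add

print(sc.parallelize([1, 2, 3], 2).aggregate(1, add, add))   # 9, not 6
print(sc.parallelize([(1, 1), (1, 2), (1, 3)], 2)
        .aggregateByKey(1, add, add).collect())              # [(1, 8)]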
aggregateByKey vs. reduceByKey
• aggregateByKey is more general than reduceByKey
– Can specify different functions for combiner and
reducer
– Can specify an initial value for U, the accumulator
– The aggregated value may have a different type than
the value v of the input RDD
• E.g., in previous example:
– v is an integer, while U is a tuple (sum, count)
82
Exercise
• Implement reduceByKey(add) using
aggregateByKey()
• rddp = sc.parallelize([(1,2), (1,3), (2,2), (1,4),
(3,5), (2, 4), (1, 5), (2, 6)], 2)
– rddp.reduceByKey(add) => [(2, 12), (1, 14), (3, 5)]
83
groupByKey()
• groupByKey()
– Similar to reduceByKey(func)
– But without func & returning (k, Iterable(v))
instead
• rddp.groupByKey()
=> [(2, <iterable of values>), (1, <iterable of values>), (3, <iterable of values>)]
84
Example
• rddp.groupByKey().mapValues(list).collect()
– mapValues converts iterable value into a list
=> [(2, [2, 4, 6]), (1, [2, 3, 4, 5]), (3, [5])]
85
groupBy()
• rddp.groupBy(lambda t:
t[0]).mapValues(list).collect()
=>
[(2, [(2, 2), (2, 4), (2, 6)]), (1, [(1, 2), (1, 3), (1, 4),
(1, 5)]), (3, [(3, 5)])]
86
sortByKey(True/False)
• sortByKey([asc])
– Sort input RDD with (k, v) pairs by key
– Ascending if asc (a boolean value) is True
• rddp.sortByKey(False).collect()
=> [(3, 5), (2, 2), (2, 4), (2, 6), (1, 2), (1, 3), (1, 4),
(1, 5)]
87
distinct()
• Return an RDD with distinct elements of source RDD
• data = [5, 4, 4, 1, 2, 3, 3, 1, 2, 5, 4, 5]
• pdata = sc.parallelize(data, 2)
• pdata.distinct().collect()
=> [2, 4, 1, 3, 5]
88
Exercise
• Implement distinct() using
reduceByKey()/groupByKey()
• rdd = sc.parallelize([3, 1, 2, 3, 1, 3, 3, 2])
• rdd.distinct()
=> [1, 2, 3]
89
join(rdd)
• rdd1.join(rdd2)
– Joining tuples of two RDDs on the key
– rdd1: an RDD containing a list of (k, v)’s
– rdd2: another RDD containing a list of (k, w)’s
• Output an RDD containing (k, (v, w))’s
– That is, (k, v) joins with (k, w) => (k, (v, w))
90
Example
• ds1 = sc.parallelize([(1,2), (2,3)])
• ds2 = sc.parallelize([(2,4), (3,5)])
• ds1.join(ds2)
– [(2, (3, 4))]
91
Outer joins
• Also retain dangling tuples
• ds1.leftOuterJoin(ds2)
– [(1, (2, None)), (2, (3, 4))]
• ds1.rightOuterJoin(ds2)
– [(2, (3, 4)), (3, (None, 5))]
• ds1.fullOuterJoin(ds2)
– [(1, (2, None)), (2, (3, 4)), (3, (None, 5))]
92
mapValues
• mapValues(func)
– For each key, apply func to each value of the key
• x = sc.parallelize([("a", ["apple", "banana",
"lemon"]), ("b", ["grapes"])])
• x.mapValues(lambda l: len(l)).collect()
– [('a', 3), ('b', 1)]
93
flatMapValues(func)
• mapValues part
– For each key k, apply func to its value, return a list
[i1, i2, …]
• flatMap part
– flatten the lists into a single list but retain the key
=> [(k, i1), (k, i2), …, (k', i1'), (k', i2'), …]
94
Example
• rdd = sc.parallelize([(1, "hello world"), (2,
"hello this world")])
– For example, 1 and 2 may be document id's
• rdd2 = rdd.flatMapValues(lambda s: s.split())
– [(1, 'hello'), (1, 'world'), (2, 'hello'), (2, 'this'), (2,
'world')]
95
Exercise
• Use mapValues() and flatMap() to implement
flatMapValues() from the previous slide
96
union(rdd)
• rdd1.union(rdd2)
– Returns all elements in rdd1 and rdd2
– Does not remove duplicates (so bag union)
• rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3], 2)
• rdd2 = sc.parallelize([1, 2, 2, 5], 2)
• rdd1.union(rdd2)
– [1, 1, 2, 3, 3, 3, 1, 2, 2, 5]
97
Note: rdd1 and rdd2 each have 2 partitions; their union
has 4 partitions
intersection(rdd)
• rdd1.intersection(rdd2)
– Returns elements in both rdd1 and rdd2
– Duplicates will be removed! (so set-semantics)
• rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3])
• rdd2 = sc.parallelize([1, 2, 2, 5])
• rdd1.intersection(rdd2)
– [2, 1]
98
subtract(rdd)
• rdd1.subtract(rdd2)
– Return values in rdd1 that do not appear in rdd2
– Note: neither set nor bag semantics!
• rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3])
• rdd2 = sc.parallelize([1, 2, 2, 5])
• rdd1.subtract(rdd2)
– [3, 3, 3]
– Note: 1 not included in result (unlike bag difference)
99
subtractByKey(rdd)
• rdd1.subtractByKey(rdd2)
– Return each (key, value) pair in rdd1 that has no pair
with matching key in rdd2
• rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3]).map(lambda
x: (x, 1))
• rdd2 = sc.parallelize([1, 2, 2, 5]).map(lambda x:
(x, 1))
• rdd1.subtractByKey(rdd2)
– [(3, 1), (3, 1), (3, 1)]
100
Roadmap
• Spark
– History, features, RDD, and installation
• RDD operations
– Creating initial RDDs
– Actions
– Transformations
• Examples
• Shuffling in Spark
101
WordCount
• from operator import add
• lines = sc.textFile("hello.txt")
• counts = lines.flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add)
• counts.collect()
=> [(u'this', 1), (u'world', 2), (u'hello', 2)]
102
Word length histogram
• long: if > 4 letters
• short: otherwise
• def myFunc(x):
      if len(x) > 4:
          return ('long', 1)
      else:
          return ('short', 1)
103
Word length histogram
• sc.textFile("hello.txt") \
    .flatMap(lambda x: x.split(" ")) \
    .map(myFunc) \
    .reduceByKey(add) \
    .collect()
=> [('short', 1), ('long', 4)]
104
Adding ratings for each person
Ratings.txt
(patrick, 4)
(matei, 3)
(patrick, 1)
(aaron, 2)
(aaron, 2)
(reynold, 1)
(aaron, 5)
105
(aaron, 9)
(patrick, 5)
…
Adding ratings for each person
• sc.textFile("ratings.txt") \
    .map(lambda s: s[1:-1].split(",")) \
    .collect()
=>
[[u'patrick', u'4'], [u'matei', u'3'], [u'patrick', u'1'],
[u'aaron', u'2)'], [u'aaron', u'2'], [u'reynold', u'1'],
[u'aaron', u'5']]
106
Note: s[1:-1] strips off the enclosing ( )
Adding ratings for each person
• sc.textFile("ratings.txt") \
    .map(lambda s: s[1:-1].split(",")) \
    .map(lambda p: (p[0], int(p[1]))) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
=> [(u'patrick', 5), (u'aaron', 9), (u'reynold', 1),
(u'matei', 3)]
107
Execution steps
• Note that reduceByKey requires shuffling
108
[Execution diagram: strip off the parentheses, tokenize by ',',
turn the values into integers, then reduceByKey (shuffle)]
Roadmap
• Spark
– History, features, RDD, and installation
• RDD operations
– Creating initial RDDs
– Actions
– Transformations
• Examples
• Shuffling in Spark
• Persistence in Spark
109
Shuffling
• Data are essentially repartitioned
– E.g., reduceByKey repartitions the data by key
• A costly operation: a lot of local & network
I/O’s
110
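A small illustration (assuming sc) of the repartitioning: glom() exposes each partition's contents as a list, before and after reduceByKey.

rdd = sc.parallelize([(1, 1), (2, 1), (1, 1), (2, 1)], 2)
print(rdd.glom().collect())    # e.g., [[(1, 1), (2, 1)], [(1, 1), (2, 1)]]
print(rdd.reduceByKey(lambda a, b: a + b).glom().collect())
# after the shuffle, all pairs for a key sit in the same partition, e.g., [[(2, 2)], [(1, 2)]]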
Another example: sortByKey
• Sampling stage:
– Sample data to create a range-partitioner
– Ensure even partitioning
• “Map” stage:
– Write (sorted) data to destined partition for reduce
stage
• “Reduce” stage:
– get map output for specific partition
– Merge the sorted data
111
Data are shuffled between the Map and Reduce stages
Transformations that require shuffling
• reduceByKey(func)
• groupByKey()
• sortByKey([asc])
• distinct()
112
Transformations that require shuffling
• join(rdd):
– leftOuterJoin
– rightOuterJoin
– fullOuterJoin
• aggregateByKey(zeroValue, seqOp, combOp)
• intersection/subtract
• subtractByKey
113
Transformations that do not need
shuffling
• map(func)
• filter(func)
• flatMap(func)
• mapValues(func)
• union
• mapPartitions(func)
114
Roadmap
• Spark
– History, features, RDD, and installation
• RDD operations
– Creating initial RDDs
– Actions
– Transformations
• Examples
• Shuffling in Spark
• Persistence in Spark
115
RDD persistence
• rdd.persist(storageLevel)
• Store the content of RDD for later reuse
– storageLevel specifies where content is stored
– E.g., in memory (default) or on disk
• rdd.persist() or rdd.cache()
– Content stored in main memory
116
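A minimal sketch (assuming sc) of persisting with an explicit storage level; MEMORY_AND_DISK spills partitions that do not fit in memory to disk.

from pyspark import StorageLevel

rdd = sc.parallelize(range(1000)).map(lambda x: x * x)
rdd.persist(StorageLevel.MEMORY_AND_DISK)
print(rdd.count())   # first action materializes and persists the RDD
print(rdd.sum())     # reuses the persisted partitions
rdd.unpersist()      # release the storage when no longer needed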
RDD persistence
• Persisted at the nodes holding partitions of the RDD
• Avoids re-computation of the RDD when it is reused
117
Example
• ratings = sc.textFile("ratings.txt") \
    .map(lambda s: s[1:-1].split(",")) \
    .map(lambda p: (p[0], int(p[1]))) \
    .cache()
• ratings.reduceByKey(lambda a, b: a +
b).collect()
– ratings RDD will be computed for the first time &
result cached
118
Example
• ratings.countByKey()
– It will use cached content of the "ratings" RDD
119
Automatic persistence
• Spark automatically persists intermediate data
in shuffling operations (e.g., reduceByKey)
• This avoids re-computation when a node fails
120
https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
K-means clustering
• Find k clusters in a data set
– k is pre-determined
• Iterative process
– Start with initial guess of centers of clusters
– Repeatedly refine the guess until stable (e.g.,
centers do not change much)
• Need to use data set at each iteration
121
K-means clustering
• Assign point p to the closest center c
– Distance = Euclidean distance between p and c
• Re-compute the centers based on assignments
• Coordinates of center of a cluster =
– Average coordinate of all points in the cluster
– E.g., (1, 1, 1) (3, 3, 3) => center: (2, 2, 2)
122
[Figures: six scatter plots of the points in the x-y plane, one per
iteration (Iterations 1-6), showing cluster assignments and centers
converging to stable positions]
K-means clustering
124
[Code figures (slides 124-125) with callouts: persist data points in memory;
initial centers; sum of distances between new and old centers; new centers;
parse input & find closest center. A runnable sketch follows slide 132.]
125
kmeans-data.txt
• A text file contains the following lines
– 0.0 0.0 0.0
– 0.1 0.1 0.1
– 0.2 0.2 0.2
– 9.0 9.0 9.0
– 9.1 9.1 9.1
– 9.2 9.2 9.2
• Each line is a 3-dimensional data point
126
Parse & cache the input dataset
• "data" RDD is now cached in main memory
127
Generating initial centers
• Recall takeSample() action
– False: sample without replacement
– K = 2
128
Assign point to its closest center
• Center 0 has points: (0, 0, 0) and (.1, .1, .1)
• Center 1 has the rest: (.2, .2, .2), (.9, .9, .9), …
129
Getting statistics for each center
• pointStats has a key-value pair for each center
• Key is center # (0 or 1 for this example)
• Value is a tuple (sum, count)
– sum = the sum of coordinates over all points in
the cluster
– Count = # of points in the cluster
130
Computing coordinates of new centers
• Coordinate = sum of point coordinates/count
– E.g., center 0: [.1, .1, .1] /2 = [.05, .05, .05]
131
Can use mapValues here too:
newPoints1 = pointStats.mapValues(lambda stv: stv[0]/stv[1]).collect()
Distance btw new & old centers
• Old center: [.1, .1, .1] and [.2, .2, .2]
• New center: [.05, .05, .05] and [6.875, 6.875,
6.875]
• Distance = (0.1 − 0.05)^2 × 3 + (6.875 − 0.2)^2 × 3 ≈ 133.67
– To be more exact, it is sqrt(133.67) = 11.56
132
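Putting the steps from slides 126-132 together, here is a sketch in the spirit of Spark's bundled examples/src/main/python/kmeans.py (assuming sc and numpy; the variable names and the convergence threshold 0.1 are illustrative):

import numpy as np

def parse_point(line):
    return np.array([float(x) for x in line.split()])

def closest_center(p, centers):
    # index of the center with the smallest squared Euclidean distance to p
    return min(range(len(centers)), key=lambda i: np.sum((p - centers[i]) ** 2))

K, converge_dist = 2, 0.1
data = sc.textFile('kmeans-data.txt').map(parse_point).cache()   # persist points in memory
centers = data.takeSample(False, K, 1)                           # initial centers

temp_dist = 1.0
while temp_dist > converge_dist:
    closest = data.map(lambda p: (closest_center(p, centers), (p, 1)))
    point_stats = closest.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    new_centers = point_stats.mapValues(lambda sc_: sc_[0] / sc_[1]).collect()
    temp_dist = sum(np.sum((centers[i] - c) ** 2) for (i, c) in new_centers)
    for (i, c) in new_centers:
        centers[i] = c

print('Final centers:', centers)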
RDD operations
• A complete list:
– http://spark.apache.org/docs/latest/api/python/pyspark.html
133
Resources
• Spark programming guide:
– https://spark.apache.org/docs/latest/
• Lambda, filter, reduce and map:
– http://www.python-course.eu/lambda.php
• Improving Sort Performance in Apache Spark: It's a Double
– http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
134
Readings
• Spark: Cluster Computing with Working Sets, 2010.
– http://people.csail.mit.edu/matei/papers/2010/hotcloud_spark.pdf
• Resilient Distributed Datasets: A Fault-Tolerant
Abstraction for In-Memory Cluster Computing, 2012.
– https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf
135
References
• Functional programming in Python
– https://docs.python.org/2/howto/functional.html
• Learning Spark by Matei Zaharia et al., O'Reilly, 2015
– https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/
136
References
• Sort-based shuffle implementation
– https://issues.apache.org/jira/browse/SPARK-2045
• Sort-Based Shuffle in Spark
– https://issues.apache.org/jira/secure/attachment/12655884/Sort-basedshuffledesign.pdf
137