
Spark

Apache Spark

DSCI 551

Wensheng Wu

1

Roadmap

• Spark
– History, features, RDD, and installation

• RDD operations
– Creating initial RDDs
– Actions
– Transformations

• Examples

• Shuffling in Spark

• Persistence in Spark

2

History

3

Apache took over Hadoop

Characteristics of Hadoop

• Acyclic data flow model

– Data loaded from stable storage (e.g., HDFS)

– Processed through a sequence of steps

– Results written to disk

• Batch processing

– No interactions permitted during processing

4

Problems

• Ill-suited for iterative algorithms that require
repeated reuse of data

– E.g., machine learning and data mining algorithms
such as k-means, PageRank, logistic regression

• Ill-suited for interactive exploration of data

– E.g., OLAP on big data

5

Spark

• Support working sets (of data) through RDD

– Enabling reuse & fault-tolerance

• 10x faster than Hadoop in iterative jobs

• Interactively explore 39GB (Wikipedia dump)
with sub-second response time

– Data were distributed over 15 EC2 instances

6

Spark

• Provides libraries to support

– embedded use of SQL

– stream data processing

– machine learning algorithms

– processing of graph data

7

Spark

• Support diverse data sources including HDFS,
Cassandra, HBase, and Amazon S3

8

RDD: Resilient Distributed Dataset

• RDD

– Read-only, partitioned collection of records

– Operations performed on partitions in parallel

– Maintain lineage for efficient fault-tolerance

• Methods of creating an RDD

– from an existing collection (e.g., Python list/tuple)

– from an external file

9

RDD: Resilient Distributed Dataset

• Distributed
– Data are divided into a number of partitions

– & distributed across nodes of a cluster to be
processed in parallel

• Resilient
– Spark keeps track of transformations to dataset

– Enable efficient recovery on failure (no need to
replicate large amount of data across network)

10

Architecture

• SparkContext (SC) object coordinates the
execution of an application across multiple nodes

– Similar to Job Tracker in Hadoop MapReduce

11

[Architecture diagram: the SC sends tasks to executors and acquires resources from the cluster manager; executors send responses back]

Components

• Cluster manager

– Allocate resources across applications

– Can run Spark’s own cluster manager or

– Apache YARN (Yet Another Resource Negotiator)

• Executors

– Run tasks & store data

12

Spark installation

• http://spark.apache.org/downloads.html

– Choose “pre-built for Hadoop 3.2 and later”

• Direct link (choose version 3.0.2):

– https://downloads.apache.org/spark/spark-
3.0.2/spark-3.0.2-bin-hadoop3.2.tgz

13


Spark installation

• tar xvf spark-3.0.2-bin-hadoop3.2.tgz

– This will create the "spark-3.0.2-bin-hadoop3.2" folder

– Containing all Spark files (scripts, programs,
libraries, examples, data)

14

Prerequisites

• Make sure Java is installed & JAVA_HOME is
set

15

Accessing Spark from Python

• Interactive shell:
– bin/pyspark

– A SparkContext object sc will be automatically
created

• bin/pyspark --master local[4]
– This starts Spark on the local host with 4 threads

– "--master" specifies the location of the Spark master
node

16

Accessing Spark from Python

• Standalone program

– Executed using spark-submit script

– E.g., bin/spark-submit wc.py

• You may find many Python Spark examples
under

– examples/src/main/python

17

wc.py

from pyspark import SparkContext
from operator import add

sc = SparkContext(appName="dsci351")

lines = sc.textFile('hello.txt')

counts = lines.flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add)

output = counts.collect()

for v in output:
    print(v[0], v[1])

18

Make sure you have this file
under the same directory

where wc.py is located

hello.txt

hello world

hello this world

19

Suppress verbose log messages

• cd conf

• cp log4j.properties.template log4j.properties

• edit log4j.properties

– change first line to:

• log4j.rootCategory=ERROR, console

– Or to:

• log4j.rootCategory=WARN, console

20

Roadmap

• Spark
– History, features, RDD, and installation

• RDD operations
– Creating initial RDDs
– Actions
– Transformations

• Examples

• Shuffling in Spark

• Persistence in Spark

21

Creating an initial RDD

• From an external file

– textFile(<file name>, [# of partitions])

– lines = sc.textFile("hello.txt", 2)

• From an existing Python collection (e.g., list,
tuple, and dictionary)

– data = sc.parallelize([1, 2, 3, 4, 5], 2)

– create two partitions from given list

22

Creating RDD from an external file

• lines = sc.textFile("hello.txt") # lines is an RDD

– Return a collection of lines

– Spark does not check if file exists right away

– Nor does it read from the file now

23
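A quick way to see this laziness (a minimal sketch, assuming the interactive shell from bin/pyspark where sc already exists; "bad.txt" is a hypothetical missing file):

    # textFile() only records the lineage; no file I/O happens yet
    bad = sc.textFile("bad.txt")     # succeeds even if bad.txt does not exist

    # the missing file is only detected when an action forces evaluation
    # bad.count()                    # raises an error such as "Input path does not exist"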

Action

• Perform a computation on an RDD

– Return a final value (not an RDD) to client

• Usually the last operation on an RDD

• E.g., reduce(func)

– aggregates all elements in the RDD using func

– returns aggregated value to client

24

Actions

• getNumPartitions()

• foreachPartition(func)

• collect()

• take(n)

• count(), sum(), max(), min(), mean()

• reduce(func)

• aggregate(zeroVal, seqOp, combOp)

• takeSample(withReplacement, num, [seed])

• countByKey()
25

getNumPartitions()

• How many partitions does an RDD have?

• E.g., lines.getNumPartitions()

=> 1

• E.g., data.getNumPartitions()

=> 2

26

foreachPartition(func)

• What are in each partition?

• def printf(iterator):
      par = list(iterator)
      print('partition:', par)

• sc.parallelize([1, 2, 3, 4, 5], 2).foreachPartition(printf)

=> partition: [3, 4, 5]

partition: [1, 2]

27

Iterator for the list of elements
in the partition

collect()

• Show the entire content of an RDD

• sc.parallelize([1, 2, 3, 4, 5], 2).collect()

• collect()
– Fetch the entire RDD as a Python list
– RDD may be partitioned among multiple nodes
– collect() brings all partitions to the client’s node

• Problem:
– may run out of memory when the data set is large

28

take(n)

• take(n): collect first n elements from an RDD

• l = [1,2,3,4,5]

• rdd = sc.parallelize(l, 2)

• rdd.take(3)

=>

[1,2,3]

29

count()

• Return the number of elements in the dataset
– It first counts in each partition

– Then sum them up in the client

• l = [1,2,3,4,5]

• rdd = sc.parallelize(l, 2)

• rdd.count()

=> 5
30

sum()

• Add up the elements in the dataset

• l = [1,2,3,4,5]

• rdd = sc.parallelize(l)

• rdd.sum()

=> 15

31

reduce(func)

• Use func to aggregate the elements in RDD

• func(a,b):

– Takes two input arguments, e.g., a and b

– Outputs a value, e.g., a + b

• func should be commutative and associative

– Applied to each partition (like a combiner)

32

reduce(func)

• func is continually applied to elements in RDD

– [1, 2, 3]

– First, compute func(1, 2) => x

– Then, compute func(x, 3)

• If RDD has only one element x, it outputs x

• Similar to reduce() in Python

33

Recall Python example

• from functools import reduce # needed in Python 3

• def add(a, b): return a + b

• reduce(add, [1, 2, 3])

=> 6

Or simply reduce(lambda a, b: a + b, [1, 2, 3])

34

Spark example

• def add(a, b): return a + b

• data = sc.parallelize([1, 2, 3], 2)

• data.reduce(add)
 6

Or simply: data.reduce(lambda a, b: a + b)

35

Implementation of reduce(func)

• Suppose [1, 2, 3, 4, 5] => two partitions:

– [1, 2] and [3, 4, 5]

• rdd = sc.parallelize([1, 2, 3, 4, 5], 2)

• Consider reduce(add)

36

Local reduction

• Apply add to reduce each partition locally

– Using mapPartitions(func) (see transformations)

• Func: apply ‘add’ function to reduce a
partition

– E.g., using Python reduce function

– reduce(add, [1, 2]) => 3

– reduce(add, [3, 4, 5]) => 12

37

Global reduction

• Collect all local results

– using collect()

=> res = [3, 12]

• Use Python reduce to obtain final result

– reduce(add, res) => reduce(add, [3, 12]) => 15

38
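The two steps above can be sketched directly in pyspark (a rough illustration of the idea, not Spark's actual internal code; assumes sc from the shell and Python 3, where reduce lives in functools):

    from functools import reduce
    from operator import add

    rdd = sc.parallelize([1, 2, 3, 4, 5], 2)

    def reduce_partition(iterator):
        vals = list(iterator)
        return [reduce(add, vals)] if vals else []   # skip empty partitions

    local_results = rdd.mapPartitions(reduce_partition).collect()   # e.g., [3, 12]
    final = reduce(add, local_results)                              # => 15
    print(final)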

Example: finding largest integers

• data = [5, 4, 4, 1, 2, 3, 3, 1, 2, 5, 4, 5]

• pdata = sc.parallelize(data)

• pdata.reduce(lambda x, y: max(x, y))

5

• Or simply: pdata.reduce(max)

39

aggregate(zeroValue, seqOp, combOp)

• For each partition p (values in the partition),

– “reduce”(seqOp, p, zeroValue)

– Note if p is empty, it will return zeroValue

• For a list of values, vals, from all partitions,
execute:

– reduce(combOp, vals, zeroValue)

40

But note reduce here is different from that in Python:
zeroValue can have different type than values in p

seqOp and combOp

• seqOp(U, v):
– how to aggregate values v’s in the partition into U

– U: accumulator, initially U = zeroValue

– Note: U and v may be of different data type

• combOp(U, p):
– how to combine results from multiple partitions

– U: accumulator, initially U = zeroValue

– p: result from a partition

41

Python reduce() w/o initial value

• reduce(func, list)

• If list is empty => ERROR

• Else if list contains a single element v, return v

• Otherwise, set accumulator x = list[0]

– for each of remaining element list[i]

• x = func(x, list[i])

– Return final value of x

42

Python reduce() with initial value

• reduce(func, list, initialValue)

• Same as:

– reduce(func, [initialValue] + list)

• Note: list can be empty now

– reduce() will return initialValue when list is empty

43

reduce(f) vs aggregate(z, f1, f2)

• func in reduce(func) needs to be commutative
and associative

– While f1 and f2 in aggregate(z, f1, f2) do not need
to be

– f1: similar to the combiner function in Hadoop

• Need to specify initial value for aggregate()

– & it can be of different type than values in RDD

44

Example

• data = sc.parallelize([1], 2)

• data.foreachPartition(printf)
– P1: []

– P2: [1]

• data.aggregate(1, add, add)
– P1 => [1] => after reduction => 1

– P2 => [1] + [1] = [1, 1] => 2

– final: [1] + [1, 2] => [1, 1, 2] => 4

45

Example

• data.aggregate(2, add, lambda U, v: U * v)

– P1 => 2

– P2 => 3

– Final: [2] + [2, 3] => 2 * 2 * 3 = 12

(where [2] is zeroValue, [2,3] is the list of values
from partitions)

46

Implementing count() using
aggregate()

• data = sc.parallelize([1, 2, 3, 4, 5])

• …

47
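One possible way to fill in the blank (a sketch, not the only solution): start the accumulator at 0, add 1 per element within each partition, and add the partial counts globally.

    data = sc.parallelize([1, 2, 3, 4, 5])

    # seqOp ignores the element value and just counts it;
    # combOp adds the per-partition counts together
    count = data.aggregate(0, lambda U, v: U + 1, lambda U1, U2: U1 + U2)
    print(count)   # => 5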

Implementing mean() using
aggregate()

• data = sc.parallelize([1, 2, 3, 4, 5])

• …

48
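Similarly, one possible sketch for mean(): carry a (sum, count) pair as the accumulator, then divide at the end.

    data = sc.parallelize([1, 2, 3, 4, 5])

    # the accumulator U is a (sum, count) tuple; zeroValue (0, 0) is the identity
    total, n = data.aggregate((0, 0),
                              lambda U, v: (U[0] + v, U[1] + 1),
                              lambda U1, U2: (U1[0] + U2[0], U1[1] + U2[1]))
    print(total / n)   # => 3.0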

takeSample(withReplacement, num,
[seed])

• Take a random sample of elements in rdd

• withReplacement: True if with replacement

• num: sample size

• optional seed: for random number generator

• Useful in many applications, e.g., k-means
clustering

49

Example

• data = sc.parallelize(range(10))

• data.takeSample(False, 2, 1)

– [8, 0]

50

countByKey()

• Only available on RDDs of type (K, V)

– i.e., RDD that contains a list of key-value pairs,
e.g., (‘hello’, 3)

• Return a hashmap (dictionary in Python) of (K,
Int) pairs with count for each unique key in
RDD

– Count for key k = # of tuples whose key is k

51

Example

• d = [('hello', 1), ('world', 1), ('hello', 2), ('this', 1), ('world', 0)]

• data = sc.parallelize(d)

• data.countByKey()

=> {'this': 1, 'world': 2, 'hello': 2}

52

Roadmap

• Spark
– History, features, RDD, and installation

• RDD operations
– Creating initial RDDs
– Actions
– Transformations

• Examples

• Shuffling in Spark

• Persistence in Spark

53

Transformation

• Create a new RDD from an existing one

• E.g., map(func)

– Applies func to each element of an RDD

– Returns a new RDD representing mapped result

54

Lazy transformations

• Spark does not apply them to the RDD right away

– It just remembers what needs to be done

– Transformations are performed only when an action is applied

• Advantage

– Results of transformations pipelined to the action

– No need to return intermediate results to clients

=> more efficient

55

Avoid re-computation

• However, this means that the same RDD may
be recomputed multiple times if it is used in
multiple actions

=> All transformations need to be redone

=> Consequence: costly

• Solution: allow caching of RDDs in memory

– May also persist them on disk

56

Transformations

• map(func)

• filter(func)

• flatMap(func)

• reduceByKey(func)

• groupByKey()

• sortByKey(True/False)

• distinct()

• mapPartitions(func)

57

(Full signatures include optional parameters, e.g., reduceByKey(func, numPartitions=None, partitionFunc=...), groupByKey(numPartitions=None, partitionFunc=...), sortByKey(ascending=True, numPartitions=None, keyfunc=...), distinct(numPartitions=None))

Transformations

• join(rdd, [numTasks])
– leftOuterJoin
– rightOuterJoin
– fullOuterJoin

• aggregateByKey(zeroValue, seqOp, combOp,
[numTasks])

• mapValues(func)
• flatMapValues(func)
• union/intersection/subtract
• subtractByKey

58

Transformations

• groupBy(f)

– f is a function that produces the group key

59

(Full signature: groupBy(f, numPartitions=None, partitionFunc=...))

map(func)

• map(func): Apply a function func to each
element in input RDD

– func returns a value (could be a list)

• Output the new RDD containing the
transformed values produced by func

60

Example

• lines = sc.textFile("hello.txt")

• lineSplit = lines.map(lambda s: s.split())

=> [['hello', 'world'], ['hello', 'this', 'world']]

• lineLengths = lines.map(lambda s: len(s))

=> [11, 16]

61

filter(func)

• filter(func): return a new RDD with elements
of existing RDD for which func returns true

• func should be a boolean function

• lines1 = lines.filter(lambda line: "this" in line)

=> ['hello this world']

• What about: lines.filter(lambda s: len(s) > 11)?

62

Notes

• data = sc.parallelize([1, 2, 3, 4, 5, 1, 3, 5], 2)

• data.map(lambda x: x if x % 2 == 0 else None).collect()

=> [None, 2, None, 4, None, None, None, None]

• def f(x):
      if x % 2 == 0:
          return x
      else:
          pass        # same as "return None"

• data.map(f).collect()

=> produces the same result as above

63

Python filter

• l = [1, 2, 3, 4, 5, 1, 3, 5]

• list(filter(lambda x: x % 2 == 0, l)) # filter() returns an iterator in Python 3

– [2, 4]

64

Spark implementation of filter

• def even(x): return x % 2 == 0

• data.filter(even)

Implemented as follows:

• def processPartition(iterator):

return filter(even, iterator)

• data.mapPartitions(processPartition)

65

mapPartitions(func)

• Apply transformation to a partition

– input to func is an iterator (over the elements in
the partition)

– func must return an iterable (a list or use yield to
return a generator)

• Different from map(func)

– func in map(func) applies to an element

66
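For instance, a small sketch of both return styles mentioned above (assuming sc from the shell):

    data = sc.parallelize([1, 2, 3, 4, 5, 1, 3, 5], 2)

    # return a list from the partition function ...
    def evens_list(iterator):
        return [x for x in iterator if x % 2 == 0]

    # ... or yield elements, which returns a generator
    def evens_gen(iterator):
        for x in iterator:
            if x % 2 == 0:
                yield x

    print(data.mapPartitions(evens_list).collect())   # => [2, 4]
    print(data.mapPartitions(evens_gen).collect())    # => [2, 4]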

Implementing aggregate()

• rdd.aggregate((0, 0), combFunc, reduFunc)

• def combFunc(U, x): return (U[0] + x, U[1] + 1)
• def reduFunc(U, V): return (U[0] + V[0], U[1] + V[1])

• from functools import reduce
• def sumf(iterator):
      return [reduce(combFunc, iterator, (0, 0))]

• rdd.mapPartitions(sumf).reduce(reduFunc)

67

Exercise

• Implement count() using mapPartitions() and
reduce() only

– rdd = sc.parallelize([1, 1, 2, 3, 3, 3], 2)

– rdd.count() => 6

68
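One possible solution sketch for this exercise (count each partition locally, then add the partial counts with reduce):

    rdd = sc.parallelize([1, 1, 2, 3, 3, 3], 2)

    # emit one number per partition: the size of that partition
    partial_counts = rdd.mapPartitions(lambda it: [sum(1 for _ in it)])

    print(partial_counts.reduce(lambda a, b: a + b))   # => 6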

flatMap(func)

• flatMap(func):

– similar to map

– But func here must return a list (or generator) of
elements

– & flatMap merges these lists into a single list

• lines.flatMap(lambda x: x.split())

=> rdd: ['hello', 'world', 'hello', 'this', 'world']

69

reduceByKey()

• reduceByKey(func)

– Input: a collection of (k, v) pairs

– Output: a collection of (k, v’) pairs

• v’: aggregated value of v’s in all (k, v) pairs
with the same key k by applying func

• func is the aggregation function

– Similar to func in the reduce(func, list) in Python
70

reduceByKey(func)

• It first performs partition-side (local) reduction & then
global reduction
– By executing the same reduce function

• In other words, func needs to be commutative
and associative

• More details:
– http://spark.apache.org/docs/latest/api/python/pyspark.html

71


Example

• rddp = sc.parallelize([(1,2), (1,3), (2,2), (1,4),
(3,5), (2, 4), (1, 5), (2, 6)], 2)

• def printf(part):
      print(list(part))

• rddp.foreachPartition(printf)
– Partition 1: [(1, 2), (1, 3), (2, 2), (1, 4)]

– Partition 2: [(3, 5), (2, 4), (1, 5), (2, 6)]

72

Example

• from operator import add

• rddp.reduceByKey(add)

• It will first execute local reduce:

– Partition 1: [(1, 2), (1, 3), (2, 2), (1, 4)] => (1, 9),
(2,2)

– Partition 2: [(3, 5), (2, 4), (1, 5), (2, 6)] => (3, 5), (1,
5), (2, 10)

73

Example

• Final reduce at reducer side

– (1, 9), (1, 5) => (1, 14)

– (2, 2), (2, 10) => (2, 12)

– (3, 5) => (3, 5)

• Note that if there are two reducers, then:

– Some keys, e.g., 1, may be reduced by one reducer

– Others, e.g., 2 and 3, by the other

74

reduceByKey() vs. reduce()

• reduceByKey() returns an RDD

– Reduce values per key

• reduce() returns a non-RDD value

– Reduce all values!

75

Exercise

• Implement countByKey using reduceByKey

– rddp = sc.parallelize([(1,2), (1,3), (2,2), (1,4), (3,5),
(2, 4), (1, 5), (2, 6)], 2)

– rddp.countByKey() => {1: 4, 2: 3, 3: 1}

76
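One possible solution sketch: map every value to 1, reduce by key with add, and gather the result into a dictionary (collectAsMap() is a pyspark action; the dict could also be built from collect()).

    from operator import add

    rddp = sc.parallelize([(1, 2), (1, 3), (2, 2), (1, 4),
                           (3, 5), (2, 4), (1, 5), (2, 6)], 2)

    counts = rddp.map(lambda kv: (kv[0], 1)).reduceByKey(add)
    print(counts.collectAsMap())   # => {1: 4, 2: 3, 3: 1}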

aggregateByKey

• aggregateByKey(zeroValue, combOp, reduOp)

– Input RDD: a list of (k, v) pairs

– Aggregate values for each key

• Return a value U for each key

– Note that U may be a tuple

– zeroValue: initial value for U

– combOp(U, v): (function for) local reduction

– reduOp(U1, U2): global reduction
77

Computing group averages

• rdd1 = rddp.aggregateByKey((0, 0), lambda U, v: (U[0] + v, U[1] + 1),
      lambda U1, U2: (U1[0] + U2[0], U1[1] + U2[1]))

– [(2, (12, 3)), (1, (14, 4)), (3, (5, 1))]

• rdd1.map(lambda t: (t[0], t[1][0] / t[1][1])) # tuple unpacking in lambdas is not allowed in Python 3

– [(2, 4.0), (1, 3.5), (3, 5.0)]

78

Example: aggregateByKey

• data = sc.parallelize([(1, 1), (1,2), (1,3)], 2)

• data.foreachPartition(printf)

– [(1, 1)]

– [(1, 2), (1, 3)]

• data.aggregateByKey(1, add, add).collect()

– [(1, 8)]

79

Compared with aggregate()

• data = sc.parallelize([1, 2, 3], 2)

• data.foreachPartition(printf)

– [1]

– [2, 3]

• data.aggregate(1, add, add)

– 9

80

aggregateByKey vs. aggregate

• zeroValue in aggregateByKey

– Used only in combOp (i.e., the reduction within a
partition)

• zeroValue in aggregate

– Used in both combOp and reduOp

– E.g., data.aggregate(1, add, add) => 9

81

aggregateByKey vs. reduceByKey

• aggregateByKey is more general than reduceByKey
– Can specify different functions for the combiner and

reducer

– can specify initial value for U, the accumulator

– aggregated value may have different type than
that of value v of input RDD

• E.g., in previous example:
– v is an integer, while U is a tuple (sum, count)

82

Exercise

• Implement reduceByKey(add) using
aggregateByKey()

• rddp = sc.parallelize([(1,2), (1,3), (2,2), (1,4),
(3,5), (2, 4), (1, 5), (2, 6)], 2)

– rddp.reduceByKey(add) => [(2, 12), (1, 14), (3, 5)]

83
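One possible solution sketch: with zeroValue 0 and add for both the local and global step, aggregateByKey behaves like reduceByKey(add).

    from operator import add

    rddp = sc.parallelize([(1, 2), (1, 3), (2, 2), (1, 4),
                           (3, 5), (2, 4), (1, 5), (2, 6)], 2)

    # zeroValue 0 is the identity for addition
    print(rddp.aggregateByKey(0, add, add).collect())   # => [(2, 12), (1, 14), (3, 5)]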

groupByKey()

• groupByKey()

– Similar to reduceByKey(func)

– But without func & returning (k, Iterable(v))
instead

• rddp.groupByKey().collect()

=> [(2, <iterable>), (1, <iterable>), (3, <iterable>)]

84

Example

• rddp.groupByKey().mapValues(list).collect()

– mapValues converts iterable value into a list

=> [(2, [2, 4, 6]), (1, [2, 3, 4, 5]), (3, [5])]

85

groupBy()

• rddp.groupBy(lambda t:
t[0]).mapValues(list).collect()

=>

[(2, [(2, 2), (2, 4), (2, 6)]), (1, [(1, 2), (1, 3), (1, 4),
(1, 5)]), (3, [(3, 5)])]

86

sortByKey(True/False)

• sortByKey([asc])

– Sort input RDD with (k, v) pairs by key

– Ascending if asc (a boolean value) is True

• rddp.sortByKey(False).collect()

=> [(3, 5), (2, 2), (2, 4), (2, 6), (1, 2), (1, 3), (1, 4),
(1, 5)]

87

distinct()

• Return an RDD with distinct elements of source RDD

• data = [5, 4, 4, 1, 2, 3, 3, 1, 2, 5, 4, 5]

• pdata = sc.parallelize(data, 2)

• pdata.distinct().collect()

=> [2, 4, 1, 3, 5]

88

Exercise

• Implement distinct() using
reduceByKey()/groupByKey()

• rdd = sc.parallelize([3, 1, 2, 3, 1, 3, 3, 2])

• rdd.distinct()

=> [1, 2, 3]

89
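One possible solution sketch: turn each element into a (key, None) pair, reduce by key (keeping one pair per key), and drop the dummy values.

    rdd = sc.parallelize([3, 1, 2, 3, 1, 3, 3, 2])

    uniq = (rdd.map(lambda x: (x, None))
               .reduceByKey(lambda a, b: a)   # keep one pair per key
               .map(lambda kv: kv[0]))

    print(uniq.collect())   # => [1, 2, 3] (order may vary)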

join(rdd)

• rdd1.join(rdd2)

– Joining tuples of two RDDs on the key

– rdd1: an RDD containing a list of (k, v)’s

– rdd2: another RDD containing a list of (k, w)’s

• Output an RDD containing (k, (v, w))’s

– That is, (k, v) joins with (k, w) => (k, (v, w))

90

Example

• ds1 = sc.parallelize([(1,2), (2,3)])

• ds2 = sc.parallelize([(2,4), (3,5)])

• ds1.join(ds2)

– [(2, (3, 4))]

91

Outer joins

• Also retain dangling tuples

• ds1.leftOuterJoin(ds2)
– [(1, (2, None)), (2, (3, 4))]

• ds1.rightOuterJoin(ds2)
– [(2, (3, 4)), (3, (None, 5))]

• ds1.fullOuterJoin(ds2)
– [(1, (2, None)), (2, (3, 4)), (3, (None, 5))]

92

mapValues

• mapValues(func)

– For each key, apply func to each value of the key

• x = sc.parallelize([("a", ["apple", "banana",
"lemon"]), ("b", ["grapes"])])

• x.mapValues(lambda l: len(l)).collect()

– [('a', 3), ('b', 1)]

93

flatMapValues(func)

• mapValues part

– For each key k, apply func to its value, return a list
[i1, i2, …]

• flatMap part

– flatten the lists into a single list but retain the key

=> [(k, i1), (k, i2), …, (k’, i1′), (k’, i2′), …]

94

Example

• rdd = sc.parallelize([(1, "hello world"), (2,
"hello this world")])

– For example, 1 and 2 may be document ids

• rdd2 = rdd.flatMapValues(lambda s: s.split())

– [(1, 'hello'), (1, 'world'), (2, 'hello'), (2, 'this'), (2,
'world')]

95

Exercise

• Use mapValues() and flatMap() to implement
flatMapValues() from the previous slide (one possible sketch follows)

96
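One possible solution sketch: mapValues() produces (key, list-of-words) pairs, and flatMap() then pairs the key with each element of its list.

    rdd = sc.parallelize([(1, "hello world"), (2, "hello this world")])

    rdd2 = (rdd.mapValues(lambda s: s.split())                      # (1, ['hello', 'world']), ...
               .flatMap(lambda kv: [(kv[0], w) for w in kv[1]]))    # pair the key with each word

    print(rdd2.collect())
    # => [(1, 'hello'), (1, 'world'), (2, 'hello'), (2, 'this'), (2, 'world')]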

union(rdd)

• rdd1.union(rdd2)

– Returns all elements in rdd1 and rdd2

– Does not remove duplicates (so bag union)

• rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3], 2)

• rdd2 = sc.parallelize([1, 2, 2, 5], 2)

• rdd1.union(rdd2)

– [1, 1, 2, 3, 3, 3, 1, 2, 2, 5]

97

(rdd1 and rdd2 have 2 partitions each; the union RDD has 4 partitions)

intersection(rdd)

• rdd1.intersection(rdd2)

– Returns elements in both rdd1 and rdd2

– Duplicates will be removed! (so set-semantics)

• rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3])

• rdd2 = sc.parallelize([1, 2, 2, 5])

• rdd1.intersection(rdd2)

– [2, 1]

98

subtract(rdd)

• rdd1.subtract(rdd2)

– Return values in rdd1 that do not appear in rdd2

– Note: neither set nor bag semantics!

• rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3])

• rdd2 = sc.parallelize([1, 2, 2, 5])

• rdd1.subtract(rdd2)

– [3, 3, 3]

– Note: 1 not included in result (unlike bag difference)

99

subtractByKey(rdd)

• rdd1.subtractByKey(rdd2)
– Return each (key, value) pair in rdd1 that has no pair

with matching key in rdd2

• rdd1 = sc.parallelize([1, 1, 2, 3, 3, 3]).map(lambda
x: (x, 1))

• rdd2 = sc.parallelize([1, 2, 2, 5]).map(lambda x:
(x, 1))

• rdd1.subtractByKey(rdd2)
– [(3, 1), (3, 1), (3, 1)]

100

Roadmap

• Spark
– History, features, RDD, and installation

• RDD operations
– Creating initial RDDs
– Actions
– Transformations

• Examples

• Shuffling in Spark
101

WordCount

• from operator import add

• lines = sc.textFile("hello.txt")

• counts = lines.flatMap(lambda x: x.split(' ')) \

.map(lambda x: (x, 1)) \

.reduceByKey(add)

• counts.collect()

=> [(u'this', 1), (u'world', 2), (u'hello', 2)]

102

Word length histogram

• long: if > 4 letters

• short: otherwise

• def myFunc(x):
      if len(x) > 4:
          return ('long', 1)
      else:
          return ('short', 1)

103

Word length histogram

• sc.textFile("hello.txt") \

.flatMap(lambda x: x.split(" ")) \

.map(myFunc) \

.reduceByKey(add) \

.collect()

=> [('short', 1), ('long', 4)]

104

Adding ratings for each person

Ratings.txt

(patrick, 4)

(matei, 3)

(patrick, 1)

(aaron, 2)

(aaron, 2)

(reynold, 1)

(aaron, 5)

105


Adding ratings for each person

• sc.textFile("ratings.txt") \

.map(lambda s: s[1:-1].split(",")) \

.collect()

=>

[[u'patrick', u'4'], [u'matei', u'3'], [u'patrick', u'1'],
[u'aaron', u'2'], [u'aaron', u'2'], [u'reynold', u'1'],
[u'aaron', u'5']]

106

Strip off ()

Adding ratings for each person

• sc.textFile("ratings.txt") \

.map(lambda s: s[1:-1].split(",")) \

.map(lambda p: (p[0], int(p[1]))) \

.reduceByKey(lambda a, b: a + b) \

.collect()

=> [(u'patrick', 5), (u'aaron', 9), (u'reynold', 1),
(u'matei', 3)]

107

Execution steps

• Note that reduceByKey requires shuffling

108

[Diagram: strip off '()' and tokenize by ','; turn values into integers; then reduceByKey (with shuffling)]

Roadmap

• Spark
– History, features, RDD, and installation

• RDD operations
– Creating initial RDDs
– Actions
– Transformations

• Examples

• Shuffling in Spark

• Persistence in Spark

109

Shuffling

• Data are essentially repartitioned

– E.g., reduceByKey repartitions the data by key

• A costly operation: a lot of local & network
I/O’s

110

Another example: sortByKey

• Sampling stage:

– Sample data to create a range-partitioner

– Ensure even partitioning

• "Map" stage:

– Write (sorted) data to its destination partition for the
reduce stage

• “Reduce” stage:

– get map output for specific partition

– Merge the sorted data
111

Data are shuffled between Map and Reduce stage

Transformations that require shuffling

• reduceByKey(func)

• groupByKey()

• sortByKey([asc])

• distinct()

112

Transformations that require shuffling

• join(rdd):

– leftOuterJoin

– rightOuterJoin

– fullOuterJoin

• aggregateByKey(zeroValue, seqOp, combOp)

• intersection/subtract

• subtractByKey

113

Transformations that do not need
shuffling

• map(func)

• filter(func)

• flatMap(func)

• mapValues(func)

• union

• mapPartitions(func)

114

Roadmap

• Spark
– History, features, RDD, and installation

• RDD operations
– Creating initial RDDs
– Actions
– Transformations

• Examples

• Shuffling in Spark

• Persistence in Spark

115

RDD persistence

• rdd.persist([storageLevel])

• Store the content of RDD for later reuse

– storageLevel specifies where content is stored

– E.g., in memory (default) or on disk

• rdd.persist() or rdd.cache()

– Content stored in main memory

116
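For example, a brief sketch of choosing a non-default storage level and releasing the cache afterwards (StorageLevel is imported from the pyspark package; MEMORY_AND_DISK spills partitions that do not fit in memory to disk):

    from pyspark import StorageLevel

    rdd = sc.textFile("hello.txt").flatMap(lambda x: x.split())

    rdd.persist(StorageLevel.MEMORY_AND_DISK)   # keep in memory, spill to disk if needed
    print(rdd.count())    # first action: computes the RDD and persists it
    print(rdd.count())    # reuses the persisted partitions

    rdd.unpersist()       # release the storage when the RDD is no longer needed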

RDD persistence

• Executed at nodes having partitions of RDD

• Avoid re-computation of RDD in reuse

117

Example

• ratings = sc.textFile("ratings.txt") \

.map(lambda s: s[1:-1].split(",")) \

.map(lambda p: (p[0], int(p[1]))) \

.cache()

• ratings.reduceByKey(lambda a, b: a +
b).collect()

– ratings RDD will be computed for the first time &
result cached

118

Example

• ratings.countByKey()

– It will use cached content of “ratings” rdd

119

Automatic persistence

• Spark automatically persists intermediate data
in shuffling operations (e.g., reduceByKey)

• This avoids re-computation when node fails

120

https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose

K-means clustering

• Find k clusters in a data set
– k is pre-determined

• Iterative process
– Start with initial guess of centers of clusters

– Repeatedly refine the guess until stable (e.g.,
centers do not change much)

• Need to use data set at each iteration

121

K-means clustering

• Assign point p to the closest center c

– Distance = Euclidean distance between p and c

• Re-compute the centers based on assignments

• Coordinates of center of a cluster =

– Average coordinate of all points in the cluster

– E.g., (1, 1, 1) (3, 3, 3) => center: (2, 2, 2)

122

[Figure: scatter plots (x vs. y) of the data points for Iterations 1 through 6 of k-means clustering, showing the centers converging]

K-means clustering

124

[Code screenshots with callouts: persist data points in memory; initial centers; parse input & find the closest center for each point; new centers; sum of distances between new and old centers (loop condition)]

125

kmeans-data.txt

• A text file contains the following lines

– 0.0 0.0 0.0

– 0.1 0.1 0.1

– 0.2 0.2 0.2

– 9.0 9.0 9.0

– 9.1 9.1 9.1

– 9.2 9.2 9.2

• Each line is a 3-dimensional data point

126

Parse & cache the input dataset

• “data” RDD is now cached in main memory

127

Generating initial centers

• Recall takeSample() action

– False: sample without replacement

– K = 2

128

Assign point to its closest center

• Center 0 has points: (0, 0, 0) and (.1, .1, .1)

• Center 1 has the rest: (.2, .2, .2), (9.0, 9.0, 9.0), …

129

Getting statistics for each center

• pointStats has a key-value pair for each center

• Key is center # (0 or 1 for this example)

• Value is a tuple (sum, count)

– sum = the sum of coordinates over all points in
the cluster

– Count = # of points in the cluster

130

Computing coordinates of new centers

• Coordinate = sum of point coordinates/count

– E.g., center 0: [.1, .1, .1] /2 = [.05, .05, .05]

131

Can use mapValues here too:
newPoints1 = pointStats.mapValues(lambda stv: stv[0]/stv[1]).collect()

Distance btw new & old centers

• Old center: [.1, .1, .1] and [.2, .2, .2]

• New center: [.05, .05, .05] and [6.875, 6.875,
6.875]

• Distance = (.1 - .05)^2 * 3 + (6.875 - .2)^2 * 3 ≈ 133.67

– To be more exact, it is sqrt(133.67) = 11.56

132
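Since the code on the preceding slides appears only as screenshots, here is a rough sketch of the whole loop in the spirit of Spark's bundled example (examples/src/main/python/kmeans.py); names such as closestPoint and convergeDist are illustrative, sc is assumed from the shell, and numpy is assumed to be installed:

    import numpy as np

    K = 2
    convergeDist = 0.1    # stop when the centers move less than this (squared distance)

    def closestPoint(p, centers):
        # index of the center with the smallest Euclidean distance to p
        return min(range(len(centers)), key=lambda i: np.sum((p - centers[i]) ** 2))

    # parse & cache the input dataset
    data = (sc.textFile("kmeans-data.txt")
              .map(lambda line: np.array([float(x) for x in line.split()]))
              .cache())

    # initial centers: a random sample of K points
    kPoints = data.takeSample(False, K, 1)

    tempDist = 1.0
    while tempDist > convergeDist:
        # assign each point to its closest center: (center index, (point, 1))
        closest = data.map(lambda p: (closestPoint(p, kPoints), (p, 1)))

        # per-center statistics: (sum of coordinates, number of points)
        pointStats = closest.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

        # new center = sum of coordinates / count
        newPoints = pointStats.mapValues(lambda st: st[0] / st[1]).collect()

        # how far did the centers move in this iteration?
        tempDist = sum(np.sum((kPoints[i] - p) ** 2) for (i, p) in newPoints)

        for (i, p) in newPoints:
            kPoints[i] = p

    print("Final centers:", kPoints)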

RDD operations

• A complete list:

– http://spark.apache.org/docs/latest/api/python/pyspark.html

133


Resources

• Spark programming guide:
– https://spark.apache.org/docs/latest/

• Lambda, filter, reduce and map:
– http://www.python-course.eu/lambda.php

• Improving Sort Performance in Apache Spark: It's
a Double
– http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/

134


Readings

• Spark: Cluster Computing with Working Sets,
2010.
– http://people.csail.mit.edu/matei/papers/2010/hotcloud_spark.pdf

• Resilient Distributed Datasets: A Fault-Tolerant
Abstraction for In-Memory Cluster Computing,
2012.
– https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

135

References

• Functional programming in Python

– https://docs.python.org/2/howto/functional.html

• Learning Spark by Matei Zaharia, et al.,
O'Reilly, 2015

– https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/

136


References

• Sort-based shuffle implementation

– https://issues.apache.org/jira/browse/SPARK-2045

• Sort-Based Shuffle in Spark

– https://issues.apache.org/jira/secure/attachment/12655884/Sort-basedshuffledesign.pdf

137
