How do I make an RDD?
RDDs can be created from stable storage or by transforming other RDDs. Run the cells below to create RDDs from files on the local drive. All data files can be downloaded from https://www.cse.ust.hk/msbd5003/data/
In [1]:
# Read data from local file system:
fruits = sc.textFile('file:///C:/Users/hanya/1Jupyter Notebook/fruits.txt')
yellowThings = sc.textFile('file:///C:/Users/hanya/1Jupyter Notebook/yellowthings.txt')
print(fruits.collect())
print(yellowThings.collect())
['apple', 'banana', 'canary melon', 'grape', 'lemon', 'orange', 'pineapple', 'strawberry']
['banana', 'bee', 'butter', 'canary melon', 'gold', 'lemon', 'pineapple', 'sunflower']
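Besides reading from storage, an RDD can also be created by parallelizing an in-memory Python collection with sc.parallelize, which later cells in this notebook use heavily. A minimal sketch:
# Distribute a local Python list across the cluster as an RDD
nums = sc.parallelize([1, 2, 3, 4, 5])
print(nums.collect())   # [1, 2, 3, 4, 5]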
In [5]:
# Read data from HDFS:
fruits = sc.textFile('hdfs://url:9000/pathname/fruits.txt')
fruits.collect()
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
~\Downloads\spark-2.4.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
~\Downloads\spark-2.4.0-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException: java.net.UnknownHostException: url
at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.
at org.apache.hadoop.hdfs.DFSClient.
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.net.UnknownHostException: url
… 44 more
During handling of the above exception, another exception occurred:
IllegalArgumentException Traceback (most recent call last)
1 # Read data from HDFS:
2 fruits = sc.textFile('hdfs://url:9000/pathname/fruits.txt')
----> 3 fruits.collect()
~\Downloads\spark-2.4.0-bin-hadoop2.7\python\pyspark\rdd.py in collect(self)
814 """
815 with SCCallSiteSync(self.context) as css:
--> 816 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
817 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
818
~\Downloads\spark-2.4.0-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
~\Downloads\spark-2.4.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
77 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
78 if s.startswith('java.lang.IllegalArgumentException: '):
---> 79 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
80 raise
81 return deco
IllegalArgumentException: 'java.net.UnknownHostException: url'
RDD operations
In [2]:
# map
fruitsReversed = fruits.map(lambda fruit: fruit[::-1])
In [3]:
fruitsReversed.persist()
# try changing the file and re-execute with and without cache
fruitsReversed.collect()
Out[3]:
['elppa',
 'ananab',
 'nolem yranac',
 'eparg',
 'nomel',
 'egnaro',
 'elppaenip',
 'yrrebwarts']
In [5]:
# filter
shortFruits = fruits.filter(lambda fruit: len(fruit) <= 5)
shortFruits.collect()
Out[5]:
[u'apple', u'grape', u'lemon']
In [6]:
# flatMap
characters = fruits.flatMap(lambda fruit: list(fruit))
characters.collect()
Out[6]:
[u'a', u'p', u'p', u'l', u'e', u'b', u'a', u'n', u'a', u'n', u'a', u'c', u'a', u'n', u'a', u'r', u'y', u' ', u'm', u'e', u'l', u'o', u'n', u'g', u'r', u'a', u'p', u'e', u'l', u'e', u'm', u'o', u'n', u'o', u'r', u'a', u'n', u'g', u'e', u'p', u'i', u'n', u'e', u'a', u'p', u'p', u'l', u'e', u's', u't', u'r', u'a', u'w', u'b', u'e', u'r', u'r', u'y']
In [14]:
# union
fruitsAndYellowThings = fruits.union(yellowThings)
fruitsAndYellowThings.collect()
Out[14]:
[u'apple', u'banana', u'canary melon', u'grape', u'lemon', u'orange', u'pineapple', u'strawberry', u'banana', u'bee', u'butter', u'canary melon', u'gold', u'lemon', u'pineapple', u'sunflower']
In [15]:
# intersection
yellowFruits = fruits.intersection(yellowThings)
yellowFruits.collect()
Out[15]:
[u'lemon', u'canary melon', u'banana', u'pineapple']
In [16]:
# distinct
distinctFruitsAndYellowThings = fruitsAndYellowThings.distinct()
distinctFruitsAndYellowThings.collect()
Out[16]:
[u'orange', u'grape', u'lemon', u'butter', u'canary melon', u'strawberry', u'apple', u'banana', u'sunflower', u'gold', u'bee', u'pineapple']
RDD actions
Following are examples of some of the common actions available. For a detailed list, see RDD Actions.
Run the actions below to understand them better. Place the cursor in a cell and press SHIFT + ENTER.
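Keep in mind that transformations such as map and filter are lazy: nothing runs on the cluster until an action is called. A minimal sketch of this, assuming the fruits RDD defined above:
# No computation happens here -- only the lineage is recorded:
upperFruits = fruits.map(lambda fruit: fruit.upper())
# The action triggers the actual job:
print(upperFruits.count())   # 8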
In [7]:
# collect
fruitsArray = fruits.collect()
yellowThingsArray = yellowThings.collect()
fruitsArray
Out[7]:
[u'apple', u'banana', u'canary melon', u'grape', u'lemon', u'orange', u'pineapple', u'strawberry']
In [19]:
# count
numFruits = fruits.count()
numFruits
Out[19]:
8
In [20]:
# take
first3Fruits = fruits.take(3)
first3Fruits
Out[20]:
[u'apple', u'banana', u'canary melon']
In [26]:
# reduce
letterSet = fruits.map(lambda fruit: set(fruit)).reduce(lambda x, y: x.union(y))
letterSet
Out[26]:
set([u'a', u' ', u'c', u'b', u'e', u'g', u'i', u'm', u'l', u'o', u'n', u'p', u's', u'r', u't', u'w', u'y'])
In [ ]:
letterSet = fruits.flatMap(lambda fruit: list(fruit)).distinct().collect()
letterSet
Closure
In [22]:
counter = 0
rdd = sc.parallelize(range(10))

# Wrong: don't do this!
# foreach runs on the executors, where each task updates its own copy of
# the closure variable `counter`; the driver's counter stays 0.
def increment_counter(x):
    global counter
    counter += x

rdd.foreach(increment_counter)
print(counter)
0
In [2]:
# Use an accumulator instead: updates made on the executors are sent back to the driver.
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x

a = rdd.foreach(g)
print(accum.value)
45
In [26]:
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x
    return x * x

a = rdd.map(g)
print(accum.value)                     # 0: map is lazy, so g has not run yet
print(rdd.reduce(lambda x, y: x + y))
#a.cache()
print(accum.value)                     # still 0: no action has been run on a
tmp = a.count()                        # forces the map to run; accumulator becomes 45
print(accum.value)
print(rdd.reduce(lambda x, y: x + y))
tmp = a.count()                        # a is not cached, so the map runs again: 90
print(accum.value)
print(rdd.reduce(lambda x, y: x + y))
0
45
0
45
45
90
45
In [1]:
from operator import add
rdd = sc.parallelize(range(10))
print(rdd.sum())
45
Computing Pi using Monte Carlo simulation
In [8]:
# From the official Spark examples.
import sys
import random

partitions = 1
n = 10000000 * partitions

def f(_):
    x = random.random()
    y = random.random()
    return 1 if x ** 2 + y ** 2 < 1 else 0

# Caveat: with many partitions, worker processes forked on the same node may
# inherit the same random state, so samples can repeat across partitions;
# the per-partition seeded version further below avoids this.
count = sc.parallelize(range(1, n + 1), partitions) \
          .map(f).sum()
print("Pi is roughly", 4.0 * count / n)
Pi is roughly 3.141138
In [14]:
import random

a = sc.parallelize(range(0, 20), 4)
print(a.collect())
print(a.glom().collect())   # glom() returns the elements of each partition as a list

a.map(lambda x: random.random()).glom().collect()

# Per-partition running sum, initialized with the partition index
def f(index, it):
    s = index
    for i in it:
        s += i
        yield s

a.mapPartitionsWithIndex(f).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19]]
Out[14]:
[0, 1, 3, 6, 10, 6, 12, 19, 27, 36, 12, 23, 35, 48, 62, 18, 34, 51, 69, 88]
In [7]:
# Correct version: seed the random generator once per partition,
# so that different partitions draw independent samples.
partitions = 100
n = 100000 * partitions

def f(index, it):
    random.seed(index + 987236)
    for i in it:
        x = random.random()
        y = random.random()
        yield 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(range(1, n + 1), partitions) \
          .mapPartitionsWithIndex(f).reduce(lambda a, b: a + b)
print("Pi is roughly", 4.0 * count / n)
Pi is roughly 3.1410596
Closure and Persistence
In [1]:
A = sc.parallelize(range(10))
x = 5
B = A.filter(lambda z: z < x)
B.unpersist()
# print(B.take(10))
print(B.collect())
x = 3
# print(B.take(10))
print(B.collect())
# collect() doesn't always re-collect data - bad design!
# The second collect() still prints [0, 1, 2, 3, 4]: the lambda's closure
# (with x = 5) was serialized and cached with B the first time it was
# evaluated, so the later change to x is not picked up.
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
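If the new value of x should take effect, one workaround is to rebuild the RDD after changing the variable so that the closure is captured afresh; a minimal sketch reusing A and x from the cell above:
x = 3
B = A.filter(lambda z: z < x)   # re-define B so the current value of x is captured
print(B.collect())              # expected: [0, 1, 2]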
In [21]:
# RDD variables are references
A = sc.parallelize(range(10))
B = A.map(lambda x: x*2)
A = B.map(lambda x: x+1)
A.take(10)
Out[21]:
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
In [12]:
data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
A = sc.parallelize(data,2)
k = 4
In [14]:
A.first()
Out[14]:
34
In [2]:
# Linear-time selection
data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
A = sc.parallelize(data, 2)
k = 4

while True:
    x = A.first()
    A1 = A.filter(lambda z: z < x)
    A2 = A.filter(lambda z: z > x)
    mid = A1.count()      # number of elements smaller than the pivot x
    if mid == k:
        print(x)
        break
    if k < mid:
        A = A1
    else:
        A = A2
        k = k - mid - 1
    A.cache()
44
In [13]:
sorted(data)
Out[13]:
[12, 21, 26, 34, 43, 44, 47, 56, 67, 74, 89]
Key-Value Pairs
In [25]:
# reduceByKey
numFruitsByLength = fruits.map(lambda fruit: (len(fruit), 1)).reduceByKey(lambda x, y: x + y)
numFruitsByLength.collect()
Out[25]:
[(10, 1), (12, 1), (6, 2), (9, 1), (5, 3)]
In [23]:
from operator import add
lines = sc.textFile('../data/course.txt')
counts = lines.flatMap(lambda x: x.split()) \
.map(lambda x: (x, 1)) \
.reduceByKey(add)
counts.collect()
Out[23]:
[(u'and', 3), (u'videos', 1), (u'exposes', 1), (u'as', 1), (u'including', 1), (u'frameworks,', 1), (u'cloud', 1), (u'even', 1), (u'managing', 1), (u'data', 4), (u'students', 1), (u'systems,', 1), (u'thousands', 1), (u'mining', 1), (u'This', 1), (u'technologies', 1), (u'hands-on', 1), (u'commodity', 1), (u'this', 1), (u'experience', 1), (u'enabling', 1), (u'centers.', 1), (u'amount', 1), (u'the', 2), (u'Information', 1), (u'computing', 1), (u'servers', 1), (u'course', 1), (u'in', 2), (u'Lecture', 1), (u'Description', 1), (u'Big', 1), (u'to', 1), (u'new', 1), (u'across', 1), (u'theory', 1), (u'processing', 1), (u'hundreds', 1), (u'parallel', 1), (u'both', 1), (u'technology.', 1), (u'of', 3), (u'emerge', 1), (u'Course', 2), (u'massive', 1), (u'or', 1)]
In [24]:
counts.sortBy(lambda x: x[1], False).collect()
Out[24]:
[(u'data', 4), (u'and', 3), (u'of', 3), (u'the', 2), (u'in', 2), (u'Course', 2), (u'videos', 1), (u'exposes', 1), (u'as', 1), (u'including', 1), (u'frameworks,', 1), (u'cloud', 1), (u'even', 1), (u'managing', 1), (u'students', 1), (u'systems,', 1), (u'thousands', 1), (u'mining', 1), (u'This', 1), (u'technologies', 1), (u'hands-on', 1), (u'commodity', 1), (u'this', 1), (u'experience', 1), (u'enabling', 1), (u'centers.', 1), (u'amount', 1), (u'Information', 1), (u'computing', 1), (u'servers', 1), (u'course', 1), (u'Lecture', 1), (u'Description', 1), (u'Big', 1), (u'to', 1), (u'new', 1), (u'across', 1), (u'theory', 1), (u'processing', 1), (u'hundreds', 1), (u'parallel', 1), (u'both', 1), (u'technology.', 1), (u'emerge', 1), (u'massive', 1), (u'or', 1)]
In [38]:
# Join simple example
products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
#trans = sc.parallelize([(1, 134, "OK"), (3, 34, "OK"), (5, 162, "Error"), (1, 135, "OK"), (2, 53, "OK"), (1, 45, "OK")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])
print(products.join(trans).collect())
[(1, ('Apple', (134, 'OK'))), (1, ('Apple', (135, 'OK'))), (1, ('Apple', (45, 'OK'))), (2, ('Orange', (53, 'OK'))), (3, ('TV', (34, 'OK'))), (5, ('Computer', (162, 'Error')))]
K-means clustering
In [39]:
import numpy as np

def parseVector(line):
    return np.array([float(x) for x in line.split()])

def closestPoint(p, centers):
    # Return the index of the center closest to point p
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_data.txt
lines = sc.textFile('../data/kmeans_data.txt', 5)
# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_bigdata.txt
# lines = sc.textFile('../data/kmeans_bigdata.txt', 5)
# lines is an RDD of strings

K = 3
convergeDist = 0.01
# terminate the algorithm when the total distance from the old centers to the new centers is less than this value

data = lines.map(parseVector).cache()    # data is an RDD of arrays

kCenters = data.takeSample(False, K, 1)  # initial centers as a list of arrays
tempDist = 1.0                           # total distance from old centers to new centers

while tempDist > convergeDist:
    closest = data.map(lambda p: (closestPoint(p, kCenters), (p, 1)))
    # for each point in data, find its closest center
    # closest is an RDD of tuples (index of closest center, (point, 1))

    pointStats = closest.reduceByKey(lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1]))
    # pointStats is an RDD of tuples (index of center,
    #  (array of sums of coordinates, total number of points assigned))

    newCenters = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
    # compute the new centers

    tempDist = sum(np.sum((kCenters[i] - p) ** 2) for (i, p) in newCenters)
    # compute the total distance from the old centers to the new centers

    for (i, p) in newCenters:
        kCenters[i] = p

print("Final centers:", kCenters)
Final centers: [array([ 0.05, 0.3 , 0.05]), array([ 0.2, 0.4, 0.6]), array([ 9.1 , 2.76666667, 6.16666667])]
PageRank
In [3]:
import re
from operator import add

def computeContribs(urls, rank):
    # Calculates URL contributions to the rank of other URLs.
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

def parseNeighbors(urls):
    # Parses a urls pair string into a urls pair.
    parts = urls.split(' ')
    return parts[0], parts[1]

# Loads in the input file. It should be in the format of:
#     URL  neighbor URL
#     URL  neighbor URL
#     URL  neighbor URL
#     ...
# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/*
lines = sc.textFile("../data/pagerank_data.txt", 2)
# lines = sc.textFile("../data/dblp.in", 5)

numOfIterations = 10

# Loads all URLs from the input file and initializes their neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)) \
             .groupByKey()

# Loads all URLs with other URL(s) linking to them from the input file
# and initializes their ranks to one.
ranks = links.mapValues(lambda neighbors: 1.0)

# Calculates and updates URL ranks iteratively using the PageRank algorithm.
for iteration in range(numOfIterations):
    # Calculates URL contributions to the rank of other URLs.
    contribs = links.join(ranks) \
                    .flatMap(lambda url_urls_rank:
                             computeContribs(url_urls_rank[1][0],
                                             url_urls_rank[1][1]))
    # After the join, each element in the RDD is of the form
    # (url, (list of neighbor urls, rank))

    # Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
    # ranks = contribs.reduceByKey(add).map(lambda url_rank: (url_rank[0], url_rank[1] * 0.85 + 0.15))

print(ranks.top(5, lambda x: x[1]))
[(u'1', 1.2981882732854677), (u'3', 0.9999999999999998), (u'4', 0.9999999999999998), (u'2', 0.7018117267145316)]
Join vs. Broadcast Variables
In [ ]:
products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])
print(trans.join(products).collect())
In [2]:
products = {1: "Apple", 2: "Orange", 3: "TV", 5: "Computer"}
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

# broadcasted_products = sc.broadcast(products)

def f(x):
    return (x[0], broadcasted_products.value[x[0]], x[1])

# results = trans.map(lambda x: (x[0], broadcasted_products.value[x[0]], x[1]))
results = trans.map(lambda x: (x[0], products[x[0]], x[1]))
print(results.collect())
[(1, 'Apple', (134, 'OK')), (3, 'TV', (34, 'OK')), (5, 'Computer', (162, 'Error')), (1, 'Apple', (135, 'OK')), (2, 'Orange', (53, 'OK')), (1, 'Apple', (45, 'OK'))]
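For comparison, here is a sketch of the broadcast-variable version hinted at by the commented-out lines above: the products dictionary is shipped to each executor once via sc.broadcast instead of being captured in every task's closure. Variable names mirror the commented code.
products = {1: "Apple", 2: "Orange", 3: "TV", 5: "Computer"}
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")),
                        (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

broadcasted_products = sc.broadcast(products)   # sent to each executor once
results = trans.map(lambda x: (x[0], broadcasted_products.value[x[0]], x[1]))
print(results.collect())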
In [ ]: