
How do I make an RDD?¶
RDDs can be created from stable storage or by transforming other RDDs. Run the cells below to create RDDs from files on the local drive. All data files can be downloaded from https://www.cse.ust.hk/msbd5003/data/
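Besides reading from storage, an RDD can also be built from an in-memory Python collection with sc.parallelize and then transformed into new RDDs. A minimal sketch, assuming the SparkContext sc used throughout this notebook:
nums = sc.parallelize([1, 2, 3, 4, 5])   # RDD from a Python list
squares = nums.map(lambda x: x * x)      # a new RDD derived by transforming another RDD
print(squares.collect())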
In [1]:
import findspark
findspark.init()
import pyspark
In [2]:
from pyspark.sql import SparkSession
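The cells below refer to sc (a SparkContext) and, later, to spark (a SparkSession), but neither is created in the cells shown here. A minimal sketch of one way to obtain both after the findspark and SparkSession imports above (the app name is an arbitrary placeholder):
spark = SparkSession.builder.appName('rdd-demo').getOrCreate()
sc = spark.sparkContext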
In [3]:
# Read data from local file system:
fruits = sc.textFile('/Users/kenny/Desktop/Spark/data/fruits.txt')
yellowThings = sc.textFile('/Users/kenny/Desktop/Spark/data/yellowthings.txt')
print(fruits.collect())
print(yellowThings.collect())

['apple', 'banana', 'canary melon', 'grape', 'lemon', 'orange', 'pineapple', 'strawberry']
['banana', 'bee', 'butter', 'canary melon', 'gold', 'lemon', 'pineapple', 'sunflower']
In [4]:
# Read data from HDFS (same textFile API; a local path is used here):
fruits = sc.textFile('/Users/kenny/Desktop/Spark/data/fruits.txt')
fruits.collect()
Out[4]:
['apple',
 'banana',
 'canary melon',
 'grape',
 'lemon',
 'orange',
 'pineapple',
 'strawberry']
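On a cluster, the same call reads from HDFS by passing an HDFS URI instead of a local path. A sketch with a hypothetical namenode address and path:
# fruits = sc.textFile('hdfs://namenode:9000/msbd5003/data/fruits.txt')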

RDD operations¶
In [5]:
# map
fruitsReversed = fruits.map(lambda fruit: fruit[::-1])
In [6]:
fruitsReversed.unpersist()
# try changing the file and re-execute with and without cache
fruitsReversed.collect()
Out[6]:
['elppa',
 'ananab',
 'nolem yranac',
 'eparg',
 'nomel',
 'egnaro',
 'elppaenip',
 'yrrebwarts']
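The comment in the cell above suggests comparing runs with and without caching, but only unpersist() appears. The cached variant is a one-line change (a sketch reusing the same RDD):
fruitsReversed.cache()      # keep the computed partitions in memory after the first action
fruitsReversed.collect()    # later actions reuse the cached partitions instead of re-reading the file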
In [7]:
# filter
shortFruits = fruits.filter(lambda fruit: len(fruit) <= 5)
shortFruits.collect()
Out[7]:
['apple', 'grape', 'lemon']
In [8]:
# flatMap
characters = fruits.flatMap(lambda fruit: list(fruit))
characters.collect()
Out[8]:
['a', 'p', 'p', 'l', 'e',
 'b', 'a', 'n', 'a', 'n', 'a',
 'c', 'a', 'n', 'a', 'r', 'y', ' ', 'm', 'e', 'l', 'o', 'n',
 'g', 'r', 'a', 'p', 'e',
 'l', 'e', 'm', 'o', 'n',
 'o', 'r', 'a', 'n', 'g', 'e',
 'p', 'i', 'n', 'e', 'a', 'p', 'p', 'l', 'e',
 's', 't', 'r', 'a', 'w', 'b', 'e', 'r', 'r', 'y']
In [9]:
# union
fruitsAndYellowThings = fruits.union(yellowThings)
fruitsAndYellowThings.collect()
Out[9]:
['apple', 'banana', 'canary melon', 'grape', 'lemon', 'orange', 'pineapple', 'strawberry',
 'banana', 'bee', 'butter', 'canary melon', 'gold', 'lemon', 'pineapple', 'sunflower']
In [10]:
# intersection
yellowFruits = fruits.intersection(yellowThings)
yellowFruits.collect()
Out[10]:
['pineapple', 'canary melon', 'lemon', 'banana']
In [12]:
# distinct
distinctFruitsAndYellowThings = fruitsAndYellowThings.distinct()
distinctFruitsAndYellowThings.collect()
Out[12]:
['orange', 'pineapple', 'canary melon', 'grape', 'lemon', 'bee', 'banana', 'butter', 'gold', 'sunflower', 'apple', 'strawberry']

RDD actions¶
Following are examples of some of the common actions available. For a detailed list, see RDD Actions. Run some actions below to understand this better. Place the cursor in the cell and press SHIFT + ENTER.
In [13]:
# collect
fruitsArray = fruits.collect()
yellowThingsArray = yellowThings.collect()
fruitsArray
Out[13]:
['apple', 'banana', 'canary melon', 'grape', 'lemon', 'orange', 'pineapple', 'strawberry']
In [14]:
# count
numFruits = fruits.count()
numFruits
Out[14]:
8
In [15]:
# take
first3Fruits = fruits.take(3)
first3Fruits
Out[15]:
['apple', 'banana', 'canary melon']
In [16]:
# reduce
letterSet = fruits.map(lambda fruit: set(fruit)).reduce(lambda x, y: x.union(y))
letterSet
Out[16]:
{' ', 'a', 'b', 'c', 'e', 'g', 'i', 'l', 'm', 'n', 'o', 'p', 'r', 's', 't', 'w', 'y'}
In [17]:
letterSet = fruits.flatMap(lambda fruit: list(fruit)).distinct().collect()
letterSet
Out[17]:
['p', 'l', 'b', 'c', 'r', 'y', 'g', 'i', 's', 'a', 'e', 'n', ' ', 'm', 'o', 't', 'w']

Closure¶
In [18]:
counter = 0
rdd = sc.parallelize(range(10))

# Wrong: Don't do this!!
# The closure (including counter) is shipped to the executors; each task updates
# its own copy, so the counter in the driver never changes.
def increment_counter(x):
    global counter
    counter += x

rdd.foreach(increment_counter)
print(counter)

0
In [19]:
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x

a = rdd.foreach(g)
print(accum.value)

45
In [20]:
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)

def g(x):
    global accum
    accum += x
    return x * x

a = rdd.map(g)
print(accum.value)
print(rdd.reduce(lambda x, y: x + y))

a.cache()
print(accum.value)

tmp = a.count()
print(accum.value)
print(rdd.reduce(lambda x, y: x + y))

tmp = a.count()
print(accum.value)
print(rdd.reduce(lambda x, y: x + y))

0
45
0
45
45
45
45
In [21]:
from operator import add
rdd = sc.parallelize(range(10))
print(rdd.sum())

45

Computing Pi using Monte Carlo simulation¶
In [22]:
# From the official spark examples.
# Sample points uniformly in the unit square; the fraction that falls inside
# the quarter circle approximates pi/4.
import sys
import random

partitions = 1
n = 100000 * partitions

def f(_):
    x = random.random()
    y = random.random()
    return 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(range(1, n + 1), partitions) \
          .map(f).sum()
print("Pi is roughly", 4.0 * count / n)

Pi is roughly 3.1466
In [23]:
import sys
import random

a = sc.parallelize(range(0, 20), 4)
print(a.collect())
print(a.glom().collect())
a.map(lambda x: random.random()).glom().collect()

def f(index, it):
    s = index
    for i in it:
        s += i
        yield s

a.mapPartitionsWithIndex(f).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19]]
Out[23]:
[0, 1, 3, 6, 10, 6, 12, 19, 27, 36, 12, 23, 35, 48, 62, 18, 34, 51, 69, 88]
In [24]:
# Correct version: seed the random generator differently in each partition,
# so partitions do not reuse the same pseudo-random sequence.
partitions = 100
n = 100000 * partitions

def f(index, it):
    random.seed(index + 987236)
    for i in it:
        x = random.random()
        y = random.random()
        yield 1 if x ** 2 + y ** 2 < 1 else 0

count = sc.parallelize(range(1, n + 1), partitions) \
          .mapPartitionsWithIndex(f).reduce(lambda a, b: a + b)
print("Pi is roughly", 4.0 * count / n)

Pi is roughly 3.1410596

Closure and Persistence¶
In [25]:
A = sc.parallelize(range(10))
x = 5
B = A.filter(lambda z: z < x)
B.unpersist()
# print B.take(10)
print(B.collect())
x = 3
# print B.take(10)
print(B.collect())
# collect() doesn't always re-collect data - bad design!

[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
In [26]:
# RDD variables are references
A = sc.parallelize(range(10))
B = A.map(lambda x: x*2)
A = B.map(lambda x: x+1)
A.take(10)
Out[26]:
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
In [27]:
# Linear-time selection (quickselect): find the element of rank k (0-based)
data = [34, 67, 21, 56, 47, 89, 12, 44, 74, 43, 26]
A = sc.parallelize(data, 2)
k = 4

while True:
    x = A.first()
    A1 = A.filter(lambda z: z < x)
    A2 = A.filter(lambda z: z > x)
    mid = A1.count()
    if mid == k:
        print(x)
        break

    if k < mid:
        A = A1
    else:
        A = A2
        k = k - mid - 1
    A.cache()

43
In [28]:
sorted(data)
Out[28]:
[12, 21, 26, 34, 43, 44, 47, 56, 67, 74, 89]

Key-Value Pairs¶
In [29]:
# reduceByKey
numFruitsByLength = fruits.map(lambda fruit: (len(fruit), 1)).reduceByKey(lambda x, y: x + y)
numFruitsByLength.collect()
Out[29]:
[(6, 2), (12, 1), (10, 1), (5, 3), (9, 1)]
In [30]:
from operator import add

lines = sc.textFile('/Users/kenny/Desktop/MSBD5003/data/course.txt')
counts = lines.flatMap(lambda x: x.split()) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
counts.collect()
Out[30]:
[('Course', 2), ('Information', 1), ('systems,', 1), ('cloud', 1), ('parallel', 1), ('as', 1), ('in', 2), ('mining', 1), ('massive', 1), ('amount', 1), ('of', 3), ('even', 1), ('servers', 1), ('centers.', 1), ('both', 1), ('hands-on', 1), ('this', 1), ('new', 1), ('Lecture', 1), ('videos', 1), ('Description', 1), ('Big', 1), ('data', 4), ('including', 1), ('computing', 1), ('and', 3), ('processing', 1), ('frameworks,', 1), ('emerge', 1), ('enabling', 1), ('technologies', 1), ('managing', 1), ('the', 2), ('across', 1), ('hundreds', 1), ('or', 1), ('thousands', 1), ('commodity', 1), ('This', 1), ('course', 1), ('exposes', 1), ('students', 1), ('to', 1), ('theory', 1), ('experience', 1), ('technology.', 1)]
In [31]:
counts.sortBy(lambda x: x[1], False).collect()
Out[31]:
[('data', 4), ('of', 3), ('and', 3), ('Course', 2), ('in', 2), ('the', 2), ('Information', 1), ('systems,', 1), ('cloud', 1), ('parallel', 1), ('as', 1), ('mining', 1), ('massive', 1), ('amount', 1), ('even', 1), ('servers', 1), ('centers.', 1), ('both', 1), ('hands-on', 1), ('this', 1), ('new', 1), ('Lecture', 1), ('videos', 1), ('Description', 1), ('Big', 1), ('including', 1), ('computing', 1), ('processing', 1), ('frameworks,', 1), ('emerge', 1), ('enabling', 1), ('technologies', 1), ('managing', 1), ('across', 1), ('hundreds', 1), ('or', 1), ('thousands', 1), ('commodity', 1), ('This', 1), ('course', 1), ('exposes', 1), ('students', 1), ('to', 1), ('theory', 1), ('experience', 1), ('technology.', 1)]
In [32]:
# Join simple example
products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
# trans = sc.parallelize([(1, 134, "OK"), (3, 34, "OK"), (5, 162, "Error"), (1, 135, "OK"), (2, 53, "OK"), (1, 45, "OK")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

print(products.join(trans).collect())

[(1, ('Apple', (134, 'OK'))), (1, ('Apple', (135, 'OK'))), (1, ('Apple', (45, 'OK'))), (2, ('Orange', (53, 'OK'))), (3, ('TV', (34, 'OK'))), (5, ('Computer', (162, 'Error')))]

K-means clustering¶
In [33]:
import numpy as np

def parseVector(line):
    return np.array([float(x) for x in line.split()])

def closestPoint(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_data.txt
lines = sc.textFile('/Users/kenny/Desktop/MSBD5003/data/kmeans_data.txt', 5)
# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/kmeans_bigdata.txt
# lines = sc.textFile('../data/kmeans_bigdata.txt', 5)
# lines is an RDD of strings

K = 3
convergeDist = 0.01
# terminate algorithm when the total distance from old centers to new centers is less than this value

data = lines.map(parseVector).cache()
# data is an RDD of arrays

kCenters = data.takeSample(False, K, 1)
# initial centers as a list of arrays
tempDist = 1.0  # total distance from old centers to new centers

while tempDist > convergeDist:
    closest = data.map(lambda p: (closestPoint(p, kCenters), (p, 1)))
    # for each point in data, find its closest center
    # closest is an RDD of tuples (index of closest center, (point, 1))

    pointStats = closest.reduceByKey(lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1]))
    # pointStats is an RDD of tuples (index of center,
    #   (array of sums of coordinates, total number of points assigned))

    newCenters = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
    # compute the new centers

    tempDist = sum(np.sum((kCenters[i] - p) ** 2) for (i, p) in newCenters)
    # compute the total distance from old centers to new centers

    for (i, p) in newCenters:
        kCenters[i] = p

print("Final centers: ", kCenters)

Final centers: [array([0.1 , 0.33333333, 0.23333333]), array([9.05, 3.05, 4.65]), array([9.2, 2.2, 9.2])]
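As a quick follow-up (a sketch reusing data, kCenters and closestPoint from the cell above), the size of each final cluster can be inspected with countByValue:
print(data.map(lambda p: closestPoint(p, kCenters)).countByValue())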

PageRank¶
In [34]:
import re
from operator import add

def computeContribs(urls, rank):
    # Calculates URL contributions to the rank of other URLs.
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

def parseNeighbors(urls):
    # Parses a urls pair string into urls pair.
    parts = urls.split(' ')
    return parts[0], parts[1]

# Loads in input file. It should be in format of:
# URL neighbor URL
# URL neighbor URL
# URL neighbor URL
# …

# The data file can be downloaded at http://www.cse.ust.hk/msbd5003/data/*
lines = sc.textFile("/Users/kenny/Desktop/MSBD5003/data/pagerank_data.txt", 2)
# lines = sc.textFile("../data/dblp.in", 5)

numOfIterations = 10

# Loads all URLs from input file and initialize their neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)) \
             .groupByKey()

# Loads all URLs with other URL(s) link to from input file
# and initialize ranks of them to one.
ranks = links.mapValues(lambda neighbors: 1.0)

# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in range(numOfIterations):
    # Calculates URL contributions to the rank of other URLs.
    contribs = links.join(ranks) \
                    .flatMap(lambda url_urls_rank:
                             computeContribs(url_urls_rank[1][0],
                                             url_urls_rank[1][1]))
    # After the join, each element in the RDD is of the form
    # (url, (list of neighbor urls, rank))

    # Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
    # ranks = contribs.reduceByKey(add).map(lambda (url, rank): (url, rank * 0.85 + 0.15))

print(ranks.top(5, lambda x: x[1]))

[('1', 1.2981882732854677), ('4', 0.9999999999999998), ('3', 0.9999999999999998), ('2', 0.7018117267145316)]
In [35]:
from pyspark.sql.functions import *

numOfIterations = 10

lines = spark.read.text("/Users/kenny/Desktop/MSBD5003/data/pagerank_data.txt")
# You can also test your program on the following larger data set:
# lines = spark.read.text("dblp.in")
In [36]:
a = lines.select(split(lines[0], ' '))
links = a.select(a[0][0].alias('src'), a[0][1].alias('dst'))
outdegrees = links.groupBy('src').count()
ranks = outdegrees.select('src', lit(1).alias('rank'))
In [39]:
for iteration in range(numOfIterations):
    contribs = links.join(ranks, 'src').join(outdegrees, 'src', (ranks['rank']/outdegrees['count']).alias('contrib'))
    ranks = contribs.groupby('src').sum('contrib')/(column('sum(contrib)')*0.85+0.15).alias('rank')
ranks.orderBy(desc('rank')).show()

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input> in <module>
      1 for iteration in range(numOfIterations):
----> 2     contribs = links.join(ranks, 'src').join(outdegrees, 'src', (ranks['rank']/outdegrees['count']).alias('contrib'))
      3     ranks = contribs.groupby('src').sum('contrib')/(column('sum(contrib)')*0.85+0.15).alias('rank')
      4 ranks.orderBy(desc('rank')).show()

/anaconda3/lib/python3.6/site-packages/pyspark/sql/dataframe.py in join(self, other, on, how)
   1046         if on is None:
   1047             on = self._jseq([])
-> 1048         assert isinstance(how, basestring), "how should be basestring"
   1049         jdf = self._jdf.join(other._jdf, on, how)
   1050         return DataFrame(jdf, self.sql_ctx)

AssertionError: how should be basestring
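The error comes from the second join call: its third argument must be a join-type string (the how parameter), not a Column expression. One possible rewrite of the loop (a sketch, assuming the links, ranks and outdegrees DataFrames defined above; col, sum and desc come from the pyspark.sql.functions import) computes the contribution in a select and rebuilds rank with groupBy/agg:
for iteration in range(numOfIterations):
    # rank of each source page divided by its outdegree, sent to each destination
    contribs = links.join(ranks, 'src').join(outdegrees, 'src') \
                    .select('dst', (col('rank') / col('count')).alias('contrib'))
    # sum contributions per destination, then apply the damping factor
    ranks = contribs.groupBy('dst') \
                    .agg(sum('contrib').alias('contrib')) \
                    .select(col('dst').alias('src'),
                            (col('contrib') * 0.85 + 0.15).alias('rank'))
ranks.orderBy(desc('rank')).show()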

Join vs. Broadcast Variables¶
In [38]:
products = sc.parallelize([(1, "Apple"), (2, "Orange"), (3, "TV"), (5, "Computer")])
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

print(trans.join(products).collect())

[(1, ((134, 'OK'), 'Apple')), (1, ((135, 'OK'), 'Apple')), (1, ((45, 'OK'), 'Apple')), (2, ((53, 'OK'), 'Orange')), (3, ((34, 'OK'), 'TV')), (5, ((162, 'Error'), 'Computer'))]
In [1]:
products = {1: "Apple", 2: "Orange", 3: "TV", 5: "Computer"}
trans = sc.parallelize([(1, (134, "OK")), (3, (34, "OK")), (5, (162, "Error")), (1, (135, "OK")), (2, (53, "OK")), (1, (45, "OK"))])

# broadcasted_products = sc.broadcast(products)

def f(x):
    return (x[0], broadcasted_products.value[x[0]], x[1])

# results = trans.map(lambda x: (x[0], broadcasted_products.value[x[0]], x[1]))
results = trans.map(lambda x: (x[0], products[x[0]], x[1]))
print(results.collect())

[(1, 'Apple', (134, 'OK')), (3, 'TV', (34, 'OK')), (5, 'Computer', (162, 'Error')), (1, 'Apple', (135, 'OK')), (2, 'Orange', (53, 'OK')), (1, 'Apple', (45, 'OK'))]
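For the actual broadcast-variable version, the commented-out lines above would be used instead of closing over the plain Python dict; a minimal sketch with the same data:
broadcasted_products = sc.broadcast(products)   # ship the lookup table to the executors once
results = trans.map(lambda x: (x[0], broadcasted_products.value[x[0]], x[1]))
print(results.collect())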
In [ ]: