Finding Prime Numbers¶
In [1]:
n = 500000
# All candidate numbers 2..n-1, spread over 8 partitions
allnumbers = sc.parallelize(range(2, n), 8).cache()
# Every multiple 2x, 3x, ... of every candidate is composite
composite = allnumbers.flatMap(lambda x: range(x*2, n, x)).repartition(8)
# Whatever survives the subtraction is prime
prime = allnumbers.subtract(composite)
print(prime.take(10))
[17, 97, 113, 193, 241, 257, 337, 353, 401, 433]
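The composite RDD emits every multiple of every base number, so it contains many duplicates (12, for example, is produced from 2, 3, 4 and 6); subtract only tests membership, so the duplicates do not affect correctness. As a sanity check, the driver-side sketch below compares a random sample of the result against a plain local sieve (local_primes is a hypothetical helper, not part of the original notebook).
In [ ]:
# Sketch: verify a sample of the distributed result against a local sieve
def local_primes(limit):
    sieve = [True] * limit
    sieve[0] = sieve[1] = False
    for i in range(2, int(limit ** 0.5) + 1):
        if sieve[i]:
            for j in range(i * i, limit, i):
                sieve[j] = False
    return {i for i, flag in enumerate(sieve) if flag}
reference = local_primes(n)
sample = prime.takeSample(False, 20, seed=42)
print(all(x in reference for x in sample))  # expected: True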
In [3]:
# Find the number of elements in each partition
def partitionsize(it):
    s = 0
    for i in it:
        s += 1
    yield s
print(allnumbers.mapPartitions(partitionsize).collect())
print(composite.mapPartitions(partitionsize).collect())
print(prime.mapPartitions(partitionsize).collect())
print(prime.glom().collect()[1][0:4])
[62499, 62500, 62500, 62500, 62499, 62500, 62500, 62500]
[704805, 704790, 704800, 704800, 704800, 704799, 704800, 704816]
[0, 5169, 1, 5219, 0, 5206, 0, 5189, 0, 5165, 0, 5199, 0, 5191, 0, 5199]
[17, 97, 113, 193]
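The subtract here yields 16 partitions (both 8-partition parents feed the shuffle), and the sizes alternate between empty and roughly 5,200 elements. The pattern comes from hash partitioning on the values themselves: for a small integer, Python's hash (and hence Spark's default portable_hash) is the value itself, and every prime except 2 is odd, so the odd keys all land in odd-numbered partitions; the single element in partition 2 is the prime 2. A small sketch to confirm the mapping:
In [ ]:
# Sketch: primes other than 2 are odd, so they hash to odd-numbered partitions
# (for plain integers, Python's hash -- used by the default partitioner -- is the value itself)
num_parts = prime.getNumPartitions()
print(num_parts)                                    # 16
print([hash(p) % num_parts for p in [2, 17, 97, 113]])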
Data Partitioning¶
In [5]:
# Freshly parallelized pair RDDs have no partitioner
data = [8, 96, 240, 400, 1, 800]
rdd = sc.parallelize(zip(data, data), 4)
print(rdd.partitioner)
print(rdd.glom().collect())
# reduceByKey installs a hash partitioner
rdd = rdd.reduceByKey(lambda x,y: x+y)
print(rdd.glom().collect())
print(rdd.partitioner.partitionFunc)
# mapValues preserves the parent's partitioner
rdd1 = rdd.mapValues(lambda x: x+1)
print(rdd1.partitioner.partitionFunc)
# sortByKey replaces it with a range partitioner
rdd = rdd.sortByKey()
print(rdd.glom().collect())
print(rdd.partitioner.partitionFunc)
None
[[(8, 8)], [(96, 96), (240, 240)], [(400, 400)], [(1, 1), (800, 800)]]
[[(8, 8), (96, 96), (240, 240), (400, 400), (800, 800)], [(1, 1)], [], []]
[[(1, 1), (8, 8)], [(96, 96), (240, 240)], [(400, 400)], [(800, 800)]]
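reduceByKey attaches a hash partitioner to its result and mapValues keeps it, because the keys cannot change; a plain map drops the partitioner, since Spark cannot know whether the mapped function altered the keys. A short sketch of the difference, reusing the RDD from the cell above:
In [ ]:
# Sketch: mapValues preserves the partitioner, map discards it
rdd_kept = rdd.mapValues(lambda v: v + 1)
rdd_lost = rdd.map(lambda kv: (kv[0], kv[1] + 1))
print(rdd_kept.partitioner is not None)  # True
print(rdd_lost.partitioner)              # None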
In [6]:
def partitionsize(it): yield len(list(it))
n = 40000
def f(x):
    return x % 9
data1 = list(range(0, n, 16)) + list(range(0, n, 16))
data2 = list(range(0, n, 8))
rdd1 = sc.parallelize(zip(data1, data2), 8)
print(rdd1.mapPartitions(partitionsize).collect())
rdd2 = rdd1.reduceByKey(lambda x,y: x+y)
print(rdd2.mapPartitions(partitionsize).collect())
# repartition the reduced RDD again, this time with the custom function f
rdd3 = rdd2.partitionBy(8, f)
print(rdd3.mapPartitions(partitionsize).collect())
# or let reduceByKey use f directly
rdd4 = rdd1.reduceByKey(lambda x,y: x+y, partitionFunc=f)
print(rdd4.mapPartitions(partitionsize).collect())
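rdd3 and rdd4 end up partitioned the same way, but they get there differently: rdd3 is shuffled twice (once by the default hash partitioner inside reduceByKey, then again by partitionBy with f), while rdd4 is shuffled only once because f is handed to reduceByKey directly. A small check, sketched here:
In [ ]:
# Sketch: rdd3 and rdd4 hold the same keys in the same partitions,
# but rdd4 needed one shuffle fewer to get there
print(rdd3.mapPartitions(partitionsize).collect() == rdd4.mapPartitions(partitionsize).collect())
print(rdd4.partitioner.partitionFunc is f)  # True: f is the partition function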
In [7]:
a = sc.parallelize(zip(range(10000), range(10000)), 8)
b = sc.parallelize(zip(range(10000), range(10000)), 8)
print(a.partitioner)
a = a.reduceByKey(lambda x,y: x+y)
b = b.reduceByKey(lambda x,y: x+y)
c = a.join(b)
print(c.getNumPartitions())
print(c.partitioner.partitionFunc)
print(c.glom().first()[0:4])
None
8
[(0, (0, 0)), (8, (8, 8)), (16, (16, 16)), (24, (24, 24))]
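Both a and b were reduced with the default hash partitioner into the same number of partitions, so the join can reuse that partitioning instead of reshuffling either side; the result keeps 8 partitions and the same partition function, which is why keys 0, 8, 16 and 24 (all hashing to partition 0) sit together in the first partition. A small check, sketched below:
In [ ]:
# Sketch: the two sides of the join already share an equal partitioner
print(a.partitioner == b.partitioner)  # True: same number of partitions, same hash function
print(c.partitioner.numPartitions)     # 8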
Partitioning in DataFrames¶
In [8]:
data1 = [1, 1, 1, 2, 2, 2, 3, 3, 3, 4]
data2 = [2, 2, 3, 4, 5, 3, 1, 1, 2, 3]
df = spark.createDataFrame(zip(data1, data2))
print(df.rdd.getNumPartitions())
print(df.rdd.glom().collect())
8
[[Row(_1=1, _2=2)], [Row(_1=1, _2=2)], [Row(_1=1, _2=3)], [Row(_1=2, _2=4), Row(_1=2, _2=5)], [Row(_1=2, _2=3)], [Row(_1=3, _2=1)], [Row(_1=3, _2=1)], [Row(_1=3, _2=2), Row(_1=4, _2=3)]]
In [9]:
df1 = df.repartition(6, df._1)
print(df1.rdd.glom().collect())
df1.show()
[[], [], [Row(_1=2, _2=4), Row(_1=2, _2=5), Row(_1=2, _2=3), Row(_1=4, _2=3)], [Row(_1=3, _2=1), Row(_1=3, _2=1), Row(_1=3, _2=2)], [], [Row(_1=1, _2=2), Row(_1=1, _2=2), Row(_1=1, _2=3)]]
+---+---+
| _1| _2|
+---+---+
| 2| 4|
| 2| 5|
| 2| 3|
| 4| 3|
| 3| 1|
| 3| 1|
| 3| 2|
| 1| 2|
| 1| 2|
| 1| 3|
+---+---+
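repartition(6, df._1) hash-partitions the rows on column _1, so all rows with the same key end up in the same partition; with only four distinct keys spread over six partitions, some partitions stay empty and colliding keys (here 2 and 4) share one. The mapping can be made visible with spark_partition_id, sketched below:
In [ ]:
# Sketch: show which partition each row landed in after repartitioning by _1
from pyspark.sql.functions import spark_partition_id
df1.withColumn('pid', spark_partition_id()).show()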
In [10]:
# A 'real' example from SF Express
# Prepare three relational tables
from pyspark.sql.functions import *
num_waybills = 1000
num_customers = 100
rdd = sc.parallelize((i, ) for i in range(num_waybills))
waybills = spark.createDataFrame(rdd).select(floor(rand()*num_waybills).alias('waybill'),
                                             floor(rand()*num_customers).alias('customer')) \
                .repartition('waybill') \
                .cache()
waybills.show()
print(waybills.count())
rdd = sc.parallelize((i, i) for i in range(num_customers))
customers = spark.createDataFrame(rdd, ['customer', 'phone']).cache()
customers.show()
print(customers.count())
rdd = sc.parallelize((i, ) for i in range(num_waybills))
waybill_status = spark.createDataFrame(rdd).select(floor(rand()*num_waybills).alias('waybill'),
                                                   floor(rand()*10).alias('version')) \
                      .groupBy('waybill').max('version').cache()
waybill_status.show()
print(waybill_status.count())
+-------+--------+
|waybill|customer|
+-------+--------+
| 964| 89|
| 29| 32|
| 26| 5|
| 558| 63|
| 541| 15|
| 558| 69|
| 418| 54|
| 418| 64|
| 243| 5|
| 720| 55|
| 278| 60|
| 367| 41|
| 243| 45|
| 278| 30|
| 442| 51|
| 296| 32|
| 348| 29|
| 847| 0|
| 287| 21|
| 415| 3|
+-------+--------+
only showing top 20 rows
1000
+--------+-----+
|customer|phone|
+--------+-----+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
| 5| 5|
| 6| 6|
| 7| 7|
| 8| 8|
| 9| 9|
| 10| 10|
| 11| 11|
| 12| 12|
| 13| 13|
| 14| 14|
| 15| 15|
| 16| 16|
| 17| 17|
| 18| 18|
| 19| 19|
+--------+-----+
only showing top 20 rows
100
+-------+------------+
|waybill|max(version)|
+-------+------------+
| 964| 9|
| 474| 3|
| 29| 6|
| 65| 0|
| 418| 7|
| 541| 6|
| 270| 4|
| 293| 1|
| 222| 9|
| 367| 6|
| 720| 9|
| 705| 9|
| 278| 3|
| 296| 7|
| 926| 7|
| 965| 8|
| 54| 5|
| 0| 0|
| 287| 5|
| 277| 4|
+-------+------------+
only showing top 20 rows
625
In [11]:
# We want to join 3 tables together.
# Knowing how each table is partitioned helps optimize the join order.
waybills.join(customers, 'customer').join(waybill_status, 'waybill').show()
# waybills.join(waybill_status, 'waybill').join(customers, 'customer').show()
+-------+--------+-----+------------+
|waybill|customer|phone|max(version)|
+-------+--------+-----+------------+
| 964| 89| 89| 9|
| 29| 32| 32| 6|
| 541| 15| 15| 6|
| 418| 54| 54| 7|
| 418| 64| 64| 7|
| 720| 55| 55| 9|
| 278| 60| 60| 3|
| 367| 41| 41| 6|
| 278| 30| 30| 3|
| 296| 32| 32| 7|
| 287| 21| 21| 5|
| 299| 15| 15| 7|
| 167| 65| 65| 6|
| 347| 90| 90| 1|
| 347| 89| 89| 1|
| 857| 96| 96| 6|
| 347| 39| 39| 1|
| 857| 15| 15| 6|
| 724| 59| 59| 9|
| 724| 33| 33| 9|
+-------+--------+-----+------------+
only showing top 20 rows
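The commented-out line performs the same join in the other order. Because waybills is already repartitioned on waybill, joining it with waybill_status first can reuse that partitioning, whereas joining with customers first forces an exchange on customer before the waybill join. The difference shows up in the physical plans, sketched below (the exact plans depend on the Spark version and on whether these small tables get broadcast):
In [ ]:
# Sketch: compare the physical plans of the two join orders
waybills.join(customers, 'customer').join(waybill_status, 'waybill').explain()
waybills.join(waybill_status, 'waybill').join(customers, 'customer').explain()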
Threading¶
In [12]:
import threading
import random
partitions = 10
n = 500000 * partitions
# use a different seed in each thread and in each partition
# a bit ugly, since mapPartitionsWithIndex takes a function whose only
# parameters are the partition index and the iterator over that partition
def f1(index, it):
    random.seed(index + 987231)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0
def f2(index, it):
    random.seed(index + 987232)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0
def f3(index, it):
    random.seed(index + 987233)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0
def f4(index, it):
    random.seed(index + 987234)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0
def f5(index, it):
    random.seed(index + 987245)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0
f = [f1, f2, f3, f4, f5]
# the function executed in each thread/job
def dojob(i):
    count = sc.parallelize(range(1, n + 1), partitions) \
              .mapPartitionsWithIndex(f[i]).reduce(lambda a,b: a+b)
    print("Worker", i, "reports: Pi is roughly", 4.0 * count / n)
# create and execute the threads
threads = []
for i in range(5):
    t = threading.Thread(target=dojob, args=(i,))
    threads += [t]
    t.start()
# wait for all threads to complete
for t in threads:
    t.join()
'''
for i in range(5):
    dojob(i)
'''
Worker 0 reports: Pi is roughly 3.1426512
Worker 1 reports: Pi is roughly 3.1422232
Worker 2 reports: Pi is roughly 3.1424288
Worker 4 reports: Pi is roughly 3.1419832
Worker 3 reports: Pi is roughly 3.142096
Out[12]:
'\nfor i in range(5):\n dojob(i)\n'
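The five near-identical functions exist only to give each thread its own seed offset. A closure factory expresses the same idea without the duplication; the sketch below is an alternative formulation, not part of the original run (make_sampler is a hypothetical name, and the offsets mirror the ones above):
In [ ]:
# Sketch: build the per-thread sampling functions with a factory;
# each closure still seeds by partition index plus a thread-specific offset
def make_sampler(offset):
    def sample(index, it):
        random.seed(index + offset)
        for i in it:
            x = random.random() * 2 - 1
            y = random.random() * 2 - 1
            yield 1 if x ** 2 + y ** 2 < 1 else 0
    return sample
f = [make_sampler(987231 + i) for i in range(5)]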
In [ ]: