
Finding Prime Numbers¶
In [1]:
n = 500000
allnumbers = sc.parallelize(range(2, n), 8).cache()
composite = allnumbers.flatMap(lambda x: range(x*2, n, x)).repartition(8)
prime = allnumbers.subtract(composite)
print(prime.take(10))

[17, 97, 113, 193, 241, 257, 337, 353, 401, 433]
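Note that take(10) does not return the ten smallest primes: subtract() hash-partitions its result, so the order depends on how keys land in partitions. A small follow-up cell, not in the original notebook, that sorts first (at the cost of an extra shuffle):

# Sorting recovers the natural order before taking the first ten.
print(prime.sortBy(lambda x: x).take(10))  # would start [2, 3, 5, 7, 11, ...]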
In [3]:
# Find the number of elements in each partition
def partitionsize(it):
    s = 0
    for i in it:
        s += 1
    yield s

print(allnumbers.mapPartitions(partitionsize).collect())
print(composite.mapPartitions(partitionsize).collect())
print(prime.mapPartitions(partitionsize).collect())
print(prime.glom().collect()[1][0:4])

[62499, 62500, 62500, 62500, 62499, 62500, 62500, 62500]
[704805, 704790, 704800, 704800, 704800, 704799, 704800, 704816]
[0, 5169, 1, 5219, 0, 5206, 0, 5189, 0, 5165, 0, 5199, 0, 5191, 0, 5199]
[17, 97, 113, 193]
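The alternating zeros above are a partitioning effect: subtract() produced 16 partitions, keys are hash-partitioned, and every prime except 2 is odd, so primes land only in odd-numbered partitions (2 itself goes to partition 2, the lone count of 1 above). A small check, assuming the default portable_hash partitioner is what subtract() used here:

# portable_hash(k) % numPartitions decides the partition of key k;
# for plain ints the hash is the int itself.
from pyspark.rdd import portable_hash

for k in [2, 17, 97, 113]:
    print(k, '->', portable_hash(k) % 16)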

Data Partitioning¶
In [5]:
data = [8, 96, 240, 400, 1, 800]
rdd = sc.parallelize(zip(data, data),4)
print(rdd.partitioner)
print(rdd.glom().collect())
rdd = rdd.reduceByKey(lambda x,y: x+y)
print(rdd.glom().collect())
print(rdd.partitioner.partitionFunc)

rdd1 = rdd.mapValues(lambda x: x+1)
print(rdd1.partitioner.partitionFunc)

rdd = rdd.sortByKey()
print(rdd.glom().collect())
print(rdd.partitioner.partitionFunc)

None
[[(8, 8)], [(96, 96), (240, 240)], [(400, 400)], [(1, 1), (800, 800)]]
[[(8, 8), (96, 96), (240, 240), (400, 400), (800, 800)], [(1, 1)], [], []]


[[(1, 1), (8, 8)], [(96, 96), (240, 240)], [(400, 400)], [(800, 800)]]
<function RDD.sortByKey.<locals>.rangePartitioner at 0x10e0b0158>
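Two things are visible here: reduceByKey hash-partitions by key, and since five of the six keys are multiples of 4 they all collide into partition 0 (the skewed first list); and mapValues keeps the existing partitioner because keys cannot change, whereas a plain map would drop it. A hedged illustration, not part of the original cell:

# map() may alter keys, so Spark discards the partitioner; mapValues() preserves it.
mapped = rdd.map(lambda kv: (kv[0], kv[1] + 1))
print(mapped.partitioner)                                       # None
print(rdd.mapValues(lambda v: v + 1).partitioner is not None)   # True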
In [6]:
def partitionsize(it): yield len(list(it))

n = 40000

def f(x):
    return x % 9

data1 = list(range(0, n, 16)) + list(range(0, n, 16))
data2 = range(0, n, 8)
rdd1 = sc.parallelize(zip(data1, data2), 8)
print(rdd1.mapPartitions(partitionsize).collect())
rdd2 = rdd1.reduceByKey(lambda x, y: x + y)
print(rdd2.mapPartitions(partitionsize).collect())
rdd3 = rdd2.partitionBy(8, f)
print(rdd3.mapPartitions(partitionsize).collect())
rdd4 = rdd1.reduceByKey(lambda x, y: x + y, partitionFunc=f)
print(rdd4.mapPartitions(partitionsize).collect())
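The point of the cell above: rdd3 reaches the custom layout with two shuffles (reduceByKey, then partitionBy with f), while rdd4 passes f directly as reduceByKey's partitionFunc and gets the same layout in a single shuffle. A hedged sanity check, not in the original:

# Both RDDs should hold the same keys; rdd4 just arrives there with one fewer shuffle.
print(sorted(rdd3.keys().collect()) == sorted(rdd4.keys().collect()))  # True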
In [7]:
a = sc.parallelize(zip(range(10000), range(10000)), 8)
b = sc.parallelize(zip(range(10000), range(10000)), 8)
print(a.partitioner)
a = a.reduceByKey(lambda x,y: x+y)
b = b.reduceByKey(lambda x,y: x+y)
c = a.join(b)
print(c.getNumPartitions())
print(c.partitioner.partitionFunc)
print(c.glom().first()[0:4])

None
8

[(0, (0, 0)), (8, (8, 8)), (16, (16, 16)), (24, (24, 24))]
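After the two reduceByKey calls, a and b are both hash-partitioned into 8 partitions with the same partition function, so the join can pair partitions one-to-one instead of reshuffling everything; that is also why c keeps 8 partitions and the same partition function. A small hedged check (PySpark's Partitioner compares partition count and function for equality):

# Equal partitioners here, so the join is narrow (no extra shuffle needed).
print(a.partitioner == b.partitioner)  # True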

Partitioning in DataFrames¶
In [8]:
data1 = [1, 1, 1, 2, 2, 2, 3, 3, 3, 4]
data2 = [2, 2, 3, 4, 5, 3, 1, 1, 2, 3]
df = spark.createDataFrame(zip(data1, data2))
print(df.rdd.getNumPartitions())
print(df.rdd.glom().collect())

8
[[Row(_1=1, _2=2)], [Row(_1=1, _2=2)], [Row(_1=1, _2=3)], [Row(_1=2, _2=4), Row(_1=2, _2=5)], [Row(_1=2, _2=3)], [Row(_1=3, _2=1)], [Row(_1=3, _2=1)], [Row(_1=3, _2=2), Row(_1=4, _2=3)]]
In [9]:
df1 = df.repartition(6, df._1)
print(df1.rdd.glom().collect())
df1.show()

[[], [], [Row(_1=2, _2=4), Row(_1=2, _2=5), Row(_1=2, _2=3), Row(_1=4, _2=3)], [Row(_1=3, _2=1), Row(_1=3, _2=1), Row(_1=3, _2=2)], [], [Row(_1=1, _2=2), Row(_1=1, _2=2), Row(_1=1, _2=3)]]
+---+---+
| _1| _2|
+---+---+
| 2| 4|
| 2| 5|
| 2| 3|
| 4| 3|
| 3| 1|
| 3| 1|
| 3| 2|
| 1| 2|
| 1| 2|
| 1| 3|
+---+---+
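repartition(6, df._1) hash-partitions by column _1, so all rows with the same key end up in the same partition (and some of the 6 partitions stay empty). A hedged way to make the assignment visible, using the standard spark_partition_id function:

from pyspark.sql.functions import spark_partition_id

# Adds the physical partition index of each row; rows sharing _1 share a partition id.
df1.withColumn('pid', spark_partition_id()).show()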

In [10]:
# A 'real' example from SF Express
# Prepare three relational tables

from pyspark.sql.functions import *

num_waybills = 1000
num_customers = 100

rdd = sc.parallelize((i, ) for i in range(num_waybills))
waybills = spark.createDataFrame(rdd).select(floor(rand()*num_waybills).alias('waybill'),
                                             floor(rand()*num_customers).alias('customer')) \
                .repartition('waybill') \
                .cache()
waybills.show()
print(waybills.count())

rdd = sc.parallelize((i, i) for i in range(num_customers))
customers = spark.createDataFrame(rdd, ['customer', 'phone']).cache()
customers.show()
print(customers.count())

rdd = sc.parallelize((i, ) for i in range(num_waybills))
waybill_status = spark.createDataFrame(rdd).select(floor(rand()*num_waybills).alias('waybill'),
                                                   floor(rand()*10).alias('version')) \
                      .groupBy('waybill').max('version').cache()
waybill_status.show()
print(waybill_status.count())

+-------+--------+
|waybill|customer|
+-------+--------+
| 964| 89|
| 29| 32|
| 26| 5|
| 558| 63|
| 541| 15|
| 558| 69|
| 418| 54|
| 418| 64|
| 243| 5|
| 720| 55|
| 278| 60|
| 367| 41|
| 243| 45|
| 278| 30|
| 442| 51|
| 296| 32|
| 348| 29|
| 847| 0|
| 287| 21|
| 415| 3|
+-------+--------+
only showing top 20 rows

1000
+--------+-----+
|customer|phone|
+--------+-----+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
| 5| 5|
| 6| 6|
| 7| 7|
| 8| 8|
| 9| 9|
| 10| 10|
| 11| 11|
| 12| 12|
| 13| 13|
| 14| 14|
| 15| 15|
| 16| 16|
| 17| 17|
| 18| 18|
| 19| 19|
+--------+-----+
only showing top 20 rows

100
+-------+------------+
|waybill|max(version)|
+-------+------------+
| 964| 9|
| 474| 3|
| 29| 6|
| 65| 0|
| 418| 7|
| 541| 6|
| 270| 4|
| 293| 1|
| 222| 9|
| 367| 6|
| 720| 9|
| 705| 9|
| 278| 3|
| 296| 7|
| 926| 7|
| 965| 8|
| 54| 5|
| 0| 0|
| 287| 5|
| 277| 4|
+-------+------------+
only showing top 20 rows

625
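waybill_status has well under 1000 rows because the waybill ids are drawn uniformly at random with replacement and the groupBy collapses duplicates; the expected number of distinct ids is num_waybills * (1 - (1 - 1/num_waybills)**num_waybills) ≈ 632, so 625 is in line with that. A quick back-of-the-envelope check (plain Python, not part of the original notebook):

# Expected number of distinct waybill ids when drawing num_waybills values
# uniformly at random from num_waybills possibilities.
print(num_waybills * (1 - (1 - 1 / num_waybills) ** num_waybills))  # ~632.3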
In [11]:
# We want to join 3 tables together.
# Knowing how each table is partitioned helps optimize the join order.

waybills.join(customers, 'customer').join(waybill_status, 'waybill').show()
# waybills.join(waybill_status, 'waybill').join(customers, 'customer').show()

+-------+--------+-----+------------+
|waybill|customer|phone|max(version)|
+-------+--------+-----+------------+
| 964| 89| 89| 9|
| 29| 32| 32| 6|
| 541| 15| 15| 6|
| 418| 54| 54| 7|
| 418| 64| 64| 7|
| 720| 55| 55| 9|
| 278| 60| 60| 3|
| 367| 41| 41| 6|
| 278| 30| 30| 3|
| 296| 32| 32| 7|
| 287| 21| 21| 5|
| 299| 15| 15| 7|
| 167| 65| 65| 6|
| 347| 90| 90| 1|
| 347| 89| 89| 1|
| 857| 96| 96| 6|
| 347| 39| 39| 1|
| 857| 15| 15| 6|
| 724| 59| 59| 9|
| 724| 33| 33| 9|
+-------+--------+-----+------------+
only showing top 20 rows
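Whether the commented-out join order is cheaper depends on which exchanges Spark can avoid: waybills is already repartitioned on 'waybill', so joining it with waybill_status first may reuse that partitioning before the small customers table is brought in. A hedged way to compare the two orderings, using DataFrame.explain() (not run in the original notebook):

# Inspect the physical plans; count the Exchange (shuffle) operators in each ordering.
waybills.join(customers, 'customer').join(waybill_status, 'waybill').explain()
waybills.join(waybill_status, 'waybill').join(customers, 'customer').explain()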

Threading¶
In [12]:
import threading
import random

partitions = 10
n = 500000 * partitions

# use different seeds in different threads and different partitions
# A bit ugly: mapPartitionsWithIndex expects a function whose only parameters are
# the partition index and an iterator, so each thread needs its own seeded function.
def f1(index, it):
    random.seed(index + 987231)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0

def f2(index, it):
    random.seed(index + 987232)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0

def f3(index, it):
    random.seed(index + 987233)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0

def f4(index, it):
    random.seed(index + 987234)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0

def f5(index, it):
    random.seed(index + 987245)
    for i in it:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        yield 1 if x ** 2 + y ** 2 < 1 else 0

f = [f1, f2, f3, f4, f5]

# the function executed in each thread/job
def dojob(i):
    count = sc.parallelize(range(1, n + 1), partitions) \
        .mapPartitionsWithIndex(f[i]).reduce(lambda a, b: a + b)
    print("Worker", i, "reports: Pi is roughly", 4.0 * count / n)

# create and execute the threads
threads = []
for i in range(5):
    t = threading.Thread(target=dojob, args=(i,))
    threads += [t]
    t.start()

# wait for all threads to complete
for t in threads:
    t.join()

'''
for i in range(5):
    dojob(i)
'''

Worker 0 reports: Pi is roughly 3.1426512
Worker 1 reports: Pi is roughly 3.1422232
Worker 2 reports: Pi is roughly 3.1424288
Worker 4 reports: Pi is roughly 3.1419832
Worker 3 reports: Pi is roughly 3.142096
Out[12]:
'\nfor i in range(5):\n    dojob(i)\n'
In [ ]:
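As a side note (not in the original notebook), the five near-identical samplers differ only in their seed offset, so they could be produced by a small factory function; a minimal sketch, assuming the same sc, n and partitions as above:

def make_sampler(seed_offset):
    # Returns an (index, iterator) function usable with mapPartitionsWithIndex,
    # seeding each partition differently and each thread via seed_offset.
    def sample(index, it):
        random.seed(index + seed_offset)
        for _ in it:
            x = random.random() * 2 - 1
            y = random.random() * 2 - 1
            yield 1 if x ** 2 + y ** 2 < 1 else 0
    return sample

f = [make_sampler(987231 + i) for i in range(5)]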