
Question 1
1 / 1 pts
Find all distinct countries.
Hint: use select(), distinct()
Your Answer:
a = df.select('Country').distinct()
a.show(50, False)
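The same query can also be written in Spark SQL after registering the DataFrame as a temporary view; a minimal sketch, where 'sales' is a hypothetical view name not given in the question:
df.createOrReplaceTempView('sales')  # 'sales' is a hypothetical view name
spark.sql('SELECT DISTINCT Country FROM sales').show(50, False)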
 
Question 2
1 / 1 pts
Find the Name and Price of sales records in Brazil.
Hint: use filter().
Your Answer:
b = df.filter(df['Country'] == 'Brazil')
b.select('Name', 'Price').show()
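Note that filter() also accepts a SQL expression string, which is an equivalent way to write the same predicate; a minimal sketch:
b = df.filter("Country = 'Brazil'")  # SQL-string form of the same predicate
b.select('Name', 'Price').show()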
 
Question 3
1 / 1 pts
For each country, find the total Price.
Hint: Use groupBy()
 
Your Answer:
df.groupBy('Country').sum('Price').show(50)
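groupBy().sum() names the result column sum(Price). If a cleaner name is wanted, agg() with alias() performs the same aggregation; a minimal sketch, where total_price is an arbitrary name:
from pyspark.sql.functions import sum as sum_
df.groupBy('Country').agg(sum_('Price').alias('total_price')).show(50)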
 
Question 4
2 / 2 pts
List countries by their total Price in descending order.
Hint: Use orderBy()
Your Answer:
t = df.groupBy('Country').sum('Price')
t.orderBy('sum(Price)', ascending=False).show(50)
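Equivalently, the sort can use the desc() column function instead of the ascending flag; a minimal sketch of the same query:
from pyspark.sql.functions import desc
t = df.groupBy('Country').sum('Price')
t.orderBy(desc('sum(Price)')).show(50)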
 
Question 5
2 / 2 pts
Now load a second table 'countries':
http://www.cse.ust.hk/msbd5003/data/countries.csv
df2 = spark.read.csv('countries.csv', header=True, inferSchema=True)
Redo Question 3, but replace the country names by their IDs.
Hint: Use join()
Your Answer:
t = df.join(df2, df.Country == df2.Country)
c = t.groupBy('ID').sum('Price')
c.select('ID', 'sum(Price)').show(50)
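One caveat: joining with the condition df.Country == df2.Country keeps both Country columns in the result. Passing the shared column name to on= de-duplicates it, which is a slightly tidier way to write the same join; a minimal sketch:
t = df.join(df2, on='Country')  # keeps a single Country column
t.groupBy('ID').sum('Price').show(50)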
 
Question 6
1 / 3 pts
Rewrite the PageRank example using the DataFrame API. Here is a skeleton of the code; your job is to fill in the missing part. The data files can be downloaded at:
https://www.cse.ust.hk/msbd5003/data/pagerank_data.txt
https://www.cse.ust.hk/msbd5003/data/dblp.in
from pyspark.sql.functions import *

numOfIterations = 10

lines = spark.read.text('pagerank_data.txt')
# You can also test your program on the following larger data set:
# lines = spark.read.text('dblp.in')

a = lines.select(split(lines[0], ' '))
links = a.select(a[0][0].alias('src'), a[0][1].alias('dst'))
outdegrees = links.groupBy('src').count()
ranks = outdegrees.select('src', lit(1).alias('rank'))

for iteration in range(numOfIterations):
    # FILL IN THIS PART

ranks.orderBy(desc('rank')).show()
Note: There is a bug in the current Spark SQL implementation: the groupBy() method (followed by some aggregation) sometimes fails to group all rows with the same key. A temporary workaround is the following:
Suppose you want to do
df.groupBy('A').sum('B')
If it fails to produce the desired result, try
df.withColumnRenamed('A', 'A1').groupBy('A1').sum('B')
We have reported this bug to the Spark developers and the issue is currently under investigation:
https://issues.apache.org/jira/browse/SPARK-20169
Your Answer:
from pyspark.sql.functions import *

numOfIterations = 10
lines = spark.read.text('pagerank_data.txt')
a = lines.select(split(lines[0], ' '))
links = a.select(a[0][0].alias('src'), a[0][1].alias('dst'))
outdegrees = links.groupBy('src').count()
ranks = outdegrees.select('src', lit(1).alias('rank'))

# Join outdegrees and ranks on 'src' before entering the loop, and
# pre-compute each page's share factor X = 1/outdegree, which never changes.
joined_df = outdegrees.join(ranks, on='src')
joined_df = joined_df.withColumn('X', col('rank') / col('count'))

for iteration in range(numOfIterations):
    # Each page contributes rank/outdegree to every page it links to.
    joined_df = joined_df.withColumn('Result', col('rank') * col('X'))
    contribs = links.join(joined_df, on='src', how='left').select('dst', 'Result')
    # Sum the incoming contributions per page and apply the damping factor,
    # as in the original PageRank example: rank = 0.15 + 0.85 * contributions.
    ranks = contribs.groupBy('dst').sum('Result') \
                    .select(col('dst').alias('PageName'),
                            (col('sum(Result)') * 0.85 + 0.15).alias('rank'))
    # Carry the new ranks back into joined_df for the next iteration.
    old = joined_df.drop('rank').drop('Result')
    joined_df = ranks.join(old, ranks.PageName == old.src).drop('PageName')

ranks.orderBy(desc('rank')).show()
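One practical note on this loop: because DataFrames are lazily evaluated, each iteration extends the logical plan, so later iterations take longer to plan and may recompute earlier rounds. A common mitigation (an optional tweak, not part of the required answer) is to cache and materialize the intermediate result each round; a minimal sketch of the extra lines inside the loop, after joined_df is rebuilt:
    joined_df = joined_df.cache()
    joined_df.count()  # force materialization so earlier iterations are not recomputed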