程序代写代做 algorithm Java -sandbox

-sandbox
Spotify Music Recommendation System by Hongyang (Bruce) Yang¶

-sandbox
1. Load and display the data¶
In [3]:
%sql
SELECT * FROM tracks_csv
In [4]:
%sql

SELECT count(*) FROM tracks_csv
In [5]:
%sql
SELECT * FROM music_csv
In [6]:
%sql

SELECT count(*) FROM music_csv
In [7]:
%sql
SELECT count(Distinct TrackID) FROM music_csv
In [8]:
%sql
SELECT * FROM cust_csv
In [9]:
%sql

SELECT count(*) FROM cust_csv
In [10]:
%sql
SELECT count(Distinct CustID) FROM cust_csv
In [11]:
trackDF = sqlContext.read.format(“csv”).options(header=’true’, inferSchema=’true’).load(“/FileStore/tables/tracks.csv”)
In [12]:
trackDF.printSchema()
In [13]:
display(trackDF)
In [14]:
musicDF = sqlContext.read.format(“csv”).options(header=’true’, inferSchema=’true’).load(“/FileStore/tables/music.csv”)
In [15]:
musicDF.printSchema()
In [16]:
display(musicDF)
In [17]:
customerDF = sqlContext.read.format(“csv”).options(header=’true’, inferSchema=’true’).load(“/FileStore/tables/cust.csv”)
In [18]:
customerDF.printSchema()
In [19]:
display(customerDF)

-sandbox
2. Exploratory Data Analysis in SQL¶
In [21]:
CustName = customerDF.select(“*”).withColumnRenamed(“CustID”,’CCustID’)
display(CustName)
In [22]:
MusicName = musicDF.select(“*”).withColumnRenamed(“TrackID”,’MTrackID’)
display(MusicName)
In [23]:
from pyspark.sql.functions import col
track_custJoin=trackDF.join(CustName, trackDF.CustID == CustName.CCustID, “left_outer”)
In [24]:
track_allJoin = track_custJoin.join(MusicName,track_custJoin.TrackId == MusicName.MTrackID,”left_outer”)
In [25]:
display(track_allJoin)
In [26]:
track_allJoin.createOrReplaceTempView(“track_allJoin”)
In [27]:
%sql
SELECT CustID, Name, Count(DISTINCT TrackID) AS NumOfTrack
FROM track_allJoin
Group By CustID, Name
Order BY NumOfTrack DESC

-sandbox That means Gregory Koval (CustID = 0) listened a total of 1617 out of 1735 tracks.
He almost finished all the tracks. He really loves music!
In [29]:
%sql
SELECT count(Distinct TrackID) FROM track_allJoin
WHERE CustID = 0
In [30]:
%sql
SELECT CustID, Name,count(TrackID) as mobileCount
FROM track_allJoin
WHERE mobile = 0
GROUP BY CustID,Name
ORDER BY mobileCount DESC

-sandbox Paula Peltier really likes listening music using mobile device
In [32]:
%sql
SELECT TrackID, Title, Count(DISTINCT CustID ) AS NumOfCust
FROM track_allJoin
Group By TrackID, Title
Order BY NumOfCust DESC

-sandbox That means Caught Up in You (TrackId = 0) is the most popular music, there are 4190 out of 5000 customers ever listened to this track
In [34]:
%sql
SELECT DateTime FROM track_allJoin
WHERE CustID = 0
In [35]:
#unix_timestamp(DateTime,”MM/dd/yyyy HH:mm:ss”).cast(“double”).cast(“timestamp”)
In [36]:
%sql
SELECT CustID, Name, TrackId, Title, Length,Mobile, split(DateTime, ” “)[0] as DateListen, split(DateTime, ” “)[1] as TimeListen
FROM track_allJoin
In [37]:
from pyspark.sql.functions import split

split_datetime = split(track_allJoin[‘DateTime’], ‘ ‘)
track_allJoin = track_allJoin.withColumn(‘DateL’, split_datetime.getItem(0))
track_allJoin = track_allJoin.withColumn(‘TimeL’, split_datetime.getItem(1))
In [38]:
display(track_allJoin)
In [39]:
track_allJoin.createOrReplaceTempView(“track_allJoin”)
In [40]:
final_table = sqlContext.sql(“””SELECT CustId, Name, TrackId, Title, Length, Mobile, Gender, DateL, TimeL FROM track_allJoin”””)
In [41]:
%sql
select count(*) as totalRows, count(distinct CustID) AS uniqueCustomer, count(distinct TrackID) as uniqueTrack
from final_table
In [42]:
display(final_table)
In [43]:
from pyspark.sql.functions import to_date, to_timestamp,to_utc_timestamp,hour
final_table=final_table.withColumn(“DateL”, to_date(“DateL”, “MM/dd/yy”))
final_table=final_table.withColumn(“HourL”, hour(“TimeL”))

final_table.createOrReplaceTempView(“final_table”)
In [44]:
display(final_table)
In [45]:
%sql
select HourL,count(TrackId) as TotalTrack
from final_table
GROup by HourL
Order by TotalTrack

-sandbox
It turns out that the customers are more likely to listen to the music at night before bedtime
In [47]:
from pyspark.sql import DataFrameWriter

result_writer = DataFrameWriter(final_table)
result_writer.saveAsTable(‘final_table’,format=’parquet’, mode=’overwrite’,path=’/path/to/new/data/files’)

-sandbox
3. Study Implicit Rating¶
In [49]:
final_table.createOrReplaceTempView(“final_table”)
In [50]:
%sql
SELECT count(Name)
FROM final_table
WHERE Name IS NULL;
In [51]:
%sql
SELECT TrackId,Title,count(*)
FROM final_table
WHERE CustId=0
Group by TrackId, Title
Order by TrackId
In [52]:
%sql
select count(distinct TrackId) from final_table
In [53]:
%sql

SELECT * from(
SELECT TrackId, Length, rank() over (partition by TrackId order by count(*) desc) as rank
FROM final_table
WHERE Length>0
Group by TrackId, Length
order by rank ) r
where r.rank=1
In [54]:
%sql
select percentile(cnt,0.25) as cnt_25,
percentile(cnt,0.5) as cnt_50,
percentile(cnt,0.75) as cnt_75,
percentile(cnt,0.9) as cnt_90,
percentile(cnt,0.95) as cnt_95,
percentile(cnt,0.99) as cnt_99,
percentile(cnt,0.995) as cnt_995,
percentile(cnt,0.999) as cnt_999
from (
select TrackId, count(*) as cnt
from final_table
group by TrackId
)

-sandbox a song has been listened 4009 times, this number falls in the 99.5% of all song played
the median play time is 422 times
In [56]:
%sql
select TrackId, count(*) as cnt
from final_table
group by TrackId
In [57]:
%sql
select distinct DateL
from final_table
In [58]:
%sql
select weekofyear(DateL),min(DateL), max(DateL), count(*)
from final_table
group by weekofyear(DateL)
order by weekofyear(DateL)
–week 13-18 are whole weeks
In [59]:
%sql
select CustId, TrackID, count(*),sum(case when Length>=200 then 1 else 0 end) as cnt
from final_table
group by CustId, TrackID
Order by CustId
In [60]:
%sql
select CustId, TrackID, count(*)
from final_table
group by CustId, TrackID
In [61]:
%sql –cumulative play times
select percentile(cnt,0.25) as cnt_25, percentile(cnt,0.5) as cnt_50, percentile(cnt,0.75) as cnt_75, percentile(cnt,0.90) as cnt_90, percentile(cnt,0.925) as cnt_93, percentile(cnt,0.95) as cnt_95, percentile(cnt,0.975) as cnt_975, percentile(cnt,0.99) as cnt_99, percentile(cnt,0.999) as cnt_999
from (
select CustId, TrackID, count(*),sum(case when Length>=0 then 1 else 0 end) as cnt
from final_table
group by CustId, TrackID
Order by CustId
)

-sandbox
4. Study Explicit Rating¶
In [63]:
%sql

select CustId, TrackID,
case when sum(case when Length>=0 then 1 else 0 end)<2 then 0 when sum(case when Length>=0 then 1 else 0 end)<4 then 1 when sum(case when Length>=0 then 1 else 0 end)<7 then 2 else 3 end as score from final_table group by CustId, TrackID In [64]: %sql select score,count(*) from ( select CustId, TrackID, case when sum(case when Length>=0 then 1 else 0 end)<2 then 0 when sum(case when Length>=0 then 1 else 0 end)<4 then 1 when sum(case when Length>=0 then 1 else 0 end)<7 then 2 else 3 end as score from final_table group by CustId, TrackID ) group by score order by score -sandbox 5. Build Popularity-based Benchmark model and Machine Learning model (ALS)¶ -sandbox https://www.ercim.eu/publication/ws-proceedings/DELOS5/nichols.pdf We believe an ideal solution is to improve the user interface to acquire implicit ratings by watching user behaviors. Implicit ratings include measures of interest such as whether the user read an article and, if so, how much time the user spent reading it. The main motivation for using implicit ratings is that it removes the cost to the evaluator of examining and rating the item. Each implicit rating will probably contain less 'value' than an explicit rating but the appropriate cost-benefit trade-off for different types of implicit data will have to be determined empirically three types of implicit data: read/ignored, saved/deleted and replied/not replied -sandbox 5.1 Recommendation Model from Implicit Ratings: Train-Week 40-51, Test-Week 52 and 53¶ -sandbox 'rating' is Customer #i has listened to the Song #j in a total of n times over the period of t, which n is the raing¶ In [69]: #set training data using week 40 to week 51 rating_im_train=spark.sql(""" select CustId, TrackID, sum(case when Length>0 then 1 else 0 end) as rating
from final_table
where weekofyear(DateL)<=51 and weekofyear(DateL)>=40
group by CustId, TrackID

“””)

rating_im_train.createOrReplaceTempView(“rating_im_train”)
In [70]:
display(rating_im_train)
In [71]:
#set testing data using week 51 and 52
rating_im_test=spark.sql(“””

select CustId, TrackID, sum(case when Length>0 then 1 else 0 end) as rating
from final_table
where weekofyear(DateL)>51
group by CustId, TrackID

“””)
rating_im_test.createOrReplaceTempView(“rating_im_test”)
In [72]:
display(rating_im_test)

-sandbox
1. CustId #1060 has listened to song #0 in a total of 1 time in the test-set period, so rating is 1
2. CustId #69 has listened to song #1 in a total of 3 times in the test-set period, so rating is 3
In [74]:
%sql
select count(*) from rating_im_train
In [75]:
%sql
select count(*) from rating_im_test
In [76]:
%sql
select * from rating_im_train
order by TrackID
limit 20
In [77]:
%sql
select TrackID, count(distinct CustId) as prediction
from rating_im_train
group by TrackID
In [78]:
model = spark.sql(“””
select TrackID, count(*) as prediction
from rating_im_train
group by TrackID
“””)
model.createOrReplaceTempView(“model”)

-sandbox
‘prediction’ is the number of unique customers that have listened to the same track¶
In [80]:
display(model)
In [81]:
predictions = spark.sql(“””
select t.*, m.prediction
from rating_im_test t left join
model m
on t.TrackId=m.TrackId
“””)
predictions.createOrReplaceTempView(“predictions”)
In [82]:
display(predictions)
In [83]:
%sql
select * from predictions
where CustID = 148
In [84]:
%sql
select CustId, TrackID, rating,
(rank() over (partition by CustId order by prediction desc)-1)*1.0/(count(TrackID) over (partition by CustId)-1) as p_rank
from predictions

-sandbox
‘p_rank’ the percentile-ranking of program i within the ordered list of all programs prepared for user u.¶
In [86]:
%sql
select CustId, TrackID, rating, prediction,
(rank() over (partition by CustId order by prediction desc)-1) as numerator, (count(TrackID) over (partition by CustId)-1) as denominator
from predictions

-sandbox
1. (count(TrackID) over (partition by CustId)-1) as denominator => CustId #148 has listened a total of 46 unique tracks
2. minus 1 because we need 0% to 100%, so (numerator-1 )/ (denominator -1)
3. (rank() over (partition by CustId order by prediction desc)-1) as numerator => create rank based on prediction (count total unique customers group by song)
4. for example, CustId #148 has ever listend to 46 tracks in total, of these 46 tracks, TrackID #2 has ever been listened by a total of 3206 unique customers, so TrackID #2 ranks the first which is rank #0 in all of the 46 tracks in CustID #148.
5. rankui = 0% would mean that program i is predicted to be the most desirable for user u, thus preceding all other programs in the list. On the other hand, rankui = 100% indicates that program i is predicted to be the least preferred for user u, thus placed at the end of the list.
In [88]:
%sql
select CustId, count(TrackID)-1
from predictions
group by(CustId)
In [89]:
%sql
select CustId, TrackID, rating, prediction,
(rank() over (partition by CustId order by prediction desc)-1) as numerator
from predictions
where CustId = 148
In [90]:
EPR_evaluation=spark.sql(“””
select CustId, TrackID, rating,
(rank() over (partition by CustId order by prediction desc)-1)*1.0/(count(TrackID) over (partition by CustId)-1) as p_rank
from predictions
“””)
EPR_evaluation.createOrReplaceTempView(“EPR_evaluation”)
In [91]:
display(EPR_evaluation)
In [92]:
%sql
select CustID,sum(p_rank * rating) / sum(rating) as EPR_Customer
from EPR_evaluation
group by CustID
order by EPR_Customer
In [93]:
result = spark.sql(“””
select sum(p_rank*rating)/sum(rating) as p_EPR
from EPR_evaluation
“””)
In [94]:
display(result)
In [95]:
print(str(round(result.collect()[0][0]*100,2))+”%”)

-sandbox
EPR >= 50% indicates an algorithm no better than random.
In [97]:
%sql
–Benchmark: best EPR possible
select sum(p_rank*rating)/sum(rating) as Best_EPR
from (
select CustId, TrackID, rating,
(rank() over (partition by CustId order by rating desc)-1)*1.0/(count(TrackID) over (partition by CustId)-1) as p_rank
from rating_im_test
)

In [98]:
from pyspark.ml.evaluation import Evaluator

class eprEvaluator(Evaluator):
def _evaluate(self, predictions):
predictions.createOrReplaceTempView(“predictions”)
result = spark.sql(“””
select sum(p_rank*rating)/sum(rating) as p_EPR
from (
select CustId, TrackID, rating,
(rank() over (partition by CustId order by prediction desc)-1)*1.0/(count(TrackID) over (partition by CustId)-1) as p_rank
from predictions
)
“””).collect()[0][0]
return float(result)
In [99]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

als_im= ALS(alpha=30, maxIter=5, rank=50, regParam=0.1, userCol=”CustId”, itemCol=”TrackID”, ratingCol=”rating”, coldStartStrategy=”drop”, implicitPrefs=True, nonnegative=False)

model_im = als_im.fit(rating_im_train)

predictions_im = model_im.transform(rating_im_test)

epr_evaluator = eprEvaluator()
epr_im = epr_evaluator.evaluate(predictions_im)
print(“Expected percentile ranking for implicit rating = ” + str(epr_im))
In [100]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

als_im= ALS(alpha=200, maxIter=7, rank=50, regParam=0.08, userCol=”CustId”, itemCol=”TrackID”, ratingCol=”rating”, coldStartStrategy=”drop”, implicitPrefs=True, nonnegative=False)

model_im = als_im.fit(rating_im_train)

predictions_im = model_im.transform(rating_im_test)

epr_evaluator = eprEvaluator()
epr_im = epr_evaluator.evaluate(predictions_im)
print(“Expected percentile ranking for implicit rating = ” + str(epr_im))
In [101]:
# Generate top 10 movie recommendations for each user
userRecs_im = model_im.recommendForAllUsers(10)
# Generate top 10 user recommendations for each song
songRecs_im = model_im.recommendForAllItems(10)
In [102]:
display(userRecs_im)
In [103]:
display(songRecs_im)

-sandbox
5.2 Recommendation Model from Implicit Ratings: Randomly split the dataset¶
In [105]:
def pop_fit(ratings_mat):
ratings_mat.createOrReplaceTempView(“ratings_mat”)
model = spark.sql(“””
select TrackID, count(*) as prediction
from rating_im_train
group by TrackID
“””)
return model

def pop_transform(model, test):
model.createOrReplaceTempView(“model”)
test.createOrReplaceTempView(“test”)
predictions = spark.sql(“””
select t.*, m.prediction
from rating_im_test t left join
model m
on t.TrackId=m.TrackId
“””)
return predictions

def bestepr(test):
test.createOrReplaceTempView(“test”)
result = spark.sql(“””
select sum(p_rank*rating)/sum(rating) as Best_EPR
from (
select CustId, TrackID, rating,
(rank() over (partition by CustId order by rating desc)-1)*1.0/(count(TrackID) over (partition by CustId)-1) as p_rank
from test
)
“””).collect()[0][0]
return float(result)
In [106]:
#set training data using week 40 to week 51
rating_im=spark.sql(“””

select CustId, TrackID, sum(case when Length>0 then 1 else 0 end) as rating
from final_table
group by CustId, TrackID

“””)

(rating_im_tr, rating_im_te) = rating_im.randomSplit([0.8, 0.2],seed=50)

rating_im.createOrReplaceTempView(“rating_im”)
rating_im_tr.createOrReplaceTempView(“rating_im_tr”)
rating_im_te.createOrReplaceTempView(“rating_im_te”)
In [107]:
model_pop=pop_fit(rating_im_tr)
predictions_pop=pop_transform(model_pop, rating_im_te)
epr_evaluator = eprEvaluator()
epr_pop = epr_evaluator.evaluate(predictions_pop)
print(“Expected percentile ranking for popularity recommendation = ” + str(epr_pop))
In [108]:
%sql
–Benchmark: best EPR possible
select sum(p_rank*rating)/sum(rating) as Best_EPR
from (
select CustId, TrackID, rating,
(rank() over (partition by CustId order by rating desc)-1)*1.0/(count(TrackID) over (partition by CustId)-1) as p_rank
from rating_im_te
)
In [109]:
als_im= ALS(alpha=30, maxIter=5, rank=50, regParam=0.1, userCol=”CustId”, itemCol=”TrackID”, ratingCol=”rating”, coldStartStrategy=”drop”, implicitPrefs=True, nonnegative=False)

model_im_r = als_im.fit(rating_im_tr)

predictions_im_r = model_im.transform(rating_im_te)
epr_evaluator = eprEvaluator()
epr_im_r = epr_evaluator.evaluate(predictions_im_r)
print(“Expected percentile ranking for implicit rating – random split = ” + str(epr_im_r))
In [110]:
als_im= ALS(alpha=100, maxIter=7, rank=50, regParam=0.08, userCol=”CustId”, itemCol=”TrackID”, ratingCol=”rating”, coldStartStrategy=”drop”, implicitPrefs=True, nonnegative=False)

model_im_r = als_im.fit(rating_im_tr)

predictions_im_r = model_im.transform(rating_im_te)
epr_evaluator = eprEvaluator()
epr_im_r = epr_evaluator.evaluate(predictions_im_r)
print(“Expected percentile ranking for implicit rating – random split = ” + str(epr_im_r))

-sandbox
5.3 Recommendation Model From Explicit Rating¶
In [112]:
rating_ex=spark.sql(“””

select CustId, TrackID,
case when sum(case when Length>=0 then 1 else 0 end)<2 then 0 when sum(case when Length>=0 then 1 else 0 end)<4 then 1 when sum(case when Length>=0 then 1 else 0 end)<7 then 2 else 3 end as rating from final_table group by CustId, TrackID """) # Randomly split the dataset to train:test as 0.8:0.2; random seed=20 (rating_ex_train, rating_ex_test) = rating_ex.randomSplit([0.8, 0.2],seed=20) rating_ex.createOrReplaceTempView("rating_ex") rating_ex_train.createOrReplaceTempView("rating_ex_train") rating_ex_test.createOrReplaceTempView("rating_ex_test") In [113]: display(rating_ex) In [114]: model_pop=pop_fit(rating_ex_train) predictions_pop=pop_transform(model_pop, rating_ex_test) epr_evaluator = eprEvaluator() epr_pop = epr_evaluator.evaluate(predictions_pop) print("Expected percentile ranking for popularity recommendation = " + str(epr_pop)) best_epr = bestepr(rating_ex_test) print("The best percentile ranking possible =" + str(best_epr)) In [115]: als_ex = ALS(rank=50, maxIter=7, regParam=0.06, userCol="CustId", itemCol="TrackID", ratingCol="rating", coldStartStrategy="drop", implicitPrefs=False, nonnegative=False) model_ex = als_ex.fit(rating_ex_train) predictions_ex = model_ex.transform(rating_ex_test) rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") rmse_ex = rmse_evaluator.evaluate(predictions_ex) epr_evaluator = eprEvaluator() epr_ex = epr_evaluator.evaluate(predictions_ex) print("RMSE for explicit rating = " + str(rmse_ex)) print ("Expected percentile ranking for explicit rating = " + str(epr_ex)) In [116]: als_ex = ALS(rank=100, maxIter=7, regParam=0.1, userCol="CustId", itemCol="TrackID", ratingCol="rating", coldStartStrategy="drop", implicitPrefs=False, nonnegative=False) model = als_ex.fit(rating_ex_train) predictions = model.transform(rating_ex_test) rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") rmse = rmse_evaluator.evaluate(predictions) epr_evaluator = eprEvaluator() epr = epr_evaluator.evaluate(predictions) print("RMSE for explicit rating = " + str(rmse)) print ("Expected percentile ranking = " + str(epr)) In [117]: #Grid search with TestValidationSplit from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit als_ex= ALS(userCol="CustId", itemCol="TrackID", ratingCol="rating", implicitPrefs=False, coldStartStrategy="drop", maxIter=1, rank=5) grid=ParamGridBuilder().addGrid(als_ex.regParam, [0.03,0.06]).build() #.addGrid(als_ex.regParam, [0.03,0.06,0.09]) rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") tvs = TrainValidationSplit(estimator=als_ex, estimatorParamMaps=grid, evaluator=rmse_evaluator,trainRatio=0.8,seed=100) tvsModel = tvs.fit(rating_ex_train) predictions = tvsModel.transform(rating_ex_test) best_model= tvsModel.bestModel print ("The rank for best model is " + str(best_model.rank)) print ("The Max number of iteration for best model is " + str(best_model._java_obj.parent().getMaxIter())) In [118]: # Generate top 10 movie recommendations for each user userRecs_ex = model_ex.recommendForAllUsers(10) # Generate top 10 user recommendations for each song songRecs_ex = model_ex.recommendForAllItems(10) In [119]: display(userRecs_ex) In [120]: display(songRecs_ex) In [121]: