CIS_545_HW_2_STUDENT_F2020
CIS 545 Homework 2¶
Due 12 October, 2020 by 10pm Eastern Time¶
Worth 100 points in total¶
Welcome to Homework 2! By now, you should be familiar with the world of data science and the Pandas library. This assignment will focus on broadening both of these horizons by covering hierarchical data, graphs, and traversing relationships as well as two new tools: SQL and Spark.
In the first section, we will familiarize ourselves with SQL (specifically pandasql) and explore the Stack Exchange dataset. We will also finish out the section with some text analysis.
The second section will focus on graph data and give you a small preview of Spark using the Yelp dataset. This homework is designed to introduce you to Spark's required workflow before you fully unleash its power in the next homework and deploy it on an AWS cluster.
We are introducing a lot of new things in this homework, and it is often where students start to get lost in the data science sauce, so we strongly encourage you to review the slides/material as you work through this assignment; we will try to link the most relevant sections!
Before you Begin
Be sure to click “Copy to Drive” to make sure you are working on your own personal version of the homework
Read the Piazza and FAQ for updates! If you have been stuck, chances are other students are too! We don’t want you to waste away for two hours trying to get that last point on the autograder so do check Piazza for similar struggles or even homework bugs that will be clarified in the FAQ 🙂
Section 0: Homework Initialization¶
Part -2: Install the Proper Version of Pandas¶
Run the following cell to install the proper version of pandas. After running this cell, restart your runtime (Runtime > Restart runtime) and then run all the remaining set up cells.
In [ ]:
!pip3 install pandas==1.0.5
In [ ]:
# make sure that this cell prints True! Otherwise you may have forgotten to
# restart your runtime after running the cell above
import pandas as pd
print(pd.__version__ == '1.0.5')
Part -1: Enter your PennID¶
In [ ]:
STUDENT_ID = #ENTER YOUR PENNID HERE
Part 0: Libraries and Set Up Jargon (The usual wall of imports)¶
In [ ]:
#! sudo apt install openjdk-8-jdk
#! sudo update-alternatives --config java
In [ ]:
!pip3 install penngrader
from penngrader.grader import *
In [ ]:
grader = PennGrader(homework_id = 'CIS545_Fall_2020_HW2', student_id = STUDENT_ID)
In [ ]:
### Install required packages
%%capture
!pip3 install lxml
!pip install pandasql
!pip install googledrivedownloader
In [ ]:
import numpy as np
import matplotlib
import gc
import time
import warnings
import json  # JSON parsing
from lxml import etree  # XML parser
import pandasql as ps  # SQL on Pandas DataFrames
import nltk
nltk.download('punkt')
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from collections import Counter
Section 1: Exploring the Stack Exchange Dataset¶
To survive as a student at Penn, you've certainly used Stack Exchange or Stack Overflow as a source for all your technical queries. Stack Exchange looks a lot like a social network: it has the following pieces of information to tie it all together:
Users: All stack exchange users including admins etc.
Posts: All the questions as well as the answers that users post
Comments: As the name suggests, these are comments on posts
Votes: Up/Downvotes
For this homework we’ll be parsing this data (dumped in XML) into dataframes and relations, and then exploring how to query and assemble the tables into results with Pandas and PandaSQL.
Part 1: Loading our datasets [12 points total]¶
Before we get into the data, we first need to load our datasets. We will actually only be using the Users and Posts datasets for our queries, but we want you to write a generalized xml parsing function that would be able to convert any of the xml files into a dataframe.
1.0 Importing Data¶
Below is the code to import the xml files from our shared Google Drive. The data is relatively small, so this shouldn’t take too long. We will only import the Users and Posts xmls for now, but the other datasets are there in case you want to take a look 🙂
In [ ]:
from google_drive_downloader import GoogleDriveDownloader as gdd
gdd.download_file_from_google_drive(file_id='1T-SGFULQIkpg6LN5XLhBfiXhcAWNgCLe',
                                    dest_path='/content/Users.xml')
gdd.download_file_from_google_drive(file_id='1etuY-EjzgEfMdPCSd7NblNz0qJJpAH3b',
                                    dest_path='/content/Posts.xml')
1.1 Load Dataset Function¶
Now that we finally have all our packages imported and datasets initialized, it's time to write some code! Your first task is to write the function xml_to_df(file_path) that will parse the specified file into a dataframe. This function should be generalized, in the sense that it can accept any of the xml files that we loaded and return a dataframe. We highly recommend looking over the xml documentation in order to accomplish this task.
TODO: Once you have written xml_to_df(file_path), create a posts_df and users_df with the parsed XML files (/content/Users.xml and /content/Posts.xml)
Tip: try figuring out the steps with one of the two XML files first!
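To get a feel for the lxml API, here is a toy sketch (the XML string and variable names below are made up for illustration, not the homework files); each <row> element's attributes can be pulled out with .attrib:
from lxml import etree
toy_xml = b'<toys><row Id="1" Score="4"/><row Id="2" Score="7"/></toys>'
root = etree.fromstring(toy_xml)       # for a file on disk, etree.parse(file_path).getroot() behaves similarly
rows = [dict(r.attrib) for r in root]  # each <row>'s attributes become a dict of strings
toy_df = pd.DataFrame(rows)            # columns Id and Score, both stored as strings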
In [ ]:
#Solution
def xml_to_df(file_path):
""" Converts an xml file to a dataframe
:param file_path: path to file
:return: dataframe
"""
## TODO
In [ ]:
posts_df = ##TODO: call your function here
In [ ]:
# [CIS 545 PennGrader Cell] – 5 points
grader.grade(test_case_id = 'test_xml_to_posts_df', answer = posts_df[:75])
In [ ]:
users_df = ##TODO: call your function here
In [ ]:
grader.grade(test_case_id = 'test_xml_to_users_df', answer = users_df[:75])
1.2 Clean Dataset¶
Next, we are going to want to clean up our dataframes, namely 1) removing null values, 2) changing datatypes, and 3) dropping columns
Originally, we were going to have you identify the datatypes with this image on your own, but I (the TA writing this section) found this part really tedious and rage-inducing so we have defined the specific columns to convert below. All you need to do is write the function 🙂
TODO: 1) replace all null values in both datasets in-place. 2) define a function dtype_converter(df, int_columns) that takes in a dataframe and a list specifying which columns should be integers. Then, use this function on both posts_df and users_df using the lists defined below. (Note: we don't need to convert any columns to strings since they're already objects, and we're ignoring datetime)
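As a rough sketch of the two pandas calls involved (the toy frame and the fill value of 0 are ours, purely for illustration):
toy = pd.DataFrame({"Id": ["1", None, "3"], "Score": [5, 2, None]})
toy.fillna(0, inplace=True)          # 1) replace nulls in place (pick whatever fill value suits your data)
toy["Id"] = toy["Id"].astype(int)    # 2) cast a column to integer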
In [ ]:
##TODO replace NA values here
In [ ]:
#columns that need to be integers
int_posts_cols = ["Id", "PostTypeId", "AcceptedAnswerId", "ParentId", "Score",
                  "ViewCount", "OwnerUserId", "LastEditorUserId", "AnswerCount",
                  "CommentCount", "FavoriteCount"]
int_users_cols = ["Id", "Reputation", "Views", "UpVotes", "DownVotes", "AccountId"]
In [ ]:
def dtype_converter(df, int_columns):
"""converts columns to type integer
:param df: dataframe to convert
:param int_columns: list of columns to convert
:return: dataframe
"""
#TODO: implement function below
In [ ]:
posts_df = #TODO: call function here
users_df = #TODO: call function here
In [ ]:
#check your datatypes
posts_df.dtypes
In [ ]:
grader.grade(test_case_id = 'test_posts_dtypes', answer = posts_df[:75])
In [ ]:
grader.grade(test_case_id = 'test_users_dtypes', answer = users_df[:75])
Part 1.5 Your Sandbox¶
Instead of throwing you straight into the deep end, we wanted to give you a chance to take some time and explore the data on your own. This section is not graded, so for the speedrunners out there feel free to just jump in, but we wanted to at least give you a small space to utilize your basic EDA toolkit to familiarize yourself with all the info you just downloaded.
Some suggestions to get you started:
df.head()
df.info()
df.describe()
Also, definitely take a look at this readme that provides a good overview of all the datasets (ignore the ones we did not ask you to convert)
In [ ]:
# your EDA here! feel free to add more cells
Part 2: Exploring the data with Pandas and PandasSQL [20 points total]¶
Now that you are familiar (or still unfamiliar) with the dataset, we will introduce you to SQL, or more specifically pandasql: a package created to allow users to query pandas DataFrames with SQL statements.
The typical flow to use pandasql (shortened to ps) is as follows:
write a SQL query in the form of a string (Tip: use triple quotes """x""" to write multi-line strings)
run the query using ps.sqldf(your_query, locals())
Pandasql is convenient in that it allows you to reference the dataframes that are currently defined in your notebook, so you will be able to fully utilize the posts_df and users_df that you have created above!
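For example, a minimal pandasql query against the users_df you built above might look like the following (the column choices here are just for illustration):
example_query = """
SELECT Id, DisplayName
FROM users_df
LIMIT 5
"""
example_df = ps.sqldf(example_query, locals())   # returns a regular pandas DataFrame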
Given that SQL is a brand new language for many of you, we wanted to give you a chance to directly compare the similarities/differences of the pandas that you already know and the SQL you are about to learn. Thus, for each query, we ask that you look into the question twice: once with pandas and once with pandasql.
Each answer will thus require both a pd_ and sql_ prefixed dataframe that you will submit separately to the autograder. We will be reviewing your code to make sure you wrote the code in the corresponding languages.
Here is a good resource to review pandasql.
2.1 Splitting Up posts_df¶
posts_df actually contains both posted questions and the answers. The provided readme details the distinguishing factors as follows:
– PostTypeId
– 1: Question
– 2: Answer
– ParentId (only present if PostTypeId is 2)
– AcceptedAnswerId (only present if PostTypeId is 1)
TODO: Using pandas/pandasql, split posts_df into a pd/sql_questions_df and pd/sql_answers_df based on these values of PostTypeId.
In [ ]:
pd_questions_df = #TODO
pd_answers_df = #TODO
In [ ]:
grader.grade(test_case_id = 'test_pd_questions_df', answer = pd_questions_df["PostTypeId"].values)
In [ ]:
grader.grade(test_case_id = 'test_pd_answers_df', answer = pd_answers_df["PostTypeId"].values)
In [ ]:
questions_query = #TODO
answers_query = #TODO
sql_questions_df = #TODO use ps.sqldf here on questions_query
sql_answers_df = #TODO use ps.sqldf here on answers_query
In [ ]:
grader.grade(test_case_id = 'test_qa_query', answer = (questions_query, answers_query))
In [ ]:
#using just our sql dataframe moving forward
questions_df = sql_questions_df
answers_df = sql_answers_df
2.2 What are the most popular questions?¶
TODO: Use questions_df to find the 10 most popular questions by ViewCount.
Store the results in pd/sql_popular_df, which will have the following format:
Id | Title | ViewCount
— | — | —
Hint: for your SQL query, you will need to know ORDER BY, LIMIT
In [ ]:
pd_popular_df = #TODO
In [ ]:
grader.grade(test_case_id = 'test_pd_popular_df', answer = pd_popular_df)
In [ ]:
popular_query = #TODO
sql_popular_df = #TODO
In [ ]:
grader.grade(test_case_id = 'test_popular_query', answer = popular_query)
In [ ]:
grader.grade(test_case_id = 'test_sql_popular_df', answer = sql_popular_df)
2.3 Who are the most helpful users?¶
TODO: Use answers_df to find the names of the top 10 users who answer the most questions on stack exchange. This should be based on the count of unique answers made by the user.
Your answer, stored in pd/sql_talkative_df will have the following format:
UserId | DisplayName | ResponseCount
— | — | —
Note: both users_df and answers_df have an Id column, but store entirely different values in them!
SQL Hint: The tools that you will need include, but are not limited to, AS, JOIN, GROUP BY, ORDER BY, and LIMIT
In [ ]:
pd_talkative_df = #TODO
In [ ]:
grader.grade(test_case_id = 'test_pd_talkative_df', answer = pd_talkative_df)
In [ ]:
#there's probably going to be a lot of questions on how to order by count here
talkative_query = #TODO
sql_talkative_df = #TODO
In [ ]:
grader.grade(test_case_id = 'test_talkative_query', answer = talkative_query)
In [ ]:
grader.grade(test_case_id = 'test_sql_talkative_df', answer = sql_talkative_df)
2.4 Who are the most helpful-in-a-different-kind-of-way users?¶
TODO: find the users that ask a lot of questions, but have never posted an answer. To accomplish this, you are going to want to find all the users in questions_df that don’t appear in answers_df. Sort by QuestionsCount descending and store only the top 5 results.
The query will require you to write a nested SQL query. That is, there will be at least one SELECT statement inside of another SELECT statement. This means that you should NOT write two separate SQL commands and call ps.sqldf() twice.
Though it would be helpful, you do NOT have to implement this in pandas. Your answer, stored in askers_df will have the following format:
UserId | DisplayName | QuestionsCount
— | — | —
SQL Hint: You can use NOT IN or LEFT JOIN.
In [ ]:
askers_query = #TODO
askers_df = #TODO
In [ ]:
grader.grade(test_case_id = 'test_askers_query', answer = askers_query)
In [ ]:
grader.grade(test_case_id = 'test_askers_df', answer = askers_df)
2.5 So which is better, SQL or Pandas?¶
Now that you have a taste for SQL, let’s try to use our new skill to query stack exchange in this notebook and put this debate to rest.
TODO: Find all of the answers to a post that asks about Pandas vs. SQL. Here are some clues that will come in handy:
This post contains the words “pandas” and “sql”
This post has the highest ViewCount out of all the posts containing both of those words
The answers to this post have the column ParentId equal to the post’s Id
Again, no need to do this in pandas, but your answer, stored in versus_df will have the following format:
QuestionId | Question | QuestionBody | AnswerId | AnswerBody
— | — | — | — | —
SQL Hint: take a look at the LIKE function
In [ ]:
versus_query = #TODO
versus_df = #TODO
We highly recommend that you read the responses! They are actually all pretty accurate and go into the pros/cons that you probably encountered while working through the problem set. Use pd.set_option('display.max_colwidth', -1) to view the full columns, and when you're done, set the colwidth back to a value like 20 so that you don't have giant dataframes in the next steps.
(You could also try to find the same question via Google Search)
In [ ]:
#pd.set_option('display.max_colwidth', -1)
#versus_df
In [ ]:
grader.grade(test_case_id = 'test_versus_query', answer = versus_query)
In [ ]:
grader.grade(test_case_id = 'test_versus_df', answer = versus_df)
Part 3: Working with Text Data [22 points]¶
Shifting gears, let’s now try to do some text-based analysis. Our Stack Exchange data has plenty of text that we can play with, from the user descriptions to the posts themselves. Text data is complex, but can also be used to generate extremely interpretable results, making it valuable and interesting.
Throughout this section, we will attempt to answer the following:
What types of questions should I ask to get a higher reputation on Stack Exchange?¶
Users on stack exchange are valued based on their reputation, which depends on the quality of your posts. Each post receives a score, where score = number of upvotes – number of downvotes. This value is already present in your posts_df.
Both questions and answers get scores, but let’s just focus on what types of questions we should/shouldn’t ask in order to get a higher score and thus higher reputation.
3.1 Getting Highest and Lowest Scored Posts¶
TODO: First, let's get the questions with negative scores from questions_df, and then get the same number of questions with the highest scores. Convert the Body column of the highest/lowest scorers into two lists: highest_content and lowest_content. Be sure to sort when needed!
Feel free to use either pandas or pandasql to accomplish this 🙂
In [ ]:
highest_content = #TODO: should be a list
lowest_content = #TODO: should be a list
In [ ]:
grader.grade(test_case_id = 'test_lowest_content', answer = lowest_content)
In [ ]:
grader.grade(test_case_id = 'test_highest_content', answer = highest_content)
3.2 Cleaning our Text with Regex¶
Now that we have the content of our highest/lowest scored posts, we will now need to clean and tokenize them.
First, before we do anything, let’s just take a look at what we are working with
In [ ]:
highest_content[0]
You probably noticed a couple of things:
HTML tags (e.g. <p>, </p>)
embedded LaTeX (words surrounded by $$)
newline characters (\n)
We are going to clean out all of these cases using regex, a staple text processing tool that matches strings based on a specified pattern. Creating these patterns is actually considered a form of art to some, as the syntax is very extensive. As a brief introduction here are some basic pattern components that you will need to know:
"c": matches a "c" character in a string
"c*?": matches 0 or more c characters
".": matches any character
".*?c": matches any characters until you encounter "c"
Note: the "?" makes the asterisk non-greedy, so it matches as little as possible. It's good practice to include it, but not always necessary.
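Here is a tiny illustration of re.sub with these pieces (the sample string and patterns are examples only, not the full solution):
import re
sample = "<p>Some math $$x^2$$ here</p>\n"
no_html = re.sub(r"<.*?>", "", sample)          # drop anything between < and >
no_latex = re.sub(r"\$\$.*?\$\$", "", no_html)  # drop $$ ... $$ spans (note the escaped $)
cleaned = re.sub(r"\n", "", no_latex)           # drop newline characters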
TODO: Below, create a function remove_bad_patterns(text) that removes all 3 of the cases listed above from a given string, text. You will need to
create patterns to handle each of the cases
use re.sub(pattern, replacement, text) to substitute all matches with the empty string, "". If you want to test your pattern, check out this tool.
Note: "$" is considered a special character in regex, so you will need to escape it with "\\$" to specify you want to match the character.
In [ ]:
import re
def remove_bad_patterns(text):
"""Remove html, latex, and newline characters from a string
:param text: content as a string
:return: cleaned text string
"""
#TODO
Now, apply this function to both highest_content and lowest_content to create cleaned_highest_content and cleaned_lowest_content, respectively, and let’s take another look at the new and improved first entry:
In [ ]:
cleaned_highest_content = #TODO
cleaned_lowest_content = #TODO
cleaned_highest_content[0]
In [ ]:
grader.grade(test_case_id = 'test_cleaned_highest', answer = cleaned_highest_content)
In [ ]:
grader.grade(test_case_id = 'test_cleaned_lowest', answer = cleaned_lowest_content)
A lot cleaner, right? Of course, it’s not perfect but it’ll do for our purposes in this homework. With that out of the way let us now…
3.3 Tokenize the Text¶
Here, we are going to split up the content into a list of words using the nltk package, which contains an extensive set of tools for processing text. Of course, like regex, this homework would be miles long if we really went into detail, so we are only going to utilize the following components (a short example follows the list):
nltk.word_tokenize(): a function used to tokenize our text
nltk.corpus.stopwords: a list of commonly used words such as “a”,”an”,”in” that are often ignored in text-related analysis
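For instance, on a made-up sentence (punkt was downloaded in the setup cell), word_tokenize splits the text into tokens, and a list comprehension can lowercase and keep only alphabetic ones; filtering against the stopword set you build next follows the same pattern:
sample = "Why does my Spark job keep failing after 2 retries?"
tokens = nltk.word_tokenize(sample)                        # punctuation and numbers become their own tokens
alpha_tokens = [t.lower() for t in tokens if t.isalpha()]  # lowercase, alphabetic-only tokens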
TODO: First, use stopwords to create a set of the most common English stopwords. Then, implement tokenize_content(content) that takes in a content string and
tokenizes the text
lowercases the tokens
removes stop words (commonly used words such as "a", "an", "in")
keeps words with only alphabet characters (no punctuation)
In [ ]:
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')
In [ ]:
stopwords = set(stopwords.words('english'))
In [ ]:
def tokenize_content(content):
"""returns tokenized string
:param content: text string
:return: tokenized text/list of words
"""
#TODO
Now, apply your tokenize_content function to each piece of content in cleaned_highest_content and cleaned_lowest_content, and flatten both of the lists to create highest_tokens and lowest_tokens
In [ ]:
highest_tokens = #TODO
lowest_tokens = #TODO
In [ ]:
grader.grade(test_case_id = 'test_highest_tokens', answer = highest_tokens)
In [ ]:
grader.grade(test_case_id = 'test_lowest_tokens', answer = lowest_tokens)
3.4 Most Frequent Words¶
Now, find the 20 most common words amongst the content of your highest and lowest questions.
Hint: https://docs.python.org/2/library/collections.html#counter-objects
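As a quick reminder of how Counter works (the toy list is ours):
from collections import Counter
toy_counter = Counter(["sql", "pandas", "sql", "spark"])
toy_counter.most_common(2)    # [('sql', 2), ('pandas', 1)]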
In [ ]:
lowest_counter = #TODO
lowest_most_common = #TODO
In [ ]:
highest_counter = #TODO
highest_most_common = #TODO
In [ ]:
grader.grade(test_case_id = 'test_highest_most_common', answer = highest_most_common)
In [ ]:
grader.grade(test_case_id = 'test_lowest_most_common', answer = lowest_most_common)
3.5 Refining our Lists¶
Hmmm…both of these lists seem to overrepresent the common jargon of data science. Let’s try to tease out words that distinguish the high from the low scoring posts.
One approach would be to find words in one list that are not in the other. This, however, may be too naive, as even if a word is extremely common in our high list, if it appears only once in our low list, it would get removed from consideration.
Let’s instead find the difference between the counts within our two lists. Thus, if a word is really common in one, but not the other, the count would only decrease slightly. Alternatively, if a word is common in both lists, it would effectively zero out.
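Counter objects support exactly this kind of subtraction; on two made-up counters:
high = Counter({"data": 10, "model": 7, "error": 1})
low = Counter({"data": 9, "error": 6})
high - low    # Counter({'model': 7, 'data': 1}): shared words shrink, non-positive counts are dropped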
TODO: Using the difference method described above, create distinct_highest_common and distinct_lowest_common containing the top 20 word counts within each group of posts. Be careful about which list you are subtracting!
In [ ]:
distinct_highest_common = #TODO
In [ ]:
distinct_lowest_common = #TODO
In [ ]:
grader.grade(test_case_id = 'test_distinct_highest_common', answer = distinct_highest_common)
In [ ]:
grader.grade(test_case_id = 'test_distinct_lowest_common', answer = distinct_lowest_common)
The lists are much more distinct now, right? It seems as if low-scoring posts tend to ask a lot about errors/code, while high-scoring posts are much more conceptual.
So if you're looking for a high reputation, don't ask people to debug your code!
3.6 Word Clouds¶
Before we move on from this dataset, let’s do one final step and visualize our results with wordclouds.
TODO: Take a look at this documentation and create two word clouds for our two groups of distinct tokens.
Be sure to create these on the full list of distinct tokens, and not just the top 20. We will be going through your notebooks and manually grading your word clouds (worth 4 points).
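A minimal sketch of the WordCloud API, assuming you have a frequency dictionary of tokens (the toy frequencies and variable names are illustrative):
toy_freqs = {"spark": 12, "pandas": 9, "sql": 7}
wc = WordCloud(width=400, height=300, background_color="white").generate_from_frequencies(toy_freqs)
plt.imshow(wc, interpolation="bilinear")
plt.axis("off")
plt.show()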
In [ ]:
highest_wordcloud = #TODO
In [ ]:
lowest_wordcloud = #TODO
Section 2: Spark, Hierarchical Data and Graph Data on Yelp Reviews Dataset¶
Getting Started with Apache Spark¶
Now that you've seen how to run SQL queries through pandas, we'll move on to running SQL in Apache Spark! Apache Spark is a complex, cluster-based system written in Scala for big data processing. For the most part, Spark interfaces "smoothly" with Python.
While Spark dataframes try to emulate the same programming style as Pandas DataFrames, there are some differences in how you express things. Please refer to the Lecture Slides or the following resources to learn about these differences:
https://lab.getbase.com/pandarize-spark-dataframes/
From Pandas to Apache Spark’s Dataframe
For this assignment, we are going to get familiar with Spark without worrying too much about sharding and distribution. This isn’t really using it to its strengths — and in fact you might find Spark to be slow — but it will get you comfortable with programming in Spark without worrying about distributed nodes, clusters, and spending real dollars on the cloud. For Homework 3, we’ll connect your Jupyter instance to Spark running on the cloud.
Initializing a Connection to Spark¶
We’ll open a connection to Spark as follows. From SparkSession, you can load data into Spark DataFrames as well as RDDs.
Run the following cells to setup this part of the notebook!
In [ ]:
%%capture
!apt install libkrb5-dev
!wget https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar xf spark-3.0.1-bin-hadoop3.2.tgz
!pip install findspark
!pip install sparkmagic
!pip install pyspark
! pip install pyspark --user
! pip install seaborn --user
! pip install plotly --user
! pip install imageio --user
! pip install folium --user
In [ ]:
%%capture
!apt update
!apt install gcc python-dev libkrb5-dev
In [ ]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os
spark = SparkSession.builder.appName('Graphs-HW2').getOrCreate()
In [ ]:
%load_ext sparkmagic.magics
In [ ]:
import numpy as np
import pandas as pd
import matplotlib
#misc
import gc
import time
import warnings
#graph section
import networkx as nx
#import heapq # for getting top n number of things from list,dict
# JSON parsing
import json
# HTML parsing
from lxml import etree
import urllib
# SQLite RDBMS
import sqlite3
# Time conversions
import time
# Parallel processing
# import swifter
# NoSQL DB
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError, OperationFailure
import os
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
import pyspark
from pyspark.sql import SQLContext
In [ ]:
try:
if(spark == None):
spark = SparkSession.builder.appName('Initial').getOrCreate()
sqlContext=SQLContext(spark)
except NameError:
spark = SparkSession.builder.appName('Initial').getOrCreate()
sqlContext=SQLContext(spark)
Download data¶
The following code retrieves the Yelp dataset files from Google Drive.
In [ ]:
from google_drive_downloader import GoogleDriveDownloader as gdd
gdd.download_file_from_google_drive(file_id='1XCANGSCd0pUNcXq18t2QDwCIpJxus8Dy',
                                    dest_path='/content/yelp_business_attributes.csv')
gdd.download_file_from_google_drive(file_id='11lwBibxX7PYGgOfHU25_dDDDsPX1Pt0Y',
                                    dest_path='/content/yelp_business.csv')
gdd.download_file_from_google_drive(file_id='1FU5Q-96erhTmk8SjC4XHUm94yWc6h3a0',
                                    dest_path='/content/yelp_checkin.csv')
gdd.download_file_from_google_drive(file_id='1UaaLrCKjqoQ7G3JT_VUw56pc-dnTwyrS',
                                    dest_path='/content/yelp_review2.csv')
gdd.download_file_from_google_drive(file_id='1JNFZeLlimxNSwcOb-oBxxbwJqdg22WgD',
                                    dest_path='/content/yelp_user.csv')
Part 4: Working with Spark [21 points total]¶
4.1 Load Our Datasets¶
In this section, we’ll be using Spark to look into social data from Yelp. To start, let’s read our data into Spark. As an example of how to do this, to load the file input.txt into a Spark DataFrame, you can use lines like the following.
# Read lines from the text file
input_sdf = spark.read.load('input.txt', format="text")
We’ll use the suffix _sdf to represent “Spark DataFrame,” much as we used _df to denote a Pandas DataFrame.
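Since the Yelp files are CSVs, one option (a hedged sketch; the path and options are up to you) is Spark's CSV reader with a header flag:
example_sdf = spark.read.csv('/content/yelp_business.csv', header=True, inferSchema=True)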
TODO: Load the various files from Yelp. Your datasets should be named yelp_business_sdf, yelp_business_attributes_sdf, yelp_check_in_sdf, yelp_reviews_sdf, and yelp_users_sdf. Submit the first 75 entries of the yelp_business_sdf, sorted by the “name” column in ascending order, to the autograder as a pandas dataframe by using the toPandas() function to convert it.
In [ ]:
# TODO: load Yelp datasets
yelp_business_sdf = #TODO
yelp_business_attributes_sdf = #TODO
yelp_check_in_sdf = #TODO
yelp_reviews_sdf = #TODO
yelp_users_sdf = #TODO
In [ ]:
yelp_business = yelp_business_sdf.toPandas().sort_values(by = ['name'], ascending = True)[:75]
In [ ]:
grader.grade(test_case_id = 'check_yelp_load', answer = yelp_business[:75])
One key difference between using Pandas SQL and Spark SQL is that you’ll need to create a view of your data before Spark is able to query it. Note that when using a temporary view as we will be doing, Spark does not persist the data in memory.
TODO: Put all of your Spark dataframes from the previous section into temporary tables. The syntax is as follows:
yelp_business_sdf.createOrReplaceTempView('yelp_business')
Note that when you're accessing the Yelp data within Spark SQL later in the homework, you'll want to refer to each table by the name of the view you assigned: for instance, "yelp_business", not "yelp_business_sdf". This distinction is important, as the table is only visible to Spark SQL as "yelp_business".
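Once a view exists, queries go through spark.sql and refer to the view name (the columns selected here are just for illustration and assume the yelp_business schema):
yelp_business_sdf.createOrReplaceTempView('yelp_business')
spark.sql("SELECT name, city FROM yelp_business LIMIT 5").show()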
In [ ]:
# TODO: save tables with names such as yelp_business, yelp_users
Next, explore the sdf’s using the Sandbox area below. Some functions that might be useful are:
show (shows the first few rows of data, similar to head in Pandas)
count (counts number of rows, similar to using len in Pandas)
dtypes (same as in Pandas)
describe (use with show to see summary statistics, similar to just describe on its own in Pandas)
You are not limited to these functions and are welcome to use any other ones. The purpose of this exploration is to get a sense of what the data looks like before moving on. Again, this section is not graded, but we encourage you to get familiar with the data before diving in.
In [ ]:
# your EDA here! feel free to add more cells
4.2 Simple Analytics on the Data¶
In this section, we will be executing Spark operations on the given data. Beyond simply executing the queries, you may try using the .explain() method to see more about query execution. Also, please read the data description prior to attempting the following questions to understand the data.
4.2.1 Spark and Big Data¶
You may be wondering why we can't just use Pandas SQL for analytics on the Yelp data. As the data we're working with gets larger, performance in Pandas will slow, or the data may even be too large to load into Pandas at all.
For a simple example, let’s compare how long the same query takes to run in Pandas SQL and Spark SQL.
TODO: First, convert the yelp business table to Pandas. Then, using the yelp business table, select the names of businesses located in Pennsylvania. Run this query in both Pandas SQL and Spark SQL and time how long the query takes to run. The time module will be useful for this. You may want to separate your code into several cells to ensure you are only timing one query at a time. Submit the ratio of the time it took the query to run in Pandas SQL to the time it took the query to run in Spark SQL, called time_ratio, to the autograder.
As a reminder, Spark uses lazy computation, meaning results are delayed until they are actually needed. Therefore, you will need to show your table (or do some other computation that requires the table to be generated) in order for your query to run in Spark.
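The timing pattern looks roughly like the sketch below (the query text, state abbreviation, and variable names are assumptions, not the required solution); the .show() call is what forces Spark to actually execute:
start = time.time()
spark.sql("SELECT name FROM yelp_business WHERE state = 'PA'").show()
spark_seconds = time.time() - start

start = time.time()
ps.sqldf("SELECT name FROM yelp_business_df WHERE state = 'PA'", locals())
pandas_seconds = time.time() - start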
In [ ]:
import time
In [ ]:
# TODO: Convert the yelp_business_sdf to Pandas
yelp_business_df = #TODO
In [ ]:
# TODO: Time the query takes to run in Pandas SQL
In [ ]:
# TODO: Time the query takes to run in Spark SQL
In [ ]:
# TODO: Ratio of time taken in Pandas SQL to time taken in Spark SQL
time_ratio = #TODO
In [ ]:
grader.grade(test_case_id = 'time_check', answer = time_ratio)
4.2.2 Cities by number of businesses¶
Next, we’ll explore which cities have the most restaurants.
TODO: Find the top 10 cities by number of (Yelp-listed) businesses. This table should include city, state, and num_restaurants, which is the number of restaurants in the city. Convert this sdf to Pandas and submit top10_cities_df to the autograder. Remember to only convert small tables to Pandas!
Your table should look something like:
city | state | num_restaurants
— | — | —
city 1 | state 1 | number 1
city 2 | state 2 | number 2
In [ ]:
# TODO: cities by number of businesses
# Worth 5 points
top10_cities_df = #TODO
In [ ]:
grader.grade(test_case_id = 'top10CitiesCheck', answer = top10_cities_df)
4.2.3 Business ratings across states¶
Next, we'll be looking into how ratings for the same business vary state by state. Throughout this problem, we'll be interested in the average rating by business and by state.
TODO: For each business, find the states where the business’s average rating is below the maximum of the business’s per-state average rating. Think about how to factor that into steps!
Compute the average rating for each business name by state. For each business, find the maximum average rating across all states’ average ratings.
Then compute an sdf containing the business name, state, avg_rating, and max_avg_rating for businesses in states where that business is not most highly rated. Order the output in order of business name, decreasing avg_rating, and increasing state name.
Convert the top 100 rows to Pandas and submit below_avg_states_df to the autograder.
Your table should look something like:
name | state | avg_rating | max_avg_rating
— | — | — | —
business name 1 | state 1 | rating 1 | max rating 1
business name 1 | state 2 | rating 2 | max rating 1
business name 2 | state 3 | rating 3 | max rating 2
In [ ]:
below_avg_states_df = #TODO
In [ ]:
grader.grade(test_case_id = 'check_by_state_rating', answer = below_avg_states_df)
4.3 Format Yelp Data as a Graph¶
The Yelp data you've been working with can be thought of as graph data. Recall that a graph is made up of a set of vertices that are connected by edges. Within the context of our data, we can think of the users/businesses as nodes. Edges would then represent a review by a user for a business.
With this in mind, we now want to reformat the yelp_reviews dataset to look more like a graph.
TODO: Use Spark SQL to rename the user_id column of yelp_reviews data to from_node and rename the business_id column to to_node. Filter to rows where both the user_id and business_id are not null. Create a temporary view with this table.
Your table should look something like:
from_node | to_node | score
— | — | —
user id 1 | business id 1 | stars 1
user id 2 | business id 2 | stars 2
In [ ]:
review_graph_sdf = #TODO
In [ ]:
review_graph_sdf.show(10)
TODO: Once you've made your graph and created a temporary view, use Spark SQL to filter to the rows in the graph that contain the sequence "abc" anywhere in the from_node. Convert this subset to a Pandas dataframe named review_graph_abc and submit this to the autograder.
HINT: Look into the LIKE keyword and wildcards in SQL.
In [ ]:
review_graph_abc_sdf = #TODO
review_graph_abc = #TODO: convert the subset to Pandas
In [ ]:
# Add test case for making graph
grader.grade(test_case_id = 'reviewGraphCheck', answer = review_graph_abc)
Part 5. “Traversing” a Graph [21 points total]¶
For our next tasks, we will be “walking” the graph and making connections.
5.1 Intro to Distributed Breadth-First Search¶
Now that we have created our graph, we will be implementing a graph traversal algorithm known as Breadth First Search. It works in a way that’s equivalent to how a stain spreads on a white t-shirt. Take a look at the graph below:
Consider starting BFS from point A (green). This is considered the starting frontier/singular origin node.
The first round of BFS would involve finding all the nodes directly reachable from A, namely B-F (blue circles). These blue nodes make up the next frontier at depth 1 away from our starting node A.
The second round would then identify the red nodes, which are the neighbors of the blue nodes. Now, the red nodes all belong to a frontier at depth 2 from A.
This process continues until all the nodes in the graph have been visited.
If you would like to learn more about BFS, I highly suggest looking here.
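To make the frontier idea concrete, here is a tiny plain-Python version on a made-up edge list (illustrative only; your task below is the Spark version):
edges = [("A", "B"), ("A", "C"), ("B", "D")]
neighbors = {}
for u, v in edges:                                   # undirected: record both directions
    neighbors.setdefault(u, set()).add(v)
    neighbors.setdefault(v, set()).add(u)

visited = {"A": 0}                                   # node -> distance from the origin
frontier = {"A"}
depth = 0
while frontier:
    depth += 1
    frontier = {n for f in frontier for n in neighbors[f]} - visited.keys()
    visited.update({n: depth for n in frontier})
# visited is now {'A': 0, 'B': 1, 'C': 1, 'D': 2}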
We will now be implementing spark_bfs(G, N, d), our spark flavor of BFS that takes a graph G, a set of origin nodes N, and a max depth d.
In order to write a successful BFS function, you are going to need to figure out
how to keep track of nodes that we have visited
how to properly find all the nodes at the next depth
how to avoid cycles and ensure that we do not constantly loop through the same edges (take a look at J-K in the graph)
5.2 Implement one Traversal¶
To break down this process, let's think about how we would implement a single traversal of the graph. That is, given the green node in the graph above, how are we going to get the blue nodes?
Consider the simple graph below which is different from the graph in the image above:
In [ ]:
simple = [('A', 'B'),
          ('A', 'C'),
          ('A', 'D'),
          ('C', 'F'),
          ('F', 'A'),
          ('B', 'G'),
          ('G', 'H'),
          ('D', 'E')]
simple_dict = {'from_node': ['A', 'A', 'A', 'C', 'F', 'B', 'G', 'D'],
               'to_node': ['B', 'C', 'D', 'F', 'A', 'G', 'H', 'E']}
simple_graph_df = pd.DataFrame.from_dict(simple_dict)
simple_graph_sdf = spark.createDataFrame(simple_graph_df)
simple_graph_sdf.show()
As you can see, each row of this dataframe represents an edge between two nodes. Although the nodes are labeled "from" and "to", the edges are actually undirected, meaning that A->B represents the same edge as B->A.
Let’s define our starting node as follows:
In [ ]:
smallOrig = [{'node': 'A'}]
Then, BFS with graph G, starting from smallOrig to depth 1, i.e. spark_bfs(G, smallOrig, 1), would output the following:
In [ ]:
simple_1_round_dict = {'node': ['F', 'B', 'D', 'C', 'A'],
                       'distance': [1, 1, 1, 1, 0]}
simple_1_round_bfs_df = pd.DataFrame.from_dict(simple_1_round_dict)
simple_1_round_bfs_sdf = spark.createDataFrame(simple_1_round_bfs_df)
simple_1_round_bfs_sdf.show()
As you can see, this dataframe logs each node with its corresponding distance away from A. Moreover, we also know that these nodes are visited.
Hopefully, you can see how we can use our original graph and this new information to find the nodes at depth two.
This is exactly what we will try to accomplish with spark_bfs_1_round(visited_nodes) which will ultimately be the inner function of spark_bfs that we use to perform exactly one traversal of a graph.
TODO: Write spark_bfs_1_round(visited_nodes) that takes the current dataframe of visited_nodes, performs one round of BFS, and returns an updated visited nodes dataframe. You should assume that a temporary sdf G already exists.
In [ ]:
def spark_bfs_1_round(visited_nodes):
"""
:param visited_nodes: dataframe with columns node and distance
:return: dataframe of updated visited nodes, with columns node and distance
"""
#TODO
Now, run the inner function on simple_1_round_bfs_sdf (the result of one round of BFS on the simple graph) and store the results in simple_bfs_result. This is ultimately what the output of BFS to depth 2 should look like.
In [ ]:
simple_graph_sdf.createOrReplaceTempView('G')
simple_bfs_result = #TODO
simple_bfs_result.show()
Convert this result to Pandas, sorted by the node, and submit it to the autograder.
In [ ]:
simple_bfs_test = #TODO
In [ ]:
grader.grade(test_case_id = 'checksimpleBFS', answer = simple_bfs_test)
5.3 Full BFS Implementation¶
Now, we will fully implement spark_bfs. This function should iteratively call your implemented version of spark_bfs_1_round and ultimately return the output of this function at max_depth.
You are also responsible for initializing the starting dataframe, that is, converting the list of origin nodes into a Spark dataframe with the nodes logged at distance 0.
Consider the following:
schema = StructType([
    StructField("node", StringType(), True)
])
my_sdf = spark.createDataFrame(origins, schema)
The schema specifies the structure of the Spark DataFrame with a string node column; spark.createDataFrame then maps this schema onto the origin nodes. Also, you are responsible for ensuring that a view of your graph is available within this function. (Note: you will also need to add in a distance column; see the sketch below.)
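One hedged way to attach that distance column, reusing the schema snippet above and the F alias imported earlier (variable names are ours):
origins_sdf = spark.createDataFrame(smallOrig, schema).withColumn("distance", F.lit(0))
origins_sdf.show()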
TODO: implement spark_bfs(G, origins, max_depth) and run it on the review_graph_sdf initialized in 4.3. Note: you may want to run tests on the simple_graph example first, as review_graph_sdf will take quite some time to run.
In [ ]:
# TODO: iterative search over undirected graph
# Worth 5 points directly, but will be needed later
def spark_bfs(G, origins, max_depth):
“”” runs distributed BFS to a specified max depth
:param G: graph dataframe from 4.3
:param origins: list of origin nodes stored as {“node”: nodeValue}
:param max_depth: integer value of max depth to run BFS to
:return: dataframe with columns node, distance of all visited nodes
“””
#TODO
Test that this function works on the simple example first.
In [ ]:
simple_bfs_iterative_result = spark_bfs(simple_graph_sdf, smallOrig, 3)
simple_bfs_iterative_result.show()
TODO: Using the starting node defined below, create bfs_3 as the result of running spark_bfs on review_graph_sdf to a depth of 3. Finally, create a pandas dataframe of the first 75 results sorted by id as answer_75_df and submit this to the autograder.
In [ ]:
orig = [{'node': 'bv2nCi5Qv5vroFiqKGopiw'}]
#grab the count
bfs_3 = #TODO
In [ ]:
answer_75_df = #TODO
When submitting to the autograder, submit a tuple whose first value is the length (row count) of your full BFS output and whose second value is the first 75 rows of your result.
However, before you grab your first 75 rows, sort by the ids.
In [ ]:
#13603 is just obtained from running .count() on the BFS output
length = #TODO
grader.grade(test_case_id = 'checkBFS', answer = (length, answer_75_df))
Congratulations on making it to the end of Homework 2! Feel free to fill out this form with any feedback for this and prior homeworks. We know this assignment was pretty dense, but we hope that you still managed to learn a lot from it 🙂