HW3_STUDENT
CIS 545 Homework 3: Spark SQL¶
Due Date: October 30th, 2020 at 10pm EDT¶
Worth 100 points (plus 5 points extra credit)¶
Welcome to CIS 545 Homework 3! In this homework, you will gain mastery of Spark SQL. By the end, you'll be a star (not that you aren't already one). Over the next few days, you will use an EMR cluster to run Spark and manipulate the linkedin_small_real.json dataset as well as stocks.csv.
The goal of the homework is to create a training dataset for a Random Forest machine learning model. Yes, you'll be playing with machine learning shortly!
The training dataset will contain the monthly number of employees hired by companies in linkedin_small_real.json and their corresponding closing stock prices over a 10-year period (2000-2011). Based on this data, we will try to predict whether a company's stock will show positive or negative growth in the first quarter of the next year. Who's ready to make some money?
The Necessary Notes and Nags¶
Before we begin, here are some important notes to keep in mind.
IMPORTANT! I said it twice, it's really important. In this homework, we will be using AWS resources. You are given a quota ($100) to use for the entirety of the homework. There is only a small chance you will use all of this money; even so, it is important that you shut down your EMR cluster at the end of every session.
Be sure to use Google Colab for this homework, since we must connect to the EMR cluster and local Jupyter will have issues doing that. Using a Google Colab notebook with an EMR cluster has two important quirks. First, the first line of any cell in which you use the Spark session must be %%spark. Notice that all cells below have this.
Second, you will, unfortunately, not be able to stop a cell while it is running. If you need to, you will have to restart your cluster. See the Setup EMR document for reference.
You are required to use Spark SQL queries to handle the data in this assignment. Mastering SQL is more beneficial than learning Spark-specific commands (functions), as it shows up in far more areas of programming and data science/analytics than just Spark. Use the following function list to see all the SQL functions available in Spark.
Throughout the homework you will be manipulating Spark dataframes (sdfs). We do not specify any ordering on the final output. You are welcome to order your final tables in whatever way you deem fit. We will conduct our own ordering when we grade.
Based on the challenges you’ve faced in the previous homework, we are including information on the expected schema of your results. Apache Spark is very fiddly but we hope this will help.
There are portions of this homework that are very hard. We urge you to start early and to come to office hours for help if you get stuck. But don't worry, I can see the future, and you all got this.
With that said, let’s dive in.
Step 0: Set up EMR¶
Your first task is to create an EMR (Elastic MapReduce) cluster in your AWS Educate account. Please see the attached document for detailed instructions.
Move on to Step 0.1 after you have completed all the steps in the document.
Step 0.1: The Superfluous Setup¶
Run the following two cells. These will allow your Colab notebook to connect to and use your EMR cluster.
In [ ]:
%%capture
!apt update
!apt install gcc python-dev libkrb5-dev
!pip install sparkmagic
In [ ]:
%%capture
%load_ext sparkmagic.magics
Step 0.2: The Sharp Spark¶
Now, connect your notebook to the EMR cluster you created. In the first cell, copy the link to the Master Public DNS specified in the setup document. You will need to add http:// to the beginning of the address and the port to the end. The final format should be,
http://<Master Public DNS>:8998
For example, if my DNS (directly from the AWS EMR console) is ec2-3-15-237-211.us-east-2.compute.amazonaws.com my address would be,
http://ec2-3-15-237-211.us-east-2.compute.amazonaws.com:8998
Insert this in the # TODO # below. For our example, the cell would read,
%spark add -s spark_session -l python -u http://ec2-3-15-237-211.us-east-2.compute.amazonaws.com:8998
In [ ]:
# TODO: Enter your Master Public DNS with the proper formatting and host
In [ ]:
# If you ever need to restart, you may need to…
# %spark delete -s my_session
# OR just factory-reset the runtime under the Runtime tab
Step 0.3: Cluster Log¶
To keep track of the clusters you have created and terminated, and to give us information about time spent on this assignment, please enter the date and time you created each cluster and the date and time you terminated it. This will not impact your score.
EX:
10/12 9:00am – 10/12 12:00pm
10/13 7:00pm – 10/13 9:00pm
…
TODO: Create cluster log
Enter your 8-digit Penn ID as an integer in the cell below. This will be used by the autograder.
In [ ]:
%%spark
from penngrader.grader import *
STUDENT_ID = #TODO
In [ ]:
%%spark
grader = PennGrader(homework_id = 'CIS545_Fall_2020_HW3', student_id = STUDENT_ID)
Run the autograder setup cells, making sure you have set your 8-digit Penn ID. They also import all the modules you need for the homework.
Note: Since we are using an EMR cluster, we will only have access to some of the modules that exist for Python, meaning things like pandas, numpy, etc. may not all be available. We have written the entire homework such that the solution does not require any of these.
Step 1: Data Wrangling, Cleaning, and Shaping¶
The data you will use is stored in an S3 bucket, a cloud storage service. You now need to download it onto the nodes of your EMR cluster.
Step 1.1: The Stupendous Schema¶
When loading data, Spark will try to infer its structure on its own. This process is faulty because it will sometimes infer a type incorrectly. JSON documents, like the one we will use, can have nested types, such as arrays, arrays of dictionaries, dictionaries of dictionaries, etc. Spark's ability to determine these nested types is not reliable, so you will define a schema for linkedin_small_real.json.
A schema is a description of the structure of data. You will be defining an explicit schema for linkedin_small_real.json. In Spark, schemas are defined using a StructType object. This is a collection of data types, termed StructFields, that specify the structure and variable type of each component of the dataset. For example, suppose we have the following simple JSON object,
{
    "student_name": "Data Wrangler",
    "GPA": 1.4,
    "courses": [
        {"department": "Computer and Information Science",
         "course_id": "CIS 545",
         "semester": "Fall 2019"},
        {"department": "Computer and Information Science",
         "course_id": "CIS 555",
         "semester": "Fall 2019"}
    ],
    "grad_year": 2021
}
We would define its schema as follows,
schema = StructType([
    StructField("student_name", StringType(), nullable=True),
    StructField("GPA", FloatType(), nullable=True),
    StructField("courses", ArrayType(
        StructType([
            StructField("department", StringType(), nullable=True),
            StructField("course_id", StringType(), nullable=True),
            StructField("semester", StringType(), nullable=True)
        ])
    ), nullable=True),
    StructField("grad_year", IntegerType(), nullable=True)
])
Each StructField has the following structure: (name, type, nullable). The nullable flag indicates whether the specified field may be empty. Your first task is to define the schema of linkedin_small_real.json. A smaller version of the JSON dataset can be found here.
Note: In linkedin_small_real.json the field specilities is spelled incorrectly. This is not a typo. (Well, it is, but it’s a typo in the raw data, and we have to live with what we’re given.)
There is also no grading cell for this step. But your JSON file won’t load if it’s wrong, so you have a way of testing.
In [ ]:
%%spark
from pyspark.sql.types import *
# TODO: Finish defining the linkedin_small_real.json schema
# We've provided most of the fiddly details, but you'll
# need to fill in the **name** and the **experience**!
schema = StructType([
    StructField("_id", StringType(), nullable=True),
    StructField("education", ArrayType(
        StructType([
            StructField("start", StringType(), nullable=True),
            StructField("major", StringType(), nullable=True),
            StructField("end", StringType(), nullable=True),
            StructField("name", StringType(), nullable=True),
            StructField("desc", StringType(), nullable=True),
            StructField("degree", StringType(), nullable=True)
        ])
    ), nullable=True),
    StructField("group", StructType([
        StructField("affilition", ArrayType(StringType()), nullable=True),
        StructField("member", StringType(), nullable=True)
    ]), nullable=True),
    # TODO: fill in the necessary structure for the name (Don't forget the comma at the end)!
    StructField("locality", StringType(), nullable=True),
    StructField("skills", ArrayType(StringType()), nullable=True),
    StructField("industry", StringType(), nullable=True),
    StructField("interval", IntegerType(), nullable=True),
    # TODO: fill in structure for experience (Don't forget the comma at the end)!
    StructField("summary", StringType(), nullable=True),
    StructField("interests", StringType(), nullable=True),
    StructField("overview_html", StringType(), nullable=True),
    StructField("specilities", StringType(), nullable=True),
    StructField("homepage", ArrayType(StringType()), nullable=True),
    StructField("honors", ArrayType(StringType()), nullable=True),
    StructField("url", StringType(), nullable=True),
    StructField("also_view", ArrayType(
        StructType([
            StructField("id", StringType(), nullable=True),
            StructField("url", StringType(), nullable=True)
        ])
    ), nullable=True),
    StructField("events", ArrayType(
        StructType([
            StructField("from", StringType(), nullable=True),
            StructField("to", StringType(), nullable=True),
            StructField("title1", StringType(), nullable=True),
            StructField("start", IntegerType(), nullable=True),
            StructField("title2", StringType(), nullable=True),
            StructField("end", IntegerType(), nullable=True)
        ])
    ), nullable=True)
])
Step 1.2: The Languorous Load¶
Load the linkedin_small_real.json dataset from your S3 bucket into a Spark dataframe (sdf) called raw_data_sdf. If you have constructed the schema correctly, spark.read.json() will read in the dataset. You do not need to edit this cell.
If this doesn’t work, go back to the prior cell and update your schema!
In [ ]:
%%spark
raw_data_sdf = spark.read.json("s3a://penn-cis545-files/linkedin_small_real.json", schema=schema)
In [ ]:
%%spark
raw_data_sdf.show(5)
Importing pandas for the grader and verifying it is version 1.0.5…
In [ ]:
%%spark
import pandas as pd
print(pd.__version__)
The cell below shows how to run SQL commands on Spark tables. Use this as a template for all your SQL queries in this notebook. You do not need to edit this cell.
In [ ]:
%%spark
# Create SQL-accessible table
raw_data_sdf.createOrReplaceTempView("raw_data")
# Declare SQL query to be executed
query = '''SELECT *
           FROM raw_data LIMIT 10'''
# Save the output sdf of spark.sql() as answer_sdf and convert to Pandas
answer_sdf = spark.sql(query).toPandas()
to_submit = pd.read_json(answer_sdf.to_json())
to_submit
In [ ]:
%%spark
## AUTOGRADER Step 1.2: ##
grader.grade(test_case_id = 'first', answer = to_submit)
In the next cell, create step_1_2_sdf to fetch the data from the above table, returning rows with schema (_id, given_name), in lexicographical order of _id. (Note _id is a string as opposed to an int.) Limit your sdf to 10 rows. Save your final answer to a variable to_submit as shown above.
In [ ]:
%%spark
# TODO
In [ ]:
%%spark
grader.grade(test_case_id = 'lex_10_ids_last_names', answer = to_submit)
Step 1.3: The Extravagant Extraction¶
In our training model, we are interested in when individuals began working at a company. From creating the schema, you should notice that the collection of companies individuals worked at is contained in the experience field as an array of dictionaries. You should use the org for the company name and start for the start date. Here is an example of an experience field,
{
    "experience": [
        {
            "org": "The Walt Disney Company",
            "title": "Mickey Mouse",
            "end": "Present",
            "start": "November 1928",
            "desc": "Sailed a boat."
        },
        {
            "org": "Walt Disney World Resort",
            "title": "Mickey Mouse Mascot",
            "start": "January 2005",
            "desc": "Took pictures with kids."
        }
    ]
}
Your task is to extract each pair of company and start date from these arrays. In Spark, this is known as “exploding” a row. If you think about how we used relational data to model a nested list in a separate table, that is essentially what explode does to the nested data within the LinkedIn dataset.
Create an sdf called raw_start_dates_sdf that contains the company and start date for every experience of every individual in raw_data_sdf. Drop any row that contains a null in either column with dropna(). Remember we will sort the dataframe when grading so you can sort the elements however you wish (you don’t need to if you don’t want to). The sdf should look similar to:
+--------------------------+--------------+
|org                       |start_date    |
+--------------------------+--------------+
|Walt Disney World Resort  |January 2005  |
|The Walt Disney Company   |November 1928 |
|...                       |...           |
+--------------------------+--------------+
Hint: You may want to do two separate explodes for org and start. In an explode, the position of the element in the array can be extracted as well, and used to merge the two separate explodes. Reference the function list.
Note: Some of the entries in org are “weird”, i.e. made up of non-English letters and characters. Keep them. DO NOT edit any name in the original dataframe unless we specify. DO NOT drop any row unless there is a null value as stated before. This goes for the rest of the homework as well, unless otherwise specified.
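To make the hint above concrete, here is a minimal, hypothetical sketch of how POSEXPLODE behaves on a toy array column. It assumes the spark session from the setup cells; the table name and values are made up and are not the LinkedIn data.
In [ ]:
%%spark
# Toy illustration of POSEXPLODE on a hypothetical table with an array column
toy_sdf = spark.createDataFrame(
    [("p1", ["The Walt Disney Company", "Walt Disney World Resort"]),
     ("p2", ["IBM"])],
    ["person_id", "orgs"])
toy_sdf.createOrReplaceTempView("toy_people")
# POSEXPLODE yields one output row per array element, together with the
# element's position ("pos"); the position is what lets you line up two
# separate explodes later.
spark.sql('''
    SELECT person_id, pos, org
    FROM toy_people
    LATERAL VIEW POSEXPLODE(orgs) exploded AS pos, org
''').show(truncate=False)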
In [ ]:
%%spark
# TODO: Create [raw_start_dates_sdf]
# POSEXPLODE() will explode a specified column of an sdf and return two columns
# corresponding to the index of an element in the array ("pos") and the element
# itself.
############################################
# Explode the org array in experience
# Save as a SQL-accessible table called "orgs"
# Explode the start_date array in experience
# Join each explode based on a person's id and the index of each element in the
# original array
# Define and save raw_start_dates_sdf
In [ ]:
%%spark
# For your info, see if it looks reasonable
raw_start_dates_sdf.show(4)
In [ ]:
%%spark
## AUTOGRADER Step 1.3: ##
raw_start_dates_sdf.createOrReplaceTempView("test_1_3")
test_1_3_sdf = spark.sql("SELECT * FROM test_1_3 ORDER BY org ASC, start_date DESC LIMIT 10").toPandas()
to_submit = pd.read_json(test_1_3_sdf.to_json())
to_submit
In [ ]:
%%spark
grader.grade(test_case_id = 'explosion', answer = to_submit)
Step 1.4: The Fortuitous Formatting¶
There are two issues with the values in our date column. First, the values are saved as strings, not datetime types. This prevents us from running operations such as ORDER BY or GROUP BY on common months or years. Second, some values do not have both month and year information, or are in other languages. Your task is to filter and clean the date column. We are interested only in rows whose date is in the format "(month_name) (year)", e.g. "October 2010".
Create an sdf called filtered_start_dates_sdf from raw_start_dates_sdf with the date column filtered in the manner described above. Keep only those rows with a start date between January 2000 and December 2011, inclusive. Ensure that any dates not in our desired format are omitted. Drop any row that contains a null in either column. The format of the sdf is shown below:
+--------------------------+------------+
|org                       |start_date  |
+--------------------------+------------+
|Walt Disney World Resort  |2005-01-01  |
|...                       |...         |
+--------------------------+------------+
Hint: Refer to the function list to format the date column. In Spark SQL the date format we are interested in is “MMMM y”.
Note: Spark will return the date in the format above, with the day as 01. This is ok, since we are interested in the month and year each individual began working and all dates will have 01 as their day.
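As a minimal, hypothetical illustration of the TO_DATE behavior described above (toy strings, not the real data), strings that do not match the "MMMM y" pattern come back as null, which makes them easy to drop afterwards.
In [ ]:
%%spark
# Toy illustration of TO_DATE with the 'MMMM y' format (hypothetical strings)
spark.createDataFrame(
    [("October 2010",), ("2010-10",), ("sometime in 2010",)],
    ["raw"]).createOrReplaceTempView("toy_dates")
# Only the first string matches the pattern; the others become NULL.
spark.sql('''
    SELECT raw, TO_DATE(raw, 'MMMM y') AS start_date
    FROM toy_dates
''').show(truncate=False)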
In [ ]:
%%spark
# TO_DATE() will convert a string to a datetime object. The string's format,
# i.e. "MMMM y", must be provided. Any string that does not have the specified
# format will be returned as null.
# Use TO_DATE() to convert the start_date column from a string to a datetime
# object. Keep only dates that are between January 2000 and December 2011,
# inclusive.
# TODO: Create [filtered_start_dates_sdf]
# Define and save filtered_start_dates_sdf
In [ ]:
%%spark
## AUTOGRADER Step 1.4: ##
filtered_start_dates_sdf.createOrReplaceTempView("test_1_4")
test_1_4_sdf = spark.sql("SELECT org, DATE_FORMAT(start_date, 'yyyy-MM-dd') AS start_date FROM test_1_4 ORDER BY org DESC, start_date DESC LIMIT 20").toPandas()
to_submit = pd.read_json(test_1_4_sdf.to_json())
to_submit
In [ ]:
%%spark
grader.grade(test_case_id = 'fortuitous', answer = to_submit)
Step 1.5 The Gregarious Grouping¶
We now want to collect the number of individuals that started in the same month and year for each company. Create an sdf called start_dates_sdf that has the total number of employees who began working at the same company on the same start date. The format of the sdf is shown below:
+--------------------------+------------+--------------+
|org                       |start_date  |num_employees |
+--------------------------+------------+--------------+
|Walt Disney World Resort  |2005-01-01  |1             |
|...                       |...         |...           |
+--------------------------+------------+--------------+
In [ ]:
%%spark
# TODO: Create [start_dates_sdf]
# GROUP BY on org and start_date, in that order.
# Define and save start_dates_sdf
In [ ]:
%%spark
## AUTOGRADER Step 1.5: ##
start_dates_sdf.createOrReplaceTempView("test_1_5")
test_1_5_sdf = spark.sql("SELECT org, DATE_FORMAT(start_date, 'yyyy-MM-dd') as start_date, num_employees FROM test_1_5 ORDER BY num_employees DESC, org DESC, start_date ASC LIMIT 10").toPandas()
to_submit = pd.read_json(test_1_5_sdf.to_json())
to_submit
In [ ]:
%%spark
grader.grade(test_case_id = 'gregarious', answer = to_submit)
Step 2: Hiring Trends Analysis¶
Now we will analyze start_dates_sdf to find monthly and annual hiring trends.
Step 2.1: The Marvelous Months¶
Your task is to answer the question: "What is the most popular month for employees to start working?" Create an sdf called monthly_hires_sdf which contains the total number of employees that started working in a specific month, at any company and in any year. The month column should be of type int, i.e. 1-12. The format of the sdf is shown below:
+-------+--------------+
|month  |num_employees |
+-------+--------------+
|1      |...           |
|2      |...           |
|3      |...           |
|...    |...           |
+-------+--------------+
Find the month in which the most employees start working and save its number as an integer to the variable most_common_month. This can be submitted by hardcoding the month to the variable, but we encourage you to try it without hardcoding the integer of the month.
Hint: Be careful. The start dates we have right now have both month and year. We only want the common months. See if you can find something in the function list that will help you do this.
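Here is a minimal, hypothetical sketch (toy data, not start_dates_sdf) of how MONTH() collapses different years into the same month before aggregating.
In [ ]:
%%spark
# Toy illustration: extract the month and aggregate over it (hypothetical data)
import datetime
spark.createDataFrame(
    [(datetime.date(2005, 1, 1), 3),
     (datetime.date(2008, 1, 1), 4),
     (datetime.date(2008, 3, 1), 5)],
    ["start_date", "num_employees"]).createOrReplaceTempView("toy_hires")
# January 2005 and January 2008 land in the same group, so month 1 sums to 7.
spark.sql('''
    SELECT MONTH(start_date) AS month, SUM(num_employees) AS num_employees
    FROM toy_hires
    GROUP BY MONTH(start_date)
    ORDER BY num_employees DESC
''').show()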
In [ ]:
%%spark
# TODO: Create [monthly_hires_sdf] and find the most common month people were
# hired. Save its number as an integer to [most_common_month]
# HINT: You can aggregate by month, and use ordering to figure out the
# most or least common.
# MONTH() will return the month for any datetime object.
# GROUP BY on the month of start_date using MONTH(). Sum the num_employees
# column to find the total number of employees that started on that month
# Define and save monthly_hires_sdf
In [ ]:
%%spark
## AUTOGRADER Step 2.1: ##
monthly_hires_sdf.createOrReplaceTempView("test_2_1")
test_2_1_sdf = spark.sql("SELECT * FROM test_2_1 ORDER BY month ASC").toPandas()
to_submit = pd.read_json(test_2_1_sdf.to_json())
print(to_submit)
print(most_common_month)
In [ ]:
%%spark
grader.grade(test_case_id = 'marvelous', answer = to_submit)
Step 2.2: The Preposterous Perennial Percentages¶
The next question we will answer is “What is the percentage change in hires between 2009 and 2010 for each company?” Create an sdf called percentage_change_sdf that has the percentage change between 2009 and 2010 for each company. The sdf should look as follows:
+--------------------------+-------------------+
|org                       |percentage_change  |
+--------------------------+-------------------+
|Walt Disney World Resort  |12.3               |
|...                       |...                |
+--------------------------+-------------------+
Note: A percentage change can be positive or negative depending on the difference between the two years. The formula for percent change is given below,
$$\text{\% change} = \frac{P_f - P_i}{P_i} \times 100$$
Here, $P_f$ is the final value (in this case the number of hires in 2010) and $P_i$ is the initial value (the number of hires in 2009).
Hint: This is a nontrivial question and involves you putting the 2009 and 2010 data on the same row. I’m really sorry it isn’t easier, but that’s why you are in the class (I hope!). We recommend using a combination of GROUP BY and JOIN. Keep in mind that operations between columns in SQL dataframes are often easier than those between rows. Come to office hours if you need help.
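To make the formula and the join pattern concrete, here is a minimal, hypothetical sketch on made-up per-year counts (the company names and numbers are invented; the real query runs over your own tables).
In [ ]:
%%spark
# Toy illustration of putting two years on one row with a join (hypothetical data)
spark.createDataFrame(
    [("Acme", 2009, 40), ("Acme", 2010, 50),
     ("Globex", 2009, 10), ("Globex", 2010, 8)],
    ["org", "year", "hires"]).createOrReplaceTempView("toy_yearly")
# Acme:   (50 - 40) / 40 * 100 =  25.0
# Globex: ( 8 - 10) / 10 * 100 = -20.0
spark.sql('''
    SELECT y1.org,
           (y2.hires - y1.hires) / y1.hires * 100 AS percentage_change
    FROM (SELECT org, hires FROM toy_yearly WHERE year = 2009) y1
    JOIN (SELECT org, hires FROM toy_yearly WHERE year = 2010) y2
      ON y1.org = y2.org
''').show()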
In [ ]:
%%spark
# TODO: Create [percentage_change_sdf]
# YEAR() will return the year of a datetime object.
# The column percentage_change is calculated by doing a JOIN. We join an sdf
# "y1" created by doing a GROUP BY on start_dates by org and the year of
# start_date where the year is 2009. We sum the number of employees. That sdf is
# joined with a similar sdf, "y2", but for the case when year is 2010. Then,
# using the formula in the description above we find the percentage_change of
# the two sum columns.
# Define and save percentage_change_sdf
In [ ]:
%%spark
percentage_change_sdf.show()
In [ ]:
%%spark
## AUTOGRADER Step 2.2: ##
percentage_change_sdf.createOrReplaceTempView("test_2_2")
test_2_2_sdf = spark.sql("SELECT * FROM test_2_2 ORDER BY percentage_change DESC, org DESC LIMIT 10")
to_submit = pd.read_json(test_2_2_sdf.toPandas().to_json())
to_submit
In [ ]:
%%spark
grader.grade(test_case_id = 'preposterous', answer = to_submit)
The Blessed Break¶
That last question was hard. And it’s gonna get harder. Take a break. Sit back and relax for a minute. Listen to some music. Here’s a suggestion.
In the cell below fill out the boolean variable whatd_you_think with True if you liked it or False if you didn’t. You will be graded on your response.
In [ ]:
%%spark
whatd_you_think = # TODO
In [ ]:
%%spark
grader.grade(test_case_id = 'tunes', answer = whatd_you_think)
Step 3: Formatting the Training Data¶
Our overarching goal is to train a machine learning (ML) model that will use the monthly hiring trends of a company to predict a positive or negative gain in the company's stock in the first quarter of the following year. An ML model is trained on a set of observations. Each observation contains a set of features, X, and a label, y. The goal of the ML model is to create a function that takes any X as an input and outputs a predicted y.
The machine learning model we will use is a Random Forest Classifier. Each observation we will pass in will have 24 features (columns). These are the number of people hired from Jan to Dec and the company stock price on the last day of each month. The label will be the direction of the company’s stock percentage change (positive, 1, or negative, -1) in the first quarter of the following year. Each observation will correspond to a specified company’s trends on a specified year. The format of our final training sdf is shown below. The first 26 columns define our observations, X, and the last column the label, y.
+----+-----+----------+-----+----------+----------+-----+----------+-------------+
|org |year |jan_hired | ... |dec_hired |jan_stock | ... |dec_stock |stock_result |
+----+-----+----------+-----+----------+----------+-----+----------+-------------+
|IBM |2009 |...       | ... |...       |...       | ... |...       |1            |
|IBM |2010 |...       | ... |...       |...       | ... |...       |-1           |
|... |...  |...       | ... |...       |...       | ... |...       |...          |
+----+-----+----------+-----+----------+----------+-----+----------+-------------+
Note: We will use the first three letters of each month in naming, i.e. jan, feb, mar, apr, may, jun, jul, aug, sep, oct, nov, dec.
Step 3.1: The Harmonious Hires¶
Your first task is to create the first half of the training table, i.e. the jan_hired through dec_hired columns. This will involve reshaping start_dates_sdf. Currently, start_dates_sdf has columns org, start_date, and num_employees. We want to group the rows together based on common org and years and create new columns for the number of employees that started working in each month of that year.
Create an sdf called raw_hire_train_sdf that has, for a single company and a single year, the number of hires in Jan through Dec, and the total number of hires that year. Note that for each company you will have several rows corresponding to years between 2000 and 2011. It is OK if a given company is missing a given year. However, ensure that for a given company and given year, each month column has an entry, i.e. if no one was hired the value should be 0. The format of the sdf is shown below:
+----+-----+----------+-----+----------+----------+
|org |year |jan_hired | ... |dec_hired |total_num |
+----+-----+----------+-----+----------+----------+
|IBM |2008 |...       | ... |...       |...       |
|IBM |2009 |...       | ... |...       |...       |
|... |...  |...       | ... |...       |...       |
+----+-----+----------+-----+----------+----------+
Hint: This is a fiddly and somewhat difficult question. I'm really, really sorry. The tricky part is creating the additional columns of monthly hires, specifically when there are missing dates. In our dataset, if a company did not hire anybody on a given date, that date will not appear in start_dates_sdf.
We suggest you look into CASE and WHEN statements in the function list, and use these to either fill in a number for the column (if appropriate) or put in a 0.
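Here is a minimal, hypothetical sketch of the SUM(CASE WHEN ...) pivot pattern on toy data; it only shows two month columns, whereas your table needs all twelve plus the total.
In [ ]:
%%spark
# Toy illustration of pivoting rows into month columns with SUM(CASE WHEN ...)
import datetime
spark.createDataFrame(
    [("Acme", datetime.date(2008, 1, 1), 4),
     ("Acme", datetime.date(2008, 2, 1), 6),
     ("Acme", datetime.date(2009, 1, 1), 1)],
    ["org", "start_date", "num_employees"]).createOrReplaceTempView("toy_starts")
# Months with no matching rows fall into the ELSE branch and contribute 0.
spark.sql('''
    SELECT org,
           YEAR(start_date) AS year,
           SUM(CASE WHEN MONTH(start_date) = 1 THEN num_employees ELSE 0 END) AS jan_hired,
           SUM(CASE WHEN MONTH(start_date) = 2 THEN num_employees ELSE 0 END) AS feb_hired,
           SUM(num_employees) AS total_num
    FROM toy_starts
    GROUP BY org, YEAR(start_date)
''').show()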
In [ ]:
%%spark
# TODO: Create [raw_hire_train_sdf]
# CASE statements are SQL's equivalent of if/else statements. WHEN a condition
# is true THEN we return one value; ELSE we return another, and then we END the
# statement.
# The query is a GROUP BY. We group data based on the same company and year, as
# in the previous step. We then do a CASE statement. This will separate out the
# sets of data corresponding to the same month using MONTH() in the WHEN clause.
# If we have a piece of data, it will be the number of employees that started
# working at a given company on a given year and a given month and we will save
# it with a corresponding column name. If there is no piece of data here, as per
# the question, we need to add a 0. This is the ELSE clause. Lastly, we do a
# SUM() to find total_num
# Define and save raw_hire_train_sdf
In [ ]:
%%spark
## AUTOGRADER Step 3.1: ##
raw_hire_train_sdf.createOrReplaceTempView("test_3_1")
test_3_1_sdf = spark.sql("SELECT * FROM test_3_1 ORDER BY org DESC, year ASC LIMIT 10")
to_submit = pd.read_json(test_3_1_sdf.toPandas().to_json())
to_submit
In [ ]:
%%spark
grader.grade(test_case_id = 'harmonious', answer = to_submit)
Step 3.2: The Formidable Filters¶
Create an sdf called hire_train_sdf that contains all the observations in raw_hire_train_sdf with total_num greater than or equal to 100. The format of the sdf is shown below:
+----+-----+----------+-----+----------+----------+
|org |year |jan_hired | ... |dec_hired |total_num |
+----+-----+----------+-----+----------+----------+
|IBM |2008 |...       | ... |...       |...       |
|IBM |2009 |...       | ... |...       |...       |
|... |...  |...       | ... |...       |...       |
+----+-----+----------+-----+----------+----------+
In [ ]:
%%spark
# TODO: Create [hire_train_sdf]
# Keep all rows where total_num >= 100
# Define and save hire_train_sdf
In [ ]:
%%spark
hire_train_sdf.show(5)
In [ ]:
%%spark
## AUTOGRADER Step 3.2: ##
hire_train_sdf.createOrReplaceTempView("test_3_2")
test_3_2_sdf = spark.sql("SELECT * FROM test_3_2 ORDER BY org DESC, year ASC LIMIT 10")
to_submit = pd.read_json(test_3_2_sdf.toPandas().to_json())
to_submit
In [ ]:
%%spark
grader.grade(test_case_id = 'formidable', answer = to_submit)
Step 3.3: The Stupendous Stocks¶
Now we are ready for the stock data. The stock data we will use is saved in the same S3 bucket as linkedin_small_real.json. Load the data into the EMR cluster by running the cell below. You do not need to edit this cell.
In [ ]:
%%spark
# Load stock data
raw_stocks_sdf = spark.read.format("csv") \
    .option("header", "true") \
    .load("s3a://penn-cis545-files/stocks.csv")
# Create SQL-accessible table
raw_stocks_sdf.createOrReplaceTempView('raw_stocks')
# Display the first 10 rows
query = '''SELECT *
           FROM raw_stocks'''
spark.sql(query).show(10)
Run the cell below to see the types of the columns in our data frame. These are not correct. We could have defined a schema when reading in data but we will handle this issue in another manner. You will do this in Step 3.4.2.
In [ ]:
%%spark
# Print types of SDF
raw_stocks_sdf.dtypes
Step 3.4 The Clairvoyant Cleaning¶
We now want to format the stock dataset into the second half of the training table. We will then merge it with hire_train based on the common org and year fields. The formatting will consist of 4 steps. Actually, it is 5.
Step 3.4.1 The Ubiquitous UDF¶
The companies in our stock dataset are identified by their stock tickers, so we would not be able to merge it directly with the org field in hire_train_sdf. We must convert the tickers to company names. Oftentimes when using Spark, there may not be a built-in SQL function that can do the operation we desire. Instead, we can create one of our own with a user-defined function (udf).
A udf is defined as a normal Python function and then registered to be used as a Spark SQL function. Your task is to create a udf, TICKER_TO_NAME() that will convert the ticker field in raw_stocks to the company’s name. This will be done using the provided ticker_to_name_dict dictionary. We are only interested in the companies in that dictionary.
Fill out the function ticker_to_name() below. Then use spark.udf.register() to register it as a SQL function. The command is provided. You do not need to edit it. Note, we have defined the udf as returning StringType(). Ensure that your function returns this. You must also deal with any potential null cases.
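Before filling in ticker_to_name(), it may help to see the registration mechanism on an unrelated, hypothetical example (a made-up dictionary and toy table; the real UDF below follows the same pattern).
In [ ]:
%%spark
# Toy illustration of defining and registering a UDF (hypothetical dictionary)
from pyspark.sql.types import StringType

toy_dict = {"PA": "Pennsylvania", "NJ": "New Jersey"}

def abbrev_to_state(abbrev):
    # Returning None for unknown or missing keys keeps the UDF from crashing;
    # Spark surfaces the None as a SQL NULL.
    try:
        return toy_dict[abbrev]
    except (KeyError, TypeError):
        return None

spark.udf.register("ABBREV_TO_STATE", abbrev_to_state, StringType())
spark.createDataFrame([("PA",), ("CA",)], ["code"]).createOrReplaceTempView("toy_codes")
spark.sql("SELECT code, ABBREV_TO_STATE(code) AS state FROM toy_codes").show()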
In [ ]:
%%spark
# Dictionary linking stock ticker symbols to their names
ticker_to_name_dict = {'NOK': 'Nokia',
                       'UN': 'Unilever',
                       'BP': 'BP',
                       'JNJ': 'Johnson & Johnson',
                       'TCS': 'Tata Consultancy Services',
                       'SLB': 'Schlumberger',
                       'NVS': 'Novartis',
                       'CNY': 'Huawei',
                       'PFE': 'Pfizer',
                       'ACN': 'Accenture',
                       'DELL': 'Dell',
                       'MS': 'Morgan Stanley',
                       'ORCL': 'Oracle',
                       'BAC': 'Bank of America',
                       'PG': 'Procter & Gamble',
                       'CGEMY': 'Capgemini',
                       'GS': 'Goldman Sachs',
                       'C': 'Citi',
                       'IBM': 'IBM',
                       'CS': 'Credit Suisse',
                       'MDLZ': 'Kraft Foods',
                       'WIT': 'Wipro Technologies',
                       'CSCO': 'Cisco Systems',
                       'PWC': 'PwC',
                       'GOOGL': 'Google',
                       'CTSH': 'Cognizant Technology Solutions',
                       'HSBC': 'HSBC',
                       'DB': 'Deutsche Bank',
                       'MSFT': 'Microsoft',
                       'HPE': 'Hewlett-Packard',
                       'ERIC': 'Ericsson',
                       'BCS': 'Barclays Capital',
                       'GSK': 'GlaxoSmithKline'}
# TODO: Fill out [ticker_to_name()] and register it as a udf.
# Fill out ticker_to_name()
# In UDFs we have to cover all possible output cases, or else the function will
# crash. Specifically, this means we need to handle the case when "ticker" is
# not in "ticker_to_name_dict". We use a try and except statement to return null
# for this case.
def ticker_to_name(ticker):
# Register udf as a SQL function. DO NOT EDIT
spark.udf.register("TICKER_TO_NAME", ticker_to_name, StringType())
Submit a tuple to the autograder containing the converted ticker values for Google and Tesla. If a ticker value isn't in the dictionary, set it to a string equal to "None".
In [ ]:
%%spark
## AUTOGRADER Step 3.4.1: ##
print((str(ticker_to_name("GOOGL")), str(ticker_to_name("TSLA"))))
to_submit = ((str(ticker_to_name("GOOGL")), str(ticker_to_name("TSLA"))))
In [ ]:
%%spark
grader.grade(test_case_id = 'clairvoyant', answer = to_submit)
Step 3.4.2: The Fastidious Filters¶
With our new TICKER_TO_NAME() function we will begin to wrangle raw_stocks_sdf.
Create an sdf called filter_1_stocks_sdf as follows. Convert all the ticker names in raw_stocks_sdf to the corresponding company names and save them as org. Next, convert the date field to a datetime type; as explained before, this will help order and group the rows in future steps. Then, convert the type of the values in the closing price column (Close) to float. This will take care of the dtypes issue we saw in Step 3.3.
Drop any company names that do not appear in ticker_to_name_dict. Using .dropna() is acceptable instead of IS NOT NULL. Keep any date between January 1st 2001 and December 4th 2012 inclusive, in the format shown below (note this is a datetime object not a string):
+----+------------+-------+
|org |date        |Close  |
+----+------------+-------+
|IBM |2000-01-03  |...    |
|... |...         |...    |
+----+------------+-------+
Hint: You will use a similar function to filter the dates as in Step 1.4. In Spark SQL the format for the date field in raw_stocks_sdf is “yyyy-MM-dd”.
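As a minimal, hypothetical sketch of the two type conversions involved (made-up strings standing in for the raw CSV columns):
In [ ]:
%%spark
# Toy illustration: TO_DATE parses 'yyyy-MM-dd' strings and CAST fixes the
# dtype of a numeric column that was read as a string (hypothetical values).
spark.createDataFrame(
    [("2000-01-03", "112.25"), ("2000-01-04", "108.00")],
    ["date", "Close"]).createOrReplaceTempView("toy_stocks")
spark.sql('''
    SELECT TO_DATE(date, 'yyyy-MM-dd') AS date,
           CAST(Close AS FLOAT) AS Close
    FROM toy_stocks
''').show()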
In [ ]:
%%spark
# Format the "org" column using our UDF, TICKER_TO_NAME. Use TO_DATE() to
# convert the string date column to datetime object and filter on this in the
# same way as Step 1.4
# TODO: Create [filter_1_stocks_sdf]
# Define and save filter_1_stocks_sdf
In [ ]:
%%spark
## AUTOGRADER Step 3.4.2: ##
filter_1_stocks_sdf.createOrReplaceTempView("test_3_4_2")
test_3_4_2_sdf = spark.sql("SELECT org, DATE_FORMAT(date, 'yyyy-MM-dd') as date, Close FROM test_3_4_2 ORDER BY org, date, Close LIMIT 10")
to_submit = pd.read_json(test_3_4_2_sdf.toPandas().to_json())
to_submit
In [ ]:
%%spark
grader.grade(test_case_id = 'fastidious', answer = to_submit)
Step 3.4.3: The Momentous Months¶
The data in filter_1_stocks_sdf gives closing prices on a daily basis. Since we are interested in monthly trends, we will only keep the closing price on the last trading day of each month.
Create an sdf filter_2_stocks_sdf that contains only the closing prices for the last trading day of each month. Note that the last trading day is not simply the last day of each month, as that could fall on a weekend when the market is closed. The format of the sdf is shown below:
+----+------------+-------+
|org |date        |Close  |
+----+------------+-------+
|IBM |2000-01-31  |...    |
|... |...         |...    |
+----+------------+-------+
Hint: This is a difficult question. But if you made it this far, you’re a star by now. It may be helpful to create an intermediate dataframe that will help you filter out the specific dates you desire.
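Here is a minimal, hypothetical sketch of the "group to find the last date, then join back" pattern on toy daily prices (made-up rows; your intermediate dataframe may look different).
In [ ]:
%%spark
# Toy illustration: keep only the last date per (org, year, month) by grouping
# to find MAX(date) and joining back to recover the matching rows.
import datetime
spark.createDataFrame(
    [("Acme", datetime.date(2000, 1, 28), 10.0),
     ("Acme", datetime.date(2000, 1, 31), 11.0),
     ("Acme", datetime.date(2000, 2, 29), 12.0)],
    ["org", "date", "Close"]).createOrReplaceTempView("toy_daily")
spark.sql('''
    SELECT d.org, d.date, d.Close
    FROM toy_daily d
    JOIN (SELECT org, MAX(date) AS last_date
          FROM toy_daily
          GROUP BY org, YEAR(date), MONTH(date)) month_end
      ON d.org = month_end.org AND d.date = month_end.last_date
''').show()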
In [ ]:
%%spark
# TODO: Create [filter_2_stocks_sdf]
# Create an sdf that has, for each company, the closing day for each month. We
# need to perform a GROUP BY on three features: org, YEAR(date), and MONTH(date).
# This will give us aggregations of the closing stock price for every day of a
# specified month and a specified year. Since these are all datetime objects,
# taking MAX() will give us the highest, i.e. last, one.
# Save as a temporary sdf, desired_months
# Merge desired_months with filter_1_stocks. This will allow us to keep the
# closing prices for only those dates that were the closing date for a given
# month.
# Define and save filter_2_stocks_sdf
In [ ]:
%%spark
## AUTOGRADER Step 3.4.3: ##
filter_2_stocks_sdf.createOrReplaceTempView("test_3_4_3")
test_3_4_3_sdf = spark.sql("SELECT org, DATE_FORMAT(date, 'yyyy-MM-dd') as date, Close FROM test_3_4_3 ORDER BY org, date LIMIT 10")
to_submit = pd.read_json(test_3_4_3_sdf.toPandas().to_json())
to_submit
In [ ]:
%%spark
grader.grade(test_case_id = 'momentus', answer = to_submit)
Step 3.4.4: The Really Random Reshape¶
Now, we will begin to shape our dataframe into the format of the final training sdf.
Create an sdf filter_3_stocks_sdf that has, for a single company and a single year, the closing stock price for the last trading day of each month in that year. This is similar to the table you created in Step 3.1. In this case, since we cannot make a proxy for the closing price if the data is not available, drop any rows containing null values in any column. The format of the sdf is shown below:
+----+-----+----------+-----+----------+
|org |year |jan_stock | ... |dec_stock |
+----+-----+----------+-----+----------+
|IBM |2008 |...       | ... |...       |
|IBM |2009 |...       | ... |...       |
|... |...  |...       | ... |...       |
+----+-----+----------+-----+----------+
In [ ]:
%%spark
# TODO: Create [filter_3_stocks_sdf]
# We will do the same operation we did in Step 3.1. In this case, however, as
# the question specifies, any missing entry in a given month is set to null.
# Define and save filter_3_stocks_sdf
In [ ]:
%%spark
## AUTOGRADER Step 3.4.4: ##
filter_3_stocks_sdf.createOrReplaceTempView("test_3_4_4")
test_3_4_4_sdf = spark.sql("SELECT * FROM test_3_4_4 ORDER BY org, year LIMIT 12")
to_submit = pd.read_json(test_3_4_4_sdf.toPandas().to_json())
to_submit
In [ ]:
%%spark
grader.grade(test_case_id = 'random', answer = to_submit)
Step 3.4.5: The Decisive Direction¶
The final element in our training set is the binary output for each case, i.e. the y label.
Create an sdf stocks_train_sdf from filter_3_stocks_sdf with an additional column direction. This should be the direction of the percentage change in the closing stock price, i.e. 1 for positive or -1 for negative, in the first quarter of a given year. Make this an integer. For our purposes, the first quarter of a year begins in January and ends in April, inclusive; we want the percent change between these two months. Reference Step 2.2 for the percent change formula. The format of the sdf is shown below:
+----+-----+----------+-----+----------+----------+
|org |year |jan_stock | ... |dec_stock |direction |
+----+-----+----------+-----+----------+----------+
|IBM |2008 |...       | ... |...       |1         |
|IBM |2009 |...       | ... |...       |-1        |
|... |...  |...       | ... |...       |...       |
+----+-----+----------+-----+----------+----------+
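As a minimal, hypothetical illustration of how SIGN() turns the January-to-April percent change into the direction label (made-up prices):
In [ ]:
%%spark
# Toy illustration of SIGN() on the January-to-April percent change
spark.createDataFrame(
    [(100.0, 90.0), (50.0, 60.0)],
    ["jan_stock", "apr_stock"]).createOrReplaceTempView("toy_q1")
# ( 90 - 100) / 100 * 100 = -10  ->  direction -1
# ( 60 -  50) /  50 * 100 = +20  ->  direction  1
spark.sql('''
    SELECT jan_stock, apr_stock,
           CAST(SIGN((apr_stock - jan_stock) / jan_stock * 100) AS INT) AS direction
    FROM toy_q1
''').show()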
In [ ]:
%%spark
# TODO: Create [stocks_train_sdf]
# SIGN() will return -1 if the input is negative, 0 if the input is zero, and 1
# if the input is positive.
# Keep all rows in filter_3_stocks and add another column based on the sign of
# the percentage change in stock
# Define and save stocks_train_sdf
In [ ]:
%%spark
## AUTOGRADER Step 3.4.5: ##
stocks_train_sdf.createOrReplaceTempView("test_3_4_5")
test_3_4_5_sdf = spark.sql("SELECT * FROM test_3_4_5 ORDER BY org, year LIMIT 10")
to_submit = pd.read_json(test_3_4_5_sdf.toPandas().to_json())
to_submit
In [ ]:
%%spark
grader.grade(test_case_id = 'decisive', answer = to_submit)
Step 3.5: The C-r-a-z-y Combination¶
Now that we have individually created the two halves of our training data we will merge them together to create the final training sdf we showed in the beginning of Step 3.
Create an sdf called training_sdf in the format of the one shown at the beginning of Step 3. Note that in our definition of the stock_result column, the stock_result value for a particular year corresponds to the direction of the stock percentage change in the first quarter of the following year. For example, the stock_result in the 2008 row for IBM will contain the direction of IBM's stock in the first quarter of 2009. The format of the sdf is shown below:
+----+-----+----------+-----+----------+----------+-----+----------+-------------+
|org |year |jan_hired | ... |dec_hired |jan_stock | ... |dec_stock |stock_result |
+----+-----+----------+-----+----------+----------+-----+----------+-------------+
|IBM |2008 |...       | ... |...       |...       | ... |...       |-1           |
|IBM |2009 |...       | ... |...       |...       | ... |...       |1            |
|... |...  |...       | ... |...       |...       | ... |...       |...          |
+----+-----+----------+-----+----------+----------+-----+----------+-------------+
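Here is a minimal, hypothetical sketch of a join that shifts the label by one year, so that a feature row picks up the direction computed for the following year (the tables and values are made up).
In [ ]:
%%spark
# Toy illustration of an offset-by-one-year join (hypothetical data)
spark.createDataFrame(
    [("Acme", 2008, 120), ("Acme", 2009, 95)],
    ["org", "year", "jan_hired"]).createOrReplaceTempView("toy_features")
spark.createDataFrame(
    [("Acme", 2009, 1), ("Acme", 2010, -1)],
    ["org", "year", "direction"]).createOrReplaceTempView("toy_labels")
# The 2008 feature row is labeled with the direction computed for 2009, and so on.
spark.sql('''
    SELECT f.org, f.year, f.jan_hired, l.direction AS stock_result
    FROM toy_features f
    JOIN toy_labels l
      ON f.org = l.org AND l.year = f.year + 1
''').show()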
In [ ]:
%%spark
# TODO: Create [training_sdf]
# Our merge will consist of two joins. The first will use filter_3_stocks to
# join the monthly hiring rates and closing prices. The next join will be with
# stocks_train to find stock_result. This join will be done such that the
# correct years are matched between hire_train and stocks_train (think about how
# to get one year's stock result to be the direction of the stock for the
# following year's first quarter).
# Define and save training_sdf
In [ ]:
%%spark
## AUTOGRADER Step 3.5: ##
training_sdf.createOrReplaceTempView("test_3_5")
test_3_5_sdf = spark.sql("SELECT * FROM test_3_5 ORDER BY org, year LIMIT 10")
to_submit = pd.read_json(test_3_5_sdf.toPandas().to_json())
to_submit
In [ ]:
%%spark
sum(to_submit['nov_stock'].values)
In [ ]:
%%spark
grader.grade(test_case_id = 'crazy', answer = to_submit)
Step 4: Machine … Learning?¶
Well here we go. Who’s ready to make some money? Well… it’s not gonna happen. We didn’t code the random forest model, sorry! The second half of the course will be about scalable machine learning, and we will learn how to take this beautiful data and make billions of dollars.
One last thing, as I predicted before, you’re a star.
Feel free to fill out this form with any feedback for this and prior homeworks.
Optional Extra-Credit Step: Full PageRank on Spark (5 points)¶
We’ve given a basic implementation of MapReduce using Spark’s matrix types in:
https://colab.research.google.com/drive/1Mr2zf-Oz6W9kRFrzSN08S1lo14mGYnw2
Your task for extra credit, worth up to 5 points, is to take this basic example and flesh it out:
You should write a function called pagerank that takes two inputs: (1) a Spark dataframe df conforming to the schema of initial_graph, and (2) an integer n specifying the max number of iterations until termination. It should return a Spark dataframe with two columns, node_id and pagerank (which can be of type double), where pagerank is the PageRank score after $n$ iterations.
Your PageRank algorithm should incorporate the standard “decay factor” as we’ve described in the lecture slides. Use the standard value $\alpha=0.85$. It should use Apache Spark matrices to do the computation.
Your PageRank algorithm should remove sinks and self-loops.
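For reference, one standard way to write the damped update from lecture (a sketch; your matrix formulation may organize the computation differently) is,
$$r^{(t+1)} = \alpha M r^{(t)} + \frac{1-\alpha}{N}\mathbf{1}$$
where $M$ is the column-stochastic transition matrix (after removing sinks and self-loops), $N$ is the number of nodes, $\mathbf{1}$ is the all-ones vector, and $\alpha = 0.85$.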
In [ ]:
# EXTRA CREDIT HERE
HW Submission¶
Double check that you have the correct PennID (all numbers) in the autograder.
Go to the “File” tab at the top left, and click “Download .ipynb”. Zip it (name doesn’t matter) and submit it to GradeScope.
You must submit your notebook to receive credit.