7CCSMBDT – Big Data Technologies Practical
SPARK
1. Download the file ml-100k.zip from https://grouplens.org/datasets/movielens/100k/
Extract it in one directory of your choice, say /home/cloudera/Desktop/ml
2. Launching the pyspark shell
Go to the directory ~/Desktop and type:
pyspark --master local[2]
After many lines of text, you should see
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/
Using Python version 2.6.6 …
SparkContext available as sc, HiveContext available as sqlContext. >>>
3. Checking the SparkContext
Inside the pyspark shell, type
sc
You should see a reference to the SparkContext object, such as <pyspark.context.SparkContext object at 0x...>
4. Creating RDDs with textFile
Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3n://, etc. URI) and reads it as a collection of lines.
Our RDD will be called fs and it will be based on the text file u.user, which is in the directory where you extracted ml-100k.zip. Assuming u.user is in /home/cloudera/Desktop/ml, the RDD can be created with:
fs=sc.textFile("file:///home/cloudera/Desktop/ml/u.user")
If the RDD is created successfully, you should see output like:
17/02/15 17:12:39 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 198.4 KB, free 425.1 KB)
17/02/15 17:12:39 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 23.4 KB, free 448.5 KB)
17/02/15 17:12:39 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:49936 (size: 23.4 KB, free: 534.5 MB)
17/02/15 17:12:39 INFO spark.SparkContext: Created broadcast 2 from textFile at NativeMethodAccessorImpl.java:-2
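Note that textFile() accepts other URI schemes in the same way. If you have also copied the dataset into HDFS, an hdfs:// URI works too; the path below is only a hypothetical example:
fs_hdfs=sc.textFile("hdfs:///user/cloudera/ml-100k/u.user")   #hypothetical HDFS path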
5. RDD Actions
The u.user file contains demographic information about the users. It is a pipe-separated ("|") list of
user id | age | gender | occupation | zip code
Now that we have created the fs RDD, we can apply actions to it.
Read about actions: https://spark.apache.org/docs/1.6.0/programming-guide.html#actions
(i) Apply an action to return the number of elements in the fs RDD.
SOL: fs.count()
(ii) Apply an action to return all the elements of the fs RDD
SOL: fs.collect()
(iii) Create an array, array_fs_sample, with a random sample of fs that is drawn with replacement and has size 5.
SOL: array_fs_sample=fs.takeSample(True,5)
(iv) Use the parallelize() method to create a new RDD fs2 which contains all elements of array_fs_sample, and then count the elements of fs2. The last line of the output should be 5.
SOL:
fs2=sc.parallelize(array_fs_sample)
fs2.count()
(v) This is to be done in a new terminal. Create a directory pyspark_tmp under /.
To do this, type each of the following:
cd /
sudo mkdir pyspark_tmp
sudo chmod 777 pyspark_tmp
(vi) Now return to the terminal where pyspark runs. Use the saveAsTextFile() action to save the contents of fs2 into a subdirectory fs2 under /pyspark_tmp/
Check the contents of the files in the subdirectory you created, from the terminal that was launched in (v).
SOL: In pyspark:
fs2.saveAsTextFile("file:///pyspark_tmp/fs2")
In the terminal launched in (v):
cat /pyspark_tmp/fs2/*
(vii) Write a function myprint which prints its argument. To do this, type the following (hit enter after the colon, and hit enter twice after print x):
def myprint(x):
    print x
(viii) Read about the foreach action and apply it to fs2 using myprint as an argument. You must see the five lines of fs2 (among INFO information).
SOL: fs2.foreach(myprint)
(ix) Write a function which prints only the user id (i.e., the part of each element before the first "|"), and apply it to the first 20 elements of fs. The elements must be sorted using their natural order.
SOL: First define the function:
def myf(x):
    tokens=x.split("|")
    print tokens[0]
Then, use the function:
sc.parallelize(fs.takeOrdered(20)).foreach(myf)
Here we convert the output of takeOrdered to RDD with parallelize() before applying foreach.
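Alternatively, since takeOrdered() returns an ordinary Python list to the driver, you can simply loop over it with the function instead of creating a second RDD:
for line in fs.takeOrdered(20):
    myf(line)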
(x) Another way to pass a function, if it is short, is to use lambda expressions. See
https://docs.python.org/2/tutorial/controlflow.html#lambda-expressions
Using a lambda expression, count how many ids in fs2 are <370.
Hint: Check the filter() transformation, which will take your lambda expression as an argument.
SOL:
fs2.filter(lambda x: int(x.split("|")[0])<370).count()
(xi) Using a lambda function, create an RDD called student, which contains the elements of fs2 that have the word "student" in them.
SOL:
student=fs2.filter(lambda x: "student" in x)
6. RDD Transformations
Read the section “Transformations” from
https://spark.apache.org/docs/1.6.0/programming-guide.html#basics
For examples and the API see
https://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.RDD
(i) Read the section Parallelized collections from
https://spark.apache.org/docs/1.6.0/programming-guide.html#basics
First, create a parallelized collection num with the integers 1 to 5. Then, create a new RDD power_num containing each number in num raised to the power of 3.
HINT: Use the map transformation and a lambda expression.
SOL:
num=sc.parallelize([1,2,3,4,5])
power_num=num.map(lambda x: x*x*x)
(ii) Create a new RDD power_num2 containing each number in num that is >=4, raised to the power of 3.
SOL:
power_num2=num.filter(lambda x: x>=4).map(lambda x: x*x*x)
(iii) Read about flatMap from the section “Transformations” above.
The flatMap transformation is useful when we want to produce multiple output elements for each input element. Like with map, the function we provide to flatMap is called individually for each element in our input RDD. However, instead of returning a single element, we return an iterator with our return values. Rather than producing an RDD of iterators, we get back an RDD which consists of the elements from all of the iterators.
Create a parallelized collection by typing:
words=sc.parallelize(["This is","a sentence"])
Use flatMap to create an RDD words2 that contains each word in words.
The output of words2.collect() must be ['This', 'is', 'a', 'sentence']
SOL:
words2=words.flatMap(lambda x: x.split(" "))
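To see why flatMap rather than map is needed here, a small sketch comparing the two (the comments show the results):
words.map(lambda x: x.split(" ")).collect()    #[['This', 'is'], ['a', 'sentence']]
words2.collect()                               #['This', 'is', 'a', 'sentence']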
(iv) Create a parallelized collection num2 with the integers 3 to 7.
SOL:
num2=sc.parallelize([3,4,5,6,7])
(v) Create a new RDD, num3 , containing the distinct elements from both num and num2
SOL: num3=num.union(num2).distinct()
(vi) Create a new RDD containing only the common elements of num and num2.
SOL:
num.intersection(num2)
(vii) Create a new RDD containing the cartesian product of num and num2.
SOL:
num.cartesian(num2)
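To see what cartesian() produces, a small sketch with two tiny collections:
sc.parallelize([1,2]).cartesian(sc.parallelize([3,4])).collect()
#contains the pairs (1, 3), (1, 4), (2, 3), (2, 4); the order may vary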
(viii) By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the cache method, in which case Spark will keep the elements in memory for much faster access the next time you query it. Read the section “RDD persistence” from https://spark.apache.org/docs/1.2.0/programming-guide.html
Create an RDD as follows and apply count() to it. How much time does count() take?
large_list=sc.parallelize(range(1,10000000))
Then, use cache() to cache large_list. Apply count() two times, and check the timings.
SOL: Before caching:
Job 0 finished: count at
After caching:
Job 1 finished: count at
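If you prefer to measure the timings yourself instead of reading them off the "Job ... finished" INFO lines, a minimal sketch using Python's time module inside the pyspark shell:
import time
large_list=sc.parallelize(range(1,10000000))
start=time.time()
large_list.count()
print "first count took %.2f s" % (time.time()-start)
large_list.cache()      #mark the RDD for in-memory persistence
large_list.count()      #this count materialises the cache
start=time.time()
large_list.count()      #this count should read from memory and be faster
print "cached count took %.2f s" % (time.time()-start)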
7. Writing standalone programs
The pyspark shell is useful for testing simple programs. To run larger programs or existing code, we should use the spark-submit script in Spark’s bin directory. Read
https://spark.apache.org/docs/1.6.0/submitting-applications.html and especially the section “Launching Applications with spark-submit”.
(i) Create a file negate_spark.py and type the following in it:
from pyspark import SparkContext
sc = SparkContext('local', 'pyspark')
nums=sc.parallelize(range(1,50))
my_list=nums.map(lambda x: -x).collect()
print(sorted(my_list,reverse=False))
Observe that we need to create the SparkContext in the program, while the pyspark shell does this automatically. We can then apply actions and transformations to RDDs as in the pyspark shell, and also use Python functions on the lists returned by Spark methods such as collect(). For example, negate_spark.py creates a parallelized collection with the integers in [1,50), uses map with a lambda expression that negates each integer, and then collect() outputs a list, my_list, containing the negated integers. Last, we apply Python's sorted() method to output the list in increasing order.
To execute negate_spark.py type the following in a terminal:
spark-submit negate_spark.py >negate_spark.out
The >negate_spark.out redirects the output to a file, which you can then view with
cat negate_spark.out
The file should include [-49, -48, …, -2, -1].
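spark-submit accepts the same --master option as the pyspark shell; for example, to run the script on two local cores:
spark-submit --master local[2] negate_spark.py >negate_spark.out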
(ii) Write a program that computes the total number of attribute values in the file u.user we used in the part Creating RDDs with textFile . By attribute value, we mean the value in user id, age, gender, occupation, or zip code. For example, the following part of u.user has 5 attribute values (namely 1,24,M,technician, 85711):
1|24|M|technician|85711
Note that your program needs to load the file u.user again.
SOL:
from operator import add
from pyspark import SparkContext
sc = SparkContext('local', 'pyspark')
fs=sc.textFile("file:///home/cloudera/Desktop/ml/u.user")
print(fs.map(lambda x: len(x.split("|"))).reduce(add))
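An equivalent way to count the attribute values, sketched with flatMap, which flattens every line into its individual values before counting:
print(fs.flatMap(lambda x: x.split("|")).count())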
(iii) Complete the following program so that it computes the total number of users in u.user with age in [20,30) (i.e., the return value of age_group below is '20-30').
from pyspark import SparkContext
sc = SparkContext('local', 'pyspark')

def age_group(age):
    if age < 10: return '0-10'
    elif age < 20: return '10-20'
    elif age < 30: return '20-30'
    elif age < 40: return '30-40'
    elif age < 50: return '40-50'
    elif age < 60: return '50-60'
    elif age < 70: return '60-70'
    elif age < 80: return '70-80'
    else: return '80+'

def parse_with_age_group(data):
    userid,age,gender,occupation,zip = data.split("|")
    return userid, age_group(int(age)),gender,occupation,zip,int(age)
SOL:
Append the following lines:
fs=sc.textFile("file:///home/cloudera/Desktop/ml/u.user")
#parse each element of fs
data_with_age_group=fs.map(parse_with_age_group)
#get users with age in [20,30)
data_with_age_20_30=data_with_age_group.filter(lambda x: '20-30' in x)
#count the elements of the RDD and print the result
print(data_with_age_20_30.count())
(iv) Complete the given program in (iii), so that it outputs the number of users in u.user whose age is in [20,30) , grouped by zip code. For example, given the following input file:
1|24|M|technician|11
2|53|F|other|3
3|23|M|writer|7
4|24|M|technician|7
8|36|M|administrator|21
9|29|M|student|2
the output will be something like:
Zipcode: 7, Number of users: 2
Zipcode: 11, Number of users: 1
Zipcode: 2, Number of users: 1
Note that the zip code is the last attribute and the users with age 53 and 36 are excluded.
HINT: Use countByValue() as described in
https://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.RDD
SOL:
Append the following lines:
fs=sc.textFile("file:///home/cloudera/Desktop/ml/u.user")
data_with_age_group=fs.map(parse_with_age_group)
data_with_age_20_30=data_with_age_group.filter(lambda x: '20-30' in x)
#obtain the zipcode with map, and return the count of each unique zipcode value
#as a dictionary of (zipcode, count) pairs
freq_per_zip=data_with_age_20_30.map(lambda x: x[4]).countByValue()
#iterate over the dictionary freq_per_zip and print
for zipcode,freq in dict(freq_per_zip).items():
    print("Zipcode: {0}, Number of users: {1}".format(zipcode, freq))
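countByValue() brings the counts back to the driver as a Python dictionary. A sketch of the classic map/reduceByKey alternative, which keeps the counts in an RDD instead:
#build (zipcode, 1) pairs and sum the 1s per zipcode
freq_rdd=data_with_age_20_30.map(lambda x: (x[4],1)).reduceByKey(lambda a,b: a+b)
for zipcode,freq in freq_rdd.collect():
    print("Zipcode: {0}, Number of users: {1}".format(zipcode, freq))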
(v) Read about accumulators from http://spark.apache.org/docs/latest/programming-guide.html#accumulators
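As a minimal illustration of the pattern (a hypothetical example, separate from the exercise below): an accumulator is created on the driver, tasks only add to it, and the driver reads its value after an action has run:
acc=sc.accumulator(0)
sc.parallelize([1,2,3,4]).foreach(lambda x: acc.add(x))
print(acc.value)   #prints 10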
Using accumulators, complete the given program in (iii), so that it outputs the number of users in each of the age groups [60,70), [70,80), and 80+, for the users in the file u.user.
The output should be something like:
Age in [60,70): 27
Age in [70,80): 4
Age in 80+: 0
SOL:
#we define a function to increase the accumulators for their corresponding age groups
def large_ages(data):
    if data[1]=="60-70":
        accum1.add(1)
    if data[1]=="70-80":
        accum2.add(1)
    if data[1]=="80+":
        accum3.add(1)
fs=sc.textFile("file:///home/cloudera/Desktop/ml/u.user")
data_with_age_group=fs.map(parse_with_age_group)
#we initialize the accumulators with zero
accum1=sc.accumulator(0)
accum2=sc.accumulator(0)
accum3=sc.accumulator(0)
#we apply large_ages to each element; map() is lazy, so collect() forces the computation and the accumulator updates
data_with_age_group.map(large_ages).collect()
print("Age in [60,70): {0}".format(accum1.value))
print("Age in [70,80): {0}".format(accum2.value))
print("Age in 80+: {0}".format(accum3.value))
(vi) Complete the given program in (iii), so that it outputs the occupations that are performed both by users in age group [40,50) and by users in age group [50,60), in alphabetical order. Each of these occupations should appear only once.
HINT: use the intersection() on the RDD containing the occupations for the age group [40,50) and the RDD containing the occupations for the age group [50,60).
SOL:
fs=sc.textFile("file:///home/cloudera/Desktop/ml/u.user")
data_with_age_group=fs.map(parse_with_age_group)
data_with_age_40_50=data_with_age_group.filter(lambda x: '40-50' in x)
data_with_age_50_60=data_with_age_group.filter(lambda x: '50-60' in x)
#we obtain the occupations for each age group with map(), apply distinct() to get rid of
#duplicates, and then perform the intersection
common=data_with_age_40_50.map(lambda x: x[3]).distinct().intersection(data_with_age_50_60.map(lambda x: x[3]).distinct())
print(sorted(common.collect()))