7CCSMBDT – Big Data Technologies Practical
SPARK
The lab is to be done in Cloudera.
1. Download the file ml-100k.zip from https://grouplens.org/datasets/movielens/100k/ and extract it into a directory of your choice, say /home/cloudera/Desktop/ml
2. Launching the pyspark shell
Go to the directory ~/Desktop and type
pyspark --master local[2]
After many lines of text, you should see
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Python version 2.6.6 ...
SparkContext available as sc, HiveContext available as sqlContext.
>>>
3. Checking the SparkContext
Inside the pyspark shell, type
sc
You must see something like
4. Creating RDDs with textFile
Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3n://, etc. URI) and reads it as a collection of lines.
Our RDD will be called fs and it will be based on the text file u.user, which is contained in the directory where you extracted ml-100k.zip. Assuming u.user is in /home/cloudera/Desktop/ml, the RDD can be created with:
fs = sc.textFile("file:///home/cloudera/Desktop/ml/u.user")
If the RDD is created successfully, you should see output like
17/02/15 17:12:39 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 198.4 KB, free 425.1 KB)
17/02/15 17:12:39 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 23.4 KB, free 448.5 KB)
17/02/15 17:12:39 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:49936 (size: 23.4 KB, free: 534.5 MB)
17/02/15 17:12:39 INFO spark.SparkContext: Created broadcast 2 from textFile at NativeMethodAccessorImpl.java:-2
5. RDD Actions
The u.user file contains demographic information about the users. It is a "|"-separated list of
user id | age | gender | occupation | zip code
Now that we have created the fs RDD, we can apply actions to it.
Read about actions: https://spark.apache.org/docs/1.6.0/programming-guide.html#actions
(i) Apply an action to return the number of elements in the fs RDD.
(ii) Apply an action to return all the elements of the fs RDD.
(iii) Create an array, array_fs_sample, with a random sample of fs, taken with replacement and of size 5.
(iv) Use the parallelize() method to create a new RDD fs2 that contains all elements of array_fs_sample, and then count the elements of fs2. The last line of output should be 5.
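One possible sketch for (i)-(iv), assuming fs was created as above (the exact sample returned will vary between runs):

# (i) number of elements in fs (943 users for the ml-100k u.user file)
fs.count()

# (ii) all elements of fs, returned as a Python list
fs.collect()

# (iii) random sample of size 5, taken with replacement
array_fs_sample = fs.takeSample(True, 5)

# (iv) turn the sample back into an RDD and count it
fs2 = sc.parallelize(array_fs_sample)
fs2.count()        # prints 5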
(v) This is to be done in a new terminal. Create a directory pyspark_tmp under /.
To do this, type each of the following:
cd /
sudo mkdir pyspark_tmp
sudo chmod 777 pyspark_tmp
(vi) Now return to the terminal where pyspark runs. Use the saveAsTextFile() action to save the contents of fs2 into a subdirectory fs2 under /pyspark_tmp/
Check the contents of the files in the subdirectory you created, from the terminal that was launched in (v).
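A minimal sketch for (vi), assuming the directory from (v) exists; the fs2 subdirectory must not exist yet, otherwise Spark raises an error:

fs2.saveAsTextFile("file:///pyspark_tmp/fs2")

From the terminal of (v), you can then inspect the part files, e.g. with cat /pyspark_tmp/fs2/part-*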
(vii) Write a function myprint which prints its argument. To do this, type the following (hit enter after the colon, and hit enter twice after print x):
def myprint(x):
    print x
(viii) Read about the foreach action and apply it to fs2, using myprint as the argument. You must see the five lines of fs2 (interleaved with INFO messages).
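A minimal sketch for (viii), assuming myprint from (vii) is defined in the shell:

fs2.foreach(myprint)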
(ix) Write a function that prints only the user id (i.e., the part of each element before the first "|"), and apply it to the first 20 elements of fs. The elements must be sorted in their natural order.
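One possible approach to (ix): takeOrdered(20) returns the 20 smallest elements of fs in their natural (string) order as a local list, over which we loop with an ordinary Python for loop (the function name below is illustrative):

def print_userid(x):
    print x.split("|")[0]

for line in fs.takeOrdered(20):
    print_userid(line)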
(x) Another way to pass a function, if it is short, is to use lambda expressions. See
https://docs.python.org/2/tutorial/controlflow.html#lambda-expressions
Using a lambda expression, count how many ids in fs2 are <370.
Hint: Check the filter() transformation, which will take your lambda expression as its argument.
(xi) Using a lambda function, create an RDD called student, which contains the elements of fs2 that have the word "student" in them.
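Possible sketches for (x) and (xi), assuming the user id is the first "|"-separated field of each element of fs2:

# (x) how many ids in fs2 are < 370
fs2.filter(lambda x: int(x.split("|")[0]) < 370).count()

# (xi) elements of fs2 that contain the word "student"
student = fs2.filter(lambda x: "student" in x)
student.collect()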
6. RDD Transformations
Read the section “Transformations” from
https://spark.apache.org/docs/1.6.0/programming-guide.html#basics
For examples and the API see
https://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.RDD
(i) Read the section Parallelized collections from
https://spark.apache.org/docs/1.6.0/programming-guide.html#basics
First, create a parallelized collection num with the integers 1 to 5. Then, create a new RDD power_num containing each number in num raised to the power of 3.
HINT: Use the map transformation and a lambda expression.
(ii) Create a new RDD power_num2 containing each number in num that is >= 4, raised to the power of 3.
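A sketch for (i) and (ii); note that range(1, 6) produces the integers 1 to 5:

num = sc.parallelize(range(1, 6))

# (i) each number raised to the power of 3
power_num = num.map(lambda x: x ** 3)
power_num.collect()        # [1, 8, 27, 64, 125]

# (ii) only the numbers >= 4, raised to the power of 3
power_num2 = num.filter(lambda x: x >= 4).map(lambda x: x ** 3)
power_num2.collect()       # [64, 125]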
(iii) Read about flatMap from the section “Transformations” above.
The flatMap transformation is useful when we want to produce multiple output elements for each input element. Like with map, the function we provide to flatMap is called individually for each element in our input RDD. However, instead of returning a single element, we return an iterator with our return values. Rather than producing an RDD of iterators, we get back an RDD which consists of the elements from all of the iterators.
Create a parallelized collection by typing:
words = sc.parallelize(["This is", " a sentence"])
Use flatMap to create an RDD words2 that contains each word in words.
The output of words2.collect() must be ['This', 'is', 'a', 'sentence']
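A possible sketch, assuming words was created as above; split() with no arguments splits on whitespace and drops the leading space of " a sentence":

words2 = words.flatMap(lambda line: line.split())
words2.collect()           # ['This', 'is', 'a', 'sentence']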
(iv) Create a parallelized collection num2 with the integers 3 to 7.
(v) Create a new RDD, num3, containing the distinct elements from both num and num2.
(vi) Create a new RDD containing only the common elements of num and num2.
(vii) Create a new RDD containing the cartesian product of num and num2.
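Possible sketches for (iv)-(vii), assuming num from (i); union() keeps duplicates, so distinct() is applied for (v):

# (iv) the integers 3 to 7
num2 = sc.parallelize(range(3, 8))

# (v) distinct elements from both num and num2
num3 = num.union(num2).distinct()
num3.collect()                    # 1, 2, 3, 4, 5, 6, 7 in some order

# (vi) only the common elements of num and num2
num.intersection(num2).collect()  # 3, 4, 5 in some order

# (vii) the cartesian product of num and num2
num.cartesian(num2).collect()     # pairs such as (1, 3), (1, 4), ...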
(viii) By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the cache method, in which case Spark will keep the elements in memory for much faster access the next time you query it. Read the section “RDD persistence” from https://spark.apache.org/docs/1.2.0/programming-guide.html
Create an RDD as follows and apply count() to it. How much time does count() take?
large_list=sc.parallelize(range(1,10000000))
Then, use cache() to cache large_list. Apply count() two times, and check the timings.
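One way to time the counts from the shell, using Python's time module; the second count after cache() should be noticeably faster, since the first one also materialises the cache:

import time

# assumes large_list was created as above
start = time.time()
large_list.count()
print(time.time() - start)   # uncached

large_list.cache()

start = time.time()
large_list.count()           # fills the cache while counting
print(time.time() - start)

start = time.time()
large_list.count()           # served from memory
print(time.time() - start)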
7. Writing standalone programs
The pyspark shell is useful for testing simple programs. To run larger programs or existing code, we should use the spark-submit script in Spark’s bin directory. Read
https://spark.apache.org/docs/1.6.0/submitting-applications.html and especially the section “Launching Applications with spark-submit”.
(i) Create a file negate_spark.py and type the following in it:
from pyspark import SparkContext
sc = SparkContext('local', 'pyspark')
nums = sc.parallelize(range(1, 50))
my_list = nums.map(lambda x: -x).collect()
print(sorted(my_list, reverse=False))
Observe that we need to create the SparkContext in the program, while the pyspark shell does this automatically. We can then apply actions and transformations to RDDs, as in the pyspark shell, and also use Python functions on the lists created by Spark methods such as collect(). For example, negate_spark.py creates a parallelized collection with the integers in [1,50), uses map with a lambda expression that negates each integer, and then collect() returns a list, my_list, containing the negated integers. Last, we apply Python's sorted() function to output the list in increasing order.
To execute negate_spark.py type the following in a terminal:
spark-submit negate_spark.py >negate_spark.out
The >negate_spark.out redirects the output to a file, which you can then view with
cat negate_spark.out
The file should include [-49, -48, …, -2, -1].
(ii) Write a program that computes the total number of attribute values in the file u.user we used in the part Creating RDDs with textFile . By attribute value, we mean the value in user id, age, gender, occupation, or zip code. For example, the following part of u.user has 5 attribute values (namely 1, 24, M, technician, 85711):
1|24|M|technician|85711
Note that your program needs to load the file u.user again.
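A possible standalone sketch for (ii) (the file name count_attributes_spark.py is illustrative), run with spark-submit as in (i):

from pyspark import SparkContext

sc = SparkContext('local', 'pyspark')

# load u.user again and split each line into its "|"-separated attribute values
fs = sc.textFile("file:///home/cloudera/Desktop/ml/u.user")
total = fs.flatMap(lambda line: line.split("|")).count()
print(total)   # 5 attribute values per user, so 5 times the number of users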
(iii) Complete the following program so that it computes the total number of users in u.user with age in [20,30) (i.e., the return value of age_group below is '20-30').
from pyspark import SparkContext

sc = SparkContext('local', 'pyspark')

def age_group(age):
    if age < 10: return '0-10'
    elif age < 20: return '10-20'
    elif age < 30: return '20-30'
    elif age < 40: return '30-40'
    elif age < 50: return '40-50'
    elif age < 60: return '50-60'
    elif age < 70: return '60-70'
    elif age < 80: return '70-80'
    else: return '80+'
def parse_with_age_group(data):
    userid, age, gender, occupation, zip = data.split("|")
    return userid, age_group(int(age)), gender, occupation, zip, int(age)
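One way to complete (iii), continuing the program above; parse_with_age_group puts the age group in the second field of each returned tuple:

fs = sc.textFile("file:///home/cloudera/Desktop/ml/u.user")
parsed = fs.map(parse_with_age_group)

# users whose age is in [20,30)
print(parsed.filter(lambda rec: rec[1] == '20-30').count())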
(iv) Complete the given program in (iii), so that it outputs the number of users in u.user whose age is in [20,30) , grouped by zip code. For example, given the following input file:
1|24|M|technician|11
2|53|F|other|3
3|23|M|writer|7
4|24|M|technician|7
8|36|M|administrator|21
9|29|M|student|2
the output will be something like:
Zipcode: 7, Number of users: 2
Zipcode: 11, Number of users: 1
Zipcode: 2, Number of users: 1
Note that the zip code is the last attribute and the users with age 53 and 36 are excluded. HINT: Use countByValue() as described in https://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.SparkContext
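A possible sketch for (iv), reusing fs and parsed from the sketch in (iii): keep only the '20-30' records, map each to its zip code (the fifth field of the parsed tuple), and count with countByValue(); the exact output formatting is up to you:

zips = parsed.filter(lambda rec: rec[1] == '20-30').map(lambda rec: rec[4])
for zipcode, n in zips.countByValue().items():
    print("Zipcode: %s, Number of users: %d" % (zipcode, n))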
(v) Read about accumulators from http://spark.apache.org/docs/latest/programming-guide.html#accumulators
Using accumulators, complete the given program in (iii), so that it outputs the number of users in each age group [60,70), [70,80), and 80+, for the users in the file u.user.
The output should be something like:
Age in [60,70): 27
Age in [70,80): 4
Age in 80+: 0
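A sketch for (v) using three accumulators, incremented inside a foreach over the parsed RDD from (iii); the variable and function names are illustrative:

acc_60 = sc.accumulator(0)
acc_70 = sc.accumulator(0)
acc_80 = sc.accumulator(0)

def count_age_group(rec):
    if rec[1] == '60-70':
        acc_60.add(1)
    elif rec[1] == '70-80':
        acc_70.add(1)
    elif rec[1] == '80+':
        acc_80.add(1)

parsed.foreach(count_age_group)
print("Age in [60,70): %d" % acc_60.value)
print("Age in [70,80): %d" % acc_70.value)
print("Age in 80+: %d" % acc_80.value)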
(vi) Complete the given program in (iii), so that it outputs the occupations that are performed by users in age group [40,50) and also by users in age group [50,60) , in alphabetical order. Each of these occupations should appear only once.
HINT: Use intersection() on the RDD containing the occupations for the age group [40,50) and the RDD containing the occupations for the age group [50,60).
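A possible sketch for (vi), reusing parsed from the sketch in (iii); occupation is the fourth field of the parsed tuple, and intersection() already removes duplicates:

occ_40 = parsed.filter(lambda rec: rec[1] == '40-50').map(lambda rec: rec[3])
occ_50 = parsed.filter(lambda rec: rec[1] == '50-60').map(lambda rec: rec[3])
for occupation in sorted(occ_40.intersection(occ_50).collect()):
    print(occupation)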