COMP5349 – Cloud Computing
Week 5: Spark Framework
Dr. Ying Zhou, School of Computer Science
Last Week
• Last week we covered the MapReduce framework
• MapReduce is the first kind of big data processing framework
  - Many other frameworks are either built on top of it or follow its design principles
• MapReduce borrows many concepts from functional programming
  - Two higher-order functions: map and reduce
• It divides a data analytic workload into a sequence of jobs, each consisting of a map phase and a reduce phase
• Both the map and the reduce phase can be parallelized
• Key-value pairs are the prominent data structure
Outline
• Data Sharing and Job Chaining in MapReduce
• Spark Basic Concepts
• A Complete Spark Application Example
Data Sharing in MapReduce
• Mappers and reducers are designed to run independently, on different partitions of the input data and on different nodes
• The mapper and reducer APIs only take input key-value pairs as arguments
• How do we pass additional program data?
• E.g. for a simple word count program, we may want to:
  - Remove all stop words; the list of stop words is stored in a text file
  - Remove words that occur fewer times than a threshold; the threshold value can be given as a command line argument or in a property file
• How do we share the stop word file among all mappers? How do we tell all reducers the threshold value?
Sharing parameters
• Sharing parameters of simple types can be achieved using the Configuration object
  - E.g. a threshold value (int), a timestamp value, some string value
• In a Java application
  - The driver program sets the property on the configuration:
    conf.set("mapper.placeFilter.country", "Australia")
  - Both the mapper and the reducer can read it back:
    String countryName = context.getConfiguration().get("mapper.placeFilter.country");
• In a Python (streaming) application
  - A simple parameter can be specified as an argument of the mapper or reducer script:
    -mapper "place_filter_mapper.py Australia"
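A minimal sketch of such a streaming mapper, reading the country name from its command line argument. The script name matches the slide above; the filtering logic (keep any line that mentions the country) is an illustrative assumption.

#!/usr/bin/env python3
# place_filter_mapper.py -- streaming mapper; the country to keep is passed
# as a command line argument, e.g. -mapper "place_filter_mapper.py Australia"
import sys

country = sys.argv[1]

for line in sys.stdin:             # streaming mappers read records from stdin
    if country.lower() in line.lower():
        sys.stdout.write(line)     # matching records are written to stdout unchanged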
Distributing Auxiliary Job data
• Auxiliary job data
  - In general, a small file containing common background knowledge for the map and/or reduce functions
    - E.g. the stop word list for word counting, or the dictionary for spell checking
  - All mappers/reducers need to read it
  - The file is small enough to fit in the memory of the mappers/reducers
• Hadoop provides a mechanism for this purpose called the distributed cache
  - Files put in the distributed cache are accessible by both mappers and reducers
• The distributed cache can be used to implement an efficient join if one of the join tables is small enough to fit in memory
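A minimal sketch of a word-count streaming mapper that reads a stop word list from the distributed cache. It assumes the job was submitted with the streaming option -files stopwords.txt, which makes the file available under its own name in each task's working directory; the file name is chosen for illustration.

#!/usr/bin/env python3
# wc_mapper.py -- word count mapper that skips stop words
import sys

# the distributed cache places stopwords.txt in the task's working directory
with open("stopwords.txt") as f:
    stop_words = set(w.strip().lower() for w in f if w.strip())

for line in sys.stdin:
    for word in line.strip().split():
        if word.lower() not in stop_words:
            print(f"{word}\t1")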
Chaining Jobs
• Most of the time an analytic workload cannot be implemented in a single MapReduce job; we need to chain multiple jobs
• E.g. if we want to sort words in descending order of their occurrence in a corpus:
  - We need two jobs: the first job does the word counting, the second one does the sorting
  - The second job uses the output of the first job as its input
• In the Java API
  - The chaining happens in the driver program: define the jobs one by one and submit them in the correct order
• In the Python API
  - We use the streaming API multiple times to submit the jobs one by one
A simple select-join example
• Two input files
  - photo.csv: photo_id \t owner \t tags \t date_taken \t place_id \t accuracy
  - place.csv: place_id \t woeid \t lat \t longi \t place_name \t place_url
• We want to find all photos taken in "Australia"
• Assume place.csv is large, but its subset containing "Australia" is quite small
• Assume photo.csv is very large
• Two jobs:
  - The first job reads the place.csv data, keeps only the rows containing the word "Australia" in place_url, and saves the output to a file
  - The second job uses the distributed cache to distribute the result of the first job and joins it with photo.csv
• See the course git repositories for the example Python code
Outline
• Data Sharing and Job Chaining in MapReduce
• Spark Basic Concepts
  - Motivation
  - RDD and its operations
  - Functional Programming Revisit
  - General Structure of Spark Program
• A Complete Spark Application Example
Two major issues with MapReduce
• Map/Reduce is storage based
  - Pro: parallelism, fault tolerance, and the runtime can decide where to run tasks
  - Con: a simple processing model that assumes data flows from stable storage to stable storage, plus materialization of intermediate results
  [Diagram: a chain of jobs M -> R -> M -> R -> ..., where every job reads its input from disk, transfers intermediate data across the network, and writes its output back to disk]
• MapReduce provides only a simple set of APIs: map and reduce
  - Pro: very flexible; you can implement many kinds of processing in the map and reduce functions
  - Con: common processing tasks are not provided and need to be implemented again and again by developers
  [Diagram: e.g. a join has to be hand-built from two map tasks M1 and M2 feeding a reduce task R]
Data Analytics Workload
• Exploratory
  - Summarizing the main characteristics of the data set
  - Comparing across various columns
  - Such exploration may reveal high-level features of the data set and help to make decisions
    - E.g. we may need to set up a data center in Australia; it seems we have a lot of customers there
• Predictive
  - Build statistical or machine learning models to make predictions
    - E.g. which movie a customer may like
  - Most machine learning algorithms are iterative:
    - The data set is scanned and processed multiple times until some stopping criterion is reached
    - The results of the previous iteration are used in the next iteration
  - Storage-based MapReduce is not suitable for such algorithms
Beyond MapReduce
• One type of attempt, which deals mainly with exploratory data analysis, builds a SQL or SQL-like layer on top of MapReduce
  - Pig, Hive and others
  - SQL-like data analytic expressions are automatically converted into MapReduce programs
    - Common processing such as filtering, projection and joining is already implemented
  - Various optimization techniques have been proposed to achieve performance similar to or better than hand-coded versions
  - The backend engine is still storage-heavy MapReduce
• Another type of attempt, which covers both exploratory and predictive analysis, is the data-flow based analysis system
  - Memory based + a rich set of APIs
  - Spark and Flink
Apache Spark
• An in-memory framework for interactive and iterative computations
• Goals:
  - Provide distributed memory abstractions for clusters to support applications that need to repeatedly reuse working sets of data
  - Retain the attractive properties of MapReduce:
    - Fault tolerance (for crashes & stragglers)
    - Data locality
    - Scalability
    - Functional programming flavour
• Approach:
  - Augment the data flow model with the Resilient Distributed Dataset (RDD)
    - RDD: a fault-tolerant, in-memory storage abstraction
  - Newer abstractions such as the DataFrame were proposed later and have become the main data structure in recent versions
Spark RDD Programming Model
• Resilient Distributed Datasets (RDDs)
  - Immutable collections partitioned across the cluster that can be rebuilt if a partition is lost
  - Created by transforming data in stable storage using data flow operators (map, filter, group-by, ...)
  - Can be cached across parallel operations
• Parallel operations on RDDs
  - Transformations and actions
• Restricted shared variables
  - Accumulators, broadcast variables
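A minimal sketch of the cached-RDD and broadcast-variable ideas; the stop word set is made up for illustration, and the file name follows the later slides.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

stop_words = sc.broadcast({"the", "a", "an"})          # read-only value shipped once to every worker

words = sc.textFile("data.txt").flatMap(lambda line: line.split(" "))
kept = words.filter(lambda w: w.lower() not in stop_words.value).cache()  # keep partitions in memory after first use

print(kept.count())             # first action: computes the RDD and caches its partitions
print(kept.distinct().count())  # second action: reuses the cached partitions instead of re-reading the file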
Resilient Distributed Datasets
• RDDs can be created by
  - Parallelizing an existing collection
  - Referencing a dataset in an external storage system

# parallelizing an existing collection
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

# referencing a data set in HDFS
lines = sc.textFile("data.txt")

• RDDs may contain key-value pairs as records
  - If the data in an RDD is of tuple type, the first element is automatically treated as the key, the rest as the value
  - The value can be of a simple type, or itself a tuple

kvData = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]
kvDistData = sc.parallelize(kvData)
RDD operations
• Transformation
  - Creates a new dataset from an existing one
  - E.g. map(func), flatMap(func), reduceByKey(func)
• Action
  - Returns a value to the driver program after running a computation on the dataset
  - E.g. count(), first(), collect(), saveAsTextFile(path)
• Most RDD operations take one or more functions as parameters
  - Most of them can be viewed as higher-order functions
• Spark has a strong functional programming flavour!
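A minimal sketch of the difference on toy data: transformations only describe a new RDD, and nothing runs until an action is called.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

nums = sc.parallelize([1, 2, 3, 4, 5])
squares = nums.map(lambda x: x * x)         # transformation: builds a new RDD, no computation yet

print(squares.count())                      # action: triggers execution, returns 5
print(squares.first())                      # action: returns 1
print(squares.collect())                    # action: returns [1, 4, 9, 16, 25] to the driver
squares.saveAsTextFile("squares_output")    # action: writes the RDD to storage (placeholder path)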
Spark Program
• A Spark program is just a regular main program/script that creates a SparkContext object, which is used to access a cluster
• The Spark context provides methods for
  - Data input
  - Data output
  - ...
• The data processing steps are defined using RDD transformations and actions
WordCount in Spark (Python)
from pyspark import SparkConf, SparkContext

myconf = (SparkConf().setMaster("…")
                     .setAppName("WordcountExample")
                     .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=myconf)

# create an RDD from the input file
text_file = sc.textFile("hdfs://…")

# flatMap, map and reduceByKey are each one transformation; saveAsTextFile is an action
counts = text_file.flatMap(lambda line: line.strip().split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://…")

See the demo of word count in the Ed workspace.
[Cf. http://spark.apache.org/examples.html]
Revisit: functional programming
• Higher-order functions can take other functions as parameters
• MapReduce provides two such higher-order functions: map and reduce
  - The MapReduce API defines the signature of the functions supplied to map and reduce
    - Functions supplied to map take a key-value pair as input and return a list of key-value pairs
    - Functions supplied to reduce take a key and a list of values as input and return a key-value pair
  - The signatures are general enough to handle many types of processing
  - The MapReduce framework is responsible for calling those user-supplied "functions"
    - They can be implemented as methods, scripts, etc.
    - The actual programming does not involve much functional flavour
Revisit: functional programming
• In Spark, most RDD transformations are higher-order functions: they take one or more functions as arguments
• The Spark API defines and restricts the signature of the functions supplied to each transformation
• The actual invocation is user specified
  - Calling a transformation on an RDD with a given function:

    text_file.flatMap(lambda line: line.strip().split(" "))

  - Here text_file is the RDD, flatMap is the transformation, and the lambda is the function argument
Anonymous Functions
• Function arguments can be expressed compactly using anonymous functions
  - Python: map(lambda a: 2*a)
• A lambda expression is a way to express an anonymous function in a programming language
• Nearly all functional languages and most scripting languages support anonymous functions
• Anonymous functions cannot easily be reused
Lambda Expression
• Lambda expressions are quite hard to understand when the function logic gets complicated
  - The function body is embedded in the main program logic
  - The argument and return types are not declared explicitly
• Functions are a proper language concept in Python; lambda expressions are only used for very simple functions
• It is much easier to define a function and then refer to it by name, as in the sketch below
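A minimal sketch of the word-splitting step from the word count example, written once with a named function and once with the equivalent lambda; the input path is a placeholder.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
text_file = sc.textFile("hdfs://…")

def split_into_words(line):
    """Split one input line into a list of words."""
    return line.strip().split(" ")

words = text_file.flatMap(split_into_words)                              # pass the function by name
words_inline = text_file.flatMap(lambda line: line.strip().split(" "))   # equivalent lambda form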
RDD Operation functional argument
counts = text_file.flatMap(lambda line: line.strip().split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
Outline
• Data Sharing and Job Chaining in MapReduce
• Spark Basic Concepts
  - RDD and its operations
  - Functional Programming Revisit
  - General Structure of Spark Program
  - Lambda Expression in Java
• A Complete Spark Application Example
A sample program
• Two data sets stored as text files
• Movies (mid, title, genres)
  - Sample data:
    1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
• Ratings (uid, mid, rating, timestamp)
  - Sample data:
    1,253,3.0,900660748
• We want to find out the average rating for each genre
  - We join the two data sets on movie id (mid) and keep only the genre and rating data; we then group the rating data by genre and compute the average for each genre
Spark RDD operation design

The intended flow of RDD operations, with the sample records traced through each step:

1. Ratings file (uid, mid, rating, timestamp), e.g. "1,1,3.0,900660748", "1,2,4.0,932640588"
   -> sc.textFile(...) -> RDD ratings
2. Movies file (mid, title, genres), e.g. "1,Toy Story (1995),Animation|Children", "2,Babe (1995),Children"
   -> sc.textFile(...) -> RDD movies
3. ratings -> map -> PairRDD of (mid, rating): (1,3.0), (2,4.0)
4. movies -> flatMap -> PairRDD of (mid, genre): (1,Animation), (1,Children), (2,Children)
5. join on mid -> PairRDD of (mid, (genre, rating)): (1,(Animation,3.0)), (1,(Children,3.0)), (2,(Children,4.0))
6. map (or values) -> PairRDD of (genre, rating): (Animation,3.0), (Children,3.0), (Children,4.0)
7. aggregateByKey -> PairRDD of (genre, (rateSum, rateCount)): (Animation,(3.0,1)), (Children,(7.0,2))
8. map -> (genre, avg-rating): (Animation,3.0), (Children,3.5)
9. saveAsTextFile -> result file
Spark Program Skeleton

[Diagram: the same data flow as above (sc.textFile -> map / flatMap -> join -> values -> aggregateByKey -> map -> saveAsTextFile), this time annotated as the skeleton of the Spark program; a sketch of the corresponding code follows below.]
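A minimal sketch filling in the skeleton above. File paths and the application name are placeholders, the CSV parsing assumes the simple layout of the sample records (no commas inside titles), and mapValues is used for the final averaging step, which is equivalent to the map shown in the diagram.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("GenreAvgRating")
sc = SparkContext(conf=conf)

# movies file: mid,title,genres with the genres separated by '|'
movies = sc.textFile("movies.csv")
mid_genre = movies.flatMap(
    lambda line: [(line.split(",")[0], g) for g in line.split(",")[2].split("|")])

# ratings file: uid,mid,rating,timestamp
ratings = sc.textFile("ratings.csv")
mid_rating = ratings.map(
    lambda line: (line.split(",")[1], float(line.split(",")[2])))

# join on mid -> (mid, (genre, rating)); values() keeps only the (genre, rating) part
genre_rating = mid_genre.join(mid_rating).values()

# aggregateByKey builds (rateSum, rateCount) per genre, then the average is computed
genre_avg = genre_rating.aggregateByKey(
    (0.0, 0),
    lambda acc, r: (acc[0] + r, acc[1] + 1),    # seqFunc: fold one rating into the partial result
    lambda a, b: (a[0] + b[0], a[1] + b[1])     # combFunc: merge partial results from different partitions
).mapValues(lambda sum_count: sum_count[0] / sum_count[1])

genre_avg.saveAsTextFile("genre_avg_output")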
Design Functions
https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
Design Functions
• The aggregateByKey transformation
  - There are a few transformations that play a role similar to the reducer in the MapReduce framework (compared in the sketch after this list):
  - groupByKey groups the values for each key. This is like the step of preparing the reducer input in the MapReduce framework.
  - reduceByKey merges the values for each key using a given reduce function. It also performs the merging locally on each "mapper" before sending results to a "reducer", similar to a combiner in MapReduce. However, the type of the merged value must be the same as the type of the input value.
  - foldByKey is similar to reduceByKey, except that you also supply a neutral zero value.
  - aggregateByKey is more general than reduceByKey: it allows the merged value to have a different type from the input value. It takes a neutral zero value and two functions as parameters.
  - All of the above transformations can take an extra parameter indicating the number of partitions, or a partitioner object. This is like specifying the number of reducers in MapReduce, or supplying a customized partitioner.
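A minimal sketch comparing these transformations on a toy pair RDD; the expected results are shown as comments.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])

print(pairs.groupByKey().mapValues(list).collect())
# [('a', [1, 2]), ('b', [3])]    -- values grouped per key, like the reducer input in MapReduce

print(pairs.reduceByKey(lambda x, y: x + y).collect())
# [('a', 3), ('b', 3)]           -- merged value has the same type as the input values

print(pairs.foldByKey(0, lambda x, y: x + y).collect())
# [('a', 3), ('b', 3)]           -- like reduceByKey, but with an explicit zero value

print(pairs.aggregateByKey(
    (0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),          # seqFunc: add one value into the (sum, count) pair
    lambda u1, u2: (u1[0] + u2[0], u1[1] + u2[1])     # combFunc: merge two (sum, count) pairs
).collect())
# [('a', (3, 2)), ('b', (3, 1))] -- merged value (sum, count) has a different type from the input values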
Design Functions
• The aggregateByKey transformation
  [Diagram: within each partition, seqFunc starts from the zeroValue (initValue, of the result type U) and folds the input values (of type V) into a partial result one at a time; combFunc then merges the per-partition partial results (all of type U) into the final value for each key.]