Big Data Computing
Spark Basics and RDD
A Brief History
Why is Map/Reduce bad?
Programming model too restricted
Iterative jobs involve a lot of disk I/O
Many specialized systems on top of Hadoop
What is Spark?
Efficient
General execution graphs
In-memory storage
Usable
Rich APIs in Java, Scala, Python
Interactive shell
Fast and Expressive Cluster Computing Engine Compatible with Apache Hadoop
2-5× less code
Up to 10× faster on disk, 100× in memory
Generalize the map/reduce framework
Spark’s (Short) History
Initially developed at UC Berkeley in 2009
Became an open-source project in 2010
Became the most active project in the Apache Software Foundation and among Big Data open source projects.
Many companies are switching from Hadoop to Spark, including Amazon, Alibaba, Baidu, eBay, Dianping, Tencent, etc.
Spark Popularity
Use Memory Instead of Disk
Tech Trend: Cost of Memory
In-Memory Data Sharing
Spark and Map Reduce Differences
Spark Programming
Resilient Distributed Datasets (RDDs)
A real or virtual file consisting of records
Partitioned into partitions
Created through deterministic transformations on:
Data in persistent storage
Other RDDs
[Figure: two RDDs (RDD1, RDD2), each shown as a set of partitions P1, P2, P3]
RDDs do not need to be materialized
Users can control two other aspects:
Persistence
Partitioning (e.g. key of the record)
RDDs and partitions
Programmer specifies number of partitions for an RDD (Default value used if unspecified)
more partitions: more parallelism but also more overhead
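For example, the partition count can be given when an RDD is created. A small sketch (assuming sc is the SparkContext; the path and counts are placeholders):
rdd1 = sc.parallelize(range(1000), 8)        # ask for 8 partitions explicitly
rdd2 = sc.textFile("hdfs://.../logs", 8)     # at least 8 partitions for this file
print(rdd1.getNumPartitions())               # 8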
Example
A web service is experiencing errors, and an operator wants to search terabytes of logs in the Hadoop file system to find the cause.
lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("Error"))
errors.filter(_.contains("HDFS"))
      .map(_.split('\t')(3))
      .collect()
Execution is pipelined and parallel
No need to store intermediate results
Lazy execution allows optimization
Lineage Graph
Fault Tolerance Mechanism
An RDD has enough information about how it was derived from other datasets (its lineage) to compute its partitions from data in stable storage.
[Figure: example lineage graph over RDD1, RDD2, RDD3, RDD4]
Example:
If a partition of errors is lost, Spark rebuilds it by applying a filter on only the corresponding partition of lines.
Partitions can be recomputed in parallel on different nodes, without having to roll back the whole program.
Operations
Spark recomputes transformations
lines = sc.textFile("...", 4)
comments = lines.filter(isComment)
print lines.count(), comments.count()  # the file is read twice, once per count
Caching RDDs
lines = sc.textFile("...", 4)
lines.cache()
comments = lines.filter(isComment)
print lines.count(), comments.count()  # the file is read once; the second count reuses the cached data
RDD Persistence
Make an RDD persist using persist() or cache()
Different storage levels, default is MEMORY_ONLY
Allows faster reuse and fault recovery
Spark also automatically persists some intermediate data in shuffle operations
Spark automatically drops old data partitions using an LRU policy. You can also unpersist() an RDD manually.
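A minimal sketch of persist() with an explicit storage level (the README.md file and the chosen level are just illustrations):
from pyspark import StorageLevel

lines = sc.textFile("README.md")
lines.persist(StorageLevel.MEMORY_AND_DISK)   # cache() is shorthand for persist() with MEMORY_ONLY
lines.count()                                 # the first action materializes the persisted partitions
lines.unpersist()                             # drop the data manually when it is no longer needed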
Spark Shell
Spark’s shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively. Start it by running the following in the Spark directory:
./bin/pyspark
You can use --master local[k] to use k workers. You may need to use --master "local[k]" on some systems. The default is local[*], using as many workers as the number of logical CPU cores.
Use --master spark://host:port to connect to a Spark cluster.
The SparkContext sc is already initialized in the Spark shell.
Let’s make a new RDD from the text of the README file in the Spark source directory:
>>> textFile = sc.textFile("README.md")
Transformations and actions
Let’s start with a few actions:
>>> textFile.count() # Number of items in this RDD
104
>>> textFile.first() # First item in this RDD
u'# Apache Spark'
>>> textFile.take(5)
Now let’s use a transformation. We will use the filter transformation to return a new RDD with a subset of the items in the file.
>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)
We can chain together transformations and actions:
>>> textFile.filter(lambda line: "Spark" in line).count() # How many lines contain "Spark"?
20
Transformations and actions
RDD actions and transformations can be used for more complex computations. Let’s say we want to find the line with the most words:
>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)
22
The arguments to map and reduce are Python anonymous functions (lambdas), but we can also pass any top-level Python function we want. For example:
>>> def max(a, b):
… if a > b:
… return a
… else:
… return b
…
>>> textFile.map(lambda line: len(line.split())).reduce(max)
22
For multiple commands, you can write them in a .py file and execute it using execfile(). But you will need to add print statements to see the output.
Lazy transformations
For example, if you run the following commands:
>>> import math
>>> a = sc.parallelize(range(1,100000))
>>> b = a.map(lambda x: math.sqrt(x))
>>> b.count()
You can see that the map() call returns immediately, while count() actually triggers the work.
By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
RDD Basics
Consider the simple program below:
lines = sc.textFile("README.md")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.
Self-Contained Applications
Example:
This program just counts the number of lines containing ‘a’ and the number containing ‘b’ in a text file. We can run this application using the bin/spark-submit script:
$ ./bin/spark-submit SimpleApp.py
…
Lines with a: 62, Lines with b: 30
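The slides do not reproduce SimpleApp.py itself; a minimal sketch consistent with the description above could look like this (the README.md path and the exact structure are assumptions):
# SimpleApp.py -- hedged sketch, not the original course file
from pyspark import SparkContext

sc = SparkContext(appName="SimpleApp")
logData = sc.textFile("README.md").cache()

numAs = logData.filter(lambda line: 'a' in line).count()
numBs = logData.filter(lambda line: 'b' in line).count()
print("Lines with a: %d, Lines with b: %d" % (numAs, numBs))

sc.stop()   # stop the SparkContext at the end of a self-contained application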
For self-contained applications, you should use sc.stop() to stop the SparkContext at the end of your application. On the other hand, you should not stop the SparkContext in the Spark shell.
Cluster Mode
Note: SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos or YARN).
Spark Standalone Cluster Manager
To install Spark Standalone mode, you simply place a compiled version of Spark on each node on the cluster.
You can start a standalone master server by executing:
./sbin/start-master.sh
Start one or more slaves, giving each the master's URL, by executing:
./sbin/start-slave.sh spark://host:port
Finally, remember to shut down the master and workers using the following scripts when they are not needed:
sbin/stop-master.sh
sbin/stop-slave.sh
Note: Only one master/worker can run on the same machine, but a machine can be both a master and a worker.
Now you can submit your application to the cluster. Example:
./bin/spark-submit --master spark://hostname:7077 pi.py 20
You can monitor the cluster’s status on the master at port 8080.
Job status can be monitored at the driver at ports 4040, 4041, …
Where code runs
Most Python code runs in driver, except for code passed to transformations. Transformations run at executors, actions run at executors and driver.
Example: Let’s say you want to combine two RDDs: a, b.
You remember that rdd.collect() returns a list, and in Python you can combine two lists with +
A naïve implementation would be:
>>> a = RDDa.collect()
>>> b = RDDb.collect()
>>> RDDc = sc.parallelize(a+b)
Where does this code run?
In the first two lines, all the distributed data for a and b is sent to the driver. What if a and/or b is very large? The driver could run out of memory. Also, it takes a long time to send the data to the driver.
In the third line, all the data is sent from the driver back to the executors.
The correct way:
>>> RDDc = RDDa.union(RDDb)
This runs completely at executors.
The Jupyter Notebook
In-browser editing for code, with automatic syntax highlighting, indentation, and tab completion/introspection.
The ability to execute code from the browser, with the results of computations attached to the code which generated them.
In-browser editing for rich text using the Markdown markup language, which can provide commentary for the code and is not limited to plain text.
See Canvas page for installation guide
Closure
A task’s closure is those variables and methods which must be visible for the executor to perform its computations on the RDD.
Functions that run on RDDs at executors
Any global variables used by those functions
The variables within the closure sent to each executor are copies.
This closure is serialized and sent to each executor from the driver when an action is invoked.
Closures: A bad example
counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x

rdd.foreach(increment_counter)
print("Counter value: ", counter)
Accumulators
Accumulators are variables that are only “added” to through an associative and commutative operation.
Created from an initial value v by calling SparkContext.accumulator(v).
Tasks running on a cluster can then add to it using the add method or the += operator
Only the driver program can read the accumulator’s value, using its value method.
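For example, a hedged sketch of the earlier counter example rewritten with an accumulator (the data list is a placeholder):
data = [1, 2, 3, 4, 5]
counter = sc.accumulator(0)                  # created in the driver from the initial value 0

def increment_counter(x):
    counter.add(x)                           # tasks may only add to it

sc.parallelize(data).foreach(increment_counter)
print("Counter value:", counter.value)       # only the driver reads the value; prints 15 here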
Accumulators: Note
Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map().
Updates made in transformations may be applied more than once if tasks or job stages are re-executed. See example.
Suggestion: Avoid using accumulators whenever possible. Use reduce() instead.
Computing Pi using Monte Carlo simulation
The pi.py example provided in the default Spark installation is actually wrong! The accuracy does not improve by using more partitions. Why? Try executing the following code:
>>> from random import random
>>> a = sc.parallelize(xrange(0, 20), 2)
>>> a.map(lambda x: random()).glom().collect()
Check online manual for glom().
We see that the random numbers generated in different partitions are the same. This is because they use the same default seed, initialized when the python random package is first imported. The solution is to set a different seed for each partition using mapPartitionsWithIndex().
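A hedged sketch of that fix (illustrative, not the pi.py code itself): give each partition its own generator, seeded by the partition index.
import random

def seeded_randoms(index, iterator):
    rng = random.Random(index)               # fixed, per-partition seed
    return [rng.random() for _ in iterator]  # one random number per input element

a = sc.parallelize(range(0, 20), 2)
print(a.mapPartitionsWithIndex(seeded_randoms).glom().collect())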
More notes on random number generation
Use deterministic pseudo random number generators
Use fixed seed (not system time)
If a partition is lost and recomputed, the recovered partition is the same as before
If not, the program may not behave correctly
If different random numbers are desired every time you run your program, generate a single seed (using system time) in driver program and use it in all tasks.
Better yet, use Spark SQL (later)
Example: Linear-time selection
Problem:
Input: an array A of n numbers (unordered), and k
Output: the k-th smallest number (counting from 0)
Algorithm
1. x = A[0]
2. Partition A into A[0..mid-1] < A[mid] = x < A[mid+1..n-1]
3. If mid = k then return x
4. If k < mid then A = A[0..mid-1]
5. If k > mid then A = A[mid+1..n-1], k = k - mid - 1
6. Go to step 1
Why didn’t it work?
The closure (including the current value of x) is sent to each executor from the driver only when an action is invoked; because transformations are lazy, x may already have changed by the time the filter actually runs.
How to fix?
Use different variables to hold x?
Add an action immediately after each transformation?
And cache it.
But this introduces unnecessary work.
Rule: When the closure includes variables that will change later, call an action and cache the RDD before the variables change.
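A hedged sketch of how the selection loop might look in PySpark when it follows this rule (assumes data is a list of distinct numbers, 0 <= k < len(data), and sc is the SparkContext; an illustration, not the course's reference code):
A = sc.parallelize(data)
while True:
    x = A.first()                            # pivot; this variable changes every iteration
    smaller = A.filter(lambda v: v < x).cache()
    larger = A.filter(lambda v: v > x).cache()
    mid = smaller.count()                    # actions materialize the cached RDDs while x is still current
    larger.count()
    if k == mid:
        print(x)
        break
    elif k < mid:
        A = smaller
    else:
        A = larger
        k = k - mid - 1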
Lab: Key-Value Pairs
While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs.
In Python, these operations work on RDDs containing built-in Python tuples such as (1, 2). Simply create such tuples and then call your desired operation.
For example, the following code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file:
lines = sc.textFile("README.md")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
We could also use counts.sortByKey(), for example, to sort the pairs alphabetically, and finally counts.collect() to bring them back to the driver program as a list of objects.
Lab: Key-value pairs
Other useful operations on key-value pairs:
flatMap(func), sortByKey()
Examples
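For instance, a toy sketch combining flatMap and sortByKey (counting words rather than lines; the input strings are placeholders):
lines = sc.parallelize(["to be or", "not to be"])
words = lines.flatMap(lambda line: line.split())             # one output element per word
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
print(counts.sortByKey().collect())                          # [('be', 2), ('not', 1), ('or', 1), ('to', 2)]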
Lab: PMI Computation
PMI (pointwise mutual information) is a measure of association used in information theory and statistics.
Given a list of pairs (x, y), compute for each value combination
pmi(x;y) = log2( p(x,y) / ( p(x) p(y) ) )
where p(x) is the probability of x, p(y) is the probability of y, and p(x,y) is the joint probability of (x, y).
Example:
p(x=0) = 0.8, p(x=1)=0.2, p(y=0)=0.25, p(y=1)=0.75
pmi(x=0;y=0) = −1
pmi(x=0;y=1) = 0.222392
pmi(x=1;y=0) = 1.584963
pmi(x=1;y=1) = -1.584963
x y p(x, y)
0 0 0.1
0 1 0.7
1 0 0.15
1 1 0.05
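A hedged sketch of one way to compute these PMI values with key-value operations (names and structure are illustrative, not the lab's reference solution):
from math import log

data = [(0, 0)] * 2 + [(0, 1)] * 14 + [(1, 0)] * 3 + [(1, 1)] * 1   # matches the table above
pairs = sc.parallelize(data)
n = float(pairs.count())

p_x = pairs.map(lambda xy: (xy[0], 1)).reduceByKey(lambda a, b: a + b).mapValues(lambda c: c / n)
p_y = pairs.map(lambda xy: (xy[1], 1)).reduceByKey(lambda a, b: a + b).mapValues(lambda c: c / n)
p_xy = pairs.map(lambda xy: (xy, 1)).reduceByKey(lambda a, b: a + b).mapValues(lambda c: c / n)

px = dict(p_x.collect())                     # small tables, so collecting to the driver is fine
py = dict(p_y.collect())
pmi = p_xy.map(lambda kv: (kv[0], log(kv[1] / (px[kv[0][0]] * py[kv[0][1]]), 2)))
print(sorted(pmi.collect()))                 # reproduces the four pmi values above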
Lab: k-means clustering
The algorithm:
1. Choose k points from the input points randomly. These points represent initial group centroids.
2. Assign each point to the closest centroid.
3. When all points have been assigned, recalculate the positions of the k centroids.
4. Repeat Steps 2 and 3 until the centroids no longer move. This produces a separation of the objects into groups from which the metric to be minimized can be calculated.
See example at http://shabal.in/visuals/kmeans/6.html
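A hedged sketch of these steps on an RDD of 2-D points (Euclidean distance, a fixed number of rounds instead of a convergence test; all names and values are illustrative):
import random

def closest(p, centroids):
    # index of the centroid nearest to point p
    return min(range(len(centroids)),
               key=lambda i: (p[0] - centroids[i][0]) ** 2 + (p[1] - centroids[i][1]) ** 2)

points = sc.parallelize([(random.random(), random.random()) for _ in range(1000)]).cache()
k = 3
centroids = points.takeSample(False, k)                      # step 1: k random initial centroids

for _ in range(20):                                          # step 4: repeat (here a fixed number of rounds)
    assigned = (points.map(lambda p: (closest(p, centroids), (p[0], p[1], 1)))   # step 2: assignment
                      .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2]))
                      .collect())                            # action runs before centroids change
    centroids = [(sx / cnt, sy / cnt) for _, (sx, sy, cnt) in assigned]          # step 3: new centroids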
Lab: PageRank
Algorithm:
Initialize all PR’s to 1
Iteratively compute
Functions to note: groupByKey(), join(), mapValues()
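A hedged sketch of this iteration, in the style of the standard Spark PageRank example (the toy graph, damping factor 0.85, and iteration count are illustrative):
links = sc.parallelize([("a", ["b", "c"]), ("b", ["c"]), ("c", ["a"])]).cache()   # page -> outgoing links
ranks = links.mapValues(lambda _: 1.0)                                            # initialize all PRs to 1

for _ in range(10):
    # each page contributes PR/outdegree to every page it links to
    contribs = links.join(ranks).flatMap(
        lambda kv: [(dest, kv[1][1] / len(kv[1][0])) for dest in kv[1][0]])
    ranks = contribs.reduceByKey(lambda a, b: a + b).mapValues(lambda s: 0.15 + 0.85 * s)

print(ranks.collect())
On a real dataset, groupByKey() is typically used first to build the links RDD from a raw edge list.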
Broadcast Variables
Broadcast variables allow the programmer to keep a read-only variable cached on each machine (not each task)
More efficient than sending closures to tasks
Example
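For instance, a small sketch (toy values, not the slide's original example):
lookup = sc.broadcast({0: "error", 1: "warning", 2: "info"})   # shipped once per machine, read-only

codes = sc.parallelize([0, 2, 1, 0, 2])
labels = codes.map(lambda c: lookup.value[c])                  # tasks read the cached copy via .value
print(labels.collect())                                        # ['error', 'info', 'warning', 'error', 'info']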