COMP9313: Big Data Management
Course web site: http://www.cse.unsw.edu.au/~cs9313/
Chapter 4.2: I
Download and Configure Spark
❖ Current version: 3.1.2. https://spark.apache.org/downloads.html
➢ You also need to install Java first
❖ After downloading the package, unpack it and then configure the path variable in file ~/.bashrc
export SPARK_HOME=/home/comp9313/workdir/spark
export PATH=$SPARK_HOME/bin:$PATH
❖ Spark comes with four widely used interpreters that act like interactive “shells” and enable ad hoc data analysis: pyspark, spark-shell, spark-sql, and sparkR
❖ You can start the spark-shell by using the command “spark-shell”
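❖ Assuming the PATH set up above, each shell can be launched from a terminal, for example (a quick sketch; the startup output varies by Spark version):
$ spark-shell   # Scala shell; pre-creates spark (SparkSession) and sc (SparkContext)
$ pyspark       # Python shell
$ spark-sql     # SQL shell
$ sparkR        # R shell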
Understanding Concepts
❖ Application
➢ A user program built on Spark using its APIs. It consists of a driver program and executors on the cluster
❖ SparkContext/SparkSession
➢ An object that provides a point of entry to interact with the underlying Spark functionality and allows programming Spark with its APIs
❖ Job
➢ A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g., save(), collect())
❖ Stage
➢ Each job gets divided into smaller sets of tasks called stages that depend on each other
❖ Task
➢ A single unit of work or execution that will be sent to a Spark executor
SparkSession
❖ The core of every Spark application is the Spark driver program, which creates a SparkSession (SparkContext in Spark 1.x) object.
➢ When you’re working with a Spark shell, the driver is part of the shell and the SparkSession/SparkContext object (accessible via the variables spark and sc, respectively) is created for you
➢ Once you have a SparkSession/SparkContext, you can program Spark using the APIs to perform Spark operations.
❖ During interactive sessions with Spark shells, the driver converts your Spark application into one or more Spark jobs
❖ It then transforms each job into a Spark’s execution plan as a DAG, where each node within a DAG could be a single or multiple Spark stages.
❖ Stages are created based on what operations can be performed serially or in parallel.
❖ Each stage is comprised of Spark tasks (a unit of execution), which are then federated across each Spark executor; each task maps to a single core and works on a single partition of data
The Spark UI
❖ Spark includes a graphical user interface that you can use to inspect or monitor Spark applications in their various stages of decomposition—that is jobs, stages, and tasks.
❖ The driver launches a web UI, running by default on port 4040, where you can view metrics and details such as:
➢ A list of scheduler stages and tasks
➢ A summary of RDD sizes and memory usage
➢ Information about the environment
➢ Information about the running executors
➢ All the SQL queries
❖ In local mode, you can access this interface at http://localhost:4040 in a web browser.
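❖ As a quick check from the spark-shell, you can also print the UI address of the current application (a small sketch; sc.uiWebUrl is available from Spark 2.0 onwards):
// print the URL of this application's web UI, e.g. http://localhost:4040
sc.uiWebUrl.foreach(println)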
Part 1: Programming with RDD
RDD Operations
❖ Transformation: returns a new RDD.
➢ Nothing gets evaluated when you call a transformation function; it just takes an RDD and returns a new RDD.
➢ Transformation functions include map, filter, flatMap, groupByKey, reduceByKey, aggregateByKey, join, etc.
❖ Action: evaluates and returns a new value.
➢ When an action function is called on an RDD object, all the data processing queries are computed at that time and the result value is returned.
➢ Action operations include reduce, collect, count, first, take, countByKey, foreach, saveAsTextFile, etc.
❖ Example: a web service is experiencing errors, and an operator wants to search terabytes of logs in the Hadoop file system to find the cause.
// base RDD
val lines = sc.textFile("hdfs://…")
// transformed RDD
val errors = lines.filter(_.startsWith("Error"))
errors.persist()
errors.count()
errors.filter(_.contains("HDFS"))
  .map(_.split('\t')(3))
  .collect()
➢ lines: an RDD backed by an HDFS file (the base RDD is not loaded into memory)
➢ errors.persist(): asks for errors to be kept in memory (errors will be cached in RAM once the first action computes it)
Lineage Graph
❖ RDDs keep track of their lineage
➢ An RDD has enough information about how it was derived from other datasets to compute its partitions from data in stable storage.
[Lineage graph: lines (RDD1, from the HDFS file) --filter(_.startsWith("Error"))--> errors (RDD2) --filter(_.contains("HDFS"))--> HDFS errors (RDD3) --map(_.split('\t')(3))--> time fields (RDD4)]
❖ Example:
➢ If a partition of errors is lost, Spark rebuilds it by applying a filter
on only the corresponding partition of lines.
➢ Partitions can be recomputed in parallel on different nodes,
without having to roll back the whole program.
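❖ A quick way to inspect an RDD's lineage from the shell is toDebugString (a small sketch using the error-log example above; the exact output format depends on the Spark version):
val lines = sc.textFile("hdfs://…")
val errors = lines.filter(_.startsWith("Error"))
// prints the chain of parent RDDs (and stage boundaries) this RDD was derived from
println(errors.toDebugString)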
Deconstructed
// base RDD
val lines = sc.textFile("hdfs://…")
// transformed RDD
val errors = lines.filter(_.startsWith("Error"))
errors.persist()
❖ Put transformations and an action together:
errors.filter(_.contains("HDFS")).map(_.split('\t')(3)).collect()
errors.count()
➢ count() causes Spark to: 1) read the data; 2) sum within partitions; 3) combine the sums in the driver
SparkContext
❖ SparkContext is the entry point to Spark for a Spark application.
❖ Once a SparkContext instance is created you can use it to
➢ Create RDDs
➢ Create accumulators
➢ Create broadcast variables
➢ Access Spark services and run jobs
❖ A Spark context is essentially a client of Spark’s execution environment and acts as the master of your Spark application
❖ The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster
❖ In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc
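❖ For example, the pre-created sc can be used directly in the spark-shell (a minimal sketch):
// create an RDD with 4 partitions from a local collection
val nums = sc.parallelize(1 to 100, 4)
// run a job: count() triggers the computation
nums.count()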
RDD Persistence: Cache/Persist
❖ One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations.
❖ When you persist an RDD, each node stores the partitions of it that it computes in memory, so they can be reused in other actions on that dataset
❖ Each persisted RDD can be stored using a different storage level, e.g.
➢ MEMORY_ONLY:
Store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed when they’re needed. This is the default level.
➢ MEMORY_AND_DISK:
If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed.
❖ cache() = persist(StorageLevel.MEMORY_ONLY)
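❖ For example, to request a non-default storage level explicitly (a small sketch; the errors RDD is the one from the log-mining example):
import org.apache.spark.storage.StorageLevel
// spill partitions that do not fit in memory to disk instead of recomputing them
errors.persist(StorageLevel.MEMORY_AND_DISK)
// errors.cache() would be equivalent to errors.persist(StorageLevel.MEMORY_ONLY)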
Why Persisting RDD?
val lines = sc.textFile("hdfs://…")
val errors = lines.filter(_.startsWith("Error"))
errors.persist()
errors.count()
❖ Without persisting, running errors.count() again would load the file and recompute the filter all over again.
❖ persist() tells Spark to cache the data in memory, to reduce the data loading cost for further actions on the same data
❖ errors.persist() by itself does nothing: it is a lazy operation that only marks the RDD as “read this file, filter it, and then cache the contents”. The next action triggers both the computation and the caching.
Key-Value RDDs
❖ Similar to MapReduce, Spark supports Key-Value pairs
❖ Each element of a Pair RDD is a pair tuple
❖ Spark supports data partitioning control for pair RDDs
❖ Some Key-Value transformation functions:
Example (Transformation)
❖ Transformations on one pair RDD rdd = {(1, 2), (3, 4), (3, 6)}
➢ reduceByKey(func): combine values with the same key
   rdd.reduceByKey((x, y) => x + y) gives {(1, 2), (3, 10)}
➢ groupByKey(): group values with the same key
   rdd.groupByKey() gives {(1, [2]), (3, [4, 6])}
➢ mapValues(func): apply a function to each value of a pair RDD without changing the key
   rdd.mapValues(x => x + 1) gives {(1, 3), (3, 5), (3, 7)}
➢ keys(): return an RDD of just the keys
   rdd.keys() gives {1, 3, 3}
➢ values(): return an RDD of just the values
   rdd.values() gives {2, 4, 6}
➢ sortByKey(): return an RDD sorted by the key
   rdd.sortByKey() gives {(1, 2), (3, 4), (3, 6)}
Example (Transformation)
❖ Transformations on two pair RDDs rdd1 = {(1, 2), (3, 4), (3, 6)} and rdd2 = {(3, 9)}
➢ subtractByKey(other): remove elements with a key present in the other RDD
   rdd1.subtractByKey(rdd2) gives {(1, 2)}
➢ join(other): perform an inner join between two RDDs
   rdd1.join(rdd2) gives {(3, (4, 9)), (3, (6, 9))}
➢ cogroup(other): group data from both RDDs sharing the same key
   rdd1.cogroup(rdd2) gives {(1, ([2], [])), (3, ([4, 6], [9]))}
Example (Actions)
❖ Actions on one pair RDD rdd = {(1, 2), (3, 4), (3, 6)}
➢ countByKey(): count the number of elements for each key
   rdd.countByKey() gives {(1, 1), (3, 2)}
➢ collectAsMap(): collect the result as a map to provide easy lookup
   rdd.collectAsMap() gives Map{(1, 2), (3, 4), (3, 6)}
➢ lookup(key): return all values associated with the provided key
   rdd.lookup(3) gives [4, 6]
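❖ These can be tried directly in the spark-shell (a small sketch; the results are shown as comments):
val rdd = sc.parallelize(Seq((1, 2), (3, 4), (3, 6)))
rdd.reduceByKey(_ + _).collect()   // Array((1,2), (3,10))
rdd.countByKey()                   // Map(1 -> 1, 3 -> 2)
rdd.lookup(3)                      // Seq(4, 6), i.e. the values for key 3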
A Few Practices on Pair RDD
// create a pair RDD keyed by the first word of each line
val lines = sc.parallelize(List("hello world", "this is a scala program", "to create a pair RDD", "in spark"))
val pairs = lines.map(x => (x.split(" ")(0), x))
// keep only the pairs whose key is shorter than 3 characters
pairs.filter { case (key, value) => key.length < 3 }.foreach(println)

// sum the values and count the elements for each key
val pairs = sc.parallelize(List((1, 2), (3, 1), (3, 6), (4, 2)))
val pairs1 = pairs.mapValues(x => (x, 1))
val pairs2 = pairs1.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
pairs2.foreach(println)

// compute the per-key average: sum / count
val pairs = sc.parallelize(List((1, 2), (3, 4), (3, 9), (4, 2)))
val pairs1 = pairs.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(x => x._1 / x._2)
pairs1.foreach(println)
Passing Functions to RDD
❖ Spark’s API relies heavily on passing functions in the driver program to run on the cluster.
➢ Anonymous function. E.g.,
val words = input.flatMap(line => line.split(" "))
➢ Static methods in a global singleton object. E.g.,
object MyFunctions { def func1(s: String): String = { … } }
myRdd.map(MyFunctions.func1)
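❖ One related caveat (a hedged sketch, not from the slides): passing a method or field of a class instance captures the whole object, so Spark has to ship that object to the cluster; copying the field into a local variable avoids this.
import org.apache.spark.rdd.RDD

class SearchFunctions(val query: String) {
  // referencing the field query captures `this`, so the whole object is serialized
  def getMatches(rdd: RDD[String]): RDD[String] = rdd.filter(_.contains(query))
  // safer: copy the field into a local variable so only the String is shipped
  def getMatchesSafe(rdd: RDD[String]): RDD[String] = {
    val q = query
    rdd.filter(_.contains(q))
  }
}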
Understanding Closures
❖ RDD operations that modify variables outside of their scope can be a frequent source of confusion.
❖ Consider the naive RDD element sum below, which may behave differently depending on whether execution happens within the same JVM, e.g., running Spark in local mode (--master local[n]) versus deploying the application to a cluster (e.g., via spark-submit to YARN):
➢ The behavior of the above code is undefined, and may not work as intended.
➢ Spark ships the closure (the variables and methods that must be visible to the executors) to each task. Thus the “counter” seen by an executor is only a copy of the “counter” in the driver, and updates to it are never sent back.
var counter = 0
var rdd = sc.parallelize(data)
// each executor updates its own copy of counter; the driver's counter stays 0
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
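❖ If a global aggregate is really needed, an accumulator (covered later in this chapter) is the supported way to do it; a minimal sketch, assuming some example data:
val data = Array(1, 2, 3, 4, 5)          // example data, for illustration only
val accum = sc.longAccumulator("sum")    // accumulator owned by the driver
sc.parallelize(data).foreach(x => accum.add(x))
println("Sum: " + accum.value)           // 15, read back in the driver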
Load Your Data
❖ File formats range from unstructured, like text, to semi-structured, like JSON, to structured, like SequenceFiles.
❖ Text File:
➢ input = sc.textFile("file:///home/holden/repos/spark/README.md")
❖ CSV File:
➢ You can use csv libraries such as opencsv:
➢ If you know the field separator in advance, you can also split each record into columns using the separator such as “,”
import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader

val input = sc.textFile(inputFile)
val result = input.map { line =>
  val reader = new CSVReader(new StringReader(line))
  reader.readNext()
}
Save Your Data
❖ Text File:
➢ result.saveAsTextFile(outputFile)
❖ CSV File:
➢ You can use StringWriter/StringIO to write the result into a string in our RDD
➢ You can also convert each record to a string with the fields separated by a separator such as “,”, and then save to a text file.
pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
  .mapPartitions { people =>
    // requires java.io.StringWriter and au.com.bytecode.opencsv.CSVWriter
    val stringWriter = new StringWriter()
    val csvWriter = new CSVWriter(stringWriter)
    csvWriter.writeAll(people.toList)
    Iterator(stringWriter.toString)
  }.saveAsTextFile(outFile)
Setting the Level of Parallelism
❖ All the pair RDD operations take an optional second parameter for number of tasks
> words.reduceByKey((x, y) => x + y, 5)
> words.groupByKey(5)
Using Local Variables
❖ Any external variables you use in a closure will automatically be shipped to the cluster:
> query = sys.stdin.readline()
> pages.filter(x => x.contains(query)).count()
❖ Some caveats:
➢ Each task gets a new copy (updates aren’t sent back)
➢ The variable must be Serializable
Shared Variables
❖ When you perform transformations and actions that use functions (e.g., map(f: T=>U)), Spark will automatically push a closure containing that function to the workers so that it can run at the workers.
❖ Any variable or data within a closure or data structure will be distributed to the worker nodes along with the closure
❖ When a function (such as map or reduce) is executed on a cluster node, it works on separate copies of all the variables used in it.
❖ Usually these variables are just constants, but shipping a fresh copy with every task means they cannot be shared across workers efficiently.
Shared Variables
❖ Consider these use cases
➢ Iterative or single jobs with large global variables
   Sending a large read-only lookup table to workers
   Sending a large feature vector in an ML algorithm to workers
   Problem? It is inefficient to send large data to each worker with each iteration
   Solution: broadcast variables
➢ Counting events that occur during job execution
   How many input lines were blank?
   How many input records were corrupt?
   Problem? Closures are one-way: driver -> worker
   Solution: accumulators
Broadcast Variables
❖ Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
➢ For example, to give every node a copy of a large input dataset efficiently
❖ Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost
❖ Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). Its value can be accessed by calling the value method.
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
❖ The broadcast variable should be used instead of the value v in any functions run on the cluster, so that v is not shipped to the nodes more than once.
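❖ For example, a lookup table can be broadcast once and then referenced inside transformations (a minimal sketch with made-up data):
// broadcast the (potentially large) lookup table once
val countryByCode = sc.broadcast(Map("AU" -> "Australia", "NZ" -> "New Zealand"))
val codes = sc.parallelize(Seq("AU", "NZ", "AU"))
// each task reads countryByCode.value instead of receiving its own copy of the map
codes.map(code => countryByCode.value.getOrElse(code, "unknown")).collect()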
Accumulators
❖ Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel.
❖ They can be used to implement counters (as in MapReduce) or sums.
❖ Spark natively supports accumulators of numeric types, and
programmers can add support for new types.
❖ Only the driver can read an accumulator’s value, not the tasks
❖ A numeric accumulator is created by calling SparkContext.longAccumulator() or doubleAccumulator() (the older SparkContext.accumulator(v), which takes an initial value v, is deprecated since Spark 2.0).
scala> val accum = sc.longAccumulator(“My Accumulator”)
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
… 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
Accumulators Example (Python)
❖ Counting empty lines
➢ blankLines is created in the driver, and shared among workers
➢ Each worker can access this variable
file = sc.textFile(inputFile)
# Create Accumulator[Int] initialized to 0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines  # Make the global variable accessible
    if (line == ""):
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
print("Blank lines: %d" % blankLines.value)
RDD Operations
RDD API Examples:
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
Part 2: Model (RDD)
How Spark Works
❖ User applications create RDDs, transform them, and run actions.
❖ This results in a DAG (Directed Acyclic Graph) of operators.
❖ The DAG is compiled into stages
❖ Each stage is executed as a series of tasks (one task for each partition).
Word Count in Spark
❖ Building up the program step by step; the comment on each line shows the type of the result at that step:
val file = sc.textFile("hdfs://…", 4)               // RDD[String] (4 partitions)
val words = file.flatMap(line => line.split(" "))    // RDD[String] (flattened words)
val pairs = words.map(t => (t, 1))                   // RDD[(String, Int)]
val count = pairs.reduceByKey(_ + _)                 // RDD[(String, Int)]
count.collect()                                      // Array[(String, Int)] returned to the driver
Execution Plan
❖ The scheduler examines the RDD’s lineage graph to build a DAG of stages.
❖ Stages are sequences of RDDs that don’t have a shuffle in between
❖ The boundaries between stages are the shuffles.
Execution Plan
❖ For the word count example, the reduceByKey shuffle splits the plan into two stages:
➢ Stage 1: 1) read the HDFS split; 2) apply both the maps; 3) start the partial reduce; 4) write the shuffle data
➢ Stage 2: 1) read the shuffle data; 2) do the final reduce; 3) send the result to the driver program
❖ You can browse the web interface for information about jobs, stages, storage, etc. at http://localhost:4040
Stage Execution
❖ Create a task for each partition in the new RDD
❖ Serialize the tasks
❖ Schedule and ship the tasks to the slaves (workers)
❖ All of this happens internally
Word Count in Spark (As a Whole View)
❖ Word Count using Scala in Spark
❖ Sample input file (two lines): “to be or” and “not to be”
➢ flatMap splits each line into words: “to”, “be”, “or”, “not”, “to”, “be”
➢ map turns each word into a pair: (to, 1), (be, 1), (or, 1), (not, 1), (to, 1), (be, 1)
➢ reduceByKey sums the counts per word: (be, 2), (not, 1), (or, 1), (to, 2)
❖ map: Return a new distributed dataset formed by passing each element of the source through a function func.
❖ flatMap: Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
Part 3: Running on a Cluster
❖ Standalone code (RDD, Scala)
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object WordCount {
  def main(args: Array[String]) {
    val inputFile = args(0)
    val outputFolder = args(1)
    // Create a SparkConf and a SparkContext.
    val conf = new SparkConf().setAppName("wordCount").setMaster("local")
    val sc = new SparkContext(conf)
    // Load our input data.
    val input = sc.textFile(inputFile)
    // Split up into words.
    val words = input.flatMap(line => line.split(" "))
    // Transform into word and count pairs.
    val counts = words.map(word => (word, 1)).reduceByKey(_ + _)
    // Save the word counts back out to a text file.
    counts.saveAsTextFile(outputFolder)
  }
}
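❖ To run it, you would typically package the program into a jar (e.g., with sbt) and launch it with spark-submit; a hedged sketch, where the jar name and the input/output paths are hypothetical (the master is already set to local in the code via setMaster):
spark-submit --class WordCount target/scala-2.12/wordcount_2.12-1.0.jar input.txt output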
❖ Linking with Apache Spark
➢ The first step is to explicitly import the required Spark classes into your Spark program
❖ Initializing Spark
➢ Create a SparkContext object with the desired Spark configuration that tells Apache Spark how to access a cluster
➢ SparkConf: Spark configuration class
➢ setAppName: set the name for your application
➢ setMaster: set the cluster master URL