
7CCSMBDT – Big Data Technologies Week 6
Grigorios Loukides, PhD (grigorios.loukides@kcl.ac.uk)
Spring 2017/2018
1

MapReduce functionality
 map function: "maps" a value with a key and emits the pair key=cust_id, value=amount
 query: selects the input documents to the map function, here status: "A"
 out: the location of the result (a collection or in-line), here the collection order_totals
 reduce function: "reduces" to a single object all the values associated with a particular key; here key=cust_id and value=the sum of the amounts for the same cust_id
https://docs.mongodb.com/manual/reference/method/db.collection.mapReduce/#db.collection.mapReduce
2

MapReduce functionality
var mapF = function() { emit(this.cust_id, this.amount); };
Note: this refers to the document that the map-reduce operation is processing
The emitted (key, value) pairs go through a sort-and-shuffle phase before being passed to reduce.
https://docs.mongodb.com/manual/reference/method/db.collection.mapReduce/#db.collection.mapReduce
3

MapReduce functionality
var redF = function(key, values) { return Array.sum(values); };
https://docs.mongodb.com/manual/reference/method/db.collection.mapReduce/#db.collection.mapReduce
4

MapReduce functionality
 Executes the MapReduce operation and outputs to the collection "order_totals":
db.orders.mapReduce(mapF, redF, {out: "order_totals"})
https://docs.mongodb.com/manual/reference/method/db.collection.mapReduce/#db.collection.mapReduce
5

Today
 Apache Spark
Material from
M. Zaharia et al. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. USENIX NSDI 2012.
M. Zaharia et al. Apache Spark: A Unified Engine for Big Data Processing. CACM 2016. http://cacm.acm.org/magazines/2016/11/209116-apache-spark/fulltext
Apache Spark overview: https://spark.apache.org/docs/1.6.0/quick-start.html
6

What is Apache Spark?
 Unified engine for distributed data processing
 Spark extends the MapReduce programming model with an abstraction that allows efficient data reuse
[Diagram: the user's program and data processing libraries run on the Spark engine, which uses a cluster manager (Apache Mesos, YARN, Standalone) and storage (HDFS, local filesystem, HBase, …)]
7

Motivation for Spark
 MapReduce is inefficient for applications that reuse intermediate results across multiple computations.
Examples: PageRank, k-means clustering, logistic regression
 The output of the MR job in iteration i is the input of the MR job in iteration i+1, and i can be very large.
8

Motivation for Spark
 MapReduce is inefficient for interactive data mining (multiple ad-hoc queries on the same data)
Example
Web log queries to find the total views of
(1) all pages,
(2) pages with titles exactly matching a given word, and
(3) pages with titles partially matching a word.
9

Motivation for Spark
 Hadoop is inefficient because it writes intermediate results to the distributed file system (HDFS)
 Overhead due to data replication, disk I/O, and serialization
 Frameworks to address the inefficiency support specific computation patterns
 Pregel: Iterative graph computations
 HaLoop: Iterative MapReduce interface
but these frameworks do not support generic data reuse (e.g., a user cannot load all logs in memory and perform interactive data mining on them)
10

Spark
 Resilient Distributed Datasets (RDDs) enable efficient data reuse
 An RDD is a read-only, fault-tolerant collection of records that can be operated on in parallel
 An RDD is a data structure that serves as the core unit of data
 A user's program can manipulate an RDD, control its partitioning, and make it persistent in memory.
11

Overview of RDDs
 RDDs do not need to be materialized
 Each RDD knows its lineage (how it was derived from other datasets) and can compute its partitions from data in stable storage
 Only RDDs that can be reconstructed after failure can be referenced by a user's program
 Persistence
 Users can indicate which RDDs they will reuse and choose a storage strategy for them (e.g., in-memory storage)
 Partitioning
 Users can specify how records of an RDD are partitioned across machines based on a key in each record
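As a small illustration of these two controls in the pySpark shell (a hedged sketch; the data and the number of partitions are made up for the example):
>>> pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("c", 4)])
>>> part = pairs.partitionBy(2).persist()   # hash-partition by key into 2 partitions and keep in memory
>>> part.getNumPartitions()
2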
12

Overview of RDDs
 Created through operations, called transformations, on
 data in stable storage
 other RDDs
 map(f): returns a new RDD formed by passing each element of the source RDD through a function f (i.e., a "1-1" mapping)
 filter(f): returns a new RDD formed by selecting those elements of the source RDD on which f returns true
 flatMap(f): similar to map, but each input item can be mapped to 0 or more output items (i.e., like the map in MapReduce)
13

Overview of RDDs
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x % 2 == 0).collect()
Output: [2, 4]
rdd = sc.parallelize([2, 3, 4])
rdd.map(lambda x: range(1, x)).collect()
Output: [[1], [1, 2], [1, 2, 3]]
rdd.flatMap(lambda x: range(1, x)).collect()
Output: [1, 1, 2, 1, 2, 3]
14

Overview of RDDs
 Some other transformations
Examples can be found at:
https://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.RDD
https://spark.apache.org/docs/1.6.0/programming-guide.html#transformations
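A few of these other transformations, tried in the pySpark shell (a minimal sketch; the data is illustrative and the order of collected results may vary):
>>> r = sc.parallelize([3, 1, 2, 2, 3])
>>> r.distinct().collect()                         # removes duplicates (order may vary)
[1, 2, 3]
>>> r.sortBy(lambda x: x).collect()                # sorts by a key function
[1, 2, 2, 3, 3]
>>> kv = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
>>> kv.reduceByKey(lambda x, y: x + y).collect()   # combines the values of each key (order may vary)
[('a', 4), ('b', 2)]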
15

Overview of RDDs
 Manipulated through actions, i.e., operations that
 return a value to the application, or
 export data to a storage system
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
Output: 15
>>> sc.parallelize([(1,3),(1,4),(2,5)]).lookup(1)
Output: [3, 4]
For more on actions
https://spark.apache.org/docs/1.6.0/programming-guide.html#actions
16

Overview of RDDs
 The persist method implements persistence
 Persistent RDDs are stored in RAM by default
 If there is not enough RAM, they are spilled to disk
 Users can request other storage strategies
 Storing an RDD only on disk
 Replicating an RDD across machines, through flags to persist
 Users can set a persistence priority on each RDD to specify which in-memory data should spill to disk first
17

Example: Console log mining
 Several TBs of logs are stored in HDFS and need to be queried to find errors in a web service
 The user can load just the error messages from the logs into RAM across a set of nodes and query them interactively
lines = spark.textFile("hdfs://…")             // defines an RDD backed by an HDFS file
errors = lines.filter(_.startsWith("ERROR"))   // creates a new RDD containing only the error messages
errors.persist()                               // the new RDD will be persisted in memory and shared among queries, for efficiency
No computation has been performed yet.
18

Example: Console log mining
errors.count()                                // counts the error messages
errors.filter(_.contains("MySQL")).count()    // counts the error messages containing "MySQL"
// Return the time fields of the errors mentioning "HDFS" (assuming the time field is field #3 in the tab-separated format):
errors.filter(_.contains("HDFS")).map(_.split('\t')(3)).collect()
After the first action involving errors runs, Spark will store the partitions of errors in memory, greatly speeding up subsequent computations on it.
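The snippets above follow the Scala notation of the RDD paper; a pySpark equivalent might look as follows (a sketch, assuming the same tab-separated log format with the time field at index 3; the HDFS path is a placeholder):
lines = sc.textFile("hdfs://...")                        # RDD backed by an HDFS file
errors = lines.filter(lambda l: l.startswith("ERROR"))   # keep only the error messages
errors.persist()                                         # keep them in memory across queries
errors.count()                                           # first action: triggers the computation
errors.filter(lambda l: "MySQL" in l).count()            # errors mentioning MySQL
errors.filter(lambda l: "HDFS" in l).map(lambda l: l.split("\t")[3]).collect()   # their time fields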
19

Example: Console log mining
lines = spark.textFile("hdfs://…")
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()
errors.filter(_.contains("HDFS")).map(_.split('\t')(3)).collect()
 Lineage
if a partition of errors is lost, Spark rebuilds it by applying a filter on only the corresponding partition of lines.
20

Lazy evaluation
 In the log example, Spark began to execute only after errors.count()
 Spark executes only after seeing the first action; it lazily evaluates transformations (e.g., spark.textFile(), lines.filter())
 i.e., it only records metadata for them
Benefits of Lazy evaluation
 Lazy evaluation provides a "global view" of the computation, so
 Spark can optimize the required calculations by grouping operations together
 Spark can recover from failures and slow workers
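A quick way to see this in the pySpark shell (a sketch; any text file works, here the system dictionary file used later in these slides):
>>> lines = sc.textFile("file:///usr/share/dict/words")  # returns immediately: only metadata is recorded
>>> longw = lines.filter(lambda w: len(w) > 10)          # still nothing is computed
>>> longw.count()                                        # the action triggers reading and filtering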
21

RDD vs. DSM
 Distributed Shared Memory (DSM)
 a global address space that applications can read and write to in arbitrary locations
 RDDs are created by coarse-grained transformations (applied to the entire dataset), but reads can be fine-grained (read from a specific location)
 DSM allows fine-grained writes and reads
 Thus, RDDs are restricted to applications performing bulk writes
22

Advantages of RDD
1. RDDs provide efficient fault tolerance
 They can be recovered using lineage
 Only lost partitions need to be recomputed (and this can be done in
parallel)
2. Spark can run backup copies of slow tasks (e.g., to address issues with slow nodes as in MapReduce) without accessing the same memory
3. Runtime can schedule tasks based on data locality to improve performance
4. RDDs degrade gracefully when there is not enough memory to store them
 Partitions that do not fit in RAM stored on disk
23

Overview of the advantages of RDD
[Table: comparison of RDDs with distributed shared memory, including straggler mitigation via backup tasks]
24

Applications for RDD
 Suitable for batch applications that apply the same operation to all elements of a dataset
 RDDs remember each transformation as one step in a lineage graph
 RDDs can recover lost partitions efficiently
 Not suitable for applications that make asynchronous fine-grained updates to shared state
 e.g., storage system for a web application in which many users update values on the same table
25

Spark programming interface
[Diagram: the driver program (running the developer's program) sends tasks to a cluster of workers and receives their results]
The driver program
 Defines RDDs
 Invokes actions on them
 Tracks the RDDs' lineage
Workers are processes that
 Read data blocks from the distributed file system
 Store RDD partitions in RAM
This is an architecture view focusing on how the driver program communicates with the cluster in terms of RDDs. Later we will see the functions of the driver's program and the cluster as the program is executed.
26

Representing RDDs
 Goal: a representation that can track lineage and provides transformations that can be composed arbitrarily
Representation of an RDD
 A set of partitions (atomic pieces of the dataset)
 A set of dependencies on parent RDDs
 A function for computing the dataset based on its parents
 Metadata about partitioning scheme and data placement
Example: An RDD representing an HDFS file has a partition for each block of the file and knows which machines each block is on
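The common interface behind this representation can be sketched as follows (a conceptual sketch only, paraphrasing the bullet points above; these are not the method names of Spark's actual classes):
class RDDInterface(object):
    def partitions(self):
        """Return the list of partitions (atomic pieces of the dataset)."""
    def dependencies(self):
        """Return the dependencies on parent RDDs (narrow or wide)."""
    def compute(self, partition):
        """Compute the elements of a partition from the corresponding parent partitions."""
    def partitioner(self):
        """Return metadata about the partitioning scheme (e.g., hash- or range-partitioned), if any."""
    def preferred_locations(self, partition):
        """Return data placement hints, e.g., the machines holding the HDFS block of this partition."""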
27

Representing dependencies
 Narrow dependencies
 each partition of the parent RDD is used by at most one partition of the child
RDD
 e.g., map (recall that it is "1-1")
[Diagram: each partition of the parent RDD feeds at most one partition of the child RDD]
 allow pipelined execution on a node
 e.g., map followed by filter on each element
 efficient failure recovery
 only the lost parent partitions need to be recomputed, and the recomputation can be done in parallel
28

Representing dependencies
 Wide dependencies
 Multiple child partitions may depend on a single partition of the parent RDD
 Require data from all parent partitions to be available and shuffled across the nodes
 Failure recovery involves many RDDs, and complete re-execution may be needed
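For instance, in the pySpark shell (a sketch; mapValues keeps the existing partitioning and is a narrow dependency, while groupByKey introduces a shuffle and hence a wide dependency):
>>> pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
>>> narrow = pairs.mapValues(lambda v: v * 10)   # each child partition depends on one parent partition
>>> wide = pairs.groupByKey()                    # each child partition may depend on all parent partitions
>>> wide.mapValues(list).collect()               # order of keys may vary
[('a', [1, 3]), ('b', [2])]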
29

How Spark runs on cluster mode
The functions of the driver’s program and cluster as the program is executed
 Cluster Manager: Allocates resources across applications
 Support for standalone (default), Apache Mesos, Hadoop YARN
http://spark.apache.org/docs/latest/cluster-overview.html
30

How Spark runs on cluster mode
 Driver Program
 must listen for and accept incoming connections from its executors throughout its lifetime (i.e., it must be network addressable from the worker nodes)
 should run close to the workers (e.g., in the same local area network)
 SparkContext: connects to the cluster manager, acquires executors, and sends application code and tasks to the executors
http://spark.apache.org/docs/latest/cluster-overview.html
31

How Spark runs on cluster mode
 Executor: process that performs computations and stores data
 each application gets its own executors (it is isolated from others)
 Task: a unit of work that will be sent to an executor
http://spark.apache.org/docs/latest/cluster-overview.html
32

Programming with Spark
 Spark provides interfaces in Scala and Python.
 We will use pySpark, the Python interface.
 Part of the Cloudera Quickstart VM
 Can also be installed independently on a PC or cluster
 http://spark.apache.org/docs/1.6.0/index.html (see Downloading)
 API
 http://spark.apache.org/docs/1.6.0/api/python/pyspark.html
33

pySpark in shell
 ./bin/pyspark --master local to start the shell
 ./bin/pyspark --master local[k] to start the shell with k worker threads (ideally, k = the number of cores)
 SparkContext is the "sc" variable that is created automatically.
>>> sc
 Create an RDD using
 the text in the "/usr/share/dict/words" file (the local dir is the dir you run pyspark from)
>>> tf = sc.textFile("file:///usr/share/dict/words")
 tf is a pointer to the file; no loading is performed yet. file:// can be replaced by hdfs:// to access a file in HDFS
34

pySpark in shell
 Parallelized collections
 Created by the parallelize method of sc on an existing iterable or collection d
 The elements of d are copied to form a distributed dataset that can be
operated on in parallel
>>> d= [1,2,3,4,5]
>>>parallel_col = sc.parallelize(d)
 The number of partitions of the parallelized collection is determined automatically based on the cluster, but it can also be set explicitly as a second, optional parameter
>>> d= [1,2,3,4,5]
>>>parallel_col = sc.parallelize(d,10)
35

pySpark in shell
 count
 Returns the number of elements in this RDD
 filter
 Argument: a function that acts as a filter
 lambda is a type of function with no specific name
 Output: a new RDD with the elements that pass the filter (i.e., those on which the function returns true)
>>> tf = sc.textFile("file:///usr/share/dict/words")
>>> lines_nonempty = tf.filter(lambda x: len(x) > 0)
>>> lines_nonempty.count()
The count matches $ cat /usr/share/dict/words | wc -l (the number of lines, assuming the file has no empty lines)
36

pySpark in shell
 map
 Argument: a function f
 Output: a new RDD whose elements are the elements of the source RDD, after applying f to each of them
 collect
 returns all the elements of the dataset as an array at the driver program
>>> nums = sc.parallelize([1, 2, 3, 4])
>>> squared = nums.map(lambda x: x * x).collect()
>>> for num in squared: print(num, ",")
Output: 1,4,9,16
37

pySpark in shell
 map
 Argument: a function f
 Output: a new RDD whose elements are the elements of the source RDD, after applying f to each of them (each element is passed through f)
 flatMap
 Output: similar to map, but returns an RDD of the flattened elements
>>> x = sc.parallelize(["a b", "c d"])
>>> y = x.map(lambda x: x.split(' '))
>>> y.collect()
[['a', 'b'], ['c', 'd']]
>>> y = x.flatMap(lambda x: x.split(' '))
>>> y.collect()
['a', 'b', 'c', 'd']
38

pySpark in shell
 sample(withReplacement, fraction, [seed])
 Output: a sample of (approximately) fraction*100% of the data, with or without replacement, using a given random number generator seed
 rddA.union(rddB)
 rddA.intersection(rddB)
 rddA.subtract(rddB)
 rddA.cartesian(rddB)
 rddA.join(rddB,[number of reduce tasks])
 When called on datasets of type (K, V) and (K, W), where K is a key and V, W values, it returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key
>>> x = sc.parallelize([('a',1),('b',2)])
>>> y = sc.parallelize([('b',3),('d',4)])
>>> x.join(y).collect()
[('b', (2, 3))]
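The set-like operations above behave as in the following sketch (illustrative data; the order of collected elements may vary):
>>> a = sc.parallelize([1, 2, 3, 4])
>>> b = sc.parallelize([3, 4, 5])
>>> a.union(b).collect()          # keeps duplicates
[1, 2, 3, 4, 3, 4, 5]
>>> a.intersection(b).collect()
[3, 4]
>>> a.subtract(b).collect()
[1, 2]
>>> a.cartesian(b).count()        # 4 x 3 pairs
12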
39

pySpark in shell
 Reduce
 Argument: a commutative and associative function f that takes two elements of the RDD (of the same type)
 Output: a new element of the same type as the arguments, which is the result of applying f cumulatively to the elements
 Fold
 Same as reduce, but with a first identity ("zero") argument of the type we want to return
>>> rdd = sc.parallelize([1,2,3,4,5])
>>> sum = rdd.reduce(lambda x,y: x+y)
>>> sum
Output: 15
>>> sum2 = rdd.fold(0.0, lambda x,y: x+y)
>>> sum2
Output: 15.0
40

pySpark in shell
 Aggregate
 Like fold, but aggregates the results in each partition using a function f1, and then combines the results of all partitions using a function f2
>>> rdd = sc.parallelize([1,2,3,4], 2)
>>> f1 = (lambda local_res, list_elem: (local_res[0]+list_elem, local_res[1]+1))
>>> f2 = (lambda local_resA, local_resB: (local_resA[0]+local_resB[0], local_resA[1]+local_resB[1]))
>>> sum = rdd.aggregate((0, 0), f1, f2)
In each partition, local_res starts as the pair (0, 0)
Partition [1,2]: after seeing 1, local_res is (1, 1); after seeing 2, local_res is (3, 2)
41

pySpark in shell
 Aggregate
 Like fold, but aggregates the results in each partition using a function f1, and then combines the results of all partitions using a function f2
>>> rdd = sc.parallelize([1,2,3,4], 2)
>>> f1 = (lambda local_res, list_elem: (local_res[0]+list_elem, local_res[1]+1))
>>> f2 = (lambda local_resA, local_resB: (local_resA[0]+local_resB[0], local_resA[1]+local_resB[1]))
>>> sum = rdd.aggregate((0, 0), f1, f2)
In each partition, local_res starts as the pair (0, 0)
Partition [3,4]: after seeing 3, local_res is (3, 1); after seeing 4, local_res is (7, 2)
42

pySpark in shell
 Aggregate
 Like fold, but aggregates the results in each partition using a function f1, and then combines the results of all partitions using a function f2
>>> rdd = sc.parallelize([1,2,3,4], 2)
>>> f1 = (lambda local_res, list_elem: (local_res[0]+list_elem, local_res[1]+1))
>>> f2 = (lambda local_resA, local_resB: (local_resA[0]+local_resB[0], local_resA[1]+local_resB[1]))
>>> sum = rdd.aggregate((0, 0), f1, f2)
Partition [1,2] ends with local_res (3, 2); partition [3,4] ends with local_res (7, 2)
Final result: f2 combines the partition results into (10, 4), i.e., (sum of the elements, number of elements)
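As a brief follow-up (continuing the example above, with the same rdd, f1 and f2), the (sum, count) pair returned by aggregate gives the mean:
>>> total, count = rdd.aggregate((0, 0), f1, f2)
>>> float(total) / count
2.5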
43

pySpark in shell
 Accumulators
 Variables that are only “added” to through an associative and commutative
operation and can therefore be efficiently supported in parallel.
 Created from an initial value v by calling SparkContext.accumulator(v)
 Tasks can add y to them using add(y)
 Only the driver program can read the accumulator’s value, using value
>>>accum = sc.accumulator(0)
>>> accum
Output: Accumulator
>>>sc.parallelize([1, 2, 3, 4,5]).foreach(lambda x: accum.add(x))
>>> accum.value
Output: 15
44

pySpark in shell
 persist
 Sets the storage level of an RDD to be persisted; useful if the RDD will be reused (see the sketch after the table below)
 cache
 Sets the storage level to MEMORY_ONLY
deserialized: whether the data is kept in memory as deserialized objects; the *_SER levels keep it in a serialized format, for space efficiency
replication: the number of cluster nodes (1 or 2) on which each RDD partition is replicated
Storage level         useDisk   useMemory   deserialized   replication
MEMORY_ONLY           False     True        False          1
MEMORY_ONLY_2         False     True        False          2
MEMORY_ONLY_SER       False     True        False          1
DISK_ONLY             True      False       False          1
MEMORY_AND_DISK       True      True        True           1
MEMORY_AND_DISK_SER   True      True        False          1
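In pySpark, a storage level from the table can be passed to persist (a minimal sketch; the file path is the dictionary file used earlier):
>>> from pyspark import StorageLevel
>>> words = sc.textFile("file:///usr/share/dict/words")
>>> words.persist(StorageLevel.MEMORY_AND_DISK)   # spill partitions to disk if RAM is insufficient
>>> words.count()                                 # first action materializes and caches the RDD
>>> words.unpersist()                             # release the storage when no longer needed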
45

Spark with pyspark
"""SimpleApp.py"""
from pyspark import SparkContext
logFile = "YOUR_SPARK_HOME/README.md"  # some file
sc = SparkContext("local", "Simple App")  # we need to create sc
rdd = sc.textFile(logFile).cache()
numAs = rdd.filter(lambda s: 'a' in s).count()
print('Lines with a: ', numAs, '\n')
sc.stop()  # we need to stop sc
Runs in Linux with spark-submit. Remember the local[k] parameter in pyspark:
$ YOUR_SPARK_HOME/bin/spark-submit --master local[4] SimpleApp.py
46

Extra slides
 The following four slides are for your reference
47

Job Scheduling
 Job: An action and the tasks that get spawned to evaluate it
 Job scheduling within a cluster
 Static partitioning of resources (default)
 each application is given a maximum amount of resources it can use, and holds onto them for its whole duration
 Dynamic adjustment of resources
 Resources are allocated based on the workload and may be given back if no longer used (see the configuration sketch below)
 Job scheduling within an application
 multiple jobs can run concurrently if they were submitted by different threads.
http://spark.apache.org/docs/latest/cluster-overview.html
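A hedged illustration of the two modes, reusing SimpleApp.py from the earlier slide (the exact flags and values are only examples, not the only way to configure this):
# Static partitioning (default): cap the resources the application may hold
$ spark-submit --executor-memory 2g --total-executor-cores 8 SimpleApp.py
# Dynamic adjustment: let Spark grow/shrink the number of executors with the workload
$ spark-submit --conf spark.dynamicAllocation.enabled=true \
               --conf spark.shuffle.service.enabled=true SimpleApp.py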
48

Job Scheduling within an application
 When an action on an RDD runs, the scheduler examines the RDD’s lineage and builds a DAG of stages.
 Each stage contains as many pipelined transformations as possible with narrow dependencies
The boundaries of the stages are the shuffle operations required for wide dependencies, or any already-computed partitions that can short-circuit the computation of a parent RDD
49
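One way to inspect the stages the scheduler will form is to print an RDD's lineage in the pySpark shell (a sketch; the exact output format depends on the Spark version):
>>> pairs = sc.parallelize(["a a b", "b c"]).flatMap(lambda l: l.split(" ")).map(lambda w: (w, 1))
>>> counts = pairs.reduceByKey(lambda x, y: x + y)   # wide dependency: introduces a shuffle boundary
>>> print(counts.toDebugString())                    # shows the pipelined steps and the shuffled stage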

Job Scheduling within an application
 Jobs in a cluster can be grouped into pools
 By default, the jobs in each pool are scheduled in a FIFO (First In First Out)
fashion
 Example:
 One pool per user, and each user's jobs run in the order they arrive
 Alternatively, fair sharing between jobs
 Spark assigns tasks to jobs in a round-robin fashion
 Thus, cluster resources are shared almost equally among jobs
 Best for multi-user settings
 Example:
 When a long job is running, many short jobs are submitted. The short
jobs can receive resources right away
50

Job Scheduling within an application
 The fair sharing scheduler also supports grouping jobs into pools and setting different options for each pool
 e.g., a “high-priority” pool is created for important jobs
 Default behavior of pools
 Each pool gets an equal share of the cluster
 Inside each pool, jobs run in FIFO order.
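A minimal sketch of using pools from pySpark (assuming fair scheduling has been enabled with the spark.scheduler.mode=FAIR configuration property; the pool name is illustrative):
>>> sc.setLocalProperty("spark.scheduler.pool", "high-priority")   # jobs submitted from this thread go to this pool
>>> sc.parallelize(range(10000)).count()                           # scheduled in the "high-priority" pool
>>> sc.setLocalProperty("spark.scheduler.pool", None)              # revert to the default pool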
51