
• We have discussed:
– GFS/HDFS as backend storage systems for big data
– MapReduce (v1/v2), as frameworks to run data crunching tasks in parallel over big data (files)
• But what about:


– Iterative/multi-pass algorithms?
• Recommendation systems, machine learning, data exploration, …
– Higher-level languages?
• SQL, Python, R, …
– Interactive data access/querying
– Complex data crunching algorithms

MapReduce issues
• From our discussions/lectures so far, what would you expect to be:
– MapReduce annoyances for the programmer
– Framework-specific design decisions hurting performance
– Possible ways of addressing the above

MapReduce issues
• Not everything is a sequence of Map and Reduce phases
– E.g., joins, map-reduce-map, etc.
• Too much disk I/O
– Writing map output data to disk, reading back during shuffling
• Significant setup/cleanup overhead
– Interactivity out the window
• Multi-pass algorithms too slow
– Machine learning, graph processing, …
• Only Java supported natively
– Python through Hadoop Streaming
– SQL through Hive/Impala/Drill/…
• Too hard for non programmers
– Not that easy for programmers either…

Spark to the rescue!

Spark to the rescue
• General purpose
– Batch and stream processing
– Map/Reduce just two of several dozen operations
• In-memory caching to alleviate disk I/O
– Works wonders for iterative algorithms
– Heck, works wonders for just about any algorithm…
– Up to 100x faster than MapReduce in-memory
– Up to 10x faster on disk
• Native support for several (higher-level) languages
– Scala, Python, Java, R, SQL
– Batch job submission and interactive REPL shell
• Native support for all input formats supported by Hadoop
• Built-in libraries for streaming, machine learning, graph data, etc.
• A "one size fits all" solution

Brief history of Spark
• Started as a class project at UC Berkeley AMPLab in 2009
• Open-sourced in 2010 (BSD)
• Donated to Apache Foundation in 2013 (ASL)
– Top-Level Apache Project in 2014
• Databricks created in 2013
• Smashed the TeraSort world record in 2014
– Previously held by MapReduce
• Up and coming…
– Much more popular in Google searches than M/R

RDDs: Spark's heart and soul
• RDD: Resilient Distributed Dataset
– Immutable, partitioned and distributed, collection of records
• Can be created by:
– External data sources (Hadoop input formats, RDBMSs, local files, …)
– Transforming other RDDs
• Can be cached and/or partitioned across cluster nodes
– User-configurable
– Caching mode governed by the RDD's Storage Level
• In-memory, on disk, both in-mem and on-disk, off-heap, replicated
• Serialised or deserialised
• LRU cache replacement
• The Driver records the lineage of each RDD
– I.e.,howitwascreatedfromotherRDDs/sources
– If something goes wrong, it can be reconstructed at runtime
– Doesn't have to be materialised
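A minimal Java sketch of RDD creation and caching (not from the original slides; the class name, the input path in args[0] and the MEMORY_AND_DISK choice are illustrative, assuming the standard Spark Java API):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class CachingSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("CachingSketch"));
    // RDD created from an external source (any Hadoop-supported path)
    JavaRDD<String> lines = sc.textFile(args[0]);
    // RDD created by transforming another RDD
    JavaRDD<String> errors = lines.filter(l -> l.contains("ERROR"));
    // Pick a Storage Level explicitly: keep partitions in memory, spill to disk if they don't fit
    errors.persist(StorageLevel.MEMORY_AND_DISK());
    // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY())
    System.out.println(errors.count());
    sc.close();
  }
}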

• RDDs support two types of operations: Transformations and Actions
• Organised in DAGs, grouped into stages, executed in parallel
• Transformations: Create a new RDD from an existing one
– Lazy evaluation
• Nothing is really computed until output is requested
• Allows scheduler to optimise task execution
– Example:
• map(func) / flatMap(func)
• filter(func)
• distinct()
• join(otherRDD)
• reduceByKey(func)
• …
• Actions: return a value to the driver/client and/or write results out to stable storage
– Cause a job to be submitted from the Driver to the cluster
– Example:
• reduce(func)
• count()
• saveAsTextFile()
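A minimal Java sketch of lazy transformations vs. an action (not from the original slides; class name and values are illustrative):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LazyEvalSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("LazyEvalSketch"));
    JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
    // Transformations only record how to compute the new RDDs; nothing runs yet
    JavaRDD<Integer> evens   = nums.filter(x -> x % 2 == 0);
    JavaRDD<Integer> doubled = evens.map(x -> x * 2);
    // Action: triggers submission of a job that executes the whole chain
    long howMany = doubled.count();
    System.out.println(howMany);   // 3
    sc.close();
  }
}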

Spark: A Bird's Eye View

Spark: A Bird's Eye View

Understanding the Spark framework
Again, three fundamental questions:
• What are the main entities (architectural elements)?
• Processing Plane:
– How are they engaged during execution?
– And what are the execution phases?
• Data Plane:
– What data flows between them, and how?

Spark entities
• Cluster Manager
– External service managing resources on the cluster
• Local, Mesos, YARN, Spark Standalone, …
• Worker Nodes
– Cluster nodes capable of executing application code
• Managed/allocated by the Cluster Manager
– Multiple Worker processes per Worker Node
• One JVM process per Worker
• Client
– Runs in a client JVM on a client node
– Submits application to cluster and mediates results to user
• spark-submit (Java/Scala jar file)
• REPL (PySpark (Python), spark-shell (Scala), spark-sql (SQL))
• Driver
– Program running the main() function and coordinating the job execution
– Must be available for the duration of an application
• Can be executed on the client node (client mode), or on a node in the cluster (cluster mode)
• Executor
– Process on a Worker Node executing application code for a specific task/stage, caching datasets/results
– Runs in JVM spawned by a Worker
– Executes Tasks using a ThreadPool
• Task: Unit of work assigned to an executor
• Stage: Sets of tasks that can be executed without any network transfer

Interaction of Spark entities

Interaction of Spark entities
Client mode Cluster mode
Source: http://www.cloudera.com/documentation/enterprise/5-5-x/topics/cdh_ig_running_spark_on_yarn.html

• One set of executors per "application"
– Equivalent of YARN child containers
– Single JVM per executor
– New threads for tasks
– Multiple tasks executing within each executor
– Can exist even without an active job
• Dynamic Executor/resource allocation can change this
• Super fast startup time for tasks (msecs/secs)
– Compared to high startup times for MR (secs/mins)
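A minimal sketch of how an application's executor set can be sized or made dynamic via SparkConf (not from the original slides; the class name and all values are illustrative, not tuning advice — only the configuration keys are standard Spark properties):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ExecutorConfigSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("ExecutorConfigSketch")
        .set("spark.executor.instances", "4")              // size of this application's executor set
        .set("spark.executor.cores", "2")                  // task threads per executor JVM
        .set("spark.executor.memory", "2g")
        .set("spark.dynamicAllocation.enabled", "true")    // let the set grow/shrink with the workload
        .set("spark.shuffle.service.enabled", "true");     // so shuffle data outlives released executors
    JavaSparkContext sc = new JavaSparkContext(conf);
    sc.close();
  }
}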

Execution phases
1. Client instantiates a SparkContext object (SC)
– Responsible for I/O with the user, the cluster manager service, …
– Initialises the Driver, which registers the application with the Cluster Manager and gets a list of Executors
– The Driver then is responsible for the Spark job
2. Client uses SC to create input RDDs
– Partitioned/parallelised across cluster nodes
3. Client uses Transformations on the input/intermediate RDDs
– NB1: Sequence of transformations creates a DAG of ops
– NB2: Lazy evaluation → no computation takes place on the cluster so far
4. Client uses an Action to print/store results
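A minimal Java sketch of the four phases above (not from the original slides; class name and args[0] are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PhasesSketch {
  public static void main(String[] args) {
    // 1. Instantiate the SparkContext (initialises the Driver, registers with the Cluster Manager)
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("PhasesSketch"));
    // 2. Create an input RDD, partitioned across the cluster
    JavaRDD<String> lines = sc.textFile(args[0]);
    // 3. Apply transformations: a DAG of ops is recorded, nothing executes yet
    JavaRDD<Integer> lengths = lines.map(s -> s.length());
    // 4. An action triggers submission of the DAG for execution
    System.out.println(lengths.reduce((a, b) -> a + b));
    sc.close();
  }
}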

Execution phases
Action triggers submission of the ops DAG to the DAGScheduler within the Driver
The DAGScheduler builds stages of tasks
– Defines what will be executed where
– Implemented as an event queue
– Groups together tasks based on their dependencies
– Submits the execution plan to the TaskScheduler within the Driver
– Resubmits failed stages, if their output was lost
The TaskScheduler:
a. Ships tasks to Executors (serialised RDD lineage + transformations)
b. Monitors the execution/progress of their computation
• Launches tasks on Executors, according to resource and locality constraints
• Decides where to run each task
• Deals with stragglers (speculative execution)
c. Returns the result to the DAGScheduler (as a series of events)

1. Set of input partitions
– Aka "splits"
– Similar to Hadoop MR's input partitions
2. Dependencies on parent RDDs
3. Function to compute current RDD
given its dependencies
– User-defined
4. Preferred locations
– Optional
– Used to enforce data locality
5. Partitioner preferences
– Optional
– Hash/range partitioned
– Used to tweak the shuffle phase behaviour
• Example 1 – RDD off of HDFS file:
– Partitions = HDFS blocks
– Locations = HDFS block locations
– Compute function = read HDFS block
– Dependencies/Partitioner = N/A
• Example 2 – RDD created by filtering another RDD:
– Partitions = parent RDD partitions
– Dependencies = parent (1-1)
– Compute function = compute parent and filter it
– Locations = same as parent
– Partitioner = N/A
RDD Lineage
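A minimal Java sketch of inspecting the recorded lineage (not from the original slides; class name and args[0] are placeholders — toDebugString() prints how each RDD is derived from its parents/sources):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LineageSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("LineageSketch"));
    JavaRDD<String> base     = sc.textFile(args[0]);           // partitions/locations from HDFS blocks
    JavaRDD<String> filtered = base.filter(l -> !l.isEmpty()); // 1-1 dependency on parent partitions
    JavaRDD<String> distinct = filtered.distinct();            // wide dependency: needs a shuffle
    System.out.println(distinct.toDebugString());              // print the lineage graph
    sc.close();
  }
}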

RDD operations
MapReduce map
MapReduce reduce
MapReduce combiner

Creation of stages
• Based on RDD dependencies
– Think: What do I need to compute partition X?
– Think: What happens if a parent partition "dies"?
• Narrow dependencies:
– Each partition of a parent RDD is used by at most one partition of the child RDD
The task can be executed locally at the location of the parent RDD
• Wide dependencies:
– Multiple child RDD partitions depend on one parent RDD partition
Data needs to be shuffled
• i.e., stored (on disk) and subsequently accessed over the network
• Unless parent is hash-partitioned (why?)
– Data can be re-partitioned (user-configurable)
• Trade-offs?
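A minimal Java sketch contrasting a narrow and a wide dependency (not from the original slides; class name and data are illustrative):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class DependencySketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("DependencySketch"));
    JavaRDD<String> words = sc.parallelize(Arrays.asList("a", "b", "a", "c", "b", "a"), 4);
    // Narrow: each output partition depends on exactly one input partition,
    // so mapToPair is pipelined with its parent inside the same stage
    JavaPairRDD<String, Integer> pairs = words.mapToPair(w -> new Tuple2<>(w, 1));
    // Wide: every output partition may need data from all input partitions,
    // so reduceByKey closes the stage and forces a shuffle
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
    counts.collect().forEach(System.out::println);
    sc.close();
  }
}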

Creation of stages
Optimisation of execution plan
– Group and pipeline narrow ops in a stage
– Choose task placement and shuffle algorithm to minimise data transfers (based on partitioning)
• Check cached data & preferred locations
• Materialise intermediate results at parent partitions (sounds familiar?)
– Reuse previously cached data, where applicable
Source: https://trongkhoanguyenblog.wordpress.com/2014/11/27/understand-rdd-operations-transformations-and-actions/

Data locality
• Basic idea: Same as in Hadoop MR
– Avoid network I/O
– Transfer the processing to the data
• Or: Work only on local data (if possible)
• … but Spark does caching:
– First time a file is accessed, it's read off of HDFS (or other source) and cached → locality controlled by HDFS (or other source)
– Subsequent accesses will use the cache → locality controlled by the cache
– If data is evicted from the cache (why/when?), fall back to HDFS locality

Data shuffling
• Much like in Hadoop MR
– Intermediate results partitioned and stored on the local FS
– Data are serialised on writing, deserialised on reading
– Writes to disk are batched
• Increased write throughput
– Data are automatically pre-combined in-memory before being written out
• Again, much like with Hadoop MR
– Subsequent stages transfer data over the network
– How many shuffle files for m map and r reduce tasks?
• But also pluggable
– Various types of shuffling
• Hash shuffle
• Sort-based shuffle (standard Hadoop-style shuffle, plus pluggable external sorter)
• …

Sharing state across executors
• … not really …
– Why would that be a bad idea?
• Two options:
– Accumulators
– Broadcast variables

Broadcast variables
• Similar to Hadoop MR's distributed cache
– Allows for read-only data to be sent to all nodes
• Initially stored locally by the Driver
• For transformations using the "broadcasted" data, the Driver serialises and attaches its metadata
• An executor receiving this info will
– First check its local FS for a copy of the data
– If no copy exists, will transfer from the Driver over the network
• HTTP transfer
• Torrent transfer (why?)
• Transferred only once per Worker Node
– Why not once per stage/task/…?
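A minimal Java sketch of a broadcast variable (not from the original slides; class name and data are illustrative):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("BroadcastSketch"));
    // Read-only lookup data, shipped once per Worker Node rather than once per task
    Set<String> stopWords = new HashSet<>(Arrays.asList("the", "a", "of"));
    Broadcast<Set<String>> bc = sc.broadcast(stopWords);
    JavaRDD<String> words = sc.parallelize(Arrays.asList("the", "cat", "sat", "on", "a", "mat"));
    JavaRDD<String> kept = words.filter(w -> !bc.value().contains(w));  // tasks read the broadcast copy
    System.out.println(kept.collect());
    sc.close();
  }
}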

Accumulators
• "Shared" write-able variables
• Created by the Driver
• Tasks can only reset and add to the Accumulator
– A local copy is maintained per node
• Only the Driver can read the Accumulator's value
• Great for aggregating over the whole cluster
– À la Hadoop MR Counters
• Beware of their quirks under failures!
– Guaranteed to be updated once per action
– NOT guaranteed to be updated exactly once otherwise
• i.e., restarted transformations will reapply any updates!
– Do not affect the lazy evaluation semantics
• i.e., not updated until a job is actually submitted for execution
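A minimal Java sketch of an accumulator (not from the original slides; class name, accumulator name and data are illustrative, assuming the Spark 2.x LongAccumulator API):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("AccumulatorSketch"));
    // Created by the Driver; tasks may only add to it
    LongAccumulator badRecords = sc.sc().longAccumulator("badRecords");
    JavaRDD<String> lines = sc.parallelize(Arrays.asList("1", "2", "oops", "4"));
    JavaRDD<Long> parsed = lines.map(s -> {
      try {
        return Long.parseLong(s);
      } catch (NumberFormatException e) {
        badRecords.add(1);   // side-channel count; may be re-applied if the task is re-run
        return 0L;
      }
    });
    parsed.count();                           // updates only happen once a job actually runs
    System.out.println(badRecords.value());   // only the Driver reads the final value
    sc.close();
  }
}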

Failure handling
• RDD checkpointing vs lineage
– Both supported
– Lineage much faster, but think: what about very long lineages/several wide dependencies? (see the checkpointing sketch after this list)
• "Speculative execution" easy due to RDDs being immutable
• Stage failures:
– Detected and dealt with by DAGScheduler
– Stage failures may lead to loss of shuffle output files
– Failed stages resubmitted
• Task failures
– Detected and dealt with by TaskScheduler
– Tasks resubmitted if can be computed with no dependency on previous output
• Driver failure
– Handled by the user (local mode) or by the Cluster Manager (cluster mode)
– Cluster mode: Same as for Application Master in YARN
• Worker Node failure:
– Handled by the Driver and Cluster Manager
– Tasks rescheduled on other Worker Nodes
– Same as failure of Node Manager in YARN
• Cluster Manager failure:
– Same as failure of Resource Manager in YARN
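A minimal Java sketch of checkpointing to truncate a long lineage (not from the original slides; class name, the loop and the paths in args[] are illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CheckpointSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("CheckpointSketch"));
    sc.setCheckpointDir(args[1]);            // reliable storage for checkpoint files
    JavaRDD<String> rdd = sc.textFile(args[0]);
    for (int i = 0; i < 100; i++) {
      rdd = rdd.map(s -> s.trim());          // lineage grows by one step per iteration
    }
    // Materialise the RDD so recovery does not have to replay the whole (long) lineage
    rdd.checkpoint();
    rdd.count();                             // checkpointing actually happens when an action runs
    sc.close();
  }
}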

"Built-in" libraries
• Streaming
– Rides on core Spark functionality
– Input stream partitioned into micro-batches, then treated as "micro-"RDDs
– Driver starts long-running tasks to read+replicate input data; then periodically executes the DAG of ops
• SQL (Spark SQL)
– DataFrames: RDDs + schema metadata => better optimisations
– Catalyst: full-blown query optimiser
– Not as mature/complete as Hive
– … but Hive retrofitted to use Spark instead of MR
• MLlib
– A slew of machine learning algorithms
– Rivals Mahout
• GraphX
– A graph processing framework
– Rivals Giraph/Pregel
• Tungsten
– Smarter/faster serialisation and deserialisation
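A minimal Spark SQL sketch in Java (not from the original slides; the class name, the JSON input in args[0] and the "name"/"age" columns are assumed for illustration):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("SparkSqlSketch").getOrCreate();
    // A DataFrame = rows + schema metadata, giving Catalyst room to optimise the plan
    Dataset<Row> people = spark.read().json(args[0]);   // expects one JSON object per line
    people.createOrReplaceTempView("people");
    Dataset<Row> adults = spark.sql("SELECT name, age FROM people WHERE age >= 18");
    adults.show();
    spark.stop();
  }
}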

Example: WordCount
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class WordCount {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("WordCount-v1"));
    sc.textFile(args[0])
      .flatMap(s -> Arrays.asList(s.split(" ")).iterator())  // Spark 2.x API: flatMap returns an Iterator
      .mapToPair(s -> new Tuple2<>(s, 1))
      .reduceByKey((x, y) -> x + y)
      .saveAsTextFile(args[1]);
    sc.close();
  }
}

Example: PageRank
import java.util.*;
import com.google.common.collect.Iterables;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import scala.Tuple2;

public class PageRank {
  public static void main(String[] args) {
    int numIterations = Integer.parseInt(args[1]);
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("PageRank-v1"));
    JavaRDD<String> lines = sc.textFile(args[0], 1);
    // Adjacency lists (url -> neighbours); cached, as they are reused in every iteration
    JavaPairRDD<String, Iterable<String>> links = lines
        .mapToPair(s -> {
          String[] parts = s.split("\\s+");
          return new Tuple2<>(parts[0], parts[1]);
        })
        .distinct().groupByKey().cache();
    JavaPairRDD<String, Double> ranks = links.mapValues(s -> 1.0);
    for (int current = 0; current < numIterations; current++) {
      // Each page distributes its current rank evenly over its outgoing links
      JavaPairRDD<String, Double> contribs = links.join(ranks).values()
          .flatMapToPair(v -> {
            List<Tuple2<String, Double>> res = new ArrayList<>();
            int urlCount = Iterables.size(v._1());          // Iterables comes from Guava
            for (String s : v._1())
              res.add(new Tuple2<>(s, v._2() / urlCount));
            return res.iterator();                          // Spark 2.x API: flatMapToPair returns an Iterator
          });
      ranks = contribs.reduceByKey((a, b) -> a + b).mapValues(v -> 0.15 + 0.85 * v);
    }
    List<Tuple2<String, Double>> output = ranks.collect();
    sc.close();
  }
}

Example: PageRank
