Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
Matei Zaharia, , , , , Cauley, . Franklin, , Ion Stoica University of California, Berkeley
We present Resilient Distributed Datasets (RDDs), a distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner. RDDs are motivated by two types of applications that current computing frameworks handle inefficiently: iterative algorithms and interactive data mining tools. In both cases, keeping data in memory can improve performance by an order of magnitude. To achieve fault tolerance efficiently, RDDs provide a restricted form of shared memory, based on coarse-grained transformations rather than fine-grained updates to shared state. However, we show that RDDs are expressive enough to capture a wide class of computations, including recent specialized programming models for iterative jobs, such as Pregel, and new applications that these models do not capture. We have implemented RDDs in a system called Spark, which we evaluate through a variety of user applications and benchmarks.
1 Introduction

Cluster computing frameworks like MapReduce [10] and Dryad [19] have been widely adopted for large-scale data analytics. These systems let users write parallel computations using a set of high-level operators, without having to worry about work distribution and fault tolerance.
Although current frameworks provide numerous abstractions for accessing a cluster’s computational resources, they lack abstractions for leveraging distributed memory. This makes them inefficient for an important class of emerging applications: those that reuse intermediate results across multiple computations. Data reuse is common in many iterative machine learning and graph algorithms, including PageRank, K-means clustering, and logistic regression. Another compelling use case is interactive data mining, where a user runs multiple ad-hoc queries on the same subset of the data. Unfortunately, in most current frameworks, the only way to reuse data between computations (e.g., between two MapReduce jobs) is to write it to an external stable storage system, e.g., a distributed file system. This incurs substantial overheads due to data replication, disk I/O, and serialization, which can dominate application execution times.
Recognizing this problem, researchers have developed specialized frameworks for some applications that require data reuse. For example, Pregel [22] is a system for iterative graph computations that keeps intermediate data in memory, while HaLoop [7] offers an iterative MapReduce interface. However, these frameworks only support specific computation patterns (e.g., looping a series of MapReduce steps), and perform data sharing implicitly for these patterns. They do not provide abstractions for more general reuse, e.g., to let a user load several datasets into memory and run ad-hoc queries across them.
In this paper, we propose a new abstraction called resilient distributed datasets (RDDs) that enables efficient data reuse in a broad range of applications. RDDs are fault-tolerant, parallel data structures that let users explicitly persist intermediate results in memory, control their partitioning to optimize data placement, and manipulate them using a rich set of operators.
The main challenge in designing RDDs is defining a programming interface that can provide fault tolerance efficiently. Existing abstractions for in-memory storage on clusters, such as distributed shared memory [24], key-value stores [25], databases, and Piccolo [27], offer an interface based on fine-grained updates to mutable state (e.g., cells in a table). With this interface, the only ways to provide fault tolerance are to replicate the data across machines or to log updates across machines. Both approaches are expensive for data-intensive workloads, as they require copying large amounts of data over the cluster network, whose bandwidth is far lower than that of RAM, and they incur substantial storage overhead.
In contrast to these systems, RDDs provide an interface based on coarse-grained transformations (e.g., map, filter and join) that apply the same operation to many data items. This allows them to efficiently provide fault tolerance by logging the transformations used to build a dataset (its lineage) rather than the actual data.¹ If a partition of an RDD is lost, the RDD has enough information about how it was derived from other RDDs to recompute just that partition. Thus, lost data can be recovered, often quite quickly, without requiring costly replication.

¹ Checkpointing the data in some RDDs may be useful when a lineage chain grows large, however, and we discuss how to do it in §5.4.

Although an interface based on coarse-grained transformations may at first seem limited, RDDs are a good fit for many parallel applications, because these applications naturally apply the same operation to multiple data items. Indeed, we show that RDDs can efficiently express many cluster programming models that have so far been proposed as separate systems, including MapReduce, DryadLINQ, SQL, Pregel and HaLoop, as well as new applications that these systems do not capture, like interactive data mining. The ability of RDDs to accommodate computing needs that were previously met only by introducing new frameworks is, we believe, the most credible evidence of the power of the RDD abstraction.
We have implemented RDDs in a system called Spark, which is being used for research and production applications at UC Berkeley and several companies. Spark provides a convenient language-integrated programming interface similar to DryadLINQ [31] in the Scala programming language [2]. In addition, Spark can be used interactively to query big datasets from the Scala interpreter. We believe that Spark is the first system that allows a general-purpose programming language to be used at interactive speeds for in-memory data mining on clusters.
We evaluate RDDs and Spark through both microbenchmarks and measurements of user applications. We show that Spark is up to 20× faster than Hadoop for iterative applications, speeds up a real-world data analytics report by 40×, and can be used interactively to scan a 1 TB dataset with 5–7s latency. More fundamentally, to illustrate the generality of RDDs, we have implemented the Pregel and HaLoop programming models on top of Spark, including the placement optimizations they employ, as relatively small libraries (200 lines of code each).
This paper begins with an overview of RDDs (§2) and Spark (§3). We then discuss the internal representation of RDDs (§4), our implementation (§5), and experimental results (§6). Finally, we discuss how RDDs capture several existing cluster programming models (§7), survey related work (§8), and conclude.
2 Resilient Distributed Datasets (RDDs)
This section provides an overview of RDDs. We first define RDDs (§2.1) and introduce their programming interface in Spark (§2.2). We then compare RDDs with finer-grained shared memory abstractions (§2.3). Finally, we discuss limitations of the RDD model (§2.4).
2.1 RDD Abstraction
Formally, an RDD is a read-only, partitioned collection of records. RDDs can only be created through deterministic operations on either (1) data in stable storage or (2) other RDDs. We call these operations transformations to differentiate them from other operations on RDDs. Examples of transformations include map, filter, and join.²
RDDs do not need to be materialized at all times. Instead, an RDD has enough information about how it was derived from other datasets (its lineage) to compute its partitions from data in stable storage. This is a powerful property: in essence, a program cannot reference an RDD that it cannot reconstruct after a failure.
Finally, users can control two other aspects of RDDs: persistence and partitioning. Users can indicate which RDDs they will reuse and choose a storage strategy for them (e.g., in-memory storage). They can also ask that an RDD’s elements be partitioned across machines based on a key in each record. This is useful for placement optimizations, such as ensuring that two datasets that will be joined together are hash-partitioned in the same way.
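To make the placement optimization concrete, the following is a minimal sketch in plain Scala, without Spark, of why hash-partitioning two datasets in the same way helps a join. The names `partitionOf`, `hashPartition`, `localJoin` and the sample records are hypothetical and are not part of Spark's API; the sketch only assumes that a record's partition is chosen from the hash of its key.

```scala
object CoPartitionSketch {
  // Hypothetical helper: map a key to one of numPartitions buckets.
  // Math.floorMod keeps the index non-negative even for negative hash codes.
  def partitionOf(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Split key-value records into numPartitions buckets by key hash.
  def hashPartition[V](records: Seq[(String, V)], numPartitions: Int): Vector[Seq[(String, V)]] = {
    val grouped = records.groupBy { case (k, _) => partitionOf(k, numPartitions) }
    Vector.tabulate(numPartitions)(i => grouped.getOrElse(i, Seq.empty[(String, V)]))
  }

  // Join two datasets that were partitioned the same way, bucket by bucket:
  // bucket i on the left only ever needs bucket i on the right.
  def localJoin[A, B](left: Vector[Seq[(String, A)]],
                      right: Vector[Seq[(String, B)]]): Seq[(String, (A, B))] =
    left.zip(right).flatMap { case (l, r) =>
      for ((k1, a) <- l; (k2, b) <- r if k1 == k2) yield (k1, (a, b))
    }

  def main(args: Array[String]): Unit = {
    val visits = Seq("alice" -> 3, "bob" -> 1, "carol" -> 7) // sample data
    val cities = Seq("alice" -> "Oslo", "carol" -> "Lima")   // sample data
    val joined = localJoin(hashPartition(visits, 4), hashPartition(cities, 4))
    println(joined.sortBy(_._1))
  }
}
```

Because matching keys always fall into the same bucket index, each bucket pair can be joined independently, which on a cluster would mean no record has to cross the network at join time.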
2.2 Interface
Spark exposes RDDs through a language-integrated API similar to DryadLINQ [31] and FlumeJava [8], where each dataset is represented as an object and transformations are invoked using methods on these objects.
Programmers start by defining one or more RDDs through transformations on data in stable storage (e.g., map and filter). They can then use these RDDs in actions, which are operations that return a value to the application or export data to a storage system. Examples of actions include count (which returns the number of elements in the dataset), collect (which returns the elements themselves), and save (which outputs the dataset to a storage system). Like DryadLINQ, Spark computes RDDs lazily the first time they are used in an action, so that it can pipeline transformations.
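The split between lazy transformations and actions can be illustrated with a small analogy in plain Scala, without Spark. The sketch below uses a standard collection view as a stand-in for an RDD: defining the filter performs no work, and only the final count forces it. The object name `LazySketch` and the sample lines are hypothetical.

```scala
object LazySketch {
  // Returns (predicate calls before the action, calls after it, the count).
  def run(): (Int, Int, Int) = {
    var calls = 0 // how many times the filter predicate has actually run
    val lines = Vector("ERROR disk full", "INFO started", "ERROR timeout")

    // "Transformation": the view records the filter but does not run it yet.
    val errors = lines.view.filter { line => calls += 1; line.startsWith("ERROR") }
    val callsBefore = calls

    // "Action": asking for a concrete value forces the recorded work.
    val n = errors.count(_ => true)
    (callsBefore, calls, n)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

A view reruns its filter every time it is forced, which loosely mirrors an RDD that is recomputed on each action unless the program asks for it to be kept.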
In addition, programmers can call a persist method to indicate which RDDs they want to reuse in future operations. Spark keeps persistent RDDs in memory by default, but it can spill them to disk if there is not enough RAM. Users can also request other persistence strategies, such as storing the RDD only on disk or replicating it across machines, through flags to persist. Finally, users can set a persistence priority on each RDD to specify which in-memory data should spill to disk first.
² Although individual RDDs are immutable, it is possible to implement mutable state by having multiple RDDs to represent multiple versions of a dataset. We made RDDs immutable to make it easier to describe lineage graphs, but it would have been equivalent to have our abstraction be versioned datasets and track versions in lineage graphs.

Table 1: Comparison of RDDs with distributed shared memory.
Aspect | RDDs | Distr. Shared Mem.
--- | --- | ---
Reads | Coarse- or fine-grained | Fine-grained
Writes | Coarse-grained | Fine-grained
Consistency | Trivial (immutable) | Up to app / runtime
Fault recovery | Fine-grained and low-overhead using lineage | Requires checkpoints and program rollback
Straggler mitigation | Possible using backup tasks | Difficult
Work placement | Automatic based on data locality | Up to app (runtimes aim for transparency)
Behavior if not enough RAM | Similar to existing data flow systems | Poor performance (swapping?)

Figure 1: Lineage graph for the third query in our example. Boxes represent RDDs and arrows represent transformations.
lines → filter(_.startsWith("ERROR")) → errors → filter(_.contains("HDFS")) → HDFS errors → map(_.split('\t')(3)) → time fields

2.2.1 Example: Console Log Mining
Suppose that a web service is experiencing errors and an operator wants to search terabytes of logs in the Hadoop filesystem (HDFS) to find the cause. Using Spark, the operator can load just the error messages from the logs into RAM across a set of nodes and query them interactively. She would first type the following Scala code:
val lines = spark.textFile("hdfs://…")
val errors = lines.filter(_.startsWith("ERROR"))
errors.persist()
Line 1 defines an RDD backed by an HDFS file (as a collection of lines of text), while line 2 derives a filtered RDD from it. Line 3 then asks for errors to persist in memory so that it can be shared across queries. Note that the argument to filter is Scala syntax for a closure.
At this point, no work has been performed on the cluster. However, the user can now use the RDD in actions, e.g., to count the number of messages:
errors.count()
The user can also perform further transformations on the RDD and use their results, as in the following lines:
// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
      .map(_.split('\t')(3))
      .collect()
After the first action involving errors runs, Spark will store the partitions of errors in memory, greatly speeding up subsequent computations on it. Note that the base RDD, lines, is not loaded into RAM. This is desirable because the error messages might only be a small fraction of the data (small enough to fit into memory).
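For readers without a cluster at hand, the three queries can be tried on a local collection. The following is a sketch in plain Scala that replaces the HDFS-backed RDD with a small in-memory Vector; the four sample log lines and their tab-separated layout (with the time in field number 3) are invented for illustration, and `size` and `count` on a Vector stand in for the count action.

```scala
object LogMiningSketch {
  // Invented sample data: level, component, node, time, message (tab-separated).
  val lines: Vector[String] = Vector(
    "ERROR\tHDFS\tnode1\t10:01\tblock missing",
    "INFO\tweb\tnode2\t10:02\trequest served",
    "ERROR\tMySQL\tnode3\t10:03\tconnection refused",
    "ERROR\tHDFS\tnode4\t10:04\treplica lost"
  )

  // Same predicate as in the text, applied to a local collection.
  val errors: Vector[String] = lines.filter(_.startsWith("ERROR"))

  // First query: count all error messages.
  def errorCount: Int = errors.size

  // Second query: count errors mentioning MySQL.
  def mysqlCount: Int = errors.count(_.contains("MySQL"))

  // Third query: time fields of errors mentioning HDFS.
  def hdfsTimes: Vector[String] =
    errors.filter(_.contains("HDFS")).map(_.split('\t')(3))

  def main(args: Array[String]): Unit = {
    println(errorCount)
    println(mysqlCount)
    println(hdfsTimes)
  }
}
```

Unlike an RDD, a Vector is evaluated eagerly and lives on one machine, so this sketch shows only what the queries compute, not how Spark distributes or caches the work.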
Finally, to illustrate how our model achieves fault tolerance, we show the lineage graph for the RDDs in our third query in Figure 1. In this query, we started with errors, the result of a filter on lines, and applied a further filter and map before running a collect. The Spark scheduler will pipeline the latter two transformations and send a set of tasks to compute them to the nodes holding the cached partitions of errors. In addition, if a partition of errors is lost, Spark rebuilds it by applying a filter on only the corresponding partition of lines.
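The recovery step described above can be modeled with a toy data structure. The following sketch, in plain Scala, is not Spark's implementation: the types `Dataset`, `Source`, `Filtered` and `Cached` and the method `lose` are hypothetical names chosen for illustration. It assumes only what the text states, namely that a derived dataset remembers its parent and its transformation, so that one lost partition can be rebuilt from the matching partition of the parent.

```scala
object LineageSketch {
  // A dataset is described by how to compute any one of its partitions.
  sealed trait Dataset[T] {
    def numPartitions: Int
    def compute(partition: Int): Vector[T]
  }

  // Leaf of the lineage graph: stands in for data in stable storage.
  final case class Source[T](blocks: Vector[Vector[T]]) extends Dataset[T] {
    def numPartitions: Int = blocks.size
    def compute(partition: Int): Vector[T] = blocks(partition)
  }

  // Coarse-grained transformation: stores only the parent and the predicate.
  final case class Filtered[T](parent: Dataset[T], p: T => Boolean) extends Dataset[T] {
    def numPartitions: Int = parent.numPartitions
    def compute(partition: Int): Vector[T] = parent.compute(partition).filter(p)
  }

  // In-memory copies of computed partitions; entries can be lost on failure.
  final class Cached[T](parent: Dataset[T]) extends Dataset[T] {
    private val cache = scala.collection.mutable.Map.empty[Int, Vector[T]]
    def numPartitions: Int = parent.numPartitions
    def compute(partition: Int): Vector[T] =
      cache.getOrElseUpdate(partition, parent.compute(partition))
    def lose(partition: Int): Unit = { cache.remove(partition); () } // simulated failure
    def isCached(partition: Int): Boolean = cache.contains(partition)
  }

  // Returns (first result, whether partition 1 is cached after loss, rebuilt result).
  def demo(): (Vector[String], Boolean, Vector[String]) = {
    val lines = Source(Vector(Vector("ERROR a", "INFO b"), Vector("ERROR c", "WARN d")))
    val errors = new Cached(Filtered[String](lines, _.startsWith("ERROR")))
    val first = errors.compute(1)        // computes and caches partition 1
    errors.lose(1)                       // the cached copy disappears
    val stillCached = errors.isCached(1)
    val rebuilt = errors.compute(1)      // reruns the filter on partition 1 of lines only
    (first, stillCached, rebuilt)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Note that rebuilding partition 1 never touches partition 0 of `lines`, and no copy of the filtered data had to be stored elsewhere in advance.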
Table 1: Comparison of RDDs with distributed shared memory.
2.3 Advantages of the RDD Model
To understand the benefits of RDDs as a distributed memory abstraction, we compare them against distributed shared memory (DSM) in Table 1. In DSM systems, applications read and write to arbitrary locations in a global address space. Note that under this definition, we include not only traditional shared memory systems [24], but also other systems where applications make fine-grained writes to shared state, including Piccolo [27], which provides a shared DHT, and distributed databases. DSM is a very general abstraction, but this generality makes it harder to implement in an efficient and fault-tolerant manner on commodity clusters.
The main difference between RDDs and DSM is that RDDs can only be created (“written”) through coarse-grained transformations, while DSM allows reads and writes to each memory location.³ This restricts RDDs to applications that perform bulk writes, but allows for more efficient fault tolerance. In particular, RDDs do not need to incur the overhead of checkpointing, as they can be recovered using lineage.⁴ Furthermore, only the lost partitions of an RDD need to be recomputed upon failure, and they can be recomputed in parallel on different nodes, without having to roll back the whole program.
A second benefit of RDDs is that their immutable nature lets a system mitigate slow nodes (stragglers) by running backup copies of slow tasks as in MapReduce [10]. Backup tasks would be hard to implement with DSM, as the two copies of a task would access the same memory locations and interfere with each other’s updates.
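The idea of a backup task can be sketched with standard Scala futures. This is an illustration only, not how Spark or MapReduce schedule work: the names `task` and `runWithBackup` and the artificial delays are assumptions made for the example. Because the task is a pure function of an immutable input, running two copies is safe and the result of whichever finishes first can be used.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BackupTaskSketch {
  // A task reads an immutable partition and writes nothing shared, so two
  // copies of it cannot interfere; both produce the same value.
  def task(partition: Vector[Int], delayMs: Long): Int = {
    Thread.sleep(delayMs) // artificial delay standing in for a slow node
    partition.sum
  }

  // Launch the original (slow) copy and a backup copy, keep the first result.
  def runWithBackup(partition: Vector[Int]): Int = {
    val straggler = Future(task(partition, 500))
    val backup = Future(task(partition, 10))
    Await.result(Future.firstCompletedOf(Seq(straggler, backup)), 5.seconds)
  }

  def main(args: Array[String]): Unit =
    println(runWithBackup(Vector(1, 2, 3)))
}
```

With mutable shared state, the two copies would have to coordinate their writes; here the slower copy can simply be ignored once the faster one returns.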
Finally, RDDs provide two other benefits over DSM. First, in bulk operations on RDDs, a runtime can schedule tasks based on data locality to improve performance. Second, RDDs degrade gracefully when there is not enough memory to store them, as long as they are only being used in scan-based operations. Partitions that do not fit in RAM can be stored on disk and will provide similar performance to current data-parallel systems.

³ Note that reads on RDDs can still be fine-grained. For example, an application can treat an RDD as a large read-only lookup table.
⁴ In some applications, it can still help to checkpoint RDDs with long lineage chains, as we discuss in Section 5.4. However, this can be done in the background because RDDs are immutable, and there is no need to take a snapshot of the whole application as in DSM.

Figure 2: Spark runtime. The user’s driver program launches multiple workers, which read data blocks from a distributed file system and can persist computed RDD partitions in memory.
2.4 Applications Not Suitable for RDDs
As discussed in the Introduction, RDDs are best suited for batch applications that apply the same operation to all elements of a dataset. In these cases, RDDs can efficiently remember each transformation as one step in a lineage graph and can recover lost partitions without having to log large amounts of data. RDDs would be less suitable for applications that make asynchronous fine-grained updates to shared state, such as a storage system for a web application or an incremental web crawler. For these applications, it is more efficient to use systems that perform traditional update logging and data checkpointing, such as databases, RAMCloud [25], Percolator [26] and Piccolo [27]. Our goal is to provide an efficient programming model for batch analytics and leave these asynchronous applications to specialized systems.
3 Interface
Spark provides the RDD abstraction through a language-integrated API similar to DryadLINQ [31] in Scala [2], a statically typed functional programming language for the Java VM. We chose Scala due to its combination of conciseness (which is convenient for interactive use) and efficiency (due to static typing). However, nothing about the RDD abstraction requires a functional language.
To use Spark, developers write a driver program that connects to a cluster of workers, as shown in Figure 2. The driver defines one or more RDDs and invokes actions on them. Spark code on the driver also tracks the RDDs’ lineage. The workers are long-lived processes that can store RDD partitions in RAM across operations.
As we showed in the log mining example in Section 2.2.1, users provide arguments to RDD operations like map by passing closures (function literals). Scala represents each closure as a Java object, and these objects can be serialized and loaded on another node to pass the closure across the network. Scala also saves any variables bound in the closure as fields in the Java object. For example, one can write code like var x = 5; rdd.map(_ + x) to add 5 to each element of an RDD.⁵
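The variable capture described above can be tried locally. The sketch below is plain Scala with a Vector standing in for the RDD; the names `ClosureSketch`, `addOffset` and `addX` are hypothetical. It shows only that a function literal can refer to a variable defined outside it and be passed around as a value; it does not exercise serialization or shipping to another node.

```scala
object ClosureSketch {
  def addOffset(xs: Vector[Int]): Vector[Int] = {
    var x = 5
    // The function literal refers to x, which is defined outside it, so the
    // resulting function value carries a reference to that binding.
    val addX: Int => Int = _ + x
    // The closure is an ordinary value and can be handed to map.
    xs.map(addX)
  }

  def main(args: Array[String]): Unit =
    println(addOffset(Vector(1, 2, 3)))
}
```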
RDDs themselves are statically typed objects parametrized by an element type. For example, RDD[Int] is an RDD of integers. However, most of our examples omit types since Scala supports type inference.
Although our method of exposing RDDs in Scala is conceptually simple, we had to work around issues with Scala’s closure objects using reflection [33]. We also needed more work to make Spark usable from the Scala interpreter, as we shall discuss in Section 5.2. Nonetheless, we did not have to modify the Scala compiler.
3.1 RDD Operations in Spark
Table 2 lists the main RDD transformations and actions available in Spark. We give the signature of each operation, showing type parameters in square brackets. Recall that transformations are lazy operations that define a new RDD, while actions launch a computation to return a value to the program or write data to external storage.
Note that some operations, such as join, are only available on RDDs of key-value pairs. Also, our function names are chosen to match other APIs in Scala and other functional languages; for example, map is a one-to-one mapping, while flatMap maps each input value to one or more outputs (similar to the map in MapReduce).
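Since these names match the Scala collections API, the difference between map and flatMap can be seen on an ordinary Vector. The following sketch uses only the standard library; the object name and the sample strings are chosen for illustration.

```scala
object MapVsFlatMapSketch {
  val lines: Vector[String] = Vector("to be", "or not")

  // map: exactly one output element per input element.
  val lengths: Vector[Int] = lines.map(_.length)

  // flatMap: each input element can contribute several output elements,
  // and the per-element results are concatenated into one collection.
  val words: Vector[String] = lines.flatMap(_.split(" ").toVector)

  def main(args: Array[String]): Unit = {
    println(lengths)
    println(words)
  }
}
```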
In addition to these operators, users can ask for an RDD to persist. Furthermore, users can get an RDD’s partition order, which is represented by a Partitioner class, and partition another dataset according to it. Operations such as groupByKey, reduceByKey and sort automatically result in a hash or range partitioned RDD.
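To see why a by-key aggregation naturally yields a hash-partitioned result, consider the following plain-Scala sketch. It is an emulation on local collections, not Spark's reduceByKey: the function below and its `numPartitions` parameter are assumptions for illustration. Pairs are first routed to a bucket by key hash and then reduced inside each bucket, so every key ends up in exactly one output partition.

```scala
object ReduceByKeySketch {
  // Emulated reduce-by-key that returns one Map per output partition.
  def reduceByKey[K, V](pairs: Seq[(K, V)], numPartitions: Int)
                       (f: (V, V) => V): Vector[Map[K, V]] = {
    // "Shuffle": route every pair to the bucket chosen by its key's hash.
    val buckets = pairs.groupBy { case (k, _) =>
      Math.floorMod(k.hashCode, numPartitions)
    }
    // Reduce inside each bucket; a given key never appears in two buckets.
    Vector.tabulate(numPartitions) { i =>
      buckets.getOrElse(i, Seq.empty[(K, V)])
        .groupBy(_._1)
        .map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
    }
  }

  def main(args: Array[String]): Unit = {
    val counts = reduceByKey(Seq("a" -> 1, "b" -> 2, "a" -> 3), 3)(_ + _)
    println(counts)
  }
}
```

The output is already laid out by key hash, so a later operation that uses the same hash function and partition count could reuse this layout without moving data again.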
3.2 Example Applications
We complement the data mining example in Section 2.2.1 with two iterative applications: logistic regression and PageRank. The latter also showcases how control of RDDs’ partitioning can improve performance.
3.2.1 Logistic Regression
Many machine learning algorithms are iterative in nature because they run iterative optimization procedures, such as gradient descent, to maximize a function. They can thus run much faster by keepin
