Introduction to Data Science
Lecture 17
Apache Spark,
Amazon DynamoDB
CIS 5930/4930 – Fall 2021
Assignments
• Homework 3
• Posted on Canvas 10/14
• Due 10/21 3pm on Canvas
Spark
• Fast and expressive cluster computing system
interoperable with MapReduce/Hadoop
• Improves performance (orders of magnitude faster)
– In-memory computing primitives
– General computation graphs
• Improves usability (less code)
– APIs in Scala, Java, Python
– Interactive shell
Reading
• M. Zaharia et al., "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing", NSDI 2012
Spark Stack
• Berkeley Data Analytics Stack (BDAS)
Problems with MapReduce
• MapReduce greatly simplified big data analysis
• But as soon as it got popular, users wanted more:
– More complex, multi-pass analytics (e.g. ML, graph)
– More interactive ad-hoc queries
– More real-time stream processing
• All 3 need faster data sharing across parallel jobs
Data Exchange in MapReduce
• Bottlenecks: Disk I/O, Replication, Scheduling
Data Exchange in Spark
• RAM is orders of magnitude faster than disk + network
Data Model
• Resilient Distributed Datasets (RDDs)
• Distributed collections of objects that can
be cached in memory across the cluster
• Manipulated through parallel operators
• Automatically recomputed on failure
• Programming interface
• Functional APIs in Scala, Java, Python
• Interactive use from Scala shell
Lambda Functions
errors = lines.filter(lambda y: y.startswith("ERROR"))

def detect_error(y):
    return y.startswith("ERROR")

errors = lines.filter(detect_error)

• Equivalent: the lambda is an anonymous, inline version of the named function detect_error
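The equivalence can be checked in plain Python, using the built-in filter in place of Spark's (the sample log lines are illustrative):

```python
# Plain-Python check: a lambda passed to filter() behaves exactly like
# a named predicate function.
lines = ["ERROR disk full", "INFO started", "ERROR timeout"]

def detect_error(y):
    # Named-function version of the predicate.
    return y.startswith("ERROR")

errors_lambda = list(filter(lambda y: y.startswith("ERROR"), lines))
errors_named = list(filter(detect_error, lines))

assert errors_lambda == errors_named == ["ERROR disk full", "ERROR timeout"]
```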
Log Mining
• Load error messages from a Web server log
into memory, then interactively search for
patterns (Python code):
lines = spark.textFile("file://…")
errors = lines.filter(lambda f: f.startswith("ERROR"))
messages = errors.map(lambda f: f.split('\t')[2])
messages.cache()

messages.filter(lambda f: "PatternX" in f).count()
messages.filter(lambda f: "PatternY" in f).count()
Fault Tolerance
RDDs track lineage info to rebuild lost data
• file.map(lambda rec: (rec.type, 1))
      .reduceByKey(lambda x, y: x + y)
      .filter(lambda pair: pair[1] > 100)
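The lineage chain above can be simulated in plain Python (the log records and counts are illustrative):

```python
from collections import Counter

# Simulate the chain map -> reduceByKey -> filter on a list of record types.
records = ["ERROR"] * 150 + ["WARN"] * 50

pairs = [(rec, 1) for rec in records]       # map(rec -> (rec.type, 1))
counts = Counter()
for key, one in pairs:                      # reduceByKey(lambda x, y: x + y)
    counts[key] += one
frequent = {k: c for k, c in counts.items() if c > 100}   # filter(count > 100)
# Only "ERROR" occurs more than 100 times here.
```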
Evaluation (Logistic Regression)
Architecture
• Spark client is a library
• Runs tasks locally or on cluster
• Mesos, YARN, standalone mode
• Accesses storage systems via
Hadoop InputFormat API
• Local files, HBase, HDFS, S3
Spark SQL
• SQL analytics engine for Spark (uses a
columnar storage engine)
• Orders of magnitude faster than
Apache Hive
• Compatible with Apache Hive
• HiveQL, UDF/UDAF, SerDe IO
• Runs on existing Hive warehouses
• In use at Yahoo! for in-memory OLAP
Hive Architecture
SparkSQL Architecture
SparkSQL vs Hive
• Lower-latency engine
• Columnar storage with compression
• New optimizations (e.g. map pruning)
Spark MLlib
• classification: logistic regression, linear
support vector machine (SVM), naive Bayes
• regression: generalized linear regression
• collaborative filtering: alternating least
squares (ALS)
• clustering: k-means
• decomposition: singular value decomposition
(SVD), principal component analysis (PCA)
Reading
• G. DeCandia et al., "Dynamo: Amazon's Highly Available Key-value Store", SOSP 2007
• Online: Amazon DynamoDB
Design
• Data Model
– Key-value pairs
– Values are binary objects
– No schema
• Operators (by key)
– Insert
– Delete
– Find
• Consistency
– Replication with eventual consistency
– Store multiple versions
– Version control
Operators
• Get(key)
– Locates object replicas associated with key
– Returns one object or a list with versions
• Put(key, object)
– Stores the object in fault-tolerant, distributed storage
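A minimal in-memory sketch of this interface (a single dict stands in for the distributed, replicated store — an assumption; real Dynamo may return a list of conflicting versions):

```python
# Toy key-value store with the Get/Put interface described above.
class TinyStore:
    def __init__(self):
        self._data = {}

    def put(self, key, obj):
        # Store the (opaque, schema-less) object under its key.
        self._data[key] = obj

    def get(self, key):
        # Locate and return the object, or None if the key is absent.
        return self._data.get(key)

store = TinyStore()
store.put("cart:42", b"\x01\x02")    # values are binary objects
assert store.get("cart:42") == b"\x01\x02"
```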
Storage
• Distributed/Replicated Hash Table
– Each key is mapped to a node N = hash(key)
• Get(key)
– The client does not always have access to the node N storing the needed object
– If a new node joins, N → N + 1: the distributed hash table needs to be updated
– Fault tolerance is needed
• Replicate objects across several nodes
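A short plain-Python sketch of why naive N = hash(key) mod n placement breaks when a node joins (node counts and keys are illustrative assumptions):

```python
import hashlib

# Place each key on node N = hash(key) mod n, then let a node join.
def node_for(key, n):
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % n

keys = [f"key{i}" for i in range(1000)]
before = {k: node_for(k, 4) for k in keys}    # 4 nodes
after = {k: node_for(k, 5) for k in keys}     # a 5th node joins
moved = sum(1 for k in keys if before[k] != after[k])
# With mod-n hashing roughly 4/5 of the keys change nodes, which is why
# consistent hashing (only neighbors affected) is used instead.
```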
Routing
• Get(key)
– The client does not always have access to the node N storing the needed object; the request first reaches some node P
• Simple routing
– Each node stores its neighbors' addresses
– P forwards the Get request to its neighbor, and so on, until the request reaches the destination
– Complexity: O(n)
Routing
• Get(key)
– The client does not always have access to the node N storing the needed object
• Finger-table routing
– Each node K stores several neighbors' addresses: K + 1, K + 2, K + 4, …, K + 2^m
– Forward the Get request to the closest preceding neighbor in the routing table; repeat until the destination node is an immediate neighbor
– Complexity: O(log n)
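The two routing schemes can be compared with a small plain-Python sketch; a fully occupied ring of 2^8 positions is an assumption for illustration:

```python
# Ring of 2**M positions, every position occupied by a node (assumed).
M = 8
RING = 2 ** M

def simple_hops(src, dst):
    # Simple routing: forward to the immediate neighbor each time.
    return (dst - src) % RING

def finger_hops(src, dst):
    # Finger-table routing: each node knows K+1, K+2, K+4, ..., K+2**(M-1);
    # always take the largest jump that does not pass the destination.
    hops, cur = 0, src
    while cur != dst:
        dist = (dst - cur) % RING
        jump = 1
        while jump * 2 <= dist:
            jump *= 2
        cur = (cur + jump) % RING
        hops += 1
    return hops

assert simple_hops(0, 255) == 255   # O(n) hops in the worst case
assert finger_hops(0, 255) == 8     # at most M = log2(n) hops
```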
Replication
• Replication is needed for fault tolerance
• Let D be the degree of replication
• Replicate each object at nodes hash(key), hash(key) + 1, …, hash(key) + D − 1
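A minimal sketch of this placement rule, assuming a small illustrative ring and an MD5-based hash:

```python
import hashlib

RING = 16   # illustrative ring size

def replica_nodes(key, d):
    # Primary node N = hash(key); replicas on the D-1 successor nodes.
    home = int(hashlib.md5(key.encode()).hexdigest(), 16) % RING
    return [(home + i) % RING for i in range(d)]

nodes = replica_nodes("user:7", 3)
assert len(set(nodes)) == 3              # D distinct consecutive nodes
assert nodes[1] == (nodes[0] + 1) % RING
```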
Replication in Dynamo
• Assign each key to a coordinator node responsible for the replication process
• The preference list for a key is its set of replica nodes
• Routing
– One-hop: each node stores the preference lists for its keys
• Replication
– Each update creates a new version of the (key, value) pair
– Vector clocks track versions
Vector Clocks
• Standard versioning
– Each data object D stores its own timestamp T
– D[T=1], D[T=10], D[T=25], …
• Vector clocks
– D stores a set of [node, timestamp] pairs
– D[(S1, T1), (S2, T2)]
Vector Clocks
• A client writes data object O1 at node N1
– O1[(N1, 1)]
• A client reads O1, writes O2 at the same node N1
– O2[(N1, 2)]
• A client reads O2, writes O3 processed by a
different node N2
– O3[(N1, 2), (N2, 1)]
• A client reads O2, writes O4 processed by a different node N3
– O4[(N1, 2), (N3, 1)]
• A client reads O3, O4 → conflict
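The walkthrough above can be checked with a minimal vector-clock sketch (clocks as dicts mapping node → counter; all names illustrative):

```python
# A vector clock is a dict mapping node -> counter.
def descends(a, b):
    # Clock a subsumes clock b iff a is at least as new on every node.
    return all(a.get(node, 0) >= t for node, t in b.items())

def conflict(a, b):
    # Neither clock subsumes the other: the updates were concurrent.
    return not descends(a, b) and not descends(b, a)

o2 = {"N1": 2}
o3 = {"N1": 2, "N2": 1}   # O2 updated at node N2
o4 = {"N1": 2, "N3": 1}   # O2 updated concurrently at node N3

assert descends(o3, o2) and descends(o4, o2)   # both descend from O2
assert conflict(o3, o4)                        # O3 vs O4: conflict
```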
Dynamo Read/Write
• Parameters
– N – number of replicas of a data object
– R – minimum number of nodes that must participate
in a valid read operation
– W – minimum number of nodes that must
participate in a valid write operation
Dynamo Read/Write
• Write
– A write request is sent to the coordinator
– The coordinator generates a vector clock entry for the new version, stores it locally, and forwards the new version to the N replicas
– If at least W ≤ N nodes confirm, the write is successful
• Read
– A read request is sent to the coordinator
– The coordinator requests the data object from the N replicas
– When it receives R responses, it returns the object
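A minimal plain-Python sketch of the quorum protocol above (replica state modeled as dicts; the partial-write scenario and quorum choices are illustrative assumptions):

```python
# N replicas; a write needs W acks, a read collects R responses.
N, R, W = 3, 2, 2
assert R + W > N     # guarantees read and write quorums overlap

replicas = [{"version": 0, "value": None} for _ in range(N)]

def write(value, version, acked=W):
    # The coordinator forwards to all N replicas; model the case where
    # only `acked` of them apply the write before it is confirmed.
    for rep in replicas[:acked]:
        rep.update(version=version, value=value)
    return acked >= W

def read():
    # Collect R responses (here: a quorum overlapping the write quorum
    # in exactly one node) and return the highest-versioned value.
    responses = replicas[N - R:]
    return max(responses, key=lambda rep: rep["version"])["value"]

ok = write("cart-v1", version=1)
assert ok and read() == "cart-v1"   # the overlap node has the new value
```

Because R + W > N, at least one replica in any read quorum also participated in the latest successful write, so the read returns the newest version.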