

Introduction to Data Science

Lecture 17

Apache Spark,

Amazon DynamoDB

CIS 5930/4930 – Fall 2021

Assignments


• Homework 3
• Posted on Canvas 10/14
• Due 10/21 3pm on Canvas

Spark

• Fast and expressive cluster computing system
interoperable with MapReduce/Hadoop

• Improves performance (orders of magnitude faster)

– In-memory computing primitives

– General computation graphs

• Improves usability (less code)

– APIs in Scala, Java, Python

– Interactive shell


Reading

• M. Zaharia et al., “Resilient Distributed Datasets: A
Fault-Tolerant Abstraction for In-Memory Cluster
Computing”, NSDI 2012


Spark Stack

• Berkeley Data Analytics Stack (BDAS)


Problems with MapReduce

• MapReduce greatly simplified big data analysis

• But as soon as it got popular, users wanted more:

– More complex, multi-pass analytics (e.g. ML, graph)

– More interactive ad-hoc queries

– More real-time stream processing

• All 3 need faster data sharing across parallel jobs



Data Exchange in MapReduce

• Bottlenecks: Disk I/O, Replication, Scheduling


Data Exchange in Spark

• RAM is orders of magnitude faster than disk + network


Data Model

• Resilient Distributed Datasets (RDDs)
• Distributed collections of objects that can be
cached in memory across the cluster
• Manipulated through parallel operators
• Automatically recomputed on failure

• Programming interface
• Functional APIs in Scala, Java, Python
• Interactive use from Scala shell
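
A minimal PySpark sketch of these ideas (assuming a local
SparkContext; all names here are illustrative, not from the slides):

from pyspark import SparkContext

sc = SparkContext("local", "rdd-sketch")
nums = sc.parallelize(range(1000))          # distributed collection of objects
squares = nums.map(lambda x: x * x)         # parallel operator (lazy)
squares.cache()                             # keep partitions in memory
print(squares.reduce(lambda a, b: a + b))   # action triggers the computation
sc.stop()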


Lambda Functions
errors = lines.filter(lambda y: y.startswith("ERROR"))

def detect_error(y):
    return y.startswith("ERROR")

errors = lines.filter(detect_error)

• Equivalent: the lambda is shorthand for the named function


Log Mining

• Load error messages from a Web server log
into memory, then interactively search for
patterns (Python code):

lines = spark.textFile("file://…")
errors = lines.filter(lambda f: f.startswith("ERROR"))
messages = errors.map(lambda f: f.split('\t')[2])
messages.cache()

messages.filter(lambda f: "PatternX" in f).count()
messages.filter(lambda f: "PatternY" in f).count()


Fault Tolerance

RDDs track lineage info to rebuild lost data

• file.map(lambda rec: (rec.type, 1))
      .reduceByKey(lambda x, y: x + y)
      .filter(lambda kv: kv[1] > 100)   # kv = (type, count)



Evaluation (Logistic Regression)


Architecture

• Spark client is a library
• Runs tasks locally or on a cluster

• Mesos, YARN, standalone mode

• Accesses storage systems via
Hadoop InputFormat API
• Local files, HBase, HDFS, S3


Spark SQL
• SQL analytics engine for Spark (uses a columnar
storage engine)

• Orders of magnitude faster than Apache Hive

• Compatible with Apache Hive

• HiveQL, UDF/UDAF, SerDe IO

• Runs on existing Hive warehouses

• In use at Yahoo! for in-memory OLAP
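
A small sketch of querying Spark with SQL, using the newer
SparkSession entry point rather than the Hive-warehouse setup
described above (table and column names are made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sql-sketch").getOrCreate()
logs = spark.createDataFrame([(1, "ERROR"), (2, "INFO"), (3, "ERROR")],
                             ["id", "level"])
logs.createOrReplaceTempView("logs")        # expose the data to SQL
spark.sql("SELECT level, COUNT(*) AS n FROM logs GROUP BY level").show()
spark.stop()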


Hive Architecture


SparkSQL Architecture


SparkSQL vs Hive
• Lower-latency engine
• Columnar storage with compression
• New optimizations (e.g. map pruning)



Spark MLlib
• classification: logistic regression, linear
support vector machine (SVM), naive Bayes
• regression: generalized linear regression
• collaborative filtering: alternating least
squares (ALS)
• clustering: k-means (see the sketch below)
• decomposition: singular value decomposition
(SVD), principal component analysis (PCA)
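
As a sketch of the MLlib API, k-means clustering through the
RDD-based pyspark.mllib package (toy 2-D points stand in for real
data loaded from HDFS or S3):

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans

sc = SparkContext("local", "kmeans-sketch")
points = sc.parallelize([[0.0, 0.0], [1.0, 1.0], [9.0, 8.0], [8.0, 9.0]])
model = KMeans.train(points, k=2, maxIterations=10)
print(model.clusterCenters)   # one center per cluster
sc.stop()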


Reading

• G. DeCandia et al., “Dynamo: Amazon’s Highly
Available Key-value Store”, SOSP 2007

• Online: Amazon DynamoDB


Design
• Data Model
– <key, value> pairs
– Values are binary objects
– No schema

• Operators (by key)
– Insert
– Delete
– Find

• Consistency
– Replication with eventual consistency
– Store multiple versions
– Version control


Operators

• Get(key)

– Locates object replicas associated with key

– Returns one object or a list with versions

– No schema

• Put(key, object)

– Stores the object in fault-tolerant distributed
storage
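
A toy, single-process stand-in for this interface (illustrative
only; real Dynamo replicates across nodes and attaches
vector-clock contexts to versions):

class ToyDynamo:
    def __init__(self):
        self.store = {}                  # key -> list of stored versions

    def put(self, key, obj):
        # Values are opaque binary objects; no schema is enforced.
        self.store.setdefault(key, []).append(obj)

    def get(self, key):
        # May return several conflicting versions; the caller reconciles.
        return self.store.get(key, [])

db = ToyDynamo()
db.put("cart:42", b"item-list-v1")
print(db.get("cart:42"))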


Storage

• Distributed/Replicated Hash Table

– Each pair is stored by some node N

– N = hash(key)

• Get (key)

– Client does not always have access to the node N storing
the needed pair, but has access to some node P

– If a new node joins, the node count changes (N → N+1)
• The distributed hash table needs to be updated

– Fault tolerance needed
• Replicate to several nodes
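
A sketch of the basic placement rule (a fixed node list is
assumed, and md5 stands in for whatever hash function the system
actually uses):

import hashlib

NODES = ["node0", "node1", "node2", "node3"]

def node_for(key):
    # N = hash(key): map each key to one node.
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return NODES[h % len(NODES)]

print(node_for("user:17"))

With this naive modulo placement, a joining node changes
len(NODES) and remaps almost every key, which is exactly the
update problem noted above; consistent hashing confines the
disruption to neighboring nodes.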




Routing

• Get (key)

– Client does not always have access to the node N
storing the needed pair, but has access to
some node P

• Simple routing

– Each node stores its neighbors’ addresses

– Forward the Get request from neighbor to neighbor
until it reaches the destination

– Complexity: O(n)


Routing

• Get (key)

– Client does not always have access to the node N storing
the needed pair, but has access to some node P

• Finger table routing

– Each node K stores the addresses of several neighbors:
K+1, K+2, K+4, …, K+2^(m−1)

– Forward the Get request to the closest preceding neighbor
in the routing table; repeat until the destination node is
an immediate neighbor

– Complexity: O(log n)
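
A small simulation of finger-table routing on a toy ring
(assuming every id 0..2^m−1 hosts a node; greedy forwarding
picks the largest finger that does not overshoot the target):

M = 6                         # toy ring with 2**M node ids
RING = 2 ** M

def fingers(k):
    # Node k knows k+1, k+2, k+4, ..., k+2**(M-1) (mod ring size).
    return [(k + 2 ** i) % RING for i in range(M)]

def route(start, target):
    path, node = [start], start
    while node != target:
        # Largest jump that does not pass the target.
        node = next(f for f in reversed(fingers(node))
                    if (target - f) % RING < (target - node) % RING)
        path.append(node)
    return path

print(route(0, 53))   # [0, 32, 48, 52, 53]: O(log n) hops vs O(n) for simple routing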



Replication

• Need replication for fault-tolerance

• Let D be the degree of replication

• Replicate to nodes hash(key),
hash(key) + 1, …, hash(key) + D − 1
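
A one-function sketch of this successor-list rule (md5 again
stands in for the system’s hash):

import hashlib

def replicas(key, D, ring_size):
    # Store the pair on nodes hash(key), hash(key)+1, ..., hash(key)+D-1.
    start = int(hashlib.md5(key.encode()).hexdigest(), 16) % ring_size
    return [(start + i) % ring_size for i in range(D)]

print(replicas("cart:42", 3, 64))   # three consecutive ring positions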


Replication in Dynamo

• Assign each key a coordinator responsible for the
replication process

• The preference list for a key is its set of replicas

• Routing
– One-hop: each node stores the preference lists for its keys

• Replication
– Each update creates a new version of the <key, value> pair

– Vector clocks track versions


Vector Clocks

• Standard versioning

– Each data object D stores its own timestamp T

– D[T=1], D[T=10], D[T=25], …

• Vector clocks

– D stores a set of [node, timestamp] pairs

– D[(S1, T1), (S2, T2)]




Vector Clocks

• A client writes data object O1 at node N1
– O1[(N1, 1)]

• A client reads O1, writes O2 at the same node N1
– O2[(N1, 2)]

• A client reads O2, writes O3 processed by a
different node N2
– O3[(N1, 2), (N2, 1)]

• A client reads O2, writes O4 processed by a different
node N3
– O4[(N1, 2), (N3, 1)]

• A client reads O3, O4 → conflict
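
A minimal vector-clock sketch that reproduces this walk-through
(clocks are dicts from node name to counter):

def bump(clock, node):
    # Writing at `node` increments its counter in a copy of the clock.
    c = dict(clock)
    c[node] = c.get(node, 0) + 1
    return c

def descends(a, b):
    # a descends from b if a dominates b component-wise.
    return all(a.get(n, 0) >= t for n, t in b.items())

o1 = bump({}, "N1")      # O1[(N1, 1)]
o2 = bump(o1, "N1")      # O2[(N1, 2)]
o3 = bump(o2, "N2")      # O3[(N1, 2), (N2, 1)]
o4 = bump(o2, "N3")      # O4[(N1, 2), (N3, 1)]
# Neither O3 nor O4 descends from the other: a conflict to reconcile.
print(descends(o3, o4), descends(o4, o3))   # False False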


Dynamo Read/Write

• Parameters

– N – number of replicas of a data object

– R – minimum number of nodes that must participate
in a valid read operation

– W – minimum number of nodes that must
participate in a valid write operation


Dynamo Read/Write

• Write

– A write request is sent to the coordinator

– The coordinator generates a vector clock for the new
version, stores it, and forwards it to the N replicas

– If W <= N nodes confirm, the write was successful

• Read

– A read request is sent to the coordinator

– The coordinator requests the data object from the N replicas

– If/when it receives R responses, it returns the object
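
A toy coordinator illustrating these quorum rules
(single-process stand-ins for replicas; choosing R + W > N so
read and write quorums overlap is a standard Dynamo
configuration point, not from the slide):

N, R, W = 3, 2, 2

class Replica:
    def __init__(self):
        self.data = {}
        self.alive = True

    def put(self, key, version, value):
        if self.alive:
            self.data[key] = (version, value)
        return self.alive

    def get(self, key):
        return self.data.get(key) if self.alive else None

replicas = [Replica() for _ in range(N)]

def coordinator_write(key, version, value):
    # Forward the new version to all N replicas; succeed on W acks.
    acks = sum(r.put(key, version, value) for r in replicas)
    return acks >= W

def coordinator_read(key):
    # Succeed once R replicas answer; newest version wins in this toy model.
    responses = [v for v in (r.get(key) for r in replicas) if v is not None]
    return max(responses) if len(responses) >= R else None

replicas[2].alive = False               # one failed replica is tolerated
print(coordinator_write("k", 1, "v1"))  # True: 2 acks >= W = 2
print(coordinator_read("k"))            # (1, 'v1') from R live replicas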