
7CCSMBDT – Big Data Technologies Week 10
Grigorios Loukides, PhD (grigorios.loukides@kcl.ac.uk)
Spring 2017/2018

Overview
 What is Spark streaming
 Overview
 DStream Abstraction
 System Model
 Persistence / Caching
 RDD Checkpointing
Reading:
* Zaharia et al. Discretized streams: fault-tolerant streaming computation at scale.
http://dl.acm.org/citation.cfm?doid=2517349.2522737
* Overview and API
https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html
https://stanford.edu/~rezab/sparkclass/slides/td_streaming.pdf

What is it? (1/3)
• Extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams

What is it? (2/3)
• Extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams
Scales to hundreds of nodes
Achieves second-scale latencies
Efficiently recovers from failures
Integrates with batch and interactive processing

What is it? (3/3)
• Extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams
https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html

Advantages of a unified stack
 Explore data interactively (in shell) to identify problems
 Use the same code that you used in the shell in a standalone Spark job for processing large logs
 Use similar code in Spark Streaming for real-time processing
$ ./spark-shell
scala> val file = sc.hadoopFile("smallLogs")
scala> val filtered = file.filter(_.contains("ERROR"))
scala> val mapped = filtered.map(…)
…

object ProcessProductionData {
  def main(args: Array[String]) {
    val sc = new SparkContext(…)
    val file = sc.hadoopFile("productionLogs")
    val filtered = file.filter(_.contains("ERROR"))
    val mapped = filtered.map(…)
    …
  }
}

object ProcessLiveStream {
  def main(args: Array[String]) {
    val sc = new StreamingContext(…)
    val stream = sc.kafkaStream(…)
    val filtered = stream.filter(_.contains("ERROR"))
    val mapped = filtered.map(…)
    …
  }
}

Why do we need it?
1. Twitter campaigns that promote computer viruses
“Go to http://… to see the replay during football match”
Twitter needs to discover such malicious campaigns in real time
2. A datacenter receives streams of logs from many clusters (e.g., via Flume) and wants to detect problems
3. Facebook adverts https://www.facebook.com/business/products/ads is an interface to advertisers.
Advertisers want to understand who clicks on their pages in real time.

Why is handling these applications challenging?
Data size calls for a very large cluster environment, but:
1. More cluster nodes → more faults are likely
2. More cluster nodes → slow nodes are likely
Quick recovery is important because we do not want to lose useful data, e.g., part of the click logs for advertising, or the Twitter accounts that may be affected by a malicious campaign
We need a new streaming processing model

Discretized streams (D-Streams)
Run a streaming computation as a series of very small, deterministic batch jobs
How
 Cut the stream into batches of X seconds
 Treat each batch as an RDD and process it using RDD operations
 Finally, return the processed results of the RDD operations in batches
[Figure: live data stream → Spark Streaming → batches of X seconds → Spark → processed results]

Discretized streams (D-Streams)
Run a streaming computation as a series of very small, deterministic batch jobs
Benefits
Low latency → batch size X as low as 0.5 seconds
RDDs can store batch data → we can combine batch processing and streaming processing (e.g., find spam URLs from a static list in a Twitter stream)

Example – Get hashtags from Twitter
val tweets = ssc.twitterStream()
DStream: a sequence of RDDs representing a stream of data
[Figure: the Twitter Streaming API feeds the tweets DStream; each batch (@ t, t+1, t+2) is stored in memory as an RDD (immutable, distributed)]

Example – Get hashtags from Twitter
val tweets = ssc.twitterStream()
val hashTags = tweets.flatMap(status => getTags(status))
transformation: modify the data in one DStream to create another DStream; here flatMap creates a new DStream, hashTags, containing the tags
[Figure: flatMap is applied to the RDD of each batch (@ t, t+1, t+2) of the tweets DStream, producing the corresponding RDD of the hashTags DStream (e.g., [#cat, #dog, …]); new RDDs are created for every batch]

Example – Get hashtags from Twitter
val tweets = ssc.twitterStream()
val hashTags = tweets.flatMap(status => getTags(status))
hashTags.saveAsHadoopFiles("hdfs://…")
output operation: to push data to external storage (here HDFS)
[Figure: for every batch of the tweets DStream, flatMap produces the corresponding RDD of the hashTags DStream, which is then saved; every batch is saved to HDFS]

Example – Get hashtags from Twitter
val tweets = ssc.twitterStream()
val hashTags = tweets.flatMap(status => getTags(status))
hashTags.foreach(hashTagRDD => { … })
foreach (foreachRDD in newer Spark APIs): do whatever you want with the processed data
[Figure: for every batch, flatMap produces the corresponding RDD of the hashTags DStream, and foreach is applied to it]
Write to a database, update an analytics UI, do whatever you want with each batch

Python example 1
 Counts words in new text files created in the given HDFS directory
StreamingContext is the main entry point for Spark Streaming functionality
Its second argument is the batch interval (in seconds, for the Python API) at which streaming data will be divided into batches
ssc.start() starts the execution of the streams; ssc.awaitTermination() waits for the execution to stop (see the sketch below)
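The script itself appeared as a screenshot on the original slide; below is a minimal sketch along the lines of Spark's bundled hdfs_wordcount.py example. The application name and the monitored directory test_stream are assumptions, chosen to match the run instructions on the next slide.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="HDFSWordCount")
ssc = StreamingContext(sc, 1)    # 1-second batch interval (in seconds for the Python API)

# Monitor an HDFS directory for new text files (directory name is an assumption)
lines = ssc.textFileStream("test_stream")
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()                  # print the first elements of every batch

ssc.start()                      # start the execution of the streams
ssc.awaitTermination()           # wait for the execution to stop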

Python example 1
 Counts words in new text files created in the given HDFS directory
Run in one terminal (leave it running):
spark-submit hdfs_wordcount.py > stream_out.txt
Open another terminal and execute:
hadoop fs -mkdir test_stream
hadoop fs -put localfile.txt test_stream/
Go to the first terminal:
Press Ctrl+C to terminate the Spark script. Then, execute in the first terminal:
nano stream_out.txt
-------------------------------------------
Time: 2017-03-23 14:28:21
-------------------------------------------
(u'Jan', 1)
(u'this', 2)

Python example 2
 A stream can also come from a TCP socket
 Run nc -lk 9999 in a terminal and type text in it after the Python script starts
 A Spark Python script that processes the stream is sketched below
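A minimal sketch of such a script, assuming the text arrives on localhost:9999 (matching the nc -lk 9999 command above):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="NetworkWordCount")
ssc = StreamingContext(sc, 1)                       # 1-second batches

# Read lines of text from the TCP source started with `nc -lk 9999`
lines = ssc.socketTextStream("localhost", 9999)
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()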

Overview of operations
 Transformations – modify data from one DStream to another
 Standard RDD operations – map, countByValue, count, union, …
 Stateful operations – window, countByValueAndWindow, …

Window-based queries
Count the hashtags over last 1 min
val tweets = ssc.twitterStream()
val hashTags = tweets.flatMap(status => getTags(status))
val tagCounts = hashTags.window(Minutes(1), Seconds(5)).countByValue()
Returns a new DStream which is computed based on windowed batches of hashTags.
[Figure: a window of length 1 minute slides over the DStream of data every 5 seconds (the sliding interval)]

Window-based queries
Count the hashtags over last 1 min
val tweets = ssc.twitterStream()
val hashTags = tweets.flatMap(status => getTags(status))
val tagCounts = hashTags.window(Minutes(1), Seconds(5)).countByValue()
Returns tagCounts where the value of each key is its frequency
[(#cat, 5), (#dog, 6), (#mouse, 11), …]
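In the Python API the same windowed count can be written as follows. This is a sketch: the Python API has no built-in Twitter source, so a socket stream of hashtags is assumed here, and durations are given in seconds.

# hashTags is a DStream of individual hashtags, e.g. built from a socket source
hashTags = ssc.socketTextStream("localhost", 9999) \
              .flatMap(lambda line: line.split(" "))

# Window length 60 s, sliding interval 5 s; countByValue returns (tag, count) pairs
tagCounts = hashTags.window(60, 5).countByValue()
tagCounts.pprint()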

Overview of operations
 Output Operations – send data to external entity
Not supported in Spark 1.6.0 (Cloudera VM)
Sends each partition of the resulting RDD to a remote connection (e.g., a TCP location); see the sketch below
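A sketch of this pattern in the Python API, following the connection-per-partition design pattern from the Spark Streaming programming guide. The collector host/port are hypothetical, and wordCounts is assumed to be a (word, count) DStream like the one built in the earlier word-count sketches.

import socket

def send_partition(records):
    # Open one TCP connection per partition of each batch's RDD
    conn = socket.create_connection(("collector.example.com", 9000))  # hypothetical collector
    try:
        for record in records:
            conn.sendall((str(record) + "\n").encode("utf-8"))
    finally:
        conn.close()

# For every batch, push each partition of the resulting RDD to the remote connection
wordCounts.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))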

UpdateStateByKey
 Running count of each word seen in a text data stream
 You need to maintain a state and then update it, based on:
 the previous state: ("cat", 100), ("dog", 80), …
 the new values: ("cat", 1), ("dog", 1), ("rabbit", 1), …
 updated state: ("cat", 101), ("dog", 81), ("rabbit", 1)
 You need a function to update the state of a word:

def updateFunction(newValues, runningCount):
    # newValues: sequence of 1s from the (word, 1) key/value pairs of the stream
    # runningCount: previous count of the word (None if the word is new)
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)   # the 2nd argument is added to the sum of the 1st

UpdateStateByKey
 Running count of each word seen in a text data stream
def updateFunction(newValues, runningCount):
    # newValues: sequence of 1s from the (word, 1) pairs; runningCount: previous count
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)
 You need to update the state of the stream, based on the updateFunction
runningCounts = pairs.updateStateByKey(updateFunction)
pairs is a stream of (word, 1) key/value pairs; updateStateByKey updates the state of each word

UpdateStateByKey
 Generally, you call updateStateByKey with your own update function
In every batch, Spark will apply the state update function for all existing keys, regardless of whether they have new data in a batch or not.
If the update function returns None then the key-value pair will be eliminated.
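Putting the pieces together, a minimal end-to-end sketch of the running word count. The socket source and the checkpoint directory are assumptions; stateful operations require a checkpoint directory to be set.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="StatefulNetworkWordCount")
ssc = StreamingContext(sc, 1)                       # 1-second batches
ssc.checkpoint("hdfs:///tmp/streaming_checkpoint")  # required by updateStateByKey (path is an assumption)

def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)

lines = ssc.socketTextStream("localhost", 9999)
pairs = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))
runningCounts = pairs.updateStateByKey(updateFunction)
runningCounts.pprint()

ssc.start()
ssc.awaitTermination()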

Combine batch and stream processing
 Filtering out spam words from streams
 You need an RDD containing spam words
 You need a stream with all words
When called on two DStreams of (K, V) and (K, W) pairs, join
returns a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
wordCounts: (the, 1), (dog, 5), (buy, 1), (drug, 1)
spamInfoRDD: (buy, 1), (drug, 1)
after join: (buy, (1,1)), (drug, (1,1))
filter: filter(lambda x: isTooOften(x))
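A sketch of this example in the Python API, using transform to join each batch's RDD with a static RDD. The spam-word file path is an assumption, wordCounts is the (word, count) DStream from the earlier sketches, and isTooOften is the placeholder predicate named on the slide.

# Static RDD of spam words, loaded once with normal (batch) Spark
spamInfoRDD = sc.textFile("hdfs:///data/spam_words.txt").map(lambda w: (w, 1))

def isTooOften(pair):
    # Placeholder predicate over (word, (streamCount, spamFlag)) pairs
    word, (stream_count, _) = pair
    return stream_count >= 1

# Join each batch of wordCounts with the spam RDD and keep only the matching (spam) words
spamCounts = wordCounts.transform(
    lambda rdd: rdd.join(spamInfoRDD).filter(isTooOften))
spamCounts.pprint()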

Combine batch and stream processing
 Combining batch and stream processing is useful
 Online machine learning
 Predict {True, False}, based on stream elements
 Stream: {product1, product2, product1, …}
 Prediction function F(Stream_window)
 Continuously learn and update data models (updateStateByKey and transform)
 Combine live data streams with historical data
 Generate historical data models with Spark, etc.
 Use data models to process live data stream (transform)

Combine batch and stream processing
 Combining batch and stream processing is useful
 Complex Event Processing (CEP)
 CEP matches continuously incoming events against a pattern
 Stream: {room_visit, sanitize, room_visit, …}
 Pattern: {room_visit, room_visit}
 If the pattern appears in the Stream, then the doctor did not sanitize their hands: alert!
 window-based operations (reduceByWindow, etc.)

Fault tolerance
 Recall the fault tolerance semantics of RDD
1. Each RDD remembers the lineage of operations that were used on a fault-tolerant
input dataset (e.g., HDFS) to create it.
2. If an RDD partition is lost due to a worker node failure, that partition can be re-computed using the lineage of operations.
3. The data in the final transformed RDD will always be the same irrespective of failures in the Spark cluster.
 But in Spark Streaming, the received data may come from the network!

Fault tolerance
 Solution
 Buffer and replicate data among multiple Spark executors (by default, 2) on worker nodes in the cluster; the replication is done in batches
 Data replicated & worker fails
 Get data from the other working node(s)
 Data buffered but NOT replicated
 Get data from the data source, if possible

Fault tolerance
 Fault tolerance depends on the scenario and data source
 Scenario:
 Worker node fails: All in-memory data are lost. If any receivers
were running on failed nodes, then their buffered data will be lost.
 Driver node fails: The SparkContext is lost. All executors with their in-memory data are lost.
 Checkpointing to address this (later)
 Data source:
 For text file sources, the file system must be fault tolerant (e.g., HDFS)
 For receiver-based sources, the receiver must be able to acknowledge data replication

Discretized streams in more detail
DStream: a sequence of RDDs representing a stream of data.
A DStream needs to define how to generate an RDD in each batch interval. So, it keeps:
 Dependencies: List of dependent (parent) DStreams
 Slide interval: the interval at which it will compute RDDs
 Function to compute the RDD at a time t

Discretized streams in more detail
Example: Mapped DStream
Dependencies: Single parent DStream
Slide Interval: Same as the parent DStream
Compute function for time t: Create new RDD by applying map function on parent DStream’s RDD of time t
override def compute(time: Time): Option[RDD[U]] = {
  parent.getOrCompute(time).map(_.map[U](mapFunc))
}
parent.getOrCompute(time) gets the RDD of time t if it was already computed once, or generates it; the map function is then applied to generate the new RDD.

Discretized streams in more detail
From DStreams into Spark jobs: Step 1 (program → DStream graph)

Spark Streaming program 1:
t = ssc.twitterStream("…").map(…)
t.foreach(…)

Spark Streaming program 2:
t1 = ssc.twitterStream("…")
t2 = ssc.twitterStream("…")
t = t1.union(t2).map(…)
t.saveAsHadoopFiles(…)
t.map(…).foreach(…)
t.filter(…).foreach(…)

[Figure: each program is translated into a DStream graph, where T = Twitter input DStream, M = mapped DStream, F = foreach DStream (a dummy DStream signifying an output operation); program 2's graph unions (U) the two input DStreams, maps the result, and branches into one output DStream per output operation]

Discretized streams in more detail
From DStreams into Spark jobs:
Step 2 (DStream graph → RDD graph → Spark jobs)
 Every interval, the RDD graph is computed from the DStream graph
 For each output operation, a Spark action is created
 For each action, a Spark job is created to compute it
[Figure: in the RDD graph, the input DStreams are replaced by block RDDs (B) holding the data received in the last batch interval, and each output operation becomes a Spark action (A)]

System model
[Figure: your program (ssc = new StreamingContext; t = ssc.twitterStream("…"); t.filter(…).foreach(…)) runs in the Spark Streaming client, which holds the SparkContext, the DStream graph, the RDD graph, the Network Input Tracker, the Job Scheduler and the Block Manager, and talks to the Cluster Manager and the Spark workers (each with its own Block Manager)]
Network Input Tracker – Keeps track of the data received by each network receiver and maps them to the corresponding input DStreams
Job Scheduler – Periodically queries the DStream graph to generate Spark jobs from received data, and hands them to Job Manager for execution
Job Manager – Maintains a job queue and executes the jobs in Spark

Execution Model – Receiving Data
[Figure: after StreamingContext.start(), a receiver runs on a Spark worker; received data is pushed as blocks into that worker's Block Manager, the blocks are replicated to another worker's Block Manager, and the Network Input Tracker on the driver is notified of the received blocks]
Block Managers are "write-once" key-value stores; they can replicate data across nodes or store it to disk, and are coordinated by the Block Manager Master on the driver.

Execution Model – Job Scheduling
[Figure: on the driver, the Network Input Tracker keeps track of the received data (block IDs) and maps them to the input DStreams; the Job Scheduler periodically queries the DStream graph to generate jobs and hands them to the Job Manager; the Job Manager maintains a job queue and executes the jobs through Spark's schedulers; the jobs run on the worker nodes, reading their input blocks from the workers' Block Managers]

DStream Persistence
 If a DStream is set to persist at a storage level, then all RDDs generated by it are set to the same storage level
 When to persist?
 If there are multiple transformations / actions on a DStream
 If the RDDs in a DStream are going to be used multiple times
 Window-based DStreams are automatically persisted in memory
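For example, a DStream that is reused by several operations can be persisted explicitly. A sketch: hashTags is the DStream from the earlier examples, and MEMORY_ONLY_SER is the Spark 1.6 Python storage level (later PySpark versions drop the _SER variants because Python data is always stored serialized).

from pyspark import StorageLevel

# Persist hashTags because it feeds both a windowed count and a save operation;
# every RDD generated by hashTags inherits this storage level
hashTags.persist(StorageLevel.MEMORY_ONLY_SER)

tagCounts = hashTags.window(60, 5).countByValue()
tagCounts.pprint()
hashTags.saveAsTextFiles("hdfs:///tmp/hashtags")   # output path is an assumption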

DStream Persistence
 Default storage level of DStreams is StorageLevel.MEMORY_ONLY_SER (i.e. in memory as serialized bytes)
 Except for input DStreams which have StorageLevel.MEMORY_AND_DISK_SER_2
 Note the difference from RDD’s default level (no serialization)
 Serialization reduces random pauses due to GC, providing more consistent job processing times
Storage level          | useDisk | useMemory | deserialized | replication
MEMORY_ONLY            | False   | True      | True         | 1
MEMORY_ONLY_2          | False   | True      | True         | 2
MEMORY_ONLY_SER        | False   | True      | False        | 1
DISK_ONLY              | True    | False     | False        | 1
MEMORY_AND_DISK        | True    | True      | True         | 1
MEMORY_AND_DISK_SER_2  | True    | True      | False        | 2

Checkpointing
 A streaming application must operate 24/7 even in case of system failures.
 To allow recovery from failures, Spark Streaming checkpoints:
 Metadata (configuration, DStream operations, incomplete jobs), to recover from the driver's failures
 Data: Saves the generated RDDs to reliable storage.
 Let’s examine Data Checkpointing in detail
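Both kinds of checkpointing are enabled by giving the StreamingContext a checkpoint directory; driver recovery additionally uses StreamingContext.getOrCreate. A minimal sketch, where the checkpoint directory and the socket source are assumptions:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

checkpoint_dir = "hdfs:///tmp/app_checkpoint"       # must be fault-tolerant storage

def create_context():
    # Called only when no checkpoint exists: build the DStream graph from scratch
    sc = SparkContext(appName="CheckpointedApp")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint(checkpoint_dir)                  # enables metadata and data checkpointing
    lines = ssc.socketTextStream("localhost", 9999)
    lines.countByValue().pprint()
    return ssc

# After a driver failure, the context is rebuilt from the checkpoint;
# otherwise create_context() builds a fresh one
ssc = StreamingContext.getOrCreate(checkpoint_dir, create_context)
ssc.start()
ssc.awaitTermination()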

Data checkpointing
Saving RDDs to HDFS to prevent the RDD graph (lineage) from growing too large
 Done internally in Spark, transparently to the user program
 Done lazily, RDD is saved to HDFS the first time it is computed
red_rdd.checkpoint()
The contents of red_rdd are saved to an HDFS file, transparently to all child RDDs.

Data checkpointing
Why is RDD checkpointing necessary?
Stateful DStream operators can have infinite lineages.
[Figure: the state DStream at each time t-1, t, t+1, t+2, t+3 depends on the previous state and on the new batch of data, so the lineage keeps growing]
Large lineages lead to …
 Large closure of the RDD object → large task sizes → high task launch times
 High recovery times under failure

Data checkpointing
Stateful DStream operators can have infinite lineages
Periodic RDD checkpointing (to HDFS) solves this. It is useful for iterative Spark programs as well.
[Figure: the state RDDs of some batches (e.g., t and t+3) are checkpointed to HDFS, cutting the lineage]

Data checkpointing
 The periodicity of checkpointing determines a tradeoff:
 Checkpoint too frequent:
 HDFS writing will slow things down
 Checkpoint too infrequent:
 Task launch times may increase
 Default setting checkpoints at most once in 10 seconds
 Try to checkpoint once in about 10 batches
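The checkpoint interval of a stateful DStream can be set explicitly. A sketch: runningCounts is the DStream from the updateStateByKey example, and the 1-second batch interval is an assumption.

# With 1-second batches, checkpointing every 10 seconds is roughly once in 10 batches
runningCounts.checkpoint(10)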

Performance
Can process 60M records/sec (6 GB/sec) on 100 nodes at sub-second latency
Grep was network-bound and did pattern matching on strings; WordCount (WC) ran on EC2 and performed a sliding window count every 30 secs.
[Figure: cluster throughput (GB/s) vs. number of nodes in the cluster (0-100) for Grep and WordCount, with 1 sec and 2 sec batch sizes]

Comparison with other systems
Higher throughput than Storm
 Spark Streaming: 670k records/sec/node
 Storm: 115k records/sec/node
 Commercial systems: 100-500k records/sec/node
[Figure: throughput per node (MB/s) vs. record size (100-1000 bytes) for Spark Streaming and Storm, on Grep and WordCount]

Apache Storm
 Core concepts:
 tuple: a named list of values;
 stream: a (possibly) unbounded sequence of tuples processed by the application.
 An application is defined by means of a topology:
Spouts: Stream sources. They read tuples from external sources
(e.g. Twitter API) or from disk and emit them in the topology;
Bolts: Process input streams and produce output streams. They encapsulate the application logic.
http://storm.apache.org/releases/current/Tutorial.html http://storm.apache.org/about/simple-api.html

Comparison with other systems
Higher throughput than Storm
 Spark Streaming: 670k records/sec/node
 Storm: 115k records/sec/node
[Figure: same throughput-per-node comparison of Spark Streaming and Storm as above]

Fast Fault Recovery
Recovers from faults/stragglers within 1 sec due to
 Checkpointing
 Speculative backup copies of slow tasks (>= 1.4x slower than the median task in its job stage)

Mobile Millennium Project
Traffic transit time estimation using online machine learning on GPS observations
Very CPU intensive, requires dozens of machines for useful computation
Scales linearly with cluster size
[Figure: GPS observations processed per second (0-2000) vs. number of nodes in the cluster (0-80)]