7CCSMBDT – Big Data Technologies Week 10
Grigorios Loukides, PhD (grigorios.loukides@kcl.ac.uk)
Spring 2017/2018
Overview
What is Spark Streaming: overview
DStream abstraction
System model
Persistence / caching
RDD checkpointing
Reading:
* Zaharia et al. Discretized streams: fault-tolerant streaming computation at scale.
http://dl.acm.org/citation.cfm?doid=2517349.2522737
* Overview and API
https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html
https://stanford.edu/~rezab/sparkclass/slides/td_streaming.pdf
What is it? (1/3)
[Figure: the Spark stack – Spark Streaming sits alongside components such as Spark SQL on top of the Spark core]
• Extension of the core Spark API that enables
• scalable, high-throughput, fault-tolerant stream processing of live data streams
What is it? (2/3)
• Extension of the core Spark API that enables
• scalable, high-throughput, fault-tolerant stream processing of live data streams
Scales to hundreds of nodes
Achieves second-scale latencies
Efficiently recovers from failures
Integrates with batch and interactive processing
What is it? (3/3)
• Extension of the core Spark API that enables
• scalable, high-throughput, fault-tolerant stream processing of live data streams
https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html
Advantages of a unified stack
Explore data interactively (in the shell) to identify problems
Use the same code that you used in the shell in a standalone Spark job for processing large logs
Use similar code in Spark Streaming for real-time processing
$ ./spark-shell
scala> val file = sc.hadoopFile("smallLogs")
…
scala> val filtered = file.filter(_.contains("ERROR"))
…
scala> val mapped = filtered.map(…)
…
object ProcessProductionData {
  def main(args: Array[String]) {
    val sc = new SparkContext(…)
    val file = sc.hadoopFile("productionLogs")
    val filtered = file.filter(_.contains("ERROR"))
    val mapped = filtered.map(…)
    …
  }
}
object ProcessLiveStream {
  def main(args: Array[String]) {
    val sc = new StreamingContext(…)
    val stream = sc.kafkaStream(…)
    val filtered = stream.filter(_.contains("ERROR"))
    val mapped = filtered.map(…)
    …
  }
}
Why do we need it?
1. Twitter campaigns that promote computer viruses
"Go to http://… to see the replay during the football match"
Twitter needs to discover such malicious campaigns in real time
2. A datacenter receives streams of logs from many clusters (e.g., via Flume) and wants to detect problems
3. Facebook adverts https://www.facebook.com/business/products/ads is an interface to advertisers.
Advertisers want to understand who clicks on their pages in real time.
Why is handling these applications challenging?
Data size calls for a very large cluster environment, but:
1. More cluster nodes → more faults are likely
2. More cluster nodes → slow nodes are likely
Quick recovery is important because we do not want to lose useful data, e.g., part of the click logs for advertising, or Twitter accounts that may be affected by a malicious campaign
We need a new streaming processing model
Discretized streams (D-Streams)
Run a streaming computation as a series of very small, deterministic batch jobs
How:
1. Cut the live data stream into batches of X seconds
2. Treat each batch as an RDD and process it using RDD operations
3. Finally, return the processed results of the RDD operations in batches
[Figure: live data stream → Spark Streaming → batches of X seconds → Spark → processed results]
Discretized streams (D-Streams)
Run a streaming computation as a series of very small, deterministic batch jobs
Benefits
Low latency → the batch size X can be as low as 0.5 seconds
RDDs can store batch data → we can combine batch processing and stream processing
(e.g., find spam URLs from a static list in a Twitter stream)
Example – Get hashtags from Twitter
val tweets = ssc.twitterStream()
DStream: a sequence of RDDs representing a stream of data
[Figure: the Twitter Streaming API feeds the tweets DStream; the data of each batch interval (batch @ t, t+1, t+2, …) is stored in memory as an RDD (immutable, distributed)]
Example – Get hashtags from Twitter
val tweets = ssc.twitterStream()
val hashTags = tweets.flatMap(status => getTags(status))
transformation: modify the data in one DStream to create a new DStream, hashTags, containing the hashtags
[Figure: flatMap is applied to every batch of the tweets DStream (batch @ t, t+1, t+2, …), creating a new RDD of the hashTags DStream for every batch, e.g., [#cat, #dog, …]]
Example – Get hashtags from Twitter
val tweets = ssc.twitterStream()
val hashTags = tweets.flatMap(status => getTags(status))
hashTags.saveAsHadoopFiles("hdfs://…")
output operation: to push data to external storage (here HDFS)
[Figure: for every batch of the tweets DStream (batch @ t, t+1, t+2, …), flatMap produces the hashTags RDD, which is then saved to HDFS]
Example – Get hashtags from Twitter
val tweets = ssc.twitterStream()
val hashTags = tweets.flatMap(status => getTags(status))
hashTags.foreach(hashTagRDD => { … })
foreach: do whatever you want with the processed data
[Figure: for every batch of the tweets DStream (batch @ t, t+1, t+2, …), flatMap produces the hashTags RDD, and foreach is applied to it]
Write to a database, update an analytics UI, do whatever you want
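A minimal PySpark sketch of the same pattern, assuming a socketTextStream source in place of the Scala-only Twitter source; in the Python API the output operation is foreachRDD, and save_partition is a hypothetical stand-in for your database or analytics-UI code:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="ForeachSketch")
ssc = StreamingContext(sc, 1)   # 1-second batches

# Stand-in for the Twitter stream: lines of text from a TCP socket
lines = ssc.socketTextStream("localhost", 9999)
hashTags = lines.flatMap(lambda line: [w for w in line.split() if w.startswith("#")])

def save_partition(partition):
    # Hypothetical sink: replace with a database write, an analytics-UI update, etc.
    for tag in partition:
        print(tag)

# Output operation: push each batch's RDD to external code, partition by partition
hashTags.foreachRDD(lambda rdd: rdd.foreachPartition(save_partition))

ssc.start()
ssc.awaitTermination()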
Python example 1
Counts words in new text files created in the given HDFS directory
The script (shown on the slide) creates a StreamingContext, the main entry point for Spark Streaming functionality, passing the time interval at which streaming data will be divided into batches (given in seconds in the Python API). It then starts the execution of the streams and waits for the execution to stop.
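The script itself appears as a screenshot on the slide; the following is a sketch of roughly what it looks like, in the spirit of Spark's hdfs_wordcount.py example, assuming the monitored HDFS directory is test_stream (as used on the next slide) and a 1-second batch interval:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="HDFSWordCount")
ssc = StreamingContext(sc, 1)               # batch interval of 1 second

# DStream of lines from new text files appearing in the monitored HDFS directory
lines = ssc.textFileStream("test_stream")
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()                             # print the first counts of every batch

ssc.start()                                 # start the execution of the streams
ssc.awaitTermination()                      # wait for the execution to stop

How the output reaches stream_out.txt on the next slide is not shown here; redirecting the driver's console output is one possibility.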
Python example 1
Counts words in new text files created in the given HDFS directory
Run in one terminal (leave it running):
spark-submit hdfs_wordcount.py
Open another terminal and execute:
hadoop fs -mkdir test_stream
hadoop fs -put localfile.txt test_stream/
Go to the first terminal:
Press Ctrl+C to terminate the Spark script. Then, execute in the first terminal:
nano stream_out.txt
-------------------------------------------
Time: 2017-03-23 14:28:21
-------------------------------------------
(u'Jan', 1)
(u'this', 2)
Python example 2
Stream can come from TCP
Run nc -lk 9999 in a terminal and type text into it after the Python script starts
A Spark Python script processes the stream; a sketch follows.
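A minimal sketch in the spirit of Spark's network_wordcount.py example, assuming the nc socket above and a 1-second batch interval:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="NetworkWordCount")
ssc = StreamingContext(sc, 1)                   # 1-second batches

# Connect to the socket opened by `nc -lk 9999`
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()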
Overview of operations
Transformations – modify data from one DStream to another
Standard RDD operations – map, countByValue, count, union, …
Stateful operations – window, countByValueAndWindow, …
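A short sketch of some standard (per-batch) transformations, assuming a lines DStream built from a socket source; every operation below is applied independently to each batch:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="TransformationsSketch")
ssc = StreamingContext(sc, 2)                        # 2-second batches

lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))  # standard RDD operation, applied per batch
shouted = words.map(lambda w: w.upper())             # map
merged = words.union(shouted)                        # union of two DStreams
counts = words.countByValue()                        # (word, count) pairs within each batch
counts.pprint()
merged.count().pprint()                              # count: number of elements per batch

ssc.start()
ssc.awaitTermination()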
Window-based queries
Count the hashtags over the last 1 min
val tweets = ssc.twitterStream()
val hashTags = tweets.flatMap(status => getTags(status))
val tagCounts = hashTags.window(Minutes(1), Seconds(5)).countByValue()
Returns a new DStream which is computed based on windowed batches of hashTags.
[Figure: a window of length 1 minute slides over the DStream of data every 5 seconds (the sliding interval)]
Window-based queries
Count the hashtags over the last 1 min
val tweets = ssc.twitterStream()
val hashTags = tweets.flatMap(status => getTags(status))
val tagCounts = hashTags.window(Minutes(1), Seconds(5)).countByValue()
Returns tagCounts where the value of each key is its frequency
e.g., [(#cat, 5), (#dog, 6), (#mouse, 11), …]
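A PySpark version of the same windowed query; window and slide durations are given in seconds in the Python API, and the socket source plus hashtag extraction are assumptions standing in for the Scala Twitter stream:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="WindowSketch")
ssc = StreamingContext(sc, 5)                 # 5-second batches

lines = ssc.socketTextStream("localhost", 9999)
hashTags = lines.flatMap(lambda line: [w for w in line.split() if w.startswith("#")])

# Count the hashtags over a 60-second window, recomputed every 5 seconds
tagCounts = hashTags.window(60, 5).countByValue()
tagCounts.pprint()

ssc.start()
ssc.awaitTermination()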
Overview of operations
Output operations – send data to an external entity, e.g., save to HDFS, or send each partition of the resultant RDD to a remote connection (e.g., a TCP location); note that some of these operations are not supported in Spark 1.6.0 (Cloudera VM)
UpdateStateByKey
Running count of each word seen in a text data stream
You need to maintain a state and then update it, based on the new values:
previous state: ("cat", 100), ("dog", 80), …
new values: ("cat", 1), ("dog", 1), ("rabbit", 1), …
updated state: ("cat", 101), ("dog", 81), ("rabbit", 1), …
You need a function to update the state of a word
def updateFunction(newValues, runningCount):
    # newValues: sequence of 1s from the (word, 1) key/value pairs of the stream
    # runningCount: previous count of the word (None if the word has not been seen yet)
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)   # the 2nd argument is added to the sum of the 1st
UpdateStateByKey
Running count of each word seen in a text data stream
def updateFunction(newValues, runningCount):
    # newValues: sequence of 1s from the (word, 1) key/value pairs of the stream
    # runningCount: previous count of the word
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)
You need to update the state of the stream, based on the updateFunction
runningCounts = pairs.updateStateByKey(updateFunction)
Here, pairs is a stream of (word, 1) key/value pairs, and updateStateByKey updates the state of each word
UpdateStateByKey
Generally, you pass your own update function to updateStateByKey:
In every batch, Spark will apply the state update function for all existing keys, regardless of whether they have new data in a batch or not.
If the update function returns None then the key-value pair will be eliminated.
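Putting the pieces together, a runnable sketch of the stateful word count, assuming a socket source; note that updateStateByKey requires a checkpoint directory (checkpointing is discussed later):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="StatefulWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")                 # updateStateByKey requires a checkpoint directory

def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)      # add the new 1s to the previous count

lines = ssc.socketTextStream("localhost", 9999)
pairs = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))
runningCounts = pairs.updateStateByKey(updateFunction)
runningCounts.pprint()

ssc.start()
ssc.awaitTermination()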
Combine batch and stream processing
Filtering out spam words from streams
You need an RDD containing spam words
You need a stream with all words
When called on two DStreams of (K, V) and (K, W) pairs, join
returns a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
wordCounts: (the, 1), (dog, 5), (buy, 1), (drug, 1)
spamInfoRDD: (buy, 1), (drug, 1)
after join: (buy, (1, 1)), (drug, (1, 1))
then filter: filter(lambda x: isTooOften(x))
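A sketch of this spam-filtering pattern in PySpark. Because spamInfoRDD is a static RDD rather than a DStream, the join is applied via transform(), as in the Spark Streaming guide; the spam words and the isTooOften predicate are hypothetical:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="SpamFilterSketch")
ssc = StreamingContext(sc, 2)

# Static (batch) data: hypothetical spam words with a score
spamInfoRDD = sc.parallelize([("buy", 1), ("drug", 1)])

lines = ssc.socketTextStream("localhost", 9999)
wordCounts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)

def isTooOften(pair):
    # Hypothetical predicate on a joined (word, (count, spamScore)) pair
    word, (count, spamScore) = pair
    return count * spamScore > 0

# transform() applies arbitrary RDD-to-RDD code to each batch, here a join with the static RDD
joined = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD))
spammy = joined.filter(lambda x: isTooOften(x))
spammy.pprint()

ssc.start()
ssc.awaitTermination()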
Combine batch and stream processing
Combining batch and stream processing is useful
Online machine learning
Predict {True, False}, based on stream elements
Stream: {product1, product2, product1, …}
Prediction function F(Stream_window)
Continuously learn and update data models (updateStateByKey and transform)
Combine live data streams with historical data
Generate historical data models with Spark, etc.
Use the data models to process the live data stream (transform)
Combine batch and stream processing
Combining batch and stream processing is useful
Complex Event Processing (CEP)
CEP matches continuously incoming events against a pattern
Stream: {room_visit, sanitize, room_visit, …}
Pattern: {room_visit, room_visit}
If the pattern appears in the Stream, then the doctor did not sanitize their hands – alert!
Implemented with window-based operations (reduceByWindow, etc.)
Fault tolerance
Recall the fault tolerance semantics of RDD
1. Each RDD remembers the lineage of operations that were used on a fault-tolerant
input dataset (e.g., HDFS) to create it.
2. If an RDD partition is lost due to a worker node failure, that partition can be re-computed using the lineage of operations.
3. The data in the final transformed RDD will always be the same irrespective of failures in the Spark cluster.
But in Spark Streaming, the received data may come from the network!
Fault tolerance
Solution
Buffer and replicate data among multiple Spark executors (by default, 2) in worker nodes in the cluster; the replication is done in batches
Data replicated and a worker fails → get the data from the other working node(s)
Data buffered but NOT replicated → get the data from the data source, if possible
Fault tolerance
Fault tolerance depends on the scenario and data source
Scenario:
Worker node fails: All in-memory data are lost. If any receivers
were running on failed nodes, then their buffered data will be lost.
Driver node fails: The SparkContext is lost. All executors with their in-memory data are lost.
Checkpointing addresses this (discussed later)
Data source:
For text file sources, the file system must be fault tolerant (e.g., HDFS)
For receiver-based sources, the receiver must be able to acknowledge data replication
Discretized streams in more detail
DStream: a sequence of RDDs representing a stream of data
A DStream needs to define how to generate an RDD in each batch interval. So, it keeps:
Dependencies: a list of dependent (parent) DStreams
Slide interval: the interval at which it will compute RDDs
A function to compute the RDD at a time t
Discretized streams in more detail
Example: Mapped DStream
Dependencies: Single parent DStream
Slide Interval: Same as the parent DStream
Compute function for time t: Create new RDD by applying map function on parent DStream’s RDD of time t
override def compute(time: Time): Option[RDD[U]] = {
  // getOrCompute: gets the RDD of time t if already computed once, or generates it
  // the map function is then applied to generate the new RDD
  parent.getOrCompute(time).map(_.map[U](mapFunc))
}
Discretized streams in more detail
From DStreams into Spark jobs: Step 1 (program → DStream graph)
Spark Streaming program and the corresponding DStream graph:

t = ssc.twitterStream("…")
     .map(…)
t.foreach(…)

t1 = ssc.twitterStream("…")
t2 = ssc.twitterStream("…")
t = t1.union(t2).map(…)
t.saveAsHadoopFiles(…)
t.map(…).foreach(…)
t.filter(…).foreach(…)

[Figure: the DStream graphs of the two programs, built from Twitter input DStreams, a union, mapped and filtered DStreams, and foreach DStreams – dummy DStreams signifying an output operation]
Discretized streams in more detail
From DStreams into Spark jobs: Step 2 (DStream graph → RDD graph → Spark jobs)
Every batch interval, an RDD graph is computed from the DStream graph
For each output operation, a Spark action is created
For each action, a Spark job is created to compute it
[Figure: the DStream graph, the block RDDs holding the data received in the last batch interval, the resulting RDD graph, and the Spark actions created from it]
System model
[Figure: system model. The Spark Streaming client (your program, e.g., ssc = new StreamingContext; t = ssc.twitterStream("…"); t.filter(…).foreach(…)) sits on top of the Spark client and the cluster manager. The client/driver side holds the SparkContext, the DStream graph, the RDD graph, the Network Input Tracker, the Job Scheduler and the Job Manager; each Spark worker runs a Block Manager.]
Network Input Tracker – keeps track of the data received by each network receiver and maps them to the corresponding input DStreams
Job Scheduler – periodically queries the DStream graph to generate Spark jobs from received data, and hands them to the Job Manager for execution
Job Manager – maintains a job queue and executes the jobs in Spark
Execution Model – Receiving Data
[Figure: after StreamingContext.start(), a receiver runs on a Spark worker; the data received by the receiver are pushed as blocks into that worker's Block Manager, and the blocks are replicated to another worker's Block Manager; the driver side runs the Network Input Tracker and the Block Manager Master. Block Managers are "write-once" key-value stores that can replicate data across nodes or store them to disk.]
Execution Model – Job Scheduling
[Figure: the Network Input Tracker keeps track of the received data (block IDs) and maps them to DStreams; the Job Scheduler queries the DStream graph to generate jobs and hands them to the Job Manager for execution; the Job Manager maintains a job queue and executes the jobs in Spark via Spark's schedulers; the jobs run on the worker nodes, reading the block RDDs from the workers' Block Managers.]
DStream Persistence
If a DStream is set to persist at a storage level, then all RDDs generated by it are set to the same storage level
When to persist?
If there are multiple transformations / actions on a DStream
If the RDDs in a DStream are going to be used multiple times
Window-based DStreams are automatically persisted in memory
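A small sketch of explicitly persisting a DStream that is reused by two operations, assuming a socket source; cache() would use the default storage level instead of the one chosen here:

from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="PersistSketch")
ssc = StreamingContext(sc, 1)

lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
words.persist(StorageLevel.MEMORY_AND_DISK)   # persisted because it is reused below

# Two independent operations reuse the same DStream
words.countByValue().pprint()
words.filter(lambda w: w.startswith("#")).pprint()

ssc.start()
ssc.awaitTermination()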
DStream Persistence
Default storage level of DStreams is StorageLevel.MEMORY_ONLY_SER (i.e. in memory as serialized bytes)
Except for input DStreams which have StorageLevel.MEMORY_AND_DISK_SER_2
Note the difference from RDD’s default level (no serialization)
Serialization reduces random pauses due to GC, providing more consistent job processing times
Storage level            useDisk   useMemory   deserialized   replication
MEMORY_ONLY              False     True        True           1
MEMORY_ONLY_2            False     True        True           2
MEMORY_ONLY_SER          False     True        False          1
DISK_ONLY                True      False       False          1
MEMORY_AND_DISK          True      True        True           1
MEMORY_AND_DISK_SER_2    True      True        False          2
Checkpointing
A streaming application must operate 24/7 even in case of system failures.
To allow recovery from failures, Spark Streaming checkpoints:
Metadata (configuration, DStream operations, incomplete jobs), to recover from driver failures
Data: Saves the generated RDDs to reliable storage.
Let’s examine Data Checkpointing in detail
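A sketch of how a recoverable driver program is typically structured, with a hypothetical HDFS checkpoint path; StreamingContext.getOrCreate() rebuilds the context from the checkpoint after a driver restart:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

CHECKPOINT_DIR = "hdfs:///user/cloudera/checkpoint"   # hypothetical path

def create_context():
    # Build the whole streaming program here; only run when no checkpoint exists
    sc = SparkContext(appName="CheckpointSketch")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint(CHECKPOINT_DIR)                    # enable metadata and data checkpointing
    lines = ssc.socketTextStream("localhost", 9999)
    lines.countByValue().pprint()
    return ssc

# On restart after a driver failure, the context is recovered from the checkpoint
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
ssc.start()
ssc.awaitTermination()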
Data checkpointing
Saving RDD to HDFS to prevent RDD graph from growing too large
Done internally in Spark, transparently to the user program
Done lazily, RDD is saved to HDFS the first time it is computed
red_rdd.checkpoint()
The contents of red_rdd are saved to an HDFS file, transparently to all child RDDs
Data checkpointing
Why is RDD checkpointing necessary?
Stateful DStream operators can have infinite lineages
Large lineages lead to:
Large closure of the RDD object → large task sizes → high task launch times
High recovery times under failure
Data checkpointing
Stateful DStream operators can have infinite lineages
Periodic RDD checkpointing (to HDFS) solves this
Useful for iterative Spark programs as well
Data checkpointing
Periodicity of checkpoint determines a tradeoff
Checkpoint too frequent:
HDFS writing will slow things down
Checkpoint too infrequent:
Task launch times may increase
Default setting checkpoints at most once in 10 seconds
Try to checkpoint once in about 10 batches
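A sketch of setting the checkpoint interval of a stateful DStream to roughly 10 batches, assuming a 1-second batch interval (so about every 10 seconds):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="CheckpointIntervalSketch")
ssc = StreamingContext(sc, 1)                 # 1-second batches
ssc.checkpoint("checkpoint")                  # directory for the checkpoint data

def update(newValues, runningCount):
    return sum(newValues, runningCount or 0)

pairs = ssc.socketTextStream("localhost", 9999) \
           .flatMap(lambda line: line.split(" ")) \
           .map(lambda word: (word, 1))
counts = pairs.updateStateByKey(update)
counts.checkpoint(10)                         # checkpoint this DStream about once every 10 batches
counts.pprint()

ssc.start()
ssc.awaitTermination()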
Performance
Can process 60M records/sec (6 GB/sec) on 100 nodes at sub-second latency
Grep was network-bound and did pattern matching on strings; WordCount (WC) ran on EC2 and performed a sliding window count every 30 secs
[Figure: cluster throughput (GB/s) vs. number of nodes in the cluster, for Grep and WordCount, with 1-second and 2-second batch intervals]
Comparison with other systems
Higher throughput than Storm
Spark Streaming: 670k records/sec/node
Storm: 115k records/sec/node
Commercial systems: 100-500k records/sec/node
[Figure: throughput per node (MB/s) vs. record size (100 and 1000 bytes), for Grep and WordCount, Spark Streaming vs. Storm]
Apache Storm
Core concepts:
tuple: a named list of values;
stream: a (possibly) unbounded sequence of tuples processed by the application.
An application is defined by means of a topology:
Spouts: Stream sources. They read tuples from external sources
(e.g. Twitter API) or from disk and emit them in the topology;
Bolts: Process input streams and produce output streams. They encapsulate the application logic.
http://storm.apache.org/releases/current/Tutorial.html
http://storm.apache.org/about/simple-api.html
Fast Fault Recovery
Recovers from faults/stragglers within 1 sec due to
Checkpointing
Speculative backup copies of slow tasks (tasks that are >=1.4x slower than the median task in their job stage)
Mobile Millennium Project
Traffic transit time estimation using online machine learning on GPS observations
Very CPU intensive, requires dozens of machines for useful computation
Scales linearly with cluster size
[Figure: GPS observations processed per second vs. number of nodes in the cluster, showing near-linear scaling]