7CCSMBDT – Big Data Technologies Week 8
Grigorios Loukides, PhD (grigorios.loukides@kcl.ac.uk)
Spring 2017/2018
Objectives
Data streams
What are they?
What are their properties?
How do we manage them?
Common queries (problems) on streams:
Sampling
Window queries
Reading
Chapter 4 of Leskovec’s book (Mining of Massive Datasets)
Tutorial: mining massive data streams http://michael.hahsler.net/SMU/EMIS8331/slides/datastream/datastream.pdf
Motivation
So far, all our data was available when we wanted it
BUT
we may not know the entire data set in advance
data may have high velocity, so that we cannot store all the data and then access them
In domains such as
web click-stream data
computer network monitoring data
telecommunication connection data
readings from sensor networks
stock quotes
Data stream definition
Data stream
An ordered and potentially infinite sequence of
elements
An element (a.k.a. data point) can be:
an event, e.g., “page A visited” in web click-stream data
a stock price, e.g., “10.3” in stock quote data
a customer record, e.g., “[item, qty, price]” in marketing data
a graph in social network data streams
…
Data stream example 1
Network of Bluetooth sensor tags; each tag contains various sensors and outputs a stream of JSON data
Sample elements arrive every ~2 seconds, timestamped “2016-01-26T20:47:53”, “2016-01-26T20:47:55”, “2016-01-26T20:47:57”.
http://processors.wiki.ti.com/index.php/SensorTag_User_Guide#SensorTag_Hardware_Overview
Example query: monitor Avg(temp) every 30 secs and display SensorName, Avg(temp) if Avg(temp) > 100 degrees
Data stream example 2
HTTP server log
Report the number of distinct accessed pages every hour
Report the most frequently accessed pages every hour
Report average view time of a page every hour
http://michael.hahsler.net/SMU/EMIS8331/slides/datastream/datastream.pdf
Properties of data streams
1. Potentially infinite
• Transient (stream might not be realized on disk)
• Single pass over the data
• Only summaries can be stored
• Real-time processing (in main memory)
2. Data streams are not static
• Incremental updates
• Concept drift
• Forgetting old data
3. Temporal order may be important
http://michael.hahsler.net/SMU/EMIS8331/slides/datastream/datastream.pdf
Data Stream Management System (DSMS)
Streams entering the DSMS:
. . . 1, 5, 2, 7, 0, 9, 3
. . . a, r, v, t, y, h, b
. . . 0, 0, 1, 0, 1, 1, 0
(time →)
The streams are handled by the Processor; they often have different velocity (arrival rate) and data types.
Data stream management system
Archival Storage
Archives the streams
Not suitable for querying, due to the very low efficiency of data retrieval
Data stream management system
Limited Working Storage
Storage space on disk or in main memory
Used for answering queries
Cannot store all the data
Data stream management system
Standing Queries
Permanently executing; they produce outputs at appropriate times
Example of standing query:
“output an alert, when the variable TEMPERATURE exceeds 25”
Data stream management system
Ad-Hoc Queries
Asked once about the current state of a stream or streams
Example of ad-hoc query:
“list the total number of transactions in the last hour”
For both standing and ad-hoc queries, the Processor produces the output (query answers).
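To make the two query types concrete, here is a toy Python sketch; the temperature stream, the threshold of 25, and the transaction list are illustrative assumptions, not part of any DSMS API:

def temperature_stream():
    """Stand-in for a sensor stream; in a real DSMS this arrives continuously."""
    for reading in [21.5, 23.0, 25.7, 24.9, 26.1]:
        yield reading

# Standing query: permanently executing; fires whenever its condition holds.
for temp in temperature_stream():
    if temp > 25:
        print(f"ALERT: temperature {temp} exceeds 25")

# Ad-hoc query: asked once, answered from the limited working storage,
# which here holds the transactions of the last hour.
last_hour_transactions = [("tx1", 9.99), ("tx2", 4.50)]
print("transactions in the last hour:", len(last_hour_transactions))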
Common queries
Sampling data from a stream
Construct a random sample (i.e., a random subset
of the stream)
Queries over sliding windows
Number of items of type x in the last k elements of the stream
Common queries
Filtering a data stream
Select elements with property x from the stream
Counting distinct elements
Number of distinct elements in the last k elements
of the stream
Estimating frequency moments
Estimate avg./std.dev. of last k elements
Finding frequent elements
Common applications of these queries
Mining query streams
Google wants to know what queries are
more frequent today than yesterday
Mining click streams
Yahoo wants to know which of its pages are
getting an unusual number of hits in the past hour
Mining social network news feeds
E.g., look for trending topics on Twitter, Facebook
Sampling from data stream
Construct a sample from a data stream such that we can ask queries on the sample and get answers that are statistically representative of the stream as a whole
Example application
Stream from search query engine
(u1, q1, t1), (u1, q1, t2), (u2, q2, t3), … — tuples of the form (user, query, time)
What fraction of the typical user’s queries were repeated over the last month?
Cannot store more than 1/10th of the stream
Sampling from data stream
Naïve solution for the example application
Store 1/10 of each user’s queries:
Generate a random integer in [0..9] for each user’s query
Store the tuple (u,q,t) if the integer is 0, otherwise discard
For users with many queries, 1/10th of their queries will be in the sample
Is the naïve solution good for answering:
What is the fraction of duplicate queries in the stream by an average user?
Suppose each user issues x queries once and d queries twice. Queries issued twice will be “duplicate queries”.
Correct answer: d/(x+d)
Example: Stream = [q1, q1, q2, q2, q3] has x=1 and d=2, so the fraction of duplicates is 2/3.
Sampling from data stream
Same query (fraction of duplicate queries), based on the sample?
Out of x singleton queries, x/10 will appear in the sample.
This holds because we sample 1/10 of the elements, and each singleton is one element.
A duplicate query appears twice in the sample with probability 1/100:
Pr(1st occurrence in sample) * Pr(2nd occurrence in sample) = 1/10 * 1/10 = 1/100
This is because, for a duplicate query q1 q1 to appear twice in the sample, we must include both occurrences of q1, and each occurrence is sampled independently with probability 1/10.
Out of d duplicate queries, d * 1/100 will appear twice in the sample.
Sampling from data stream
Fraction of duplicate queries, based on the sample?
A duplicate query appears exactly once in the sample with probability 18/100:
Pr(1st occ. in sample) * Pr(2nd occ. not in sample) + Pr(2nd occ. in sample) * Pr(1st occ. not in sample) = 1/10 * (1 − 1/10) + 1/10 * (1 − 1/10) = 1/10 * 9/10 + 1/10 * 9/10 = 18/100
This is because, for a duplicate query q1 q1, the sample can miss one occurrence either by (I) not sampling the second q1, or (II) not sampling the first q1.
Out of d duplicate queries, d * 18/100 will appear once in the sample.
Fraction of duplicate queries based on the sample:
(d * 1/100) / (d * 1/100 + d * 18/100 + x/10) = d / (10x + 19d)
Fraction of duplicate queries based on the entire data stream: d / (x + d)
Sampling from data stream
How bad is the sample-based answer?
The sample-based answer is
(d * 1/100) / (d * 1/100 + d * 18/100 + x/10) = d / (10x + 19d)
The answer based on the entire stream is d / (x + d).
• For x=1000, d=300:
• Sample-based answer: 0.019 = 1.9%
• Answer based on the entire stream: 0.231 = 23.1%, i.e., 12.1 times larger
• For x=1000, d=50:
• Sample-based answer: 0.0046 = 0.46%
• Answer based on the entire stream: 0.0476 = 4.76%, i.e., 10.4 times larger
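A quick Monte Carlo check of these numbers (a hypothetical single-user stream, sampled at 1/10 as above); the sampled duplicate fraction tracks d/(10x+19d), not d/(x+d):

import random

x, d = 1000, 300                        # x singleton queries, d issued twice
stream = [f"s{i}" for i in range(x)] + [f"d{i}" for i in range(d)] * 2

random.seed(0)
trials, frac_sum = 200, 0.0
for _ in range(trials):
    sample = [q for q in stream if random.randrange(10) == 0]  # keep ~1/10
    counts = {}
    for q in sample:
        counts[q] = counts.get(q, 0) + 1
    duplicates = sum(1 for c in counts.values() if c == 2)
    frac_sum += duplicates / len(counts)

print(frac_sum / trials)  # ~0.019 (= d/(10x+19d)), far from d/(x+d) = 0.231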
WHAT WENT SO WRONG?
Sampling from data stream
Each user issues x queries once and d queries twice
What is the fraction of duplicate queries in the stream by an average user?
Main idea: the query is about the average user, but we sampled queries instead of users.
Solution
Select 1/10th of the users (assuming this is <=10% of the stream)
For each such user, store all their queries in the sample
To get 1/10th of the users:
use a hash function h: user → {1, 2, ..., 10}
If h(user)=1, we accept their query; otherwise we discard it
Then, compute the fraction of duplicates in the sample: d/(x+d) for each user
Sampling from data stream
General problem
Stream of tuples with keys:
Key is some subset of each tuple’s components
e.g., the tuple is (user, search, time); the key is user
Choice of key depends on application
To get a sample of a/b fraction of the stream:
Hash each tuple’s key uniformly into b buckets
Pick the tuple if its hash value is in {1,..., a}
* How to generate a 30% sample?
* What if more and more new users come, and we have a budget on the number of tuples that we can store? (See the sketch and note below.)
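A minimal sketch of key-based sampling, assuming hashlib (covered in the Python libraries section at the end) as the hash function; a=3 and b=10 give a 30% sample:

import hashlib

def in_sample(key, a, b):
    """Keep a tuple iff its key hashes into one of the first a of b buckets."""
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % b < a

stream = [("u1", "q1", 1), ("u2", "q2", 2), ("u1", "q3", 3)]
sample = [t for t in stream if in_sample(t[0], a=3, b=10)]  # ~30% of users

Because the hash depends only on the key, all of a user’s tuples are kept or discarded together. If new users keep arriving under a storage budget, the threshold a can be lowered over time, discarding the tuples of users whose buckets no longer qualify.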
Sampling from data stream
General problem
To get a (uniform) sample of s tuples from a stream
(e1,e2,...) that contains >s tuples
Reservoir sampling
Add the first s tuples from the stream into a reservoir R
For j>s
With probability s/j, replace a random entry of R with the j-th tuple of the stream (i.e., the newcomer replaces a random old one)
At j=n, return R
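A minimal Python sketch of reservoir sampling, assuming the stream is any iterable:

import random

def reservoir_sample(stream, s):
    """Return a uniform random sample of s tuples from a stream of > s tuples."""
    R = []
    for j, tup in enumerate(stream, start=1):
        if j <= s:
            R.append(tup)                  # fill the reservoir with the first s tuples
        elif random.random() < s / j:      # keep the newcomer with probability s/j
            R[random.randrange(s)] = tup   # it replaces a random old entry
    return R

print(reservoir_sample(range(1000), 5))    # e.g., [77, 915, 524, 3, 660]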
Does this work?
We need to prove that after seeing n tuples, R contains each tuple
seen so far with probability s/n
Sampling from data stream
Proof by induction for s=1 (exercise: generalize for any s >= 1)
For j=1, the tuple is in the sample with probability 1 = 1/j
Assume that for j=n, each tuple is in the sample with probability 1/n
We need to show that for j=n+1, each tuple is in the sample with probability 1/(n+1)
Sampling from data stream
Proof by induction for s=1 (continued). For j=n+1:
The tuple j=n+1 is selected with probability 1/(n+1) (the algorithm does this, since s/j = 1/(n+1))
If this tuple is selected, the old tuple in R is replaced with probability 1 (for s=1, R holds a single tuple)
Thus, the probability that an old tuple is replaced is 1/(n+1), and the probability that it is not replaced is 1 − 1/(n+1)
So, R contains an old tuple if it was added into R (probability 1/n, by the induction hypothesis for j=n) and not replaced by tuple j=n+1 (probability 1 − 1/(n+1)):
1/n * (1 − 1/(n+1)) = 1/n * n/(n+1) = 1/(n+1)
Or R contains the newest tuple, which was added in the latest round with probability 1/(n+1). QED
Queries over a (long) sliding window
Many queries are about a window containing the N most recent elements.
Window is long: N is so large that the data cannot be stored in memory, or even on disk
Amazon example:
For every product X we keep a 0/1 stream recording whether that product was sold in the n-th transaction
We want to answer queries such as: how many times have we sold X in the last k transactions?
Queries over a (long) sliding window
Sliding window on a single stream (N = 6):
. . . 0 1 0 1 0 1 0 0 0 1 0 1 0 1 1 0 0 1 0 1 1 1 (Past → Future)
As new elements arrive, the window over the N most recent elements slides towards the future.
Queries over a (long) sliding window
Problem:
Given a stream of 0s and 1s
How many 1s are in the last k bits? where k ≤ N
Obvious solution:
Store the most recent N bits
When new bit comes in, discard the N+1st bit
Queries over a (long) sliding window
What if we cannot afford to store N bits? E.g., N = 1 billion
We cannot get an exact answer (the bits we do not store may affect the result)
But we can still get an approximate answer
Queries over a (long) sliding window
Naïve method
Maintain 2 counters:
S: number of 1s from the beginning of the stream
Z: number of 0s from the beginning of the stream
How many 1s are in the last N bits? Estimate: N * S/(S+Z)
Example stream: 0 0 0 1 1 1 0 0 0 1 1 1
N=6, S=6, Z=6, so the answer N * S/(S+Z) = 6 * 6/(6+6) = 3 is the same as if we had stored the last N bits
Example stream: 0 0 0 0 0 0 0 0 0 0 1 1
N=6, S=2, Z=10, so the answer is 6 * 2/(10+2) = 1, but there are two 1s in the last 6 bits!
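The two examples can be checked with a few lines of Python:

def naive_estimate(stream, N):
    """Naive estimate of the number of 1s in the last N bits: N * S / (S + Z)."""
    S = stream.count(1)   # 1s since the beginning of the stream
    Z = stream.count(0)   # 0s since the beginning of the stream
    return N * S / (S + Z)

stream1 = [0,0,0,1,1,1,0,0,0,1,1,1]
print(naive_estimate(stream1, 6), sum(stream1[-6:]))  # 3.0 vs 3: agrees

stream2 = [0,0,0,0,0,0,0,0,0,0,1,1]
print(naive_estimate(stream2, 6), sum(stream2[-6:]))  # 1.0 vs 2: off by half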
What is the assumption of the naïve method?
Queries over a (long) sliding window
DGIM method
Does not assume uniformity
Bounded approximation error (within a factor of 1+r of the true answer)
Stores only O(log²N) bits*
* All logarithms are base 2
Datar, Gionis, Indyk, Motwani. Maintaining stream statistics over sliding windows (extended abstract).
http://dl.acm.org/citation.cfm?id=545466&CFID=730180783&CFTOKEN=65233872
Queries over a (long) sliding window
DGIM method: Timestamps
Each element has timestamp t mod N, where t = 1, 2, 3, … and N is the window length.
Each timestamp needs O(log N) bits of space
e.g., 4 is represented as 100, using O(log 4) bits
Bucket (a segment of the window)
Contains the timestamp of its most recent element
Contains the number of 1s in it (size), which is a power of 2
Queries over a (long) sliding window
DGIM method
Memory needed to store a bucket
Timestamp: needs O(log N) bits
Size:
is a power of 2
is at most 2^(log N) = N, since a bucket cannot contain more than N 1s; it suffices to store the exponent, log N
Thus, we need O(log log N) bits to represent the size
Overall, O(log N) bits per bucket
We have O(log N) buckets
So O(log N * log N) = O(log²N) bits in total
e.g., for N = 10⁹, this is on the order of 30² ≈ 1000 bits, instead of a billion bits
Queries over a (long) sliding window
Rules for representing a stream with buckets
The right end of a bucket is always a 1
Every 1 is in a bucket
No position is in more than one bucket
There are one or two buckets of any given size, up to some maximum size
All sizes must be a power of 2
Buckets cannot decrease in size as we move to the left (back in time, towards older elements)
Updating Buckets (1)
When a new bit comes in, drop the last (oldest) bucket if its end-time is prior to N time units before the current time
2 cases: Current bit is 0 or 1
If the current bit is 0:
no other changes are needed
Updating Buckets (2)
If the current bit is 1:
(1) Create a new bucket of size 1, for just this bit
End timestamp = current time
(2) If there are now three buckets of size 1,
combine the oldest two into a bucket of size 2
(3) If there are now three buckets of size 2, combine the oldest two into a bucket of size 4
(4) And so on …
We need to satisfy the rule that there are one or two buckets of each size (a code sketch of the update rules follows below)
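A minimal Python sketch of these update rules; buckets are kept as [end_timestamp, size] pairs, newest first (a teaching sketch under these assumptions, not the paper’s implementation):

class DgimBuckets:
    def __init__(self, N):
        self.N = N          # window length
        self.t = 0          # current time (a real implementation keeps t mod N)
        self.buckets = []   # [end_timestamp, size] pairs, newest first

    def update(self, bit):
        self.t += 1
        # Drop the oldest bucket once its end-time falls out of the window
        if self.buckets and self.buckets[-1][0] <= self.t - self.N:
            self.buckets.pop()
        if bit == 0:
            return                           # a 0 needs no other changes
        self.buckets.insert(0, [self.t, 1])  # new size-1 bucket for this 1
        # Whenever three buckets share a size, merge the oldest two of them
        i = 0
        while i + 2 < len(self.buckets) and self.buckets[i][1] == self.buckets[i + 2][1]:
            merged = [self.buckets[i + 1][0], self.buckets[i + 1][1] * 2]
            self.buckets[i + 1 : i + 3] = [merged]
            i += 1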
Example: Updating Buckets
(The original slide shows the stream with buckets highlighted in color; summarized here.)
Current state of the stream: a sequence of buckets covering all the 1s, with sizes that do not decrease going back in time.
A bit of value 1 arrives: a new bucket of size 1 is created for it.
This makes three buckets of size 1, so the oldest two are merged into a bucket of size 2.
The merge creates three buckets of size 2, so the oldest two of those are merged into a bucket of size 4; the merging cascades in this way until every size again occurs only once or twice.
How to Query?
To estimate the number of 1s in the most recent N bits:
1. Sum the sizes of all buckets but the last (leftmost)
(note “size” means the number of 1s in the bucket)
2. Add half the size of the last bucket
Remember: We do not know how many 1s of the last bucket are still within the wanted window. This creates error.
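Continuing the sketch above, the query step (shown standalone, operating on the same [end_timestamp, size] bucket list):

def estimate_ones(buckets):
    """Estimate the 1s in the window: sum the sizes of all buckets except
    the oldest (last), plus half the size of the oldest bucket."""
    if not buckets:
        return 0
    return sum(size for _, size in buckets[:-1]) + buckets[-1][1] // 2

# Buckets of sizes 1,1,2,4,4,8,8 plus an oldest, partially covered, bucket of 16
buckets = [[30,1],[28,1],[25,2],[20,4],[15,4],[10,8],[5,8],[1,16]]
print(estimate_ones(buckets))  # 28 + 16//2 = 36, as in the example below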
Example: Bucketized Stream
The window of length N ends inside its oldest bucket, of size 16; the leftmost bits of that bucket are beyond the window (and can be anything). Inside the window there are also: 2 buckets of size 8, 2 of size 4, 1 of size 2, and 2 of size 1.
Last (oldest) bucket: 1/2 * size = 1/2 * 16 = 8
Other buckets: sum of sizes = 8+8+4+4+2+1+1 = 28
Approximate answer = 8 + 28 = 36
Exact answer = 6 + 28 = 34 (six 1s of the size-16 bucket actually lie within the window)
Error Bound: Proof
With 1 or 2 buckets of each size, the error can be at most 50%. Let’s prove it!
Suppose the last (oldest) bucket, partially covered by the window, has size 2^r.
Case 1: estimate < correct answer
By assuming that 2^(r-1) (i.e., half) of its 1s are still within the window, we make an error of at most 2^(r-1): in the worst case all 2^r of its 1s are within the window, and the estimate misses half the bucket.
The true count is then at least 2^r, so the error is at most 50%.
Error Bound: Proof
Case 2: estimate >= correct answer
Suppose the last (oldest) bucket has size 2^r, and only the rightmost 1 of this bucket is within the window.
In the worst case, there is only one bucket of each size smaller than 2^r, so the true count is at least 1 + 2 + 4 + ... + 2^(r-1) = 2^r − 1, plus the single in-window 1 of the oldest bucket.
The estimate counts 2^(r-1) for the oldest bucket instead of that single 1, overestimating by 2^(r-1) − 1.
Thus, the error is at most 50%.
Further Reducing the Error
Instead of maintaining 1 or 2 buckets of each size, we allow either r-1 or r buckets of each size (r > 2)
Except for the largest size buckets; we can have any number between 1 and r of those
Error is at most O(1/r)
By picking r appropriately, we can trade off between the number of bits we store and the error
Python libraries
HASHLIB library (standard library for hashing) https://docs.python.org/3/library/hashlib.html
examples: https://pymotw.com/2/hashlib
>>> import hashlib
>>> hashed_string = hashlib.sha1(b'String to hash').hexdigest()
>>> hashed_string
'422fbfbc67fe17c86642c5eaaa48f8b670cbed1b'
>>> md5 = hashlib.md5()
>>> md5.update(b'Another string to hash')
>>> md5.hexdigest()
'1482ec1b2364f64e7d162a2b5b16f477'
Python libraries
CSAMPLE library (hash-based and reservoir sampling) https://pypi.python.org/pypi/csample
In cloudera terminal: pip install csample
>>> import csample
>>> data = ['alan', 'brad', 'cate', 'david']
>>> samples = csample.sample_line(data, 0.5)
It uses a hash function (xxhash or spooky) to keep an a/b fraction of the tuples (here a/b = 0.5).
>>> my_list = [0, 1, 2, 3, 4, 5]
>>> reservoir_sample = csample.reservoir(my_list, 3)
>>> reservoir_sample
[0, 5, 4]
>>> reservoir_sample = csample.reservoir(my_list, 3)
>>> reservoir_sample
[0, 5, 2]
It performs reservoir sampling (the sample is random, so repeated calls return different results).
Python libraries
DGIM library
Implements the DGIM method, to estimate the number of “True” elements in the last N elements of a Boolean stream (“False” corresponds to 0, “True” to 1)
https://dgim.readthedocs.io/en/latest/readme.html#usage
In cloudera terminal: pip install dgim
Parameters: N (int) – sliding window width; error_rate (float) – maximum error rate
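A short usage sketch, following the library’s documented interface (the Dgim class with update() and get_count(); check the documentation above if the API differs):

from dgim import Dgim

dgim = Dgim(N=32, error_rate=0.5)  # window of 32 elements, error within 50%
for i in range(100):
    dgim.update(True)              # feed Boolean elements into the stream
print(dgim.get_count())            # estimated number of True among the last 32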