
7CCSMBDT – Big Data Technologies Week 8
Grigorios Loukides, PhD (grigorios.loukides@kcl.ac.uk)
Spring 2017/2018

Objectives
 Data streams
 What are they?
 What are their properties?
 How do we manage them?
 Common queries (problems) on streams
 Sampling
 Window queries
 Reading
 Chapter 4 of Leskovec’s book (Mining of Massive Datasets)
 Tutorial: mining massive data streams http://michael.hahsler.net/SMU/EMIS8331/slides/datastream/datastream.pdf

Motivation
 So far, all our data was available when we wanted it
 BUT
 we may not know the entire data set in advance
 data may have high velocity, so that we cannot store all data and then access them
This arises in domains such as:
 web click-stream data
 computer network monitoring data
 telecommunication connection data
 readings from sensor networks
 stock quotes

Data stream definition
 Data stream
 An ordered and potentially infinite sequence of elements
 An element (a.k.a data point) can be:
 event “page A visited” in web click-stream data
 stock price “10.3” in stock quote data
 customer record “[item, qty, price]” in marketing data
 graph in social network data streams
 …

Data stream example 1
 Network of Bluetooth sensor tags
 each tag contains various sensors and outputs a stream of JSON data
(The slide shows sample JSON records, with timestamps “2016-01-26T20:47:53”, “2016-01-26T20:47:55” and “2016-01-26T20:47:57”.)
http://processors.wiki.ti.com/index.php/SensorTag_User_Guide#SensorTag_Hardware_Overview
Standing query: monitor Avg(temp) every 30 secs and display (SensorName, Avg(temp)) if Avg(temp) > 100 degrees
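A minimal sketch of how such a standing query could be evaluated over a stream of records (the record layout, sensor names and threshold below are illustrative assumptions, not the SensorTag API):

from collections import defaultdict

def monitor(stream, period=30, threshold=100):
    """Consume (sensor_name, temp, timestamp) records; every `period` seconds
    of stream time, report sensors whose average temperature exceeds `threshold`."""
    temps = defaultdict(list)     # sensor -> temperatures seen in the current period
    window_start = None
    for name, temp, ts in stream:
        if window_start is None:
            window_start = ts
        if ts - window_start >= period:        # period elapsed: emit and reset
            for sensor, values in temps.items():
                avg = sum(values) / len(values)
                if avg > threshold:
                    print(sensor, round(avg, 1))
            temps.clear()
            window_start = ts
        temps[name].append(temp)

# A tiny synthetic stream: one sensor reporting every 2 seconds for 70 seconds
monitor([("tag1", 100.5 + (t % 5), t) for t in range(0, 70, 2)])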

Data stream example 2
 HTTP server log
Report the number of distinct pages accessed every hour
Report the most frequently accessed pages every hour
Report average view time of a page every hour
http://michael.hahsler.net/SMU/EMIS8331/slides/datastream/datastream.pdf

Properties of data streams
1. Potentially infinite
• Transient (stream might not be realized on disk)
• Single pass over the data
• Only summaries can be stored
• Real-time processing (in main memory)
2. Data streams are not static
• Incremental updates
• Concept drift
• Forgetting old data
3. Temporal order may be important
http://michael.hahsler.net/SMU/EMIS8331/slides/datastream/datastream.pdf

Data Stream Management System (DSMS)
Streams entering the DSMS, for example:
. . . 1, 5, 2, 7, 0, 9, 3
. . . a, r, v, t, y, h, b
. . . 0, 0, 1, 0, 1, 1, 0
(time →)
The streams enter a Processor, and often have different velocities (arrival rates) and data types.

Data stream management system
The DSMS also contains an Archival Storage component, which archives the streams. It is not used for answering queries, due to the very low efficiency of data retrieval.

Data stream management system
The Limited Working Storage component:
* storage space on disk or in main memory
* used for answering queries
* cannot store all the data

Data stream management system
Standing queries are permanently executing and produce outputs at appropriate times.
Example of a standing query:
“output an alert, when the variable TEMPERATURE exceeds 25”

Data stream management system
Ad-hoc queries are asked once, about the current state of a stream or streams.
Example of an ad-hoc query:
“list the total number of transactions in the last hour”
The processor produces the output (query answers).

Common queries
 Sampling data from a stream
 Construct a random sample (i.e., a random subset of the stream)
 Queries over sliding windows
 Number of items of type x in the last k elements of the stream

Common queries
 Filtering a data stream
 Select elements with property x from the stream
 Counting distinct elements
 Number of distinct elements in the last k elements of the stream
 Estimating frequency moments
 Estimate avg./std.dev. of last k elements
 Finding frequent elements

Common applications of these queries
 Mining query streams
 Google wants to know which queries are more frequent today than yesterday
 Mining click streams
 Yahoo wants to know which of its pages are getting an unusual number of hits in the past hour
 Mining social network news feeds
 E.g., look for trending topics on Twitter, Facebook

Sampling from data stream
 Construct a sample from a data stream such that we can ask queries on the sample and get answers that are statistically representative of the stream as a whole
 Example application
 Stream from a search query engine, as (user, query, time) tuples:
(u1, q1, t1), (u1, q1, t2), (u2, q2, t3), …
 What fraction of the typical user’s queries were repeated over the last month?
 Cannot store more than 1/10th of the stream

Sampling from data stream
 Naïve solution for the example application: store 1/10 of each user’s queries
 Generate a random integer in [0..9] for each query
 Store the tuple (u, q, t) if the integer is 0, otherwise discard it
 For users with many queries, about 1/10th of their queries will be in the sample
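A minimal sketch of this naïve sampler (the tuple layout is the (user, query, time) triple from the example; everything else is illustrative):

import random

def naive_sample(stream):
    """Keep each (user, query, time) tuple independently with probability 1/10."""
    for user, query, t in stream:
        if random.randint(0, 9) == 0:   # one out of the ten integers 0..9
            yield (user, query, t)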
————
Is the naïve solution good for answering:
What is the fraction of duplicate queries in the stream by an average user?
Suppose each user issues x queries once and d queries twice. Queries issued twice are the “duplicate queries”.
Example: Stream = [q1, q1, q2, q2, q3] has x = 1 and d = 2, so the fraction of duplicates is 2/3.
Correct answer: d / (x + d)

Sampling from data stream
 Same query (fraction of duplicate queries), based on the sample?
 Out of x singleton queries, x/10 will appear in the sample
 This assumes there are many singletons, and happens because we sample 1/10 of the elements (each singleton is one element)
 A duplicate query appears twice in the sample with probability 1/100:
Pr(1st occurrence in sample) · Pr(2nd occurrence in sample) = 1/10 · 1/10 = 1/100
This is because, for a duplicate query q1 q1 to appear twice in the sample, both occurrences of q1 must be included, and each is sampled with probability 1/10.
 Out of d duplicate queries, d · 1/100 = d/100 will appear twice in the sample

Sampling from data stream
 Fraction of duplicate queries, based on the sample?
 A duplicate query appears exactly once in the sample with probability 18/100:
Pr(1st occ. in sample) · Pr(2nd occ. not in sample) + Pr(2nd occ. in sample) · Pr(1st occ. not in sample) = 1/10 · (1 − 1/10) + 1/10 · (1 − 1/10) = 1/10 · 9/10 + 1/10 · 9/10 = 18/100
This is because a duplicate query q1 q1 contributes a single occurrence to the sample if we sample the first q1 but not the second, or the second but not the first.
 Out of d duplicate queries, d · 18/100 = 18d/100 will appear once in the sample
Fraction of duplicate queries based on the sample:
(d/100) / (d/100 + 18d/100 + x/10) = d / (10x + 19d)
Fraction of duplicate queries based on the entire data stream:
d / (x + d)

Sampling from data stream
 How bad is the sample-based answer?
The sample-based answer is
(d/100) / (d/100 + 18d/100 + x/10) = d / (10x + 19d)
The answer based on the entire stream is d / (x + d).
• For x = 1000, d = 300:
• Sample-based answer: 300/15700 ≈ 0.019 = 1.9%
• Answer based on the entire stream: 300/1300 ≈ 0.231 = 23.1%, i.e., 12.1 times larger
• For x = 1000, d = 50:
• Sample-based answer: 50/10950 ≈ 0.0046 = 0.46%
• Answer based on the entire stream: 50/1050 ≈ 0.0476 = 4.76%, i.e., 10.4 times larger
WHAT WENT SO WRONG?
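A quick simulation (an illustrative sketch; the query labels and parameters are hypothetical) confirms the analysis: element-wise sampling reports roughly d/(10x+19d), far below the true d/(x+d):

import random

def duplicate_fraction_experiment(x=1000, d=300, trials=200):
    """One user's stream: x queries issued once, d queries issued twice.
    Sample each element with probability 1/10 and measure the duplicate fraction."""
    stream = [f"s{i}" for i in range(x)] + 2 * [f"d{i}" for i in range(d)]
    total = 0.0
    for _ in range(trials):
        sample = [q for q in stream if random.random() < 0.1]
        counts = {}
        for q in sample:
            counts[q] = counts.get(q, 0) + 1
        duplicates = sum(1 for c in counts.values() if c == 2)
        total += duplicates / len(counts)
    print("sample-based answer:", round(total / trials, 4))   # ~ d/(10x+19d) = 0.0191
    print("true answer:       ", round(d / (x + d), 4))       # = 0.2308

duplicate_fraction_experiment()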

Sampling from data stream
 Each user issues x queries once and d queries twice
What is the fraction of duplicate queries in the stream by an average user?
Main idea: the query is about the average user, but we sampled queries instead of users
Solution
 Select 1/10th of the users (assuming their queries make up <=10% of the stream)
 For each such user, store all of their queries in the sample
 To get 1/10th of the users:
 use a hash function h: user → {1, 2, ..., 10}
 If h(user) = 1, we accept their queries, otherwise we discard them
 Then, count the duplicates in the sample: d/(x+d), for each user

Sampling from data stream
 General problem
 Stream of tuples with keys:
 Key is some subset of each tuple’s components
 e.g., the tuple is (user, search, time); the key is user
 Choice of key depends on the application
 To get a sample of an a/b fraction of the stream:
 Hash each tuple’s key uniformly into b buckets
 Pick the tuple if its hash value is in {1, ..., a}
* How to generate a 30% sample? (see the sketch below)
* What if more and more new users come, and we have a budget on the number of tuples that we can store?
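A sketch of the bucket trick using Python's hashlib (introduced later in these slides) for a deterministic hash; the function and variable names are illustrative. Python's built-in hash() is randomized per process for strings, so a stable hash such as md5 is used instead:

import hashlib

def in_sample(key, a, b):
    """Hash the key uniformly into b buckets; keep the tuple if its bucket is among the first a."""
    bucket = int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16) % b
    return bucket < a

# A 30% sample of users: a = 3, b = 10. Every tuple of a kept user is retained,
# so per-user statistics (e.g., the duplicate fraction) stay unbiased.
stream = [("u1", "q1", 1), ("u2", "q2", 2), ("u1", "q1", 3)]
sample = [t for t in stream if in_sample(t[0], a=3, b=10)]
print(sample)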
Sampling from data stream
 General problem
 To get a (uniform) sample of s tuples from a stream (e1, e2, ...) that contains > s tuples
 Reservoir sampling
 Add the first s tuples from the stream into a reservoir R
 For j > s:
 with probability s/j, replace a random entry of R with the j-th tuple of the stream (i.e., the newcomer replaces a random old one)
 At j = n, return R
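A minimal sketch of the algorithm just described (standard reservoir sampling; the names are mine):

import random

def reservoir_sample(stream, s):
    """Return a uniform sample of s tuples from an iterable stream of > s tuples."""
    R = []
    for j, x in enumerate(stream, start=1):
        if j <= s:
            R.append(x)                    # the first s tuples fill the reservoir
        elif random.random() < s / j:      # with probability s/j ...
            R[random.randrange(s)] = x     # ... the newcomer replaces a random old one
    return R

print(reservoir_sample(range(1000), s=10))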
 Does this work?
 We need to prove that after seeing n tuples, R contains each tuple seen so far with probability s/n

Sampling from data stream
 Proof by induction for s = 1 (exercise: general s ≥ 1)
 For j = 1, the tuple is in the sample with probability s/j = 1/1 = 1
 Assume that for j = n, each tuple is in the sample with probability 1/n
 We need to show that for j = n+1, each tuple is in the sample with probability 1/(n+1)

Sampling from data stream
 Proof by induction for s = 1 (exercise: general s ≥ 1)
 For j = n+1:
 The (n+1)-th tuple is selected with probability 1/(n+1) (the loop of the algorithm ensures this)
 If this tuple is selected, the old tuple in R is replaced with probability 1 (for s = 1, R holds a single tuple)
 Thus, an old tuple is replaced with probability 1/(n+1), and not replaced with probability 1 − 1/(n+1)
 So, R contains an old tuple either because it was added into R and not replaced by tuple j = n+1, which happens with probability
(1/n) · (1 − 1/(n+1)) = (1/n) · (n/(n+1)) = 1/(n+1)
(the first factor is the probability it is in the sample, by the induction hypothesis for j = n; the second factor is the probability it is not replaced),
or because it was added in the latest round, which happens with probability 1/(n+1). QED
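The induction can also be checked empirically; here is a quick sketch for s = 1 and n = 5 (names and parameters are illustrative): each of the 5 tuples should be retained with probability close to 1/5.

import random
from collections import Counter

def reservoir_one(stream):
    """Reservoir sampling with s = 1: keep the j-th element with probability 1/j."""
    kept = None
    for j, x in enumerate(stream, start=1):
        if random.random() < 1 / j:
            kept = x
    return kept

runs = 100_000
counts = Counter(reservoir_one(range(5)) for _ in range(runs))
print({k: round(v / runs, 3) for k, v in sorted(counts.items())})  # each ≈ 0.2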

Queries over a (long) sliding window
 Many queries are about a window containing the N most recent elements.
 Window is long: N is so large that the data cannot be stored in memory, or even on disk
 Amazon example:
 For every product X we keep a 0/1 stream of whether that product was sold in the n-th transaction
 We want to answer queries such as: how many times have we sold X in the last k transactions?

Queries over a (long) sliding window
 Sliding window on a single stream (N = 6):
0 1 0 1 0 1 0 0 0 1 0 1 0 1 1 0 0 1 0 1 1 1
Past ← | → Future
(The slide shows the length-6 window sliding along the bit stream as new bits arrive.)

Queries over a (long) sliding window
 Problem:
 Given a stream of 0s and 1s
 How many 1s are in the last k bits, where k ≤ N?
 Obvious solution:
 Store the most recent N bits
 When a new bit comes in, discard the (N+1)-st most recent bit

Queries over a (long) sliding window
 What if we cannot afford to store N bits?
 E.g., N = 1 billion
 We cannot get an exact answer (the bits we do not store may affect the result)
 But we can still get an approximate answer

Queries over a (long) sliding window
 Naïve method
Maintain 2 counters:
S: the number of 1s from the beginning of the stream
Z: the number of 0s from the beginning of the stream
How many 1s are in the last N bits? Estimate: N · S/(S+Z)
Example stream: 0 0 0 1 1 1 0 0 0 1 1 1
N = 6, S = 6, Z = 6, so the answer N · S/(S+Z) = 6 · 6/12 = 3 is the same as if we had stored all of the last N bits
Example stream: 0 0 0 0 0 0 0 0 0 0 1 1
N = 6, S = 2, Z = 10, so the answer is 6 · 2/12 = 1, but there are 2 1s!
What is the assumption of the naïve method?
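In code, the naïve estimator is just the two counters (a sketch; the bit lists below are the slide's two example streams). It implicitly assumes the 1s are uniformly distributed over the stream, which the second example violates:

def naive_ones_in_window(bits, N):
    """Estimate the number of 1s in the last N bits as N * S / (S + Z)."""
    S = sum(bits)            # 1s since the beginning of the stream
    Z = len(bits) - S        # 0s since the beginning of the stream
    return N * S / (S + Z)

print(naive_ones_in_window([0,0,0,1,1,1,0,0,0,1,1,1], 6))  # 3.0 (exact answer: 3)
print(naive_ones_in_window([0,0,0,0,0,0,0,0,0,0,1,1], 6))  # 1.0 (exact answer: 2)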

Queries over a (long) sliding window
 DGIM method
 Does not assume uniformity
 Bounded approximation error (within 1+r of the true answer)
 Stores only O(log² N) bits*
* All logarithms are base-2
Datar, Gionis, Indyk, Motwani:
Maintaining stream statistics over sliding windows (extended abstract)
http://dl.acm.org/citation.cfm?id=545466

Queries over a (long) sliding window
 DGIM method
 Timestamps
 Each element has timestamp t mod N, where t = 1, 2, 3, … and N is the window length
 Each timestamp needs O(log N) bits of space
e.g., 4 is represented as 100, with O(log 4) bits
 Bucket (a segment of the window)
 Contains the timestamp of its most recent element
 Contains the number of 1s in it (size), which is a power of 2

Queries over a (long) sliding window
 DGIM method
 Memory needed to store a bucket:
 The timestamp needs O(log N) bits
 The size
 is a power of 2
 is at most 2^(log N) = N, since a bucket cannot contain more than N 1s
 so it suffices to store its exponent (at most log N), which needs O(log log N) bits
Overall, a bucket needs O(log N) bits.
 We have O(log N) buckets
 So O(log N) · O(log N) = O(log² N) bits in total.

Queries over a (long) sliding window
 Rules for representing a stream with buckets
 The right end of a bucket is always a 1
 Every 1 is in a bucket
 No position is in more than one bucket
 There are one or two buckets of any given size, up to some maximum size
 All sizes must be a power of 2
 Buckets cannot decrease in size as we move to the left (back in time)

Updating Buckets (1)
 When a new bit comes in, drop the last (oldest) bucket if its end-time is prior to N time units before the current time
 2 cases: the current bit is 0 or 1
 If the current bit is 0: no other changes are needed

Updating Buckets (2)
 If the current bit is 1:
 (1) Create a new bucket of size 1, for just this bit
 Its end timestamp = current time
 (2) If there are now three buckets of size 1, combine the oldest two into a bucket of size 2
 (3) If there are now three buckets of size 2, combine the oldest two into a bucket of size 4
 (4) And so on …
This restores the rule: there are one or two buckets of each size.

Example: Updating Buckets
(The slide steps through the update procedure on a bucketized bit stream, with buckets drawn in color; the raw bit strings are not reproduced here. The sequence of events: a bit of value 1 arrives and gets its own bucket of size 1; there are now three buckets of size 1, so the two oldest merge into a bucket of size 2; further bits arrive (1, then 0, then 1), each 1 creating a new size-1 bucket; merges then cascade to larger sizes, restoring the one-or-two-buckets-per-size invariant.)

How to Query?
 To estimate the number of 1s in the most recent N bits:
1. Sum the sizes of all buckets but the last (leftmost, i.e., oldest) one (recall that “size” means the number of 1s in the bucket)
2. Add half the size of the last bucket
 Remember: we do not know how many 1s of the last bucket are still within the wanted window; this is what creates the error.
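Pulling the update and query rules together, here is a simplified, illustrative sketch of DGIM (the bucket representation, class name and variable names are mine, and timestamps are kept absolute rather than mod N for readability):

class DGIM:
    """Minimal DGIM sketch. Buckets are [end_time, size] pairs kept newest-first;
    sizes are powers of 2 and at most two buckets of each size exist at any time."""

    def __init__(self, N):
        self.N = N          # window length
        self.t = 0          # current time
        self.buckets = []   # newest bucket first

    def update(self, bit):
        self.t += 1
        # Drop the oldest bucket if its end-time has fallen out of the window.
        if self.buckets and self.buckets[-1][0] <= self.t - self.N:
            self.buckets.pop()
        if not bit:
            return                              # a 0 needs no other changes
        self.buckets.insert(0, [self.t, 1])     # new size-1 bucket for this 1
        size = 1
        while True:                             # cascade the merges
            same = [i for i, b in enumerate(self.buckets) if b[1] == size]
            if len(same) < 3:
                break
            i, j = same[-2], same[-1]           # the two oldest buckets of this size
            self.buckets[i][1] = 2 * size       # merged bucket keeps the newer end-time
            del self.buckets[j]
            size *= 2

    def count(self):
        """Estimate the 1s in the last N bits: sum all bucket sizes,
        but take only half of the oldest (possibly partial) bucket."""
        sizes = [b[1] for b in self.buckets]
        if not sizes:
            return 0
        return sum(sizes[:-1]) + sizes[-1] // 2

dgim = DGIM(N=100)
for i in range(1000):
    dgim.update(i % 3 == 0)   # a stream where roughly every third bit is 1
print(dgim.count())           # within 50% of the true count (34)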

Example: Bucketized Stream
(Figure: a bucketized stream covering a window of length N; the leftmost bucket lies partially beyond the window, and the leftmost 8 bits are not shown.)
 Buckets: at least 1 of size 16 (partially beyond the window), 2 of size 8, 2 of size 4, 1 of size 2, 2 of size 1
 Last bucket: ½ · size = ½ · 16 = 8
 Other buckets: sum of sizes = 8 + 8 + 4 + 4 + 2 + 1 + 1 = 28
 Approximate answer = 8 + 28 = 36; exact answer = 6 + 28 = 34

Error Bound: Proof
 With 1 or 2 buckets of each size, the error can be at most 50%. Let’s prove it!
 Suppose the last (partially covered) bucket has size 2^r.
Case 1: estimate < correct answer
 By assuming 2^(r−1) (i.e., half) of its 1s are still within the window, we make an error of at most 2^(r−1) (worst case: all 2^r of its 1s are within the window); the estimate misses at most half a bucket
 In this worst case the window contains at least the 2^r 1s of that bucket, so the error is at most 2^(r−1) / 2^r = 50%
Case 2: estimate ≥ correct answer
 Worst case: only the rightmost bit of the last bucket is within the window
 There is at least one bucket of each size smaller than 2^r, so the true sum is at least 1 + 2 + 4 + … + 2^(r−1) = 2^r − 1
 The estimate adds 2^(r−1) for the last bucket, overestimating by at most 2^(r−1) − 1, which is less than 50% of the true sum
 Thus, the error is at most 50% in both cases.

Further Reducing the Error
 Instead of maintaining 1 or 2 buckets of each size, we allow either r−1 or r buckets of each size (r > 2)
 Except for the buckets of the largest size; we can have any number between 1 and r of those
 The error is then at most O(1/r)
 By picking r appropriately, we can trade off the number of bits we store against the error

Python libraries
 HASHLIB library (standard library for hashing)
 https://docs.python.org/3/library/hashlib.html
 examples: https://pymotw.com/2/hashlib

>>> import hashlib
>>> hashed_string = hashlib.sha1(b'String to hash').hexdigest()
>>> hashed_string
'422fbfbc67fe17c86642c5eaaa48f8b670cbed1b'
>>> md5 = hashlib.md5()
>>> md5.update(b'Another string to hash')
>>> md5.hexdigest()
'1482ec1b2364f64e7d162a2b5b16f477'

Python libraries
 CSAMPLE library (hash-based and reservoir sampling)
 https://pypi.python.org/pypi/csample
In the cloudera terminal: pip install csample

>>> import csample
>>> data = ['alan', 'brad', 'cate', 'david']
>>> samples = csample.sample_line(data, 0.5)

It uses a hash function (xxhash or spooky) to retain an a/b fraction of the tuples (here a/b = 0.5).

>>> my_list = [0, 1, 2, 3, 4, 5]
>>> reservoir_sample = csample.reservoir(my_list, 3)
>>> reservoir_sample
[0, 5, 4]
>>> reservoir_sample = csample.reservoir(my_list, 3)
>>> reservoir_sample
[0, 5, 2]

It performs reservoir sampling (here, a sample of s = 3 tuples).

Python libraries
 DGIM library
Implements the DGIM method, to estimate the number of “True” elements in the last N elements of a Boolean stream (“False” corresponds to 0, “True” to 1)
 https://dgim.readthedocs.io/en/latest/readme.html#usage
 In the cloudera terminal: pip install dgim
Constructor parameters:
 N (int) – sliding window width
 error_rate (float) – maximum error
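Usage, following the example in the library's readme linked above (the class and method names below follow that documented example; check the readme for the current API):

>>> from dgim import Dgim
>>> dgim = Dgim(N=32, error_rate=0.5)  # window width 32, at most 50% relative error
>>> for i in range(100):
...     dgim.update(i % 2 == 0)        # feed Boolean elements into the stream
>>> dgim.get_count()                   # estimate of True elements in the last 32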