
COMP9313: Big Data Management
Course web site: http://www.cse.unsw.edu.au/~cs9313/

Chapter 6.1:
Mining Data Streams

Data Streams
❖ In many data mining situations, we do not know the entire data set in advance
❖ Stream Management is important when the input rate is controlled externally:
➢ Google queries
➢ Twitter or Facebook status updates
❖ We can think of the data as infinite and non-stationary (the distribution changes over time)

Characteristics of Data Streams
❖ Traditional DBMS: data stored in finite, persistent data sets
❖ Data Streams: distributed, continuous, unbounded, rapid, time-varying, noisy, …
❖ Characteristics
➢ Huge volumes of continuous data, possibly infinite
➢ Fast changing and requires fast, real-time response
➢ Random access is expensive—single scan algorithm (can only have one look)
➢ Store only the summary of the data seen thus far

Massive Data Streams
❖ Data is continuously growing faster than our ability to store or index it
❖ There are 3 Billion telephone calls in the US each day, 30 Billion emails, 1 Billion SMS and instant messages
❖ Scientific data: NASA’s observation satellites each generate billions of readings per day
❖ IP Network Traffic: up to 1 Billion packets per hour per router. Each ISP has many (hundreds) routers!

The Stream Model
❖ Input elements enter at a rapid rate, at one or more input ports (i.e., streams)
➢ We call elements of the stream tuples
❖ The system cannot store the entire stream accessibly
❖ Q: How do you make critical calculations about the stream using a limited amount of memory?

Database Management System (DBMS) Data Processing

[Figure: DBMS data processing — standing queries and ad-hoc queries are answered over data held in persistent storage (RDBMS, NoSQL, big data processing platforms, etc.)]

General Data Stream Management System (DSMS) Processing Model
[Figure: DSMS processing model — multiple streams enter (e.g., … 1, 5, 2, 7, 0, 9, 3; … a, r, v, t, y, h, b; … 0, 0, 1, 0, 1, 1, 0), each composed of elements/tuples; ad-hoc and standing queries are answered using a limited working storage, backed by archival storage]

DBMS vs. DSMS #1

[Figure: side-by-side processing models — in a DBMS, query processing runs over data held in main memory and persistent storage; in a DSMS, incoming data stream(s) flow through main memory, query processing runs continuously over them, and continuous query (CQ) results stream out]

DBMS vs. DSMS #2

❖ Traditional DBMS:
➢ stored sets of relatively static records with no pre-defined notion of time
➢ good for applications that require persistent data storage and complex querying
❖ Data Stream Management System (DSMS):
➢ supports on-line analysis of rapidly changing data streams
➢ data stream: a real-time, continuous, ordered (implicitly by arrival time or explicitly by timestamp) sequence of items, too large to store entirely, with no ending
➢ supports continuous queries

DBMS vs. DSMS #3

❖ DBMS:
➢ Persistent relations (relatively static, stored)
➢ One-time queries
➢ Random access
➢ “Unbounded” disk store
➢ Only current state matters
➢ No real-time services
➢ Relatively low update rate
➢ Data at any granularity
➢ Assume precise data
➢ Access plan determined by query processor, physical DB design
❖ DSMS:
➢ Transient streams (on-line analysis)
➢ Continuous queries (CQs)
➢ Sequential access
➢ Bounded main memory
➢ Historical data is important
➢ Real-time requirements
➢ Possibly multi-GB arrival rate
➢ Data at fine granularity
➢ Data stale/imprecise
➢ Unpredictable/variable data arrival and characteristics

Problems on Data Streams
❖ Types of queries one wants to answer over a data stream (we’ll learn these today):
➢ Sampling data from a stream
 Construct a random sample
➢ Queries over sliding windows
 Number of items of type x in the last k elements of the stream
➢ Filtering a data stream
 Select elements with property x from the stream
➢ Counting distinct elements
 Number of distinct elements in the last k elements of the stream
➢ Finding frequent elements
➢ ……

Applications
❖ Mining query streams
➢ Google wants to know what queries are more frequent today than yesterday
❖ Mining click streams
➢ Yahoo wants to know which of its pages are getting an unusual number of hits in the past hour
❖ Mining social network news feeds
➢ E.g., look for trending topics on Twitter, Facebook
❖ Sensor Networks
➢ Many sensors feeding into a central controller
❖ Telephone call records
➢ Data feeds into customer bills as well as settlements between telephone companies
❖ IP packets monitored at a switch
➢ Gather information for optimal routing

Example: IP Network Data
❖ Networks are sources of massive data: the metadata per hour per IP router is gigabytes
❖ Fundamental problem of data stream analysis:
➢ Too much information to store or transmit
❖ So process data as it arrives
➢ One pass, small space: the data stream approach
❖ Approximate answers to many questions are OK, if there are guarantees of result quality

Part 1: Sampling Data Streams

Sampling from a Data Stream
❖ Since we cannot store the entire stream, one obvious approach is to store a sample
❖ Two different problems:
➢ (1) Sample a fixed proportion of elements in the stream (say 1 in 10)
 As the stream grows the sample also gets bigger
➢ (2) Maintain a random sample of fixed size over a potentially infinite stream
 As the stream grows, the sample is of fixed size
 At any “time” t we would like a random sample of s elements
– What is the property of the sample we want to maintain? For all time steps t, each of the t elements seen so far has equal probability of being sampled

Sampling a Fixed Proportion
❖ Problem 1: Sampling a fixed proportion
❖ Scenario: Search engine query stream
➢ Stream of tuples: (user, query, time)
➢ Answer questions such as: How often did a user run the same query in a single day?
➢ Have space to store 1/10th of the query stream
❖ Naïve solution:
➢ Generate a random integer in [0..9] for each query
➢ Store the query if the integer is 0, otherwise discard it
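A minimal sketch of this naïve per-query sampler (the function name and the in-memory result list are illustrative assumptions, not from the slides):

```python
import random

def naive_sample(stream):
    """Naive 1-in-10 sampler: keep a (user, query, time) tuple
    iff a random digit in [0..9] is 0."""
    sample = []
    for tup in stream:
        if random.randint(0, 9) == 0:   # roughly 10% of all tuples
            sample.append(tup)
    return sample
```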

Problem with Naïve Approach
❖ Simple question: What fraction of queries by an average search engine user are duplicates?
➢ Suppose each user issues x queries once and d queries twice (total of x+2d queries)
 Correct answer: d/(x+d)
➢ Proposed solution: We keep 10% of the queries
 Sample will contain x/10 of the singleton queries and 2d/10 of the duplicate queries at least once
 But only d/100 pairs of duplicates – d/100=1/10∙1/10∙d
 Of d “duplicates” 18d/100 appear exactly once – 18d/100 = ((1/10 ∙ 9/10)+(9/10 ∙ 1/10)) ∙ d
➢ So the sample-based answer is (d/100) / (x/10 + d/100 + 18d/100) = d / (10x + 19d), not d / (x + d)

Solution: Sample Users
❖ Pick 1/10th of users and take all of their searches into the sample
❖ Use a hash function that hashes the username or user id uniformly into 10 buckets
➢ We hash each username to one of ten buckets, 0 through 9
➢ If the user hashes to bucket 0, accept the search query into the sample; otherwise discard it

Generalized Problem and Solution
❖ Problem: Given a data stream, take a sample of fraction a/b
❖ Stream of tuples with keys:
➢ Key is some subset of each tuple’s components
 e.g., tuple is (user, search, time); key is user
➢ Choice of key depends on application
❖ To get a sample of an a/b fraction of the stream:
➢ Hash each tuple’s key uniformly into b buckets
➢ Pick the tuple if its hash value is less than a
❖ How to generate a 30% sample?
➢ Hash into b = 10 buckets, and take the tuple if it hashes to one of the first 3 buckets (0, 1, or 2)
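A sketch of key-based sampling under these assumptions (the MD5-based hash and the function names are illustrative choices; a stable hash is used so the same key is always kept or always dropped):

```python
import hashlib

def keep_tuple(key, a=3, b=10):
    """Hash the key uniformly into b buckets and keep the tuple
    if the bucket index is less than a (an a/b sample)."""
    bucket = int(hashlib.md5(str(key).encode()).hexdigest(), 16) % b
    return bucket < a

# Example: keep all tuples of ~30% of users (keyed by user id)
stream = [("alice", "weather", 1), ("bob", "news", 2), ("alice", "weather", 3)]
sample = [t for t in stream if keep_tuple(t[0], a=3, b=10)]
```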

Sample Operator in Spark
❖ sample(withReplacement, fraction, seed)
➢ Return a sampled subset of this RDD.
➢ withReplacement: can elements be sampled multiple times
➢ fraction: expected size of the sample as a fraction of this RDD’s size (without replacement)
 This is not guaranteed to provide exactly the specified fraction of the total count of the given RDD
➢ seed: seed for the random number generator
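For instance, a rough 10% sample of an RDD could be taken as below (a sketch assuming a local SparkContext; the exact element count will vary from run to run):

```python
from pyspark import SparkContext

sc = SparkContext("local", "sampling-demo")   # assumes a local Spark installation
rdd = sc.parallelize(range(100))

# Expect roughly 10 elements, but not exactly 10
approx_tenth = rdd.sample(withReplacement=False, fraction=0.1, seed=42)
print(approx_tenth.collect())
```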

Maintaining a Fixed-size Sample
❖ Problem 2: Fixed-size sample
❖ Suppose we need to maintain a random sample S of size exactly s
➢ E.g., main memory size constraint
❖ Why? We don’t know the length of the stream in advance
❖ Suppose at time n we have seen n items
➢ Each item is in the sample S with equal prob. s/n
How to think about the problem: say s = 2
Note that the same item is treated as different tuples at different timestamps
Stream: a x c y z k c d e g …
At n = 5, each of the first 5 tuples is included in the sample S with equal prob.
At n = 7, each of the first 7 tuples is included in the sample S with equal prob.
An impractical solution would be to store all n tuples seen so far and pick s of them at random

Solution: Fixed Size Sample
❖ Algorithm (a.k.a. Reservoir Sampling)
➢ Store all the first s elements of the stream to S
➢ Suppose we have seen n-1 elements, and now the nth element arrives
 With probability s/n, keep the nth element, else discard it
 If we picked the nth element, then it replaces one of the s elements in the sample S, picked uniformly at random
❖ Claim: This algorithm maintains a sample S with the desired property:
➢ After n elements, the sample contains each element seen so far with probability s/n
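A minimal Python sketch of reservoir sampling as described above (names are illustrative):

```python
import random

def reservoir_sample(stream, s):
    """Maintain a uniform random sample of size s; after n elements,
    each element is in the sample with probability s/n."""
    sample = []
    for n, item in enumerate(stream, start=1):
        if n <= s:
            sample.append(item)                 # keep the first s elements
        elif random.random() < s / n:           # keep the nth with prob. s/n
            sample[random.randrange(s)] = item  # evict a uniformly chosen element
    return sample
```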

Proof: By Induction
❖ We prove this by induction:
➢ Assume that after n elements, the sample contains each element
seen so far with probability s/n
➢ We need to show that after seeing element n+1 the sample maintains the property
 Sample contains each element seen so far with probability s/(n+1)
❖ Base case:
➢ After we see n=s elements the sample S has the desired property
 Each out of n=s elements is in the sample with probability s/s =1

Proof: By Induction
❖ Inductive hypothesis: After n elements, the sample S contains each element seen so far with prob. s/n
❖ Now element n+1 arrives
❖ Inductive step: For an element already in S, the probability that the algorithm keeps it in S is:

(1 − s/(n+1)) + (s/(n+1)) ∙ ((s−1)/s) = n/(n+1)

(first term: element n+1 is discarded; second term: element n+1 is kept but the element in the sample is not the one picked for replacement)
❖ So, at time n, tuples in S were there with prob. s/n
❖ Time n → n+1, a tuple stays in S with prob. n/(n+1)
❖ So the prob. a tuple is in S at time n+1 is s/n ∙ n/(n+1) = s/(n+1)

takeSample Operator in Spark
❖ takeSample(withReplacement, num, seed=None)
➢ Return a fixed-size sampled subset of this RDD.
➢ withReplacement: can elements be sampled multiple times
➢ num: sample size
➢ This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
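A usage sketch, again assuming a local SparkContext:

```python
from pyspark import SparkContext

sc = SparkContext("local", "takeSample-demo")   # assumes a local Spark installation
rdd = sc.parallelize(range(1000))

# Exactly 10 elements, collected into the driver as a Python list
fixed_size_sample = rdd.takeSample(withReplacement=False, num=10, seed=42)
print(fixed_size_sample)
```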

Part 2: Querying Data Streams

Sliding Windows
❖ A useful model of stream processing is that queries are about a window of length N – the N most recent elements received
❖ Interesting case: N is so large that the data cannot be stored in memory, or even on disk
➢ Or, there are so many streams that windows for all cannot be stored
❖ Amazon example:
➢ For every product X we keep a 0/1 stream of whether that product was sold in the n-th transaction
➢ We want to answer queries such as: how many times have we sold X in the last k sales?

Sliding Window: 1 Stream
❖ Sliding window on a single stream:
[Figure: a fixed-length window sliding over the stream q w e r t y u i o p a s d f g h j k l z x c v b n m, one position at a time; elements to the left of the window are the past, elements to the right are the future]

Counting Bits (1)
❖ Problem:
➢ Given a stream of 0s and 1s
➢ Be prepared to answer queries of the form:
How many 1s are in the last k bits? where k ≤ N
❖ Obvious solution:
➢ Store the most recent N bits
 When a new bit comes in, discard the (N+1)st bit
Example stream (suppose N = 7; past on the left, future on the right):
0 1 0 0 1 1 0 1 1 1 0 1 0 1 0 1 1 0 1 1 0 1 1 0
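The obvious solution, sketched with a bounded buffer (an illustration only; the class and method names are assumptions, and the memory cost is still O(N) bits per stream):

```python
from collections import deque

class ExactWindowCounter:
    """Stores the last N bits exactly."""
    def __init__(self, N):
        self.window = deque(maxlen=N)   # old bits fall off automatically

    def add(self, bit):
        self.window.append(bit)

    def count_ones(self, k):
        """Exact number of 1s among the last k bits (k <= N)."""
        return sum(list(self.window)[-k:])
```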

Counting Bits (2)
❖ You cannot get an exact answer without storing the entire window
❖ Real problem: What if we cannot afford to store N bits?
➢ E.g., we’re processing 1 billion streams and N = 1 billion
❖ But we are happy with an approximate answer

An attempt: Simple solution
❖ Q: How many 1s are in the last N bits?
❖ A simple solution that does not really solve our problem: the Uniformity Assumption
Example stream (past on the left, future on the right): 010011100010100100010110110111001010110011010
❖ Maintain 2 counters:
➢ S: number of 1s from the beginning of the stream
➢ Z: number of 0s from the beginning of the stream
❖ How many 1s are in the last N bits? Estimate N ∙ S / (S + Z)
❖ But what if the stream is non-uniform?
➢ What if the distribution changes over time?
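A sketch of this uniformity-assumption estimator (illustrative names, not from the slides):

```python
class UniformityEstimator:
    """Track global counts of 1s (S) and 0s (Z) and estimate the
    number of 1s in the last N bits as N * S / (S + Z)."""
    def __init__(self):
        self.S = 0
        self.Z = 0

    def add(self, bit):
        if bit == 1:
            self.S += 1
        else:
            self.Z += 1

    def estimate_ones(self, N):
        total = self.S + self.Z
        return 0 if total == 0 else N * self.S / total
```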

DGIM Method
❖ Datar-Gionis-Indyk-Motwani (DGIM), “Maintaining Stream Statistics over Sliding Windows” (SODA’02)
❖ A solution that does not assume uniformity
❖ We store O(log² N) bits per stream
➢ E.g., if N = 2^16 (64K), then log(log N) = log(16) = 4
❖ The solution gives an approximate answer, never off by more than 50%
➢ The error factor can be reduced to any fraction > 0, with a more complicated algorithm and proportionally more stored bits

Idea: Exponential Windows
❖ Solution that doesn’t (quite) work:
➢ Summarize exponentially increasing regions of the stream, looking backward
➢ Drop small regions if they begin at the same point as a larger region
[Figure: exponentially increasing regions marked over the stream 010011100010100100010110110111001010110011010; the window of width 16 has 6 1s]
❖ We can reconstruct the count of the last N bits, except we are not sure how many of the last 6 1s are included in the N

What’s Good?
❖ Stores only O(log² N) bits
➢ O(log N) counts of log₂ N bits each
❖ Easy update as more bits enter
❖ Error in count no greater than the number of 1s in the “unknown” area

What’s Not So Good?
❖ As long as the 1s are fairly evenly distributed, the error due to the unknown region is small – no more than 50%
❖ But it could be that all the 1s are in the unknown area at the end
❖ In that case, the error is unbounded!
➢ Because the number of 1s in the known regions could be 0!
[Figure: the stream 010011100010100100010110110111001010110011010, with the 1s falling in the unknown area]

Fixup: DGIM Algorithm
❖ Idea: Instead of summarizing fixed-length blocks, summarize blocks with specific number of 1s:
➢ Let the block sizes (number of 1s) increase exponentially
❖ When there are few 1s in the window, block sizes stay small, so errors are small
[Figure: the stream 1001010110001011010101010101011010101010101110101010111010100010110010 with the window of length N marked]

DGIM: Timestamps
❖ Each bit in the stream has a timestamp, starting from 1, 2, …
❖ Record timestamps modulo N (the window size), so we can represent any relevant timestamp in O(log₂ N) bits
➢ E.g., given window size N = 40, timestamp 123 will be recorded as 123 mod 40 = 3, and thus the encoding stores 3 rather than 123

DGIM: Buckets
❖ A bucket in the DGIM method is a record consisting of:
➢ (A) The timestamp of its end [𝑶(log 𝑵) bits]
➢ (B) The number of 1s between its beginning and end [𝑶(loglog 𝑵) bits]
❖ Constraint on buckets:
➢ Number of 1s must be a power of 2
➢ That explains the 𝑶(loglog 𝑵) in (B) above
[Figure: the stream 1001010110001011010101010101011010101010101110101010111010100010110010 partitioned into buckets]
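A bucket record could be represented as follows (an illustrative sketch; the field names are mine):

```python
from dataclasses import dataclass

@dataclass
class Bucket:
    end_timestamp: int   # timestamp of the bucket's right end (recorded modulo N per the slides)
    size: int            # number of 1s in the bucket; always a power of 2
```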

Representing a Stream by Buckets
❖ The right end of a bucket is always a position with a 1
❖ Every position with a 1 is in some bucket
❖ Either one or two buckets with the same power-of-2 number of 1s
❖ Buckets do not overlap in timestamps
❖ Buckets are sorted by size
➢ Earlier buckets are not smaller than later buckets
❖ Buckets disappear when their end-time is > N time units in the past

Example: Bucketized Stream
[Figure: the stream 1001010110001011010101010101011010101010101110101010111010100010110010 within window N, partitioned into buckets: at least 1 bucket of size 16 (partially beyond the window), 2 of size 8, 2 of size 4, 1 of size 2, and 2 of size 1]
❖ Three properties of buckets that are maintained:
➢ Either one or two buckets with the same power-of-2 number of 1s
➢ Buckets do not overlap in timestamps
➢ Buckets are sorted by size

Updating Buckets
❖ When a new bit comes in, drop the last (oldest) bucket if its end-time is prior to N time units before the current time
❖ 2 cases: Current bit is 0 or 1
❖ If the current bit is 0: no other changes are needed
❖ If the current bit is 1:
➢ (1) Create a new bucket of size 1, for just this bit
 End timestamp = current time
➢ (2) If there are now three buckets of size 1, combine the oldest two into a bucket of size 2
➢ (3) If there are now three buckets of size 2, combine the oldest two into a bucket of size 4
➢ (4) And so on …
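A compact sketch of this update logic, with buckets held as (end_timestamp, size) pairs ordered newest first (timestamps are kept exact rather than modulo N for readability; this is an illustrative implementation, not the course’s reference code):

```python
class DGIM:
    """Sketch of DGIM bucket maintenance for counting 1s in the last N bits."""
    def __init__(self, N):
        self.N = N
        self.t = 0            # current timestamp
        self.buckets = []     # list of (end_timestamp, size), newest first

    def add(self, bit):
        self.t += 1
        # Drop the oldest bucket once its end-time falls out of the window
        if self.buckets and self.buckets[-1][0] <= self.t - self.N:
            self.buckets.pop()
        if bit == 0:
            return
        # (1) New bucket of size 1 for just this bit
        self.buckets.insert(0, (self.t, 1))
        # (2)-(4) Whenever three buckets share a size, merge the oldest two
        i = 0
        while i + 2 < len(self.buckets):
            if self.buckets[i][1] == self.buckets[i + 2][1]:
                end_ts = self.buckets[i + 1][0]      # keep the more recent end-time
                merged = (end_ts, self.buckets[i + 1][1] * 2)
                self.buckets[i + 1:i + 3] = [merged]
            else:
                i += 1
```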

Example: Updating Buckets
[Figure: a worked example on the stream 1001010110001011…, showing the current state of the stream; a bit of value 1 arriving and creating a new size-1 bucket; two size-1 buckets being merged into a size-2 bucket; the next 1 arriving, then a 0, then another 1; further merges cascading up the bucket sizes; and the state of the buckets after merging]

How to Query?
❖ To estimate the number of 1s in the most recent N bits:
➢ Sum the sizes of all buckets but the last (oldest) one
 (note “size” means the number of 1s in the bucket)
➢ Add half the size of the last bucket
❖ Remember: We do not know how many 1s of the last bucket are still within the wanted window
❖ Example:
[Figure: the bucketized stream 1001010110001011010101010101011010101010101110101010111010100010110010 within window N — at least 1 bucket of size 16 (partially beyond the window), 2 of size 8, 2 of size 4, 1 of size 2, and 2 of size 1]
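The query step can be sketched over a list of bucket sizes ordered newest to oldest (standalone, so it does not depend on the update sketch above):

```python
def dgim_estimate(bucket_sizes):
    """Estimate the number of 1s in the window from DGIM bucket sizes,
    ordered newest to oldest: sum all sizes except the oldest bucket's,
    then add half of the oldest bucket's size."""
    if not bucket_sizes:
        return 0
    return sum(bucket_sizes[:-1]) + bucket_sizes[-1] // 2

# Buckets from the example figure, newest to oldest: 1, 1, 2, 4, 4, 8, 8, 16
print(dgim_estimate([1, 1, 2, 4, 4, 8, 8, 16]))   # -> 28 + 8 = 36
```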

Error Bound: Proof
❖ Why is the error at most 50%? Let’s prove it!
❖ Suppose the last bucket has size 2^r
❖ Then by assuming 2^(r−1) (i.e., half) of its 1s are still within the window, we make an error of at most 2^(r−1)
❖ Since there is at least one bucket of each of the sizes less than 2^r, the true sum is at least
1 + 2 + 4 + … + 2^(r−1) = 2^r − 1
❖ Thus, the error is at most 50%
[Figure: the stream 111111110000000011101010101011010101010101110101010111010100010110010 — at least 16 1s lie within the window]

Further Reducing the Error
❖ Instead of maintaining 1 or 2 of each size bucket, we allow either r-1 or r buckets (r > 2)
➢ Except for the largest size buckets; we can have any number between 1 and r of those
❖ Error is at most O(1/r)
➢ WHY?
❖ By picking r appropriately, we can trade off between the number of bits we store and the error

Extensions (optional)

❖ Can we use the same trick to answer queries “How many 1s are in the last k bits?” where k < N?
➢ A: Find the earliest bucket B that overlaps with the last k bits. The number of 1s is the sum of the sizes of the more recent buckets + ½ the size of B
[Figure: the stream 1001010110001011010101010101011010101010101110101010111010100010110010 with the last k bits marked]
❖ Can we handle the case where the stream is not bits, but integers, and we want the sum of the last k elements?

Extensions (optional)

❖ Stream of positive integers
❖ We want the sum of the last k elements
➢ Amazon: Avg. price of the last k sales
❖ Solution:
➢ (1) If you know all integers have at most m bits
 Treat the m bits of each integer as m separate streams
 Use DGIM to count the 1s in each bit-stream
 The sum is Σ_{i=0}^{m−1} c_i ∙ 2^i, where c_i is the estimated count for the i-th bit-stream
➢ (2) Use buckets to keep partial sums
 Idea: the sum of the elements in a size-b bucket is at most 2b (unless the bucket has only 1 integer)
[Figure: the integer stream 2 5 7 1 3 8 4 6 7 9 1 3 7 6 5 … partitioned into buckets of sizes 16, 8, 4, 2, 1, each keeping a partial sum]

References

❖ Chapter 4, Mining of Massive Datasets.
❖ Can we use the same trick to answer queries How many 1’s in the last k? where k < N? ➢ A: Find earliest bucket B that at overlaps with k. Number of 1s is the sum of sizes of more recent buckets + 1⁄2 size of B 1001010110001011010101010101011010101010101110101010111010100010110010 k ❖ Can we handle the case where the stream is not bits, but integers, and we want the sum of the last k elements? Extensions (optional) ❖ Stream of positive integers ❖ We want the sum of the last k elements ➢ Amazon: Avg. price of last k sales ❖ Solution: ➢ (1) If you know all have at most m bits  Treat m bits of each integer as a separate stream  Use DGIM to count 1s in each integer  The sum is = σ𝑚−1 𝑐 2𝑖 c ...estimated count for the i-th bit 𝑖=0𝑖 i ➢ (2) Use buckets to keep partial sums  Sum of elements in size b bucket is at most 2b 2571384679137 71331226 257138467913765 263 257138467913765 2632 2 5 7 1 3 8 4 6 7 9 1 3 7 6 5 Idea: Sum in each bucket is at most 2b (unless bucket has only 1 integer) Bucket sizes: 16 8 4 2 1 ❖ Chapter 4, Mining of Massive Datasets. References Chapter 6.1