7CCSMBDT – Big Data Technologies Week 11
Grigorios Loukides, PhD (grigorios.loukides@kcl.ac.uk)
1
Objectives
Introduce the format of the exam
Go through the main concepts quickly
Answer questions
2
Exam
The exam has a weight of 80% (the remaining 20% comes from the two courseworks)
Format
We are waiting for formal approval for the format.
One option (not certain yet) is a 1-hour, open-book exam with open-ended questions.
Please wait for announcements
3
Exam
Examples of types of open-ended questions
What is X? Briefly discuss
Choose the best technology for X and justify your choice
Provide an example of X
Illustrate the use of X through example (X may be function in Spark, MR, Hive etc.)
Pros and cons of X
Describe a method for X
4
Disclaimer
For each question Q in the exam:
P(Q in the following slides only)<1
P(Q in the slides of the course only)<1
where P denotes a probability.
5
Introduction: Big Data Technologies
Big Data characteristics
Volume
• Data quantity is substantial and ever-growing
• Different storage and processing requirements
Velocity
• Data speed may be high
• Elastic and available time for processing
Variety
• Multiple formats and types
Veracity
• Bias, noise, abnormalities
• Different integration, transformation, processing and storage requirements
• Different requirements for removing noise and resolving invalid data
Value
• Data usefulness (utility)
• Depends on veracity, time of processing, storage & analysis decisions (how?)
6
Introduction: Big Data Technologies
Big Data characteristics in [Your own example] from healthcare, transportation, finance, marketing, web, ...
[Diagram: the five characteristics Volume, Velocity, Variety, Veracity, Value applied to your own example]
7
Data analytics & Big Data
Analytics
Processes, technologies, frameworks to extract meaningful insights from
data
Data are filtered, processed, categorized, condensed, and contextualized
Goal is to infer knowledge that makes systems smarter & more efficient
Analytic tasks – The 7 giants
By the National Research council of the US National Academies
7 common classes of tasks for Big Data analysis that help design technologies for Big Data
https://www.nap.edu/read/18374/chapter/12
8
Data analytics & Big Data
Analytics overview: the 7 giants
Basic statistics
Generalized N-body problems
Linear algebraic computations
Graph-theoretic computations
Optimization
Integration
Alignment problems
9
TYPES OF ANALYTICS
Descriptive
Diagnostic
Predictive
Prescriptive
Data analytics & Big Data
Descriptive analytics
Aim to answer what has happened, based on past data
Performed by reports, alerts
Basic statistics
Mean, Median, Variance, Count, Top-N, Distinct
Challenging to do on streams
Problem: Given a stream D = x_1, ..., x_N, find the number F_0 of distinct items of D.
Any estimator that examines only r elements has, with probability at least γ, ratio error
max(F̂_0 / F_0, F_0 / F̂_0) ≥ sqrt( ((N − r) / (2r)) · ln(1/γ) )
e.g., for N = 10000, r = 10%·N, γ = 0.5, this bound is 1.76
[Char] http://dl.acm.org/citation.cfm?doid=335168.335230 10
Data analytics & Big Data
Descriptive analytics
Linear algebraic computations
Linear Regression
Principal Component Analysis (PCA)
Singular Value Decomposition (SVD): Low-rank approximation
Problem: Approximate matrix M with a matrix M_k of rank k such that the Frobenius norm
||M − M_k||_F = sqrt( sum_{i,j} (m_{ij} − m̃_{ij})^2 ) is minimum
Solution: M_k = U * diag(σ_1, ..., σ_k, 0, ..., 0) * V^T
Error: ||M − M_k||_F = sqrt( σ_{k+1}^2 + ... + σ_r^2 )
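A minimal numpy sketch of this low-rank approximation (the matrix and the rank k below are illustrative, not from the slides):

import numpy as np

M = np.random.rand(6, 4)                        # example matrix
k = 2                                           # target rank
U, s, Vt = np.linalg.svd(M, full_matrices=False)
M_k = U[:, :k] @ np.diag(s[:k]) @ Vt[:k, :]     # M_k = U * diag(s_1..s_k, 0..0) * V^T
# the Frobenius error equals sqrt(s_{k+1}^2 + ... + s_r^2)
print(np.linalg.norm(M - M_k, 'fro'), np.sqrt(np.sum(s[k:] ** 2)))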
11
Data analytics & Big Data
Diagnostic analytics
Aims to answer why it has happened, based on past data
Performed by queries, data mining
Linear algebraic computations
Challenge: Large matrices, with “difficult” properties
Generalized N-body problems
Distances, kernels, pairwise similarity, clustering, classification (NN, SVM)
Challenge: high dimensionality
Graph-theoretic computations
Search, centrality, shortest paths, minimum spanning tree
Challenge: high interconnectivity (complex relationships)
12
Data analytics & Big Data
Generalized N-body problems: Kernel smoothing
Estimating a function using its noisy observations
Kernel K: a nonnegative, real-valued, integrable function s.t.
integral_{-∞}^{+∞} K(u) du = 1 and K(−u) = K(u), for all u
Uniform kernel: K(u) = 1 if |u| < 1, and 0 if |u| ≥ 1
Nadaraya-Watson kernel smoothing
Y = [Y(X_1), ..., Y(X_N)] is a probability distribution
A distance d(X_i, X_j) measures how far X_i is from X_j
Ŷ is a smoothed distribution where
Ŷ(X_i) = sum_{j=1}^{N} K( d(X_i, X_j) / h ) * Y(X_j)  /  sum_{j=1}^{N} K( d(X_i, X_j) / h )
Ŷ(X_i) is estimated based on the other Y values; those with values close to X_i contribute more to the estimation, and h (the bandwidth) controls the "smoothing"
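A minimal numpy sketch of Nadaraya-Watson smoothing (the Gaussian kernel, the data X, Y and the bandwidth h are illustrative assumptions, not from the slides):

import numpy as np

def nw_smooth(X, Y, h):
    d = np.abs(X[:, None] - X[None, :])    # pairwise distances d(X_i, X_j)
    K = np.exp(-0.5 * (d / h) ** 2)        # Gaussian kernel weights K(d/h)
    return (K @ Y) / K.sum(axis=1)         # weighted average per point

X = np.linspace(0, 10, 50)
Y = np.sin(X) + 0.3 * np.random.randn(50)  # noisy observations
Y_smooth = nw_smooth(X, Y, h=0.8)          # larger h gives more smoothing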
13
Data analytics & Big Data
Graph-theoretic computations: Centrality
http://cs.brynmawr.edu/Courses/cs380/spring2013/section02/slides/05_Centrality.pdf
http://matteo.rionda.to/centrtutorial/
14
Data analytics & Big Data
Predictive analytics
Aims to answer what is likely to happen, based on existing data
Performed by predictive models trained on existing data
Models are used to learn patterns and trends and predict
The occurrence of an event (classification). E.g., offer car insurance to a person?
The value of a variable (regression). E.g., what insurance premium to offer?
Linear algebraic computations
Generalized N-body problems
Graph-theoretic computations
Integration
Compute high-dimensional integrals of functions
Alignment problems
Matching (entity resolution, synonyms in text, images referring to the same entity) 15
Data analytics & Big Data
Predictive analytics: classification
Training data:

Name  | Age | Income | Car    | Insure
Anne  | 22  | 11K    | BMW    | N
Bob   | 33  | 20K    | Nissan | Y
Chris | 44  | 30K    | Nissan | Y
Dan   | 55  | 50K    | BMW    | Y
...   | ... | ...    | ...    | ...

The classification algorithm learns classification rules:
IF Age<30, Income<15K, Car=BMW THEN Insure=N
IF Age>30, Income>15K, Car=Nissan THEN Insure=Y
IF Age>30, Income>40K THEN Insure=Y

Test data:
Emily | 22 | 14K | BMW | ?
Applying the rules gives Insure=N
16
Data analytics & Big Data
Predictive analytics: entity resolution
Do they refer to the same person?
How likely do they refer to the same person?
Is Amazon reviewer X and Facebook user Y the same person?
https://www.umiacs.umd.edu/~getoor/Tutorials/ER_VLDB2012.pdf
17
Data analytics & Big Data
Predictive analytics: entity resolution
Training: entity pairs are annotated w.r.t. whether they represent a match or a non-match and a classifier is trained
Application: classifier decides whether two entities match http://dbs.uni-leipzig.de/file/learning_based_er_with_mr.pdf 18
Data analytics & Big Data
Prescriptive analytics
Aims to answer what can we do to make something happen, based
on existing data
Performed by multiple predictive models trained on different data
Example: Electronic health record contains a patient’s diagnosed diseases and prescribed medications over multiple visits to a hospital. What medication to prescribe now for treating a disease?
Generalized N-body problems
Graph-theoretic computations
Alignment problems
Optimization
Maximization/minimization problems solved by linear programming, integer programming
19
Data analytics & Big Data
Prescriptive analytics & optimization
• We want to select a good model and its parameters, among
several possibilities
• Optimization in model training: A core optimization problem, which asks for optimizing the variables or parameters of the problem w.r.t. a selected objective function (e.g., minimize loss) is solved.
• Optimization in model selection and validation: The core optimization problem may be solved many times.
20
Data analytics & Big Data
Prescriptive analytics: linear programming

maximize   c_1 x_1 + ... + c_n x_n
subject to a_11 x_1 + ... + a_1n x_n <= b_1
           ...
           a_m1 x_1 + ... + a_mn x_n <= b_m
and        x_1 >= 0, ..., x_n >= 0
Design a webpage that brings maximum profit
• An image occupies 5% of the webpage and brings profit $0.1
• A link occupies 1% of the webpage and brings profit $0.01
• A video occupies 15% of the webpage and brings profit $0.5
• Use at most 10 images, 25 links, and 2 videos
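A hedged sketch of this LP with scipy.optimize.linprog (the constraint that the items occupy at most 100% of the page is an assumption added to make the model complete):

from scipy.optimize import linprog

# maximize 0.1*images + 0.01*links + 0.5*videos  (linprog minimizes, so negate)
c = [-0.1, -0.01, -0.5]
A_ub = [[5, 1, 15]]                  # % of the page used per image, link, video
b_ub = [100]                         # assumption: at most 100% of the page is used
bounds = [(0, 10), (0, 25), (0, 2)]  # at most 10 images, 25 links, 2 videos

res = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=bounds)
print(res.x, -res.fun)               # optimal counts and maximum profit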
21
Data analytics & Big Data
Settings in which analytics are applied:
Default: the dataset is stored in RAM
Streaming: data arrive as a stream, a part (window) is stored
Distributed: data are distributed over multiple machines (in RAM and/or disk)
Multi-threaded: data are in one machine and multiple processors share the RAM of the machine
22
Data collection
Data access connectors
5 common types, each used by several tools
their choice depends on the type of data source
1. Publish-subscribe messaging (used by Apache Kafka, Amazon Kinesis)
Publishers send messages to a topic managed by a broker (intermediary)
Subscribers subscribe to topics
Broker routes messages from publishers to subscribers
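A hedged sketch of publish-subscribe with the kafka-python client (a broker at localhost:9092 and the topic name 'clicks' are illustrative assumptions):

from kafka import KafkaProducer, KafkaConsumer

# publisher: sends messages to a topic managed by the broker
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('clicks', b'page A visited')
producer.flush()

# subscriber: subscribes to the topic; the broker routes messages to it
consumer = KafkaConsumer('clicks', bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)
    break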
23
Data collection
Data access connectors
2. Source-sink connectors (used by Apache Flume)
Source connectors import data from another system (e.g., relational database)
Sink connectors export data to another system (e.g., HDFS)
3. Database connectors (used by Apache Sqoop)
Import data from a DBMS
24
Data collection
Data access connectors
4. Messaging Queues
(by RabbitMQ, ZeroMQ, AmazonSQS)
Producers push data to the queues
Consumers pull the data from the queues
Producers and consumers do not need to be aware of each other
5. Custom connectors
(used e.g., to collect data from social networks, NoSQL databases, or Internet-of-Things)
Built based on the data sources and data collection requirements
For Twitter, e.g., https://dev.twitter.com/resources/twitter-libraries
25
Analysis types & modes
Analysis types
Basic statistics
Graph analysis
Classification
Regression
Frequent pattern mining …
Depending on the application, each type can run in
Batch mode: results updated “infrequently” (after days or months)
Real-time mode: results updated “frequently” (after few seconds)
Interactive mode: results updated “on demand” as answer to queries
26
Technologies by analysis mode
Batch mode
Hadoop / MapReduce: framework for distributed data processing
Pig: high-level language to write MapReduce programs
Spark: cluster computing framework; various data analytics components
Solr: scalable framework for searching data
Real-time mode
Spark Streaming component: for stream processing
Storm: for stream processing
Interactive mode
Hive: data warehousing framework built on HDFS, uses SQL-like
language
Spark SQL component: SQL-like queries within Spark programs
27
Apache flume
System for collecting, aggregating, and moving data
from different sources
server logs, databases, social media, IoT devices
into a centralized (big) data store distributed file system or NoSQL database
Advantages over ad-hoc solutions
Reliable, Scalable, High performance
Manageable, Customizable
Low-cost installation, operation and maintenance
Slides based on: Chapter 5.3.1 from Big Data Science & Analytics
A. Prabhakar Planning and Deploying Apache Flume
https://flume.apache.org/FlumeUserGuide.html
28
Apache flume
Event: a unit of data flow having byte payload and possibly a set of attributes (headers)
Headers
Payload
Payload is opaque to Flume
Headers
can be used for contextual routing
are an unordered collection of string key-value pairs
29
Apache flume
Agent: a process that hosts the components through which events flow and are transported from one place to another
Source: receives data from data generators and transfers it to one or more channels
requires at least one channel to function
specialized sources for integration with well-known systems
30
Apache flume
Agent: a process that hosts the components through which events flow and are transported from one place to another
Channel: a transient store which buffers events until they are consumed by sinks
a channel can work with any number of sources and sinks
different channels offer different levels of durability (memory, file, db)
31
Apache flume
Agent: a process that hosts the components through which events flow and are transported from one place to another
Sink: removes events from a channel and transmits them to their next hop destination
different types of sinks
requires exactly one channel to function
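To make the source/channel/sink roles concrete, a minimal single-agent configuration sketch in the style of the Flume User Guide linked below (the names a1, r1, c1, k1 and the netcat source are illustrative):

# sketch of a single-node Flume agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source: receives data; needs at least one channel
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

# channel: transient in-memory buffer of events
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# sink: requires exactly one channel
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1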
32
Apache flume
Data ingest
Clients send events to agents
Source(s) operating within the Agent receive the events
Source(s) pass the events through Interceptors and, if not filtered, put them on the channel(s) identified by the channel selector
33
Apache flume
Data drain
Sink Processor identifies a sink and invokes it
The invoked sink takes events from the channel and sends them to the next-hop destination
If event transmission fails, the Sink Processor takes a secondary action
34
Apache sqoop
Sqoop http://sqoop.apache.org/
Open-source command line tool
Sqoop2, with a Java API, is under development
Imports data from RDBMS into HDFS
Exports data from HDFS back to RDBMS
Built-in support for RDBMS: MySQL, PostgreSQL, Oracle, SQL Server, DB2, and Netezza
Third-party support for many other data stores
35
Apache sqoop
Sqoop commands
36
Apache sqoop
Sqoop import command
37
Apache sqoop
Sqoop import command
The table to be imported is examined
JAVA code is generated (a class for a table, a method for an attribute, plus methods to interact with JDBC)
38
Apache sqoop
Sqoop import command
id    | A1  | A2
1     | ... | ...
...   | ... | ...
1999  | ... | ...
2000  | ... | ...
...   | ... | ...
10000 | ... | ...

Reading the table in parallel
• Split the table horizontally, w.r.t. the "split-by" attribute (here, split-by attribute = id)
• Use "m" mappers; each retrieves some rows

SELECT id, A1, A2 FROM table WHERE id >= 0 AND id < 2000
SELECT id, A1, A2 FROM table WHERE id >= 2000 AND id < 4000
...
SELECT id, A1, A2 FROM table WHERE id >= 8000 AND id < 10000
39
Apache sqoop
Sqoop import example
Create a mysql database
40
Apache sqoop
Sqoop import example
Create “widgets” table in the database
41
Apache sqoop
Sqoop import example
Import “widgets” table into HDFS
% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
>   --username U --password P --table widgets -m 8 --columns "price,design_date"

% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
>   --username U --password P -m 8 \
>   --query "SELECT * FROM widgets WHERE price>3.00" --direct

-m 8: the number of mappers (level of parallelism)
--direct: vendor-specific connectors; uses the mysql connector for better performance
https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#connectors
42
Apache sqoop
Sqoop export
For efficiency, “m” mappers write data in parallel, an INSERT command may transfer multiple rows
Generates JAVA code to parse records from text files and generate INSERT commands
Picks up a strategy for the target table
43
Apache sqoop
Sqoop export
We first need to create the mysql table and/or database – why?
Because the exact column types cannot be inferred from the data in HDFS: e.g., both char(64) and varchar(64) can hold a Java String, but they are different types to the database.
Then, we use export command
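A hedged sketch of such an export command (the table name and the HDFS directory are illustrative, not from the slides):

% sqoop export --connect jdbc:mysql://localhost/hadoopguide \
>   --username U --password P --table sales \
>   --export-dir /user/hive/warehouse/sales -m 4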
44
ZeroMQ
A high-performance asynchronous messaging library for distributed or concurrent applications.
Runs without a dedicated message broker.
Library for many languages (e.g., C, Java, Python). Can run on any OS.
zmq_recv() and zmq_send() to receive and send a message.
Each task is performed by a node, and nodes communicate in different ways (as threads, or processes)
Can make an application elastic on a large network:
Routes messages in a variety of patterns (including push/pull, pub/sub)
Queues messages automatically
Can deal with over-full queues
No imposed format on messages
Delivers whole messages
…
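A minimal pyzmq push/pull sketch (the port and the single-process setup are illustrative; the two sockets would normally live in different threads or processes):

import zmq

context = zmq.Context()

sender = context.socket(zmq.PUSH)        # "producer" node
sender.bind("tcp://127.0.0.1:5555")

receiver = context.socket(zmq.PULL)      # "consumer" node
receiver.connect("tcp://127.0.0.1:5555")

sender.send(b"hello")                    # zmq_send()
print(receiver.recv())                   # zmq_recv(); whole messages are delivered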
45
Distributed file system
Provides global file namespace
Google GFS, Hadoop HDFS, CloudStore
Used when
files are huge (100s of GBs to TBs)
existing files are rarely updated
reads and appends are common
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html
46
Distributed file system
Files are divided into chunks that are stored in chunk servers and replicated on different machines or racks (recovery from server or rack failures)
[Figure: chunks C0, C1, C2, C3, C5, D0, D1, ... replicated across Chunk server 1, Chunk server 2, Chunk server 3, ..., Chunk server N]
Master node stores the metadata used to find the chunks of a file. The master node is also replicated.
Directory for the file system is also replicated
Client library for accessing files (talks to master, connects to chunk servers)
47
Distributed file system: Hadoop HDFS
In HDFS: Chunks=blocks, Master node= Namenode, Chunkserver=Datanode
48
Distributed file system HDFS read path
Client library
(1) Get block locations
(2) Does the file exist? (3) Does the client have read permission?
Namenode
Metadata {file.txt Block A: 1,3,5 Block B: 2,4,5 ...}
(4) Block locations are returned, sorted by the distance to the client
(5) Datanodes stream the data
(if a server becomes unavailable, the client can read another replica on a different server)
49
Distributed file system HDFS write path (1/2)
Client library
(2) Does the file already exist? (3) Does the client have write permission?
Namenode
Metadata {file.txt Chunk A: 1,3,5 Chunk B: 2,4,5 ...}
(4) Outputstream object is returned (5) Data are split into packets and added into a queue
(6) The Namenode allocates new blocks
50
Distributed file system HDFS write path (2/2)
Client library
(7) Connection is established with the Datanodes
(8) Datanode 1 consumes data from the queue and writes to Datanode 2 (for replication), which sends an acknowledgement to Datanode 1, ..., and Datanode 1 sends an acknowledgement to the client
(9) The client closes the outputstream and sends a request to close the file
51
MapReduce
Overview
Map task: extract something you care about
Gets chunks from the distributed file system
Turns chunk into key-value pairs
Code tells what chunks to get and what key-value pairs to create
Group by key: sort and shuffle
For each key, a pair (key, list_of_values) is created
Performed by the system
Reduce task: aggregate, summarize, filter or transform
Combines all the values associated with a key
Code tells how to combine the values
Write the combined output
52
MapReduce
Map and Reduce as functions specified by the programmer
Map(k, v) -> <k', v'>*
Input: a key-value pair
Output: a possibly empty list of key-value pairs
One Map call for every different (k, v) pair
Reduce(k', <v'>*) -> <k', v''>*
Input: a key and a possibly empty list of its associated values
Output: a possibly empty list of key-value pairs
One Reduce function for every different key k'
53
Map-Reduce: Environment
Map-Reduce environment takes care of:
Partitioning the input data
Scheduling the program’s execution across a
set of machines
Performing the group by key step
Handling machine failures
Managing required inter-machine communication
54
MapReduce dealing with failures
Master periodically pings the workers to detect failures
Map worker failure
Map tasks completed or in-progress at worker are reset to idle
Reduce workers are notified when task is rescheduled on another worker
Reduce worker failure
Only in-progress tasks are reset to idle
Reduce task is restarted
Master failure
MapReduce task is aborted and client is notified
55
Refinement: Combiners
Often a Map task will produce many pairs of the form (k,v1), (k,v2), … for the same key k
E.g., popular words in the word count example
Can save network time by pre-aggregating values in the mapper:
combine(k, list(v1)) -> v2
The combiner is usually the same as the reduce function
Works only if the reduce function is commutative and associative
56
Refinement: Combiners
Back to our word counting example:
Combiner combines the values of all keys of a
single mapper (single machine):
Much less data needs to be copied and shuffled!
57
Refinement: Partitioning Function
Want to control how keys get partitioned
Inputs to map tasks are created by contiguous
splits of input file
Reduce needs to ensure that records with the same intermediate key end up at the same worker
System uses a default partition function:
hash(key) mod R – aims to create
“well-balanced” partitions
Sometimes useful to override the hash function:
E.g., hash(hostname(URL)) mod R to put all URLs from a host in the same output file
58
MapReduce with python
mr_word_count.py
Empty key and a line as value
Returns three key-value pairs
For each key, returns the sum of its values
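The annotations above match the standard mrjob word-count example; a minimal sketch of what mr_word_count.py likely contains (hedged reconstruction, not copied from the slide):

from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):
    def mapper(self, _, line):
        # empty key, a line of text as the value; emit three key-value pairs
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        # for each key, return the sum of its values
        yield key, sum(values)

if __name__ == '__main__':
    MRWordFrequencyCount.run()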
59
MapReduce with python
Numerical summarization: Problem 1
Parses each line of the input and emits a (key, value) pair as (url, 1)
The reducer receives pairs (key, list_of_values) grouped by key and, for each pair, sums the values to compute the count

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        year = date[0:4]
        if year == '2014':
            yield url, 1

    def reducer(self, key, list_of_values):
        yield key, sum(list_of_values)

if __name__ == '__main__':
    Mrmyjob.run()
Recall .strip() removes leading and trailing whitespace characters
60
MapReduce with python
Numerical summarization: for count, max, min, avg
Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
…
Problem 2: Most visited page in each month of 2014.
“Page visited” means URL appears in Log file
Most visited means maximum number of times (occurrences of a URL)
61
MapReduce with python
Numerical summarization: Problem 2
from mrjob.job import MRJob
from mrjob.step import MRStep

class Mrmyjob(MRJob):
    def mapper1(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        year = date[0:4]
        month = date[5:7]
        if year == '2014':
            # (key, value) as ((month, url), 1)
            yield (month, url), 1

    def reducer1(self, key, list_of_values):
        # (key, value) as (month, (count, url))
        yield key[0], (sum(list_of_values), key[1])

    def reducer2(self, key, list_of_values):
        yield key, max(list_of_values)

reducer2 receives, for each month, the (count, url) pairs of its urls and emits, for each month, the count of visits and the url of the most visited url in that month
(continues on the next slide)
62
MapReduce with python
Numerical summarization: Problem 2
from mrjob.job import MRJob
from mrjob.step import MRStep

class Mrmyjob(MRJob):
    def mapper1(self, _, line):
        data = line.split()
        ...
        yield (month, url), 1

    def reducer1(self, key, list_of_values):
        ...

    def reducer2(self, key, list_of_values):
        ...

    def steps(self):
        return [MRStep(mapper=self.mapper1, reducer=self.reducer1),
                MRStep(reducer=self.reducer2)]

if __name__ == '__main__':
    Mrmyjob.run()
mrjob by default uses a single MapReduce job to execute the program. We redefine steps() to create two jobs, each defined with MRStep().
Note 3.2 in Bagha’s book uses the deprecated self.mr() method instead of MRStep
https://pythonhosted.org/mrjob/job.html#multi-step-jobs
63
MapReduce with python
Numerical summarization: Problem 2
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
2014-06-06 21:02:03 http://www.cnn2.com 120.62.102.124 7
2014-06-06 21:02:03 http://www.cnn2.com 120.62.102.124 7
2014-06-07 21:02:03 http://www.cnn.com 120.62.102.124 7
[cloudera@quickstart Desktop]$ python summarization.py toy_log.txt
Using configs in /etc/mrjob.conf
Creating temp directory /tmp/summarization.cloudera.20170208.120451.098082
Running step 1 of 2…
Running step 2 of 2…
Streaming final output from /tmp/summarization.cloudera.20170208.120451.098082/output…
“01” [1, “http://www.google.com”]
“02” [1, “http://www.google.com”]
“06” [2, “http://www.cnn2.com”]
Removing temp directory /tmp/summarization.cloudera.20170208.120451.098082…
Input file: toy_log.txt
64
MapReduce with python
Filtering: for getting out (i.e., retrieve) a subset of data
Example: Log file (Date, Time, URL, IP, visit_length)
2014-12-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-12-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
…
Problem 3: Retrieve all page visits for the page http://www.google.com in Dec 2014.
Example output
http://www.google.com , (2014-12-01 20:01:02 77.32.11.111 3)
http://www.google.com , (2014-12-02 23:01:08 77.32.11.111 7)
65
MapReduce with python
Filtering: Problem 3

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        year = date[0:4]
        month = date[5:7]
        if year == '2014' and month == '12' and url == 'http://www.google.com':
            yield url, (date, time, ip, visit_length)

if __name__ == '__main__':
    Mrmyjob.run()

Parses each line of the input and emits a (key, value) pair as (url, (date, ..., visit_length))
There is no reducer.
66
MapReduce with python
Distinct: for getting out (i.e., retrieve) distinct values
Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
…
Problem 4: Retrieve all distinct IP addresses
Example output
77.32.11.111, None
50.62.22.123, None
120.62.102.124, None
67
MapReduce with python
Distinct: Problem 4

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        yield ip, None

    def reducer(self, key, list_of_values):
        yield key, None

if __name__ == '__main__':
    Mrmyjob.run()

Parses each line of the input and emits a (key, value) pair as (ip, None)
The reducer receives (key, None) pairs grouped by key and emits the distinct keys
Question: Is reducer necessary here? Why?
68
MapReduce with python
Binning: for partitioning records into bins (i.e., categories)
Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
…
Problem 5: Partition records of the Log file based on the quarter of 2014 in which the pages have been visited. Note quarters 1,2,3,4 are denoted with Q1, Q2, Q3, Q4.
Example output
Q1, (2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3)
Q1, (2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7)
Q2, (2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7)
69
MapReduce with python
Binning: Problem 5
from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        year = date[0:4]
        month = int(date[5:7])
        if year == '2014':
            if month <= 3:
                yield "Q1", (date, time, url, ip, visit_length)
            elif month <= 6:
                yield "Q2", (date, time, url, ip, visit_length)
            elif month <= 9:
                yield "Q3", (date, time, url, ip, visit_length)
            else:
                yield "Q4", (date, time, url, ip, visit_length)

if __name__ == '__main__':
    Mrmyjob.run()
Parses each line of the input and emits a (key, value) pair as (Quarter_id, record)
where Quarter_id depends on the month of 2014 (e.g., Q1 for month <= 3)
int(date[5:7]) is used because month is compared numerically in the if/elif/else
70
MapReduce with python
Sorting: for sorting with respect to a particular key
Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
...
Problem 6: Sort the pages visited in 2014, with respect to their visit length in decreasing order
Example output
7, (2014-02-02 23:01:08 http://www.google.com 77.32.11.111)
7, (2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124)
3, (2014-01-01 20:01:02 http://www.google.com 77.32.11.111)
71
MapReduce with python
Sorting: Problem 6
from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        year = date[0:4]
        month = int(date[5:7])
        if year == '2014':
            yield None, (visit_length, (date, time, url, ip))

    def reducer(self, key, list_of_values):
        list_of_values = sorted(list(list_of_values), reverse=True)
        yield None, list_of_values

if __name__ == '__main__':
    Mrmyjob.run()

The mapper parses each line of the input and emits a (key, value) pair as (None, tuple of tuples), e.g., (None, (3, (2014-01-01, ...)))
By default, list_of_values in the reducer is a generator; list() converts it into a list. reverse=True imposes decreasing order.
72
Key/value databases
Store key-value pairs
Keys are unique
The values are only retrieved via keys and are opaque to the database
Key | Value
111 | John Smith
324 | Binary representation of an image
567 | XML file
Key-value pairs are organized into collections (a.k.a buckets)
Data are partitioned across nodes by keys
The partition for a key is determined by hashing the key
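As an illustration of this access pattern (not from the slides), a hedged sketch with the redis-py client, assuming a Redis server on localhost:

import redis

r = redis.Redis(host='localhost', port=6379)
r.set('111', 'John Smith')                  # value is opaque to the database
r.set('324', b'...binary image bytes...')   # binary payload, still just a value
print(r.get('111'))                         # values are retrieved only via their keys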
73
Key/value databases
Pros: very fast
simple model
able to scale horizontally
Cons:
- many data structures (objects) can't be easily modeled as key value pairs
74
Key/value databases
Suitable when:
Unstructured data
Fast read/writes
Key suffices for identifying the value ("1-1" mapping)
No dependencies among values
Simple insert/delete/select operation
Unsuitable when:
Operations (search, filter, update) on individual
attributes of the value
Operations on multiple keys in a single transaction
75
Document databases
Store documents in a semi-structured form
A document is a nested structure in JSON or XML format
JSON
{
  name : "Joe Smith",
  title : "Mr",
  Address : {
    address1 : "Dept. of Informatics",
    address2 : "Strand",
    postcode : "WC2 1ER"
  },
  expertise : [ "MongoDB", "Python", "Javascript" ],
  employee_number : 320,
  location : [ 53.34, -6.26 ]
}
A document is a set of unordered name-value pairs; an array (e.g., expertise) is an ordered collection of values
76
Document databases
Store documents in a semi-structured form
A document is a nested structure in JSON or XML format
XML
…
http://www.w3schools.com/xml/
77
Document vs. key-value databases
1. In document databases, each document has a unique key
2. Document databases provide more support for value operations
They are aware of values
A select operation can retrieve fields or parts of a value
e.g., regular expression on a value
A subset of values can be updated together
Indexes are supported
Each document has a schema that can be inferred from the structure of the value
e.g., store two employees one of which does not have address3
78
Document databases
Suitable when:
Semi-structured data with flat or nested schema
Search on different values of document
Updates on subsets of values
CRUD operations (Create, Read, Update, Delete)
Schema changes are likely
Unsuitable when:
Binary data
Updates on multiple documents in a single transaction
Joins between multiple documents
79
Column-family databases
Store columns. Each column has a name and a value. Related columns are grouped into a row.
A row does not necessarily have a fixed schema or number of columns.
Difference in storage layout:
RDBMS table:

ID | Name    | Salary
1  | Joe D   | $24000
2  | Peter J | $28000
3  | Phil G  | $23000

RDBMS storage layout (row by row):
1, Joe D, $24000
2, Peter J, $28000
3, Phil G, $23000

Column database storage layout (column by column):
1, 2, 3
Joe D, Peter J, Phil G
$24000, $28000, $23000
Column-family databases
Suitable when:
Data has tabular structure with many columns and sparsely
populated rows (i.e., high-dimensional matrix with many 0s)
Columns are interrelated and accessed together often
OLAP
Realtime random read-write is needed
Insert/select/update/delete operations
Unsuitable when:
Joins
ACID support is needed
Binary data
SQL-compliant queries
Frequently changing query patterns that lead to column
restructuring
81
Column-family databases
Examples of applications:
82
Graph databases
Store graph-structured data
Nodes represent the entities and have a set of attributes
Edges represent the relationships and have a set of attributes
83
Graph databases
Optimized for representing connections
Can add/remove edges and their attributes easily
Bob becomes friend of Grace
84
Graph databases
Designed to allow efficient queries for finding interconnected nodes based on node and/or edge attributes
Find all friends of each node
Scalable w.r.t. nodes because each node knows its neighbors
Find all friends of friends of Jim who are not his colleagues
85
Graph databases
Suitable when:
Data comprised of interconnected entities
Queries are based on entity relationships
Finding groups of interconnected entities
Finding distances between entities
Unsuitable when:
Joins
ACID support is needed
Binary data
SQL-compliant queries
Frequently changing query patterns that lead to column
restructuring
86
MongoDB
A document database
Hash-based
Stores hashes (system-assigned _id) with keys and values
for each document
Dynamic schema
No Data Definition Language
Application tracks the schema and mapping
Uses BSON (Binary JSON) format
APIs for many languages
JavaScript, Python, Ruby, Perl, Java,
Scala, C#, C++, Haskell, …
https://www.mongodb.com/ 87
Index support
B+ tree indices, GeoSpatial indices, text indices
Index created by the system on _id. Can be viewed by
db.system.indexes.find()
Users can create other indices:
db.phones.ensureIndex(
  { display : 1 },
  { unique : true, dropDups : true } )
(only unique values in the "display" field; duplicates are dropped)
and compound indices:
db.products.createIndex( { "item": 1, "stock": 1 } )
https://docs.mongodb.com/v3.4/indexes/ 88
Replication
Multiple replicas (dataset copies) are stored
Provides scalability (distributed operations), availability (due to
redundancy), and fault tolerance (automatic failover)
Replica set: group of <=50 mongod instances that contain the same copy of the dataset
Primary instance: receives operation requests
Secondary instances: apply the operations to
their data
Automatic failover
When a primary does not communicate with its secondaries for >10 seconds, a secondary becomes primary after elections and voting
https://docs.mongodb.com/v3.4/replication/ 89
Sharding: overview
Sharding is the process of horizontally partitioning the dataset into parts (shards) that are distributed across multiple nodes
Each node is responsible only for its shard
Router (mongos): the interface between applications and the sharded cluster; processes all requests and decides how a query is distributed, based on metadata from the config server
Config server: stores metadata and configuration settings for the cluster
To benefit from replication, shards and the config server may be implemented as replica sets
https://docs.mongodb.com/v3.4/sharding/ 90
Sharding: benefits
Efficient reads and writes: They are distributed across the shards.
More shards, higher efficiency
Storage capacity: Each shard has a part of the dataset
More shards, increased capacity
High availability: Partial read / write operations performed if shards are unavailable. Reads or writes directed at the available shards can still succeed.
More shards, less chance of a shard to be unavailable
https://docs.mongodb.com/v3.4/sharding/
91
Fast in-place updates
In-place update: Modifies a document without growing its size
Increases stock by 5 and changes item name
{
  _id: 1,
  item: "book1",
  stock: 0
}
db.books.update(
  { _id: 1 },
  {
    $inc: { stock: 5 },
    $set: { item: "ABC123" }
  }
)
In-place updates are fast because the database does not have to allocate and write a full new copy of the object.
Multiple updates are written one time (every few seconds). This lazy writing strategy helps efficiency, when updates are frequent (e.g., 1000 increments to stock)
92
What is Apache Spark?
Unified engine for distributed data processing
Spark extends MapReduce programming model with an
abstraction that allows efficient data reuse
[Architecture: the user's program and data processing libraries run on Spark;
STORAGE: HDFS, local filesystem, HBase, ...;
Cluster management: Apache Mesos, YARN, Standalone]
93
Motivation for Spark
Map Reduce is inefficient for applications that reuse intermediate results across multiple computations.
Example
PageRank
The output of a MR job in iteration i, is the input of another MR job in iteration i+1. And i can be very large.
k-means clustering
Logistic Regression
94
Motivation for Spark
Map Reduce is inefficient for interactive data mining (multiple ad-hoc queries on the same data)
Example
Web log queries to find total views of
(1) all pages,
(2) pages with titles exactly matching a given word, and (3) pages with titles partially matching a word.
95
Motivation for Spark
Hadoop is inefficient because it writes to the distributed file system (HDFS)
Overhead due to data replication, disk I/O, and serialization
Frameworks to address the inefficiency support specific computation patterns
Pregel: Iterative graph computations
HaLoop: Iterative MapReduce interface
but not generic data reuse (e.g., a user cannot load all logs in memory and perform interactive data mining on them)
96
Spark
Resilient Distributed Datasets (RDDs) enable efficient data reuse
An RDD is a read-only, fault-tolerant collection of records that can be
operated on in parallel
An RDD is a data structure that serves as the core unit of data
User program can manipulate it, control its partitioning, make it
persistent in memory.
97
Overview of RDDs
Do not need to be materialized
Each RDD knows its lineage (how it was derived from other datasets)
and can compute its partitions from data in stable storage
Only RDDs that can be reconstructed after failure can be referenced by a user’s program
Persistence
Users can indicate which RDDs they will reuse and choose a storage strategy
for them (e.g., in-memory storage)
Partitioning
Users can specify how records of an RDD are partitioned across machines
based on a key in each record
98
Overview of RDDs
Persist method to implement persistence
Persistent RDDs are stored in RAM by default
If there is not enough RAM, they are spilled to disk
Users can request other storage strategies
Storing an RDD only on disk
Replicating an RDD across machines, through
flags to persist.
Set a persistence priority on each RDD to specify
which in-memory data should spill to disk first.
99
Applications for RDD
Suitable for batch applications that perform the same operation to the entire dataset
RDDs remember each transformation as one step in a lineage graph
RDDs can recover lost partitions efficiently
Not suitable for applications that make asynchronous fine-grained updates to shared state
e.g., storage system for a web application in which many users update values on the same table
100
pySpark in shell
Parallelized collections
Created by the parallelize method of sc on an existing iterable or collection d
The elements of d are copied to form a distributed dataset that can be
operated on in parallel
>>> d= [1,2,3,4,5]
>>>parallel_col = sc.parallelize(d)
The number of partitions of the parallelized collection is determined automatically based on the cluster, but it can be set up as a second optional parameter
>>> d= [1,2,3,4,5]
>>>parallel_col = sc.parallelize(d,10)
101
pySpark in shell
count
Returns the number of elements in this RDD
filter
Argument: a function that acts as a filter
(lambda defines an anonymous function, i.e., a function with no specific name)
Output: a new RDD with the elements that pass the filter (i.e., those on which the
function returns true)
>>> tf = sc.textFile("file:///usr/share/dict/words")
>>> lines_nonempty = tf.filter(lambda x: len(x) > 0)
>>> lines_nonempty.count()
Same output with $ cat /usr/share/dict/words | wc
102
pySpark in shell
map
Argument: a function f
Output: a new RDD whose elements are the elements of the source RDD,
after applying f on each of them
collect
Returns all the elements of the dataset as an array at the driver program
>>> nums = sc.parallelize([1, 2, 3, 4])
>>> squared = nums.map(lambda x: x * x).collect()
>>> for num in squared:
...     print(num, ",")
Output: 1, 4, 9, 16
103
pySpark in shell
map
Argument: a function f
Output: A new RDD whose elements are the elements of the source RDD,
after applying f on each of them (each element is passed through f)
flatMap
Output: similar to map, but the results are flattened into a single list of elements
>>> x = sc.parallelize(['a b', 'c d'])
>>> y = x.map(lambda x: x.split(' '))
>>> y.collect()
[['a', 'b'], ['c', 'd']]
>>> y = x.flatMap(lambda x: x.split(' '))
>>> y.collect()
['a', 'b', 'c', 'd']
104
pySpark in shell
sample(withReplacement, fraction, [seed])
Output: A sample of size fraction*100% of the data, with or without
replacement, using a given random number generator seed
rddA.union(rddB)
rddA.intersection(rddB)
rddA.subtract(rddB)
rddA.cartesian(rddB)
rddA.join(rddB,[number of reduce tasks])
When called on datasets of type (K, V) and (K, W), where K is a key and V, W values, it returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key
>>> x = sc.parallelize([('a', 1), ('b', 2)])
>>> y = sc.parallelize([('b', 3), ('d', 4)])
>>> x.join(y).collect()
[('b', (2, 3))]
105
pySpark in shell
Reduce
Argument: a commutative and associative function f that combines two elements of the same type of the RDD
Output: a new element of the same type as the RDD elements, which is the result of applying f across the elements
Fold
Same as reduce, but with a first identity ("zero") argument of the type we want to return
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> sum = rdd.reduce(lambda x, y: x + y)
>>> sum
Output: 15
>>> sum2 = rdd.fold(0.0, lambda x, y: x + y)
>>> sum2
Output: 15.0
106
pySpark in shell
Accumulators
Variables that are only “added” to through an associative and commutative
operation and can therefore be efficiently supported in parallel.
Created from an initial value v by calling SparkContext.accumulator(v). Tasks can add y to them using add(y)
Only the driver program can read the accumulator’s value, using value
>>> accum = sc.accumulator(0)
>>> accum
Output: Accumulator
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(lambda x: accum.add(x))
>>> accum.value
Output: 15
107
pySpark in shell
persist
Sets the storage level of an RDD so that it persists. Useful if the RDD will be reused.
cache
Sets the storage level to MEMORY_ONLY
Serialized storage (deserialized = False) keeps data in memory in a serialized format, for space efficiency
Replication replicates RDD partitions on 1 or 2 cluster nodes
Storage level         | useDisk | useMemory | deserialized | replication
MEMORY_ONLY           | False   | True      | False        | 1
MEMORY_ONLY_2         | False   | True      | False        | 2
MEMORY_ONLY_SER       | False   | True      | False        | 1
DISK_ONLY             | True    | False     | False        | 1
MEMORY_AND_DISK       | True    | True      | True         | 1
MEMORY_AND_DISK_SER   | True    | True      | False        | 1
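A minimal pySpark sketch of persist (in the pyspark shell sc already exists; the explicit SparkContext below is for a standalone script):

from pyspark import SparkContext, StorageLevel

sc = SparkContext(appName="persist-demo")
rdd = sc.parallelize(range(100000)).map(lambda x: x * x)
rdd.persist(StorageLevel.MEMORY_AND_DISK)   # keep in RAM, spill to disk if needed
print(rdd.count())                          # first action materializes and caches the RDD
print(rdd.sum())                            # reuses the persisted partitions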
108
HIVE
Motivation
Data warehousing
Not scalable enough
Prohibitively expensive at web scale (up to $200K/TB)
Little control over the execution method
Query optimization is hard
Parallel environment
Little or no statistics
Lots of UDFs
109
HIVE
Motivation
Can MapReduce help?
MapReduce is scalable
Only parallelizable operations
No transactions
MapReduce runs on cheap commodity hardware
Clusters, cloud
MapReduce is procedural
A pipe for processing data
110
HIVE
Motivation
Can MapReduce help?
But
1. Data flow is rigid (one-input, two-stage)
and is complex to change (workarounds below)
111
HIVE
Motivation
Can MapReduce help?
But
2. Custom code has to be written for even the most common operations
e.g., join, sorting, distinct (recall the MR patterns)
Not all users are willing/able to do it
3. Map and reduce functions are opaque
Difficult to maintain, extend, optimize
112
HIVE
Motivation
Can MapReduce help?
Main limitation of MapReduce that HIVE tries to address
The map-reduce programming model is very low level and requires developers to write custom programs which are hard to maintain and reuse.
113
HIVE
Motivation
Can MapReduce help?
Solution
Develop higher-level data processing languages
that “compile down” to Hadoop jobs
HIVE, PIG
114
HIVE
Intuition
* Make the unstructured data look like tables, regardless of how the data is really laid out
* SQL-based queries can be issued directly against these tables
* A specific execution plan is generated for each query
What is HIVE?
*An open-source data warehousing solution built on top of Hadoop, which supports queries that are
(i) expressed in HiveQL (an SQL-like declarative language) (ii) compiled into MapReduce jobs that are executed
using Hadoop
115
HIVE
Applications
• Log processing
• Text mining
• Document indexing
• Customer-facing business intelligence (Google analytics)
• Predictive modeling
• Hypothesis testing
Slide adapted from:
http://blog.cloudera.com/wp-content/uploads/2010/01/6-IntroToHive.pdf
116
HIVE
Components of HIVE
* Client components: Command Line Interface (CLI), the web UI, JDBC/ODBC driver.
* Driver:
(i) Manages the lifecycle of a HiveQL Statement as it moves through HIVE (ii) Maintains a session handle
(iii) Maintains session statistics
* Compiler:
compiles HiveQL into MapReduce tasks
117
HIVE
Components of HIVE
* Optimizer:
Optimizes the tasks (improves HiveQL)
* Executor:
(i) executes the tasks in the right order (ii) Interacts with Hadoop
* Metastore:
(i) Acts as the system catalog. That is,
it stores information about tables, partitions, locations in HDFS etc.
(ii) It runs on an RDBMS. Not on HDFS, because it needs to have very low latency.
* Thrift Server:
Provides an interface between clients and the Metastore (so that clients can query or modify the information in the Metastore)
118
HIVE
Data model: comprised of the following data units
* Table: each table consists of a number of rows, and each row consists of a specified
number of columns
Basic column types: int, float, boolean
Complex column types: list
Arbitrary data types: created by composing basic & complex types (e.g., a list whose elements are themselves complex types)
HIVE
Data model: comprised of the following data units
* Partition: the result of decomposing a table into partitions, based on values
Creating a partitioned table
– The partitioning columns are not part of the table data but behave like regular columns
Adding a new partition to a partitioned table
Creating partitions speeds up query processing, because only relevant data in Hadoop are scanned
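A hedged HiveQL sketch of these two operations (the table page_views and its columns are illustrative, not from the slides):

CREATE TABLE page_views (url STRING, ip STRING, visit_length INT)
PARTITIONED BY (ds STRING);

ALTER TABLE page_views ADD PARTITION (ds='2009-02-02');

-- only the sub-directory of the referenced partition is scanned
SELECT count(*) FROM page_views WHERE ds = '2009-02-02';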
120
HIVE
Data model: comprised of the following data units * Bucket: the result of decomposing a table into buckets
– bucketing is useful when partitions are too many and small (Hive will not create them)
Bucketing is useful for sampling
e.g., a 1/96 sample can be generated efficiently by looking at the first bucket
121
HIVE
The mapping of data units into the HDFS name space
• A table is stored in a directory in HDFS
• A partition of the table is stored in a subdirectory within a table’s HDFS directory
• A bucket is stored in a file within the partition’s or table’s directory depending on whether the table is a partitioned table or not.
Table -> HDFS directory
Partition -> subdirectory of the table's HDFS directory
Bucket -> file within the directory or subdirectory
Warehouse root directory
Example:
table test_part mapped to /user/hive/warehouse/test_part
partition (ds='2009-02-02', hr=11) of test_part mapped to /user/hive/warehouse/test_part/ds=2009-02-02/hr=11
bucket1 mapped to file bucket1 in /user/hive/…./hr=11
122
HIVE
How the mapping helps efficiency
Creating partitions speeds up query processing, because only relevant data in Hadoop are scanned
Hive prunes the data by scanning only the required sub-directories
Bucketing is useful for sampling
Hive uses the file corresponding to a bucket
123
HIVE
Pros
An easy way to process large-scale data
Supports SQL-based queries
Provides more user-defined interfaces to extend
Programmability
Efficient execution plans for performance
Interoperability with other databases
124
HIVE
Cons
No easy way to append data
Files in HDFS are immutable
125
SparkSQL
Spark SQL
A new module in Apache Spark that integrates relational processing with Spark’s functional programming API.
It lets Spark programmers leverage the benefits of relational processing (e.g., declarative queries and optimized storage)
• It lets SQL users call complex analytics libraries in Spark (e.g., machine learning).
Part of the core distribution since April 2014
Runs SQL / HiveQL queries, optionally alongside or replacing existing Hive deployments
126
SparkSQL
Motivation
SparkSQL tries to bridge the two models
Main goal: Extend relational processing to cover native RDDs in Spark and a much wider range of data sources, by:
1. Supporting relational processing both within Spark programs (on native RDDs) and on external data sources using a programmer-friendly API.
2. Providing high performance using established DBMS techniques.
3. Easily supporting new data sources, including semi-structured data and external databases amenable to query federation.
4. Enabling extension with advanced analytics algorithms such as graph processing and machine learning.
127
SparkSQL
Motivation
SparkSQL tries to bridge the two models with:
1. A DataFrame API that can perform relational operations on both external data sources and Spark’s built-in RDDs.
2. A highly extensible optimizer, Catalyst, that uses features of Scala to add composable rule, control code gen., and define extensions.
128
SparkSQL
DataFrame
A distributed collection of rows with the same schema
Similar to a table in RDBMS
Can be constructed from external data sources or RDDs
Support relational operators (e.g. where, groupby) as well as Spark operations.
Evaluated lazily:
each DataFrame object represents a logical plan to compute a
dataset, but no execution occurs until the user calls an output operation
This enables optimization
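A minimal pySpark sketch of the DataFrame API (column names and data are illustrative; newer Spark versions use a SparkSession as the entry point):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("df-demo").getOrCreate()
df = spark.createDataFrame([("Anne", 22), ("Bob", 33), ("Chris", 44)],
                           ["name", "age"])
# relational operators build a logical plan; nothing runs until an output
# operation such as show() is called
df.where(df.age > 30).groupBy("age").count().show()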
129
SparkSQL
Data Model
SparkSQL uses a nested data model based on HIVE
SparkSQL supports different data types:
primitive SQL types: boolean, integer, double, decimal, string, date, timestamp
complex types: structs, arrays, maps, and unions
user defined types
Supporting these data types allows modeling data from
HIVE
RDBMS
JSON
Native objects in JAVA/Python/Scala
130
Motivation
So far, all our data was available when we wanted it
BUT
we may not know the entire data set in advance
data may have high velocity, so that we cannot store
all data and then access them
In domains such as
web click-stream data
computer network monitoring data
telecommunication connection data
readings from sensor networks
stock quotes
131
Data stream definition
Data stream
An ordered and potentially infinite sequence of
elements
An element (a.k.a data point) can be:
event “page A visited” in web click-stream data
stock price “10.3” in stock quote data
customer record “[item, qty, price]” in marketing data
graph in social network data streams
…
132
Data stream example 2
HTTP server log
Report the number of distinct accessed pages every hour
Report the most frequently accessed pages every hour
Report average view time of a page every hour
http://michael.hahsler.net/SMU/EMIS8331/slides/datastream/datastream.pdf
133
Properties of data streams
1. Potentially infinite
• Transient (stream might not be realized on disk)
• Single pass over the data
• Only summaries can be stored
• Real-time processing (in main memory)
2. Data streams are not static
• Incremental updates • Concept drift
• Forgetting old data
3. Temporal order may be important
http://michael.hahsler.net/SMU/EMIS8331/slides/datastream/datastream.pdf
134
Data Stream Management System (DSMS)
. . . 1, 5, 2, 7, 0, 9, 3
. . . a, r, v, t, y, h, b
. . . 0, 0, 1, 0, 1, 1, 0
time
Streams entering the DSMS
Processor
Often have different velocity (arrival rate) and data types
135
Data stream management system
. . . 1, 5, 2, 7, 0, 9, 3
. . . a, r, v, t, y, h, b
. . . 0, 0, 1, 0, 1, 1, 0
time
Archival Storage
Processor
Archives streams
Not for querying due to
very low efficiency of data retrieval
136
Data stream management system
. . . 1, 5, 2, 7, 0, 9, 3
. . . a, r, v, t, y, h, b
. . . 0, 0, 1, 0, 1, 1, 0
time
Archival Storage
Processor
* Storage space in disk or in main memory
* Used for answering queries
* Cannot store all data
Limited Working Storage
137
Data stream management system
Standing Queries
permanently executing and producing outputs at appropriate times
Example of standing query
“output an alert, when the variable TEMPERATURE exceeds 25”
Archival Storage
Processor
Limited Working Storage
138
Data stream management system
Ad-Hoc Queries
Standing Queries
Processor
Example of ad-hoc query
“list the total number of transactions
in the last hour”
Output
Query answers
Archival Storage
Limited Working Storage
139
Ad-hoc queries are asked once about the current state of a stream or streams
Common queries
Sampling data from a stream
Construct a random sample (i.e., a random subset
of the stream)
Queries over sliding windows
Number of items of type x in the last k elements of the stream
140
Common queries
Filtering a data stream
Select elements with property x from the stream
Counting distinct elements
Number of distinct elements in the last k elements
of the stream
Estimating frequency moments
Estimate avg./std.dev. of last k elements
Finding frequent elements
141
Common applications of these queries
Mining query streams
Google wants to know what queries are
more frequent today than yesterday
Mining click streams
Yahoo wants to know which of its pages are
getting an unusual number of hits in the past hour
Mining social network news feeds
E.g., look for trending topics on Twitter, Facebook
142
Sampling from data stream
Construct a sample from a data stream such that we can ask queries on the sample and get answers that are statistically representative of the stream as a whole
Example application
Stream from search query engine
(u1, q1, t1), (u1, q1, t2), (u2, q2, t3),…, user query time
What fraction of the typical user’s queries were repeated over the last month?
Cannot store more than 1/10th of the stream
143
Sampling from data stream
Naïve solution for the example application
Generate a random integer in [0..9] for each query
Store the tuple (u,q,t) if the integer is 0, otherwise discard
For users with many queries, 1/10th of their queries will be in the sample
Suppose each user issues x queries once and d queries twice. Queries issued twice will be “duplicate queries”.
What is the fraction of duplicate queries in the stream by an average user?
d / (x + d)
Example: Stream = [q1, q1, q2, q2, q3]; x = 1, d = 2, so the fraction of duplicates is 2/3
Is the naïve solution good for answering this query?
144
Sampling from data stream
Fraction of duplicate queries, based on sample?
Out of x singleton queries, x/10 will appear in the sample
This assumes that I have many singletons and happens because I sample
1/10 of the elements (each singleton is an element)
Duplicate query twice in the sample with probability 1/100
Pr(1st occurrence in sample)*Pr(2nd occurrence in sample)=1/10*1/10=1/100
This is because when I have a duplicate query q1q1, to include
it in the sample, I need to include both q1s and each q1 is sampled with probability 1/10.
Out of d duplicate queries, d * 1/100 will appear twice in the sample
145
Sampling from data stream
Fraction of duplicate queries, based on sample?
Duplicate query appears once in the sample with probability 18/100:
Pr(1st occ. in sample)*Pr(2nd occ. not in sample) + Pr(2nd occ. in sample)*Pr(1st occ. not in sample)
= 1/10*(1-1/10) + 1/10*(1-1/10) = 1/10*9/10 + 1/10*9/10 = 18/100
This is because when I have a duplicate query q1 q1, I can miss it in the sample by (I) not sampling the second q1, OR (II) not sampling the first q1
Out of d duplicate queries, d * 18/100 will appear once in the sample
The sample-based answer is (d * 1/100) / (d * 1/100 + d * 18/100 + x/10) = d / (10x + 19d), which is not equal to d / (x + d)
146
Sampling from data stream
How bad is the sample-based answer?
The sample-based answer is (d * 1/100) / (d * 1/100 + d * 18/100 + x/10) = d / (10x + 19d)
The answer based on the entire stream is d / (x + d)
• For x=1000, d=300
• Sample-based answer 0.019 = 1.9%
• Answer based on entire stream 0.231 = 23.1%. 12.1 times larger
• For x=1000, d=50
• Sample based answer 0.0046= 0.46%
• Answer based on entire stream 0.0476= 4.76%. 10.4 times larger
WHAT WENT SO WRONG?
147
Sampling from data stream
Each user issues x queries once and d queries twice
What is the fraction of duplicate queries in the stream by an average user?
Main idea: We want avg user, and we sampled queries
Solution
Select 1/10th of the users (assuming this is <= 10% of the stream)
For each such user, store all their queries in the sample
To get 1/10th of the users
use a hash function h: user -> {1, 2, ..., 10}
If h(user) = 1, we accept their query, otherwise we discard it
Then, count the duplicates in the sample: d/(x+d), for each user
148
Sampling from data stream
General problem
Stream of tuples with keys:
Key is some subset of each tuple’s components
e.g., tuple is (user, search, time); key is user
Choice of key depends on application
To get a sample of a/b fraction of the stream:
Hash each tuple’s key uniformly into b buckets
Pick the tuple if its hash value is in {1,..., a}
* How to generate a 30% sample?
* What if more and more new users come, and we have a budget on the
number of tuples that we can store?
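A sketch of the hash-based a/b sampling just described (the tuples and the use of hashlib as the hash function are illustrative):

import hashlib

def in_sample(key, a, b):
    # hash the tuple's key uniformly into b buckets and keep buckets 0..a-1
    bucket = int(hashlib.md5(str(key).encode()).hexdigest(), 16) % b
    return bucket < a

stream = [("u1", "q1", 1), ("u1", "q1", 2), ("u2", "q2", 3)]
sample = [t for t in stream if in_sample(t[0], a=1, b=10)]   # key = user, 1/10 of users
# a 30% sample of users: in_sample(user, a=3, b=10)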
149
Sampling from data stream
General problem
To get a (uniform) sample of s tuples from a stream
(e1,e2,...) that contains >s tuples
Reservoir sampling
Add the first s tuples from the stream into a reservoir R
For j > s
With probability s/j, replace a random entry of R with the j-th tuple of the stream (i.e., a newcomer replaces a random old one)
At j=n, return R
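A sketch of reservoir sampling (illustrative Python, using the built-in random module):

import random

def reservoir_sample(stream, s):
    R = []
    for j, tup in enumerate(stream, start=1):
        if j <= s:
            R.append(tup)                   # the first s tuples fill the reservoir
        elif random.random() < s / j:       # with probability s/j ...
            R[random.randrange(s)] = tup    # ... a newcomer replaces a random old one
    return R

print(reservoir_sample(range(1, 1001), s=10))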
Does this work?
We need to prove that after seeing n tuples, R contains each tuple
seen so far with probability s/n
150
Sampling from data stream
Proof by induction for s=1 (exercise for given s>=1)
For j = 1, the tuple is in the sample with probability s/j = 1/1 = 1
Assume that for j = n, each tuple is in the sample with probability 1/n
We need to show that for j = n+1, each tuple is in the sample with probability 1/(n+1)
151
Sampling from data stream
Proof by induction for s=1 (exercise for a given s >= 1)
For j = n+1
The tuple j = n+1 is selected with probability 1/(n+1) (the algorithm does this)
If this tuple is selected, the old tuple in R is replaced with probability 1
Thus, the probability that an old tuple is replaced is 1/(n+1), and it is not replaced with probability 1 - 1/(n+1)
So, R contains an old tuple either because it was added into R and not replaced by tuple j, which happens with probability
(1/n) * (1 - 1/(n+1)) = 1/(n+1)
(1/n is the probability that it is in the sample, by the induction hypothesis for j = n; 1 - 1/(n+1) is the probability that it is NOT replaced)
or because it was added in the latest round, with probability 1/(n+1). QED
152
Today
More algorithms for streams:
(1) Filtering a data stream: Bloom filters
Select elements with property x from stream
(2) Counting distinct elements: Flajolet-Martin
Number of distinct elements in the last k elements of the stream
(3) Estimating moments: AMS method
Estimate std. dev. of last k elements
153
Bloom filter
Probabilistic (approximate) set membership test
Given a set S = {x1,x2,…,xn}, is y in S ?
Construct a data structure to answer that is:
Fast (Faster than searching through S).
Small (Smaller than explicit representation).
To obtain speed and size improvements, allow some probability of error.
False positives: y ∉ S but we report y ∈ S
False negatives: y ∈ S but we report y ∉ S
Bloom filter
Data structure for probabilistic set membership testing Zero false negatives
Nonzero false positives
154
Bloom filter
Given a set of keys S that we want to filter
1. Create a bit array B of n bits, initially all 0
2. Choose k hash functions h1 ,…, hk with range [0,n)
3. Hash each member s of S
use the k hash functions h_i(s), i ∈ [1, k], which map s to random numbers uniform in [0, n) (apply modulo n if a hash function outputs larger numbers)
set the elements B[h_1(s)], ..., B[h_k(s)] to 1
4. When a stream element with key y arrives
use the k hash functions
if B[h_1(y)], ..., B[h_k(y)] are all 1, output y; else discard y
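A minimal Bloom-filter sketch (n, k and the construction of the k hash functions from hashlib with different salts are illustrative):

import hashlib

class BloomFilter:
    def __init__(self, n, k):
        self.n, self.k = n, k
        self.bits = [0] * n                   # bit array B of n bits, initially all 0

    def _hashes(self, key):
        for i in range(self.k):               # k hash functions with range [0, n)
            h = hashlib.md5(f"{i}:{key}".encode()).hexdigest()
            yield int(h, 16) % self.n

    def add(self, key):
        for pos in self._hashes(key):
            self.bits[pos] = 1                # set B[h_1(key)], ..., B[h_k(key)] to 1

    def __contains__(self, key):
        return all(self.bits[pos] for pos in self._hashes(key))

bf = BloomFilter(n=64, k=3)
for s in ["a", "b", "y", "l"]:
    bf.add(s)
print("a" in bf, "q" in bf)   # True; "q" should be False unless a false positive occurs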
155
Bloom filter
Bloom filter example Keys in S: a, b, y, l
Stream keys q, z
Tarkoma et al. Theory and practice of bloom filters for distributed systems. DOI: 10.1109/SURV.2011.031611.00024
156
False positive
Counting distinct elements
Estimate number of distinct elements
Select a hash function h that maps each of N elements to at least log2 N bits
N is the max. number of distinct elements; e.g., a -> 1100 (12 in binary)
Define r(h(a)) as the number of 0s from the right (trailing zeros); e.g., a -> h(a) = 1100 -> r(h(a)) = 2
157
Counting distinct elements
Estimate number of distinct elements Flajolet-Martin (FM) sketch
For each element x in stream S, compute r(h(x))
Let R = max_{x in S} r(h(x))
Return 2^R as the estimated number of distinct elements in S
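A sketch of the FM estimate in Python, using the hash h(x) = 4x mod 32 from the exercises below (the stream matches the worked example, which gives an estimate of 8):

def r(value):
    # number of 0s from the right (trailing zeros) in the binary representation
    count = 0
    while value > 0 and value % 2 == 0:
        value //= 2
        count += 1
    return count

def fm_estimate(stream, h):
    R = max(r(h(x)) for x in stream)
    return 2 ** R

print(fm_estimate([3, 1, 3, 2], lambda x: (4 * x) % 32))   # prints 8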
158
Counting distinct elements
Exercise
Given the stream {3,1,4,2} and h(x)=4xmod32, compute the estimate output by an FM sketch that uses h(x) and stores each h(x) using 4 bits.
159
Counting distinct elements
Exercise
Given the stream {3,1,3,2} and h(x)=4xmod32, compute the estimate output by an FM sketch that uses h(x) and stores each h(x) using 4 bits.
160
Counting distinct elements
Given the stream {3,1,3,2} and h(x)=4xmod32, compute the estimate output by an FM sketch that uses h(x) and stores each h(x) using 4 bits.
Element x | Hash h(x) | r(h(x))
3         | 1100      | 2
1         | 0100      | 2
3         | 1100      | 2
2         | 1000      | 3

R = max(2, 2, 2, 3) = 3, so the estimate is 2^R = 8
(e.g., h(2) = 4*2 mod 32 = 8, which is 1000 in binary)
161
Estimating frequency moments
What are frequency moments
The k-th frequency moment of a stream comprised of N different types of elements a_1, ..., a_N, each appearing m_1, ..., m_N times, is defined as f_k = sum_{i=1}^{N} (m_i)^k
f_0 is the number of distinct elements
f_1 is the total frequency (length of the stream)
f_2 computation
http://www.tau.ac.il/~nogaa/PDFS/amsz4.pdf
162
Estimating frequency moments
Why they are important
Indicate the degree of data skew, which is useful in many parallel database applications (e.g., determines the selection of algorithms for data
partitioning)
f_2 is the surprise number S (how uneven the distribution is)
Stream of length 100 with 11 distinct values:
Item counts: 10, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9   Surprise S = 910
Item counts: 90, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1   Surprise S = 8,110
http://www.tau.ac.il/~nogaa/PDFS/amsz4.pdf
163
What is it ? (1/3)
Spark Streaming
• Extension of the core Spark API that enables
• scalable, high-throughput, fault-tolerant stream processing of live data streams
164
What is it ? (2/3)
• Extension of the core Spark API that enables
• scalable, high-throughput, fault-tolerant stream processing of live data streams
Scales to hundreds of nodes
Achieves second-scale latencies
Efficiently recovers from failures
Integrates with batch and interactive processing
165
What is it ? (3/3)
• Extension of the core Spark API that enables
• scalable, high-throughput, fault-tolerant stream processing of live data streams
https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html 166
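A minimal Spark Streaming sketch in the spirit of the programming guide linked above (assumes text lines arriving on a local socket, e.g., from `nc -lk 9999`):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="streaming-demo")
ssc = StreamingContext(sc, batchDuration=1)       # 1-second micro-batches

lines = ssc.socketTextStream("localhost", 9999)   # DStream of incoming lines
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda w: (w, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()                                   # print word counts per batch

ssc.start()
ssc.awaitTermination()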
Combine batch and stream processing
Combining batch and stream processing is useful
Online machine learning
Predict {True, False}, based on stream elements Stream: {product1, product2, product1, ….}
Prediction function F(Stream_window)
Continuously learn and update data models (updateStateByKey and transform)
Combine live data streams with historical data
Generate historical data models with Spark, etc.
Use data models to process live data stream (transform)
Combine batch and stream processing
Combining batch and stream processing is useful
Complex Event Processing (CEP)
CEP matches continuously incoming events against a pattern
Stream: {room_visit, sanitize, room_visit, … }
Pattern: {room_visit, room_visit}
If pattern appears in Stream Then doctor did not sanitize hands, alert!
window-based operations (reduceByWindow, etc.)
DStream Persistence
If a DStream is set to persist at a storage level, then all RDDs generated by it are set to the same storage level
When to persist?
If there are multiple transformations / actions on a DStream
If the RDDs in a DStream are going to be used multiple times
Window-based DStreams are automatically persisted in memory
DStream Persistence
Default storage level of DStreams is StorageLevel.MEMORY_ONLY_SER (i.e. in memory as serialized bytes)
Except for input DStreams which have StorageLevel.MEMORY_AND_DISK_SER_2
Note the difference from RDD’s default level (no serialization)
Serialization reduces random pauses due to GC, providing more consistent job processing times
Storage level           | useDisk | useMemory | deserialized | replication
MEMORY_ONLY             | False   | True      | False        | 1
MEMORY_ONLY_2           | False   | True      | False        | 2
MEMORY_ONLY_SER         | False   | True      | False        | 1
DISK_ONLY               | True    | False     | False        | 1
MEMORY_AND_DISK         | True    | True      | True         | 1
MEMORY_AND_DISK_SER_2   | True    | True      | False        | 2