
7CCSMBDT – Big Data Technologies Week 11
Grigorios Loukides, PhD (grigorios.loukides@kcl.ac.uk)
1

Objectives
 Introduce the format of the exam
 Go through the main concepts quickly
 Answer questions
2

Exam
 The exam will have a weight of 80% (the remaining 20% comes from the two courseworks)
 Format
 We are waiting for formal approval for the format.
 One option (that is not certain yet) is to have a 1-hour, open-book exam with open-ended questions.
 Please wait for announcements
3

Exam
 Examples of types of open-ended questions
 What is X? Briefly discuss
 Choose the best technology for X and justify your choice
 Provide an example of X
 Illustrate the use of X through an example (X may be a function in Spark, MR, Hive, etc.)
 Pros and cons of X
 Describe a method for X
4

Disclaimer
For each question Q in the exam:
 P(Q in the following slides only) < 1
 P(Q in the slides of the course only) < 1
where P denotes a probability.
5

Introduction: Big Data Technologies
 Big Data characteristics
 Volume
• Data quantity is substantial and ever-growing
• Different storage and processing requirements
 Velocity
• Data speed may be high
• Elastic and available time for processing
 Variety
• Multiple formats and types
• Different integration, transformation, processing and storage requirements
 Veracity
• Bias, noise, abnormalities
• Different requirements for removing noise and resolving invalid data
 Value
• Data usefulness (utility)
• Depends on veracity, time of processing, storage & analysis decisions (how?)
6

Introduction: Big Data Technologies
 Big Data characteristics in [Your own example] from healthcare, transportation, finance, marketing, web, ... (Volume, Velocity, Variety, Veracity, Value)
7

Data analytics & Big Data
 Analytics
 Processes, technologies, frameworks to extract meaningful insights from data
 Data are filtered, processed, categorized, condensed, and contextualized
 Goal is to infer knowledge that makes systems smarter & more efficient
 Analytic tasks – The 7 giants
 By the National Research Council of the US National Academies
 7 common classes of tasks for Big Data analysis that help design technologies for Big Data
https://www.nap.edu/read/18374/chapter/12
8

Data analytics & Big Data
 Analytics overview – the 7 giants: Basic statistics, Generalized N-body problems, Linear algebraic computations, Graph-theoretic computations, Optimization, Integration, Alignment problems
 Types of analytics: Descriptive, Diagnostic, Predictive, Prescriptive
9

Data analytics & Big Data
 Descriptive analytics
 Aims to answer what has happened, based on past data
 Performed by reports, alerts
 Basic statistics
 Mean, Median, Variance, Count, Top-N, Distinct
 Challenging to do on streams
 Problem: Given a stream D = x_1, ..., x_N, find the number F_0 of distinct items of D.
 For N = 10000, r = 10% ∗ N, γ = 0.5: max(F_0/F̂_0, F̂_0/F_0) = 1.76, where F̂_0 is the estimate of F_0
[Char] http://dl.acm.org/citation.cfm?doid=335168.335230
10

Data analytics & Big Data
 Descriptive analytics
 Linear algebraic computations
 Linear Regression
 Principal Component Analysis (PCA)
 Singular Value Decomposition (SVD): Low-rank approximation
Problem: Approximate matrix M with a matrix M_k of rank k such that
‖M − M_k‖_F = sqrt( Σ_{i,j} (m_{ij} − m̂_{ij})^2 ) is minimum
Solution: M_k = U ∗ diag(σ_1, ..., σ_k, 0, ..., 0) ∗ V^T
Error: ‖M − M_k‖_F = sqrt( σ_{k+1}^2 + ⋯ + σ_r^2 )
11

Data analytics & Big Data
 Diagnostic analytics
 Aims to answer why it has happened, based on past data
 Performed by queries, data mining
 Linear algebraic computations
 Challenge: Large matrices, with “difficult” properties
 Generalized N-body problems
 Distances, kernels, pairwise similarity, clustering, classification (NN, SVM)
 Challenge: high dimensionality
 Graph-theoretic computations
 Search, centrality, shortest paths, minimum spanning tree
 Challenge: high interconnectivity (complex relationships)
12

Data analytics & Big Data
 Generalized N-body problems: Kernel smoothing
 Estimating a function using its noisy observations
 Kernel K: a nonnegative, real-valued, integrable function s.t. ∫_{−∞}^{+∞} K(u) du = 1 and K(−u) = K(u), ∀u
 Uniform kernel: K(u) = 1/2 if |u| ≤ 1, and 0 otherwise
 Nadaraya-Watson kernel smoothing
 Y = [Y(X_1), ..., Y(X_N)] is a probability distribution
 A distance d(X_i, X_j) measures how far X_i is from X_j
 Ŷ is a smoothed distribution where
Ŷ(X_i) = Σ_{j=1}^{N} K(d(X_i, X_j)/h) ⋅ Y(X_j) / Σ_{j=1}^{N} K(d(X_i, X_j)/h)
 Ŷ(X_i) is estimated based on the other Ys; those with values close to X_i contribute more to the estimation, and h (the bandwidth) controls the “smoothing”
13

Data analytics & Big Data
 Graph-theoretic computations: Centrality
http://cs.brynmawr.edu/Courses/cs380/spring2013/section02/slides/05_Centrality.pdf
http://matteo.rionda.to/centrtutorial/
14

Data analytics & Big Data
 Predictive analytics
 Aims to answer what is likely to happen, based on existing data
 Performed by predictive models trained on existing data
 Models are used to learn patterns and trends and predict
 The occurrence of an event (classification). E.g., Offer car insurance to a person?
 The value of a variable (regression). E.g., What insurance premium to offer?
 Linear algebraic computations
 Generalized N-body problems
 Graph-theoretic computations
 Integration
 Compute high-dimensional integrals of functions
 Alignment problems
 Matching (entity resolution, synonyms in text, images referring to the same entity)
15

Data analytics & Big Data
 Predictive analytics: classification
Training data (Name, Age, Income, Car, Insure):
Anne, 22, 11K, BMW, N
Bob, 33, 20K, Nissan, Y
Chris, 44, 30K, Nissan, Y
Dan, 55, 50K, BMW, Y
...
A classification algorithm learns classification rules from the training data:
IF Age<30, Income<15K, Car=BMW THEN Insure=N
IF Age>30, Income>15K, Car=Nissan THEN Insure=Y
IF Age>30, Income>40K THEN Insure=Y
Test data (Name, Age, Income, Car, Insure):
Emily, 22, 14K, BMW, ?
Prediction: Insure=N
16
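As an illustration (not from the slides), a minimal scikit-learn sketch of the insurance example; the feature encoding (BMW=0, Nissan=1) and the choice of a decision tree are assumptions made here for illustration only.

from sklearn.tree import DecisionTreeClassifier

# training data from the slide: (Age, Income in K, Car); Car encoded as BMW=0, Nissan=1
X_train = [[22, 11, 0], [33, 20, 1], [44, 30, 1], [55, 50, 0]]
y_train = ["N", "Y", "Y", "Y"]

clf = DecisionTreeClassifier(random_state=0).fit(X_train, y_train)

# test record (Emily, 22, 14K, BMW): the learned rules should predict "N"
print(clf.predict([[22, 14, 0]]))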

Data analytics & Big Data
 Predictive analytics: entity resolution
Do they refer to the same person?
How likely do they refer to the same person?
Is Amazon reviewer X and Facebook user Y the same person?
https://www.umiacs.umd.edu/~getoor/Tutorials/ER_VLDB2012.pdf
17

Data analytics & Big Data
 Predictive analytics: entity resolution
Training: entity pairs are annotated w.r.t. whether they represent a match or a non-match and a classifier is trained
Application: classifier decides whether two entities match http://dbs.uni-leipzig.de/file/learning_based_er_with_mr.pdf 18

Data analytics & Big Data
 Prescriptive analytics
 Aims to answer what can we do to make something happen, based
on existing data
 Performed by multiple predictive models trained on different data
Example: Electronic health record contains a patient’s diagnosed diseases and prescribed medications over multiple visits to a hospital. What medication to prescribe now for treating a disease?
 Generalized N-body problems
 Graph-theoretic computations
 Alignment problems
 Optimization
 Maximization/minimization problems solved by linear programming, integer programming 19

Data analytics & Big Data
 Prescriptive analytics & optimization
• We want to select a good model and its parameters, among
several possibilities
• Optimization in model training: A core optimization problem is solved, which optimizes the variables or parameters of the problem w.r.t. a selected objective function (e.g., minimizing a loss).
• Optimization in model selection and validation: The core optimization problem may be solved many times.
20

Data analytics & Big Data
 Prescriptive analytics: linear programming
maximize c_1 x_1 + ⋯ + c_n x_n
subject to
a_11 x_1 + ⋯ + a_1n x_n ≤ b_1
...
a_m1 x_1 + ⋯ + a_mn x_n ≤ b_m
and x_1 ≥ 0, …, x_n ≥ 0
Design a webpage that brings maximum profit
• An image occupies 5% of the webpage and brings profit $0.1
• A link occupies 1% of the webpage and brings profit $0.01
• A video occupies 15% of the webpage and brings profit $0.5
• Use at most 10 images, 25 links, and 2 videos
21
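A minimal sketch of how this LP could be solved in Python with scipy.optimize.linprog; it assumes the space constraint is that images, links and videos together may occupy at most 100% of the page.

from scipy.optimize import linprog

# variables: x1 images, x2 links, x3 videos; linprog minimizes, so negate the profits
c = [-0.1, -0.01, -0.5]
A_ub = [[5, 1, 15]]                    # space used per item (% of the page)
b_ub = [100]                           # at most 100% of the page (assumption)
bounds = [(0, 10), (0, 25), (0, 2)]    # at most 10 images, 25 links, 2 videos

res = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=bounds)
print(res.x, -res.fun)                 # optimal counts and the maximum profit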

Data analytics & Big Data
 Settings in which analytics are applied:
 Default: the dataset is stored in RAM
 Streaming: data arrive as a stream, a part (window) is stored
 Distributed: data are distributed over multiple machines (in RAM and/or disk)
 Multi-threaded: data are in one machine and multiple processors share the RAM of the machine
22

Data collection
 Data access connectors
 5 common types, each used by several tools
 their choice depends on the type of data source
1. Publish-subscribe messaging (used by Apache Kafka, Amazon Kinesis)
 Publishers send messages to a topic managed by a broker (intermediary)
 Subscribers subscribe to topics
 Broker routes messages from publishers to subscribers
23

Data collection
 Data access connectors
2. Source-sink connectors (used by Apache Flume)
 Source connectors import data from another system (e.g., relational database)
 Sink connectors export data to another system (e.g., HDFS)
3. Database connectors (used by Apache Sqoop)  Import data from DBMS
24

Data collection
 Data access connectors
4. Messaging Queues
(by RabbitMQ, ZeroMQ, AmazonSQS)
 Producers push data to the queues
 Consumers pull the data from the queues
 Producers and consumers do not need to be aware of each other
5. Custom connectors
(used e.g., to collect data from social networks, NoSQL databases, or Internet-of-Things)
 Built based on the data sources and data collection requirements  For Twitter e.g., https://dev.twitter.com/resources/twitter-libraries
25

Analysis types & modes
 Analysis types
 Basic statistics
 Graph analysis
 Classification
 Regression
 Frequent pattern mining …
 Depending on the application, each type can run in
 Batch mode: results updated “infrequently” (after days or months)
 Real-time mode: results updated “frequently” (after few seconds)
 Interactive mode: results updated “on demand” as answer to queries
26

Technologies by analysis mode
 Batch mode
 Hadoop / MapReduce: framework for distributed data processing
 Pig: high-level language to write MapReduce programs
 Spark: cluster computing framework; various data analytics components
 Solr: scalable framework for searching data
 Real-time mode
 Spark Streaming component: for stream processing
 Storm: for stream processing
 Interactive mode
 Hive: data warehousing framework built on HDFS, uses SQL-like
language
 Spark SQL component: SQL-like queries within Spark programs
27

Apache flume
 System for collecting, aggregating, and moving data
 from different sources
server logs, databases, social media, IoT devices
 into a centralized (big) data store distributed file system or NoSQL database
 Advantages over ad-hoc solutions
 Reliable, Scalable, High performance
 Manageable, Customizable
 Low-cost installation, operation and maintenance
Slides based on: Chapter 5.3.1 from Big Data Science & Analytics
A. Prabhakar Planning and Deploying Apache Flume
https://flume.apache.org/FlumeUserGuide.html
28

Apache flume
 Event: a unit of data flow having byte payload and possibly a set of attributes (headers)
Headers
Payload
 Payload is opaque to Flume
 Headers
 can be used for contextual routing
 are an unordered collection of string key-value pairs
29

Apache flume
 Agent: a process that hosts components through which the events flow / are transported from one place to another
 Source: receives data from data generators and transfers it to one or more channels
 requires at least one channel to function
 specialized sources for integration with well-known systems
30

Apache flume
 Agent: a process that hosts components through which the events flow / are transported from one place to another
 Channel: a transient store which buffers events until consumed by sinks  a channel can work with any number of sources and sinks
 different channels offer different levels of durability (memory, file, db)
31

Apache flume
 Agent: a process that hosts components through which the events flow / are transported from one place to another
 Sink: removes events from a channel and transmits them to their next hop destination
 different types of sinks
 requires exactly one channel to function
32

Apache flume
 Data ingest
 Clients send events to agents
 Source(s) operating with the Agent receive events
 Source(s) pass events through Interceptors and, if not filtered, put them on the channel(s) identified by the channel selector
33

Apache flume
 Data drain
 Sink Processor identifies a sink and invokes it
 The invoked sink takes events from the channel and sends them
to the next hop destination
 If event transmission fails, Sink Processor takes secondary action
34

Apache sqoop
 Sqoop http://sqoop.apache.org/
 Open-source command line tool
 Sqoop2, with a Java API, is under development
 Imports data from RDBMS into HDFS
 Exports data from HDFS back to RDBMS
 Built-in support for RDBMS: MySQL, PostgreSQL, Oracle, SQL Server, DB2, and Netezza
 Third-party support for many other data stores
35

Apache sqoop
 Sqoop commands
36

Apache sqoop
 Sqoop import command
37

Apache sqoop
 Sqoop import command
The table to be imported is examined
JAVA code is generated (a class for a table, a method for an attribute, plus methods to interact with JDBC)
38

Apache sqoop
 Sqoop import command
[Figure: a table with columns id, A1, A2 and rows with id = 1, …, 1999, 2000, …, 10000]
Reading table in parallel
• Split the table horizontally w.r.t. the “split-by” attribute (here, split-by attribute = id)
• Use “m” mappers, each retrieves some rows
Select id, A1, A2 from table where id>=0 AND id<2000
Select id, A1, A2 from table where id>=2000 AND id<4000
...
Select id, A1, A2 from table where id>=8000 AND id<10000
39

Apache sqoop
 Sqoop import example
 Create a mysql database
40

Apache sqoop
 Sqoop import example
 Create “widgets” table in the database
41

Apache sqoop
 Sqoop import example
 Import “widgets” table into HDFS
%sqoop import --connect jdbc:mysql://localhost/hadoopguide \
> --username U --password P --table widgets -m 8 --columns "price,design_date"
%sqoop import --connect jdbc:mysql://localhost/hadoopguide \
> --username U --password P -m 8 \
> --query "SELECT * FROM widgets WHERE price>3.00" --direct
Vendor-specific connectors are used with “--direct”
https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#connectors
42
Number of mappers (level of parallelism)
Uses mysql connector for better performance

Apache sqoop
 Sqoop export
For efficiency, “m” mappers write data in parallel, an INSERT command may transfer multiple rows
Generates JAVA code to parse records from text files and generate INSERT commands
Picks up a strategy for the target table
43

Apache sqoop
 Sqoop export
 We first need to create a mysql table and/or db – why?
 Both char(64) and varchar(64) can hold a Java String, but they are different types to the db.
 Then, we use export command
44

ZeroMQ
 A high-performance asynchronous messaging library for distributed or concurrent applications.
 Runs without a dedicated message broker.
 Library for many languages (e.g., C, Java, Python). Can run on any OS.
 zmq_recv() and zmq_send() to receive and send a message. Each task is performed by a node and nodes communicate in different ways (as threads, or processes)
 Can make an application elastic on a large network:
 Routes messages in a variety of patterns (including push/pull, pub/sub)
 Queues messages automatically
 Can deal with over-full queues
 No imposed format on messages
 Delivers whole messages
…
45
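A minimal pyzmq sketch (not from the slides) of the push/pull pattern; running both sockets in one process and the port number are assumptions for illustration.

import zmq

ctx = zmq.Context()

# producer: pushes messages to a queue
push = ctx.socket(zmq.PUSH)
push.bind("tcp://127.0.0.1:5555")

# consumer: pulls messages (normally a separate thread or process)
pull = ctx.socket(zmq.PULL)
pull.connect("tcp://127.0.0.1:5555")

push.send(b"hello")     # cf. zmq_send()
print(pull.recv())      # cf. zmq_recv() -> b'hello'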

 Distributed file system
 Provides global file namespace
GoogleGFS,HadoopHDFS, CloudStore
Used when
 files are huge (100s of GBs to TB)
 existing files are rarely updated
 reads and appends are common
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html
46

 Distributed file system
 Files are divided into chunks that are stored in chunk servers and replicated on different machines or racks (recovery from server or rack failures)
[Figure: chunks C0, C1, C2, C3, C5 and replicas D0, D1 distributed across Chunk server 1, Chunk server 2, Chunk server 3, …, Chunk server N]
 Master node stores a metadata file used to find the chunks of a file. The master node is also replicated.
 Directory for the file system is also replicated
 Client library for accessing files (talks to master, connects to chunk servers)
47

 Distributed file system: Hadoop HDFS
 In HDFS: Chunks=blocks, Master node= Namenode, Chunkserver=Datanode
48

 Distributed file system  HDFS read path
Client library
(1) Get block locations
(2) Namenode checks: Does the file exist?
(3) Namenode checks: Does the client have read permission?
(4) Block locations are returned, sorted by the distance to the client
Namenode metadata: {file.txt Block A: 1,3,5 Block B: 2,4,5 ...}
(5) Datanodes stream data
(if a server becomes unavailable, the client can read another replica on a different server)
[Figure: Datanode 1 … Datanode N holding blocks C0, C1, C2, C5]
49

 Distributed file system  HDFS write path (1/2)
Client library
(2) Namenode checks: Does the file exist?
(3) Namenode checks: Does the client have write permission?
Namenode metadata: {file.txt Chunk A: 1,3,5 Chunk B: 2,4,5 ...}
(4) Client receives an Outputstream object
(5) Data are split into packets and added into a queue
(6) Namenode allocates new blocks
[Figure: Datanode 1 … Datanode N holding blocks C0, C1, C2, C5, D0]
50

 Distributed file system  HDFS write path (2/2)
Client library
(7) Client establishes connections with the Datanodes
(8) Datanode 1 consumes data from the queue and writes it to Datanode 2 (for replication), which sends an acknowledgement to Datanode 1, …, and Datanode 1 sends an acknowledgement to the client
(9) Client closes the outputstream and sends a request to close the file
Namenode metadata: {file.txt Chunk A: 1,3,5 Chunk B: 2,4,5 ...}
[Figure: Datanode 1 … Datanode N holding blocks C0, C1, C2, C5, D0]
51

MapReduce
 Overview
 Map task: extract something you care about
 Gets chunks from the distributed file system
 Turns chunk into key-value pairs
 Code tells what chunks to get and what key-value pairs to create
 Group by key: sort and shuffle
 For each key, a pair (key, list_of_values) is created
 Performed by the system
 Reduce task: Aggregate, summarize, filter or transform
 Combines all the values associated with a key
 Code tells how to combine the values
 Write the combined output
52

MapReduce
 Map and Reduce as functions specified by the programmer
 Map(k, v) → <k', v'>*
 Input: a key-value pair
 Output: a possibly empty list of key-value pairs
 One Map call for every different (k,v) pair
 Reduce(k', <v'>*) → <k', v''>*
 Input: a key and a possibly empty list of its associated values
 Output: a possibly empty list of key-value pairs
 One Reduce function for every different key k’
53

Map-Reduce: Environment
Map-Reduce environment takes care of:
 Partitioning the input data
Scheduling the program’s execution across a
set of machines
 Performing the group by key step
 Handling machine failures
 Managing required inter-machine communication
54

MapReduce dealing with failures
 Master periodically pings the workers to detect failures
 Map worker failure
 Map tasks completed or in-progress at worker are reset to idle
 Reduce workers are notified when task is rescheduled on another worker
 Reduce worker failure
 Only in-progress tasks are reset to idle
 Reduce task is restarted
 Master failure
 MapReduce task is aborted and client is notified
55

Refinement: Combiners
 Often a Map task will produce many pairs of the form (k,v1), (k,v2), … for the same key k
 E.g., popular words in the word count example
 Can save network time by pre-aggregating values in the mapper:
 combine(k, list(v1)) → v2
 Combiner is usually same
as the reduce function
 Works only if reduce
function is commutative and associative
56

Refinement: Combiners
 Back to our word counting example:
 Combiner combines the values of all keys of a
single mapper (single machine):
 Much less data needs to be copied and shuffled!
57
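A minimal mrjob sketch of word counting with a combiner (an illustration consistent with the slides, not the exact code from them); sum is commutative and associative, so the combiner can safely be the same as the reducer.

from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")

class MRWordFreqCount(MRJob):
    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner(self, word, counts):
        # pre-aggregate on the mapper's machine: less data is copied and shuffled
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordFreqCount.run()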

Refinement: Partitioning Function
 Want to control how keys get partitioned
 Inputs to map tasks are created by contiguous
splits of input file
 Reduce needs to ensure that records with the same intermediate key end up at the same worker
 System uses a default partition function:
 hash(key) mod R – aims to create
“well-balanced” partitions
 Sometimes useful to override the hash function:
 E.g., hash(hostname(URL)) mod R to put all URLs from a host in the same output file
58
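A toy illustration (assumed for this section, not Hadoop/mrjob API) of such a partition function: hashing the URL's host so that all URLs from the same host go to the same reducer and output file.

import hashlib
from urllib.parse import urlparse

def partition(url, R):
    """Return the reducer index for a URL: hash(hostname(URL)) mod R."""
    host = urlparse(url).hostname or ""
    h = int(hashlib.md5(host.encode()).hexdigest(), 16)
    return h % R

# all pages of the same host land in the same partition
print(partition("http://www.kcl.ac.uk/study", 4) == partition("http://www.kcl.ac.uk/research", 4))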

MapReduce with python
 mr_word_count.py
The mapper receives an empty key and a line as its value
The mapper returns three key-value pairs
For each key, the reducer returns the sum of its values
59
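The code of mr_word_count.py appeared as an image in the slides; below is a minimal sketch consistent with the annotations above, assuming the standard mrjob example that counts characters, words and lines.

from mrjob.job import MRJob

class MRWordCount(MRJob):
    def mapper(self, _, line):
        # empty key, a line as value; returns three key-value pairs
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        # for each key, return the sum of its values
        yield key, sum(values)

if __name__ == '__main__':
    MRWordCount.run()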

MapReduce with python
 Numerical summarization: Problem 1
Parses each line of the input
emits (key, value) pair as (url,1)
Receives pairs (key, list_of_values) grouped by key
For each pair, sums its values to compute its count
from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        year = date[0:4]
        if year == '2014':
            yield url, 1

    def reducer(self, key, list_of_values):
        yield key, sum(list_of_values)

if __name__ == '__main__':
    Mrmyjob.run()
Recall .strip() removes leading and trailing whitespace characters
60

MapReduce with python
 Numerical summarization: for count, max, min, avg
 Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7

Problem 2: Most visited page in each month of 2014.
“Page visited” means URL appears in Log file
Most visited means maximum number of times (occurrences of a URL)
61

MapReduce with python
 Numerical summarization: Problem 2
from mrjob.job import MRJob
from mrjob.step import MRStep

class Mrmyjob(MRJob):
    def mapper1(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        year = date[0:4]
        month = date[5:7]
        if year == '2014':
            yield (month, url), 1                        # (key, value) as ((month, url), 1)

    def reducer1(self, key, list_of_values):
        yield key[0], (sum(list_of_values), key[1])      # (key, value) as (month, (count, url))

    def reducer2(self, key, list_of_values):
        yield key, max(list_of_values)

# reducer2 receives: for each month, the count of each url
# reducer2 emits: for each month, the url and count of visits, for the most visited url in the month
# continues on the next slide
62

MapReduce with python
 Numerical summarization: Problem 2
from mrjob.job import MRJob
from mrjob.step import MRStep

class Mrmyjob(MRJob):
    def mapper1(self, _, line):
        data = line.split()
        ...
        yield (month, url), 1

    def reducer1(self, key, list_of_values):
        ...

    def reducer2(self, key, list_of_values):
        ...

    def steps(self):
        return [MRStep(mapper=self.mapper1, reducer=self.reducer1),
                MRStep(reducer=self.reducer2)]

if __name__ == '__main__':
    Mrmyjob.run()
mrjob by default uses a single MapReduce job to execute the program. We redefine steps() to create two jobs, each defined with MRStep().
Note 3.2 in Bagha’s book uses the deprecated self.mr() method instead of MRStep
https://pythonhosted.org/mrjob/job.html#multi-step-jobs
63

MapReduce with python
 Numerical summarization: Problem 2
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
2014-06-06 21:02:03 http://www.cnn2.com 120.62.102.124 7
2014-06-06 21:02:03 http://www.cnn2.com 120.62.102.124 7
2014-06-07 21:02:03 http://www.cnn.com 120.62.102.124 7
[cloudera@quickstart Desktop]$ python summarization.py toy_log.txt
Using configs in /etc/mrjob.conf
Creating temp directory /tmp/summarization.cloudera.20170208.120451.098082
Running step 1 of 2…
Running step 2 of 2…
Streaming final output from /tmp/summarization.cloudera.20170208.120451.098082/output…
“01” [1, “http://www.google.com”]
“02” [1, “http://www.google.com”]
“06” [2, “http://www.cnn2.com”]
Removing temp directory /tmp/summarization.cloudera.20170208.120451.098082…
Input file: toy_log.txt
64

MapReduce with python
 Filtering: for getting out (i.e., retrieve) a subset of data
 Example: Log file (Date, Time, URL, IP, visit_length)
2014-12-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-12-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7

Problem 3: Retrieve all page visits for the page http://www.google.com in Dec 2014.
Example output
http://www.google.com , (2014-12-01 20:01:02 77.32.11.111 3)
http://www.google.com , (2014-12-02 23:01:08 77.32.11.111 7)
65

MapReduce with python
 Filtering: Problem 3
Parses each line of the input; emits (key, value) pair as (url, (date, …, visit_length)). There is no reducer.

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        year = date[0:4]
        month = date[5:7]
        if year == '2014' and month == '12' and url == 'http://www.google.com':
            yield url, (date, time, ip, visit_length)

if __name__ == '__main__':
    Mrmyjob.run()
66

MapReduce with python
 Distinct: for getting out (i.e., retrieve) distinct values
 Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7

Problem 4: Retrieve all distinct IP addresses
Example output
77.32.11.111, None
50.62.22.123, None
120.62.102.124, None
67

MapReduce with python
 Distinct: Problem 4
Parses each line of the input; emits (key, value) pair as (ip, None). The reducer receives (key, None) and emits distinct keys.

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        yield ip, None

    def reducer(self, key, list_of_values):
        yield key, None

if __name__ == '__main__':
    Mrmyjob.run()

Question: Is the reducer necessary here? Why?
68

MapReduce with python
 Binning: for partitioning records into bins (i.e., categories)
 Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7

Problem 5: Partition records of the Log file based on the quarter of 2014 in which the pages have been visited. Note quarters 1,2,3,4 are denoted with Q1, Q2, Q3, Q4.
Example output
Q1, (2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3)
Q1, (2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7)
Q2, (2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7)
69

MapReduce with python
 Binning: Problem 5
Parses each line of the input; emits (key, value) pair as (Quarter_id, record), where Quarter_id depends on the month of 2014 (e.g., Q1 for month<=3). int(date[5:7]) is used because month is compared numerically in the if/elif/else.

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        year = date[0:4]
        month = int(date[5:7])
        if year == '2014':
            if month <= 3:
                yield "Q1", (date, time, url, ip, visit_length)
            elif month <= 6:
                yield "Q2", (date, time, url, ip, visit_length)
            elif month <= 9:
                yield "Q3", (date, time, url, ip, visit_length)
            else:
                yield "Q4", (date, time, url, ip, visit_length)

if __name__ == '__main__':
    Mrmyjob.run()
70

MapReduce with python
 Sorting: for sorting with respect to a particular key
 Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
...
Problem 6: Sort the pages visited in 2014, with respect to their visit length in decreasing order
Example output
7, (2014-02-02 23:01:08 http://www.google.com 77.32.11.111)
7, (2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124)
3, (2014-01-01 20:01:02 http://www.google.com 77.32.11.111)
71

MapReduce with python
 Sorting: Problem 6
Parses each line of the input; emits (key, value) pair as (None, tuple of tuples), e.g., (None, (7, (2014-01-01, ...))). By default, list_of_values in the reducer is a generator, and list() converts it into a list. reverse=True imposes decreasing order.

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        year = date[0:4]
        month = int(date[5:7])
        if year == '2014':
            yield None, (visit_length, (date, time, url, ip))

    def reducer(self, key, list_of_values):
        list_of_values = sorted(list(list_of_values), reverse=True)
        yield None, list_of_values

if __name__ == '__main__':
    Mrmyjob.run()
72

Key/value databases
 Store key-value pairs
 Keys are unique
 The values are only retrieved via keys and are opaque to the database
Key: 111, Value: John Smith
Key: 324, Value: binary representation of an image
Key: 567, Value: XML file
 Key-value pairs are organized into collections (a.k.a. buckets)
 Data are partitioned across nodes by keys
 The partition for a key is determined by hashing the key
73

Key/value databases
 Pros:
very fast
simple model
able to scale horizontally
 Cons:
- many data structures (objects) can't be easily modeled as key-value pairs
74

Key/value databases
 Suitable when:
 Unstructured data
 Fast read/writes
 Key suffices for identifying value (“1-1” mapping)
 No dependences among values
 Simple insert/delete/select operations
 Unsuitable when:
 Operations (search, filter, update) on individual attributes of the value
 Operations on multiple keys in a single transaction
75

Document databases
 Store documents in a semi-structured form
 A document is a nested structure in JSON or XML format
JSON (unordered name-value pairs; ordered collections of values):
{ name : "Joe Smith",
  title : "Mr",
  Address : {
    address1 : "Dept. of Informatics",
    address2 : "Strand",
    postcode : "WC2 1ER"
  },
  expertise : [ "MongoDB", "Python", "Javascript" ],
  employee_number : 320,
  location : [ 53.34, -6.26 ]
}
76

Document databases
 Store documents in a semi-structured form
 A document is a nested structure in JSON or XML format
XML
<employee>
  <name>Joe Smith</name>
  <title>Mr</title>
  <address>
    <address1>Dept. of Informatics</address1>
  </address>
</employee>
http://www.w3schools.com/xml/
77

Document vs. key-value databases
1. In document databases, each document has a unique key
2. Document databases provide more support for value operations
 They are aware of values
 A select operation can retrieve fields or parts of a value
 e.g., regular expression on a value
 A subset of values can be updated together
 Indexes are supported
 Each document has a schema that can be inferred from the structure of the value
 e.g., store two employees one of which does not have address3
78

Document databases
 Suitable when:
 Semi-structured data with flat or nested schema
 Search on different values of document
 Updates on subsets of values
 CRUD operations (Create, Read, Update, Delete)
 Schema changes are likely
 Unsuitable when:
 Binary data
 Updates on multiple documents in a single transaction
 Joins between multiple documents
79

Column-family databases
 Store columns. Each column has a name and a value.
 Group related columns in a row
 A row does not necessarily have a fixed schema or number of columns
Difference in storage layout for a table with columns ID, Name, Salary:
RDBMS (row-oriented) stores complete rows together:
1, Joe D, $24000
2, Peter J, $28000
3, Phil G, $23000
Column database stores each column together:
ID: 1, 2, 3
Name: Joe D, Peter J, Phil G
Salary: $24000, $28000, $23000

Column-family databases
 Suitable when:
 Data has tabular structure with many columns and sparsely
populated rows (i.e., high-dimensional matrix with many 0s)
 Columns are interrelated and accessed together often
 OLAP
 Realtime random read-write is needed
 Insert/select/update/delete operations
 Unsuitable when:  Joins
 ACID support is needed
 Binary data
 SQL-compliant queries
 Frequently changing query patterns that lead to column
restructuring
81

Column-family databases
 Examples of applications:
82

Graph databases
 Store graph-structured data
 Nodes represent the entities and have a set of attributes
 Edges represent the relationships and have a set of attributes
83

Graph databases
 Optimized for representing connections
 Can add/remove edges and their attributes easily
Bob becomes friend of Grace
84

Graph databases
 Designed to allow efficient queries for finding interconnected nodes based on node and/or edge attributes
Find all friends of each node
Scalable w.r.t. nodes because each node knows its neighbors
Find all friends of friends of Jim who are not his colleagues
85

Graph databases
 Suitable when:
 Data comprised of interconnected entities  Queries are based on entity relationships
 Finding groups of interconnected entities  Finding distances between entities
 Unsuitable when:  Joins
 ACID support is needed
 Binary data
 SQL-compliant queries
 Frequently changing query patterns that lead to column
restructuring
86

MongoDB
 A document database
 Hash-based
 Stores hashes (system-assigned _id) with keys and values
for each document
 Dynamic schema
 No Data Definition Language
 Application tracks the schema and mapping
 Uses BSON (Binary JSON) format
 APIs for many languages
 JavaScript, Python, Ruby, Perl, Java,
Scala, C#, C++, Haskell, …
https://www.mongodb.com/ 87

Index support
 B+ tree indices, GeoSpatial indices, text indices
 Index created by the system on _id. Can be viewed by
db.system.indexes.find()
 Users can create other indices
db.phones.ensureIndex( { display : 1 },
                       { unique : true, dropDups : true } )
(only unique values in the “display” field; duplicates are dropped)
and compound indices
db.products.createIndex( { "item": 1, "stock": 1 } )
https://docs.mongodb.com/v3.4/indexes/
88

Replication
 Multiple replicas (dataset copies) are stored
 Provides scalability (distributed operations), availability (due to
redundancy), and fault tolerance (automatic failover)
 Replica set: group of <=50 mongod instances that contain the same copy of the dataset
 Primary instance: receives operation requests
 Secondary instances: apply the operations to their data
 Automatic failover: when a primary does not communicate with its secondaries for >10 seconds, a secondary becomes primary after elections and voting
https://docs.mongodb.com/v3.4/replication/ 89

Sharding: overview
 Sharding is the process of horizontally partitioning the dataset into parts (shards) that are distributed across multiple nodes
 Each node is responsible only for its shard
Query router (mongos): the interface between applications and the sharded cluster; it processes all requests and decides how each query is distributed, based on metadata from the config server
Config server: stores metadata and configuration settings for the cluster
To benefit from replication, shards and the config server may be implemented as replica sets
https://docs.mongodb.com/v3.4/sharding/ 90

Sharding: benefits
 Efficient reads and writes: They are distributed across the shards.
 More shards, higher efficiency
 Storage capacity: Each shard has a part of the dataset
 More shards, increased capacity
 High availability: Partial read / write operations performed if shards are unavailable. Reads or writes directed at the available shards can still succeed.
 More shards, less chance of a shard to be unavailable
https://docs.mongodb.com/v3.4/sharding/
91

Fast in-place updates
 In-place update: Modifies a document without growing its size
Increases stock by 5 and changes the item name
{ _id: 1, item: "book1", stock: 0 }
db.books.update(
  { _id: 1 },
  { $inc: { stock: 5 },
    $set: { item: "ABC123" } }
)
 In-place updates are fast because the database does not have to allocate and write a full new copy of the object.
 Multiple updates are written one time (every few seconds). This lazy writing strategy helps efficiency, when updates are frequent (e.g., 1000 increments to stock)
92
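The same in-place update issued from Python with pymongo (a sketch; the connection string and the database/collection names are assumptions for illustration).

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
books = client["store"]["books"]

# increase stock by 5 and change the item name, in place
books.update_one({"_id": 1}, {"$inc": {"stock": 5}, "$set": {"item": "ABC123"}})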

What is Apache Spark?
 Unified engine for distributed data processing
 Spark extends MapReduce programming model with an
abstraction that allows efficient data reuse
User’s program
Data processing libraries
STORAGE
HDFS, local filesystem, Hbase, …
Cluster management
Apache Mesos, YARN, Standalone
93

Motivation for Spark
 Map Reduce is inefficient for applications that reuse intermediate results across multiple computations.
Example
PageRank
 The output of a MR job in iteration i, is the input of another MR job in iteration i+1. And i can be very large.
k-means clustering
Logistic Regression
94

Motivation for Spark
 Map Reduce is inefficient for interactive data mining (multiple ad-hoc queries on the same data)
Example
Web log queries to find total views of
(1) all pages,
(2) pages with titles exactly matching a given word, and (3) pages with titles partially matching a word.
95

Motivation for Spark
 Hadoop is inefficient because it writes to the distributed file system (HDFS)
 Overhead due to data replication, disk I/O, and serialization
 Frameworks to address the inefficiency support specific computation patterns
 Pregel: Iterative graph computations
 HaLoop: Iterative MapReduce interface
but not generic data reuse (e.g., a user cannot load all logs in memory and perform interactive data mining on them)
96

Spark
 Resilient Distributed Datasets (RDDs) enable efficient data reuse
 An RDD is a read-only, fault-tolerant collection of records that can be
operated in parallel
 An RDD is a data structure that serves as the core unit of data
 User program can manipulate it, control its partitioning, make it
persistent in memory.
97

Overview of RDDs
 Do not need to be materialized
 Each RDD knows its lineage (how it was derived from other datasets)
and can compute its partitions from data in stable storage
 Only RDDs that can be reconstructed after failure can be referenced by a user’s program
 Persistence
 Users can indicate which RDDs they will reuse and choose a storage strategy
for them (e.g., in-memory storage)
 Partitioning
 Users can specify how records of an RDD are partitioned across machines
based on a key in each record
98

Overview of RDDs
 Persist method to implement persistence
 Persistent RDDs are stored in RAM by default
 If there is not enough RAM, they are spilled to disk
 Users can request other storage strategies
 Storing an RDD only on disk
 Replicating an RDD across machines, through
flags to persist.
 Set a persistence priority on each RDD to specify
which in-memory data should spill to disk first.
99
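A minimal pySpark sketch of persistence (it assumes an existing SparkContext named sc, as in the shell examples, and a hypothetical input file logs.txt).

from pyspark import StorageLevel

errors = sc.textFile("logs.txt").filter(lambda l: "ERROR" in l)
errors.persist(StorageLevel.MEMORY_AND_DISK)   # keep for reuse; spill to disk if RAM is short

print(errors.count())                                     # first action computes and caches the RDD
print(errors.filter(lambda l: "timeout" in l).count())    # reuses the cached partitions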

Applications for RDD
 Suitable for batch applications that perform the same operation to the entire dataset
 RDDs remember each transformation as one step in a lineage graph  RDDs can recover lost partitions efficiently
 Not suitable for applications that make asynchronous fine-grained updates to shared state
 e.g., storage system for a web application in which many users update values on the same table
100

pySpark in shell
 Parallelized collections
 Created by the parallelize method of sc on an existing iterable or collection d  The elements of d are copied to form a distributed dataset that can be
operated on in parallel
>>> d= [1,2,3,4,5]
>>>parallel_col = sc.parallelize(d)
 The number of partitions of the parallelized collection is determined automatically based on the cluster, but it can be set up as a second optional parameter
>>> d= [1,2,3,4,5]
>>>parallel_col = sc.parallelize(d,10)
101

pySpark in shell
 count
 Returns the number of elements in this RDD
 filter
 Argument: a function that acts as a filter
 lambda is a type of function with no specific name
 Output: a new RDD with elements that pass the filter (i.e., those on which the
function returns true)
>>> tf = sc.textFile("file:///usr/share/dict/words")
>>> lines_nonempty = tf.filter(lambda x: len(x) > 0)
>>> lines_nonempty.count()
Same output as $ cat /usr/share/dict/words | wc -l
102

pySpark in shell
 map
 Argument: a function f
 Output: A new RDD whose elements are the elements of the source RDD,
after applying f on each of them  collect
 returns all the elements of the dataset as an array at the driver program
>>> nums = sc.parallelize([1, 2, 3, 4])
>>> squared = nums.map(lambda x: x * x).collect()
>>> for num in squared:
...     print(num, ",")
Output: 1,4,9,16
103

pySpark in shell
 map
 Argument: a function f
 Output: A new RDD whose elements are the elements of the source RDD,
after applying f on each of them (each element passed through f)
 flatMap
 Output: similar to map, but returns a flattened RDD of the elements
>>> x = sc.parallelize(["a b", "c d"])
>>> y = x.map(lambda x: x.split(' '))
>>> y.collect()
[['a', 'b'], ['c', 'd']]
>>> y = x.flatMap(lambda x: x.split(' '))
>>> y.collect()
['a', 'b', 'c', 'd']
104

pySpark in shell
 sample(withReplacement, fraction, [seed])
 Output: A sample of size fraction*100% of the data, with or without
replacement, using a given random number generator seed
 rddA.union(rddB)
 rddA.intersection(rddB)
 rddA.subtract(rddB)
 rddA.cartesian(rddB)
 rddA.join(rddB,[number of reduce tasks])
 When called on datasets of type (K, V) and (K, W), where K is a key and V, W values, it returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key
>>> x = sc.parallelize([('a',1),('b',2)])
>>> y = sc.parallelize([('b',3),('d',4)])
>>> x.join(y).collect()
[('b', (2,3))]
105

pySpark in shell
 Reduce
 Arguments: Two elements of the same type in an RDD, a commutative and
associative function f
 Output: A new element of the same type as that of the arguments, which is
the result of applying f to the elements
 Fold
 Same as reduce but with a first identity (“zero”) argument of the type we want
to return
>>> rdd = sc.parallelize([1,2,3,4,5])
>>> sum = rdd.reduce(lambda x,y: x+y)
>>> sum
Output: 15
>>> sum2 = rdd.fold(0.0, lambda x, y: x + y)
>>> sum2
Output: 15.0
106

pySpark in shell
 Accumulators
 Variables that are only “added” to through an associative and commutative
operation and can therefore be efficiently supported in parallel.
 Created from an initial value v by calling SparkContext.accumulator(v).  Tasks can add y to them using add(y)
 Only the driver program can read the accumulator’s value, using value
>>>accum = sc.accumulator(0)
>>> accum
Output: Accumulator
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(lambda x: accum.add(x))
>>> accum.value
Output: 15
107

pySpark in shell
 persist
 Sets the storage level of an RDD to persist. Useful if it will be reused.
 cache
 Sets the storage level to MEMORY_ONLY
deserialized: whether data are kept in memory as deserialized objects; keeping data serialized saves space
replication: replicates RDD partitions on 1 or 2 cluster nodes
Storage level        | useDisk | useMemory | deserialized | replication
MEMORY_ONLY          | False   | True      | False        | 1
MEMORY_ONLY_2        | False   | True      | False        | 2
MEMORY_ONLY_SER      | False   | True      | False        | 1
DISK_ONLY            | True    | False     | False        | 1
MEMORY_AND_DISK      | True    | True      | True         | 1
MEMORY_AND_DISK_SER  | True    | True      | False        | 1
108

HIVE
 Motivation
 Data warehousing
 Not scalable enough
 Prohibitively expensive at web scale
 Up to $200K/TB
 Little control over execution method
 Query optimization is hard
 Parallel environment
 Little or no statistics
 Lots of UDFs
109

HIVE
 Motivation
 Can MapReduce help?
 MapReduce is scalable
 Only parallelizable operations
 No transactions
 MapReduce runs on cheap commodity hardware
 Clusters, cloud
 MapReduce is procedural
 A pipe for processing data
110

HIVE
 Motivation
 Can MapReduce help?
 But
1. Data flow is rigid (one-input, two-stage)
and is complex to change (workarounds below)
111

HIVE
 Motivation
 Can MapReduce help?
 But
2. custom code has to be written for even the most common operations
e.g., join, sorting, distinct (Recall MR patterns)
 not all users willing/able to do it
3. Map and reduce functions are opaque, which makes them difficult to maintain, extend, optimize
112

HIVE
 Motivation
 Can MapReduce help?
 Main limitation of MapReduce that HIVE tries to address
The map-reduce programming model is very low level and requires developers to write custom programs which are hard to maintain and reuse.
113

HIVE
 Motivation
 Can MapReduce help?
 Solution
 Develop higher-level data processing languages
that “compile down” to Hadoop jobs
 HIVE, PIG
114

HIVE
Intuition
* Make the unstructured data look like tables, regardless of how it is really laid out
* SQL-based queries can be issued directly against these tables
* Generate a specific execution plan for each query
What is HIVE?
*An open-source data warehousing solution built on top of Hadoop, which supports queries that are
(i) expressed in HiveQL (an SQL-like declarative language) (ii) compiled into MapReduce jobs that are executed
using Hadoop
115

HIVE
Applications
• Log processing
• Text mining
• Document indexing
• Customer-facing business intelligence (Google analytics)
• Predictive modeling
• Hypothesis testing
Slide adapted from:
http://blog.cloudera.com/wp-content/uploads/2010/01/6-IntroToHive.pdf
116

HIVE
Components of HIVE
* Client components: Command Line Interface (CLI), the web UI, JDBC/ODBC driver.
* Driver:
(i) Manages the lifecycle of a HiveQL Statement as it moves through HIVE (ii) Maintains a session handle
(iii) Maintains session statistics
* Compiler:
compiles HiveQL into MapReduce tasks
117

HIVE
Components of HIVE
* Optimizer:
Optimizes the tasks (improves HiveQL)
* Executor:
(i) executes the tasks in the right order (ii) Interacts with Hadoop
* Metastore:
(i) Acts as the system catalog. That is,
it stores information about tables, partitions, locations in HDFS etc.
(ii) It runs on an RDBMS. Not on HDFS, because it needs to have very low latency.
* Thrift Server:
Provides an interface between clients and the Metastore (so that clients can query or modify the information in the Metastore)
118

HIVE
Data model: comprised of the following data units
* Table: each table consists of a number of rows, and each row consists of a specified
number of columns
Basic column types: int, float, boolean
Complex column types: List, Map, Struct
Arbitrary data types:
(i) They are created by composing basic & complex types, e.g., list<map<string, struct<p1:int, p2:int>>>
(ii) They are useful for external and legacy data
119

HIVE
Data model: comprised of the following data units
* Partition: the result of decomposing a table into partitions, based on the values of the partitioning columns
Creating a partitioned table
– The partitioning columns are not part of the table data but behave like regular columns
Adding a new partition to a partitioned table
Creating partitions speeds up query processing, because only relevant data in Hadoop are scanned
120

HIVE
Data model: comprised of the following data units * Bucket: the result of decomposing a table into buckets
– bucketing is useful when partitions are too many and small (Hive will not create them)
Bucketing is useful for sampling
e.g., a 1/96 sample can be generated efficiently by looking at the first bucket
121

HIVE
The mapping of data units into the HDFS name space
• A table is stored in a directory in HDFS
• A partition of the table is stored in a subdirectory within a table’s HDFS directory
• A bucket is stored in a file within the partition’s or table’s directory depending on whether the table is a partitioned table or not.
TableHDFS directory
Partitionssubdirectory of the table’s HDFS directory Bucketsfile within dir or subdir
Warehouse root directory
Example:
table test_part is mapped to /user/hive/warehouse/test_part
partition(ds=‘2009-02-02’, hr=11) of test_part is mapped to /user/hive/warehouse/test_part/ds=2009-02-02/hr=11
bucket1 is mapped to file bucket1 in /user/hive/…./hr=11
122

HIVE
How the mapping helps efficiency
Creating partitions speeds up query processing, because only relevant data in Hadoop are scanned
Hive prunes the data by scanning only the required sub-directories
Bucketing is useful for sampling
 Hive uses the file corresponding to a bucket
123

HIVE
 Pros
 An easy way to process large-scale data
 Supports SQL-based queries
 Provides user-defined interfaces to extend programmability
 Efficient execution plans for performance
 Interoperability with other databases
124

HIVE
 Cons
 No easy way to append data
 Files in HDFS are immutable
125

SparkSQL
Spark SQL
 A new module in Apache Spark that integrates relational processing with Spark’s functional programming API.
 It lets Spark programmers leverage the benefits of relational processing (e.g., declarative queries and optimized storage)
• It lets SQL users call complex analytics libraries in Spark (e.g., machine learning).
 Part of the core distribution since April 2014
 Runs SQL / HiveQL queries, optionally alongside or replacing existing Hive deployments
126

SparkSQL
 Motivation
 SparkSQL tries to bridge the two models
Main goal: Extend relational processing to cover native RDDs in Spark and a much wider range of data sources, by:
1. Supporting relational processing both within Spark programs (on native RDDs) and on external data sources using a programmer-friendly API.
2. Providing high performance using established DBMS techniques.
3. Easily supporting new data sources, including semi-structured data and external databases amenable to query federation.
4. Enabling extension with advanced analytics algorithms such as graph processing and machine learning.
127

SparkSQL
 Motivation
 SparkSQL tries to bridge the two models with:
1. A DataFrame API that can perform relational operations on both external data sources and Spark’s built-in RDDs.
2. A highly extensible optimizer, Catalyst, that uses features of Scala to add composable rule, control code gen., and define extensions.
128

SparkSQL
 DataFrame
 A distributed collection of rows with the same schema
 Similar to a table in RDBMS
 Can be constructed from external data sources or RDDs
 Support relational operators (e.g. where, groupby) as well as Spark operations.
 Evaluated lazily:
 each DataFrame object represents a logical plan to compute a
dataset, but no execution occurs until the user calls an output operation
 This enables optimization
129
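A minimal pySpark DataFrame sketch (the SparkSession entry point and the input file people.json are assumptions for illustration); nothing executes until the output operation show() is called.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("df-sketch").getOrCreate()

df = spark.read.json("people.json")                        # DataFrame from an external source
adults = df.where(df.age >= 18).groupBy("city").count()    # relational operators build a logical plan
adults.show()                                              # output operation triggers execution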

SparkSQL
 Data Model
 SparkSQL uses a nested data model based on HIVE
 SparkSQL supports different data types:
 primitive SQL types: boolean, integer, double, decimal, string, date, timestamp
 complex types: structs, arrays, maps, and unions
 user defined types
 Supporting these data types allows modeling data from
 HIVE
 RDBMS
 JSON
 Native objects in JAVA/Python/Scala
130

Motivation
 So far, all our data was available when we wanted it
 BUT
 we may not know the entire data set in advance
 data may have high velocity, so that we cannot store
all data and then access them
In domains such as
 web click-stream data
 computer network monitoring data
 telecommunication connection data
 readings from sensor networks
 stock quotes
131

Data stream definition
 Data stream
 An ordered and potentially infinite sequence of
elements
 An element (a.k.a data point) can be:
 event “page A visited” in web click-stream data
 stock price “10.3” in stock quote data
 customer record “[item, qty, price]” in marketing data
 graph in social network data streams
…
132

Data stream example 2
 HTTP server log
Report number of distinct accessed page every hour Report the most frequently accessed pages every hour
Report average view time of a page every hour
http://michael.hahsler.net/SMU/EMIS8331/slides/datastream/datastream.pdf
133

Properties of data streams
1. Potentially infinite
• Transient (stream might not be realized on disk)
• Single pass over the data
• Only summaries can be stored
• Real-time processing (in main memory)
2. Data streams are not static
• Incremental updates • Concept drift
• Forgetting old data
3. Temporal order may be important
http://michael.hahsler.net/SMU/EMIS8331/slides/datastream/datastream.pdf
134

Data Stream Management System (DSMS)
. . . 1, 5, 2, 7, 0, 9, 3
. . . a, r, v, t, y, h, b
. . . 0, 0, 1, 0, 1, 1, 0
time
Streams entering the DSMS
Processor
Often have different velocity (arrival rate) and data types
135

Data stream management system
. . . 1, 5, 2, 7, 0, 9, 3
. . . a, r, v, t, y, h, b
. . . 0, 0, 1, 0, 1, 1, 0
time
Archival Storage
Processor
Archives streams
Not for querying due to
very low efficiency of data retrieval
136

Data stream management system
. . . 1, 5, 2, 7, 0, 9, 3
. . . a, r, v, t, y, h, b
. . . 0, 0, 1, 0, 1, 1, 0
time
Archival Storage
Processor
* Storage space in disk or in main memory
* Used for answering queries
* Cannot store all data
Limited Working Storage
137

Data stream management system
permanently executing and produce outputs at appropriate times
Standing Queries
Example of standing query
“output an alert, when the variable TEMPERATURE exceeds 25”
Archival Storage
Processor
Limited Working Storage
138

Data stream management system
Ad-Hoc Queries
Standing Queries
Processor
Example of ad-hoc query
“list the total number of transactions
in the last hour”
Output
Query answers
Archival Storage
Limited Working Storage
139
asked once about the current state of a stream or streams

Common queries
 Sampling data from a stream
 Construct a random sample (i.e., a random subset
of the stream)
 Queries over sliding windows
 Number of items of type x in the last k elements of the stream
140

Common queries
 Filtering a data stream
 Select elements with property x from the stream
 Counting distinct elements
 Number of distinct elements in the last k elements
of the stream
 Estimating frequency moments
 Estimate avg./std.dev. of last k elements
 Finding frequent elements
141

Common applications of these queries
 Mining query streams
 Google wants to know what queries are
more frequent today than yesterday
 Mining click streams
 Yahoo wants to know which of its pages are
getting an unusual number of hits in the past hour
 Mining social network news feeds
 E.g., look for trending topics on Twitter, Facebook
142

Sampling from data stream
 Construct a sample from a data stream such that we can ask queries on the sample and get answers that are statistically representative of the stream as a whole
 Example application
 Stream from search query engine
(u1, q1, t1), (u1, q1, t2), (u2, q2, t3),…, user query time
 What fraction of the typical user’s queries were repeated over the last month?
 Cannot store more than 1/10th of the stream
143

Sampling from data stream
 Naïve solution for the example application
 Generate a random integer in [0..9] for each query
 Store the tuple (u,q,t) if the integer is 0, otherwise discard
 For users with many queries, 1/10th of their queries will be in the sample
 Suppose each user issues x queries once and d queries twice. Queries issued twice will be “duplicate queries”.
What is the fraction of duplicate queries in the stream by an average user?
Answer: d / (x + d)
Stream = [q1,q1,q2,q2,q3]: x=1, d=2, fraction of duplicates is 2/3
Is the naïve solution good for answering this query?
144

Sampling from data stream
 Fraction of duplicate queries, based on sample?
 Out of x singleton queries, x/10 will appear in the sample
 This assumes that I have many singletons and happens because I sample
1/10 of the elements (each singleton is an element)
 Duplicate query twice in the sample with probability 1/100
Pr(1st occurrence in sample)*Pr(2nd occurrence in sample)=1/10*1/10=1/100
This is because when I have a duplicate query q1q1, to include
it in the sample, I need to include both q1s and each q1 is sampled with probability 1/10.
 Out of d duplicate queries, d ∗ (1/100) will appear twice in the sample
145

Sampling from data stream
 Fraction of duplicate queries, based on sample?
 Duplicate query once in the sample with probability 18/100 Pr(1st occ. in sample)*Pr(2nd occ. not in sample) +
Pr(2nd occurrence in sample)*Pr(1st occurrence not in sample)= =1/10*(1-1/10) + 1/10*(1-1/10) =1/10*9/10+1/10*9/10=18/100
This is because when I have a duplicate query q1q1 I can miss it in the sample by: (I) not sampling the second q1 (red arrow), OR (II) by not sampling the first q1 (yellow arrow)
 Out of d duplicate queries, d ∗ (18/100) will appear once in the sample
The sample-based answer is
d ∗ (1/100) / ( d ∗ (1/100) + d ∗ (18/100) + x/10 ) = d / (10x + 19d) ≠ d / (x + d)
146

Sampling from data stream
 How bad is the sample-based answer?
The sample-based answer is
𝑑∗ 1
100 =
𝑑∗ 1 +𝑑∗ 18 + 𝑥 100 100 10
𝒅 𝟏𝟎𝒙+𝟏𝟗𝒅
The answer based on the entire stream is 𝒅
• For x=1000, d=300 𝒙+𝒅
• Sample-based answer 0.019 = 1.9%
• Answer based on entire stream 0.231 = 23.1%. 12.1 times larger
• For x=1000, d=50
• Sample based answer 0.0046= 0.46%
• Answer based on entire stream 0.0476= 4.76%. 10.4 times larger
WHAT WENT SO WRONG?
147
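To make the gap concrete, here is a quick Python check (my own illustration, not part of the slides; the function names are placeholders) that plugs x and d into the two closed-form expressions above and reproduces the figures on this slide.

```python
# Quick numeric check of the two expressions derived above (illustrative only).
def sample_based_answer(x, d):
    # With a 1/10 query-level sample: d*(1/100) duplicates survive twice,
    # d*(18/100) survive once, and x/10 singletons survive.
    return (d / 100) / (d / 100 + 18 * d / 100 + x / 10)

def true_answer(x, d):
    # Fraction of the user's distinct queries that are duplicates.
    return d / (x + d)

for x, d in [(1000, 300), (1000, 50)]:
    s, t = sample_based_answer(x, d), true_answer(x, d)
    print(f"x={x}, d={d}: sample-based {s:.1%}, true {t:.1%}, ratio {t/s:.1f}x")
# Approximate output:
#   x=1000, d=300: sample-based 1.9%, true 23.1%, ratio 12.1x
#   x=1000, d=50: sample-based 0.5%, true 4.8%, ratio 10.4x
```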

Sampling from data stream
 Each user issues x queries once and d queries twice
What is the fraction of duplicate queries in the stream by an average user?
Main idea: we care about the average user, but the naïve solution sampled individual queries rather than users
Solution
 Select 1/10th of users (assuming this is <=10% of the stream)
 For each such user, store all their queries in the sample
 To get 1/10th of users
 use a hash function h: user → {1,2,...,10}
 If h(user)=1, we accept their query, otherwise we discard it
 Then, count the duplicates in the sample: d/(x+d), for each user
148
Sampling from data stream
 General problem
 Stream of tuples with keys:
 Key is some subset of each tuple’s components
 e.g., tuple is (user, search, time); key is user
 Choice of key depends on application
 To get a sample of a/b fraction of the stream:
 Hash each tuple’s key uniformly into b buckets
 Pick the tuple if its hash value is in {1,..., a}
* How to generate a 30% sample?
* What if more and more new users come, and we have a budget on the number of tuples that we can store? (A code sketch of this scheme, together with reservoir sampling, follows after the next slide.)
149
Sampling from data stream
 General problem
 To get a (uniform) sample of s tuples from a stream (e1,e2,...) that contains >s tuples
 Reservoir sampling
 Add the first s tuples from the stream into a reservoir R
 For j>s
 With probability s/j, replace a random entry of R with the j-th tuple of the stream (i.e., the newcomer replaces a random old one)
 At j=n, return R
 Does this work?
 We need to prove that after seeing n tuples, R contains each tuple seen so far with probability s/n
150
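Here is a minimal Python sketch of the two sampling schemes just described: key-based a/b sampling over a keyed stream and reservoir sampling of s tuples. It is my own illustration under stated assumptions (the use of md5 as the hash, and all names and example data, are mine), not code from the lecture.

```python
import hashlib
import random

def keep_by_key(key, a, b):
    """Key-based a/b sampling: hash the key uniformly into b buckets and keep
    the tuple if its bucket falls in the first a buckets.
    E.g., a=3, b=10 gives a ~30% sample; a=1, b=10 keeps roughly 1/10 of the users."""
    bucket = int(hashlib.md5(str(key).encode()).hexdigest(), 16) % b
    return bucket < a

def reservoir_sample(stream, s):
    """Reservoir sampling: maintain a uniform sample R of s tuples from a stream
    of unknown length; the j-th tuple replaces a random entry of R with prob. s/j."""
    R = []
    for j, item in enumerate(stream, start=1):
        if j <= s:
            R.append(item)
        elif random.random() < s / j:
            R[random.randrange(s)] = item   # newcomer replaces a random old one
    return R

# Usage: sample whole users from (user, query, time) tuples, then a 5-tuple reservoir.
stream = [("u1", "q1", 1), ("u1", "q1", 2), ("u2", "q2", 3), ("u3", "q3", 4)]
user_sample = [t for t in stream if keep_by_key(t[0], a=1, b=10)]
print(user_sample, reservoir_sample(range(100), 5))
```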

Sampling from data stream
 Proof by induction for s=1 (exercise for given s>=1)
 For j=1, the tuple is in the sample with probability s/j = 1/1 = 1
 Assume that for j=n, each tuple is in the sample with prob. 1/n
 We need to show that for j=n+1, each tuple is in the sample with probability 1/(n+1)
151

Sampling from data stream
 Proof by induction for s=1 (exercise for given s>=1)
 For j=n+1
 The tuple j=n+1 is selected with probability 1/(n+1)
(the for loop in the algorithm does this)
 If this tuple is selected, the old tuple in R is replaced (with prob. 1, since s=1)
 Thus, the prob. that the old tuple is replaced is 1/(n+1), and it is not replaced with prob. 1 − 1/(n+1)
 So, each tuple is in R with probability 1/(n+1): an old tuple is in R because it was added into R and not replaced at step j=n+1, which happens with prob.
1/n ∗ (1 − 1/(n+1)) = 1/n ∗ n/(n+1) = 1/(n+1)
(the first factor is the probability it was in the sample, by the induction hypothesis for j=n; the second is the probability it was NOT replaced)
and the new tuple (j=n+1) is in R because it was added in the latest round, with prob. 1/(n+1). QED
152

Today
 More algorithms for streams:
 (1) Filtering a data stream: Bloom filters
 Select elements with property x from stream
 (2) Counting distinct elements: Flajolet-Martin
 Number of distinct elements in the last k elements of the stream
 (3) Estimating moments: AMS method
 Estimate std. dev. of last k elements
153

Bloom filter
 Probabilistic (approximate) set membership test
 Given a set S = {x1,x2,…,xn}, is y in S ?
 Construct a data structure to answer that is:
 Fast (Faster than searching through S).
 Small (Smaller than explicit representation).
 To obtain speed and size improvements, allow some probability of error.
 False positives: y ∉ S but we report y ∈ S
 False negatives: y ∈ S but we report y ∉ S
 Bloom filter
 Data structure for probabilistic set membership testing
 Zero false negatives
 Nonzero false positives
154

Bloom filter
 Given a set of keys S that we want to filter
1. Create a bit array B of n bits, initially all 0
2. Choose k hash functions h1 ,…, hk with range [0,n)
3. Hash each member s ∈ S
 use the k hash functions h1(s), …, hk(s), which map s into random numbers uniform in [1, n] (take modulo n if a hash function outputs larger numbers)
 set the elements B[h1(s)], …, B[hk(s)] to 1
4. When a stream element with key y arrives
 use the k hash functions
 if B[h1(y)], …, B[hk(y)] are all 1, output y
 else discard y
155
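The construction above can be sketched in a few lines of Python. This is my own toy illustration (the class name, the sha256-with-salt trick used to simulate k independent hash functions, and the example sizes are all assumptions), not production code.

```python
import hashlib

class BloomFilter:
    """Approximate set membership: zero false negatives, some false positives."""
    def __init__(self, n_bits, k):
        self.n = n_bits
        self.k = k
        self.B = [0] * n_bits              # bit array B, initially all 0

    def _positions(self, key):
        # Simulate k hash functions h1..hk by salting one hash function with i.
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{key}".encode()).hexdigest()
            yield int(h, 16) % self.n      # map into [0, n)

    def add(self, key):
        for pos in self._positions(key):
            self.B[pos] = 1                # set B[h_i(key)] to 1

    def might_contain(self, key):
        # All k bits set -> report "in S" (maybe a false positive);
        # any bit 0 -> definitely not in S.
        return all(self.B[pos] for pos in self._positions(key))

bf = BloomFilter(n_bits=64, k=3)
for s in ["a", "b", "y", "l"]:             # keys in S, as in the example slide
    bf.add(s)
print(bf.might_contain("a"), bf.might_contain("q"))   # True, and probably False
```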

Bloom filter
 Bloom filter example: keys in S: a, b, y, l; stream keys: q, z (the figure marks a false positive)
Tarkoma et al. Theory and practice of bloom filters for distributed systems. DOI: 10.1109/SURV.2011.031611.00024
156

Counting distinct elements
 Estimate number of distinct elements
 Select a hash function h that maps each of N elements to at least log2 N bits
 N is the max. number of distinct elements; e.g., a → h(a) = 1100 (12 in binary)
 Define r(h(a)) as the number of 0s from the right (trailing zeros); e.g., a → h(a) = 1100 → r(h(a)) = 2
157

Counting distinct elements
 Estimate number of distinct elements
 Flajolet-Martin (FM) sketch
For each element x in stream S: compute r(h(x))
Let R = max{ r(h(x)) : x ∈ S }
Return 2^R as the estimated number of distinct elements in S
158

Counting distinct elements
Exercise
 Given the stream {3,1,4,2} and h(x)=4xmod32, compute the estimate output by an FM sketch that uses h(x) and stores each h(x) using 4 bits.
159

Counting distinct elements
Exercise
 Given the stream {3,1,3,2} and h(x)=4xmod32, compute the estimate output by an FM sketch that uses h(x) and stores each h(x) using 4 bits.
160

Counting distinct elements
 Given the stream {3,1,3,2} and h(x)=4xmod32, compute the estimate output by an FM sketch that uses h(x) and stores each h(x) using 4 bits.
Element x | Hash h(x) | r(h(x))
3         | 1100      | 2
1         | 0100      | 2
3         | 1100      | 2
2         | 1000      | 3
R = max(2,2,2,3) = 3, so the estimate is 2^R = 8
Example computation: h(2) = 4*2 mod 32 = 8 (in binary 1000)
161
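The exercise can be checked with a short Python sketch (my own; the helper names are mine, and n_bits=4 reflects the "4 bits" assumption in the exercise).

```python
def trailing_zeros(value, n_bits=4):
    """r(h(x)): number of 0s from the right in the n_bits-bit representation
    of the hash (a hash equal to 0 yields n_bits trailing zeros)."""
    bits = format(value % (1 << n_bits), f"0{n_bits}b")   # keep only n_bits bits
    return len(bits) - len(bits.rstrip("0"))

def fm_estimate(stream, h, n_bits=4):
    """Flajolet-Martin: R = max r(h(x)) over the stream, estimate = 2**R."""
    R = max(trailing_zeros(h(x), n_bits) for x in stream)
    return 2 ** R

h = lambda x: (4 * x) % 32
print(fm_estimate([3, 1, 3, 2], h))   # hashes 12,4,12,8 -> r = 2,2,2,3 -> 2**3 = 8
```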

Estimating frequency moments
 What are frequency moments
 The k-th frequency moment of a stream comprised of N different types of elements a1, …, aN, each appearing m1, …, mN times, is defined as f_k = (m_1)^k + … + (m_N)^k
 f_0 is the number of distinct elements
 f_1 is the total frequency (the length of the stream)
f_2 computation:
http://www.tau.ac.il/~nogaa/PDFS/amsz4.pdf
162

Estimating frequency moments

Why they are important
 Indicate the degree of data skew, which is useful in many parallel database applications (e.g., determines the selection of algorithms for data partitioning)
 f_2 is the surprise number S (how uneven the distribution is)
Example: stream of length 100 with 11 distinct values
Item counts: 10, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 → Surprise S = 910
Item counts: 90, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 → Surprise S = 8,110
http://www.tau.ac.il/~nogaa/PDFS/amsz4.pdf
163
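The moments and the surprise numbers above can be verified directly from the item counts; the short Python check below is my own illustration (the item names are placeholders).

```python
from collections import Counter

def frequency_moment(counts, k):
    """k-th frequency moment f_k = sum of m_i**k over the distinct items."""
    return sum(m ** k for m in counts.values())

# The two length-100 streams with 11 distinct values from the slide.
even   = Counter({f"item{i}": c for i, c in enumerate([10] + [9] * 10)})
skewed = Counter({f"item{i}": c for i, c in enumerate([90] + [1] * 10)})

for counts in (even, skewed):
    f0 = frequency_moment(counts, 0)   # number of distinct elements -> 11
    f1 = frequency_moment(counts, 1)   # total frequency (stream length) -> 100
    f2 = frequency_moment(counts, 2)   # surprise number S -> 910 and 8110
    print(f0, f1, f2)
```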

What is it ? (1/3)
Spark Streaming
• Extension of the core Spark API that enables
• scalable, high-throughput, fault-tolerant stream processing of live data streams
164

What is it ? (2/3)
• Extension of the core Spark API that enables
• scalable, high-throughput, fault-tolerant stream processing of live data streams
Scales to hundreds of nodes
Achieves second-scale latencies
Efficiently recovers from failures
Integrates with batch and interactive processing
165

What is it ? (3/3)
• Extension of the core Spark API that enables
• scalable, high-throughput, fault-tolerant stream processing of live data streams
https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html
166

Combine batch and stream processing
 Combining batch and stream processing is useful
 Online machine learning
 Predict {True, False}, based on stream elements
 Stream: {product1, product2, product1, ….}
 Prediction function F(Stream_window)
 Continuously learn and update data models (updateStateByKey and transform)
 Combine live data streams with historical data
 Generate historical data models with Spark, etc.
 Use data models to process live data stream (transform)
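A rough PySpark sketch of the "historical data model + live stream" pattern via transform (my own simplification; the socket source, host/port, and the historical scores are placeholders, not from the lecture):

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="CombineBatchAndStream")
ssc = StreamingContext(sc, batchDuration=5)              # 5-second micro-batches

# Historical data model built offline with batch Spark (placeholder values).
historical_scores = sc.parallelize([("product1", 0.9), ("product2", 0.2)])

# Live stream of product ids, one per line from a socket (placeholder source).
products = ssc.socketTextStream("localhost", 9999).map(lambda p: (p.strip(), 1))

# transform() exposes each micro-batch RDD, so it can be joined with the model.
scored = products.transform(lambda rdd: rdd.join(historical_scores))
scored.pprint()

ssc.start()
ssc.awaitTermination()
```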

Combine batch and stream processing
 Combining batch and stream processing is useful
 Complex Event Processing (CEP) (a toy sketch follows this slide)
 CEP matches continuously incoming events against a pattern
 Stream: {room_visit, sanitize, room_visit, … }
 Pattern: {room_visit, room_visit}
 If pattern appears in Stream Then doctor did not sanitize hands, alert!
 window-based operations (reduceByWindow, etc.)
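As a toy illustration of the CEP idea (plain Python, my own simplification rather than a real CEP engine): scan the event stream and alert whenever the pattern {room_visit, room_visit} occurs with no sanitize event in between.

```python
def detect_missed_sanitize(events):
    """Alert at every position where a room_visit follows a previous room_visit
    with no sanitize event in between (the pattern {room_visit, room_visit})."""
    alerts = []
    last = None                          # last relevant event seen so far
    for i, e in enumerate(events):
        if e == "room_visit":
            if last == "room_visit":
                alerts.append(i)         # doctor did not sanitize hands: alert!
            last = "room_visit"
        elif e == "sanitize":
            last = "sanitize"
    return alerts

stream = ["room_visit", "sanitize", "room_visit", "room_visit"]
print(detect_missed_sanitize(stream))    # -> [3]
```

In Spark Streaming the same check would typically run over window-based operations on the DStream rather than over a plain Python list.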

DStream Persistence
 If a DStream is set to persist at a storage level, then all RDDs generated by it are set to the same storage level
 When to persist?
 If there are multiple transformations / actions on a DStream
 If the RDDs in a DStream are going to be used multiple times
 Window-based DStreams are automatically persisted in memory

DStream Persistence
 Default storage level of DStreams is StorageLevel.MEMORY_ONLY_SER (i.e. in memory as serialized bytes)
 Except for input DStreams which have StorageLevel.MEMORY_AND_DISK_SER_2
 Note the difference from RDD’s default level (no serialization)
 Serialization reduces random pauses due to GC providing more consistent job processing times
Storage level          | useDisk | useMemory | deserialized | replication
MEMORY_ONLY            | False   | True      | False        | 1
MEMORY_ONLY_2          | False   | True      | False        | 2
MEMORY_ONLY_SER        | False   | True      | False        | 1
DISK_ONLY              | True    | False     | False        | 1
MEMORY_AND_DISK        | True    | True      | True         | 1
MEMORY_AND_DISK_SER_2  | True    | True      | False        | 2
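To tie the table back to code, here is a hedged PySpark sketch (my own; the socket source, durations, and checkpoint path are placeholders) of persisting a DStream that is reused by two outputs, plus a window-based DStream that Spark persists automatically.

```python
from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="DStreamPersistence")
ssc = StreamingContext(sc, batchDuration=1)

lines = ssc.socketTextStream("localhost", 9999)       # placeholder source
words = lines.flatMap(lambda line: line.split())

# 'words' feeds two different outputs below, so persist it explicitly
# (MEMORY_AND_DISK here; a serialized level can be chosen where available).
words.persist(StorageLevel.MEMORY_AND_DISK)

counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
counts.pprint()

# Window-based DStreams (here a 30s window sliding every 10s) are persisted
# in memory automatically, without an explicit persist() call.
windowed = words.countByValueAndWindow(windowDuration=30, slideDuration=10)
windowed.pprint()

ssc.checkpoint("/tmp/streaming-checkpoint")           # needed by this window op
ssc.start()
ssc.awaitTermination()
```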