7CCSMBDT – Big Data Technologies Week 4
Grigorios Loukides, PhD (grigorios.loukides@kcl.ac.uk)
Spring 2017/2018
Objectives
Today:
MapReduce patterns:
Numerical summarization (count, max)
Filtering
Distinct
Binning (partitioning records into bins)
Sorting
MapReduce (join, cost measurement)
NoSQL databases (intro)
Read: Chapter 3.2 from Bagha
https://github.com/mattwg/mrjob-examples
MapReduce with python
mrjob
lets you write MapReduce jobs in Python 2.6+/3.3+ and run them on several platforms
https://pythonhosted.org/mrjob/index.html
Reference manual in pdf:
https://media.readthedocs.org/pdf/mrjob/latest/mrjob.pdf
MapReduce with python
mr_word_count.py (shown as a figure)
Mapper: receives an empty key and a line as value; returns three key-value pairs.
Reducer: for each key, returns the sum of its values.
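The slide shows the code as a figure; the same map/shuffle/reduce flow can be sketched in plain Python. The three keys ("chars", "words", "lines") follow mrjob's standard word-count example, and run_job is an illustrative stand-in for the framework's shuffle phase, not part of mrjob:

```python
from collections import defaultdict

def mapper(_, line):
    # Empty key, a line as value; emit three key-value pairs.
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1

def reducer(key, list_of_values):
    # For each key, return the sum of its values.
    yield key, sum(list_of_values)

def run_job(lines):
    # Simulate the shuffle phase: group mapper output by key.
    groups = defaultdict(list)
    for line in lines:
        for key, value in mapper(None, line):
            groups[key].append(value)
    return dict(kv for key in groups for kv in reducer(key, groups[key]))

result = run_job(["hello world", "big data"])
# result == {"chars": 19, "words": 4, "lines": 2}
```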
MapReduce with python
Numerical summarization: for count, max, min, avg, top-N
Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
…
We will discuss count and max; min is similar to max.
See Chapter 3.2 from Bagha for avg and top-N.
Problem 1: Count the total number of times each page was visited in 2014.
"Page visited" means the URL appears in the log file.
"Number of times" means the number of lines containing the same URL.
MapReduce with python
Numerical summarization: Problem 1
Mapper: parses each line of the input and emits a (key, value) pair as (url, 1).
Reducer: receives pairs (key, list_of_values) grouped by key and, for each pair, sums its values to compute the count.

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        year = date[0:4]
        if year == '2014':
            yield url, 1

    def reducer(self, key, list_of_values):
        yield key, sum(list_of_values)

if __name__ == '__main__':
    Mrmyjob.run()

Recall that .strip() removes leading and trailing whitespace characters.
MapReduce with python
Numerical summarization: for count, max, min, avg
Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
…
Problem 2: Find the most visited page in each month of 2014.
"Page visited" means the URL appears in the log file.
"Most visited" means the maximum number of occurrences of a URL.
MapReduce with python
Numerical summarization: Problem 2
mapper1: emits (key, value) as ((month, url), 1).
reducer1: emits (key, value) as (month, (count, url)).
reducer2: receives, for each month, the count of each url; emits, for each month, the url and visit count of the most visited url in that month.

from mrjob.job import MRJob
from mrjob.step import MRStep

class Mrmyjob(MRJob):
    def mapper1(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        year = date[0:4]
        month = date[5:7]
        if year == '2014':
            yield (month, url), 1

    def reducer1(self, key, list_of_values):
        yield key[0], (sum(list_of_values), key[1])

    def reducer2(self, key, list_of_values):
        yield key, max(list_of_values)

continues on the next slide
MapReduce with python
Numerical summarization: Problem 2

from mrjob.job import MRJob
from mrjob.step import MRStep

class Mrmyjob(MRJob):
    def mapper1(self, _, line):
        data = line.split()
        …
        yield (month, url), 1

    def reducer1(self, key, list_of_values):
        …

    def reducer2(self, key, list_of_values):
        …

    def steps(self):
        return [MRStep(mapper=self.mapper1, reducer=self.reducer1),
                MRStep(reducer=self.reducer2)]

if __name__ == '__main__':
    Mrmyjob.run()

mrjob by default uses a single MapReduce job to execute the program. We redefine steps() to create two jobs, each defined with MRStep().
Note: Section 3.2 in Bagha's book uses the deprecated self.mr() method instead of MRStep.
https://pythonhosted.org/mrjob/job.html#multi-step-jobs
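Before running on a cluster, the two-step logic can be sanity-checked with a plain-Python simulation (simulate_problem2 is an illustrative helper, not part of mrjob; step 1 mirrors mapper1/reducer1, step 2 mirrors reducer2):

```python
from collections import defaultdict

def simulate_problem2(lines):
    # Step 1: count visits per (month, url) for 2014.
    counts = defaultdict(int)
    for line in lines:
        date, _time, url, _ip, _visit_length = line.split()
        if date[0:4] == '2014':
            counts[(date[5:7], url)] += 1
    # Step 2: per month, keep the (count, url) pair with the maximum count.
    per_month = defaultdict(list)
    for (month, url), count in counts.items():
        per_month[month].append((count, url))
    return {month: max(pairs) for month, pairs in per_month.items()}

log = [
    "2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3",
    "2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7",
    "2014-06-06 21:02:03 http://www.cnn2.com 120.62.102.124 7",
    "2014-06-06 21:02:03 http://www.cnn2.com 120.62.102.124 7",
]
# simulate_problem2(log) == {'01': (1, 'http://www.google.com'),
#                            '06': (2, 'http://www.cnn2.com')}
```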
MapReduce with python
Numerical summarization: Problem 2
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
2014-06-06 21:02:03 http://www.cnn2.com 120.62.102.124 7
2014-06-06 21:02:03 http://www.cnn2.com 120.62.102.124 7
2014-06-07 21:02:03 http://www.cnn.com 120.62.102.124 7
[cloudera@quickstart Desktop]$ python summarization.py toy_log.txt
Using configs in /etc/mrjob.conf
Creating temp directory /tmp/summarization.cloudera.20170208.120451.098082
Running step 1 of 2…
Running step 2 of 2…
Streaming final output from /tmp/summarization.cloudera.20170208.120451.098082/output…
"01"	[1, "http://www.google.com"]
"02"	[1, "http://www.google.com"]
"06"	[2, "http://www.cnn2.com"]
Removing temp directory /tmp/summarization.cloudera.20170208.120451.098082…
Input file: toy_log.txt
MapReduce with python
Filtering: for retrieving a subset of the data
Example: Log file (Date, Time, URL, IP, visit_length)
2014-12-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-12-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
…
Problem 3: Retrieve all page visits for the page http://www.google.com in Dec 2014.
Example output:
http://www.google.com, (2014-12-01 20:01:02 77.32.11.111 3)
http://www.google.com, (2014-12-02 23:01:08 77.32.11.111 7)
MapReduce with python
Filtering: Problem 3
Mapper: parses each line of the input and emits a (key, value) pair as (url, (date, …, visit_length)). There is no reducer.

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        year = date[0:4]
        month = date[5:7]
        if year == '2014' and month == '12' and url == 'http://www.google.com':
            yield url, (date, time, ip, visit_length)

if __name__ == '__main__':
    Mrmyjob.run()
MapReduce with python
Distinct: for retrieving distinct values
Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
…
Problem 4: Retrieve all distinct IP addresses.
Example output:
77.32.11.111, None
50.62.22.123, None
120.62.102.124, None
MapReduce with python
Distinct: Problem 4
Mapper: parses each line of the input and emits a (key, value) pair as (ip, None).
Reducer: receives (key, list_of_values) grouped by key and emits each distinct key once.

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        yield ip, None

    def reducer(self, key, list_of_values):
        yield key, None

if __name__ == '__main__':
    Mrmyjob.run()
Question: Is reducer necessary here? Why?
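As a hint, compare the mapper's raw output with the final result: the mapper emits one (ip, None) pair per line, duplicates included, while the shuffle groups equal keys so the reducer emits each IP once. A plain-Python illustration (distinct_ips is an illustrative helper, not part of mrjob):

```python
def distinct_ips(lines):
    # Mapper output: one ip per line, duplicates and all (IP is field 3).
    mapped = [line.split()[3] for line in lines]
    # The shuffle groups equal keys; the reducer then emits each key once.
    return sorted(set(mapped)), mapped

log = [
    "2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3",
    "2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7",
    "2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5",
]
unique, raw = distinct_ips(log)
# raw has 3 entries (77.32.11.111 appears twice); unique has only 2.
```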
MapReduce with python
Binning: for partitioning records into bins (i.e., categories)
Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
…
Problem 5: Partition the records of the log file based on the quarter of 2014 in which the pages were visited. Quarters 1, 2, 3, 4 are denoted Q1, Q2, Q3, Q4.
Example output:
Q1, (2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3)
Q1, (2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7)
Q2, (2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7)
MapReduce with python
Binning: Problem 5
Mapper: parses each line of the input and emits a (key, value) pair as (quarter_id, record), where quarter_id depends on the month of 2014 (e.g., Q1 for month <= 3). The month is converted with int(date[5:7]) because it is compared numerically in the if/elif/else chain. There is no reducer.

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        year = date[0:4]
        month = int(date[5:7])
        if year == '2014':
            if month <= 3:
                yield "Q1", (date, time, url, ip, visit_length)
            elif month <= 6:
                yield "Q2", (date, time, url, ip, visit_length)
            elif month <= 9:
                yield "Q3", (date, time, url, ip, visit_length)
            else:
                yield "Q4", (date, time, url, ip, visit_length)

if __name__ == '__main__':
    Mrmyjob.run()
MapReduce with python
Sorting: for sorting with respect to a particular key
Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
...
Problem 6: Sort the pages visited in 2014 with respect to their visit length, in decreasing order.
Example output:
7, (2014-02-02 23:01:08 http://www.google.com 77.32.11.111)
7, (2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124)
3, (2014-01-01 20:01:02 http://www.google.com 77.32.11.111)
MapReduce with python
Sorting: Problem 6
Mapper: parses each line of the input and emits a (key, value) pair as (None, (visit_length, (date, time, url, ip))), e.g., (None, (7, (2014-01-01, ...))).
Reducer: by default, list_of_values is a generator; list() converts it into a list. reverse=True imposes decreasing order.

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        year = date[0:4]
        month = int(date[5:7])
        if year == '2014':
            yield None, (visit_length, (date, time, url, ip))

    def reducer(self, key, list_of_values):
        list_of_values = sorted(list(list_of_values), reverse=True)
        yield None, list_of_values

if __name__ == '__main__':
    Mrmyjob.run()
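Note that sorted(..., reverse=True) compares the (visit_length, (date, ...)) tuples element-wise: visit_length decides first, and ties fall back to the nested tuple (so tied visit lengths are ordered by the date string). A quick check with the example data:

```python
pairs = [
    (3, ('2014-01-01', '20:01:02', 'http://www.google.com', '77.32.11.111')),
    (7, ('2014-02-02', '23:01:08', 'http://www.google.com', '77.32.11.111')),
    (7, ('2014-06-06', '21:02:03', 'http://www.cnn.com', '120.62.102.124')),
]
ordered = sorted(pairs, reverse=True)
# Visit lengths come out as [7, 7, 3]; among the two 7s, the later date
# string '2014-06-06' sorts first under reverse=True.
```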
More mrjob examples
Bagha’s book
https://github.com/mattwg/mrjob-examples
https://media.readthedocs.org/pdf/mrjob/latest/mrjob.pdf
Example: Join with Map-Reduce
Compute the natural join R(A,B) ⋈ S(B,C). R and S are each stored in a file.
Tuples are pairs (a,b) or (b,c).

R:            S:            R(A,B) ⋈ S(B,C):
A   B         B   C         A   C
a1  b1        b2  c1        a3  c1
a2  b1        b2  c2        a3  c2
a3  b2        b3  c3        a4  c3
a4  b3
Map-Reduce Join
A Map process turns:
each input tuple R(a,b) into the key-value pair (b,(a,R))
each input tuple S(b,c) into the key-value pair (b,(c,S))
Map processes send each key-value pair with key b to a Reduce process.
Each Reduce process matches all the pairs (b,(a,R)) with all the pairs (b,(c,S)) and outputs (a,b,c).
21
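The same reduce-side join can be simulated in plain Python on the R and S of the previous slide (the join helper and the relation tags 'R'/'S' are illustrative; a real job would shuffle the tagged pairs to reducers by key b):

```python
from collections import defaultdict

def join(R, S):
    # Map: tag each tuple with its relation of origin and key it by b.
    groups = defaultdict(lambda: {'R': [], 'S': []})
    for a, b in R:
        groups[b]['R'].append(a)
    for b, c in S:
        groups[b]['S'].append(c)
    # Reduce: within each key b, pair every a from R with every c from S.
    return sorted((a, b, c) for b, g in groups.items()
                  for a in g['R'] for c in g['S'])

R = [('a1', 'b1'), ('a2', 'b1'), ('a3', 'b2'), ('a4', 'b3')]
S = [('b2', 'c1'), ('b2', 'c2'), ('b3', 'c3')]
# join(R, S) == [('a3', 'b2', 'c1'), ('a3', 'b2', 'c2'), ('a4', 'b3', 'c3')]
```

Keys b1 (only in R) produce no output, matching the result table on the previous slide.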
Cost Measures for MR
Efficient MapReduce algorithms are preferred (recall the purpose of the combiner).
For many algorithms, the bottleneck is moving data among tasks (e.g., from mappers to reducers).
We need cost measures to quantify efficiency:
Communication
Running time
Why do we need both?
Cost Measures for MR
1. Communication cost = total I/O of all processes
often, communication is slower than computation (at least for simple algorithms)
2. Elapsed communication cost = max I/O along any path
3. (Elapsed) computation cost: analogous, but counts the running time of processes
Example: Cost Measures
For a map-reduce algorithm:
Communication cost is the sum of:
the input file size +
2 × (the sum of the sizes of all files passed from Map processes to Reduce processes)* +
the sum of the output sizes of the Reduce processes.
Elapsed communication cost is the sum of:
the largest input + output for any Map process, +
the largest input + output for any Reduce process.
* Assuming that the input and output of a Map process have the same size
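As an illustrative back-of-the-envelope check (counting tuples as the I/O unit, which is a simplification), the toy join of the earlier slide with |R| = 4 and |S| = 3 gives:

```python
R_size, S_size = 4, 3           # tuples in R and S
input_cost = R_size + S_size    # reading the two input files
intermediate = R_size + S_size  # each input tuple becomes one key-value pair
output_cost = 3                 # the three joined tuples
# input + 2 x (files passed from Map to Reduce) + Reduce output
communication_cost = input_cost + 2 * intermediate + output_cost
# communication_cost == 24 tuple I/Os
```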
What Cost Measures Mean
Either the I/O (communication) cost or the processing (computation) cost dominates.
Total cost tells you what you pay in rent to your friendly neighborhood cloud.
Elapsed computation cost is the wall-clock time, using parallelism.
Complexity measures for MR
Key complexity: over all key-value pairs input to or output by any Mapper or Reducer, compute:
the maximum size
the maximum running time
the maximum memory used
Sequential complexity: over all Mappers and Reducers, sum:
the size of all key-value pairs input and output by the Mappers and Reducers
the total running time of all Mappers and Reducers
http://web.stanford.edu/~ashishg/papers/mapreducecomplexity.pdf
NoSQL
Material based on Chapters 4 and 7 of Thomas Erl's book and on Kathleen Durant's lecture on MongoDB
OLTP
We have collected the data and need to store them.
There are two types of "online" (real-time) systems.
Online Transaction Processing (OLTP) systems:
Store: structured data, in relational databases
Queries: simple (insert, delete, update operations)
Relation to Big Data: Big Data analysis results can augment OLTP data
Example: a point-of-sale system, where business processes issue simple queries against a relational database
OLTP and OLAP
We have collected the data and need to store them.
Online Analytical Processing (OLAP) systems:
Store: data in databases that are optimized for analytics
Queries: long and complex, part of analytic tasks (business intelligence, data mining, machine learning)
Relation to Big Data: serve as sources and sinks (remember Sqoop?)
Example: a predictive analytics system, where a data analysis task issues long-running, complex queries against a database
Relational databases
RDBMS
Long history & big players: Oracle, IBM, MS, ...
Expressive query language: SQL, ...
Optimizations: indices, materialized views, ...
Strong consistency: constraints (think of the point-of-sale system)
Important part of existing infrastructure: MySQL customers include NASA, CERN, BBC News, and many others https://www.mysql.com/customers/industry/
Recent changes
Data got bigger and unstructured.
More users, across the globe, need 24/7 access.
Many apps are built quickly:
Twitter has >300M active users/month and 1000s of apps fed by its data.
Companies shift to Software-as-a-Service (SaaS) and cloud-based solutions:
the cloud provider supplies and maintains the DBMS
the customer manages the databases and pays for storage and computing resources
NoSQL
NoSQL databases relax one or more of the ACID properties.
Isolation: concurrent and sequential execution of transactions yield the same result.
Example: a new user cannot update records that are being updated by another user.
Durability: a committed transaction cannot be rolled back.
Example: after a record has been updated (committed transaction), the power fails. After power is resumed, the record is up-to-date; the power failure did not affect the committed transaction.