
7CCSMBDT – Big Data Technologies Week 4
Grigorios Loukides, PhD (grigorios.loukides@kcl.ac.uk)
Spring 2017/2018

Objectives
Today:
 MapReduce patterns
   Numerical summarization (count, max)
   Filtering
   Distinct
   Binning (partitioning records into bins)
   Sorting
 MapReduce (join, cost measurement)
 NoSQL databases (intro)

Read: Chapter 3.2 from Bagha
https://github.com/mattwg/mrjob-examples

MapReduce with python
 mrjob
  lets you write MapReduce jobs in Python 2.6+/3.3+ and run them on several platforms
  https://pythonhosted.org/mrjob/index.html
 Reference manual in pdf
 https://media.readthedocs.org/pdf/mrjob/latest/mrjob.pdf

MapReduce with python
 mr_word_count.py
[Figure: listing of mr_word_count.py. The mapper receives an empty key and a line as its value, and returns three key-value pairs (characters, words, lines). The reducer returns, for each key, the sum of its values.]
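The data flow that mr_word_count.py implements can be simulated in plain Python, without Hadoop or the mrjob runner. The sketch below follows the standard mrjob word-count example (the shuffle phase is imitated with a dictionary):

```python
from collections import defaultdict

def mapper(_, line):
    # Empty key, a line as value; returns three key-value pairs.
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1

def reducer(key, list_of_values):
    # For each key, returns the sum of its values.
    yield key, sum(list_of_values)

def run(lines):
    grouped = defaultdict(list)          # imitates the shuffle: group values by key
    for line in lines:
        for key, value in mapper(None, line):
            grouped[key].append(value)
    return dict(kv for key in grouped for kv in reducer(key, grouped[key]))

print(run(["hello big data", "hello"]))
# {'chars': 19, 'words': 4, 'lines': 2}
```

With mrjob installed, the real job is run from the command line as `python mr_word_count.py input.txt`.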

MapReduce with python
 Numerical summarization: for count, max, min, avg, top-N
 Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7

We will discuss count and max.
Min is similar to max.
See Chapter 3.2 from Bagha for avg and top-N
Problem 1: Count total number of times each page is visited in 2014.
“Page visited” means the URL appears in the log file.
The number of times a page is visited is the number of lines containing its URL.

MapReduce with python
 Numerical summarization: Problem 1
The mapper parses each line of the input and emits a (key, value) pair as (url, 1).
The reducer receives pairs (key, list_of_values) grouped by key and, for each pair, sums the values to compute the count.

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        year = date[0:4]
        if year == '2014':
            yield url, 1

    def reducer(self, key, list_of_values):
        yield key, sum(list_of_values)

if __name__ == '__main__':
    Mrmyjob.run()
Recall .strip() removes leading and trailing whitespace characters

MapReduce with python
 Numerical summarization: for count, max, min, avg
 Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7

Problem 2: Most visited page in each month of 2014.
“Page visited” means URL appears in Log file
Most visited means maximum number of times (occurrences of a URL)

MapReduce with python
 Numerical summarization: Problem 2
The mapper parses each line and emits a (key, value) pair as ((month, url), 1).
reducer1 emits, for each (month, url) key, a (key, value) pair as (month, (count, url)).
reducer2 receives, for each month, the count of each url, and emits the url and count of visits for the most visited url in that month.

from mrjob.job import MRJob
from mrjob.step import MRStep

class Mrmyjob(MRJob):
    def mapper1(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        year = date[0:4]
        month = date[5:7]
        if year == '2014':
            yield (month, url), 1

    def reducer1(self, key, list_of_values):
        yield key[0], (sum(list_of_values), key[1])

    def reducer2(self, key, list_of_values):
        yield key, max(list_of_values)

continues on the next slide

MapReduce with python
 Numerical summarization: Problem 2
from mrjob.job import MRJob
from mrjob.step import MRStep

class Mrmyjob(MRJob):
    def mapper1(self, _, line):
        data = line.split()
        …
        yield (month, url), 1

    def reducer1(self, key, list_of_values):
        …

    def reducer2(self, key, list_of_values):
        …

    def steps(self):
        return [MRStep(mapper=self.mapper1, reducer=self.reducer1),
                MRStep(reducer=self.reducer2)]

if __name__ == '__main__':
    Mrmyjob.run()
mrjob by default uses a single MapReduce job to execute the program. We redefine steps() to create two jobs, each defined with MRStep().
Note: Chapter 3.2 in Bagha’s book uses the deprecated self.mr() method instead of MRStep
https://pythonhosted.org/mrjob/job.html#multi-step-jobs
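The two-step data flow can be traced without Hadoop. The sketch below simulates both MRStep phases in plain Python on a small log sample (the shuffle between steps is imitated with dictionaries; this is an illustration of the logic, not the mrjob runner itself):

```python
from collections import defaultdict

def run_two_steps(lines):
    # Step 1: mapper1 + reducer1 -- count visits per (month, url) in 2014.
    counts = defaultdict(int)
    for line in lines:
        date, time, url, ip, visit_length = line.split()
        if date[0:4] == '2014':
            counts[(date[5:7], url)] += 1
    # reducer1 output, regrouped by month for step 2.
    by_month = defaultdict(list)
    for (month, url), count in counts.items():
        by_month[month].append((count, url))
    # Step 2: reducer2 -- max per month (tuples compare by count first).
    return {month: max(pairs) for month, pairs in by_month.items()}

log = [
    "2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3",
    "2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7",
    "2014-06-06 21:02:03 http://www.cnn2.com 120.62.102.124 7",
    "2014-06-06 21:02:03 http://www.cnn2.com 120.62.102.124 7",
]
print(run_two_steps(log))
# {'01': (1, 'http://www.google.com'), '06': (2, 'http://www.cnn2.com')}
```

Because reducer2 applies max() to (count, url) tuples, ties on count are broken by comparing the URL strings, just as in the mrjob version.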

MapReduce with python
 Numerical summarization: Problem 2
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
2014-06-06 21:02:03 http://www.cnn2.com 120.62.102.124 7
2014-06-06 21:02:03 http://www.cnn2.com 120.62.102.124 7
2014-06-07 21:02:03 http://www.cnn.com 120.62.102.124 7
[cloudera@quickstart Desktop]$ python summarization.py toy_log.txt
Using configs in /etc/mrjob.conf
Creating temp directory /tmp/summarization.cloudera.20170208.120451.098082
Running step 1 of 2…
Running step 2 of 2…
Streaming final output from /tmp/summarization.cloudera.20170208.120451.098082/output…
“01” [1, “http://www.google.com”]
“02” [1, “http://www.google.com”]
“06” [2, “http://www.cnn2.com”]
Removing temp directory /tmp/summarization.cloudera.20170208.120451.098082…
Input file: toy_log.txt

MapReduce with python
 Filtering: for getting out (i.e., retrieve) a subset of data
 Example: Log file (Date, Time, URL, IP, visit_length)
2014-12-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-12-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7

Problem 3: Retrieve all page visits for the page http://www.google.com in Dec 2014.
Example output
http://www.google.com, (2014-12-01 20:01:02 77.32.11.111 3)
http://www.google.com, (2014-12-02 23:01:08 77.32.11.111 7)

MapReduce with python
 Filtering: Problem 3
The mapper parses each line of the input and emits a (key, value) pair as (url, (date, time, ip, visit_length)). There is no reducer.

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        year = date[0:4]
        month = date[5:7]
        if year == '2014' and month == '12' and url == 'http://www.google.com':
            yield url, (date, time, ip, visit_length)

if __name__ == '__main__':
    Mrmyjob.run()

MapReduce with python
 Distinct: for getting out (i.e., retrieve) distinct values
 Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7

Problem 4: Retrieve all distinct IP addresses
Example output
77.32.11.111, None
50.62.22.123, None
120.62.102.124, None

MapReduce with python
 Distinct: Problem 4
The mapper parses each line of the input and emits a (key, value) pair as (ip, None).
The reducer receives (key, None) and emits the distinct keys.

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        yield ip, None

    def reducer(self, key, list_of_values):
        yield key, None

if __name__ == '__main__':
    Mrmyjob.run()
Question: Is reducer necessary here? Why?
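The data flow of the distinct pattern can be simulated in plain Python (no mrjob required); the shuffle is imitated with a dictionary that groups the mapper's (ip, None) pairs by key:

```python
from collections import defaultdict

def distinct_ips(lines):
    grouped = defaultdict(list)          # imitates the shuffle: group by key
    for line in lines:
        ip = line.split()[3].strip()     # mapper: yield ip, None
        grouped[ip].append(None)
    # reducer: for each key, yield (key, None) -- one pair per distinct IP
    return sorted((ip, None) for ip in grouped)

log = [
    "2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3",
    "2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7",
    "2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5",
    "2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7",
]
print(distinct_ips(log))
# [('120.62.102.124', None), ('50.62.22.123', None), ('77.32.11.111', None)]
```

Observing where the duplicates disappear in this simulation is a good way to approach the question on the previous slide.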

MapReduce with python
 Binning: for partitioning records into bins (i.e., categories)
 Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7

Problem 5: Partition records of the Log file based on the quarter of 2014 in which the pages have been visited. Note quarters 1,2,3,4 are denoted with Q1, Q2, Q3, Q4.
Example output
Q1, (2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3)
Q1, (2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7)
Q2, (2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7)

MapReduce with python
 Binning: Problem 5
from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        year = date[0:4]
        month = int(date[5:7])
        if year == '2014':
            if month <= 3:
                yield "Q1", (date, time, url, ip, visit_length)
            elif month <= 6:
                yield "Q2", (date, time, url, ip, visit_length)
            elif month <= 9:
                yield "Q3", (date, time, url, ip, visit_length)
            else:
                yield "Q4", (date, time, url, ip, visit_length)

if __name__ == '__main__':
    Mrmyjob.run()

The mapper parses each line of the input and emits a (key, value) pair as (Quarter_id, record), where Quarter_id depends on the month of 2014 (e.g., Q1 for month <= 3). We use int(date[5:7]) because month is compared numerically in the if/elif/else chain.

MapReduce with python
 Sorting: for sorting with respect to a particular key
 Example: Log file (Date, Time, URL, IP, visit_length)
2014-01-01 20:01:02 http://www.google.com 77.32.11.111 3
2014-02-02 23:01:08 http://www.google.com 77.32.11.111 7
2015-03-03 01:02:03 http://www.ibm.com 50.62.22.123 5
2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124 7
...
Problem 6: Sort the pages visited in 2014, with respect to their visit length, in decreasing order.
Example output
7, (2014-02-02 23:01:08 http://www.google.com 77.32.11.111)
7, (2014-06-06 21:02:03 http://www.cnn.com 120.62.102.124)
3, (2014-01-01 20:01:02 http://www.google.com 77.32.11.111)

MapReduce with python
 Sorting: Problem 6
The mapper parses each line of the input and emits a (key, value) pair as (None, (visit_length, (date, time, url, ip))), e.g., (None, (7, (2014-01-01, ...))). By default, list_of_values in the reducer is a generator; list() converts it into a list, and reverse=True imposes decreasing order.

from mrjob.job import MRJob

class Mrmyjob(MRJob):
    def mapper(self, _, line):
        data = line.split()
        date = data[0].strip()
        time = data[1].strip()
        url = data[2].strip()
        ip = data[3].strip()
        visit_length = int(data[4].strip())
        year = date[0:4]
        month = int(date[5:7])
        if year == '2014':
            yield None, (visit_length, (date, time, url, ip))

    def reducer(self, key, list_of_values):
        list_of_values = sorted(list(list_of_values), reverse=True)
        yield None, list_of_values

if __name__ == '__main__':
    Mrmyjob.run()

More mrjob examples
Bagha’s book
https://github.com/mattwg/mrjob-examples
https://media.readthedocs.org/pdf/mrjob/latest/mrjob.pdf

Example: Join with Map-Reduce
 Compute the natural join R(A,B) ⋈ S(B,C)
 R and S are each stored in a file
 Tuples are pairs (a,b) or (b,c)

R(A,B):        S(B,C):        R(A,B) ⋈ S(B,C):
A  B           B  C           A  C
a1 b1          b2 c1          a3 c1
a2 b1          b2 c2          a3 c2
a3 b2          b3 c3          a4 c3
a4 b3

Map-Reduce Join
 A Map process turns:
  Each input tuple R(a,b) into key-value pair (b,(a,R))
  Each input tuple S(b,c) into key-value pair (b,(c,S))
 Map processes send each key-value pair with key b to a Reduce process
 Each Reduce process matches all the pairs (b,(a,R)) with all the pairs (b,(c,S)) and outputs (a,b,c).

Cost Measures for MR
 Efficient MapReduce algorithms are preferred. Recall the purpose of the combiner.
 For many algorithms, the bottleneck is moving data among tasks (e.g., from mappers to reducers)
 We need cost measures to quantify efficiency
  Communication
  Running time
 Why do we need both?

Cost Measures for MR
1. Communication cost = total I/O of all processes; often, communication is slower than computation (at least for simple algorithms)
2. Elapsed communication cost = max of I/O along any path
3. (Elapsed) computation cost: analogous, but counts running time of processes

Example: Cost Measures
 For a map-reduce algorithm, communication cost is the sum of:
  input file size +
  2 × (sum of the sizes of all files passed from Map processes to Reduce processes)* +
  the sum of the output sizes of the Reduce processes.
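The reduce-side join above and the communication-cost formula can both be checked with a small plain-Python sketch. This is an illustration only: the shuffle is imitated with a dictionary, and sizes are counted in tuples rather than bytes (an assumption made purely to keep the arithmetic simple).

```python
from collections import defaultdict

R = [("a1", "b1"), ("a2", "b1"), ("a3", "b2"), ("a4", "b3")]   # R(A,B)
S = [("b2", "c1"), ("b2", "c2"), ("b3", "c3")]                 # S(B,C)

# Map: tag each tuple with its relation of origin and key it by b.
grouped = defaultdict(list)
for a, b in R:
    grouped[b].append(("R", a))
for b, c in S:
    grouped[b].append(("S", c))

# Reduce: for each b, match every (b,(a,R)) with every (b,(c,S)).
join = sorted((a, b, c)
              for b, tagged in grouped.items()
              for tag_r, a in tagged if tag_r == "R"
              for tag_s, c in tagged if tag_s == "S")
print(join)
# [('a3', 'b2', 'c1'), ('a3', 'b2', 'c2'), ('a4', 'b3', 'c3')]

# Communication cost in tuples: input + 2 x (map-to-reduce traffic) + reduce output.
cost = (len(R) + len(S)) + 2 * (len(R) + len(S)) + len(join)
print(cost)   # 7 + 14 + 3 = 24
```

The factor 2 reflects that, since communication cost is the total I/O of all processes, each file passed from Map to Reduce is counted once as Map output and once as Reduce input.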
 Elapsed communication cost is the sum of:
  the largest input + output, for any map process, +
  the largest input + output, for any reduce process
* Assuming that the input and output of a Map process have the same size

What Cost Measures Mean
 Either the I/O (communication) or processing (computation) cost dominates
 Total cost tells what you pay in rent from your friendly neighborhood cloud
 Elapsed computation cost is wall-clock time using parallelism

Complexity measures for MR
 Key complexity: over all key-value pairs input to or output by any Mapper or Reducer, compute the
  Maximum size
  Maximum running time
  Maximum used memory
 Sequential complexity: over all Mappers and Reducers, sum
  The size of all key-value pairs input and output by the Mappers and Reducers
  The total running time for all Mappers and Reducers
http://web.stanford.edu/~ashishg/papers/mapreducecomplexity.pdf

NoSQL
Material based on Chapters 4 and 7 of Thomas Erl’s book and Kathleen Durant’s lecture on MongoDB

OLTP
 We have collected the data and need to store them
 Two types of “online” (real-time) systems
 Online Transaction Processing (OLTP) systems
  Store: structured data, in Relational Databases
  Queries: simple (insert, delete, update operations)
  Relation to Big Data: Big Data analysis results can augment OLTP data
Example: a point-of-sale system, where business processes issue simple queries against a relational database

OLTP and OLAP
 We have collected the data and need to store them
 Online Analytical Processing (OLAP) systems
  Store: data in databases that are optimized for analytics
  Queries: are long and complex - part of analytic tasks (business intelligence, data mining, machine learning)
  Relation to Big Data: serve as sources and sinks (remember Sqoop?)
Example: a predictive analytics system, where data analysis tasks issue long-running, complex queries against a database

Relational databases
 RDBMS
 Long history & big players: Oracle, IBM, MS, ...
 Expressive query language: SQL, ...
 Optimizations: indices, materialized views, ...
 Strong consistency: constraints (think of the point-of-sale system)
 Important part of existing infrastructure: MySQL customers include NASA, CERN, BBC News, and many others
https://www.mysql.com/customers/industry/

Recent changes
 Data got bigger and unstructured
 More users, across the globe, need 24/7 access
 Many apps built quickly
 Twitter has >300M active users/month and 1000s of apps fed by its data
 Companies shift to Software-as-a-Service (SaaS) and cloud-based solutions
  the cloud provider supplies and maintains the DBMS
  the customer manages the databases and pays for storage and computing resources

Reuters built its own algorithmic prediction tool to help it spot (and verify) breaking news on Twitter



NoSQL
 NoSQL databases relax one or more of the ACID properties
 Isolation: concurrent and sequential execution of transactions yield the same result
  Example: a new user cannot update records being updated by another user
 Durability: a committed transaction cannot be rolled back
  Example: after a record has been updated (committed transaction), the power fails. After power is resumed, the record is up-to-date; the power failure did not affect the committed transaction.