
7CCSMBDT – Big Data Technologies Week 2
Grigorios Loukides, PhD (grigorios.loukides@kcl.ac.uk)
Sections 1.5, 1.6, 5.3.1, 5.3.2 from Bahga’s book
https://flume.apache.org/FlumeUserGuide.html
Spring 2017/2018
1

Analytics flow for big data
 Data collection: the data is collected and ingested into a big data stack (TODAY’s FOCUS)

FIRST, WE WILL BRIEFLY COVER THESE STEPS:
 Data preparation: issues that would prevent meaningful processing are resolved
 Analysis types: the type of analysis is determined
 Analysis modes: the mode of analysis is determined
 Visualizations: the analysis results are presented to the user
2

Data preparation
 Data preparation
 Data cleaning (correct typos, misspelled values)
 Data wrangling (convert data from a raw format to another)
 De-duplication (eliminate duplicate copies of data)
 Normalization (convert values to the same scales)
 Sampling (create a sample on which the analysis will be performed)
 Filtering (remove outliers or incorrect out-of-range values); a brief pandas sketch of some of these steps follows below
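A minimal sketch of a few of these steps (de-duplication, min-max normalization, sampling), assuming the pandas library; the column names and values are made up for illustration:

import pandas as pd

# Hypothetical raw data: a duplicate row and values on very different scales
df = pd.DataFrame({"user": ["a", "b", "b", "c"],
                   "age": [23, 35, 35, 58],
                   "income": [20000, 48000, 48000, 95000]})

df = df.drop_duplicates()                                   # de-duplication
df["income"] = (df["income"] - df["income"].min()) / \
               (df["income"].max() - df["income"].min())    # min-max normalization
sample = df.sample(frac=0.5, random_state=0)                # sampling: keep half of the rows
print(sample)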
3

Data preparation
 OpenRefine (formerly known as GoogleRefine)
 Open source tool for data cleaning, data wrangling, de-duplication in a spreadsheet-like environment
 From string manipulation to translating street addresses to lat/lng coordinates to plot data on maps
https://github.com/OpenRefine/OpenRefine/wiki/Geocoding
 Watch the 3 videos on the main page of http://openrefine.org/
4

Analysis types & modes
 Analysis types
 Basic statistics
 Graph analysis
 Classification
 Regression
 Frequent pattern mining …
 Analysis modes: depending on the application, each type can run in:
 Batch mode: results updated “infrequently” (after days or months)
 Real-time mode: results updated “frequently” (after a few seconds)
 Interactive mode: results updated “on demand”, in answer to queries
5

Technologies by analysis mode
 Batch mode
 Hadoop / MapReduce: framework for distributed data processing
 Pig: high-level language to write MapReduce programs
 Spark: cluster computing framework; various data analytics components
 Solr: scalable framework for searching data
 Real-time mode
 Spark Streaming component: for stream processing
 Storm: for stream processing
 Interactive mode
 Hive: data warehousing framework built on HDFS; uses an SQL-like language
 Spark SQL component: SQL-like queries within Spark programs
6

Visualizations
 Service database: to store and query the data analysis results
 Web framework: to build web-based tools for presenting the results
 Lightning http://lightning-viz.org/ : provides API-based access to reproducible web visualizations
 Pygal http://www.pygal.org/en/stable/ : python library
 Seaborn http://seaborn.pydata.org/ : python library
 PyQtgraph http://www.pyqtgraph.org/ : python library
7

Visualizations
 Static: The analysis results are stored in a database (e.g., MySQL, DynamoDB, MongoDB) and displayed
 Dynamic: The analysis results are updated regularly and displayed (using live widgets, plots, gauges)
 Interactive: The analysis results are displayed on demand, based on user input
* https://www.youtube.com/watch?v=FYnM84TgzZU
8

Data collection
 Data access connectors:
 tools and frameworks to collect and ingest data from various sources into big data storage & analytics frameworks
 5 common types, each used by several tools
 their choice depends on the type of data source
1. Publish-subscribe messaging (used by Apache Kafka, Amazon Kinesis)
 Publishers send messages to a topic managed by a broker (intermediary)
 Subscribers subscribe to topics
 Brokers route messages from publishers to subscribers
9
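A minimal sketch of the publish-subscribe model, assuming a Kafka broker running on localhost:9092 and the third-party kafka-python client; the topic name "tweets" is made up for illustration:

from kafka import KafkaProducer, KafkaConsumer

# Publisher: sends messages to a topic managed by the broker
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("tweets", b"hello big data")
producer.flush()

# Subscriber: subscribes to the topic; the broker routes published messages to it
consumer = KafkaConsumer("tweets", bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest")
for record in consumer:
    print(record.value)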

Data collection
 Data access connectors:
2. Source-sink connectors (used by Apache Flume)
 Source connectors import data from another system (e.g., relational database) into a centralized data store (e.g., distributed file system)
 Sink connectors export data to another system (e.g., HDFS)
3. Database connectors (used by Apache Sqoop)
 Import data from relational DBMSes into big data storage and analytics frameworks
10

Data collection
 Data access connectors:
4. Messaging Queues
(used by RabbitMQ, ZeroMQ, Amazon SQS)
 Producers push data to the queues
 Consumers pull the data from the queues
 Producers and consumers do not need to be aware of each other
5. Custom connectors
(used e.g., to collect data from social networks, NoSQL databases, or Internet-of-Things)
 Built based on the data sources and data collection requirements
 For Twitter, e.g., https://dev.twitter.com/resources/twitter-libraries
11

Database connector: Apache sqoop
 Sqoop http://sqoop.apache.org/
 Open-source command-line tool (Sqoop 1)
 Imports data from RDBMS into HDFS
 Exports data from HDFS back to RDBMS
 Built-in support for RDBMS: MySQL, PostgreSQL, Oracle, SQL Server, DB2, and Netezza
Complete reference for commands:
http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_using_command_aliases
12

Apache sqoop
 Sqoop import command
sqoop import \
  --connect jdbc:mysql://mysql.example.com/sqoop \
  --username sqoop \
  --password sqoop \
  --table cities

• --connect: database to connect to
• --username: username in the database
• --password: password in the database
• --table: database table to import

JDBC is a set of classes and interfaces written in Java that allows Java programs to send SQL statements to a database
13

Apache sqoop
 Sqoop import command – an example
sqoop import \
  --connect jdbc:mysql://mysql.example.com/sqoop \
  --username sqoop \
  --password sqoop \
  --table cities

• --connect: database to connect to
• --username: username in the database
• --password: password in the database
• --table: database table to import

The result of this command will be a comma-separated CSV file where each row is stored in a single line:
1,UK,London
2,France,Paris
…
This file will be stored in HDFS (not in the local filesystem); a quick way to inspect it is sketched below.
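Assuming Sqoop's default target directory (a directory named after the table, under the user's HDFS home) and the usual part-m-* names of map-only MapReduce output files, the imported data can be inspected with:

hdfs dfs -ls cities
hdfs dfs -cat cities/part-m-00000 | head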
14

Apache sqoop
 Sqoop import command – the process
(1) The table to be imported is examined. At this point there is no data transfer; Sqoop just queries the DBMS for the table's metadata.
(2) Java code is generated (a class for the table, a method for each attribute, plus methods to interact with JDBC).
(3) Sqoop connects to the Hadoop cluster and submits a MapReduce job (specified by the Java code), which transfers the data from the DBMS to HDFS in parallel.
15

Apache sqoop
 Sqoop import command – parallelism
 More mappers can transfer data in parallel faster, but they increase the number of concurrent queries to the DBMS (a trade-off is needed).
 Parameter -m is a hint to Sqoop for the number of mappers to use.
 --split-by specifies the attribute used to partition the table, so that each part corresponds to a mapper.

Reading the table in parallel
• Split the table horizontally w.r.t. the “split-by” attribute, with the parameter --split-by id
• Use “m” mappers; each retrieves some of the rows

Select id, A1, A2 from table where id>=0 AND id<2000
Select id, A1, A2 from table where id>=2000 AND id<4000
...
Select id, A1, A2 from table where id>=8000 AND id<10000

[Figure: a table with columns id, A1, A2 and id values 1 … 10000, split horizontally into id ranges, one range per mapper]
16

Apache sqoop
 Sqoop import – more examples
 Import the “widgets” table into HDFS

% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
    --username U --password P --table widgets -m 8 --columns "price,design_date"

% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
    --username U --password P -m 8 \
    --query "SELECT * FROM widgets WHERE price>3.00" --direct

• -m 8: number of mappers (level of parallelism)
• --direct: uses the MySQL-specific connector for better performance; vendor-specific connectors: https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#connectors
17

Apache sqoop
 Sqoop import example – updates
o What if we insert new rows after importing “widgets”? — incremental import in “append” mode

sqoop import \
  --connect jdbc:mysql://mysql.example.com/sqoop \
  --username sqoop \
  --password sqoop \
  --table visits \
  --incremental append \
  --check-column id \
  --last-value 1

parameters:
--check-column (column name to be checked for updates)
--last-value (the last value that was successfully imported into HDFS)

o What if we update existing rows? — incremental import in “lastmodified” mode

  --incremental lastmodified \
  --check-column update_date \
  --last-value "2013-05-22 01:01:01"

update_date: column with the date of the last update
18

Apache sqoop
 Sqoop export – an example
 Export transfers data from HDFS into relational databases.
sqoop export \
  --connect jdbc:mysql://mysql.example.com/sqoop \
  --username sqoop \
  --password sqoop \
  --table cities \
  --export-dir cities

• --connect: database to connect to
• --username: username in the database
• --password: password in the database
• --table: database table to export to
• --export-dir: HDFS directory whose data will be transferred to table cities
19

Apache sqoop
 Sqoop export – the process
(1) Sqoop picks a strategy for the target table.
(2) It generates Java code to parse records from text files and generate INSERT statements.
(3) The Java code is used in the submitted MapReduce job that will export the data.
(4) For efficiency, “m” mappers write data in parallel; an INSERT command may transfer multiple rows.
20

Apache sqoop
 Sqoop export
 We first need to create the MySQL table (and/or database) ourselves – why?
 Because Sqoop cannot choose the target column types automatically: e.g., both char(64) and varchar(64) can hold a Java String, but they are different types to the database. (A sketch of such a table follows.)
 Then, we use the export command
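A minimal sketch of such a target table for the cities example, assuming the schema implied by the earlier CSV output (1,UK,London); the column names and types are hypothetical:

CREATE TABLE cities (
  id      INT NOT NULL PRIMARY KEY,
  country VARCHAR(64),
  city    VARCHAR(64)
);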
21

Source-sink connector Apache flume
 System for collecting, aggregating, and moving data
 from different sources (e.g., server logs, databases, social media, IoT devices)
 into a centralized (big) data store (e.g., a distributed file system or NoSQL database)
 Advantages over ad-hoc solutions
 Reliable, Scalable, High performance
 Manageable, Customizable
 Low-cost installation, operation and maintenance
Slides based on: Section 5.3.1 from Big Data Science & Analytics;
A. Prabhakar, Planning and Deploying Apache Flume
https://flume.apache.org/FlumeUserGuide.html
Book for Flume (not required, suggested if you want to learn more):
http://shop.oreilly.com/product/0636920030348.do
22

Apache flume
 Architecture
 Distributed pipeline architecture (i.e., agents can be connected to each other and multiple agents can receive or output data to different destinations (e.g., file systems) in parallel).
 Optimized for different data sources and destinations
 Built-in support for contextual routing (i.e., events can be routed to particular channels/destinations based on their content)
 Fully customizable and extendable
23

Apache flume
 Event: a unit of data flow having byte payload (a byte array) and possibly a set of attributes (headers)
[Figure: an event consists of headers and a payload]
 Payload is opaque to Flume
 Headers
 can be used for contextual routing (e.g., which events should be transferred first)
 are an unordered collection (map) of string key-value pairs
24

Apache flume
 Agent: a process that hosts the components through which events flow / are transported from one place to another
 Source: receives data from data generators and transfers it to one or more channels
 requires at least one channel to function
 specialized sources for integration with well-known systems
25

Apache flume
 Different types of sources
• Avro, Thrift: data serialization frameworks
• Exec: data from the stdout of a command
• Twitter: tweets from the Twitter API
• NetCat: data written by netcat to a port
• HTTP: HTTP POST events; this source runs a web server that listens on a particular port and writes events, based on the HTTP (POST) requests sent to it, into its associated channel(s)
• Custom
• …

# Describing/Configuring the Twitter source
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret
TwitterAgent.sources.Twitter.accessToken = Your OAuth consumer key access token
TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth consumer key access token secret
TwitterAgent.sources.Twitter.keywords = bigdata, mapreduce, mahout, hbase, nosql
26

Apache flume
 Agent
 Channel: a transient store which buffers events until they are consumed by sinks
 a channel can work with any number of sources and sinks
 different channels offer different levels of durability (memory, file, db)
27

Apache flume
 Why Channels?
 Insulate downstream components from load spikes (e.g., a sink fails and cannot consume data, or too much data comes from the source during peak hours)
 Persistent data store, in case the process restarts
 Provides transactional guarantees about the data that are written in a channel (more on this later)
28

Apache flume
 Different types of channels (a file-channel configuration sketch follows)
• Memory: stores events in memory (a queue). Provides high throughput, but data is lost if the agent fails.
• File: stores events in local files on disk(s). Uses a checkpoint file (the state of an in-memory queue).
• JDBC: stores events in a database. Durable channel, ideal for flows where recoverability is important.
• Custom
• …
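A minimal sketch of a durable file channel, in the property style used by the Flume User Guide; the agent/channel names (a1, c1) and the directories are made up for illustration:

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /var/flume/checkpoint
a1.channels.c1.dataDirs = /var/flume/data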
29

Apache flume
 Agent
 Sink: removes events from a channel and transmits them to their next hop destination
 different types of sinks
 requires exactly one channel to function
30

Apache flume
 Different types of sinks
• HDFS: data written as a file (e.g., SequenceFile, DataStream); writing to a file stops based on time, size, or #events
• HBase: data written as an HBase (column-oriented db) table
• File Roll: data written as a local file; a new file is written periodically
• Avro
• Thrift
• Custom
• …
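Putting a source, a channel, and a sink together: a minimal single-agent configuration sketch in the style of the Flume User Guide. The names a1, r1, c1, k1 are arbitrary, and the netcat source / logger sink are chosen only for illustration; an HDFS or HBase sink would be wired in the same way.

# one source, one channel, one sink, hosted by agent a1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# netcat source: listens on a port and turns each received line into an event
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# memory channel buffering up to 1000 events
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# logger sink: writes events to the agent's log
a1.sinks.k1.type = logger

# wiring: a source can feed several channels, a sink drains exactly one channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The agent would then be started with something like: bin/flume-ng agent --conf conf --conf-file example.conf --name a1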
31

Apache flume
 Agent
 Allows for different configurations, multi-hop flows
[Figure: a web server sends events to a first agent (source → two channels → two sinks); one sink forwards to a second agent (source → channel → sink), which writes to HDFS]
32

Apache flume
 Agent
 Allows for different configurations, multi-hop flows
33

Apache flume
 Additional components of an agent
 Interceptor
 Channel selector
 Sink processor

 Interceptor: applied to a source to modify, filter, or drop events
 Different types add different information to the headers:
 Timestamp, host, static marker (e.g., country), regular expression
 Custom
34

Apache flume
 Additional components of an agent
[Figure: the channel selector sits between the source and its channels]
 Channel selector: when there are multiple channels, it defines the policy for distributing events among the channels
 If there are interceptors, the channel selector is applied after them, using the events that were modified by the interceptors
35

Apache flume
 Different types of channel selector
 Replicating: the default option. Replicates (i.e., duplicates) events to all connected channels.
 Multiplexing: distributes events among the connected channels based on a header attribute A of the event and mapping properties (values of A). For example, with the configuration sketched below, events with state=CZ will go to channel c1.
 Custom: can use dynamic criteria.
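A sketch of such a multiplexing selector, in the property style of the Flume User Guide; the agent/source/channel names and the extra US mapping are made up for illustration:

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c3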
36

Apache flume
 Additional components of an agent
 Sink processor: invokes one sink from a specified group of sinks (i.e., determines which of the sinks will pull events out of its channel).
 Default: no sink processor, single channel
[Figure: the sink processor sits between the channels and the group of sinks]
37

Apache flume
 Different types of sink processor
Load balancing: provides load-balancing capabilities over all sinks inside the group (a configuration sketch follows).
 Load is distributed to sinks selected randomly or in a round-robin fashion
 If a sink fails, the next available sink is selected
 Thus, the next agents may not all receive the same amount of data, which is bad. This is addressed by the failover processor (next slide)
 Failed sinks can be blacklisted for a given timeout, which increases exponentially if the sink has still failed after the timeout
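A sketch of a load-balancing sink group, in Flume User Guide property style; the group and sink names (g1, k1, k2) are made up for illustration:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.backoff = true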
38

Apache flume
 Different types of sink processor
Failover: guarantees that events will be processed, as long as there are available sinks (a configuration sketch follows).
 Unique priorities are assigned to the sinks
 The sink with the highest priority writes data until it fails
 If a sink fails while sending an event, it is moved to a pool to “cool down” for a maxpenalty time period, and the next sink with the highest priority is tried.
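A sketch of a failover sink group, again in Flume User Guide property style; the names and values are illustrative (k2 has the higher priority, so it is used until it fails):

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000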
39

Apache flume
[Figure: a web server (client) sends events to an agent; inside the agent the events pass through (1) the interceptors, (2) the channel selector and channels, and (3) the sink processor and sinks, before reaching the HDFS or file-roll destination]
 Client (web server) sends events to the agent
 Source(s) operating within the agent
 receive the events
 pass the events through the interceptors
 the events are put on the channel(s) identified by the channel selector
 the sink processor invokes a sink
 The invoked sink takes events from its channel and sends them to the next-hop destination
 If event transmission fails, the sink processor takes secondary action
40

Apache flume
 Transactional data exchange to guarantee no data loss
 The source uses a transaction to write to the channel. The transaction commits after the events are written to the channel; otherwise it is rolled back.
 The sink uses a transaction to read from the channel. The transaction commits after a successful data transfer, and only then can the events be removed from the channel. If the data transfer fails, the transaction is rolled back.
41

Messaging queues again
 Messaging Queue
Producers push data to the queue
Consumers pull the data from the queue
Producers and consumers do not need to be aware of each other
[Figure: several producers push data into one messaging queue; a consumer pulls data from it]
42

Messaging queues again
 Messaging Queues prevent consumers from being overloaded due to the high velocity of Big Data. That is, they provide load levelling.
[Figure: several producers push data into one messaging queue; a single consumer pulls data from it]
 Load levelling, because they
 act as buffers
 allow decoupling between producers and consumers
 allow producers and consumers to operate asynchronously
 allow dynamic scaling up/down in terms of consumers
 offer reliability and availability (e.g., a consumer fails but producers keep pushing messages to the queue)
A small sketch of this buffering behaviour follows.
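A minimal sketch of load levelling with a bounded in-process queue, using only Python's standard library; the burst sizes and sleep times are made up, and a real deployment would use a broker such as RabbitMQ or Kafka rather than an in-process queue:

import queue
import threading
import time

q = queue.Queue(maxsize=100)              # the "messaging queue": a bounded buffer

def producer(pid):
    for i in range(50):                   # a fast burst of messages
        q.put("producer %d msg %d" % (pid, i))   # blocks only if the buffer is full

def consumer():
    while True:
        msg = q.get()                     # pull the next message when ready
        time.sleep(0.01)                  # slow processing: the queue absorbs the burst
        q.task_done()

threading.Thread(target=consumer, daemon=True).start()   # one (slower) consumer
producers = [threading.Thread(target=producer, args=(pid,)) for pid in range(3)]
for p in producers:                       # three fast producers
    p.start()
for p in producers:
    p.join()                              # wait until everything has been pushed...
q.join()                                  # ...and until the consumer has processed it all
print("all messages consumed")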
43

Messaging queues again
 Messaging Queues balance load between multiple consumers
[Figure: several producers push data into one messaging queue; several consumers pull data from it]
 Load balancing, because they
 allow scaling up in terms of consumers; each consumer gets an equal share of the load
 offer reliability and availability (e.g., the queue delivers a message to multiple consumers, but a consumer does not know which other consumers got it; if a consumer fails while processing the message, the message becomes visible to another consumer)
44

Example of messaging queue: ZeroMQ
 Asynchronous messaging library for distributed or concurrent applications.
 Runs without a dedicated message broker.
 This means you don’t see the “Messaging queue” in the architecture.
 You do your own auditing and recovery if something fails.
 Special patterns for reliability and for load balancing
 Can make an application elastic on a large network:
 Routes messages in a variety of patterns (including push/pull, pub/sub)
 Queues messages automatically
 Can deal with over-full queues
 No imposed format on messages
 Delivers whole messages
45

ZeroMQ
 ZeroMQ allows implementing different messaging patterns over different channels (e.g., TCP).
 Library for many languages (e.g., C, Java, Python). Can run on any OS.
 Based on sockets*, which carry messages (i.e., carry data)
 It uses functions for socket programming:
 to create, destroy, and connect sockets with each other
 to send and receive messages
* a socket allows two applications that are usually on different parts of the network to communicate
46

Extra slides: ZeroMQ
Extra slides are not part of the course, they are useful if you want to learn more on the topic of ZeroMQ and try it yourself.
 To install ZeroMQ for Python: pip install pyzmq (imported in code as zmq)
FREE Guide for ZeroMQ http://zguide.zeromq.org/page:all
 We will examine the pipeline pattern, but there are many other patterns
 (e.g., http://zguide.zeromq.org/page:all#advanced-request-reply for reliability
and http://zguide.zeromq.org/page:all for load balancing patterns)
47

ZeroMQ
 Simple pipeline pattern
 Ventilator: PUSHes tasks that are processed in parallel
 Workers: do the processing work
 Two types of sockets
 zmq.PUSH
 Sends messages to downstream pipeline nodes. Messages are round-robined to all connected downstream nodes.
 zmq.PULL
 Receives messages from upstream pipeline nodes. Messages are fair-queued (interleaved) from among all connected upstream nodes.
48

Extra slides: ZeroMQ
 Simple pipeline example
 One ventilator sends messages to one worker
 Recall the messaging queue is taken care of internally by ZeroMQ
import sys
import zmq
from multiprocessing import Process
import time

def worker():
    context = zmq.Context()                        # creates a Context (needed for socket creation)
    work_receiver = context.socket(zmq.PULL)       # creates a socket of type zmq.PULL
    work_receiver.connect("tcp://127.0.0.1:5557")  # connects to the remote socket of the ventilator
    for task_nbr in range(10000000):
        message = work_receiver.recv()             # receives a message from the ventilator
    sys.exit(1)
Code continues on next page
49

Extra slides: ZeroMQ
 Simple pipeline example (continues from previous slide)
def main():
    # see https://docs.python.org/2/library/multiprocessing.html#the-process-class
    # for details on multiprocessing
    Process(target=worker, args=()).start()
    context = zmq.Context()
    ventilator_send = context.socket(zmq.PUSH)    # creates a socket of type zmq.PUSH
    ventilator_send.bind("tcp://127.0.0.1:5557")  # socket listens so that the worker's socket can connect
    for num in range(10000000):
        ventilator_send.send(b"MESSAGE")          # send a message on the ventilator's socket (so the worker can receive it)

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 10000000 / duration
    print("Duration: %s" % duration)
    print("Messages Per Second: %s" % msg_per_sec)
50

Extra slides: ZeroMQ
Pipeline example with a Sink
This is a more complex example.
Again:
• A ventilator that produces tasks that can be done in parallel
But now
• A set of workers that process tasks in parallel
• A sink collects results from the workers
FREE Guide for ZeroMQ http://zguide.zeromq.org/page:all
51

Extra slides: ZeroMQ
Example push/pull application
• Ventilator generates 100 tasks, each telling a worker to sleep for some ms
import zmq, random, time

try:
    raw_input
except NameError:            # Python 3
    raw_input = input

context = zmq.Context()

# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")

# Socket with direct access to the sink: used to synchronize the start of the batch [sync is a must]
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")

print("Press Enter when the workers are ready: ")
_ = raw_input()
print("Sending tasks to workers...")

# The first message is "0" and signals the start of the batch
sink.send(b'0')

# Initialize random number generator
random.seed()

# Send 100 tasks [scaling to more is easy]
total_msec = 0
for task_nbr in range(100):
    # Random workload from 1 to 100 msecs
    workload = random.randint(1, 100)
    total_msec += workload
    sender.send_string(u'%i' % workload)
print("Total expected cost: %s msec" % total_msec)

# Give 0MQ time to deliver
time.sleep(1)
52

Extra slides: ZeroMQ
Example push/pull application
• Worker receives a message, sleeps, and signals that it’s finished.
import sys, time, zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

# Process tasks forever
while True:
    s = receiver.recv()
    # Simple progress indicator for the user to see
    sys.stdout.write('.')
    sys.stdout.flush()
    # Do the work
    time.sleep(int(s) * 0.001)
    # Send results to the sink
    sender.send(b'')
53

Extra slides: ZeroMQ
Example push/pull application
• Sink collects the 100 tasks and calculates how long the overall processing took.
import sys, time, zmq

context = zmq.Context()

# Socket to receive messages on (the sink binds; ventilator and workers connect to it)
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

# Wait for start of batch
s = receiver.recv()

# Start our clock
tstart = time.time()

# Process 100 confirmations
for task_nbr in range(100):
    s = receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(':')
    else:
        sys.stdout.write('.')
    sys.stdout.flush()

# Calculate and report duration of batch
tend = time.time()
print("Total elapsed time: %d msec" % ((tend - tstart) * 1000))
The sink’s PULL socket collects results from workers evenly. This is called fair-queuing.
54