7CCSMBDT – Big Data Technologies Week 2
Grigorios Loukides, PhD (grigorios.loukides@kcl.ac.uk)
Sections 1.5, 1.6, 5.3.1, 5.3.2 from Bagha's book
https://flume.apache.org/FlumeUserGuide.html
Spring 2017/2018
1
Analytics flow for big data
Data collection: the data is collected and ingested into a big data stack [TODAY'S FOCUS]
Data preparation: the issues that must be resolved for meaningful processing are addressed
Analysis types: the type of analysis is determined
Analysis modes: the mode of analysis is determined
Visualizations: the analysis results are presented to the user
FIRST, WE WILL BRIEFLY COVER data preparation, analysis types, analysis modes, and visualizations
2
Data preparation
Data cleaning (correct typos, misspelled values)
Data wrangling (convert data from a raw format to another)
De-duplication (eliminate duplicate copies of data)
Normalization (convert values to the same scales)
Sampling (create a sample on which the analysis will be performed)
Filtering (remove outliers or incorrect out-of-range values)
3
Data preparation
OpenRefine (formerly known as Google Refine)
Open source tool for data cleaning, data wrangling, de-duplication in a spreadsheet-like environment
From string manipulation to translating street addresses to lat/lng coordinates to plot data on maps
https://github.com/OpenRefine/OpenRefine/wiki/Geocoding
Watch the 3 videos on the main page of http://openrefine.org/
4
Analysis types & modes
Analysis types
Basic statistics
Graph analysis
Classification
Regression
Frequent pattern mining …
Analysis modes: Depending on the application, each type can run in
Batch mode: results updated “infrequently” (after days or months)
Real-time mode: results updated “frequently” (after a few seconds)
Interactive mode: results updated “on demand” as answer to queries
5
Technologies by analysis mode
Batch mode
Hadoop / MapReduce: framework for distributed data processing
Pig: high-level language to write MapReduce programs
Spark: cluster computing framework; various data analytics components
Solr: scalable framework for searching data
Real-time mode
Spark Streaming component: for stream processing
Storm: for stream processing
Interactive mode
Hive: data warehousing framework built on HDFS; uses an SQL-like language
Spark SQL component: SQL-like queries within Spark programs
6
Visualizations
Service database: to store and query the data analysis results
Web framework: to build web-based tools for presenting the results
Lightning http://lightning-viz.org/ : provides API-based access to reproducible web visualizations
Pygal http://www.pygal.org/en/stable/ : python library
Seaborn http://seaborn.pydata.org/ : python library
PyQtgraph http://www.pyqtgraph.org/ : python library
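To illustrate how such a library is used, here is a minimal Pygal sketch (the chart type, labels, and values are made up for the example):

import pygal

# build a simple bar chart of (hypothetical) analysis results
chart = pygal.Bar()
chart.title = "Events processed per day"       # made-up title
chart.x_labels = ["Mon", "Tue", "Wed", "Thu"]  # made-up labels
chart.add("Batch", [1200, 1500, 900, 1700])    # made-up series
chart.add("Streaming", [300, 450, 380, 520])   # made-up series
chart.render_to_file("events.svg")             # writes an SVG that a web page can embed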
7
Visualizations
Static: The analysis results are stored in a database (e.g., MySQL, DynamoDB, MongoDB) and displayed
Dynamic: The analysis results are updated regularly and displayed (using live widgets, plots, gauges)
Interactive: The analysis results are displayed on demand, based on user input
* https://www.youtube.com/watch?v=FYnM84TgzZU
8
Data collection
Data access connectors:
tools and frameworks to collect and ingest data from various sources into big data storage & analytics frameworks
There are 5 common types, each used by several tools
The choice of connector depends on the type of data source
1. Publish-subscribe messaging (used by Apache Kafka, Amazon Kinesis)
Publishers send messages to a topic managed by a broker (intermediary)
Subscribers subscribe to topics
Brokers route messages from publishers to subscribers
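To make the publish-subscribe model concrete, here is a minimal sketch using the kafka-python client; the broker address, topic name, and payload are assumptions made for this example:

from kafka import KafkaProducer, KafkaConsumer

# publisher: sends messages to a topic managed by the broker
producer = KafkaProducer(bootstrap_servers="localhost:9092")   # assumed broker address
producer.send("sensor-readings", b'{"temp": 21.5}')            # assumed topic and payload
producer.flush()

# subscriber: subscribes to the topic; the broker routes messages to it
consumer = KafkaConsumer("sensor-readings",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest")
for record in consumer:      # blocks, yielding messages as they arrive
    print(record.value)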
Data collection
Data access connectors:
2. Source-sink connectors (used by Apache Flume)
Source connectors import data from another system (e.g., relational database) into a centralized data store (e.g., distributed file system)
Sink connectors export data to another system (e.g., HDFS)
3. Database connectors (used by Apache Sqoop)
Import data from relational DBMSes into big data storage and analytics frameworks
10
Data collection
Data access connectors:
4. Messaging Queues
(used by RabbitMQ, ZeroMQ, Amazon SQS)
Producers push data to the queues
Consumers pull the data from the queues
Producers and consumers do not need to be aware of each other
5. Custom connectors
(used e.g., to collect data from social networks, NoSQL databases, or Internet-of-Things)
Built based on the data sources and the data collection requirements
e.g., for Twitter: https://dev.twitter.com/resources/twitter-libraries
11
Database connector: Apache sqoop
Sqoop http://sqoop.apache.org/
Open-source command-line tool (Sqoop 1)
Imports data from RDBMS into HDFS
Exports data from HDFS back to RDBMS
Built-in support for RDBMS: MySQL, PostgreSQL, Oracle, SQL Server, DB2, and Netezza
Complete reference for commands:
http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_using_command_aliases
12
Apache sqoop
Sqoop import command

sqoop import \
--connect jdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities

• --connect: database to connect to
• --username: username in the database
• --password: password in the database
• --table: database table to import

JDBC is a set of classes and interfaces written in Java that allows Java programs to send SQL statements to a database
13
Apache sqoop
Sqoop import command – an example

sqoop import \
--connect jdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities

The result of this command is a comma-separated (CSV) file, in which each row of the table is stored on a single line:
1,UK,London
2,France,Paris
…
The file is stored in HDFS (not in the local filesystem)
14
Apache sqoop
Sqoop import command – the process
1. The table to be imported is examined. No data is transferred at this point; Sqoop only queries the DBMS for the table's metadata.
2. Java code is generated (a class for the table, a method for each attribute, plus methods to interact with JDBC).
3. Sqoop connects to the Hadoop cluster and submits a MapReduce job (specified by the generated Java code), which transfers the data from the DBMS to HDFS in parallel.
15
Apache sqoop
Sqoop import command – parallelism
More mappers can transfer data faster in parallel, but they increase the number of concurrent queries sent to the DBMS (a trade-off is needed).
The parameter -m is a hint to Sqoop about the number of mappers to use.
--split-by specifies the attribute used to partition the table, so that each partition corresponds to one mapper.
Reading the table in parallel
• Split the table horizontally w.r.t. the “split-by” attribute, e.g., --split-by id
• Use “m” mappers, each retrieving some of the rows:
Select id, A1, A2 from table where id>=0 AND id<2000
Select id, A1, A2 from table where id>=2000 AND id<4000
...
Select id, A1, A2 from table where id>=8000 AND id<10000
[Figure: a table with attributes id, A1, A2 and rows with id = 1, ..., 10000, split horizontally into ranges of 2000 ids, one range per mapper]
16
Apache sqoop
Sqoop import – more examples
Import the “widgets” table into HDFS:

% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
--username U --password P --table widgets -m 8 \
--columns "price,design_date"

% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
--username U --password P -m 8 \
--query "SELECT * FROM widgets WHERE price>3.00" --direct

-m 8: number of mappers (level of parallelism)
--direct: uses the vendor-specific (here, MySQL) connector for better performance
Vendor-specific connectors with --direct: https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#connectors
(Note: a free-form --query import additionally requires --target-dir and the $CONDITIONS token in the WHERE clause; they are omitted here for brevity.)
17
Apache sqoop
Sqoop import example – updates
o What if we insert new rows after importing “widgets”? — incremental append
parameters:
–check column (column name to be checked for updates) –last-value (the last value that was
successfully imported into HDFS)
oWhat if we update existing rows?
sqoop import \
–connect jdbc:mysql://mysql.example.com/sqoop \ –username sqoop \
–password sqoop \
–table visits \
–incremental append \
–check-column id \
–last-value 1
–incremental lastmodified \ –check-column update_date \ –last-value “2013-05-22 01:01:01″
update_date: column with date of last update
18
Apache sqoop
Sqoop export – an example
Export transfers data from HDFS into a relational database.

sqoop export \
--connect jdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--export-dir cities

• --connect: database to connect to
• --username: username in the database
• --password: password in the database
• --table: database table to export to
• --export-dir: HDFS directory whose data will be transferred to the table cities
19
Apache sqoop
Sqoop export – the process
(1) Sqoop picks a strategy for the target table.
(2) It generates Java code to parse records from text files and to generate INSERT statements.
(3) The Java code is used in the submitted MapReduce job that exports the data.
(4) For efficiency, “m” mappers write data in parallel, and an INSERT statement may transfer multiple rows.
20
Apache sqoop
Sqoop export
We first need to create the MySQL table (and, if needed, the database) ourselves – why?
Because Sqoop cannot infer the exact SQL types from the HDFS data: for example, both CHAR(64) and VARCHAR(64) can hold a Java String, but they are different types to the database.
Then, we use the export command (a possible schema is sketched below).
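A possible schema for the cities table used in the export example (the column names and types are assumptions, chosen to match the earlier CSV rows such as 1,UK,London):

CREATE TABLE cities (
  id      INT NOT NULL PRIMARY KEY,
  country VARCHAR(64),
  city    VARCHAR(64)
);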
21
Source-sink connector: Apache Flume
A system for collecting, aggregating, and moving data
from different sources
(server logs, databases, social media, IoT devices)
into a centralized (big) data store (a distributed file system or a NoSQL database)
Advantages over ad-hoc solutions
Reliable, Scalable, High performance
Manageable, Customizable
Low-cost installation, operation and maintenance
Slides based on: Chapter 5.3.1 from Big Data Science & Analytics;
A. Prabhakar, Planning and Deploying Apache Flume;
https://flume.apache.org/FlumeUserGuide.html
Book for Flume (not required, suggested if you want to learn more):
http://shop.oreilly.com/product/0636920030348.do
22
Apache flume
Architecture
Distributed pipeline architecture: agents can be connected to each other, and multiple agents can receive data from, or output data to, different destinations (e.g., file systems) in parallel
Optimized for different data sources and destinations
Built-in support for contextual routing (i.e., events can be routed to particular channels/destinations based on their headers)
Fully customizable and extendable
23
Apache flume
Event: a unit of data flow, having a byte payload (a byte array) and possibly a set of attributes (headers)
The payload is opaque to Flume
Headers
can be used for contextual routing (e.g., which events should be transferred first)
are an unordered collection (map) of string key-value pairs
24
Apache flume
Agent: a process that hosts the components through which events flow / are transported from one place to another
Source: receives data from data generators and transfers it to one or more channels
requires at least one channel to function
specialized sources for integration with well-known systems
25
Apache flume
Different types of sources
• Avro, Thrift: data from data serialization frameworks
• Exec: data from the stdout of a command
• Twitter: tweets from the Twitter API
• NetCat: data written by netcat to a port
• HTTP: HTTP POST events; this source runs a web server that listens on a particular port and writes events, based on the HTTP (POST) requests sent to it, into its associated channel(s)
• Custom
• …

Example: describing/configuring a Twitter source
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = <your OAuth consumer key>
TwitterAgent.sources.Twitter.consumerSecret = <your OAuth consumer secret>
TwitterAgent.sources.Twitter.accessToken = <your OAuth access token>
TwitterAgent.sources.Twitter.accessTokenSecret = <your OAuth access token secret>
TwitterAgent.sources.Twitter.keywords = bigdata, mapreduce, mahout, hbase, nosql
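A possible configuration sketch for an HTTP source (the agent, source, and channel names and the port are assumptions):

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1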
26
Apache flume
Agent
Channel: a transient store which buffers events until they are consumed by sinks
a channel can work with any number of sources and sinks
different channels offer different levels of durability (memory, file, db)
27
Apache flume
Why Channels?
Insulate the downstream components from load spikes (e.g., a sink fails and cannot consume data, or too much data comes from the source during peak hours)
Provide a persistent data store, in case the agent process restarts
Provide transactional guarantees about the data written to a channel (more on this later)
28
Apache flume
Different types of channels
• Memory: stores events in an in-memory queue; provides high throughput, but data is lost if the agent fails
• File: stores events in local files on disk(s); uses a checkpoint file (the state of the in-memory queue)
• JDBC: stores events in a database; a durable channel, ideal for flows where recoverability is important
• Custom
• …
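For illustration, a configuration sketch for a memory channel and a file channel (the agent and channel names, capacities, and directories are assumptions):

a1.channels = c1 c2
# memory channel: capacity = max events buffered, transactionCapacity = max events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# file channel: events and the checkpoint are persisted on disk
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /flume/checkpoint
a1.channels.c2.dataDirs = /flume/data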
29
Apache flume
Agent
Sink: removes events from a channel and transmits them to their next hop destination
different types of sinks
requires exactly one channel to function
30
Apache flume
Different types of sinks
• HDFS: data is written to HDFS as a file (e.g., SequenceFile, DataStream); the current file is closed (and a new one started) based on time, size, or #events
• HBase: data is written to an HBase (column-oriented db) table
• File Roll: data is written to a local file; a new file is rolled periodically
• Avro
• Thrift
• Custom
• …
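A possible configuration sketch for an HDFS sink (the names, path, and roll thresholds are assumptions):

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events
a1.sinks.k1.hdfs.fileType = DataStream
# roll the current file after 30 s, 1 MB, or 1000 events, whichever comes first
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 1000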
31
Apache flume
Agent
Allows for different configurations, multi-hop flows
[Figure: a web server sends events to an agent whose source writes to two channels, each drained by a sink; one sink forwards events to a second agent (source → channel → sink) that writes to HDFS]
32
Apache flume
Agent
Allows for different configurations, multi-hop flows
[Figure: another example of a multi-hop flow configuration]
33
Apache flume
Additional components of an agent: interceptors, channel selectors, sink processors

Interceptor: applied to a source to modify, filter, or drop events
Different types add different information to the headers:
Timestamp, Host, Static marker (e.g., country), Regular expression
Custom
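For illustration, a configuration sketch attaching a timestamp interceptor and a static interceptor to a source (the agent/source/interceptor names and the static key/value are assumptions):

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = country
a1.sources.r1.interceptors.i2.value = UK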
34
Apache flume
Additional components of an agent

Channel selector: when there are multiple channels, it defines the policy for distributing events to the channels
If there are interceptors, the channel selector is applied after them, i.e., it operates on the events already modified by the interceptors
35
Apache flume
Different types of channel selectors
• Replicating: the default option; replicates (i.e., duplicates) each event to all connected channels
• Multiplexing: distributes events to the connected channels based on a header attribute A of the event and mapping properties (one per value of A); e.g., events with state=CZ go to channel c1, as in the configuration sketch below
• Custom: can use dynamic criteria
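A configuration sketch for a multiplexing channel selector, following the example above (the agent, source, and channel names are assumptions):

a1.sources.r1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
# events with state=CZ go to c1, state=US to c2 and c3, everything else to c4
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4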
36
Apache flume
Additional components of an agent
Sink processor: invokes one sink from a specified group of sinks (i.e., determines which of the sinks will pull events out of its channel).
Default: no sink processor, single channel
37
Apache flume
Different types of sink processor
• Load balancing: provides load-balancing capabilities over all sinks inside the group
Load is distributed to sinks selected randomly or in a round-robin fashion
If a sink fails, the next available sink is selected
Thus, the next agents may not all receive the same amount of data, which can be undesirable; this is addressed by the failover processor (next slide)
Failed sinks can be blacklisted for a given timeout, which increases exponentially if the sink is still failing after the timeout
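A configuration sketch for a load-balancing sink processor over a group of two sinks (the group and sink names are assumptions):

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.backoff = true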
38
Apache flume
Different types of sink processor
• Failover: guarantees that events will be processed, as long as there are available sinks
Unique priorities are assigned to sinks
The sink with the highest priority writes data until it fails
If a sink fails while sending an event, it is moved to a pool to “cool down” for a maxpenalty time period, and the next sink with highest priority is tried.
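A configuration sketch for a failover sink processor (the group and sink names, priorities, and maxpenalty value are assumptions):

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# k1 has the highest priority, so it is used until it fails; then k2 takes over
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000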
39
Apache flume
[Figure: a web server (client) sends events to an agent: source with 1. interceptors and 2. channel selector → two channels → 3. sink processor → two sinks, delivering to HDFS and to a file roll]
The client (web server) sends events to the agent
Source(s) operating within the agent
receive the events
pass the events through the interceptors
put the events on the channel(s) identified by the channel selector
The sink processor invokes a sink
The invoked sink takes events from its channel and sends them to the next-hop destination
If the event transmission fails, the sink processor takes a secondary action
40
Apache flume
Transactional data exchange to guarantee no data loss
The source uses transactions to write to a channel: the transaction commits after the events are written to the channel; otherwise, it is rolled back.
The sink uses transactions to read from a channel: the transaction commits after a successful data transfer, and only then are the events removed from the channel; if the data transfer fails, the transaction is rolled back.
41
Messaging queues again
Messaging Queue
Producers push data to the queue
Consumers pull the data from the queue
Producers and consumers do not need to be aware of each other
[Figure: several producers push messages to a messaging queue; a consumer pulls them]
42
Messaging queues again
Messaging queues prevent consumers from being overloaded due to the high velocity of Big Data. That is, they provide load levelling.
Load levelling, because they
act as buffers
allow decoupling between producers and consumers
allow producers and consumers to operate asynchronously
allow dynamic scaling up/down of the number of consumers
offer reliability and availability (e.g., a consumer fails but producers keep pushing messages to the queue)
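As an illustration (not part of the module), a minimal producer/consumer sketch with the pika client for RabbitMQ; the host and queue name are assumptions, and the calls shown follow the pika 1.x API:

import pika

# producer: pushes a message to the queue (the broker buffers it)
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="tasks")          # assumed queue name
channel.basic_publish(exchange="", routing_key="tasks", body=b"task payload")
connection.close()

# consumer: pulls messages from the queue at its own pace
def handle(ch, method, properties, body):
    print("received", body)

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="tasks")
channel.basic_consume(queue="tasks", on_message_callback=handle, auto_ack=True)
channel.start_consuming()                     # blocks, delivering messages to handle()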
43
Messaging queues again
Messaging Queues balance load between multiple consumers
[Figure: several producers push messages to a messaging queue; multiple consumers pull them]
Load balancing, because they
allow scaling up the number of consumers, with each consumer getting an equal share of the load
offer reliability and availability
(e.g., the queue delivers a message to a consumer, but that consumer does not know which other consumers are reading from the queue; if the consumer fails while processing the message, the message becomes visible to another consumer)
44
Example of messaging queue: ZeroMQ
Asynchronous messaging library for distributed or concurrent applications.
Runs without a dedicated message broker.
This means you don’t see the “Messaging queue” in the architecture. You do your own auditing and recovery if something fails.
Special patterns for reliability and for load balancing
Can make an application elastic on a large network:
Routes messages in a variety of patterns (including push/pull, pub/sub)
Queues messages automatically
Can deal with over-full queues
No imposed format on messages
Delivers whole messages
45
ZeroMQ
ZeroMQ allows implementing different messaging patterns over different channels (e.g., TCP).
Library for many languages (e.g., C, Java, Python). Can run on any OS. Based on sockets* which carry messages (i.e., carry data)
It uses functions for socket programming:
to create, destroy, and connect sockets with each other
to send and receive messages
* a socket allows two applications, usually on different parts of the network, to communicate
Extra slides: ZeroMQ
Extra slides are not part of the course, they are useful if you want to learn more on the topic of ZeroMQ and try it yourself.
To install the ZeroMQ binding for Python: pip install pyzmq
FREE Guide for ZeroMQ http://zguide.zeromq.org/page:all
We will examine the pipeline pattern, but there are many other patterns
(e.g., http://zguide.zeromq.org/page:all#advanced-request-reply for reliability
and http://zguide.zeromq.org/page:all for load balancing patterns)
ZeroMQ
Simple pipeline pattern
Ventilator: PUSHes tasks that are processed in parallel
Workers: do the processing work
Two types of sockets
zmq.PUSH
Sends messages to downstream pipeline nodes. Messages are round-robined to all connected downstream nodes.
zmq.PULL
Receives messages from upstream pipeline nodes. Messages are fair-queued (interleaved) from among all connected upstream nodes.
48
Extra slides: ZeroMQ
Simple pipeline example
One ventilator sends messages to one worker
Recall the messaging queue is taken care of internally by ZeroMQ
import sys
import time
from multiprocessing import Process

import zmq

def worker():
    context = zmq.Context()                        # a Context is needed to create sockets
    work_receiver = context.socket(zmq.PULL)       # create a socket of type zmq.PULL
    work_receiver.connect("tcp://127.0.0.1:5557")  # connect to the ventilator's remote socket
    for task_nbr in range(10000000):
        message = work_receiver.recv()             # receive a message from the ventilator's socket
    sys.exit(1)
Code continues on next page
49
Extra slides: ZeroMQ
Simple pipeline example (continues from previous slide)
def main():
    # see https://docs.python.org/2/library/multiprocessing.html#the-process-class
    # for details on multiprocessing
    Process(target=worker, args=()).start()
    context = zmq.Context()
    ventilator_send = context.socket(zmq.PUSH)    # create a socket of type zmq.PUSH
    ventilator_send.bind("tcp://127.0.0.1:5557")  # the socket listens so that the worker's socket can connect
    for num in range(10000000):
        ventilator_send.send(b"MESSAGE")          # send a message on the ventilator's socket (so the worker can receive it)

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 10000000 / duration
    print("Duration: %s" % duration)
    print("Messages Per Second: %s" % msg_per_sec)
50
Extra slides: ZeroMQ
Pipeline example with a Sink
This is a more complex example.
Again:
• A ventilator that produces tasks that can be done in parallel
But now
• A set of workers that process tasks in parallel
• A sink collects results from the workers
FREE Guide for ZeroMQ http://zguide.zeromq.org/page:all
Extra slides: ZeroMQ
Example push/pull application
• Ventilator generates 100 tasks, each telling a worker to sleep for some ms
import zmq, random, time

try:
    raw_input
except NameError:
    raw_input = input  # Python 3

context = zmq.Context()

# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")

# Socket with direct access to the sink: used to synchronize the start of the batch [sync is a must]
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")

print("Press Enter when the workers are ready: ")
_ = raw_input()
print("Sending tasks to workers...")

# The first message is "0" and signals the start of the batch
sink.send(b'0')

# Initialize random number generator
random.seed()

# Send 100 tasks [scaling to more is easy]
total_msec = 0
for task_nbr in range(100):
    # Random workload from 1 to 100 msecs
    workload = random.randint(1, 100)
    total_msec += workload
    sender.send_string(u'%i' % workload)

print("Total expected cost: %s msec" % total_msec)

# Give 0MQ time to deliver
time.sleep(1)
52
Extra slides: ZeroMQ
Example push/pull application
• Worker receives a message, sleeps, and signals that it’s finished.
import sys, time, zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

# Process tasks forever
while True:
    s = receiver.recv()
    # Simple progress indicator for the user to see
    sys.stdout.write('.')
    sys.stdout.flush()
    # Do the work
    time.sleep(int(s) * 0.001)
    # Send results to sink
    sender.send(b'')
53
Extra slides: ZeroMQ
Example push/pull application
• Sink collects the 100 tasks and calculates how long the overall processing took.
import sys, time, zmq

context = zmq.Context()

# Socket to receive messages on (the sink binds; the ventilator and workers connect to it)
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

# Wait for start of batch
s = receiver.recv()

# Start our clock
tstart = time.time()

# Process 100 confirmations
for task_nbr in range(100):
    s = receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(':')
    else:
        sys.stdout.write('.')
    sys.stdout.flush()

# Calculate and report duration of batch
tend = time.time()
print("Total elapsed time: %d msec" % ((tend - tstart) * 1000))
The sink’s PULL socket collects results from workers evenly. This is called fair-queuing.
54