Big Data – Hadoop/MapReduce
Sambit Sahu
Logistics
§ Quiz 1 next week. Sample released.
§ Assignment 2 (tentative) draft uploaded.
§ In-depth exploration topics
– Kubernetes and DevOps: you will explore containers, Kubernetes, and their application to cloud application DevOps pipelines.
– Kafka and Events/Logs: you will explore how Kafka and Kafka Streams are used for handling events/logs, including internals, use cases, and scale.
– Elasticsearch and data pipelines: understand in detail how Elasticsearch is used for handling unstructured data, and build a data pipeline to process and query such data efficiently.
– Spark and cluster computing: learn how to use Spark effectively at scale and build use cases demonstrating compute in a cluster, including scale, administration, and performance tuning.
§ Paper reviews
Cluster Computing
§ Designing a cluster
§ Automation of tasks in a cluster
§ Programming in a cluster
– Hadoop/MapReduce
– Spark
Big Data System Architecture
§ Current system modules for each software layer
– Data Processing: Spark, Spark Streaming, GraphX, MLbase, BlinkDB, Shark, HIVE, Storm, MPI, …
– Data Management: Tachyon, HBASE
– Resource Management: Mesos, Yarn
Agenda
§ Why Big Data?
§ Apache Hadoop
– Introduction
– Architecture
– Programming
Hypothetical Job
§ You just got an awesome job at a data-mining start-up. Congratulations!!
– Free snacks, soda, and coffee. Yayy!!
§ On your first day of work you are given a task
– The company has a new algorithm they want you to test.
– Your boss gives you
• The algorithm library
• A test machine
• A 1 GB input data file
Java Document Scorer Program
(Figure: Read Input → Process Data; throughput 1 GB per hour.)
What if we wanted to process a 10 GB data set? 10 hours!! How can we improve the performance?
Some Options
1. Faster CPU
2. More memory
3. Increase the number of cores
4. Increase the number of threads
5. Increase the number of threads and cores
Java Document Scorer Program – Multi-Threaded
Throughput: 4 GB per hour.
How long for 100 GB? What else can we do?
Get an Even Faster Machine with More Cores?
Source: MIT OpenCourseWare
Current Tools
§ Programming models
– Shared memory (pthreads)
– Message passing (MPI)
§ Design patterns
– Master-slaves
– Producer-consumer flows
– Shared work queues
(Figure: shared memory, where processes P1–P5 access a common memory, vs. message passing between P1–P5; master/slaves, producer/consumer, and work-queue patterns.)
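The producer-consumer and shared-work-queue patterns listed above can be sketched in plain Java with `java.util.concurrent`. This is a minimal illustration under assumed work units. The units are the integers 1..n, and each one is "processed" by adding it to a shared total. The `POISON` sentinel is a convention of this sketch, not a library constant. Note how much coordination the programmer still manages by hand: the queue, the stop signals, the shared counter, and the joins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class WorkQueueSketch {
    // Sentinel meaning "no more work" (a convention of this sketch).
    private static final int POISON = -1;

    // Sums the work units 1..n: one producer fills a shared queue, `consumers` threads drain it.
    static long run(int n, int consumers) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100); // bounded shared work queue
        AtomicLong total = new AtomicLong();                          // shared result
        List<Thread> workers = new ArrayList<>();

        for (int i = 0; i < consumers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        int item = queue.take();    // blocks until work is available
                        if (item == POISON) return; // producer signalled the end
                        total.addAndGet(item);      // "process" the work unit
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers.add(t);
            t.start();
        }

        try {
            for (int i = 1; i <= n; i++) queue.put(i);             // producer: enqueue all work
            for (int i = 0; i < consumers; i++) queue.put(POISON); // one stop signal per consumer
            for (Thread t : workers) t.join();                     // wait until every consumer exits
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while coordinating workers", e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 4)); // prints 500500
    }
}
```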
Where the rubber meets the road
§ Concurrency is difficult to reason about
§ Concurrency is even more difficult to reason about
– At the scale of datacenters (even across datacenters)
– In the presence of failures
– In terms of multiple interacting services
§ Not to mention debugging…
§ The reality:
– Lots of one-off solutions, custom code
– Write your own dedicated library, then program with it
– Burden on the programmer to explicitly manage everything
What’s the common theme?
§ To improve performance, you have to rewrite the code
§ The code has to adapt to the expected performance.
– This doesn’t work, since you may not know the amount of data beforehand.
§ The actual Intellectual Property (IP) of the company is the analytic algorithm
– However, a lot of effort is spent on scaling the analytic
Big Data – Motivation
§ Google processes 20 PB a day (2008)
§ Wayback Machine has 3 PB + 100 TB/month (3/2009)
§ Facebook has 2.5 PB of user data + 15 TB/day (4/2009)
§ eBay has 6.5 PB of user data + 50 TB/day (5/2009)
§ CERN’s LHC will generate 15 PB a year
“640K ought to be enough for anybody.”
Enter .. Apache Hadoop
§ Hadoop is a high-level open-source project
– Under the Apache Software Foundation
– Inspired by Google’s MapReduce and GFS papers
§ It contains several individual projects
– HDFS
– MapReduce
– Yarn
§ It also has a slew of related projects
– PIG
– HIVE
– HBase
§ Implemented mostly in Java.
COMS4121 Computer Systems for Data Science, 2/25/15
A closer look: Divide and Conquer
(Figure: the “Work” is partitioned into units w1, w2, w3. Each unit goes to a “worker”, which produces a partial result r1, r2, r3. The partial results are combined into the final “Result”.)
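Here is a minimal Java sketch of the partition / worker / combine flow. It assumes the "work" is a list of integers and that each worker's hypothetical per-item job is to square and sum its chunk. A fixed thread pool stands in for the workers. This illustrates divide and conquer on a single machine; it is not how Hadoop schedules tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class DivideAndConquer {
    // Partition: cut the work into at most `parts` contiguous chunks (w1, w2, w3, ...).
    static List<List<Integer>> partition(List<Integer> work, int parts) {
        List<List<Integer>> chunks = new ArrayList<>();
        int size = Math.max(1, (work.size() + parts - 1) / parts); // ceiling division
        for (int start = 0; start < work.size(); start += size) {
            chunks.add(work.subList(start, Math.min(start + size, work.size())));
        }
        return chunks;
    }

    // Workers turn chunks into partial results (r1, r2, ...); combine adds them up.
    static long run(List<Integer> work, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<Long>> partials = new ArrayList<>();
            for (List<Integer> chunk : partition(work, workers)) {
                Callable<Long> worker = () -> {
                    long sum = 0;
                    for (int x : chunk) sum += (long) x * x; // hypothetical per-item work
                    return sum;
                };
                partials.add(pool.submit(worker));
            }
            long result = 0;
            for (Future<Long> f : partials) result += f.get(); // combine; waits for each worker
            return result;
        } catch (Exception e) {
            throw new IllegalStateException("a worker failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> work = new ArrayList<>();
        for (int i = 1; i <= 10; i++) work.add(i);
        System.out.println(run(work, 3)); // prints 385
    }
}
```

Even this small version has to decide chunk sizes, wait for every worker, and decide what to do when a worker throws. Those are the same concerns raised under Parallelization Challenges.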
Parallelization Challenges
§ How do we assign work units to workers?
§ What if we have more work units than workers?
§ What if workers need to share partial results?
§ How do we aggregate partial results?
§ How do we know all the workers have finished?
§ What if workers die?
What is the common theme of all of these problems?
What’s the point?
§ It’s all about the right level of abstraction
– The von Neumann architecture has served us well, but is no longer appropriate for the multi-core/cluster environment
§ Hide system-level details from the developers
– No more race conditions, lock contention, etc.
§ Separating the what from the how
– Developer specifies the computation that needs to be performed
– Execution framework (“runtime”) handles actual execution
The datacenter is the computer!
“Big Ideas”
§ Scale “out”, not “up”
– Limits of SMP and large shared-memory machines
§ Move processing to the data
– Clusters have limited bandwidth
§ Process data sequentially, avoid random access
– Seeks are expensive, disk throughput is reasonable
§ Seamless scalability
– From the mythical man-month to the tradable machine-hour
Hadoop
§ Platform for distributed storage and computation
– HDFS
– MapReduce
– Ecosystem
Source: Hadoop in Practice, Alex Holmes, Manning Publications Co., 2012
What are we missing here?
(Figure: Sequential File Read → Partition Work → Combine Results)
How do we get data to the workers?
(Figure: compute nodes fetch data over the network from shared NAS/SAN storage)
What’s the problem here?
HDFS: Assumptions
§ Commodity hardware over “exotic” hardware
– Scale “out”, not “up”
§ High component failure rates
– Inexpensive commodity components fail all the time
§ “Modest” number of huge files
– Multi-gigabyte files are common, if not encouraged
§ Files are write-once, read-many
– Perhaps concurrently
§ Large streaming reads over random access
– High sustained throughput over low latency
GFS slides adapted from material by (Ghemawat et al., SOSP 2003)
HDFS Architecture
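(Figure: an Application calls the HDFS Client, which sends (file name, block id) for a path such as /foo/bar to the HDFS namenode. The namenode holds the file namespace (e.g. /foo/bar → block 3df2) and replies with (block id, block location). The client then sends (block id, byte range) to an HDFS datanode and receives block data directly from it. The namenode sends instructions to the datanodes and receives datanode state. Each datanode stores blocks on its local Linux file system.)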
Adapted from (Ghemawat et al., SOSP 2003)
How HDFS works
§ When an input file is added to HDFS
– The file is split into smaller blocks of fixed size
– Each block is replicated
– Each replica of a block is stored on a different host
§ Block size is configurable. The default is 128 MB in Hadoop 2.x and later (64 MB in Hadoop 1.x); 256 MB is also a common setting.
§ Replication level is configurable. The default is 3.
– Replication is necessary for
• Scaling
• High availability
§ In case a host crashes or is removed
– All blocks on that host are automatically re-replicated to other hosts
§ In case a host is added
– Blocks can be rebalanced (by running the HDFS balancer) so that some blocks from other hosts are placed on the new host
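The splitting and replication steps can be illustrated with a small Java sketch. It computes how many fixed-size blocks a file needs and assigns each block's replicas to distinct hosts. The round-robin placement and the host names (`h1` to `h4`) are assumptions made for illustration only. Real HDFS uses a rack-aware placement policy decided by the namenode.

```java
import java.util.ArrayList;
import java.util.List;

class HdfsBlockSketch {
    // Number of fixed-size blocks needed for a file (the last block may be partial).
    static long numBlocks(long fileBytes, long blockBytes) {
        return (fileBytes + blockBytes - 1) / blockBytes;
    }

    // Assigns `replication` copies of each block to distinct hosts.
    // Simplification: round-robin, whereas real HDFS placement is rack-aware.
    static List<List<String>> place(long blocks, int replication, List<String> hosts) {
        if (replication > hosts.size()) {
            throw new IllegalArgumentException("need at least as many hosts as replicas");
        }
        List<List<String>> placement = new ArrayList<>();
        for (long b = 0; b < blocks; b++) {
            List<String> replicas = new ArrayList<>();
            for (int r = 0; r < replication; r++) {
                // Consecutive hosts starting at offset b, so no two replicas share a host.
                replicas.add(hosts.get((int) ((b + r) % hosts.size())));
            }
            placement.add(replicas);
        }
        return placement;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long blocks = numBlocks(1024 * mb, 128 * mb); // 1 GB file, 128 MB blocks
        System.out.println(blocks);                   // prints 8
        List<List<String>> p = place(blocks, 3, List.of("h1", "h2", "h3", "h4"));
        System.out.println(p.get(0));                 // prints [h1, h2, h3]
    }
}
```

If one host disappears, every block that listed it still has two other replicas. That is what makes the automatic re-replication described above possible.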
HDFS Component Responsibilities
§ Name Node
– Managing the file system namespace:
• Holds file/directory structure, metadata, file-to-block mapping, access permissions, etc.
– Coordinating file operations:
• Directs clients to datanodes for reads and writes
• No data is moved through the namenode
– Maintaining overall health:
• Periodic communication with the datanodes
• Block re-replication and rebalancing
• Garbage collection
§ Data Node
– Actual storage and management of data blocks on a single host
– Provides clients with access to data
HDFS Components in Cluster
(Figure: a master node runs the namenode daemon; each slave node runs a datanode daemon on top of its local Linux file system.)
MapReduce (MR) can refer to…
§ The execution framework (aka “runtime”)
§ The programming model
§ The specific implementation
Usage is usually clear from context!
MR Framework Components
§ Job Tracker
– Central component responsible for managing job lifecycles
– One Job Tracker per MR framework instance
– Accepts job submissions, queries, etc. from clients
– Enqueues jobs and schedules individual tasks
– Communicates with Task Trackers to deploy and run tasks
– Attempts to assign tasks to support data locality
§ Task Tracker
– One Task Tracker per host
– Runs and manages individual tasks
– Communicates progress of tasks back to the Job Tracker
MR Programming Model
§ Programmers specify two functions:
map (k, v) → <k’, v’>*
reduce (k’, v’) → <k’, v’>*
– All values with the same key are sent to the same reducer
§ The MR execution framework handles everything else… What’s “everything else”?
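To make the division of labor concrete, here is an in-memory Java sketch of the model. It uses word count, the usual introductory example, because the scorer's actual algorithm is not given here. The programmer writes only `map` and `reduce`. The `run` method plays the role of the framework by grouping values by key in sorted order. Everything runs in one process, so the sketch says nothing about distribution or fault tolerance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MiniMapReduce {
    // map(k, v): k is a document id (unused), v is its text; emit (word, 1) for every word.
    static List<Map.Entry<String, Integer>> map(String docId, String text) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : text.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) out.add(Map.entry(word, 1));
        }
        return out;
    }

    // reduce(k', [v']): all counts for one word arrive together; sum them.
    static int reduce(String word, List<Integer> counts) {
        int sum = 0;
        for (int c : counts) sum += c;
        return sum;
    }

    // The framework's "everything else", in miniature: run map over every input,
    // group intermediate values by key (sorted, as after shuffle and sort), then reduce.
    static Map<String, Integer> run(Map<String, String> docs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, String> doc : docs.entrySet()) {
            for (Map.Entry<String, Integer> kv : map(doc.getKey(), doc.getValue())) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        grouped.forEach((word, counts) -> result.put(word, reduce(word, counts)));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Map.of("d1", "a b c", "d2", "c a c"))); // prints {a=2, b=1, c=3}
    }
}
```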
MapReduce: “Everything Else”
§ Handles scheduling
– Assigns workers to map and reduce tasks
§ Handles “data distribution”
– Moves processes to data
§ Handles synchronization
– Gathers, sorts, and shuffles intermediate data
§ Handles errors and faults
– Detects worker failures and restarts
§ Everything happens on top of a distributed FS (HDFS)
Our Scoring Algorithm as a MapReduce Program
(Figure: Our Analytic)
Basic Hadoop API*
§ Mapper
– void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
– void configure(JobConf job)
– void close() throws IOException
§ Reducer/Combiner
– void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter)
– void close() throws IOException
§ Partitioner
– int getPartition(K2 key, V2 value, int numPartitions)
*Note: forthcoming API changes…
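(Figure: six input pairs (k1, v1) to (k6, v6) go to four map tasks, which emit (a, 1), (b, 2), (c, 3), (c, 6), (a, 5), (c, 2), (b, 7), (c, 8). Shuffle and Sort aggregates values by key: a → [1, 5], b → [2, 7], c → [2, 3, 6, 8]. Three reduce tasks produce the outputs (r1, s1), (r2, s2), (r3, s3).)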
Let’s Talk Numbers
§ How many mappers?
– Depends on the size of input data
– Typically 1 mapper per data block
– So 1 GB of input data will have around 8 mappers
• Assuming a 128 MB block size
§ How many reducers?
– Depends on cluster reducer capacity
– Can be set depending on the expected number of keys
– For large data sets, set it to cluster reducer capacity
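The mapper arithmetic can be checked with a short Java sketch. It assumes the split-size rule of the newer `org.apache.hadoop.mapreduce` `FileInputFormat`, splitSize = max(minSize, min(maxSize, blockSize)). Under default settings this reduces to the block size. The count is per input file, so many small files each get at least one mapper. The sketch also ignores the roughly 10% slack Hadoop allows on a file's final split.

```java
class SplitCountSketch {
    static final long MB = 1024L * 1024;

    // Split size rule (as in FileInputFormat): max(minSize, min(maxSize, blockSize)).
    static long splitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Roughly one map task per split (ceiling division).
    // Simplification: ignores the slack Hadoop allows on the last split of a file.
    static long mappers(long inputBytes, long splitBytes) {
        return (inputBytes + splitBytes - 1) / splitBytes;
    }

    public static void main(String[] args) {
        long split = splitSize(128 * MB, 1, Long.MAX_VALUE); // defaults: split = block size
        System.out.println(mappers(1024 * MB, split));       // prints 8
        long smaller = splitSize(128 * MB, 1, 64 * MB);      // cap splits at 64 MB
        System.out.println(mappers(1024 * MB, smaller));     // prints 16
    }
}
```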
MapReduce
§ Programmers specify two functions:
map (k, v) → <k’, v’>*
reduce (k’, v’) → <k’, v’>*
– All values with the same key are reduced together
§ The execution framework handles everything else…
§ Not quite… usually, programmers also specify:
combine (k’, v’) → <k’, v’>*
– Mini-reducers that run in memory after the map phase
– Used as an optimization to reduce network traffic
partition (k’, number of partitions) → partition for k’
– Often a simple hash of the key, e.g., hash(k’) mod n
– Divides up key space for parallel reduce operations
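Here is a Java sketch of the two optional functions. `partition` uses the same formula as Hadoop's default `HashPartitioner`, (hashCode & Integer.MAX_VALUE) % n, but applies it to a Java `String`. Hadoop would hash the key's Writable type (e.g. `Text`), so actual reducer numbers would differ. `combine` sums one mapper's values per key, which shows why a combiner shrinks what has to be shuffled over the network.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CombinePartitionSketch {
    // Same formula as Hadoop's default HashPartitioner: non-negative hash code mod n.
    static int partition(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Combiner: a local "mini-reduce" over one mapper's output, summing values per key.
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, Integer> combined = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapOutput) {
            combined.merge(kv.getKey(), kv.getValue(), Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        // One mapper's raw output: two records share the key "c".
        List<Map.Entry<String, Integer>> out = List.of(
            Map.entry("a", 1), Map.entry("b", 2), Map.entry("c", 3), Map.entry("c", 6));
        Map<String, Integer> combined = combine(out);
        System.out.println(combined); // prints {a=1, b=2, c=9}: 3 records to ship instead of 4
        for (String key : combined.keySet()) {
            // With String.hashCode() and 3 partitions: a -> 1, b -> 2, c -> 0
            System.out.println(key + " -> reducer " + partition(key, 3));
        }
    }
}
```

A combiner is only safe when the reduce operation can be applied to partial groups, as with the sum here.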
Two more details…
§ Barrier between map and reduce phases
– But we can begin copying intermediate data earlier
§ Keys arrive at each reducer in sorted order
– No enforced ordering across reducers
(Figure: as before, but each of the four map tasks is followed by a combiner and a partitioner. The map outputs (a, 1), (b, 2), (c, 3), (c, 6), (a, 5), (c, 2), (b, 7), (c, 8) are combined locally, so (c, 3) and (c, 6) become (c, 9). Shuffle and Sort then aggregates values by key: a → [1, 5], b → [2, 7], c → [2, 9, 8]. Three reducers produce (r1, s1), (r2, s2), (r3, s3).)
Input To Mappers
(Figure: the InputFormat divides the Input File into InputSplits. A RecordReader for each InputSplit reads records and feeds them to a Mapper, and each Mapper writes Intermediates.)
Source: redrawn from a slide by Cloudera, cc-licensed
Shuffle and Sort
(Figure: on the map side, Mapper output fills a circular buffer (in memory), is written out as spills (on disk), and the spills are merged (on disk) and sent to this and other reducers. On the reduce side, intermediate files (on disk) copied from this and other mappers are merged and fed to the Reducer.)
Shuffle and Sort in Hadoop
§ Probably the most complex aspect of MapReduce!
§ Map side
– Map outputs are buffered in memory in a circular buffer
– When the buffer reaches a threshold, its contents are “spilled” to disk
– Spills are merged into a single, partitioned file (sorted within each partition): combiner runs here
§ Reduce side
– First, map outputs are copied over to the reducer machine
– “Sort” is a multi-pass merge of map outputs (happens in memory and on disk): combiner runs here
– Final merge pass goes directly into the reducer
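The map-side spill and the merge can be sketched in Java as follows. The sketch rests on three assumptions:
– Only keys are tracked, with no values or partitions.
– The circular buffer is modeled as a list that is flushed when it holds `threshold` records.
– Each "spill" is an in-memory sorted list standing in for a file on disk.

The merge is a standard k-way merge with a min-heap, which is the general technique behind merging sorted spills. Hadoop's own merger is more elaborate (multi-pass and disk-backed).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

class SpillMergeSketch {
    // Map side: buffer keys; when the buffer holds `threshold` records, sort it and
    // "spill" it as a sorted run (a list standing in for a file on disk).
    static List<List<String>> spill(List<String> mapOutput, int threshold) {
        List<List<String>> runs = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (String key : mapOutput) {
            buffer.add(key);
            if (buffer.size() == threshold) {
                Collections.sort(buffer);
                runs.add(buffer);
                buffer = new ArrayList<>();
            }
        }
        if (!buffer.isEmpty()) { // final partial spill
            Collections.sort(buffer);
            runs.add(buffer);
        }
        return runs;
    }

    // Merge: k-way merge of sorted runs using a min-heap on each run's current head.
    static List<String> merge(List<List<String>> runs) {
        // Heap entries are {runIndex, positionInRun}, ordered by the key they point at.
        PriorityQueue<int[]> heap = new PriorityQueue<int[]>(
            (x, y) -> runs.get(x[0]).get(x[1]).compareTo(runs.get(y[0]).get(y[1])));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) heap.add(new int[] {r, 0});
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<String> run = runs.get(top[0]);
            merged.add(run.get(top[1]));
            if (top[1] + 1 < run.size()) heap.add(new int[] {top[0], top[1] + 1});
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> out = List.of("c", "a", "b", "c", "a", "b", "c");
        List<List<String>> runs = spill(out, 3);
        System.out.println(runs);        // prints [[a, b, c], [a, b, c], [c]]
        System.out.println(merge(runs)); // prints [a, a, b, b, c, c, c]
    }
}
```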
(Figure: each Mapper writes its Intermediates through a Partitioner, which routes them to the appropriate Reducer; combiners omitted here.)
Source: redrawn from a slide by Cloudera, cc-licensed
Reducer to Output
(Figure: each Reducer writes through a RecordWriter, provided by the OutputFormat, to its own Output File.)
Source: redrawn from a slide by Cloudera, cc-licensed
Input and Output
§ InputFormat:
– TextInputFormat
– KeyValueTextInputFormat
– SequenceFileInputFormat
– …
§ OutputFormat:
– TextOutputFormat
– SequenceFileOutputFormat
– …
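The two text input formats differ mainly in what becomes the key. The Java sketch below mimics them without Hadoop:
– `TextInputFormat` yields (byte offset of the line, line text).
– `KeyValueTextInputFormat` splits each line at the first tab, its default separator, and a line with no tab becomes the key with an empty value.

Holding the whole file in a `String` and handling only `\n` line endings are simplifications of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class InputFormatSketch {
    // Like TextInputFormat: one record per line; key = byte offset of the line, value = its text.
    static List<Map.Entry<Long, String>> textRecords(String contents) {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        long offset = 0;
        int start = 0;
        while (start < contents.length()) {
            int end = contents.indexOf('\n', start);
            if (end < 0) end = contents.length();
            String line = contents.substring(start, end);
            records.add(Map.entry(offset, line));
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for the '\n'
            start = end + 1;
        }
        return records;
    }

    // Like KeyValueTextInputFormat: split at the first tab into (key, value);
    // a line with no tab becomes the key with an empty value.
    static Map.Entry<String, String> keyValue(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) return Map.entry(line, "");
        return Map.entry(line.substring(0, tab), line.substring(tab + 1));
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> recs = textRecords("hello world\nfoo\tbar\n");
        System.out.println(recs.get(1).getKey());                       // prints 12
        System.out.println(keyValue(recs.get(1).getValue()).getKey());  // prints foo
    }
}
```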
Putting everything together…
(Figure: the namenode runs the namenode daemon; the job submission node runs the jobtracker; each slave node runs a tasktracker and a datanode daemon on its local Linux file system.)
HADOOP Architecture
§ Master
– NameNode
– JobTracker
§ Slaves
– Data Node
– Compute Node
– Why together?
• Data Locality
One More Thing
§ Distributed Cache
– Usually used for files of small size
– Provides a convenient way to propagate applications and configuration files
– Files are copied to each node’s local disk before tasks run, so tasks do not repeatedly fetch these small files from HDFS
– Shared across all nodes in the MapReduce cluster
Dizzy Yet?
§ OK, we went through a lot of details
§ Whatever happened to the simplicity of programming??
§ Do I really have to write a MapReduce program every time I want to run a new analytic?
We went from… Multi-Threaded to Map-Reduce
Enter PIG … Oink!
§ High Level Languages for Map-Reduce
– PIG
• Developed by Yahoo
– HIVE
• Developed by Facebook
– JAQL
• Developed by IBM
§ All of these languages provide similar functionality
§ All of them allow users to plug in their own user defined functions (UDFs)
Let’s get Practical – From Setup to Results
Setting up a Hadoop Cluster
§ Minimum recommended configuration (4 hosts)
– 1 host dedicated to management services (Job Tracker, Name Node, etc.)
– 3 hosts as slave nodes (Data Nodes, Task Trackers)
§ Data nodes should have high-capacity local disks attached
– This is where all your data is going to be
§ How much total disk space?
– Depends on input data to be processed
– Effective storage space recommended: typically 3 times your input data size
– Actual storage space: effective storage space * 3 (replication level)
§ Single node installation is fine for development/testing on very small data
– Perhaps not the best for testing performance
§ Installation instructions vary from provider to provider
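The disk-space rule of thumb works out to roughly 9× the input size. Here is a small Java sketch of the arithmetic. The 1,000 GB input and the even spread across 3 slave nodes are example assumptions, not recommendations.

```java
class DiskSizingSketch {
    // Rule of thumb from the slide: effective space ~ 3x the input size,
    // and raw disk = effective space * replication level.
    static long rawStorageGb(long inputGb, int headroomFactor, int replication) {
        long effectiveGb = inputGb * headroomFactor;
        return effectiveGb * replication;
    }

    // Raw disk each data node needs if storage is spread evenly (an assumption), rounded up.
    static long perNodeGb(long rawGb, int dataNodes) {
        return (rawGb + dataNodes - 1) / dataNodes;
    }

    public static void main(String[] args) {
        long raw = rawStorageGb(1000, 3, 3);   // 1,000 GB of input data
        System.out.println(raw);               // prints 9000
        System.out.println(perNodeGb(raw, 3)); // prints 3000 (3 slave nodes)
    }
}
```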
Some cluster configuration parameters
§ HDFS configuration parameters
– Stored in hdfs-site.xml
– Block size
– Default replication count
§ MapReduce configuration parameters
– Stored in mapred-site.xml
– Java heap size for mappers/reducers
– Number of mappers/reducers per host
• See http://wiki.apache.org/hadoop/HowManyMapsAndReduces
§ IMPORTANT
– Job Tracker URL: http://
Job Tracker Web Page (port 50030)
Working with data
§ Let’s say you have 1 GB of data in your local filesystem (mydata.txt)
§ Load into HDFS
– hadoop fs -mkdir /path/mydirectory
– hadoop fs -put mydata.txt /path/mydirectory
– where /path/mydirectory is in HDFS
§ List the file you just uploaded
– hadoop fs -ls /path/mydirectory
§ “hadoop fs” works similarly to Linux filesystem commands
– However, HDFS is not POSIX compliant
– It cannot be mounted as a regular filesystem
Writing your program .. see the simplicity!!
§ JAQL program for running our scorer
§ PIG program for running our scorer
All languages provide similar functionality
§ LOAD (various data formats)
§ JOIN
§ FOR-EACH
§ GROUP
§ SORT
§ FILTER
§ Pluggable UDFs
Hadoop Programming Tips
§ Thinking at scale
– Filter unwanted data earlier in the flow
– Store intermediate data
– Use “sequence” format for storing data
§ These are not iterative languages
– i.e., no for or while loops
§ Watch out for obvious bottlenecks
– A single key for all mapper output will send all data to one reducer
– Too much data sent to a UDF will result in OOM (out-of-memory) errors
Submitting a Job
§ Create and save your PIG script (myscript.pig)
§ To deploy (the pig command will be in your installation)
– pig -f myscript.pig
– Command will complete once your job completes
§ To check the status of your job
– Use the Job Tracker URL (easiest) OR
– hadoop job -list (will print all job ids)
– hadoop job -status <job-id>
§ To get the results
– hadoop fs -get /path/results.txt .
Anatomy of a Job
§ MapReduce program in Hadoop = Hadoop job
– Jobs are divided into map and reduce tasks
– An instance of running a task is called a task attempt
– Multiple jobs can be composed into a workflow
§ Job submission process
– Client (i.e., driver program) creates a job, configures it, and submits it to the job tracker
– JobClient computes input splits (on the client end)
– Job data (jar, configuration XML) are sent to the JobTracker
– JobTracker puts job data in a shared location, enqueues tasks
– TaskTrackers poll for tasks
– Off to the races…
A simple illustration of MR process
Hadoop Workflow
(Figure: you and the Hadoop cluster)
1. Load data into HDFS
2. Develop code locally
3. Submit MapReduce job
3a. Go back to Step 2
4. Retrieve data from HDFS
Larger View…
MR Patterns Examples
§ Jimmy Lin’s book
§ Jeffrey Ullman’s book
§ An excellent blog: https://highlyscalable.wordpress.com/2012/02/01/mapreduce-patterns/
Uh Oh.. My Job Failed… Now what?
§ First, take a deep breath
§ Start small, start locally
§ Strategies
– Learn to use the webapp
– Where does println go?
– Don’t use println, use logging
– Throw RuntimeExceptions
§ Logs are most easily accessible via the Job Tracker URL
Time for a Raise
§ Finally you have mastered Hadoop Big Data
§ Your applications are scaling.
– You deserve a raise!!
§ Boss
– Can we query the data for specific entities?
– How long will that take?
§ Problem
– Remember this is still sequential access
– To find a specific entity, you still need to read the entire data set.
§ What now?
– How is this solved in traditional systems? Databases
Other projects based on Hadoop
§ HBase
§ Hive
§ PIG
§ Spark
§ Mahout
Hive – a SQL-like data warehouse on Hadoop
https://cwiki.apache.org/confluence/display/Hive/Tutorial
§ Supports a SQL-like data warehouse on top of Hadoop – began at Facebook
§ Gives SQL users access to big data without requiring lower-level programming for a wide range of tasks
§ Fewer lines of code!
§ /bin/hive --help
Wordcount Example
Hive – SQL-like data warehouse on top of Hadoop
§ A data warehouse with a SQL-like (HiveQL) interface on Hadoop for processing and managing structured data
§ Hive hides the complexity of MR programming from users who want to process structured data
INSERT OVERWRITE TABLE user_active
SELECT user.*
FROM user
WHERE user.active = 1;
§ Key components
– Metastore
– Parser/planner/optimizer
– Interface
Hive Installation and config
§ Just download a recent Hive tarball and extract it.
§ Need to set up a few directories
– bin/hadoop fs -mkdir /tmp
– bin/hadoop fs -mkdir /user/hive/warehouse
§ Hive manages all the data under this directory
§ Hive stores metadata in a standard relational database
Hive examples
§ 11.1.2
Data model – column partition
§ Tables as the fundamental data model – stored under /user/hive/warehouse
§ Hive uses the concept of partition columns – partitions data by column value, e.g., a date column and a state column
/user/hive/warehouse/users/date=20090901/state=CA
/user/hive/warehouse/users/date=20090901/state=NY
/user/hive/warehouse/users/date=20090901/state=TX
…
/user/hive/warehouse/users/date=20090902/state=CA
/user/hive/warehouse/users/date=20090902/state=NY
/user/hive/warehouse/users/date=20090902/state=TX
…
/user/hive/warehouse/users/date=20090903/state=CA
/user/hive/warehouse/users/date=20090903/state=NY
/user/hive/warehouse/users/date=20090903/state=TX
…
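The directory layout above follows a simple naming rule: one `column=value` directory per partition column. This Java sketch builds such paths and shows partition pruning, where a query that filters on a partition column only needs to read the matching directories. It illustrates the layout only and is not Hive's own code. Value escaping and the metastore are ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HivePartitionPathSketch {
    // Builds <warehouse>/<table>/<col1>=<v1>/<col2>=<v2>/..., in partition-column order.
    // Simplification: values are not escaped.
    static String partitionDir(String warehouse, String table, Map<String, String> partitionCols) {
        StringBuilder path = new StringBuilder(warehouse).append('/').append(table);
        for (Map.Entry<String, String> col : partitionCols.entrySet()) {
            path.append('/').append(col.getKey()).append('=').append(col.getValue());
        }
        return path.toString();
    }

    // Partition pruning: keep only directories that contain col=value as a path component.
    static List<String> prune(List<String> dirs, String col, String value) {
        String component = "/" + col + "=" + value;
        List<String> kept = new ArrayList<>();
        for (String dir : dirs) {
            if (dir.contains(component + "/") || dir.endsWith(component)) kept.add(dir);
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> dirs = new ArrayList<>();
        for (String date : new String[] {"20090901", "20090902"}) {
            for (String state : new String[] {"CA", "NY", "TX"}) {
                Map<String, String> cols = new LinkedHashMap<>(); // keeps date-then-state order
                cols.put("date", date);
                cols.put("state", state);
                dirs.add(partitionDir("/user/hive/warehouse", "users", cols));
            }
        }
        System.out.println(dirs.get(0)); // prints /user/hive/warehouse/users/date=20090901/state=CA
        // A query filtering on state = 'CA' needs only 2 of the 6 directories.
        System.out.println(prune(dirs, "state", "CA").size()); // prints 2
    }
}
```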
Hive Data Model – partition and cluster
§ Tables stored under /user/hive/warehouse in HDFS
§ Partition columns
§ Buckets – further divide the data into a fixed number of smaller files, by hash of a column
Data model – buckets
§ Concept of buckets, which make queries that work well on a random sample of data more efficient
§ Bucketing divides data into a specified number of files based on the hash of the bucket column
/user/hive/warehouse/users/date=20090901/state=CA/part-00000
…
/user/hive/warehouse/users/date=20090901/state=CA/part-00031
/user/hive/warehouse/users/date=20090901/state=NY/part-00000
…
/user/hive/warehouse/users/date=20090901/state=NY/part-00031
/user/hive/warehouse/users/date=20090901/state=TX/part-00000
§ Example – pageview
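Bucketing is hash partitioning inside a partition directory. In this Java sketch the bucket number is hash(value) mod numBuckets, and each bucket maps to one `part-NNNNN` file, matching the 32-file listing above. `String.hashCode()` is an assumption standing in for Hive's actual hash function. The `user` IDs are hypothetical sample values. Reading a single bucket returns roughly 1/numBuckets of the rows, which is what makes bucket sampling cheap.

```java
import java.util.ArrayList;
import java.util.List;

class HiveBucketSketch {
    // bucket = hash(bucket column value) mod numBuckets.
    // Assumption: String.hashCode() stands in for Hive's own hash function.
    static int bucket(String value, int numBuckets) {
        return (value.hashCode() & Integer.MAX_VALUE) % numBuckets;
    }

    // Bucket file name in the style of the listing: part-00000 ... part-00031.
    static String bucketFile(int bucket) {
        return String.format("part-%05d", bucket);
    }

    // Bucket sampling: read only the rows (one file) that belong to bucket `which`.
    static List<String> sample(List<String> values, int which, int numBuckets) {
        List<String> kept = new ArrayList<>();
        for (String v : values) {
            if (bucket(v, numBuckets) == which) kept.add(v);
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> userIds = new ArrayList<>();
        for (int i = 0; i < 1000; i++) userIds.add("user" + i); // hypothetical bucket-column values
        System.out.println(bucketFile(bucket("user42", 32)));   // the file holding user42's rows
        System.out.println(sample(userIds, 0, 32).size());      // about 1000 / 32 rows
    }
}
```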
Resources
§ Papers
– Google File System, 2003
– Google MapReduce, 2004
– Google Bigtable, 2006
§ URLs
– Apache Hadoop: http://hadoop.apache.org
§ Available Hadoop Distributions
– Apache, IBM, Cloudera, Hortonworks
Hive Architecture
§ Main components
– SQL interface
– Parser/Planner
– Metastore
– Driver