HBase and Hive
HBase: Overview
An open-source version of Google BigTable
HBase is a distributed column-oriented data store built on top of HDFS
HBase is an Apache open-source project whose goal is to provide a storage layer for Hadoop distributed computing
Data is logically organized into tables, rows and columns
HBase: Part of Hadoop’s Ecosystem
HBase is built on top of HDFS
HBase files are internally stored in HDFS
HBase vs. HDFS
Both are distributed systems that scale to hundreds or thousands of nodes
HDFS is good for batch processing (scans over big files)
Not good for record lookup
Not good for incremental addition of small batches
Not good for updates
HBase vs. HDFS (Cont’d)
HBase is designed to efficiently address the above points
Fast record lookup
Support for record-level insertion
Support for updates (not in place)
HBase updates are done by creating new versions of values
HBase vs. HDFS (Cont’d)
If the application has neither random reads nor random writes, stick to HDFS
HBase Data Model
HBase Data Model
HBase is based on Google’s Bigtable model
Key-Value pairs
HBase Logical View
HBase: Keys and Column Families
Each row has a Key
Each record is divided into Column Families
Each column family consists of one or more Columns
Key
Byte array
Serves as the primary key for the table
Indexed for fast lookup
Column Family
Has a name (string)
Contains one or more related columns
Column
Belongs to one column family
Included inside the row
familyName:columnName
Column family named “Contents”
Column family named “anchor”
Column named “apache.com”
Version Number
Unique within each key
By default, the system timestamp
Data type is Long
Value (Cell)
Byte array
Version numbers and values for each row
Notes on Data Model
HBase schema consists of several Tables
Each table consists of a set of Column Families
Columns are not part of the schema
HBase has Dynamic Columns
Because column names are encoded inside the cells
Different cells can have different columns
“Roles” column family has different columns in different cells
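A minimal HBase shell sketch of this behavior (the table name, the 'info' family, and the specific role columns are assumptions), which also shows how an update simply adds a new version of a value:
create 'People', {NAME => 'info', VERSIONS => 3}, {NAME => 'Roles'}
# different rows may carry different columns within the same family
put 'People', 'row1', 'Roles:manager', 'Team A'
put 'People', 'row2', 'Roles:developer', 'Project X'
put 'People', 'row2', 'Roles:tester', 'Project Y'
# an update writes a new version; older versions remain readable
put 'People', 'row1', 'info:name', 'Old Name'
put 'People', 'row1', 'info:name', 'New Name'
get 'People', 'row1', {COLUMN => 'info:name', VERSIONS => 3}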
Notes on Data Model (Cont’d)
The version number can be user-supplied
Versions do not even have to be inserted in increasing order
Version numbers are unique within each key
Table can be very sparse
Many cells are empty
Keys are indexed as the primary key
The "anchor" column family has two columns [cnnsi.com & my.look.ca]
HBase Physical Model
HBase Physical Model
Each column family is stored in a separate file (called HTables)
Key & Version numbers are replicated with each column family
Empty cells are not stored
HBase maintains a multi-level index on values:
Example
Column Families
HBase Regions
Each HTable (column family) is partitioned horizontally into regions
Regions are the counterpart of HDFS blocks
Each horizontal partition becomes one region
HBase Architecture
Three Major Components
The HBaseMaster
One master
The HRegionServer
Many region servers
The HBase client
HBase Components
Region
A subset of a table’s rows, like horizontal range partitioning
Automatically done
RegionServer (many slaves)
Manages data regions
Serves data for reads and writes (using a log)
Master
Responsible for coordinating the slaves
Assigns regions, detects failures
Admin functions
Big Picture
Demo: HBase on Azure
Create an HBase cluster
Log into the cluster using ssh
Start the HBase shell:
hbase shell
list existing tables
list
get all rows
scan 'Contacts'
get one row
get 'Contacts', '1000'
update one row
put 'Contacts', '1000', 'Personal:Name', 'New Name'
get 'Contacts', '1000'
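The scan and get above assume the Contacts table already exists; if it does not, it could be created and populated first along these lines (the Office family and the sample values are assumptions):
create 'Contacts', 'Personal', 'Office'
put 'Contacts', '1000', 'Personal:Name', 'John Dole'
put 'Contacts', '1000', 'Office:Phone', '425-555-0100'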
HBase vs. HDFS
HBase vs. RDBMS
When to use HBase
HBase: Joins
HBase does not support joins
Can be done in the application layer
Using scan() and get() operations
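A minimal sketch of such an application-level join using the HBase Java client API (the table, column family, and column names below are hypothetical):
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ClientSideJoin {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table orders = conn.getTable(TableName.valueOf("Orders"));
             Table customers = conn.getTable(TableName.valueOf("Customers"))) {
            // scan() over one table ...
            try (ResultScanner scanner = orders.getScanner(new Scan())) {
                for (Result order : scanner) {
                    byte[] customerId = order.getValue(Bytes.toBytes("d"),
                                                       Bytes.toBytes("customerId"));
                    if (customerId == null) continue;
                    // ... and a point get() on the other table, keyed by the join value
                    Result customer = customers.get(new Get(customerId));
                    byte[] name = customer.getValue(Bytes.toBytes("d"), Bytes.toBytes("name"));
                    System.out.println(Bytes.toString(order.getRow()) + " -> "
                            + (name == null ? "(no match)" : Bytes.toString(name)));
                }
            }
        }
    }
}
Each scanned row drives a point get() on the second table; this only works well because row keys are indexed for fast lookup.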
Hive and Spark
Hive
Hive
Hive: data warehousing application in Hadoop
Query language is HQL, a variant of SQL
Tables stored on HDFS as flat files
Developed by Facebook, now open source
Common idea:
Provide higher-level language to facilitate large-data processing
Higher-level language “compiles down” to MapReduce jobs
Data Model
Tables
Typed columns (int, float, string, boolean)
Also list and map types (for JSON-like data)
Partitions
For example, range-partition tables by date
Buckets
Hash partitions within ranges (useful for sampling, join optimization); see the example below
Source: cc-licensed slide by Cloudera
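As an illustration, a minimal HiveQL sketch combining typed columns, a date partition, and hash buckets (the table and column names are hypothetical):
CREATE TABLE page_views (
  user_id BIGINT,
  url STRING,
  referrer STRING
)
PARTITIONED BY (view_date STRING)
CLUSTERED BY (user_id) INTO 32 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';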
Metastore
Database: namespace containing a set of tables
Holds table definitions (column types, physical layout)
Holds partitioning information
Can be stored in Derby, MySQL, and many other relational databases (see the sample configuration below)
Source: cc-licensed slide by Cloudera
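For example, pointing the metastore at a MySQL database comes down to a few JDBC properties in hive-site.xml (host, database name, and credentials below are placeholders):
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://dbhost/metastore</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hiveuser</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>hivepassword</value>
</property>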
Physical Layout
Warehouse directory in HDFS
E.g., /user/hive/warehouse
Tables stored in subdirectories of warehouse
Partitions form subdirectories of tables
Actual data stored in flat files (see the sample layout below)
Control char-delimited text, or SequenceFiles
With custom SerDe, can use arbitrary format
Source: cc-licensed slide by Cloudera
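For instance, the hypothetical page_views table above, partitioned by view_date, would be laid out roughly as:
/user/hive/warehouse/page_views/view_date=2020-01-01/000000_0
/user/hive/warehouse/page_views/view_date=2020-01-02/000000_0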
Hive: Example
Hive looks similar to an SQL database
Supports joins
Source: Material drawn from Cloudera training VM
Hive: Behind the Scenes
SELECT s.word, s.freq, k.freq FROM shakespeare s
JOIN bible k ON (s.word = k.word) WHERE s.freq >= 1 AND k.freq >= 1
ORDER BY s.freq DESC LIMIT 10;
(TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF shakespeare s) (TOK_TABREF bible k) (= (. (TOK_TABLE_OR_COL s) word) (. (TOK_TABLE_OR_COL k) word)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) word)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) freq)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL k) freq))) (TOK_WHERE (AND (>= (. (TOK_TABLE_OR_COL s) freq) 1) (>= (. (TOK_TABLE_OR_COL k) freq) 1))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEDESC (. (TOK_TABLE_OR_COL s) freq))) (TOK_LIMIT 10)))
(one or more MapReduce jobs)
(Abstract Syntax Tree)
MapReduce Code
Pig Slides adapted from Olston et al.
Hive + HBase
Integration
How it works:
Hive can use tables that already exist in HBase or manage its own, but they all still reside in the same HBase instance
HBase
Hive table definitions
Points to an existing table
Manages this table from Hive
Integration
How it works:
When using an already existing table, defined as EXTERNAL, you can create multiple Hive tables that point to it
HBase
Hive table definitions
Points to some column
Points to other columns, different names
Integration
How it works:
Columns are mapped however you want, changing names and giving types (see the example DDL below)
HBase table "persons"        Hive table definition "people"
d:fullname          ->       name STRING       (column renamed)
d:age               ->       age INT
d:address           ->       (not used)
f: (whole family)   ->       siblings MAP
a column name was changed, one isn’t used, and a map points to a family
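A mapping like the one above is declared with Hive's HBase storage handler; the following sketch assumes the HBase table persons has column families d and f, and adds a row-key column on the Hive side:
CREATE EXTERNAL TABLE people (
  rowkey STRING,
  name STRING,
  age INT,
  siblings MAP<STRING, STRING>
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,d:fullname,d:age,f:")
TBLPROPERTIES ("hbase.table.name" = "persons");
Here name is renamed from d:fullname, d:address is simply not mapped, and the entire f family surfaces as a MAP column.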
Accessing HBase from Spark
https://github.com/hortonworks-spark/shc
Key-value structure: (row key, column family, timestamp) -> value

Example table (logical view):

Row key            Timestamp   Column "contents:"   Column "anchor:"
"com.apache.www"   t12         "..."
                   t11         "..."
                   t10                              "anchor:apache.com" = "APACHE"
"com.cnn.www"      t15                              "anchor:cnnsi.com" = "CNN"
                   t13                              "anchor:my.look.ca" = "CNN.com"
                   t6          "..."
                   t5          "..."
                   t3          "..."
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapred.lib.IdentityMapper;

public class MRExample {
    public static class LoadPages extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable k, Text val,
                OutputCollector<Text, Text> oc,
                Reporter reporter) throws IOException {
            // Pull the key out
            String line = val.toString();
            int firstComma = line.indexOf(',');
            String key = line.substring(0, firstComma);
            String value = line.substring(firstComma + 1);
            Text outKey = new Text(key);
            // Prepend an index to the value so we know which file
            // it came from.
            Text outVal = new Text("1" + value);
            oc.collect(outKey, outVal);
        }
    }
    public static class LoadAndFilterUsers extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable k, Text val,
                OutputCollector<Text, Text> oc,
                Reporter reporter) throws IOException {
            // Pull the key out
            String line = val.toString();
            int firstComma = line.indexOf(',');
            String value = line.substring(firstComma + 1);
            int age = Integer.parseInt(value);
            if (age < 18 || age > 25) return;
            String key = line.substring(0, firstComma);
            Text outKey = new Text(key);
            // Prepend an index to the value so we know which file
            // it came from.
            Text outVal = new Text("2" + value);
            oc.collect(outKey, outVal);
        }
    }
    public static class Join extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key,
                Iterator<Text> iter,
                OutputCollector<Text, Text> oc,
                Reporter reporter) throws IOException {
            // For each value, figure out which file it's from and store it
            // accordingly.
            List<String> first = new ArrayList<String>();
            List<String> second = new ArrayList<String>();
            while (iter.hasNext()) {
                Text t = iter.next();
                String value = t.toString();
                if (value.charAt(0) == '1') first.add(value.substring(1));
                else second.add(value.substring(1));
                reporter.setStatus("OK");
            }
            // Do the cross product and collect the values
            for (String s1 : first) {
                for (String s2 : second) {
                    String outval = key + "," + s1 + "," + s2;
                    oc.collect(null, new Text(outval));
                    reporter.setStatus("OK");
                }
            }
        }
    }
    public static class LoadJoined extends MapReduceBase
        implements Mapper<Text, Text, Text, LongWritable> {

        public void map(
                Text k,
                Text val,
                OutputCollector<Text, LongWritable> oc,
                Reporter reporter) throws IOException {
            // Find the url
            String line = val.toString();
            int firstComma = line.indexOf(',');
            int secondComma = line.indexOf(',', firstComma);
            String key = line.substring(firstComma, secondComma);
            // drop the rest of the record, I don't need it anymore,
            // just pass a 1 for the combiner/reducer to sum instead.
            Text outKey = new Text(key);
            oc.collect(outKey, new LongWritable(1L));
        }
    }
    public static class ReduceUrls extends MapReduceBase
        implements Reducer<Text, LongWritable, WritableComparable, Writable> {

        public void reduce(
                Text key,
                Iterator<LongWritable> iter,
                OutputCollector<WritableComparable, Writable> oc,
                Reporter reporter) throws IOException {
            // Add up all the values we see
            long sum = 0;
            while (iter.hasNext()) {
                sum += iter.next().get();
                reporter.setStatus("OK");
            }
            oc.collect(key, new LongWritable(sum));
        }
    }
    public static class LoadClicks extends MapReduceBase
        implements Mapper<WritableComparable, Writable, LongWritable, Text> {

        public void map(
                WritableComparable key,
                Writable val,
                OutputCollector<LongWritable, Text> oc,
                Reporter reporter) throws IOException {
            oc.collect((LongWritable)val, (Text)key);
        }
    }
    public static class LimitClicks extends MapReduceBase
        implements Reducer<LongWritable, Text, LongWritable, Text> {

        int count = 0;
        public void reduce(
                LongWritable key,
                Iterator<Text> iter,
                OutputCollector<LongWritable, Text> oc,
                Reporter reporter) throws IOException {
            // Only output the first 100 records
            while (count < 100 && iter.hasNext()) {
                oc.collect(key, iter.next());
                count++;
            }
        }
    }
    public static void main(String[] args) throws IOException {
        JobConf lp = new JobConf(MRExample.class);
        lp.setJobName("Load Pages");
        lp.setInputFormat(TextInputFormat.class);
        lp.setOutputKeyClass(Text.class);
        lp.setOutputValueClass(Text.class);
        lp.setMapperClass(LoadPages.class);
        FileInputFormat.addInputPath(lp, new Path("/user/gates/pages"));
        FileOutputFormat.setOutputPath(lp,
            new Path("/user/gates/tmp/indexed_pages"));
        lp.setNumReduceTasks(0);
        Job loadPages = new Job(lp);

        JobConf lfu = new JobConf(MRExample.class);
        lfu.setJobName("Load and Filter Users");
        lfu.setInputFormat(TextInputFormat.class);
        lfu.setOutputKeyClass(Text.class);
        lfu.setOutputValueClass(Text.class);
        lfu.setMapperClass(LoadAndFilterUsers.class);
        FileInputFormat.addInputPath(lfu, new Path("/user/gates/users"));
        FileOutputFormat.setOutputPath(lfu,
            new Path("/user/gates/tmp/filtered_users"));
        lfu.setNumReduceTasks(0);
        Job loadUsers = new Job(lfu);

        JobConf join = new JobConf(MRExample.class);
        join.setJobName("Join Users and Pages");
        join.setInputFormat(KeyValueTextInputFormat.class);
        join.setOutputKeyClass(Text.class);
        join.setOutputValueClass(Text.class);
        join.setMapperClass(IdentityMapper.class);
        join.setReducerClass(Join.class);
        FileInputFormat.addInputPath(join, new Path("/user/gates/tmp/indexed_pages"));
        FileInputFormat.addInputPath(join, new Path("/user/gates/tmp/filtered_users"));
        FileOutputFormat.setOutputPath(join, new Path("/user/gates/tmp/joined"));
        join.setNumReduceTasks(50);
        Job joinJob = new Job(join);
        joinJob.addDependingJob(loadPages);
        joinJob.addDependingJob(loadUsers);

        JobConf group = new JobConf(MRExample.class);
        group.setJobName("Group URLs");
        group.setInputFormat(KeyValueTextInputFormat.class);
        group.setOutputKeyClass(Text.class);
        group.setOutputValueClass(LongWritable.class);
        group.setOutputFormat(SequenceFileOutputFormat.class);
        group.setMapperClass(LoadJoined.class);
        group.setCombinerClass(ReduceUrls.class);
        group.setReducerClass(ReduceUrls.class);
        FileInputFormat.addInputPath(group, new Path("/user/gates/tmp/joined"));
        FileOutputFormat.setOutputPath(group, new Path("/user/gates/tmp/grouped"));
        group.setNumReduceTasks(50);
        Job groupJob = new Job(group);
        groupJob.addDependingJob(joinJob);

        JobConf top100 = new JobConf(MRExample.class);
        top100.setJobName("Top 100 sites");
        top100.setInputFormat(SequenceFileInputFormat.class);
        top100.setOutputKeyClass(LongWritable.class);
        top100.setOutputValueClass(Text.class);
        top100.setOutputFormat(SequenceFileOutputFormat.class);
        top100.setMapperClass(LoadClicks.class);
        top100.setCombinerClass(LimitClicks.class);
        top100.setReducerClass(LimitClicks.class);
        FileInputFormat.addInputPath(top100, new Path("/user/gates/tmp/grouped"));
        FileOutputFormat.setOutputPath(top100,
            new Path("/user/gates/top100sitesforusers18to25"));
        top100.setNumReduceTasks(1);
        Job limit = new Job(top100);
        limit.addDependingJob(groupJob);

        JobControl jc = new JobControl("Find top 100 sites for users 18 to 25");
        jc.addJob(loadPages);
        jc.addJob(loadUsers);
        jc.addJob(joinJob);
        jc.addJob(groupJob);
        jc.addJob(limit);
        jc.run();
    }
}