HBase and Hive


HBase: Overview
An open-source version of Google BigTable
HBase is a distributed column-oriented data store built on top of HDFS
HBase is an Apache open source project whose goal is to provide storage for Hadoop Distributed Computing
Data is logically organized into tables, rows and columns

HBase: Part of Hadoop’s Ecosystem

HBase is built on top of HDFS
HBase files are internally stored in HDFS

HBase vs. HDFS
Both are distributed systems that scale to hundreds or thousands of nodes

HDFS is good for batch processing (scans over big files)
Not good for record lookup
Not good for incremental addition of small batches
Not good for updates

HBase vs. HDFS (Cont’d)
HBase is designed to efficiently address the above points
Fast record lookup
Support for record-level insertion
Support for updates (not in place)

HBase updates are done by creating new versions of values
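
A minimal sketch of the "updates create new versions" idea, written with plain Java collections rather than the HBase client API. The class name `VersionedCell` and its methods are hypothetical and exist only for illustration; the one thing taken from the slides is that a write adds a (version, value) pair and a read returns the newest version.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a single cell: every put adds a version, nothing is overwritten. */
class VersionedCell {
    // version number (for example a timestamp) -> value bytes, sorted ascending
    private final TreeMap<Long, byte[]> versions = new TreeMap<>();

    /** An "update" stores the value under a new version number. */
    void put(long version, String value) {
        versions.put(version, value.getBytes(StandardCharsets.UTF_8));
    }

    /** Returns the value with the highest version number, or null if the cell is empty. */
    String getLatest() {
        Map.Entry<Long, byte[]> newest = versions.lastEntry();
        return newest == null ? null : new String(newest.getValue(), StandardCharsets.UTF_8);
    }

    /** Number of versions currently kept for this cell. */
    int versionCount() {
        return versions.size();
    }

    public static void main(String[] args) {
        VersionedCell name = new VersionedCell();
        name.put(10L, "Old Name");
        name.put(12L, "New Name");              // the old value is still stored
        System.out.println(name.getLatest());    // prints "New Name"
        System.out.println(name.versionCount()); // prints 2
    }
}
```

Because the map is ordered by version number rather than by insertion time, a value written later with a smaller version does not replace what readers see as the latest value.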

HBase vs. HDFS (Cont’d)

If the application needs neither random reads nor random writes, stick to HDFS

HBase Data Model

HBase Data Model
HBase is based on Google’s Bigtable model
Key-Value pairs


HBase Logical View

HBase: Keys and Column Families

Each row has a Key
Each record is divided into Column Families
Each column family consists of one or more Columns

Key
Byte array
Serves as the primary key for the table
Indexed for fast lookup
Column Family
Has a name (string)
Contains one or more related columns
Column
Belongs to one column family
Included inside the row
familyName:columnName


Column family named “Contents”
Column family named “anchor”
Column named “apache.com”

Version Number
Unique within each key
By default System’s timestamp
Data type is Long
Value (Cell)
Byte array


Version number for each row
value

Notes on Data Model
HBase schema consists of several Tables
Each table consists of a set of Column Families
Columns are not part of the schema
HBase has Dynamic Columns
Because column names are encoded inside the cells
Different cells can have different columns
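
The notes above can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not HBase code: `SparseTable` is a hypothetical class, the schema fixes only the column family names, and the row keys and column names in the example are invented.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

/** Toy table: families are fixed by the schema, columns are created on write. */
class SparseTable {
    private final Set<String> families;
    // row key -> ("family:column" -> value); empty cells are simply absent
    private final TreeMap<String, TreeMap<String, String>> rows = new TreeMap<>();

    SparseTable(String... familyNames) {
        this.families = new HashSet<>(Arrays.asList(familyNames));
    }

    /** Accepts any column name, but only inside a family declared in the schema. */
    void put(String rowKey, String family, String column, String value) {
        if (!families.contains(family)) {
            throw new IllegalArgumentException("Unknown column family: " + family);
        }
        rows.computeIfAbsent(rowKey, k -> new TreeMap<>()).put(family + ":" + column, value);
    }

    /** Columns actually present in one row; different rows may return different sets. */
    Set<String> columnsOf(String rowKey) {
        TreeMap<String, String> row = rows.get(rowKey);
        return row == null ? Collections.<String>emptySet() : row.keySet();
    }

    public static void main(String[] args) {
        SparseTable table = new SparseTable("Personal", "Roles");
        table.put("1000", "Roles", "ASF", "Committer");
        table.put("2000", "Roles", "Driver", "Yes");
        System.out.println(table.columnsOf("1000")); // prints [Roles:ASF]
        System.out.println(table.columnsOf("2000")); // prints [Roles:Driver]
    }
}
```

The column name travels with each stored cell, which is why two rows in the same family can carry entirely different columns without any schema change.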


“Roles” column family has different columns in different cells

Notes on Data Model (Cont’d)
The version number can be user-supplied
Versions do not even have to be inserted in increasing order
Version numbers are unique within each key
Table can be very sparse
Many cells are empty
Keys are indexed as the primary key

Has two columns
[cnnsi.com & my.look.ca]

HBase Physical Model

HBase Physical Model
Each column family is stored in separate files (called HTables in these slides; HBase’s on-disk file format is the HFile)
Key & Version numbers are replicated with each column family
Empty cells are not stored

HBase maintains a multi-level index on values:

Example

Column Families

HBase Regions
Each HTable (column family) is partitioned horizontally into regions
Regions are the counterpart of HDFS blocks

Each will be one region

HBase Architecture

Three Major Components
The HBaseMaster
One master

The HRegionServer
Many region servers

The HBase client

HBase Components
Region
A subset of a table’s rows, like horizontal range partitioning
Automatically done
RegionServer (many slaves)
Manages data regions
Serves data for reads and writes (using a log)
Master
Responsible for coordinating the slaves
Assigns regions, detects failures
Admin functions
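
The region idea above (a region is a contiguous range of row keys, served by one region server) can be sketched as a sorted map from region start key to server. This is a simplified illustration, not the real lookup path: `ToyRegionMap`, the start keys, and the server names are all hypothetical, and Java string ordering stands in for HBase's byte-wise key comparison.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy range partitioning: each region covers [startKey, nextStartKey). */
class ToyRegionMap {
    // region start key (inclusive) -> name of the server hosting that region
    private final TreeMap<String, String> serverByStartKey = new TreeMap<>();

    void addRegion(String startKey, String server) {
        serverByStartKey.put(startKey, server);
    }

    /** A row belongs to the region with the greatest start key that is <= the row key. */
    String locate(String rowKey) {
        Map.Entry<String, String> region = serverByStartKey.floorEntry(rowKey);
        if (region == null) {
            throw new IllegalStateException("No region covers row key: " + rowKey);
        }
        return region.getValue();
    }

    public static void main(String[] args) {
        ToyRegionMap regions = new ToyRegionMap();
        regions.addRegion("", "server-a");   // first region starts at the empty key
        regions.addRegion("g", "server-b");
        regions.addRegion("p", "server-c");
        System.out.println(regions.locate("com.cnn.www")); // prints "server-a"
        System.out.println(regions.locate("hbase"));       // prints "server-b"
    }
}
```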

Big Picture

Demo: HBase on Azure
Create an HBase cluster
Log into the cluster using ssh
Start the HBase shell:
hbase shell
List existing tables:
list
Get all rows:
scan 'Contacts'
Get one row:
get 'Contacts', '1000'
Update one row:
put 'Contacts', '1000', 'Personal:Name', 'New Name'
get 'Contacts', '1000'

HBase vs. HDFS

HBase vs. RDBMS

When to use HBase

HBase: Joins
HBase does not support joins
Can be done in the application layer
Using scan() and get() operations
Hive and Spark
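
A rough sketch of what an application-layer join built from scan and get operations might look like. It deliberately avoids the real HBase client API: `ToyTable` is a hypothetical in-memory stand-in with a sorted scan and a key lookup, and the orders and customers data are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ClientSideJoin {
    /** Toy stand-in for a table: sorted row key -> value (not the HBase API). */
    static class ToyTable {
        private final TreeMap<String, String> rows = new TreeMap<>();

        void put(String rowKey, String value) {
            rows.put(rowKey, value);
        }

        /** Point lookup by row key, like a get. */
        String get(String rowKey) {
            return rows.get(rowKey);
        }

        /** Full pass over the rows in key order, like a scan. */
        Iterable<Map.Entry<String, String>> scan() {
            return rows.entrySet();
        }
    }

    /** Scans orders (orderId -> customerId) and gets each customer name by key. */
    static List<String> join(ToyTable orders, ToyTable customers) {
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, String> order : orders.scan()) {
            String customerName = customers.get(order.getValue());
            if (customerName != null) { // inner join: skip orders without a customer
                joined.add(order.getKey() + "," + customerName);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        ToyTable orders = new ToyTable();
        orders.put("o1", "c1");
        orders.put("o2", "c9"); // no such customer
        ToyTable customers = new ToyTable();
        customers.put("c1", "Ann");
        System.out.println(join(orders, customers)); // prints [o1,Ann]
    }
}
```

The cost profile is the point of the sketch: one scan of the outer table plus one get per outer row, all driven by the client.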

Hive


Hive
Hive: data warehousing application in Hadoop
Query language is HQL, a variant of SQL
Tables stored on HDFS as flat files
Developed by Facebook, now open source
Common idea:
Provide higher-level language to facilitate large-data processing
Higher-level language “compiles down” to MapReduce jobs

Data Model
Tables
Typed columns (int, float, string, boolean)
Also list and map types (for JSON-like data)
Partitions
For example, range-partition tables by date
Buckets
Hash partitions within ranges (useful for sampling, join optimization)
Source: cc-licensed slide by Cloudera
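
To make the bucket idea concrete, here is a small sketch of hash partitioning a key into a fixed number of buckets. It assumes Java's `String.hashCode` as the hash function, which is an assumption for illustration only; Hive's actual hash function and the class name `Bucketing` are not taken from the slides.

```java
class Bucketing {
    /** Maps a key to a bucket in [0, numBuckets); floorMod keeps negative hashes in range. */
    static int bucketFor(String key, int numBuckets) {
        if (numBuckets <= 0) {
            throw new IllegalArgumentException("numBuckets must be positive");
        }
        return Math.floorMod(key.hashCode(), numBuckets);
    }

    public static void main(String[] args) {
        // "a".hashCode() is 97, and 97 mod 4 is 1
        System.out.println(bucketFor("a", 4)); // prints 1
    }
}
```

Equal keys always land in the same bucket, which is what makes bucket-wise joins possible, and reading a single bucket gives roughly a 1/numBuckets sample of the rows.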

Metastore
Database: namespace containing a set of tables
Holds table definitions (column types, physical layout)
Holds partitioning information
Can be stored in Derby, MySQL, and many other relational databases
Source: cc-licensed slide by Cloudera

Physical Layout
Warehouse directory in HDFS
E.g., /user/hive/warehouse
Tables stored in subdirectories of warehouse
Partitions form subdirectories of tables
Actual data stored in flat files
Control char-delimited text, or SequenceFiles
With custom SerDe, can use arbitrary format
Source: cc-licensed slide by Cloudera
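
The directory layout described above can be sketched as simple path construction. The warehouse root comes from the slide; the table name `logs`, the partition column `dt`, and the helper class `WarehouseLayout` are hypothetical. The `column=value` naming of partition subdirectories follows Hive's usual convention.

```java
class WarehouseLayout {
    static final String WAREHOUSE_DIR = "/user/hive/warehouse";

    /** Each table is a subdirectory of the warehouse directory. */
    static String tableDir(String table) {
        return WAREHOUSE_DIR + "/" + table;
    }

    /** Each partition is a subdirectory of its table, named column=value. */
    static String partitionDir(String table, String column, String value) {
        return tableDir(table) + "/" + column + "=" + value;
    }

    public static void main(String[] args) {
        // prints /user/hive/warehouse/logs/dt=2024-01-01
        System.out.println(partitionDir("logs", "dt", "2024-01-01"));
    }
}
```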

Hive: Example
Hive looks similar to an SQL database
Supports joins

Source: Material drawn from Cloudera training VM

Hive: Behind the Scenes
SELECT s.word, s.freq, k.freq FROM shakespeare s
JOIN bible k ON (s.word = k.word) WHERE s.freq >= 1 AND k.freq >= 1
ORDER BY s.freq DESC LIMIT 10;
(TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF shakespeare s) (TOK_TABREF bible k) (= (. (TOK_TABLE_OR_COL s) word) (. (TOK_TABLE_OR_COL k) word)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) word)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) freq)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL k) freq))) (TOK_WHERE (AND (>= (. (TOK_TABLE_OR_COL s) freq) 1) (>= (. (TOK_TABLE_OR_COL k) freq) 1))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEDESC (. (TOK_TABLE_OR_COL s) freq))) (TOK_LIMIT 10)))

The query is first parsed into the abstract syntax tree shown above, which is then compiled into one or more MapReduce jobs

MapReduce Code

Pig Slides adapted from Olston et al.

Hive + HBase

Integration
How it works:
Hive can use tables that already exist in HBase or manage its own, but they all still reside in the same HBase instance

HBase
Hive table definitions
Points to an existing table
Manages this table from Hive


Integration
How it works:
When using an already existing table, defined as EXTERNAL, you can create multiple Hive tables that point to it

HBase
Hive table definitions
Points to some column
Points to other columns, different names


Integration
How it works:
Columns are mapped however you want, changing names and giving types

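HBase table: columns d:fullname, d:age, d:address and family f:
Hive table definition: name STRING, age INT, siblings MAP
Table names shown in the figure: persons, people
Column mapping: d:fullname to name, d:age to age, family f: to siblings; d:address is not used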

a column name was changed, one isn’t used, and a map points to a family

Accessing HBase from Spark
https://github.com/hortonworks-spark/shc

Row key             Time stamp   Column “contents:”   Column “anchor:”
“com.apache.www”    t12          “…”
                    t11          “…”
                    t10                               “anchor:apache.com” = “APACHE”
“com.cnn.www”       t15                               “anchor:cnnsi.com” = “CNN”
                    t13                               “anchor:my.look.ca” = “CNN.com”
                    t6           “…”
                    t5           “…”
                    t3           “…”

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapred.lib.IdentityMapper;

public class MRExample {

    // Map-only job: tags every page record with "1" so the join can tell
    // which input file a value came from.
    public static class LoadPages extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable k, Text val,
                OutputCollector<Text, Text> oc,
                Reporter reporter) throws IOException {
            // Pull the key out
            String line = val.toString();
            int firstComma = line.indexOf(',');
            String key = line.substring(0, firstComma);
            String value = line.substring(firstComma + 1);
            Text outKey = new Text(key);
            // Prepend an index to the value so we know which file
            // it came from.
            Text outVal = new Text("1" + value);
            oc.collect(outKey, outVal);
        }
    }

    // Map-only job: keeps users aged 18 to 25 and tags them with "2".
    public static class LoadAndFilterUsers extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable k, Text val,
                OutputCollector<Text, Text> oc,
                Reporter reporter) throws IOException {
            // Pull the key out
            String line = val.toString();
            int firstComma = line.indexOf(',');
            String value = line.substring(firstComma + 1);
            int age = Integer.parseInt(value);
            if (age < 18 || age > 25) return;
            String key = line.substring(0, firstComma);
            Text outKey = new Text(key);
            // Prepend an index to the value so we know which file
            // it came from.
            Text outVal = new Text("2" + value);
            oc.collect(outKey, outVal);
        }
    }

    // Reduce-side join of pages and users on the shared key.
    public static class Join extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key,
                Iterator<Text> iter,
                OutputCollector<Text, Text> oc,
                Reporter reporter) throws IOException {
            // For each value, figure out which file it's from and store it
            // accordingly.
            List<String> first = new ArrayList<String>();
            List<String> second = new ArrayList<String>();

            while (iter.hasNext()) {
                Text t = iter.next();
                String value = t.toString();
                if (value.charAt(0) == '1') first.add(value.substring(1));
                else second.add(value.substring(1));
                reporter.setStatus("OK");
            }

            // Do the cross product and collect the values
            for (String s1 : first) {
                for (String s2 : second) {
                    String outval = key + "," + s1 + "," + s2;
                    oc.collect(null, new Text(outval));
                    reporter.setStatus("OK");
                }
            }
        }
    }

    // Extracts the URL from each joined record and emits (url, 1).
    public static class LoadJoined extends MapReduceBase
            implements Mapper<Text, Text, Text, LongWritable> {

        public void map(
                Text k,
                Text val,
                OutputCollector<Text, LongWritable> oc,
                Reporter reporter) throws IOException {
            // Find the url: it is the field between the first and second comma.
            // The search for the second comma must start after the first one.
            String line = val.toString();
            int firstComma = line.indexOf(',');
            int secondComma = line.indexOf(',', firstComma + 1);
            String key = line.substring(firstComma + 1, secondComma);
            // drop the rest of the record, I don't need it anymore,
            // just pass a 1 for the combiner/reducer to sum instead.
            Text outKey = new Text(key);
            oc.collect(outKey, new LongWritable(1L));
        }
    }

    // Sums the counts per URL; also used as the combiner.
    public static class ReduceUrls extends MapReduceBase
            implements Reducer<Text, LongWritable, WritableComparable, Writable> {

        public void reduce(
                Text key,
                Iterator<LongWritable> iter,
                OutputCollector<WritableComparable, Writable> oc,
                Reporter reporter) throws IOException {
            // Add up all the values we see
            long sum = 0;
            while (iter.hasNext()) {
                sum += iter.next().get();
                reporter.setStatus("OK");
            }
            oc.collect(key, new LongWritable(sum));
        }
    }

    // Swaps (url, count) to (count, url) so records are keyed by click count.
    public static class LoadClicks extends MapReduceBase
            implements Mapper<WritableComparable, Writable, LongWritable, Text> {

        public void map(
                WritableComparable key,
                Writable val,
                OutputCollector<LongWritable, Text> oc,
                Reporter reporter) throws IOException {
            oc.collect((LongWritable) val, (Text) key);
        }
    }

    // Passes through at most 100 records.
    public static class LimitClicks extends MapReduceBase
            implements Reducer<LongWritable, Text, LongWritable, Text> {

        int count = 0;

        public void reduce(
                LongWritable key,
                Iterator<Text> iter,
                OutputCollector<LongWritable, Text> oc,
                Reporter reporter) throws IOException {
            // Only output the first 100 records
            while (count < 100 && iter.hasNext()) {
                oc.collect(key, iter.next());
                count++;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Job 1: load and tag pages (map only).
        JobConf lp = new JobConf(MRExample.class);
        lp.setJobName("Load Pages");
        lp.setInputFormat(TextInputFormat.class);
        lp.setOutputKeyClass(Text.class);
        lp.setOutputValueClass(Text.class);
        lp.setMapperClass(LoadPages.class);
        FileInputFormat.addInputPath(lp, new Path("/user/gates/pages"));
        FileOutputFormat.setOutputPath(lp, new Path("/user/gates/tmp/indexed_pages"));
        lp.setNumReduceTasks(0);
        Job loadPages = new Job(lp);

        // Job 2: load, filter and tag users (map only).
        JobConf lfu = new JobConf(MRExample.class);
        lfu.setJobName("Load and Filter Users");
        lfu.setInputFormat(TextInputFormat.class);
        lfu.setOutputKeyClass(Text.class);
        lfu.setOutputValueClass(Text.class);
        lfu.setMapperClass(LoadAndFilterUsers.class);
        FileInputFormat.addInputPath(lfu, new Path("/user/gates/users"));
        FileOutputFormat.setOutputPath(lfu, new Path("/user/gates/tmp/filtered_users"));
        lfu.setNumReduceTasks(0);
        Job loadUsers = new Job(lfu);

        // Job 3: join users and pages; depends on jobs 1 and 2.
        JobConf join = new JobConf(MRExample.class);
        join.setJobName("Join Users and Pages");
        join.setInputFormat(KeyValueTextInputFormat.class);
        join.setOutputKeyClass(Text.class);
        join.setOutputValueClass(Text.class);
        join.setMapperClass(IdentityMapper.class);
        join.setReducerClass(Join.class);
        FileInputFormat.addInputPath(join, new Path("/user/gates/tmp/indexed_pages"));
        FileInputFormat.addInputPath(join, new Path("/user/gates/tmp/filtered_users"));
        FileOutputFormat.setOutputPath(join, new Path("/user/gates/tmp/joined"));
        join.setNumReduceTasks(50);
        Job joinJob = new Job(join);
        joinJob.addDependingJob(loadPages);
        joinJob.addDependingJob(loadUsers);

        // Job 4: count clicks per URL; depends on job 3.
        JobConf group = new JobConf(MRExample.class);
        group.setJobName("Group URLs");
        group.setInputFormat(KeyValueTextInputFormat.class);
        group.setOutputKeyClass(Text.class);
        group.setOutputValueClass(LongWritable.class);
        group.setOutputFormat(SequenceFileOutputFormat.class);
        group.setMapperClass(LoadJoined.class);
        group.setCombinerClass(ReduceUrls.class);
        group.setReducerClass(ReduceUrls.class);
        FileInputFormat.addInputPath(group, new Path("/user/gates/tmp/joined"));
        FileOutputFormat.setOutputPath(group, new Path("/user/gates/tmp/grouped"));
        group.setNumReduceTasks(50);
        Job groupJob = new Job(group);
        groupJob.addDependingJob(joinJob);

        // Job 5: keep the top 100 sites; depends on job 4.
        JobConf top100 = new JobConf(MRExample.class);
        top100.setJobName("Top 100 sites");
        top100.setInputFormat(SequenceFileInputFormat.class);
        top100.setOutputKeyClass(LongWritable.class);
        top100.setOutputValueClass(Text.class);
        top100.setOutputFormat(SequenceFileOutputFormat.class);
        top100.setMapperClass(LoadClicks.class);
        top100.setCombinerClass(LimitClicks.class);
        top100.setReducerClass(LimitClicks.class);
        FileInputFormat.addInputPath(top100, new Path("/user/gates/tmp/grouped"));
        FileOutputFormat.setOutputPath(top100, new Path("/user/gates/top100sitesforusers18to25"));
        top100.setNumReduceTasks(1);
        Job limit = new Job(top100);
        limit.addDependingJob(groupJob);

        // Submit all five jobs; JobControl runs them in dependency order.
        JobControl jc = new JobControl("Find top 100 sites for users 18 to 25");
        jc.addJob(loadPages);
        jc.addJob(loadUsers);
        jc.addJob(joinJob);
        jc.addJob(groupJob);
        jc.addJob(limit);
        jc.run();
    }
}