
7CCSMBDT – Big Data Technologies Week 7
Grigorios Loukides, PhD (grigorios.loukides@kcl.ac.uk)
Spring 2017/2018
1

Objectives
• Hive
  – Chapter 9.2, Bagha
  – Thusoo et al. Hive – a petabyte scale data warehouse using Hadoop. ICDE '10. https://doi.org/10.1109/ICDE.2010.5447738
  – http://blog.cloudera.com/wp-content/uploads/2010/01/6-IntroToHive.pdf
• SparkSQL
  – Chapter 9.1, Bagha
  – M. Armbrust et al. Spark SQL: Relational Data Processing in Spark. SIGMOD '15. http://dl.acm.org/citation.cfm?id=2742797
  – http://spark.apache.org/docs/1.6.0/sql-programming-guide.html
2

HIVE
• Motivation
  – Data warehousing
    • Not scalable enough
    • Prohibitively expensive at web scale (up to $200K/TB)
    • Little control over the execution method
    • Query optimization is hard
      – Parallel environment
      – Little or no statistics
      – Lots of UDFs
3

HIVE
• Motivation
  – Can MapReduce help?
    • MapReduce is scalable
      – but supports only parallelizable operations; no transactions
    • MapReduce runs on cheap commodity hardware
      – clusters, cloud
    • MapReduce is procedural
      – a pipe for processing data
4

HIVE
• Motivation
  – Can MapReduce help?
    • But:
      1. The data flow is rigid (one input, two stages) and is complex to change (workarounds exist but are cumbersome)
5

HIVE
• Motivation
  – Can MapReduce help?
    • But:
      2. Custom code has to be written for even the most common operations, e.g., join, sorting, distinct (recall the MR patterns); not all users are willing/able to do it
      3. Map and reduce functions are opaque: difficult to maintain, extend, and optimize
6

HIVE
• Motivation
  – Can MapReduce help?
    • The main limitation of MapReduce that HIVE tries to address:
      The MapReduce programming model is very low level and requires developers to write custom programs which are hard to maintain and reuse.
7

HIVE
• Motivation
  – Can MapReduce help?
    • Solution: develop higher-level data processing languages that "compile down" to Hadoop jobs
      – HIVE, PIG
8

HIVE
Intuition
• Make the unstructured data look like tables, regardless of how the data is actually laid out
• SQL-based queries can be run directly against these tables
• Generate a specific execution plan for each query
What is HIVE?
• An open-source data warehousing solution built on top of Hadoop, which supports queries that are
  (i) expressed in HiveQL (an SQL-like declarative language)
  (ii) compiled into MapReduce jobs that are executed using Hadoop
9

HIVE
History
• Started at Facebook, out of need
• More data (15TB in 2007, 700TB in 2009)
• Jobs using an RDBMS took days
• Hadoop jobs were much more efficient, but took too long to write, which lowered people's productivity
• Aimed to bring familiar concepts for structured data (tables, columns, partitions, SQL) to the unstructured data of Hadoop
10

HIVE
Applications
• Log processing
• Text mining
• Document indexing
• Customer-facing business intelligence (e.g., Google Analytics)
• Predictive modeling
• Hypothesis testing
Slide adapted from:
http://blog.cloudera.com/wp-content/uploads/2010/01/6-IntroToHive.pdf
11

HIVE
Components of HIVE
* Client components: the command-line interface (CLI), the web UI, and the JDBC/ODBC driver
* Driver:
  (i) Manages the lifecycle of a HiveQL statement as it moves through HIVE
  (ii) Maintains a session handle
  (iii) Maintains session statistics
* Compiler:
  Compiles HiveQL into MapReduce tasks
12


HIVE
Components of HIVE
* Optimizer:
  Optimizes the plan of tasks produced by the compiler
* Executor:
  (i) Executes the tasks in the right order
  (ii) Interacts with Hadoop
* Metastore:
  (i) Acts as the system catalog, i.e., it stores information about tables, partitions, locations in HDFS, etc.
  (ii) Runs on an RDBMS, not on HDFS, because it needs to have very low latency
* Thrift Server:
  Provides an interface between clients and the Metastore (so that clients can query or modify the information in the Metastore)
14

HIVE
Data model: comprises the following data units
* Table: each table consists of a number of rows, and each row consists of a specified number of columns
  – Basic column types: int, float, boolean
  – Complex column types: list, map, struct
  – Arbitrary data types:
    (i) created by composing basic & complex types, e.g., list<map<string, struct<p1:int, p2:int>>>
    (ii) useful for external and legacy data
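A hedged HiveQL sketch of a table combining basic and complex column types (table and column names are hypothetical; note that HiveQL DDL spells the list type as array):

CREATE TABLE t1 (
  id int,
  props map<string, string>,              -- complex type: map
  points array<struct<p1:int, p2:int>>    -- "list" of structs, written array<> in DDL
);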
15

HIVE
Data model: comprises the following data units
* Partition: the result of decomposing a table into partitions, based on the values of one or more partitioning columns
  – The partitioning columns are not part of the table data but behave like regular columns
  – Creating partitions speeds up query processing, because only the relevant data in Hadoop are scanned
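The slide showed HiveQL for creating a partitioned table and for adding a new partition; a minimal sketch (the data columns c1, c2 are illustrative; the table matches the test_part example two slides below):

CREATE TABLE test_part (c1 string, c2 int)
PARTITIONED BY (ds string, hr int);      -- partitioning columns, not part of table data

ALTER TABLE test_part
ADD PARTITION (ds = '2009-02-02', hr = 11);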
16

HIVE
Data model: comprises the following data units
* Bucket: the result of decomposing a table into buckets
  – Bucketing is useful when partitions would be too many and too small (Hive will not create them)
  – Bucketing is useful for sampling: e.g., a 1/96 sample can be generated efficiently by looking at the first bucket only
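A hedged HiveQL sketch of bucketing and bucket-based sampling (table and column names are hypothetical):

CREATE TABLE user_actions (userid int, action string)
CLUSTERED BY (userid) INTO 96 BUCKETS;

-- a 1/96 sample, read efficiently from the first bucket only
SELECT * FROM user_actions
TABLESAMPLE (BUCKET 1 OUT OF 96 ON userid);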
17

HIVE
The mapping of data units onto the HDFS name space
• A table is stored in a directory in HDFS
• A partition of the table is stored in a subdirectory within the table's HDFS directory
• A bucket is stored in a file within the partition's or table's directory, depending on whether the table is partitioned or not
Table → HDFS directory
Partition → subdirectory of the table's HDFS directory
Bucket → file within the directory or subdirectory
Example (warehouse root directory /user/hive/warehouse):
– table test_part is mapped to /user/hive/warehouse/test_part
– partition (ds='2009-02-02', hr=11) of test_part is mapped to /user/hive/warehouse/test_part/ds=2009-02-02/hr=11
– bucket1 is mapped to file bucket1 in /user/hive/…/hr=11
18

HIVE
How the mapping helps efficiency
• Creating partitions speeds up query processing, because only the relevant data in Hadoop are scanned
  → Hive prunes the data by scanning only the required subdirectories
• Bucketing is useful for sampling
  → Hive reads only the file corresponding to the sampled bucket
19

HIVE
Query language
Comprises a subset of SQL (recall Hive's aim: people's productivity)
– From
– Joins (inner, left outer, right outer, full outer)
– Group-by
– Aggregation
– Union all
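A hypothetical HiveQL query combining several of the supported clauses (table and column names are illustrative):

SELECT u.country, count(*) AS cnt
FROM users u JOIN actions a ON (u.userid = a.userid)
GROUP BY u.country;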
Metadata browsing capabilities
– show tables
– explain plan
Reference:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML
20

HIVE
Query language
Some differences/limitations
1. Hive does not support inserting into an existing table or data partition: all inserts overwrite the existing data
2. Only equality predicates are supported in JOIN
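For example (the staging_table is illustrative):

-- 1. All inserts overwrite the existing data of the target table/partition
INSERT OVERWRITE TABLE test_part PARTITION (ds = '2009-02-02', hr = 11)
SELECT c1, c2 FROM staging_table;

-- 2. Joins must use equality predicates:
--    ON (a.key = b.key) is supported; ON (a.key < b.key) is not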
21

HIVE
HiveQL
– Shows more readable output about the table (DESCRIBE FORMATTED <table>)
– Shows details about the table (DESCRIBE EXTENDED <table>)
http://2xbbhjxc6wk3v21p62t8n4d4.wpengine.netdna-cdn.com/wp-content/uploads/2013/05/hql_cheat_sheet.pdf
22


HIVE
• GROUP BY operation (example over an external table)
  – An external table stores its files on the HDFS server, but the table is not completely tied to the source file
  – If you delete an external table, the file still remains on the HDFS server
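The slide's example is lost; a minimal sketch of an external table and a GROUP BY over it (the path and names are hypothetical):

CREATE EXTERNAL TABLE page_views (userid int, page string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/data/page_views';  -- files remain here even if the table is dropped

SELECT page, count(*) AS views
FROM page_views
GROUP BY page;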
26

HIVE
• JOIN operation
  – The mapper sends all rows with the same key to a single reducer
  – The reducer does the join
  – What if many rows have the same key? (e.g., a power-law distribution)
    → Efficiency drops
27

HIVE
• Efficient map-side JOIN operation
  – Applied when joining a small table with a large table
  – The smaller table's data is first kept in memory
  – It is then joined with one chunk of the larger table's data at a time
  – Everything is done in memory, without the reduce step required in the more common join scenarios
  – A configuration parameter sets the default size (in bytes) for a table to be considered small
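A hedged sketch of triggering a map-side join in HiveQL, via the configuration parameter the slide refers to or an explicit hint (table names are hypothetical; 25000000 bytes is the usual default):

SET hive.auto.convert.join = true;
SET hive.mapjoin.smalltable.filesize = 25000000;  -- "small table" threshold in bytes

SELECT /*+ MAPJOIN(d) */ e.name, d.dept_name
FROM employees e JOIN departments d ON (e.dept_id = d.dept_id);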
28

HIVE
• Serializer/Deserializer (SerDe)
  – Describes how to load the data from the file into a representation that makes it look like a table
  – Several built-in SerDes; many third-party SerDes
  – Implemented in Java; creating Java objects is expensive
• Lazy SerDe
  – Does not fully materialize an object until individual attributes are necessary
  – Reduces the overhead of creating unnecessary objects in Hive
  – Increases performance
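A hedged example of attaching a built-in SerDe at table-creation time (the CSV SerDe class below ships with recent Hive versions; table and column names are illustrative):

CREATE TABLE csv_table (c1 string, c2 string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE;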
29

HIVE
• Pros
  – An easy way to process large-scale data
  – Supports SQL-based queries
  – Provides user-defined interfaces for extension
  – Programmability
  – Efficient execution plans for performance
  – Interoperability with other databases
30

HIVE
• Cons
  – No easy way to append data: files in HDFS are immutable
31

SparkSQL
Spark SQL
• A module in Apache Spark that integrates relational processing with Spark's functional programming API
• It lets Spark programmers leverage the benefits of relational processing (e.g., declarative queries and optimized storage)
• It lets SQL users call complex analytics libraries in Spark (e.g., machine learning)
• Part of the core distribution since April 2014
• Runs SQL / HiveQL queries, optionally alongside or replacing existing Hive deployments
32

SparkSQL
• Motivation
  – MapReduce
    • a low-level programming model
    • programmers needed manual optimizations to achieve performance
  – Declarative languages: SQL
  – Users want to perform
    • ETL to and from various (semi- or unstructured) data sources
    • Advanced analytics (machine learning, graph processing) that are hard to express in relational systems
33

SparkSQL
• Motivation
  – SparkSQL tries to bridge the two models
    Main goal: extend relational processing to cover native RDDs in Spark and a much wider range of data sources, by:
    1. Supporting relational processing both within Spark programs (on native RDDs) and on external data sources, using a programmer-friendly API.
    2. Providing high performance using established DBMS techniques.
    3. Easily supporting new data sources, including semi-structured data and external databases amenable to query federation.
    4. Enabling extension with advanced analytics algorithms such as graph processing and machine learning.
34

SparkSQL
• Motivation
  – SparkSQL tries to bridge the two models with:
    1. A DataFrame API that can perform relational operations on both external data sources and Spark's built-in RDDs.
    2. A highly extensible optimizer, Catalyst, that uses features of Scala to add composable rules, control code generation, and define extension points.
35

Load the SQLContext
Read data and create RDDs as in "usual" Spark
https://www.codementor.io/jadianes/python-spark-sql-dataframes-du107w74i
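The slide's code is lost; a minimal PySpark 1.x sketch in the spirit of the cited tutorial (the input path is illustrative):

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="sparksql-example")
sqlContext = SQLContext(sc)

# Read data and create RDDs as in "usual" Spark
raw_data = sc.textFile("kddcup.data_10_percent.gz")  # illustrative input file
csv_data = raw_data.map(lambda line: line.split(","))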
36

Create a DataFrame and register it as a table
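The slide's first code block is lost; a sketch following the cited tutorial (the Row field positions are illustrative):

from pyspark.sql import Row

# Build Row objects from the parsed CSV lines, then a DataFrame from them
row_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), protocol_type=p[1], dst_bytes=int(p[5])))
interactions_df = sqlContext.createDataFrame(row_data)
interactions_df.registerTempTable("interactions")  # now queryable as "interactions"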
Run SQL against the table and get back an RDD:

tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions
    WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")

tcp_interactions_out = tcp_interactions.map(
    lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
for ti_out in tcp_interactions_out.collect():
    print ti_out
https://www.codementor.io/jadianes/python-spark-sql-dataframes-du107w74i
37

SparkSQL
SparkSQL
• It runs as a library on top of Spark
• It exposes interfaces accessible through JDBC/ODBC and a command-line console
• It exposes the DataFrame API which is accessible through different programming languages
38

SparkSQL
• DataFrame
  – A distributed collection of rows with the same schema
  – Similar to a table in an RDBMS
  – Can be constructed from external data sources or RDDs
  – Supports relational operators (e.g., where, groupBy) as well as Spark operations
  – Evaluated lazily:
    • each DataFrame object represents a logical plan to compute a dataset, but no execution occurs until the user calls an output operation
    • this enables optimization
39

SparkSQL
• DataFrame — Example
Define a DataFrame users from a Hive table “users”
Derive another DataFrame young based on DataFrame users
Apply Spark’s count() action on the young DataFrame
The DataFrame young represents a logical plan. When the user calls count, which is an output operation, Spark SQL builds a physical plan to compute the final result. This might include optimizations such as using an index in the data source to count the matching rows.
40

SparkSQL
• Data Model
  – SparkSQL uses a nested data model based on HIVE
  – SparkSQL supports different data types:
    • primitive SQL types: boolean, integer, double, decimal, string, date, timestamp
    • complex types: structs, arrays, maps, and unions
    • user-defined types
  – Supporting these data types allows modeling data from
    • HIVE
    • RDBMS
    • JSON
    • Native objects in Java/Python/Scala
41

SparkSQL
• User operations on DataFrames
  – Operators are applied using a domain-specific language (DSL)
  – Examples: select, where, join, groupBy
  – Operators build up an abstract syntax tree (AST), which is optimized by Catalyst
  – The slide's example computes the number of female employees in each department; see the sketch below. In the paper's Scala code, an expression such as employees("deptId") represents the deptId column, and === is the operator for an equality test.
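A rough PySpark transcription of the paper's example (the original is Scala, where column access is written employees("deptId") and equality is ===):

employees.join(dept, employees.deptId == dept.id) \
    .where(employees.gender == "female") \
    .groupBy(dept.id, dept.name) \
    .agg({"name": "count"})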
42

SparkSQL
• User operations on DataFrames
  – Alternatively, DataFrames can be registered as temporary SQL tables and queried using SQL
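From the paper, transcribed to PySpark:

young.registerTempTable("young")
sqlContext.sql("SELECT count(*), avg(age) FROM young")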
43

SparkSQL
• Advantages of DataFrames over relational query languages
  – Code can be written in different languages and benefits from optimizations across the whole logical plan
  – Programmers can use control structures (if, loops)
  – The logical plan is analyzed eagerly, although query results are computed lazily
    • SparkSQL reports an error as soon as the user types an invalid line of code
44

SparkSQL
• Querying native datasets
  – SparkSQL infers the schema of the native objects of a programming language automatically
    • In Java, it uses reflection (it inspects and dynamically invokes classes, methods, etc. at runtime)
    • In Python, it samples the dataset
  – Native objects are accessed in place, to avoid expensive data format transformations
  – Benefits of automatic schema inference:
    • Run relational operations on existing Spark programs
    • Combine RDDs with external structured data (e.g., HIVE tables)
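A minimal PySpark sketch of schema inference over native Python objects (the data is illustrative):

from pyspark.sql import Row

pairs = sc.parallelize([("Alice", 21), ("Bob", 35)])
people = pairs.map(lambda p: Row(name=p[0], age=int(p[1])))
people_df = sqlContext.createDataFrame(people)   # schema inferred from the data
people_df.where(people_df.age < 30).show()       # relational operation over native objects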
45

SparkSQL
• Catalyst
  – Query optimizer built on Scala
  – Uses Scala's pattern-matching features
  – Represents abstract syntax trees (ASTs) and applies rules to manipulate them
  – Example (from the slide's figure): a constant-folding rule rewrites the AST of x + (1 + 2), i.e., Add(Attribute(x), Add(Literal(1), Literal(2))), into x + 3, i.e., Add(Attribute(x), Literal(3))
46

• The following slides are just for your reference
47

SparkSQL
• Pattern-matching functions (e.g., transform) that transform subtrees into specific structures
  – transform is a partial function: it skips over subtrees that do not match → no need to modify existing rules when new types of operators are added to the system
• Multiple patterns can appear in the same transform call; Catalyst knows how to group them
• transform can contain arbitrary Scala code, for custom optimizations
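The rule example from the Spark SQL paper (Scala): a single transform with two patterns, one folding constant additions and one removing additions of zero.

tree.transform {
  case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
  case Add(left, Literal(0)) => left
}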
48

SparkSQL
Plan Optimization & Execution
[Figure: the Catalyst pipeline — SQL AST / DataFrame → Unresolved Logical Plan → Analysis (using the Catalog) → Logical Plan → Logical Optimization → Optimized Logical Plan → Physical Planning → Physical Plans → Cost Model → Selected Physical Plan → Code Generation → RDDs]
DataFrames and SQL share the same optimization/execution pipeline
49

SparkSQL
Plan Optimization & Execution: Analysis
[Figure: the Catalyst pipeline, as on the previous slide, with the Analysis stage highlighted]
Some attributes are unresolved (of unknown type, or not matched to an input table)
e.g., in SELECT col FROM sales,
the type of col, or even whether it is a valid column name, is not known until we look up the table sales
Analysis applies rules to resolve the attribute types, using the Catalog, an object that tracks the tables in all data sources.
50

SparkSQL
Plan Optimization & Execution: Logical Optimization
[Figure: the Catalyst pipeline, as above, with the Logical Optimization stage highlighted]
• Applies standard rule-based optimization
  → when executing a query, certain predefined rewrite rules are followed
• e.g., constant folding: 1+3+x becomes 4+x
51

SparkSQL
Plan Optimization & Execution: Physical Planning
[Figure: the Catalyst pipeline, as above, with the Physical Planning stage highlighted]
Spark SQL takes a logical plan and generates one or more physical plans, using physical operators that match the Spark execution engine. It then selects a plan using a cost model,
e.g., a broadcast (map-side) join, where a small table is materialized and sent to all mappers [same idea as in HIVE]
52

SparkSQL
Plan Optimization & Execution: Code Generation
[Figure: the Catalyst pipeline, as above, with the Code Generation stage highlighted]
Java byte-code is generated to run on each machine. This offers a speed-up compared to interpreting the code.
53