7CCSMBDT – Big Data Technologies Week 7
Grigorios Loukides, PhD (grigorios.loukides@kcl.ac.uk)
Spring 2017/2018
1
Objectives
Hive
Chapter 9.2 Bagha
Thusoo et al. Hive – a petabyte scale data warehouse using Hadoop.
ICDE ‘10. https://doi.org/10.1109/ICDE.2010.5447738
http://blog.cloudera.com/wp-content/uploads/2010/01/6-IntroToHive.pdf
SparkSQL
Chapter 9.1 Bagha
M. Armbrust et al. Spark SQL: Relational Data Processing in Spark
SIGMOD ‘15.
http://dl.acm.org/citation.cfm?id=2742797
http://spark.apache.org/docs/1.6.0/sql-programming-guide.html
2
HIVE
Motivation
Data warehousing
Not scalable enough
Prohibitively expensive at web scale
Up to $200K/TB
Little control over execution method
Query optimization is hard:
parallel environment, little or no statistics, lots of UDFs
3
HIVE
Motivation
Can MapReduce help?
MapReduce is scalable
Only parallelizable operations
No transactions
MapReduce runs on cheap commodity hardware
Clusters, cloud
MapReduce is procedural
A pipe for processing data
4
HIVE
Motivation
Can MapReduce help?
But
1. Data flow is rigid (one input, two stages)
and is complex to change (requires workarounds)
5
HIVE
Motivation
Can MapReduce help?
But
2. Custom code has to be written for even the most
common operations
e.g., join, sorting, distinct (recall MR patterns)
not all users are willing/able to do it
3. Map and reduce functions are opaque:
difficult to maintain, extend, optimize
6
HIVE
Motivation
Can MapReduce help?
Main limitation of MapReduce that HIVE tries to address
The map-reduce programming model is very low level and requires developers to write custom programs which are hard to maintain and reuse.
7
HIVE
Motivation
Can MapReduce help?
Solution
Develop higher-level data processing languages
that “compile down” to Hadoop jobs
HIVE, PIG
8
HIVE
Intuition
* Make the unstructured data look like tables, regardless of how it is really laid out
* SQL-based queries can be issued directly against these tables
* Generate a specific execution plan for each query
What is HIVE?
* An open-source data warehousing solution built on top of Hadoop, which supports queries that are
(i) expressed in HiveQL (an SQL-like declarative language)
(ii) compiled into MapReduce jobs that are executed using Hadoop
9
HIVE
History
• Started at Facebook, out of need
• More data (15TB in 2007, 700TB in 2009)
• Jobs using RDBMS took days
• Hadoop jobs were much more efficient, but
took too long to write, which lowered people's productivity
• Aimed to bring familiar concepts for structured data (tables, columns, partitions, SQL)
to the unstructured data of Hadoop
10
HIVE
Applications
• Log processing
• Text mining
• Document indexing
• Customer-facing business intelligence (e.g., Google Analytics)
• Predictive modeling
• Hypothesis testing
Slide adapted from:
http://blog.cloudera.com/wp-content/uploads/2010/01/6-IntroToHive.pdf
11
HIVE
Components of HIVE
* Client components: Command Line Interface (CLI), the web UI, JDBC/ODBC driver.
* Driver:
(i) Manages the lifecycle of a HiveQL statement as it moves through HIVE
(ii) Maintains a session handle
(iii) Maintains session statistics
* Compiler:
compiles HiveQL into MapReduce tasks
12
HIVE
Components of HIVE
* Optimizer:
Optimizes the tasks (improves the execution plan of a HiveQL statement)
* Executor:
(i) executes the tasks in the right order
(ii) Interacts with Hadoop
* Metastore:
(i) Acts as the system catalog. That is,
it stores information about tables, partitions, locations in HDFS etc.
(ii) It runs on an RDBMS. Not on HDFS, because it needs to have very low latency.
* Thrift Server:
Provides an interface between clients and the Metastore (so that clients can query or modify the information in the Metastore)
14
HIVE
Data model: comprises the following data units
* Table: each table consists of a number of rows, and each row consists of a specified number of columns
Basic column types: int, float, boolean
Complex column types: list, map
Arbitrary data types:
(i) created by composing basic & complex types,
e.g., list<map<string, struct<p1:int, p2:int>>>
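As an illustration, composing basic and complex types might look like this in HiveQL DDL (a sketch; the table and column names are hypothetical, and Hive's keyword for lists is ARRAY):

```sql
-- Hypothetical table mixing basic and complex column types
CREATE TABLE t1 (
  st STRING,                                     -- basic type
  fl FLOAT,                                      -- basic type
  li ARRAY<MAP<STRING, STRUCT<p1:INT, p2:INT>>>  -- composed complex type
);
```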
HIVE
Data model: comprises the following data units
* Partition: the result of decomposing a table into partitions, based on the values of partitioning columns
– The partitioning columns are not part of the table data but behave like regular columns
– New partitions can be added to a partitioned table
Creating partitions speeds up query processing, because only the relevant data in Hadoop are scanned
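Creating a partitioned table and adding a partition to it might look as follows in HiveQL (a sketch; the column names are hypothetical, the table name follows the test_part example used later in these slides):

```sql
-- Create a table partitioned by date and hour; ds and hr are not
-- stored in the table data, but behave like columns in queries
CREATE TABLE test_part (c1 STRING, c2 INT)
PARTITIONED BY (ds STRING, hr INT);

-- Add a new partition to the partitioned table
ALTER TABLE test_part ADD PARTITION (ds = '2009-02-02', hr = 11);
```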
16
HIVE
Data model: comprises the following data units
* Bucket: the result of decomposing a table (or each of its partitions) into buckets, based on the hash of a column
– bucketing is useful when partitions would be too many and too small (Hive will not create them)
Bucketing is useful for sampling:
e.g., a 1/96 sample can be generated efficiently by looking at the first of 96 buckets
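A hypothetical HiveQL sketch of bucketing, and of the 1/96 sampling mentioned above (table and column names are made up):

```sql
-- Decompose each partition into 96 buckets by hashing on user_id
CREATE TABLE test_bucket (user_id INT, action STRING)
PARTITIONED BY (ds STRING)
CLUSTERED BY (user_id) INTO 96 BUCKETS;

-- A 1/96 sample: only the first of the 96 buckets is read
SELECT * FROM test_bucket TABLESAMPLE (BUCKET 1 OUT OF 96 ON user_id);
```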
17
HIVE
The mapping of data units into the HDFS name space
• A table is stored in a directory in HDFS
• A partition of the table is stored in a subdirectory within a table’s HDFS directory
• A bucket is stored in a file within the partition’s or table’s directory depending on whether the table is a partitioned table or not.
Table → HDFS directory
Partition → subdirectory of the table's HDFS directory
Bucket → file within the table's (or partition's) directory
Warehouse root directory
Example:
table test_part mapped to /user/hive/warehouse/test_part
partition (ds='2009-02-02', hr=11) of test_part mapped to /user/hive/warehouse/test_part/ds=2009-02-02/hr=11
bucket1 mapped to file bucket1 in /user/hive/…./hr=11
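The mapping above is a simple naming convention; it can be sketched in a few lines of plain Python (an illustration of the path scheme, not Hive code):

```python
# Sketch of Hive's table/partition/bucket -> HDFS path convention
WAREHOUSE_ROOT = "/user/hive/warehouse"

def table_dir(table):
    # A table is stored in a directory under the warehouse root
    return "{}/{}".format(WAREHOUSE_ROOT, table)

def partition_dir(table, **part_values):
    # A partition is a subdirectory chain named by its column=value pairs
    parts = "/".join("{}={}".format(k, v) for k, v in part_values.items())
    return "{}/{}".format(table_dir(table), parts)

def bucket_file(table, bucket_name, **part_values):
    # A bucket is a file inside the partition's (or table's) directory
    base = partition_dir(table, **part_values) if part_values else table_dir(table)
    return "{}/{}".format(base, bucket_name)

print(partition_dir("test_part", ds="2009-02-02", hr=11))
# /user/hive/warehouse/test_part/ds=2009-02-02/hr=11
```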
18
HIVE
How the mapping helps efficiency
Creating partitions speeds up query processing, because only relevant data in Hadoop are scanned
Hive prunes the data by scanning only the required sub-directories
Bucketing is useful for sampling
Hive uses the file corresponding to a bucket
19
HIVE
Query language
Comprises a subset of SQL (recall Hive's aim of improving people's productivity)
– From
– Joins (inner, left outer, right outer, full outer)
– Group-by
– Aggregation
– Union all
Metadata browsing capabilities
– show tables
– explain plan
Reference:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML
20
HIVE
Query language
Some differences/limitations
1. Hive does not support inserting into an existing table or data partition: all inserts overwrite the existing data
2. Only equality predicates are supported in JOIN
21
HIVE
HiveQL
DESCRIBE FORMATTED <table> shows more readable output about the table
DESCRIBE EXTENDED <table> shows full details about the table
http://2xbbhjxc6wk3v21p62t8n4d4.wpengine.netdna-cdn.com/wp- content/uploads/2013/05/hql_cheat_sheet.pdf
22
HIVE
GROUP BY operation
• An external table stores its files on the HDFS server, but the table is not completely linked to the source file: Hive manages only the table's metadata, not the data itself.
• If you delete an external table, the file still remains on the HDFS server.
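A hypothetical HiveQL sketch (the table name and location are made up):

```sql
-- External table: Hive manages only the metadata;
-- the data stays at the given HDFS location
CREATE EXTERNAL TABLE page_views (user_id INT, url STRING)
LOCATION '/data/logs/page_views';

-- Dropping it removes only the metadata; the files under
-- /data/logs/page_views remain on HDFS
DROP TABLE page_views;
```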
26
HIVE
JOIN operation
The mapper sends all rows with the same key to a single reducer
The reducer does the join
What if many rows have the same key? (e.g., power-law distribution)
Efficiency drops
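The shuffle-based join can be sketched in plain Python, simulating how rows tagged with the same key meet at one reducer (an illustration of the idea, not Hive code; all names are made up):

```python
from collections import defaultdict
from itertools import product

def map_phase(table_name, rows, key_col):
    # Mapper: emit (join key, (source table, row)) for every row
    for row in rows:
        yield row[key_col], (table_name, row)

def reduce_phase(pairs):
    # Shuffle: group all tagged rows by join key
    groups = defaultdict(lambda: {"L": [], "R": []})
    for key, (table, row) in pairs:
        groups[key][table].append(row)
    # Reducer: for each key, output the cross product of the two sides.
    # If very many rows share one key (power-law skew), a single reducer
    # does almost all the work -- the efficiency problem noted above.
    out = []
    for key, sides in groups.items():
        out.extend((l, r) for l, r in product(sides["L"], sides["R"]))
    return out

left = [{"id": 1, "name": "ann"}, {"id": 2, "name": "bob"}]
right = [{"id": 1, "city": "rome"}, {"id": 1, "city": "oslo"}]
pairs = list(map_phase("L", left, "id")) + list(map_phase("R", right, "id"))
print(reduce_phase(pairs))  # id=1 joins with both right rows; id=2 with none
```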
27
HIVE
Efficient Map-side JOIN operation
Applied when joining a small table with a large table
The smaller table's data are kept in memory first
Then each mapper joins them with a chunk of the larger table's data
Everything is done in memory, without the reduce step required in the more common join scenarios
A Hive configuration parameter (hive.mapjoin.smalltable.filesize) gives the default size (in bytes) for a table to be considered small
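A minimal plain-Python sketch of the map-side (in-memory hash) join idea (an illustration, not Hive code; all names are made up):

```python
def map_side_join(small_table, large_table_chunks, key):
    # Build an in-memory hash table from the small table (done once per mapper)
    lookup = {}
    for row in small_table:
        lookup.setdefault(row[key], []).append(row)
    # Each mapper streams a chunk of the large table and probes the hash
    # table; no shuffle and no reduce step is needed
    for chunk in large_table_chunks:
        for big_row in chunk:
            for small_row in lookup.get(big_row[key], []):
                yield (big_row, small_row)

dept = [{"deptId": 1, "dept": "sales"}]                                  # small table
emps = [[{"deptId": 1, "name": "ann"}], [{"deptId": 2, "name": "bob"}]]  # chunks
print(list(map_side_join(dept, emps, "deptId")))
# [({'deptId': 1, 'name': 'ann'}, {'deptId': 1, 'dept': 'sales'})]
```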
28
HIVE
Serializer/Deserializer (SerDe)
Describes how to load the data from a file into a representation that makes it look like a table
Several SerDes are built in; many third-party SerDes exist
Implemented in Java, where object creation is expensive
Lazy SerDe:
Does not fully materialize an object until individual attributes are necessary.
Reduces the overhead of creating unnecessary objects in Hive.
Increases performance.
29
HIVE
Pros
An easy way to process large-scale data
Supports SQL-based queries
Provides user-defined interfaces for extensibility
Programmability
Efficient execution plans for performance
Interoperability with other databases
30
HIVE
Cons
No easy way to append data: files in HDFS are immutable
31
SparkSQL
Spark SQL
• A new module in Apache Spark that integrates relational processing with Spark's functional programming API.
• It lets Spark programmers leverage the benefits of relational processing (e.g., declarative queries and optimized storage)
• It lets SQL users call complex analytics libraries in Spark (e.g., machine learning).
• Part of the core distribution since April 2014
• Runs SQL / HiveQL queries, optionally alongside or replacing existing Hive deployments
32
SparkSQL
Motivation
MapReduce
a low-level programming model
programmers needed to apply manual optimizations to achieve performance
Declarative languages: SQL
Users want to perform:
ETL to and from various (semi- or unstructured) data sources
Advanced analytics (machine learning, graph processing) that are hard to express in relational systems
33
SparkSQL
Motivation
SparkSQL tries to bridge the two models
Main goal: Extend relational processing to cover native RDDs in Spark and a much wider range of data sources, by:
1. Supporting relational processing both within Spark programs (on native RDDs) and on external data sources using a programmer-friendly API.
2. Providing high performance using established DBMS techniques.
3. Easily supporting new data sources, including semi-structured data and external
databases amenable to query federation.
4. Enabling extension with advanced analytics algorithms such as graph processing and machine learning.
34
SparkSQL
Motivation
SparkSQL tries to bridge the two models with:
1. A DataFrame API that can perform relational operations on both external data sources and Spark’s built-in RDDs.
2. A highly extensible optimizer, Catalyst, that uses features of Scala to add composable rule, control code gen., and define extensions.
35
Load the SQLContext
Read the data and create RDDs as in "usual" Spark
https://www.codementor.io/jadianes/python-spark-sql-dataframes-du107w74i
36
Create a DataFrame and register it as a table

tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions
    WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")

Run SQL against the table and get back an RDD

tcp_interactions_out = tcp_interactions.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
for ti_out in tcp_interactions_out.collect():
    print(ti_out)
https://www.codementor.io/jadianes/python-spark-sql-dataframes-du107w74i
37
SparkSQL
SparkSQL
• It runs as a library on top of Spark
• It exposes interfaces accessible through JDBC and command-line
• It exposes the DataFrame API which is accessible through different programming languages
38
SparkSQL
DataFrame
A distributed collection of rows with the same schema
Similar to a table in RDBMS
Can be constructed from external data sources or RDDs
Supports relational operators (e.g., where, groupBy) as well as Spark operations.
Evaluated lazily:
each DataFrame object represents a logical plan to compute a dataset, but no execution occurs until the user calls an output operation
This enables optimization
39
SparkSQL
DataFrame Example
Define a DataFrame users from a Hive table “users”
Derive another DataFrame young based on DataFrame users
Apply Spark’s count() action on the young DataFrame
The DataFrame young represents a logical plan.
When the user calls count, which is an output operation, Spark SQL builds a physical plan to compute the final result.
This might include optimizations, such as using an index in the data source to count the matching rows.
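In the Spark SQL paper this example is written in Scala, roughly: val users = ctx.table("users"); val young = users.where(users("age") < 21); println(young.count()). The lazy-evaluation mechanics behind it can be sketched in plain Python (a toy illustration, not the actual Spark API):

```python
class DataFrame:
    """Toy DataFrame: holds a logical plan (a thunk), not data."""
    def __init__(self, plan):
        self.plan = plan  # zero-argument function producing rows when forced

    def where(self, pred):
        # Transformation: returns a new DataFrame; nothing is computed yet
        return DataFrame(lambda: [r for r in self.plan() if pred(r)])

    def count(self):
        # Output operation: forces execution of the whole plan
        return len(self.plan())

users = DataFrame(lambda: [{"name": "ann", "age": 19},
                           {"name": "bob", "age": 35}])
young = users.where(lambda r: r["age"] < 21)  # only a logical plan so far
print(young.count())                          # execution happens here -> 1
```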
40
SparkSQL
Data Model
SparkSQL uses a nested data model based on HIVE
SparkSQL supports different data types:
primitive SQL types: boolean, integer, double, decimal, string, date, timestamp
complex types: structs, arrays, maps, and unions
user-defined types
Supporting these data types allows modeling data from
HIVE
RDBMS
JSON
Native objects in Java/Python/Scala
41
SparkSQL
User operations on DataFrames
Operators applied using a Domain-Specific Language (DSL)
Examples: select, where, join, groupBy
Operators build up an Abstract Syntax Tree (AST), which is optimized by Catalyst
Example (from the Spark SQL paper): compute the number of female employees in each department
employees.join(dept, employees("deptId") === dept("id"))
.where(employees("gender") === "female")
.groupBy(dept("id"), dept("name"))
.agg(count("name"))
employees("deptId") is an expression representing the deptId column; === is the operator for an equality test
42
SparkSQL
User operations on DataFrames
Alternatively, DataFrames can be registered as temporary SQL tables and queried using SQL
43
SparkSQL
Advantages of DataFrames over Relational Query Languages
Code can be written in different languages and benefit from optimizations
across the whole logical plan
Programmers can use control structures (if, loops)
The logical plan is analyzed eagerly, although query results are computed lazily
so SparkSQL reports an error as soon as the user types an invalid line of code
44
SparkSQL
Querying Native datasets
SparkSQL infers the schema of the native objects of a programming language automatically
In Java, it uses reflection (inspects and dynamically invokes classes, methods, etc. at runtime)
In Python, it samples the dataset
Native objects are accessed in place, to avoid expensive data format transformation.
Benefits of automatic schema inference
Run relational operations on existing Spark programs.
Combine RDDs with external structured data (e.g., HIVE tables)
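The Python-side idea of sampling a dataset to infer a schema can be sketched as follows (an illustration, not Spark's actual implementation; the type mapping shown is made up):

```python
def infer_schema(records, sample_size=10):
    # Sample a few records and map each field's Python type to an SQL-ish type
    type_names = {int: "integer", float: "double", str: "string", bool: "boolean"}
    schema = {}
    for rec in records[:sample_size]:
        for field, value in rec.items():
            schema[field] = type_names.get(type(value), "string")
    return schema

rows = [{"name": "ann", "age": 19}, {"name": "bob", "age": 35}]
print(infer_schema(rows))  # {'name': 'string', 'age': 'integer'}
```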
45
SparkSQL
Catalyst
Query optimizer built on Scala
Uses Scala's pattern matching features
Represents Abstract Syntax Trees (ASTs) and applies rules to manipulate them
Example: applying a rule to the AST of the expression x + (1 + 2) rewrites it to x + 3:
the Add subtree over Literal(1) and Literal(2) is folded into Literal(3), while Attribute(x) is left unchanged
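A minimal plain-Python sketch of such a tree rewrite (an illustration in the spirit of Catalyst, not its actual Scala API):

```python
# Minimal Catalyst-style tree rewrite: constant folding
class Node: pass

class Literal(Node):
    def __init__(self, value): self.value = value

class Attribute(Node):
    def __init__(self, name): self.name = name

class Add(Node):
    def __init__(self, left, right): self.left, self.right = left, right

def fold(node):
    # Bottom-up rule: Add(Literal(a), Literal(b)) -> Literal(a + b)
    if isinstance(node, Add):
        left, right = fold(node.left), fold(node.right)
        if isinstance(left, Literal) and isinstance(right, Literal):
            return Literal(left.value + right.value)
        return Add(left, right)
    return node  # Literals and Attributes are left unchanged

# x + (1 + 2)  becomes  x + 3
tree = Add(Attribute("x"), Add(Literal(1), Literal(2)))
folded = fold(tree)
print(type(folded.right).__name__, folded.right.value)  # Literal 3
```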
46
The following slides are just for your reference
47
SparkSQL
• Pattern matching functions (e.g., transform) that transform subtrees into specific structures.
Partial function: subtrees that do not match are skipped over, so there is no need to modify existing rules when new types of operators are added to the system.
• Multiple patterns can appear in the same transform call; Catalyst knows how to group them
• transform can contain arbitrary Scala code, for custom optimizations
48
SparkSQL
Plan Optimization & Execution
SQL AST / DataFrame
→ Unresolved Logical Plan
→ [Analysis, using the Catalog]
→ Logical Plan
→ [Logical Optimization]
→ Optimized Logical Plan
→ [Physical Planning]
→ Physical Plans
→ [selection using a Cost Model]
→ Selected Physical Plan
→ [Code Generation]
→ RDDs
DataFrames and SQL share the same optimization/execution pipeline
49
SparkSQL
Plan Optimization & Execution
Some attributes are unresolved (of unknown type, or not matched to an input table)
e.g., in SELECT col FROM sales,
the type of col, or even whether it is a valid column name, is not known until we look up the table sales
Analysis applies rules to resolve the attribute types, using the Catalog, an object that tracks tables in all data sources.
50
SparkSQL
Plan Optimization & Execution
• Applies standard rule-based optimization:
when executing a query, certain predefined rules are followed
• e.g., constant folding:
1+3+x becomes 4+x
51
SparkSQL
Plan Optimization & Execution
Spark SQL takes a logical plan and generates one or more physical plans, using physical operators that match the Spark execution engine. It then selects a plan using a cost model.
e.g., a broadcast join with Spark, where a small table is materialized and sent to all mappers [same idea as the map-side join in HIVE]
52
SparkSQL
Plan Optimization & Execution
Java byte-code is generated to run on each machine. This offers a speed-up compared to interpreting the code.
53