COMP9313:
Big Data Management
Spark SQL
Why Spark SQL?
•Tables are one of the most commonly used ways to present data
• Easy to scan, analyze, filter, sort, etc.
• Widely used in communication, research, and data analysis
•Tables have a (relatively) stable data structure
• 2 dimensions: rows and columns
• Pre-defined attribute types
•In general, customized/specialized support for structured data is better!
What is Spark SQL?
•Part of the core distribution since Spark 1.0 (April 2014)
•Tightly integrated way to work with structured data
• tables with rows/columns
•Transform RDDs using SQL
•Data source integration: Hive, Parquet, JSON, CSV, etc.
• Spark SQL is not about SQL
• It aims to create and run Spark programs faster
Compute Average
[Code comparison omitted: "Use RDD" vs. "Use Spark SQL" for computing an average over the same source data]
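A minimal sketch of the two approaches (assuming sc and spark are an existing SparkContext and SparkSession, and using a small made-up (name, age) dataset; the original slide's exact code is not reproduced here):

data = [("Amy", 21), ("Bob", 30), ("Amy", 25)]

# Use RDD: build (sum, count) pairs per name, then divide
rdd = sc.parallelize(data)
avg_rdd = (rdd.map(lambda x: (x[0], (x[1], 1)))
              .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
              .map(lambda kv: (kv[0], kv[1][0] / kv[1][1])))

# Use Spark SQL / DataFrames: declare what to compute and let Catalyst plan how
df = spark.createDataFrame(data, ["name", "age"])
avg_df = df.groupBy("name").avg("age")

The DataFrame version states the intent ("average age per name") directly, which is what allows Spark SQL to optimize the execution.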
DataFrame
•A DataFrame is a distributed collection of data organized into named columns.
•The DataFrames API is:
• intended to enable wider audiences beyond “Big Data” engineers to leverage the power of distributed processing
• inspired by data frames in R and Python (Pandas)
• designed from the ground up to support modern big data and data science applications
• an extension to the existing RDD API
DataFrame
•DataFrames have the following features:
• Ability to scale from kilobytes of data on a single laptop to petabytes on a large cluster
• Support for a wide array of data formats and storage systems
• State-of-the-art optimization and code generation through the Spark SQL Catalyst optimizer
• Seamless integration with all big data tooling and infrastructure via Spark
• APIs for Python, Java, Scala, and R
DataFrame
•For new users familiar with data frames in other programming languages, this API should make them feel at home.
•For existing Spark users, the API will make Spark easier to program.
•For both sets of users, DataFrames will improve performance through intelligent optimizations and code-generation.
DataFrame Data Sources
•Spark SQL’s Data Source API can read and write DataFrames using a variety of formats.
• E.g., structured data files, tables in Hive, external databases, or existing RDDs
Creating DataFrames
•SparkSession is the entry point to programming Spark with the Dataset and DataFrame API.
• Creates a DataFrame based on the content of a JSON file:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.format("json").load("example.json")

# Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
DataFrame Operations
df.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)
# Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+
# Select everybody, but increment the age by 1
df.select(df["name"], df["age"] + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+
DataFrame Operations
# Select people older than 21
df.filter(df["age"] > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
# Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
DataFrames and Spark SQL
•DataFrames are fundamentally tied to Spark SQL.
• The DataFrames API provides a programmatic interface—really, a domain-specific language (DSL)—for interacting with your data.
• Spark SQL provides a SQL-like interface.
• What you can do in Spark SQL, you can do in DataFrames
• … and vice versa
Spark SQL
•Spark SQL allows you to manipulate distributed data with SQL queries.
•You issue SQL queries through a SparkSession, using the sql() method.
•sql() enables applications to run SQL queries programmatically and returns the result as a DataFrame.
•You can mix DataFrame methods and SQL queries in the same code.
Spark SQL
•To use SQL, you need to make a table alias for a DataFrame, using registerTempTable()
• In newer Spark versions this method is called createOrReplaceTempView()
# Register the DataFrame as a SQL temporary view
df.registerTempTable("people")
spark.sql("SELECT * FROM people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
More on DataFrames
•DataFrames are lazy.
•Transformations contribute to the query plan, but they don’t execute anything.
•Actions cause the execution of the query
•Transformation examples: filter, select, drop, intersect, join
•Action examples: count, collect, show, head, take
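For example, a small sketch of lazy evaluation, assuming df is the people DataFrame from the earlier slides:

adults = df.filter(df["age"] > 21)   # transformation: only extends the query plan
names = adults.select("name")        # transformation: still nothing executed
names.show()                         # action: triggers optimization and execution of the whole plan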
More on DataFrames – Columnar Storage
•DataFrames use columnar storage
• Transpose of row-based storage
• Data in each column (with the same type) are packed together
More on DataFrames – Columnar Storage
•Data access is more regular
•Denser storage
•Compatibility and zero serialization
•More extensions to GPU/TPU
•Efficient query processing
•Not good for transactions
• But we don’t expect many transactions in Spark!
DataFrame and RDD
•DataFrames are built on top of the Spark RDD API
• You can use normal RDD operations on DataFrames
•Stick with the DataFrame API if possible
• Using RDD operations will often give you back an RDD, not a DataFrame
• The DataFrame API is likely to be more efficient, because it can optimize the underlying operations with Catalyst
DataFrame and RDD
• A DataFrame is more like a two-dimensional table in a traditional database: besides the data itself, it also captures the structural information of the data, i.e., the schema
• An RDD[Person] has Person as its type parameter, but the Spark framework itself does not understand the internal structure of the Person class
• A DataFrame provides detailed structural information, so Spark SQL knows exactly which columns the dataset contains, and the name and type of each column. The Spark SQL query optimizer can therefore apply targeted optimizations
DataFrames can be significantly faster than RDDs, and they perform the same regardless of language.
Spark SQL – Plan Optimization & Execution
•Logical plan and physical plans
•Execution is lazy, allowing it to be optimized
DataFrames and SQL share the same optimization/execution pipeline
Logical Plan
• A Logical Plan is an abstraction of all the transformation steps that need to be performed
• It does not refer to anything about the Driver (Master Node) or Executors (Worker Nodes)
• SparkContext is responsible for generating and storing it
• The Logical Plan is divided into three parts:
• Unresolved Logical Plan
• Resolved Logical Plan
• Optimized Logical Plan
SQL to Resolved Logical Plan
•If the SQL query cannot be resolved (e.g., it refers to unknown tables or columns), it is rejected
• Otherwise, a resolved logical plan is created
Resolved Logical Plan to Optimized Logical Plan
• The Catalyst optimizer will try to optimize the plan
Physical Plan
• A physical plan describes computation on datasets with specific definitions on how to conduct the computation
• A physical plan is executable
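One way to inspect these plans from code is explain(); a minimal sketch, assuming df is the people DataFrame from the earlier slides:

df.filter(df["age"] > 21).select("name").explain(True)
# Prints the parsed (unresolved) logical plan, the analyzed (resolved) logical plan,
# the optimized logical plan, and the physical plan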
Schemas in DataFrame
• A schema is the description of the structure of your data
• column names and types
•DataFrames Have Schemas
• A Parquet table has a schema that Spark can use
• Spark can infer a schema from a JSON file
[ {"name": "Amy", "course": "Maths", "score": 75 },
  {"name": "Ravi", "course": "Biology", "score": 93 },
  …]
• What if the data file has no schema?
• Create an RDD of a particular type and let Spark infer the schema from that type
• Use the API to specify the schema programmatically
Specify the Schema
• E.g., Create DataFrame from RDD or list
• The schema must match the real data, or an exception will be thrown at runtime.
• If the schema is:
• a list of column names, the types will be inferred from the data
• None, Spark will try to infer both column names and types
• You can have Spark tell you what it thinks the schema is by calling printSchema()
from pyspark.sql.types import *

schema = StructType([StructField("name", StringType(), False),
                     StructField("course", StringType(), False),
                     StructField("score", IntegerType(), False)])
df = spark.createDataFrame(data, schema)
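For illustration, a hypothetical data list that matches this schema, and the resulting structure reported by printSchema():

data = [("Amy", "Maths", 75), ("Ravi", "Biology", 93)]
df = spark.createDataFrame(data, schema)
df.printSchema()
// root
//  |-- name: string (nullable = false)
//  |-- course: string (nullable = false)
//  |-- score: integer (nullable = false)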
show()
•You can look at the first n elements in a DataFrame with the show() method. If not specified, n defaults to 20. This method is an action; it
• reads (or re-reads) the input source
• executes the RDD DAG across the cluster
• pulls the n elements back to the driver
• displays those elements in a tabular form
cache()
•Spark can cache a DataFrame, using an in-memory columnar format, by calling df.cache()
•Spark will scan only those columns used by the DataFrame and will automatically tune compression to minimize memory usage and GC pressure.
•You can call the unpersist() method to remove the cached data from memory.
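A minimal sketch of the caching lifecycle (df as in the earlier examples):

df.cache()      # mark the DataFrame for in-memory columnar caching
df.count()      # the first action materializes the cache
df.unpersist()  # remove the cached data from memory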
select()
•select() is like a SQL SELECT, allowing you to limit the results to specific columns
•You can also create on-the-fly derived columns
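A small sketch, assuming df is the people DataFrame with name and age columns:

df.select("name").show()                       # limit the results to one column
df.select(df["name"], df["age"] * 2).show()    # derived column created on the fly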
alias()
•alias() allows you to rename a column.
• Especially useful for generated columns
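For example (same people DataFrame assumed), a generated column can be given a readable name; the column name is illustrative only:

df.select(df["name"], (df["age"] + 1).alias("age_next_year")).show()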
filter()
•The filter() method allows you to filter rows out of your results
• WHERE in SQL
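A minimal sketch (same people DataFrame assumed), corresponding to SELECT * FROM people WHERE age > 21:

df.filter(df["age"] > 21).show()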
orderBy()
•The orderBy() method allows you to sort the results
• ORDER BY age DESC in SQL
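A minimal sketch matching the ORDER BY age DESC example, assuming a DataFrame with an age column such as the people DataFrame from earlier:

df.orderBy(df["age"].desc()).show()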
groupBy()
•Often followed by aggregation methods (e.g., sum(), count(), …), groupBy() groups data items by a specific column value.
• GROUP BY in SQL
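A small sketch, assuming the name/course/score DataFrame from the schema slides; F.count and F.avg are the built-in aggregate functions:

from pyspark.sql import functions as F

df.groupBy("course").agg(F.count("name"), F.avg("score")).show()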
join()
•We can load a second dataset into another DataFrame and join it with our first one
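A minimal sketch with a hypothetical second DataFrame keyed by name (the actual second dataset from the slides is not reproduced here):

dept = spark.createDataFrame([("Michael", "Sales"), ("Andy", "Engineering")],
                             ["name", "dept"])
df.join(dept, "name").show()                      # join on the common column, single name column kept
df.join(dept, df["name"] == dept["name"]).show()  # equivalently, with an explicit join condition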
User Defined Functions
•If we want to force all names to lower case before joining, we can define a user-defined function (UDF) and apply it to the name columns
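A sketch of one way to do this with a Python UDF (the DataFrame and column names follow the hypothetical join example above):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# lower-case a string column, passing nulls through unchanged
to_lower = udf(lambda s: s.lower() if s is not None else None, StringType())

df_lc = df.withColumn("name_lc", to_lower(df["name"]))
dept_lc = dept.withColumn("name_lc", to_lower(dept["name"]))
df_lc.join(dept_lc, "name_lc").show()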
Writing/Saving DataFrames
•In most cases, if you can read a data format, you can write that data format
•If you’re writing to a text file format (e.g., JSON), you’ll typically get multiple output files
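A minimal sketch of writing a DataFrame back out (the output paths are illustrative only):

df.write.format("json").save("output/people_json")           # one part-* file per partition
df.write.mode("overwrite").parquet("output/people_parquet")  # same idea for other formats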