COMP9313: Big Data Management
Course web site: http://www.cse.unsw.edu.au/~cs9313/
Chapter 5.2: Spark SQL and Spark Streaming
Part 1: Structured APIs
A Brief Review of RDD
❖ The RDD is the most basic abstraction in Spark. There are three vital characteristics associated with an RDD:
➢ Dependencies (lineage)
When necessary to reproduce results, Spark can recreate an RDD from the dependencies and replicate operations on it. This characteristic gives RDDs resiliency.
➢ Partitions (with some locality information)
Partitions provide Spark the ability to split the work to
parallelize computation on partitions across executors
Reading from HDFS—Spark will use locality information to send work to executors close to the data
➢ Compute function: Partition => Iterator[T]
An RDD has a compute function that produces an Iterator[T]
for the data that will be stored in the RDD.
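❖ As a small illustration (a minimal sketch, assuming a spark-shell session where sc is the SparkContext), the partitioning and lineage of an RDD can be inspected directly:
// Build a small RDD with an explicit number of partitions
val nums = sc.parallelize(1 to 100, 4)
// Transformations only record dependencies (lineage); nothing is executed yet
val result = nums.map(_ * 2).filter(_ > 50)
println(result.getNumPartitions)  // the partitions Spark can parallelize over (here 4)
println(result.toDebugString)     // the lineage Spark keeps so lost partitions can be recomputed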
❖ Assume that we want to aggregate all the ages for each name, group by name, and then compute the average age for each name:
val dataRDD = sc.parallelize(List(("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)))
dataRDD.map(x => (x._1, (x._2, 1)))
       .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
       .map(x => (x._1, x._2._1.toDouble / x._2._2))
Problems of RDD Computation Model
❖ The compute function (or computation) is opaque to Spark
➢ Whether you are performing a join, filter, select, or aggregation,
Spark only sees it as a lambda expression
dataRDD.map(x=> (x._1, (x._2, 1)))
❖ Spark has no way to optimize the expression, because it’s unable to
inspect the computation or expression in the function.
❖ Spark has no knowledge of the specific data type in RDD
➢ To Spark it’s an opaque object; it has no idea if you are accessing a column of a certain type within an object
Spark’s Structured APIs
❖ Spark 2.x introduced a few key schemes for structuring Spark, e.g., expressing computations using common patterns found in data analysis
❖ This specificity is further narrowed through the use of a set of common operators in a DSL (domain specific language), including the Dataset APIs and DataFrame APIs
➢ These operators let you tell Spark what you wish to compute with your data
➢ Spark can then construct an efficient query plan for execution.
❖ Structure yields a number of benefits, including better performance
and space efficiency across Spark components
Spark’s Structured APIs
❖ E.g., for the average age problem, using the DataFrame API:
❖ Spark now knows exactly what we wish to do: group people by their names, aggregate their ages, and then compute the average age of all people with the same name.
❖ Spark can inspect or parse this query and understand our intention, and thus it can optimize or arrange the operations for efficient execution.
import spark.implicits._
import org.apache.spark.sql.functions.avg
val data_df = List(("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)).toDF("name", "age")
data_df.groupBy("name").agg(avg("age")).show()
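❖ Because the query is expressed through DataFrame operators rather than opaque lambdas, Spark can show the plan it builds. A minimal sketch, reusing data_df and the avg import from above; explain(true) prints the logical and physical plans produced by the optimizer:
// Spark can inspect this query and construct an optimized execution plan
data_df.groupBy("name").agg(avg("age")).explain(true)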
Datasets and DataFrames
❖ A Dataset is a distributed collection of data
➢ provides the benefits of RDDs (e.g., strong typing) with the benefits of Spark SQL's optimized execution engine
➢ A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, etc.)
❖ A DataFrame is a Dataset organized into named columns
➢ It is conceptually equivalent to a table in a relational database or a
data frame in R/Python, but with richer optimizations
➢ An abstraction for selecting, filtering, aggregating and plotting structured data
➢ A DataFrame can be represented by a Dataset of Rows
Scala: DataFrame is simply a type alias of Dataset[Row]
Java: use Dataset<Row> to represent a DataFrame
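❖ A brief sketch of the typed side (assuming a spark-shell session; the Person case class is introduced here only for illustration):
import spark.implicits._
case class Person(name: String, age: Int)
// A typed Dataset[Person]: fields are accessed as ordinary Scala fields and checked at compile time
val peopleDS = Seq(Person("Brooke", 20), Person("Denny", 31)).toDS()
val adults = peopleDS.filter(p => p.age > 21)
// The untyped view of the same data: a DataFrame, i.e. Dataset[Row]
val peopleDF = peopleDS.toDF()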
DataFrames
❖ DataFrames are like distributed in-memory tables with named columns and schemas, where each column has a specific data type.
❖ When data is visualized as a structured table, it’s not only easy to digest but also easy to work with
The table-like format of a DataFrame
Difference between DataFrame and RDD
❖ A DataFrame is more like a traditional two-dimensional database table: besides the data itself, it also keeps the structural information of the data, i.e., the schema
➢ Although RDD[Person] takes Person as its type parameter, the Spark framework itself does not understand the internal structure of the Person class
➢ A DataFrame provides detailed structural information, so Spark SQL knows exactly which columns the dataset contains, and the name and type of each column. Thus, the Spark SQL query optimizer can apply targeted optimizations
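❖ To make the contrast concrete, a small sketch (in a spark-shell session, where sc is available) using the name/age pairs from the earlier example: the DataFrame can report its schema, whereas the equivalent RDD is just a collection of opaque objects to Spark:
import spark.implicits._
val pairs = List(("Brooke", 20), ("Denny", 31))
// RDD: Spark only sees objects of some type, with no column structure to optimize against
val pairsRDD = sc.parallelize(pairs)
// DataFrame: Spark knows the column names and types
val pairsDF = pairs.toDF("name", "age")
pairsDF.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = false)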
Data Sources
❖ Spark SQL's Data Source API can read and write DataFrames using a variety of formats.
➢ E.g., structured data files, tables in Hive, external databases, or existing RDDs
➢ In the Scala API, DataFrame is simply a type alias of Dataset[Row]
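❖ A hedged sketch of reading and writing a few common formats through the Data Source API (the file paths below are placeholders, not course files):
// Each built-in format has a reader on spark.read ...
val csvDF = spark.read.option("header", "true").csv("/path/to/people.csv")
val jsonDF = spark.read.json("/path/to/people.json")
val parquetDF = spark.read.parquet("/path/to/people.parquet")
// ... and a corresponding writer on DataFrame.write
parquetDF.write.mode("overwrite").format("json").save("/path/to/output")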
❖ We first need to import “spark.implicits._”. The implicits object gives implicit conversions for converting Scala objects (incl. RDDs) into a Dataset or DataFrame
❖ You can also convert an RDD into a DataFrame
DataFrames
import spark.implicits._
// Given a list of pairs of names and ages
val data = List(("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25))
// Create a 'DataFrame' from the list
val dataDF = spark.createDataFrame(data)
import spark.implicits._
// Given a list of pairs of names and ages
val data = sc.parallelize(Seq(("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)))
// Create a 'DataFrame' from the 'RDD'
val dataDF = spark.createDataFrame(data)
❖ Using the above method, we can get the DataFrame as below:
❖ We can see that the schema is not defined, and the columns have no meaningful names. To define the names for columns, we can use the toDF() method
val dataDF = spark.createDataFrame(data).toDF("name", "age")
❖ We can also write (data could be a list or an RDD):
val dataDF = data.toDF("name", "age")
Schemas in Spark
❖ A schema in Spark defines the column names and associated data types for a DataFrame
❖ Defining a schema up front offers three benefits
➢ You relieve Spark from the onus of inferring data types.
➢ You prevent Spark from creating a separate job just to read a large portion of your file to ascertain the schema, which for a large data file can be expensive and time-consuming.
➢ You can detect errors early if data doesn’t match the schema.
❖ Define a DataFrame programmatically with three named columns, author, title, and pages
import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("author", StringType, false),
  StructField("title", StringType, false),
  StructField("pages", IntegerType, false)))
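❖ One way this schema might be used (a sketch; the CSV path is a placeholder): pass it to the reader instead of relying on schema inference, which would otherwise trigger an extra pass over the file:
// Explicit schema: no inference job, and mismatching records surface early
val booksDF = spark.read.option("header", "true").schema(schema).csv("/path/to/books.csv")
// For comparison, inference asks Spark to scan the data and guess the column types
val inferredDF = spark.read.option("header", "true").option("inferSchema", "true").csv("/path/to/books.csv")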
Spark’s Basic Data Types
❖ Spark supports basic internal data types, which can be declared in your Spark application or defined in your schema
Data type   | Value assigned in Scala | API to instantiate
ByteType    | Byte                    | DataTypes.ByteType
ShortType   | Short                   | DataTypes.ShortType
IntegerType | Int                     | DataTypes.IntegerType
LongType    | Long                    | DataTypes.LongType
FloatType   | Float                   | DataTypes.FloatType
DoubleType  | Double                  | DataTypes.DoubleType
StringType  | String                  | DataTypes.StringType
BooleanType | Boolean                 | DataTypes.BooleanType
DecimalType | java.math.BigDecimal    | DecimalType
Spark’s Structured and Complex Data Types
❖ For complex data analytics, you’ll need Spark to handle complex data types, such as maps, arrays, structs, dates, timestamps, fields, etc.
Data type     | Value assigned in Scala                               | API to instantiate
BinaryType    | Array[Byte]                                           | DataTypes.BinaryType
TimestampType | java.sql.Timestamp                                    | DataTypes.TimestampType
DateType      | java.sql.Date                                         | DataTypes.DateType
ArrayType     | scala.collection.Seq                                  | DataTypes.createArrayType(elementType)
MapType       | scala.collection.Map                                  | DataTypes.createMapType(keyType, valueType)
StructType    | org.apache.spark.sql.Row                              | StructType(ArrayType[fieldTypes])
StructField   | A value type corresponding to the type of this field  | StructField(name, dataType, [nullable])
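❖ A sketch of a schema that combines several of these complex types (the column names are made up for illustration):
import org.apache.spark.sql.types._
val blogSchema = StructType(Array(
  StructField("author", StringType, false),
  StructField("hashtags", ArrayType(StringType), true),
  StructField("campaigns", MapType(StringType, IntegerType), true),
  StructField("location", StructType(Array(
    StructField("city", StringType, true),
    StructField("country", StringType, true))), true)))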
❖ We can use spark.createDataFrame(data, schema) to create a DataFrame, after the schema is defined for the data.
➢ The first argument data must be of type RDD[Row]
➢ The second argument schema must be of type StructType
DataFrames with Schema
import org.apache.spark.sql.types._
import org.apache.spark.sql._
// Create the schema
val schema = StructType(Array(
  StructField("name", StringType, false),
  StructField("age", IntegerType, false)))
// Given a list of pairs of names and ages
val data = List(("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25))
// Create a 'Row' from each pair
val rows = data.map(t => Row.fromTuple(t))
// Create an 'RDD[Row]' from the rows
val rdd = spark.sparkContext.makeRDD(rows)
// Create a 'DataFrame' from the 'RDD' and the schema
val dataDF = spark.createDataFrame(rdd, schema)
❖ In order to convert the List to RDD[Row], you can also do as below
DataFrames with Schema
import org.apache.spark.sql.types._
import org.apache.spark.sql._
// Create the schema
val schema = StructType(Array(
  StructField("name", StringType, false),
  StructField("age", IntegerType, false)))
// Given a list of pairs of names and ages
val data = List(("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25))
// Create an 'RDD' from the 'List'
val rdd = spark.sparkContext.parallelize(data)
// Transform each pair (String, Int) into a Row object
val rddRow = rdd.map(x => Row(x._1, x._2))
// Create a 'DataFrame' from the 'RDD[Row]' and the schema
val dataDF = spark.createDataFrame(rddRow, schema)
❖ You can also create a DataFrame from a json file:
val blogsDF = spark.read.schema(schema).json(jsonFile)
❖ Each column describes a type of field
❖ We can list all the columns by their names, and we can perform operations on their values using relational or computational expressions
➢ List all the columns
➢ Access a particular column with col(), which returns a Column object
➢ We can also use logical or mathematical expressions on columns
❖ A row in Spark is a generic Row object, containing one or more columns
❖ Row is an object in Spark and an ordered collection of fields; we can access its fields by an index starting at 0
❖ Row objects can be used to create DataFrames
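❖ A sketch of these column and row operations, reusing the dataDF (name, age) built on the earlier slides:
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.Row
// List all the columns by name
println(dataDF.columns.mkString(", "))   // name, age
// Access a particular column: col(...) returns a Column object
val ageCol = dataDF.col("age")
// Logical or mathematical expressions on columns
dataDF.select(col("name"), col("age") + 1).show()
dataDF.filter(expr("age > 21")).show()
// A Row is an ordered collection of fields, accessed by an index starting at 0
val row = Row("Brooke", 20)
println(row.getString(0))   // "Brooke"
println(row.getInt(1))      // 20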
Transformations, Actions, and Lazy Evaluation
❖ DataFrame operations can also be classified into two types: transformations and actions.
➢ All transformations are evaluated lazily – their results are not computed immediately, but they are recorded or remembered as a lineage
➢ An action triggers the lazy evaluation of all the recorded transformations
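❖ For example (a small sketch reusing dataDF from the earlier slides), nothing is computed when the transformation is declared; the action triggers the whole chain:
import org.apache.spark.sql.functions.col
// Transformation: only recorded as lineage, not executed
val adultsDF = dataDF.filter(col("age") > 21)
// Action: triggers evaluation of all the recorded transformations
println(adultsDF.count())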
Narrow and Wide Transformations
❖ Transformations can be classified as having either narrow dependencies or wide dependencies
➢ Any transformation where a single output partition can be computed from a single input partition is a narrow transformation, like filter()
➢ Any transformation where data from other partitions is read in, combined, and written to disk is a wide transformation, like groupBy()
val fileRDD = spark.sparkContext.textFile("file:///home/comp9313/inputText")
val wordsDF = fileRDD.flatMap(_.split(" ")).toDF
val countDF = wordsDF.groupBy("Value").count()
countDF.collect.foreach(println)
countDF.write.format("csv").save("file:///home/comp9313/output")
❖ Spark 2.0 unified the DataFrame and Dataset APIs as Structured APIs with similar interfaces
❖ Datasets take on two characteristics: typed and untyped APIs
❖ Conceptually, you can think of a DataFrame in Scala as an alias for Dataset[Row]
val fileDS = spark.read.textFile("file:///home/comp9313/inputText")
val wordsDS = fileDS.flatMap(_.split(" "))
val countDF = wordsDS.groupBy("Value").count()
countDF.collect.foreach(println)
countDF.write.format("csv").save("file:///home/comp9313/output")
DataFrames Versus Datasets
❖ If you want to tell Spark what to do, not how to do it, use DataFrames or Datasets.
❖ If you want rich semantics, high-level abstractions, and DSL operators, use DataFrames or Datasets.
❖ If your processing demands high-level expressions, filters, maps, aggregations, computing averages or sums, SQL queries, columnar access, or use of relational operators on semi-structured data, use DataFrames or Datasets.
❖ If your processing dictates relational transformations similar to SQL-like queries, use DataFrames.
❖ If you want unification, code optimization, and simplification of APIs across Spark components, use DataFrames.
❖ If you want space and speed efficiency, use DataFrames.
❖ More examples of DataFrame usage can be found at:
https://github.com/databricks/LearningSparkV2
Part 2: Spark SQL
Spark SQL Overview
❖ Part of the core distribution since Spark 1.0 (April 2014); early versions were used to transform RDDs using SQL
❖ Tightly integrated way to work with structured data (tables with rows/columns)
❖ Data source integration: Hive, Parquet, JSON, and more
❖ Spark SQL is not only about SQL
➢ It aims to create and run Spark programs faster
Starting Point: SparkSession
❖ The entry point into all functionality in Spark is the SparkSession class.
➢ SparkSession since Spark 2.0 provides built-in support for Hive features including the ability to write queries using HiveQL, access to Hive UDFs, and the ability to read data from Hive tables
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
DataFrames
❖ With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.
➢ E.g., create a DataFrame based on the content of a JSON file:
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
❖ DataFrames are just Dataset of Rows in Scala and Java API.
➢ These operations are also referred to as "untyped transformations", in contrast to the "typed transformations" that come with strongly typed Scala/Java Datasets
DataFrame Operations
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
Running SQL Queries Programmatically
❖ The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Global Temporary View
❖ Temporary views in Spark SQL are session-scoped and will disappear if the session that creates them terminates
❖ Global temporary view: a temporary view that is shared among all sessions and kept alive until the Spark application terminates
❖ A global temporary view is tied to a system preserved database global_temp, and we must use the qualified name to refer to it, e.g. SELECT * FROM global_temp.view1
Global Temporary View Example
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Find the full example code at:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
Error Detection of Structured APIs
❖ If you want errors caught during compilation rather than at runtime, choose the appropriate API
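❖ A sketch of the difference, assuming the Person case class and the peopleDS/peopleDF pair from the earlier Dataset example:
// Untyped APIs: a mistyped column name only fails at runtime, during analysis
// peopleDF.select("namee").show()        // throws AnalysisException when the query is analyzed
// Typed Dataset API: the same mistake is caught by the Scala compiler
// peopleDS.map(p => p.namee)             // does not compile: Person has no field `namee`
// The correct, type-checked version
import spark.implicits._
val names = peopleDS.map(p => p.name)     // Dataset[String]
names.show()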
Part 3: Spark Streaming
Motivation
❖ Many important applications must process large streams of live data and provide results in near-real-time
➢ Social network trends
➢ Website statistics
➢ Ad impressions
❖ A distributed stream processing framework is required to
➢ Scale to large clusters (100s of machines)
➢ Achieve low latency (few seconds)
What is Spark Streaming
❖ Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams
❖ Receive data streams from input sources, process them in a cluster, push out to filesystems, databases, and live dashboards
➢ Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets
➢ Data can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window
➢ Processed data can be pushed out to filesystems, databases, and live dashboards
How does Spark Streaming Work
❖ Run a streaming computation as a series of very small, deterministic batch jobs
➢ Chop up the live stream into batches of X seconds
➢ Spark treats each batch of data as RDDs and processes them
using RDD operations
➢ Finally, the processed results of the RDD operations are returned in batches
Spark Streaming Programming Model
❖ Spark Streaming provides a high-level abstraction called discretized stream (DStream)
➢ Represents a continuous stream of data.
➢ DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high- level operations on other DStreams.
➢ Internally, a DStream is represented as a sequence of RDDs.
❖ The DStream API is very similar to the RDD API
➢ Functional APIs in Scala, Java
➢ Create input DStreams from different sources
➢ Apply parallel operations
An Example: Streaming WordCount
❖ Use StreamingContext, rather than SparkContext
import org.apache.spark._
import org.apache.spark.streaming._

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
❖ Linking with Apache Spark
➢ The first step is to explicitly import the required Spark classes into your Spark program
❖ Create a local StreamingContext with two working threads and a batch interval of 10 seconds.
➢ A StreamingContext object has to be created, which is the main entry point of all Spark Streaming functionality.
➢ At least two local threads must be used (two cores)
➢ Do the count for every 10 seconds
The batch interval must be set based on the latency requirements of your application and the available cluster resources.
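❖ To try the example locally (following the approach in the Spark Streaming documentation): run a simple data server with netcat in one terminal, e.g. nc -lk 9999, and launch the application in another terminal (e.g., via spark-submit, assuming it has been packaged into a jar). Words typed into the netcat terminal are counted and printed once per 10-second batch.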