COMP5349 – Cloud Computing
Week 8: Spark DataFrame and Execution Plan
Dr. Ying Zhou, School of Computer Science
Recap of Week 7
- We covered the basics of Spark execution
  - The driver program, i.e. the main script that creates the SparkContext, defines the data flow DAG, etc., needs to run somewhere: inside the Spark cluster or on an external host.
  - The actual data flow DAG runs inside Spark in a number of executors.
  - The executor number and capacity can be configured by the developer, or a preset default value is used.
- What happens when a Spark application runs on YARN?
  - Spark has its own ApplicationMaster (AM), which runs in a container.
  - Each executor runs in a container.
  - In cluster mode, the driver program runs alongside the AM in the same container.
  - In client mode, the driver program runs on a client machine and communicates with the AM in the cluster.
Recap of Week 7 (cont'd)
- What happens inside an executor?
  - All executors stay alive throughout the program's life cycle.
  - An executor can run multiple threads, controlled by the executor-cores property.
  - One executor runs the Spark AM and/or the driver program.
  - All others run tasks belonging to the various stages.
    - Each task represents a sequence of transformations that can operate on a single partition or a limited set of parent partitions.
  - Executors can run tasks concurrently and one after another; Spark offers PROCESS_LOCAL data locality.
Outline
- Data Sharing in Spark
  - Understanding Closures
  - Accumulators and Broadcast Variables
  - RDD persistence
- Spark DataFrame
Recap: Data Sharing in MapReduce
- The mappers and reducers are designed to run independently on different partitions of the input data, on different nodes.
  - They are written as separate classes/functions.
- Common data sharing mechanisms:
  - Simple typed read-only data are shared as properties attached to the configuration object.
  - Small files can be put in the distributed cache (not covered).
  - Global read/write data can be implemented as counters (not covered).
Data Sharing in Spark
- A Spark program may be written as a single script, but different pieces run in different places:
  - Driver and executors
- Data sharing happens implicitly through closures, or just by referring to a variable by name.
- We sometimes need to make sharing explicit to ensure
  - Correctness of the result
  - Better performance
counter = 0                          # runs in the driver
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x

rdd.foreach(increment_counter)       # increment_counter runs in the executors

print("Counter value: ", counter)    # prints the driver's copy of counter

https://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-
Understanding Closures
- "Prior to execution, Spark computes the task's closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor."
- Each executor gets its own copy of the variables within the closure and updates that copy independently.
- The variables declared in the driver program stay in the memory of the node that runs the driver; they are not updated by the executors.
- In the rare case where everything runs in the same executor, the variables may get updated.
Understanding Closures (cont'd)

counter = 0                      # the original variable defined in the driver program
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter               # counter is referenced inside the function
    counter += x

rdd.foreach(increment_counter)
print("Counter value: ", counter)

- counter is a variable declared in the driver program, and it is referenced in a function.
- When the function is sent to execute in an executor, this variable becomes part of the task's closure: a copy of it is created and sent with the task. If there are many tasks, each gets its own copy.
- The counter printed at the end is the original variable defined in the driver program.
Implications
- Implicit closure-based sharing can be used to share small, read-only program parameters.
- Some read-only data that needs to be shared is relatively large, e.g. a dictionary for spell checking.
- Sharing large read-only data through the closure is not efficient:
  - The data is copied for each task and shipped to the node running the task.
  - If we have 20 tasks that need to use the dictionary, 20 copies will be made.
  - If we only use 10 executors, and they run on 6 nodes, only 6 copies are actually required.
Broadcast Variables
- Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
- The variables are distributed to each node using efficient broadcast algorithms.
- Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method.

>>> broadcastVar = sc.broadcast([1, 2, 3])
>>> broadcastVar.value
[1, 2, 3]
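
A minimal sketch of using a broadcast variable inside a transformation, continuing the spell-check dictionary idea from the previous slide (the dictionary contents and RDD are illustrative, not from the slides; sc is the SparkContext used throughout):

# Broadcast a small dictionary once per node instead of shipping it with every task
dictionary = {"helo": "hello", "wrld": "world"}               # illustrative data
bcast_dict = sc.broadcast(dictionary)

words = sc.parallelize(["helo", "wrld", "spark"])
corrected = words.map(lambda w: bcast_dict.value.get(w, w))   # read-only access via .value
print(corrected.collect())                                    # ['hello', 'world', 'spark']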
Accumulators
- Spark does not support general read-write shared variables.
- Accumulators are variables that are only "added" to through an associative and commutative operation and can therefore be efficiently supported in parallel.
  - Similar to counters in MapReduce.
  - Spark natively supports accumulators of numeric types, and programmers can add support for new types.
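
A minimal sketch of rewriting the earlier broken counter example with an accumulator (the input data is illustrative):

counter = sc.accumulator(0)

def increment_counter(x):
    counter.add(x)               # executors may only add to the accumulator

sc.parallelize([1, 2, 3, 4]).foreach(increment_counter)
print("Counter value:", counter.value)   # 10; only the driver reads the value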
Sharing RDD
- In program code, RDDs can be assigned to variables.
  - They are different from variables of types supported by the programming language.
- An RDD is a distributed data structure and is materialized in a lazy manner.
  - All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program.
  - By default, each transformed RDD may be recomputed each time you run an action on it.
  - Spark is sometimes able to skip unnecessary re-computation.

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
RDD persistence
- RDD persistence is a way to share an RDD across jobs.
- When you persist an RDD, each node stores any partitions of it that it computes in memory or storage and reuses them in other actions on that dataset (or datasets derived from it).
- An RDD can be persisted using the persist() or cache() methods on it.
  - The first time it is computed in an action, it will be kept in memory on the nodes.
  - A key tool for fast iterative algorithms.
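
A minimal sketch of persisting the lineLengths RDD from the earlier snippet so that a second action reuses the cached partitions instead of re-reading data.txt (the second action is illustrative):

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
lineLengths.cache()                                  # equivalent to persist() with the default storage level

total = lineLengths.reduce(lambda a, b: a + b)       # first action: computes and caches the partitions
longLines = lineLengths.filter(lambda n: n > 80).count()   # second action: served from the cache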
Outline
- Data Sharing in Spark
- Spark DataFrame
  - Basic concepts
  - A simple example with common operations
    - Query vs. Job
    - Filter-GroupBy query execution plan
    - One query, two jobs example
RDD API
- The RDD-based API dates from the first release of Spark.
- It is quite primitive compared with the APIs of recent data analytics packages:
  - Lacks sophisticated ways to handle structured or semi-structured data formats.
  - Lacks popular data manipulation operations, e.g. slicing.
- Performance tuning requires largely manual effort:
  - Programs should be carefully designed.
    - Spark executes the DAG exactly as it is expressed in code.
    - Combiners are used automatically, which is an improvement compared with MapReduce.
  - Execution parameters should be carefully chosen.
    - The number of executors to use and the number of partitions at each stage are either specified explicitly or a default value is used.
- The RDD API is more flexible, gives better control, and allows a better understanding of Spark internals.
Spark SQL
- Spark SQL is Spark's interface for working with structured and semi-structured data.
  - Data with a schema
  - A known set of fields for each record, and their types
- Spark SQL makes loading and querying such data much easier and more efficient.
  - Provides the DataFrame abstraction
    - Can read data in a variety of formats
  - Queries can be expressed using SQL (see the sketch below).
  - An optimization engine picks the best physical query plan.
- A Spark DataFrame contains an RDD of Row objects, each representing a record. A DataFrame also knows the schema of its rows.
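
A minimal sketch of expressing a query in SQL, assuming a ratings DataFrame like the one loaded in the later filter-groupBy example (the view and column names are illustrative):

ratings.createOrReplaceTempView("ratings")           # register the DataFrame as a temporary view
spark.sql("SELECT mid, AVG(rate) AS avg_rate FROM ratings WHERE mid <= 5 GROUP BY mid").show()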
DataFrame
- The DataFrame concept comes from statistics software and has gradually been adopted by other data analytics software.
  - It is a way to organize tabular data: rows and named columns.
- Spark's DataFrame concept is quite similar to those used in R and in Pandas.
- A DataFrame can be created from an existing RDD, or from various data sources.
- DataFrame is the common data structure in all languages supported by Spark.
- Java and Scala have another version called Dataset.
  - Dataset provides strong typing support.
DataFrame APIs
- The entry point of Spark SQL is SparkSession.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

- Spark DataFrame supports operations like filtering, slicing, groupBy, etc.
  - DataFrame supports a larger set of operations than the RDD API.
  - The operations are more restrictive in terms of parameters.
    - Most RDD operations take a user-defined function as a parameter.
Example DF API vs RDD API
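
The figure on this slide is not reproduced here. As a minimal sketch (the file name and schema are illustrative assumptions), the same per-movie average-rating computation can be written with both APIs:

# DataFrame API: declarative operations, optimized by Spark SQL
ratings_df = spark.read.csv("ratings.csv",
                            schema="uid INT, mid INT, rate DOUBLE, ts LONG")
avg_df = ratings_df.groupBy("mid").avg("rate")

# RDD API: the same computation expressed with user-defined functions
avg_rdd = sc.textFile("ratings.csv") \
    .map(lambda line: line.split(",")) \
    .map(lambda f: (int(f[1]), (float(f[2]), 1))) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda s: s[0] / s[1])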
An Example
- We use the movie ratings data set.
Query, Job, Stage, Task in DataFrame
- DataFrame is part of the Spark SQL API.
  - At the programming level there is the concept of a query.
  - At the execution level, the concepts of job, stage, and task are still there and have the same definitions.
  - There is no one-to-one mapping between queries and jobs.
    - It all depends on the programming logic.
      - If we only need to return results to the driver after a few queries, we may have one job covering many queries.
    - Optimization mechanisms in Spark SQL.
      - Spark's optimizer may break a single query down into multiple jobs.
  - The history server has a new query (SQL) tab for Spark SQL applications.
An example (job breakdown)
An Example: show DataFrame content
- The CSV file is 500+ MB, around 5 HDFS blocks, yet only one task is used; this is very likely the result of Spark SQL optimization.
- Because we only need to show 5 records, only one partition is needed and not all of the data needs to be read in.
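
A minimal sketch of the kind of call behind this slide (the exact statement is not in the slide text), using the ratings DataFrame loaded in the later filter-groupBy example:

ratings.show(5)    # a single task reads just enough data to produce 5 rows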
An Example: describe and show
An Example: filter-groupBy
- Stage boundary at the shuffle; the input file is partitioned by HDFS.
- spark.sql.shuffle.partitions = 200
- Stage 3 takes much longer to finish than stage 4.
filter-groupBy physical execution plan

ratings = spark.read.csv(rating_data, header=False, schema=rating_schema)
ratings.describe(['rate']).show()
ratings.filter("mid <= 5").groupBy('mid').avg('rate').collect()

- File I/O can be saved by caching the result of the previous query.
- The first stage has 5 tasks: the first 4 run on 128 MB of data each, and the last one runs on a smaller amount. Each task does the filtering plus a local groupBy-average (a local reduce, with a similar effect to a combiner).
- Each stage 1 task produces 5 records of (mid, sum, count); in total 25 records are produced.
- Stage 2 is the final reduce stage. It only has to process the 25 records of (mid, sum, count) to produce the final (mid, avg), yet it has 200 tasks!
What do the 200 tasks end up doing?
- Most of them do nothing; in fact at most 5 tasks are needed, because there are only 5 keys left after applying the filter (mid <= 5).
- The default of 200 shuffle partitions is set with much larger clusters and data sets in mind.
Effect of partition number
- Running on a single-node cluster
- Running on a cluster with two worker nodes
- spark.default.parallelism = 32
Effect of partition number (cont'd)
- 5 × 5 records vs. 32 × 5 records of intermediate (mid, sum, count) output
Parallelism Properties
- Input partitions
- spark.default.parallelism
  - Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by the user.
  - Usually set to the total number of cores on all executor nodes.
  - An RDD API feature, but PySpark does use it for Spark SQL as well.
- spark.sql.shuffle.partitions
  - Configures the number of partitions to use when shuffling data for joins or aggregations.
  - Only applicable in Spark SQL.
- Explicit settings in various transformations (see the sketch below):
  - repartition(numPartitions), groupByKey([numPartitions]), join(otherDataset, [numPartitions])
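
A minimal sketch of the configuration properties and the explicit per-transformation setting (the property values and data are illustrative, not recommendations):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.default.parallelism", 32) \
    .config("spark.sql.shuffle.partitions", 5) \
    .getOrCreate()
sc = spark.sparkContext

pairs = sc.parallelize(range(1000)).map(lambda x: (x % 10, x))
grouped = pairs.groupByKey(numPartitions=8)   # explicit partition count for this transformation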
Window Function
- Window functions operate on a set of rows and return a single value for each row from the underlying query.
- They are particularly useful if we need to operate based on groupBy or aggregation results.
  - E.g. remove all movies with 100 or fewer ratings.
- Using them involves defining the window/frame, then applying the query over that window (see the sketch below).
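
A minimal sketch of the example above, assuming the ratings DataFrame and column names from the filter-groupBy example:

from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.partitionBy("mid")                              # one window per movie
popular = ratings.withColumn("num_ratings", F.count("*").over(w)) \
                 .filter("num_ratings > 100") \
                 .drop("num_ratings")                      # keep only movies with more than 100 ratings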
Window Function Execution
Example of one query, two jobs
- Load the movie data to find the Romance movies released in 1939.
- For each such movie, find the average rating.
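
A minimal sketch of this query, joining a hypothetical movies DataFrame (the movie_data/movie_schema names and the year/genres columns are illustrative assumptions) with the ratings DataFrame used earlier:

movies = spark.read.csv(movie_data, header=False, schema=movie_schema)   # illustrative names
romance_1939 = movies.filter("year = 1939").filter(movies.genres.contains("Romance"))

avg_ratings = ratings.groupBy("mid").avg("rate")
result = romance_1939.join(avg_ratings, "mid").collect()   # the collect() triggers execution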
Execution Summary
- Created by the execution engine
Execution Plan of the join query
“When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred”
DataFrame and RDD
- DataFrame and RDD can easily be converted to each other.
- To convert a DataFrame to an RDD:
  - Call rdd or javaRDD on the DataFrame.
  - E.g. teenagers.rdd.map(lambda p: "Name: " + p.name)
- To convert an RDD to a DataFrame:
  - Call spark.createDataFrame(rdd, optionalSchema).
  - E.g. schemaPeople = spark.createDataFrame(people)
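
A minimal round-trip sketch (the Row fields are illustrative, not from the slides):

from pyspark.sql import Row

people = sc.parallelize([Row(name="Alice", age=13), Row(name="Bob", age=17)])
people_df = spark.createDataFrame(people)                # RDD -> DataFrame, schema inferred from the Rows
names = people_df.rdd.map(lambda p: "Name: " + p.name)   # DataFrame -> RDD of Row objects
print(names.collect())                                   # ['Name: Alice', 'Name: Bob']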
References
- Spark RDD programming guide
  https://spark.apache.org/docs/latest/rdd-programming-guide.html
- Spark SQL, DataFrames and Datasets Guide
  https://spark.apache.org/docs/latest/sql-programming-guide.html
- Physical Plans in Spark SQL
  https://databricks.com/session_eu19/physical-plans-in-spark-sql