COMP5349 – Cloud Computing
Week 8: Spark DataFrame and Execution Plan
Dr. Ying Zhou School of Computer Science

Week 7
- We covered the basics of Spark execution
  - The driver program is the main script that creates the SparkContext and defines the data flow DAG. It needs to run somewhere, either inside the Spark cluster or on an external host
  - The actual data flow DAG runs inside Spark in a number of executors
  - The number and capacity of executors can be configured by the developer; otherwise a preset default value is used
- What happens when a Spark application runs on YARN?
  - Spark has its own ApplicationMaster (AM), which runs in a container
  - Each executor runs in a container
  - In cluster mode, the driver program runs alongside the AM in the same container
  - In client mode, the driver program runs on a client machine and communicates with the AM in the cluster
COMP5349 Cloud Computing (2020) Dr. Y. Zhou 08-2

Week 7
- What happens inside an executor
  - All executors stay alive throughout the program life cycle
  - An executor can run multiple threads, controlled by the property executor-cores
  - One executor runs the Spark AM and/or the driver program
  - All others run tasks belonging to some stages
  - Each task represents a sequence of transformations that can operate on a single parent partition or a limited number of parent partitions
  - Executors can run tasks concurrently and one after another
  - Spark offers PROCESS_LOCAL data locality

Outline
- Data Sharing in Spark
  - Understanding Closure
  - Accumulator and Broadcast Variables
  - RDD persistence
- Spark DataFrame

Recap: Data Sharing in MapReduce
- The mappers and reducers are designed to run independently on different partitions of the input data on different nodes
  - They are written as separate classes/functions
- Common data sharing mechanisms:
  - Simple typed read-only data is shared as properties attached to the configuration object
  - Small files can be put in the distributed cache (not covered)
  - Global read/write data can be implemented as counters (not covered)

Data Sharing in Spark
- A Spark program may be written as a single script
  - But different pieces run in different places
    - Driver and executors
  - Data sharing happens implicitly through closure, or just by referring to the variable by name
  - We sometimes need to make things explicit to ensure
    - Correctness of the result
    - Better performance

    counter = 0                        # runs on the driver
    rdd = sc.parallelize(data)

    # Wrong: Don't do this!!
    def increment_counter(x):
        global counter
        counter += x

    rdd.foreach(increment_counter)     # the function runs on the executors

    print("Counter value: ", counter)  # runs on the driver

https://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-

Understand Closure
- “Prior to execution, Spark computes the task’s closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.”
- Each executor gets a copy of the variables within the closure and updates its copy independently
- The variables declared in the driver program stay in the memory of the node that runs the driver; they are not updated by the executors
- In rare cases, when everything runs in the same process as the driver (e.g. in local mode), the variables may get updated.

Understand Closure (cont’d)
    counter = 0
    rdd = sc.parallelize(data)

    # Wrong: Don't do this!!
    def increment_counter(x):
        global counter
        counter += x

    rdd.foreach(increment_counter)

    print("Counter value: ", counter)

- counter = 0: the variable is declared in the driver program
- counter inside increment_counter: it is referenced in a function. When the function is sent to execute in an executor, this variable becomes part of the closure of the task; a copy of it is created and sent to the task. If there are many tasks, each gets its own copy
- counter in the print call: this is the original variable defined in the driver program
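
The copy semantics can be imitated in plain Python. This is only a sketch under stated assumptions, not Spark code: pickle stands in for Spark's closure serialization, and the two hard-coded partitions and the run_task helper are hypothetical names introduced for illustration.

```python
import pickle

counter = 0                                  # the "driver" variable
data = [1, 2, 3, 4, 5, 6]
partitions = [data[0:3], data[3:6]]          # two hypothetical partitions


def run_task(serialized_closure, partition):
    """Simulate one task: it works on its own deserialized copy."""
    local = pickle.loads(serialized_closure)  # private copy for this task
    for x in partition:
        local["counter"] += x
    return local["counter"]


# The closure is serialized once and shipped to every task.
closure = pickle.dumps({"counter": counter})
task_counters = [run_task(closure, p) for p in partitions]

print("Per-task copies:", task_counters)     # each task updated only its copy
print("Driver counter: ", counter)           # the driver's variable is untouched
```

Each simulated task ends with its own partial sum, while the driver-side counter stays at 0, which is the behaviour the slide warns about.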

Implications
- The implicit closure-based sharing can be used to share small, read-only program parameters
- Some read-only data that needs to be shared is relatively large
  - E.g. a dictionary for spell checking
- Sharing large read-only data through closure is not efficient
  - The data is copied for each task and shipped to the node running the task
  - If we have 20 tasks that need to use the dictionary
    - 20 copies will be made
  - If we only use 10 executors, and they run on 6 nodes
    - Only 6 copies are required

Broadcast Variables
- Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks
- The variables are distributed to each node using efficient broadcast algorithms
- Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed through its value attribute (the value method in Scala and Java).

    >>> broadcastVar = sc.broadcast([1, 2, 3])
    >>> broadcastVar.value
    [1, 2, 3]
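
The difference between per-task and per-node copies can be counted with a small plain-Python sketch. It is not Spark code: the tiny dictionary, the round-robin placement of tasks on nodes, and the node_cache name are assumptions for illustration; only the figures 20 tasks and 6 nodes come from the slides.

```python
import pickle

dictionary = {"spark", "hadoop", "yarn"}      # hypothetical spelling dictionary
num_tasks, num_nodes = 20, 6                  # figures used on the slide

# Assumption: tasks are placed on nodes round-robin.
task_to_node = [task % num_nodes for task in range(num_tasks)]
payload = pickle.dumps(dictionary)

# Closure-style sharing: every task deserializes its own copy.
closure_copies = [pickle.loads(payload) for _ in range(num_tasks)]

# Broadcast-style sharing: the first task on a node fetches the data,
# later tasks on that node reuse the cached copy.
node_cache = {}
for node in task_to_node:
    if node not in node_cache:
        node_cache[node] = pickle.loads(payload)

print("copies with closure:  ", len(closure_copies))
print("copies with broadcast:", len(node_cache))
```

For a large dictionary the saving is the number of tasks per node, and it grows as more tasks reuse the same data.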

Accumulators
- Spark does not support general read-write shared variables
- Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel.
  - Similar to counters in MapReduce
  - Spark natively supports accumulators of numeric types, and programmers can add support for new types.
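
Why the operation has to be associative and commutative can be seen in a toy model. This is a plain-Python sketch, not Spark's Accumulator API; the SumAccumulator class, its merge method, and the sample partitions are hypothetical.

```python
class SumAccumulator:
    """Toy add-only variable: tasks add locally, the driver merges."""

    def __init__(self, value=0):
        self.value = value

    def add(self, x):
        self.value += x

    def merge(self, other):
        self.value += other.value


def run_task(partition):
    local = SumAccumulator()          # each task starts from the zero value
    for x in partition:
        local.add(x)
    return local


partitions = [[1, 2, 3], [4, 5, 6], [7]]     # hypothetical partitions
task_results = [run_task(p) for p in partitions]

# Tasks may finish in any order; addition makes the order irrelevant.
driver_acc = SumAccumulator()
for task_acc in reversed(task_results):
    driver_acc.merge(task_acc)

print(driver_acc.value)
```

Because addition is associative and commutative, merging the per-task values in reverse order gives the same total as merging them in submission order.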

Sharing RDD
- In program code, RDDs can be assigned to variables
  - They differ from variables of the types built into the programming language
- An RDD is a distributed data structure and is materialized lazily
  - All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program.
  - By default, each transformed RDD may be recomputed each time you run an action on it.
  - Spark is sometimes able to skip unnecessary re-computation

    lines = sc.textFile("data.txt")
    lineLengths = lines.map(lambda s: len(s))
    totalLength = lineLengths.reduce(lambda a, b: a + b)
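
Lazy evaluation can be observed with Python's built-in lazy map, used here as a rough analogy for an RDD transformation. This is a sketch, not Spark code; the sample lines and the evaluated list are assumptions for illustration.

```python
from functools import reduce

evaluated = []                    # records every line the function touches


def line_length(s):
    evaluated.append(s)
    return len(s)


lines = ["spark", "dataframe", "rdd"]        # hypothetical file content

# "Transformation": map only remembers what to do, nothing runs yet.
line_lengths = map(line_length, lines)
touched_before_action = len(evaluated)

# "Action": reduce needs a result, so the mapping is finally evaluated.
total_length = reduce(lambda a, b: a + b, line_lengths)
touched_after_action = len(evaluated)

print(touched_before_action, touched_after_action, total_length)
```

No line is processed until the reduce step asks for a result, mirroring how a Spark transformation waits for an action.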

RDD persistence
- RDD persistence is a way to share an RDD across jobs
- When you persist an RDD, each node stores any partitions of it that it computes in memory or storage and reuses them in other actions on that dataset (or datasets derived from it)
- An RDD can be persisted using its persist() or cache() methods.
  - The first time it is computed in an action, it will be kept in memory on the nodes.
  - A key tool for fast iterative algorithms
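
The effect of persistence on repeated actions can be sketched with a toy class. LazyDataset is a hypothetical stand-in for an RDD written in plain Python; it models only recomputation versus reuse, not partitions or storage levels.

```python
class LazyDataset:
    """Toy dataset: recomputed on every action unless persisted."""

    def __init__(self, source, fn):
        self.source = list(source)
        self.fn = fn
        self.evaluations = 0      # how many times the data was computed
        self._persist = False
        self._cached = None

    def persist(self):
        self._persist = True
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached               # reuse the stored result
        self.evaluations += 1
        result = [self.fn(x) for x in self.source]
        if self._persist:
            self._cached = result
        return result

    def count(self):                          # an "action"
        return len(self._compute())

    def sum(self):                            # another "action"
        return sum(self._compute())


plain = LazyDataset(range(5), lambda x: x * x)
plain.count()
plain.sum()

cached = LazyDataset(range(5), lambda x: x * x).persist()
cached.count()
cached_total = cached.sum()

print(plain.evaluations, cached.evaluations, cached_total)
```

Two actions on the unpersisted dataset compute it twice; with persist() the second action reuses the first result, which is why iterative algorithms benefit.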

Outline
- Data Sharing in Spark
- Spark DataFrame
  - Basic concept
  - A simple example with common operations
    - Query vs. Job
    - Filter-GroupBy query execution plan
    - One query two job example

RDD API
- The RDD-based API has been in Spark since its first release
- It is quite primitive compared with the APIs of recent data analytics packages
  - Lacks a sophisticated way to handle structured or semi-structured data formats
  - Lacks popular data manipulation operations, e.g. slicing
- Performance tuning requires largely manual effort
  - The program should be carefully designed
    - Spark executes the DAG as it is expressed in code
    - Combiners are used automatically, which is an improvement over MapReduce
  - Execution parameters should be carefully chosen
    - The number of executors to use and the number of partitions at each stage are either specified or take default values
- It is more flexible, gives better control, and allows a better understanding of Spark internals

Spark SQL
- Spark SQL is Spark’s interface for working with structured and semi-structured data.
  - Data with a schema
  - A known set of fields for each record, and their types
- Spark SQL makes loading and querying such data much easier and more efficient
  - Provides the DataFrame abstraction
    - Can read data in a variety of formats
  - Queries can be expressed using SQL
  - An optimization engine picks the best physical query plan
- A Spark DataFrame contains an RDD of Row objects, each representing a record. A DataFrame also knows the schema of its rows.

DataFrame
- The DataFrame concept comes from statistics software and has gradually been adopted by other data analytics software
  - It is a way to organize tabular data: rows, named columns
- Spark’s DataFrame concept is quite similar to those used in R and in Pandas
- A DataFrame can be created from an existing RDD, or from various data sources
- DataFrame is the common data structure across all languages that Spark supports
- Java and Scala have another version called Dataset
  - Dataset provides strong typing support

DataFrame APIs
- The entry point of Spark SQL is SparkSession

    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

- Spark DataFrame supports operations like filtering, slicing, groupBy, etc.
  - DataFrame supports a larger set of operations than RDD
  - DataFrame operations are more restrictive in terms of parameters
  - Most RDD operations take a user-defined function as a parameter

Example DF API vs RDD API

An Example
n We use the movie rating data set

Query, Job, Stage, task in DataFrame
- DataFrame is part of the Spark SQL API
  - At the programming level there is the query concept
  - At the execution level, the concepts of job, stage and task are still there and have the same definitions
  - There is no one-to-one mapping between query and job
    - It all depends on the programming logic
      - If we only need to return the results to the driver after a few queries, we may have one job containing many queries
    - Optimization mechanism in Spark SQL
      - Spark optimization may break down a single query into multiple jobs
- New query tab for Spark SQL applications in the history server

An example (job break down)

An Example: show data frame content
- The CSV file is 500+ MB, about 5 HDFS blocks
- Only one task is used, very likely the result of Spark SQL optimization
- Because we only need to show 5 records, only one partition is needed and not all data needs to be read in
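
The block count follows from simple arithmetic. The sketch below assumes a 128 MB HDFS block size and a hypothetical file size of 530 MB, since the slides only say "500+ MB".

```python
import math

BLOCK_MB = 128        # assumed HDFS block size
file_mb = 530         # hypothetical size; the slides only state "500+ MB"

num_blocks = math.ceil(file_mb / BLOCK_MB)

# Every block is full except possibly the last one.
block_sizes = [min(BLOCK_MB, file_mb - i * BLOCK_MB) for i in range(num_blocks)]

print(num_blocks, block_sizes)
```

Under these assumptions a full scan would use five input partitions, four full blocks and one small remainder, while show() on 5 records needs only the first.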

An Example: describe and show

An Example: filter-groupby
- Stage boundary
- File partitioned by HDFS
- spark.sql.shuffle.partitions=200
- Stage 3 takes much longer to finish than stage 4.

filter-groupBy physical execution plan

    ratings = spark.read.csv(rating_data, header=False, schema=rating_schema)
    ratings.describe(['rate']).show()
    ratings.filter("mid<=5").groupBy('mid').avg('rate').collect()

- File I/O can be saved by caching the previous result
- First stage (local reduce, similar in effect to a combiner)
  - It has 5 tasks, the first 4 running on 128 MB of data each and the last one on a smaller amount
  - Each task does the filtering and a local groupBy-average
  - Each stage 1 task produces 5 records of (mid, sum, count), so 25 records are produced in total
- Final reduce stage
  - Stage 2 only has to process 25 records of (mid, sum, count) to produce the final (mid, avg), yet it has 200 tasks!

What do the 200 tasks end up doing?
- Most of them do nothing. In fact, at most 5 tasks are needed because only 5 keys remain after applying the filter (mid <= 5)
- The default of 200 partitions is set with a much larger cluster and data set in mind.

Effect of partition number
- Running on a single-node cluster
- Running on a cluster with two worker nodes
- spark.default.parallelism=32

Effect of partition number (cont’d)
- 5 * 5 records
- 32 * 5 records

Parallelism Properties
- Input partitions
- spark.default.parallelism
  - Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by the user
  - Usually set to the number of cores on all executor nodes
  - An RDD API feature, but PySpark also uses it in Spark SQL
- spark.sql.shuffle.partitions
  - Configures the number of partitions to use when shuffling data for joins or aggregations
  - Only applicable in Spark SQL
- Explicitly set in various transformations
  - repartition(numPartitions)
  - groupByKey([numPartitions]), join(otherDataset, [numPartitions])
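
The two-stage filter-groupBy plan can be traced in plain Python. This is a sketch, not Spark's implementation: the rating values are made up, partial_aggregate and final_aggregate are hypothetical helpers, and Python's hash() stands in for Spark's own shuffle hash function.

```python
from collections import defaultdict


def partial_aggregate(partition):
    """Stage 1 task: filter, then keep one (sum, count) pair per movie id."""
    acc = defaultdict(lambda: [0.0, 0])
    for mid, rate in partition:
        if mid <= 5:
            acc[mid][0] += rate
            acc[mid][1] += 1
    return [(mid, s, c) for mid, (s, c) in acc.items()]


def final_aggregate(partial_records, num_shuffle_partitions):
    """Stage 2: route records to shuffle partitions by key, then merge."""
    buckets = defaultdict(list)
    for mid, s, c in partial_records:
        buckets[hash(mid) % num_shuffle_partitions].append((mid, s, c))
    averages = {}
    for records in buckets.values():
        merged = defaultdict(lambda: [0.0, 0])
        for mid, s, c in records:
            merged[mid][0] += s
            merged[mid][1] += c
        for mid, (s, c) in merged.items():
            averages[mid] = s / c
    return averages, len(buckets)


# Hypothetical input: 5 partitions, each with one rating for movie ids 1..8.
input_partitions = [
    [(mid, float(p + 1)) for mid in range(1, 9)] for p in range(5)
]

partials = [rec for part in input_partitions for rec in partial_aggregate(part)]
averages, busy_partitions = final_aggregate(partials, num_shuffle_partitions=200)

print(len(partials), busy_partitions, averages)
```

With 5 input partitions the first stage emits 5 * 5 partial records, and only 5 of the 200 shuffle partitions receive any data. With 32 input partitions that each contain all five keys, the same logic would emit 32 * 5 partial records.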

Window Function
- Window functions operate on a set of rows and return a single value for each row from the underlying query
- They are particularly useful when we need to operate on groupBy or aggregateBy results
  - E.g. remove all movies with 100 or fewer ratings
- Using one involves defining the window/frame, then applying the query on the window

Window Function Execution

Example of one query two jobs
- Load the movie data to find romance movies in 1939
- For each movie, find the average rating

Execution Summary
- Created by the execution engine

Execution Plan of the join query
- “When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred”

DataFrame and RDD
- DataFrame and RDD can be easily converted to each other
- To convert a DataFrame to an RDD
  - Call rdd or javaRDD on the DataFrame
  - E.g. teenagers.rdd.map(lambda p: "Name: " + p.name)
- To convert an RDD to a DataFrame
  - spark.createDataFrame(rdd, optionalSchema)
  - E.g. schemaPeople = spark.createDataFrame(people)

References
- Spark RDD programming guide
  - https://spark.apache.org/docs/latest/rdd-programming-guide.html
- Spark SQL, DataFrame and DataSets Guide
  - https://spark.apache.org/docs/latest/sql-programming-guide.html
- Physical Plans in Spark SQL
  - https://databricks.com/session_eu19/physical-plans-in-spark-sql