DS/CMPSC 410 Topic 5:
Run Spark in Cluster Mode, Persist, and GroupBy
Copyright © 2021. All Rights Reserved.
Two Ways to Run Spark-submit
• spark-submit (local mode): invokes Spark ONLY on a "local machine"
• The execution of Spark is on ONE machine, NOT distributed to the cluster.
• Useful for testing Spark programs before deploying them on the cluster.
• spark-submit (cluster mode): runs Spark ON a cluster
• Used for deploying Spark data analytics for massive datasets.
• PySpark code can be submitted directly.
• Scala or Java code must be packaged first (e.g., using sbt or Maven).
• Can be configured (e.g., specifying the number of workers, the size of memory for each worker, etc.).
Learning Objectives of the Course
• Run Spark in cluster mode
• Scalability evaluation of Spark design choices in cluster mode
• Debug Spark in local mode
• Use persist and unpersist
• Modify Spark code (local mode) for cluster mode
• Syntax of RDD and DataFrame
• Lazy evaluation
Example workflow (two actions on one lineage; a code sketch follows below):
• Read Tweets → Parse Tweets to Tokens
• Branch 1: Filter for Hashtags → Aggregate count by Hashtags → SaveAsText
• Branch 2: Filter for Twitter Users mentioned → Aggregate count by TwitterIDs → SaveAsText
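The workflow above can be written as the following minimal PySpark sketch. It is for illustration only: the input path, the output paths, and the "#"/"@" token rules are assumptions, not the actual lab code. No persist() is used yet; the next slides show why that matters.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TweetCounts").getOrCreate()
sc = spark.sparkContext

tweets = sc.textFile("tweets.txt")                      # Read Tweets (hypothetical path)
tokens = tweets.flatMap(lambda line: line.split(" "))   # Parse Tweets to Tokens (space delimiter)

# Branch 1: hashtags
hashtag_counts = (tokens.filter(lambda t: t.startswith("#"))   # Filter for Hashtags
                        .map(lambda t: (t, 1))
                        .reduceByKey(lambda a, b: a + b))      # Aggregate count by Hashtags
hashtag_counts.saveAsTextFile("hashtag_counts_out")            # SaveAsText

# Branch 2: Twitter users mentioned
user_counts = (tokens.filter(lambda t: t.startswith("@"))      # Filter for Twitter Users mentioned
                     .map(lambda t: (t, 1))
                     .reduceByKey(lambda a, b: a + b))         # Aggregate count by TwitterIDs
user_counts.saveAsTextFile("twitterid_counts_out")             # SaveAsText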
When SaveAsText for hashtag count is executed, …
[Diagram: the lineage Read Tweets → Parse Tweets to Tokens → Filter for Hashtags → Aggregate count by Hashtags → SaveAsText for hashtag count runs, producing the Tweets RDD, Hashtag RDD, and Hashtag Count RDD. The second branch (Filter for Twitter Users mentioned → Aggregate count by TwitterIDs → SaveAsText for TwitterID count) has not run yet.]
When SaveAsText for TwitterID count is executed, …
[Diagram: without persist, the Tweets RDD must be recomputed (Read Tweets → Parse Tweets to Tokens) before Filter for Twitter Users mentioned → Aggregate count by TwitterIDs → SaveAsText for TwitterID count produces the TwitterID RDD and TwitterID Count RDD. Recomputing these steps is inefficient.]
When SaveAsText for hashtag count is executed, but with Tweets RDD .persist() …
[Diagram: the same lineage runs (Read Tweets → Parse Tweets to Tokens → Filter for Hashtags → Aggregate count by Hashtags → SaveAsText for hashtag count), but the persisted Tweets RDD is kept in memory for later actions.]
When SaveAsText for TwitterID count is executed, …
[Diagram: the persisted Tweets RDD is reused directly by Filter for Twitter Users mentioned → Aggregate count by TwitterIDs → SaveAsText for TwitterID count, producing the TwitterID RDD and TwitterID Count RDD. Re-running Read Tweets, Parse Tweets to Tokens, and the hashtag branch is not needed.]
Storage Level of Persist
1. Persist on RDD
• Default: MEMORY_ONLY
2. Persist on DataFrame
• Default: MEMORY_AND_DISK_DESER
• Storage Levels:
• MEMORY_ONLY: saves the data in memory only, if possible. If it does not fit in memory, it will NOT be saved.
• MEMORY_AND_DISK_DESER: saves the data in memory and, if needed, spills to disk.
(A short sketch of calling persist with these storage levels follows below.)
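A minimal sketch of calling persist with an explicit storage level on an RDD and on a DataFrame. The data here is made up for illustration, and MEMORY_AND_DISK_DESER requires a Spark 3.x release.

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PersistLevels").getOrCreate()

rdd = spark.sparkContext.parallelize(range(1000))
rdd.persist(StorageLevel.MEMORY_ONLY)            # the default level for RDD.persist()

df = spark.createDataFrame([(i,) for i in range(1000)], ["n"])
df.persist(StorageLevel.MEMORY_AND_DISK_DESER)   # the default level for DataFrame.persist()

# ... actions that reuse rdd and df go here ...

rdd.unpersist()   # release the cached partitions once they are no longer needed
df.unpersist()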
groupBy
• Group the rows of a DataFrame by a column.
• The result is GroupedData, which can be aggregated in several ways (see the sketch below):
• count(): counts the number of rows in each group.
• avg(col1, col2, …): calculates the average of the specified numeric columns for each group.
• sum(col1, col2, …): calculates the sum of the specified numeric columns for each group.
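A minimal sketch of groupBy on a DataFrame. The DataFrame, its column names, and the values are hypothetical, chosen only to illustrate count() and avg().

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("GroupByDemo").getOrCreate()

tweets_df = spark.createDataFrame(
    [("u1", "#BostonMarathon", 3), ("u2", "#BostonMarathon", 5), ("u1", "#prayforboston", 2)],
    ["UserID", "Hashtag", "Retweets"],
)

# groupBy returns a GroupedData object; count() turns it back into a DataFrame
tweets_df.groupBy("Hashtag").count().show()

# avg() aggregates the specified numeric column(s) for each group
tweets_df.groupBy("Hashtag").avg("Retweets").show()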
A Motivating Problem
• Problem: We want to identify the Twitter users who sent tweets containing the top hashtag during the Boston Marathon Bombing (2013). Use space as the delimiter for parsing tweets into tokens.
An Example
[Diagram: BMB parsed Tweets DF → Filter for a top hashtag → Tweets of the top hashtag DF → groupBy UserID → (UserID, count) DF. A code sketch follows below.]
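A minimal sketch of the pipeline in the diagram above. The stand-in DataFrame, its columns, and the chosen top hashtag are assumptions for illustration, not the actual BMB dataset.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("TopHashtagUsers").getOrCreate()

# Hypothetical stand-in for the BMB parsed Tweets DF
bmb_parsed_tweets_df = spark.createDataFrame(
    [("u1", "#prayforboston"), ("u2", "#prayforboston"), ("u1", "#boston")],
    ["UserID", "Hashtag"],
)

# Filter for a top hashtag -> Tweets of the top hashtag DF
top_hashtag_df = bmb_parsed_tweets_df.filter(col("Hashtag") == "#prayforboston")

# groupBy UserID -> (UserID, count) DF
top_hashtag_df.groupBy("UserID").count().show()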
Running Spark in Cluster Mode
• spark-submit can use one of several cluster managers:
• Spark standalone cluster (this is the one we will use in Labs)
• Hadoop YARN: for running spark-submit in Hadoop using HDFS.
• Apache Mesos: a general cluster manager
• Kubernetes: an open-source system for containerized
applications.
Spark with Hadoop
• Driver Program
• The process running the main() function of the application and creating the SparkContext
• Cluster Manager
• External service for acquiring resources on the cluster
• Apache Mesos
• Hadoop YARN
• Worker Nodes
• Any node that can run application code in the cluster
• Executor: A process launched for an application on a worker node.
• Runs tasks
• Keeps data in memory or disk storage across tasks
• Each application has its own executors
Modify Spark Code (local mode) for Cluster Mode (spark-submit)
• Remove .master("local") in SparkSession or SparkContext.
• Modify the input path (often needed because the data for cluster mode is typically larger).
• Modify the output path (needed so that the output directories of cluster mode do not conflict with those generated by local mode).
• Remove all actions and Python code used for debugging (or data exploration) in local mode, e.g.:
• show(), take(), printSchema(), printing a variable
• Add persist() for reused RDDs/DataFrames, and unpersist() them when they are no longer needed.
(A sketch of a cluster-ready setup follows below.)
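A minimal sketch of what a script might look like after these modifications. The file names and paths are hypothetical placeholders; replace them with your own cluster-mode directories.

from pyspark.sql import SparkSession

# No .master("local"): the cluster manager supplies the master when the
# script is launched with spark-submit / pbs-spark-submit
spark = SparkSession.builder.appName("Lab5Cluster").getOrCreate()

# Cluster-mode input path (hypothetical)
tweets_df = spark.read.csv("Lab5/tweets_large.csv", header=True, inferSchema=True)

tweets_df.persist()   # reused by more than one action below, so cache it

# ... transformations and only the actions you need (no show()/printSchema() debugging calls) ...

# Cluster-mode output path (hypothetical), kept separate from local-mode output
tweets_df.write.csv("Lab5/output_cluster")

tweets_df.unpersist()
spark.stop()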
Steps to Run spark-submit on the ICDS Cluster
• Open a terminal window (Command Prompt on a PC).
• Type the ssh command for ICDS Roar at the prompt.
Use your Penn State password and 2-Factor Authentication.
If you see the host-key message above, enter: yes
Then type the ssh command again.
After 2-Factor Authentication, you should be logged in successfully.
You are in your home directory of ICDS Roar.
• This is the same directory you see in JupyterLab through the ICDS portal:
• /storage/home/
Request Resources for a Batch Job on ICDS Roar
(Note: -I is an uppercase "i", as in ICE; -l is a lowercase "L", as in leaf.)
qsub -I -A open -l nodes=5:ppn=4 -l pmem=4gb -l walltime=1:00:00
Notice the colon in nodes=5:ppn=4.
Suggested parameters: 5 nodes, 4 cores per node (ppn=4), 4 GB of memory per node (pmem=4gb), 1 hour of walltime.
It may take some time for the request to be approved, because it goes into a queue.
Resource is Ready
• When the resource requested is made available to you, you will see a response similar to the one above.
Use the change directory (cd) command to switch to the directory that contains the .py file of your PySpark code.
• pwd: prints the path of the current (working) directory.
• ls: lists the files and subdirectories in your current directory.
• cd Lab5: changes the current directory to the Lab5 subdirectory.
Specify the Modules to Be Used
1. Type the following at the terminal prompt:
module use /gpfs/group/RISE/sw7/modules
2. Type the following at the terminal prompt to load Spark:
module load spark
pbs-spark-submit
• We will use pbs-spark-submit.
• We want to get run-time information, so we put time in front of pbs-spark-submit.
• We want to save the execution log (including the run-time information at the end) in a log file so that we can investigate its content later (for debugging and performance comparison).
• &> redirects both the standard output and the standard error of the command (time writes its report to standard error) to the log file.
• (time pbs-spark-submit Lab5B.py) &> Lab5B_log.txt
Computing Environment of Spark (cluster mode)
[Diagram: a Cluster Manager (e.g., YARN) coordinating Cluster Worker #1, Cluster Worker #2, and Cluster Worker #3.]
What is an Executor?
• It is a generalized version of the mappers and reducers in MapReduce.
• It is a basic computing unit in the Spark framework that executes "tasks" from a Spark application on a cluster.
Mappers and Reducers in MapReduce
[Diagram: several mapper processes feeding several reducer processes.]
Poll: What do mappers and reducers correspond to in your PC?
• A: A program (e.g., Python or Java) running on your PC.
• B: Processes generated by a program, running on your PC.
• C: CPUs (e.g., 2 cores) on your PC.
• D: Memory on your PC.
You can view the processes running on Windows using Task Manager.
On a Mac, you can view running processes using Activity Monitor.
Program vs Processes
• A program can generate multiple processes, e.g.,
• An input-event listening process to respond to mouse clicking events and other input events.
• A streaming input process to receive streaming inputs (e.g., real- time tweets through a Twitter API)
• A program can have multiple threads to support multiple data-processing or computation tasks that are independent of each other.
• A process needs to run on a CPU (or GPU) with some amount of memory.
Mappers and Reducers are Processes in a Hadoop Cluster
• Mapper: Processes generated by the “map” step of a MapReduce application.
• Reducer: Processes generated by the “reduce” step of a MapReduce application.
• Each mapper/reducer needs to run on a CPU (core) with a sufficient amount of memory.
Executors in Spark are Generalized Version of Mappers and Reducers
• Each executor in Spark can be assigned ANY data transformation task (including, but not limited to, map, reduceByKey, etc) or action task.
• Each executor in Spark needs to run on a CPU (core) in a cluster with a sufficient amount of memory.
Important Decisions for spark-submit
• How many cores for each executor?
• If the number is too small (e.g., 1), the executor may not be powerful enough for some stages.
• How many executors?
• The total number of cores across all executors should not exceed the total number of cores available to you in the cluster.
• How much memory for each executor?
• The memory for each executor should be large enough to allow the Big Data to be processed efficiently.
• However, the memory for each executor should not exceed the physical memory of each node available to you in the cluster.
(A configuration sketch follows below.)
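These settings are normally given on the spark-submit command line or by the cluster manager, but the corresponding Spark properties can also be set in code. A minimal sketch with hypothetical values (4 executors, 4 cores and 4 GB of memory each); whether spark.executor.instances is honored depends on the cluster manager.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("ExecutorConfigDemo")
         .config("spark.executor.instances", "4")   # how many executors
         .config("spark.executor.cores", "4")       # cores per executor
         .config("spark.executor.memory", "4g")     # memory per executor
         .getOrCreate())

# Inspect the configuration that was actually applied
print(spark.sparkContext.getConf().getAll())
spark.stop()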
An Example of Parameters of spark-submit for Submitting to a Cluster
[Slide shows an example spark-submit command line with the --master parameter and related options.]
master Parameter for spark-submit
• 3rd-party cluster manager: Mesos, or YARN (for Hadoop)
• Standalone Mode: This is NOT local mode. It is a simpler way to launch Spark on a cluster without a 3rd-party cluster manager.
• http://spark.apache.org/docs/latest/spark-standalone.html
deploy-mode Parameter for spark-submit
• client: Launches the driver program locally (typically on a machine referred to as a Hadoop gateway).
• cluster: Launches the driver program on one of the worker machines in the cluster.
Scheduling in a Cluster
• When an action is invoked on an RDD:
• The Directed Acyclic Graph (DAG) Scheduler (DS) examines the RDD's lineage graph and constructs a DAG of stages.
• Each stage contains only narrow dependencies; operations that require wide dependencies become stage boundaries.
• Fault tolerance: the DS can launch tasks to compute missing partitions at different stages of the DAG to reconstruct the whole RDD.
• The DS submits the stages of task objects to the Task Scheduler (TS).
• Tasks are shipped to a worker node (i.e., an executor is started) if
• it is one of the preferred locations specified by the RDD (e.g., it holds blocks of an HDFS input file), or
• its memory contains information required by the RDD.
(A small sketch illustrating stage boundaries follows below.)
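A minimal sketch, only to illustrate stage boundaries: the map below is a narrow dependency and is pipelined within one stage, while reduceByKey introduces a wide dependency (a shuffle) and therefore starts a new stage. The data is made up for illustration.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StagesDemo").getOrCreate()
sc = spark.sparkContext

words = sc.parallelize(["a", "b", "a", "c", "b", "a"])

# Narrow dependency: map stays inside a single stage
pairs = words.map(lambda w: (w, 1))

# Wide dependency: reduceByKey needs a shuffle, so it becomes a stage boundary
counts = pairs.reduceByKey(lambda x, y: x + y)

# Invoking an action triggers the DAG Scheduler to build and run the stages
print(counts.collect())
spark.stop()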
Worker Node of Spark
• A worker node is responsible for receiving task objects and invoking the run method on them.
• The worker reports exceptions/failures to the TaskSetManager (TSM).
• Block Manager (BM): the component of a worker for serving cached RDDs and for receiving shuffle data; write-once key value store in each worker.
• Communication Manager (CM): An asynchronous networking library for BM to fetch remote blocks.
• Map Output Tracker (MOT): Keep track of where each map task ran and communicates this to the reducers.
Stages of Tasks
Tasks within a stage (which has only "narrow dependencies") can be distributed across different workers.
• An executor is a Spark process that executes stages of a Spark application's code on a worker node.
• spark-submit specifies the total number of executors, the number of cores per executor, and the memory per executor.
• The choice of these parameters can affect the performance of Big Data analytics in a cluster.
• Being able to choose these parameters to optimize the performance of a Big Data analytics project is a core competency of Data Scientists/Engineers.
Linux Terminal Window: Needed for Logging in to Bridges-2 and for File Transfer
• On a PC: Download and install PuTTY.
• On a Mac: Click Other in Launchpad, then click Terminal to launch a terminal window.