Spark Internals

RDDs and partitions
Programmer specifies number of partitions for an RDD (Default value used if unspecified)
more partitions: more parallelism but also more overhead

Data Partitioning
RDDs are stored in partitions. When performing computations on RDDs, these partitions can be operated on in parallel.
You get better parallelism when the partitions are balanced.
When RDDs are first created, the partitions are balanced.
However, partitions may get out of balance after certain transformations.

Example: Finding Prime Numbers
Algorithm:
take every number from 2 to n
find all multiples of these numbers that are smaller than or equal to n (containing duplicates, but that’s ok)
subtract from all numbers these composite numbers
We see that all tasks but one finish quickly, while the last one takes a long time.
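
The three steps of the algorithm can be sketched in plain Python, without Spark, to make the data flow concrete. The function name find_primes is hypothetical, and ordinary lists and sets stand in for the RDDs; this is a sketch of the logic, not the Spark code used in the lecture.

```python
def find_primes(n):
    """Return the primes in [2, n] by subtracting composites from all numbers."""
    # Step 1: take every number from 2 to n.
    all_numbers = range(2, n + 1)
    # Step 2: for each x, list its multiples 2x, 3x, ... that are <= n.
    # The list keeps duplicates (6 is produced by both 2 and 3), which is fine.
    composites = [m for x in all_numbers for m in range(2 * x, n + 1, x)]
    # Step 3: subtract the composite numbers from all numbers.
    return sorted(set(all_numbers) - set(composites))


print(find_primes(30))
```

Step 2 is the part that a flatMap would perform in Spark: small numbers produce many multiples, large numbers produce few or none.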

How is the data partitioned?
allnumbers: balanced partitions.
flatMap blows up each element into different numbers of elements, turning it into an RDD with partitions having very different sizes. This is why one partition had most of the data and took the greatest amount of time.
prime is balanced (explained later).
How to fix?
Repartition our data!
The second version had to do some extra work to repartition the data, but that modification reduced the overall time because it achieved better resource utilization.
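
The skew can be reproduced without Spark. The sketch below assumes the input range is cut into contiguous, equally sized partitions, and it uses a simple round-robin redistribution as a stand-in for repartition. The helper names are hypothetical, and where exactly the lecture's second version repartitions is not shown here, so this only illustrates the effect.

```python
def split_evenly(items, num_partitions):
    """Cut a list into contiguous chunks of (almost) equal length."""
    size = -(-len(items) // num_partitions)  # ceiling division
    return [items[i * size:(i + 1) * size] for i in range(num_partitions)]


def multiples(x, n):
    """Multiples 2x, 3x, ... of x that are <= n."""
    return list(range(2 * x, n + 1, x))


def round_robin(partitions, num_partitions):
    """Stand-in for repartition: deal the elements out one at a time."""
    flat = [e for part in partitions for e in part]
    return [flat[i::num_partitions] for i in range(num_partitions)]


n, p = 1000, 8
all_numbers = split_evenly(list(range(2, n + 1)), p)
# flatMap-like step: each element expands into a different number of elements.
composites = [[m for x in part for m in multiples(x, n)] for part in all_numbers]
rebalanced = round_robin(composites, p)

print("allnumbers:", [len(part) for part in all_numbers])
print("composites:", [len(part) for part in composites])
print("rebalanced:", [len(part) for part in rebalanced])
```

The first partition holds the smallest numbers and therefore most of the multiples, while partitions holding numbers above n/2 produce nothing at all.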

Partitions
Properties of partitions:
Partitions never span multiple machines, i.e., tuples in the same partition are guaranteed to be on the same machine.
Each machine in the cluster contains one or more partitions.
The number of partitions to use is configurable. By default, it equals the total number of cores on all executor nodes (except when loading an RDD from an HDFS/WASB file).
Two kinds of partitioning available in Spark:
Hash partitioning
Range partitioning

Hash partitioning
Back to the prime number example
We can view the contents of each partition:
e.g., prime.glom().collect()[1][0:4]
We see that it hashed all numbers x such that x mod 8 = 1 to partition #1
In general, hash partitioning allocates tuple (k, v) to partition p where
p = k.hashCode() % numPartitions
Usually works well but be aware of bad inputs!
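
A minimal sketch of the rule p = hash(k) % numPartitions in plain Python. It assumes small non-negative integer keys, for which Python's hash(k) is k itself (as with Java's Integer.hashCode()); the function name hash_partition is hypothetical.

```python
def hash_partition(keys, num_partitions):
    """Assign each key to partition hash(key) % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for k in keys:
        partitions[hash(k) % num_partitions].append(k)
    return partitions


primes = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41]
# Partition 1 receives exactly the keys x with x mod 8 == 1.
print(hash_partition(primes, 8)[1])

# A bad input: every key is a multiple of 8, so all keys land in partition 0.
skewed = hash_partition(range(0, 80, 8), 8)
print([len(part) for part in skewed])
```

The second call shows the kind of bad input to watch for: keys that share a common factor with the number of partitions collapse into a few partitions.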

Partitioning Data Using Transformations
Partitioner from parent RDD:
A Pair RDD that results from a transformation on a partitioned Pair RDD is typically configured to use the hash partitioner that was used to construct its parent.
Automatically-set partitioners:
Some operations on RDDs automatically result in an RDD with a known partitioner, when it makes sense.
For example, by default, when using sortByKey, a RangePartitioner is used.
Further, the default partitioner when using groupByKey is a HashPartitioner.

Partitioning Data: Custom partition function
Invoking partitionBy creates an RDD with a custom partition function.
Specify the partition function in transformations like reduceByKey, groupByKey
See example
This can be useful when the default partition function (hashing) doesn’t work well
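
To illustrate why a custom partition function can help, here is a plain-Python sketch whose shape loosely follows PySpark's partitionBy(numPartitions, partitionFunc). The function partition_by and the sample keys are hypothetical; integer keys are assumed so that the default hash is the key itself.

```python
def partition_by(pairs, num_partitions, partition_func=hash):
    """Place each (key, value) pair in partition_func(key) % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partition_func(key) % num_partitions].append((key, value))
    return partitions


# Keys that are all multiples of 8 defeat plain hashing with 8 partitions.
pairs = [(k, "v") for k in range(0, 64, 8)]
default = partition_by(pairs, 8)
# Custom function: divide out the common factor before taking the modulus.
custom = partition_by(pairs, 8, partition_func=lambda k: k // 8)

print("default:", [len(part) for part in default])
print("custom: ", [len(part) for part in custom])
```

The custom function uses knowledge about the keys (they are all multiples of 8) that a generic hash function cannot have.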

Range partitioning
For data types that have an ordering defined
Examples: Int, Char, String, …
Internally, Spark samples the data so as to produce more balanced partitions.
Used by default after sorting
Example:
An RDD with keys [8, 96, 240, 400, 401, 800],
Number of partitions: 4
In this case, hash partitioning distributes the keys as follows among the partitions:
partition 0: [8, 96, 240, 400, 800]
partition 1: [401]
partition 2: []
partition 3: []
Range partitioning would improve the partitioning significantly
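
The example can be checked with a small plain-Python sketch. Spark picks range boundaries from a sample of the data; the sketch below assumes, for simplicity, that boundaries are taken from the full sorted key list and that there are at least as many keys as partitions. All function names are hypothetical.

```python
from bisect import bisect_left


def hash_partition(keys, num_partitions):
    """Integer keys: partition index is key % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for k in keys:
        partitions[k % num_partitions].append(k)
    return partitions


def range_bounds(keys, num_partitions):
    """Pick num_partitions - 1 upper bounds at evenly spaced ranks."""
    ordered = sorted(keys)
    n = len(ordered)  # assumes n >= num_partitions
    return [ordered[(i + 1) * n // num_partitions - 1]
            for i in range(num_partitions - 1)]


def range_partition(keys, num_partitions):
    """Send each key to the first partition whose upper bound is >= key."""
    bounds = range_bounds(keys, num_partitions)
    partitions = [[] for _ in range(num_partitions)]
    for k in keys:
        partitions[bisect_left(bounds, k)].append(k)
    return partitions


keys = [8, 96, 240, 400, 401, 800]
print("hash: ", hash_partition(keys, 4))
print("range:", range_partition(keys, 4))
```

With these keys the range boundaries come out as 8, 240, and 400, so no partition is left empty and none holds more than two keys.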

Partitioner inheritance
Operations on Pair RDDs that hold on to (and propagate) a partitioner:
mapValues (if parent has a partitioner)
flatMapValues (if parent has a partitioner)
filter (if parent has a partitioner)

Partitioning Data Using Transformations
All other operations will produce a result without a partitioner. Why?
Consider the map transformation. Given that we have a hash partitioned Pair RDD, why would it make sense for map to lose the partitioner in its result RDD?
Because it’s possible for map to change the key, e.g., rdd.map(lambda kv: ("doh!", kv[1]))
Hence mapValues. It enables us to still do map transformations without changing the keys, thereby preserving the partitioner.
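
The argument can be made concrete with a plain-Python sketch that checks whether every pair still sits in the partition its key hashes to. The helpers map_values and map_pairs are hypothetical stand-ins for mapValues and map, and integer keys are assumed.

```python
NUM_PARTITIONS = 4


def hash_partition(pairs):
    """Place each (key, value) pair in partition hash(key) % NUM_PARTITIONS."""
    partitions = [[] for _ in range(NUM_PARTITIONS)]
    for k, v in pairs:
        partitions[hash(k) % NUM_PARTITIONS].append((k, v))
    return partitions


def is_hash_partitioned(partitions):
    """True if every pair sits in the partition its key hashes to."""
    return all(hash(k) % NUM_PARTITIONS == i
               for i, part in enumerate(partitions) for k, _ in part)


def map_values(partitions, f):
    """Like mapValues: keys are untouched, so the layout stays valid."""
    return [[(k, f(v)) for k, v in part] for part in partitions]


def map_pairs(partitions, f):
    """Like map: f may return a new key, so the layout may become invalid."""
    return [[f(kv) for kv in part] for part in partitions]


parts = hash_partition([(k, k * k) for k in range(8)])
doubled = map_values(parts, lambda v: 2 * v)
rekeyed = map_pairs(parts, lambda kv: (kv[0] + 1, kv[1]))

print(is_hash_partitioned(doubled))
print(is_hash_partitioned(rekeyed))
```

Since Spark cannot inspect an arbitrary function passed to map to see whether it changes keys, it has to assume the worst and drop the partitioner.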

Examples

Operations on RDDs
HDFS files: Input RDD, one partition for each block of the file
Map: Transforms each record of the RDD
Filter: Select a subset of records
Union: Returns the union of two RDDs
Join: Narrow or wide dependency

Job Scheduling

The scheduler examines the RDD’s lineage graph to build a DAG of stages.
Stage boundaries are the shuffles.
Pipelined parallel execution within one stage.
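
As a rough illustration of how shuffles cut a lineage into stages, the sketch below handles only a linear chain of operations, each labeled by hand as a narrow or wide dependency. Spark's scheduler works on a general DAG, so this is a simplified model; build_stages and the sample lineage are hypothetical.

```python
def build_stages(lineage):
    """Group a linear chain of (operation, dependency) steps into stages."""
    stages = [[]]
    for op, dependency in lineage:
        # A wide dependency needs a shuffle, which closes the current stage.
        if dependency == "wide" and stages[-1]:
            stages.append([])
        stages[-1].append(op)
    return stages


lineage = [
    ("textFile", "narrow"),
    ("map", "narrow"),
    ("reduceByKey", "wide"),
    ("map", "narrow"),
    ("groupByKey", "wide"),
]

for i, stage in enumerate(build_stages(lineage)):
    # Operations inside one stage are pipelined over each partition.
    print(f"stage {i}: {' -> '.join(stage)}")
```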

Shuffle operations
Spark uses shuffles to implement wide dependencies
Examples: reduceByKey, repartition, coalesce, join (on RDDs not partitioned using the same partitioner)
Spark generates two sets of tasks: map tasks to organize the data, and reduce tasks to group/aggregate it.
Internally, Spark builds a hash table within each task to perform the grouping.
If the hash table is too large, Spark will spill these tables to disk, incurring the additional overhead of disk I/O
Spark automatically persists some intermediate data from shuffles, even if the user never calls persist.
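
The map/reduce structure of a shuffle can be sketched in plain Python for a reduceByKey-style aggregation. This is a single-process model under simplifying assumptions (no spilling to disk, no map-side combining, integer keys); the function names are hypothetical.

```python
def map_task(records, num_reducers):
    """Map side: bucket the records of one input partition by reducer."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in records:
        buckets[hash(key) % num_reducers].append((key, value))
    return buckets


def reduce_task(incoming, func):
    """Reduce side: aggregate the fetched records in a hash table."""
    table = {}
    for key, value in incoming:
        table[key] = func(table[key], value) if key in table else value
    return table


def reduce_by_key(input_partitions, func, num_reducers):
    """Run one map task per input partition, then one reduce task per reducer."""
    map_outputs = [map_task(part, num_reducers) for part in input_partitions]
    results = []
    for r in range(num_reducers):
        # Each reduce task fetches its bucket from every map task.
        incoming = [kv for buckets in map_outputs for kv in buckets[r]]
        results.append(reduce_task(incoming, func))
    return results


input_partitions = [[(1, 1), (2, 1), (1, 1)], [(2, 1), (3, 1), (1, 1)]]
counts = reduce_by_key(input_partitions, lambda a, b: a + b, num_reducers=2)
print(counts)
```

The dictionary in reduce_task plays the role of the per-task hash table; it is this structure that Spark spills to disk when it grows too large.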

PageRank revisited: Which operations cause shuffles?
[Figure: PageRank lineage graph over the RDDs lines, links, ranks, and contribs, with two joins labeled "narrow join".]
You can monitor the execution of jobs at http://:4040

Example: Join order optimization
Three tables:
Waybills(waybill, customer)
Customers(customer, phone)
Waybill_status(waybill, version)
How to join them together?

Job Scheduling
Job: a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action.
Multiple parallel jobs can run simultaneously if they are submitted from separate threads.
PageRank: only one job
k-means: multiple jobs, but sequentially submitted
Example with multiple threads submitting multiple jobs in parallel
Spark’s scheduler runs jobs in FIFO fashion (default)
Also possible to configure fair sharing between jobs, i.e., round robin.
Also supports Scheduler Pools:
FAIR among the pools, controlled by user-defined weights.
FAIR or FIFO inside a pool
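
The difference between FIFO and round-robin sharing can be seen in a toy model with a single task slot. This only models the order in which tasks are picked, not Spark's actual scheduler; the function names are hypothetical and each job is assumed to have at least one task.

```python
from collections import deque


def fifo_order(jobs):
    """Run every task of the first job before any task of the next."""
    return [f"{name}{i}" for name, n_tasks in jobs for i in range(n_tasks)]


def round_robin_order(jobs):
    """Take one task from each job in turn until all tasks are done."""
    queues = deque((name, deque(range(n_tasks))) for name, n_tasks in jobs)
    order = []
    while queues:
        name, tasks = queues.popleft()
        order.append(f"{name}{tasks.popleft()}")
        if tasks:
            queues.append((name, tasks))  # job still has tasks: back of the line
    return order


jobs = [("A", 3), ("B", 2)]  # (job name, number of tasks)
print("FIFO:       ", fifo_order(jobs))
print("round robin:", round_robin_order(jobs))
```

Under FIFO a short job submitted after a long one waits for the whole long job; under round robin it starts making progress right away.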

Task Scheduling within a Job
Each job consists of multiple stages
A stage can only start after all its parent stages have completed
Each stage has many tasks
Spark assigns tasks to machines based on data locality
If a task needs to process a partition that is available in memory on a node, we send the code of the task to that node.
Different levels of locality are used
PROCESS_LOCAL data is in the same JVM as the running code
NODE_LOCAL data is on the same node
RACK_LOCAL data is on the same rack of servers.
ANY data is elsewhere on the network
Spark waits a short time (spark.locality.wait, 3 s by default) for a free executor at the current level before switching to the next locality level.
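
The locality levels can be illustrated with a small classification function. The (rack, node, jvm) tuple encoding and both function names are hypothetical, and the waiting behavior is left out: the sketch simply picks the best level among the executors that are currently free.

```python
LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]


def locality_level(data, executor):
    """Classify how close an executor is to the data it would read.

    Both arguments are (rack, node, jvm) tuples (a hypothetical encoding).
    """
    if data == executor:
        return "PROCESS_LOCAL"
    if data[:2] == executor[:2]:
        return "NODE_LOCAL"
    if data[0] == executor[0]:
        return "RACK_LOCAL"
    return "ANY"


def pick_executor(data, free_executors):
    """Choose the free executor with the best (lowest-index) locality level."""
    return min(free_executors,
               key=lambda e: LEVELS.index(locality_level(data, e)))


data = ("rack1", "node1", "jvm1")
free = [("rack2", "node9", "jvm1"), ("rack1", "node2", "jvm1"), ("rack1", "node1", "jvm2")]
best = pick_executor(data, free)
print(best, locality_level(data, best))
```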

Memory Management
Two types of memory usages for applications:
Execution memory: for computation in shuffles, joins, sorts and aggregations
Storage memory: for caching and propagating internal data across the cluster
Execution and storage share a unified region (M)
Default is 0.6 * (total memory available to JVM - 300 MB)
Execution may evict storage if necessary
But storage is guaranteed a region R within M that execution cannot evict; default R = 0.5 * M
Storage may not evict execution
Remarks
Applications that do not use caching can use the entire memory space for execution
Applications that do use caching can reserve a minimum storage space (R)
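
The two formulas can be worked through for a concrete heap size. The sketch below uses the default fractions from the slide (0.6 and 0.5, which correspond to the Spark settings spark.memory.fraction and spark.memory.storageFraction); the function name and the 4096 MB heap are assumptions chosen for illustration.

```python
RESERVED_MB = 300  # memory set aside before the unified region is computed


def unified_memory(jvm_heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Return (M, R) in MB for a given JVM heap size."""
    m = memory_fraction * (jvm_heap_mb - RESERVED_MB)  # unified region M
    r = storage_fraction * m                           # protected storage R
    return m, r


m, r = unified_memory(4096)
print(f"M = {m:.1f} MB, R = {r:.1f} MB")
```

For a 4096 MB heap this gives M = 0.6 * 3796 = 2277.6 MB and R = 1138.8 MB, so an application that caches nothing can use all of M for execution, while cached blocks up to R are safe from eviction by execution.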
