
COMP5349 – Cloud Computing
Week 6: Distributed Execution: HDFS and YARN
Dr. Ying Zhou, School of Computer Science

Outline
n Distributed Execution Basics
n GFS and HDFS
n MapReduce on YARN
06-2

Last Two Weeks
n In the last two weeks we covered the basic APIs of the MapReduce and Spark frameworks
„We ran them on a single node without much parallelisation
n Spark is a data flow system: a data analytics workload is designed as data flowing along a sequence of operations
„The original data structure and data flow operations closely follow the MapReduce design principles
„But MapReduce’s workflow only contains two operations: map and reduce
„Spark has a larger set of APIs
¡ A number of map like operations: map, flatMap, mapToPair, filter, etc.
¡ A number of reduce like operations: reduceByKey, groupByKey, aggregateByKey, join, etc…
¡ Spark is memory based, MapReduce is storage based (see the word-count sketch below)
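To make the contrast concrete, here is a minimal word-count sketch using the Spark Java API (not part of the original slide). It assumes a Spark 2.x+ dependency, a local master and a hypothetical input path; flatMap and mapToPair are map-like operations, while reduceByKey is a reduce-like operation that triggers a shuffle.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;

public class WordCountSketch {
    public static void main(String[] args) {
        // Local master so the sketch runs standalone; on a cluster spark-submit sets the master.
        SparkConf conf = new SparkConf().setAppName("WordCountSketch").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("input.txt");  // hypothetical input path

        JavaPairRDD<String, Integer> counts = lines
            .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator()) // map-like
            .mapToPair(word -> new Tuple2<>(word, 1))                      // map-like
            .reduceByKey(Integer::sum);                                    // reduce-like, causes a shuffle

        counts.saveAsTextFile("word-counts");  // action: triggers the actual distributed execution
        sc.stop();
    }
}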
06-3

High level view of Distributed Execution
n Assumptions:
„ Very large input data
„ A lot of machines that can execute operations in parallel each on a partition of the data
n MapReduce Distributed Execution
„ A lot of machines run mappers on input data in parallel
„ Mapper outputs are shuffled to multiple reducers
„ A lot of machines run reducers on mapper output in parallel
n Spark Distributed Execution
„ A lot of machines run some operations in parallel
„ When intermediate result regrouping is needed, outputs are shuffled to the machines running the next sequence of operations
„ Repeat until an action is executed to return data to the driver program
n A large workload is expected to run on multiple machines for an extended period of time
06-4

Basic Requirements and Supports
n A way to divide large input data into smaller partitions
n A mechanism to allocate machines to run those operations in parallel and to organize data transfer when needed
n A mechanism to support continuation of the execution when something goes wrong
„ Data reading error
„ Data transmission error
„ Machine execution error
n Basic Supports:
„ A distributed file system that will
¡ automatically partition and replicate data
„ A resource scheduling system that will
¡ monitor workload of each machine in the system and allocate workload accordingly
¡ monitor execution of application and kick in fault-tolerance or optimization mechanism when needed
06-5

Systems that are in use
n Distributed File System
„Google File System / HDFS
¡ Supported by MapReduce and Spark
„Many other storage systems supported by various frameworks
¡ HBase, Azure Blob Storage, S3, etc.
n Resource Scheduling System
„YARN (Yet Another Resource Negotiator)
¡ Can schedule MapReduce and Spark applications
„Other scheduling systems
¡ The early MRv1 JobTracker for MapReduce
¡ Spark standalone, Mesos, etc…
06-6

Outline
n Distributed Execution Basics
n GFS and HDFS
„Overall Architecture
„Read/Write Operation
„HA Strategy
„Chunk Placement Strategy
n MapReduce on YARN
06-7

Distributed File System Basics
n A distributed file system allows clients to access files on remote servers “transparently”.
n We are using at least two popular ones in SCS
„Your home directory is physically located on some file servers.
„Your home directory is mounted on all Linux servers using NFS
„Your home directory is mapped as the U drive in Windows using Samba
n Constraints
„ No sufficient mechanisms for reliability and availability
¡ Replica, migration, etc
¡ You may have encountered login hiccups, such as your U drive not being mapped when you log in to one of the lab machines…
06-8

GFS Design Constraints
n Component failures are the norm
„1000s of components (servers)
„Bugs, human errors, failures of memory, disk, connectors, networking, and power supplies
„Monitoring, error detection, fault tolerance, automatic recovery
n Files are huge by traditional standards
„Multi-GB files are common
„Billions of objects
n Workload features (Co-design of application and the file system)
„Most files are mutated by appending new data rather than overwriting existing data.
¡ E.g. log data, web crawling data, etc…
„Files are often read sequentially rather than randomly.
06-9

Master/Slave Architecture
n A GFS cluster
„A single master
¡Maintains all metadata
• Namespace, access control, file-to-chunk mappings, garbage collection, chunk migration
¡Periodically communicates with chunkservers in HeartBeat messages.
„Multiple chunkservers per master
¡Store actual file data
„Running on commodity Linux machines
06-10

Master/Slave Architecture (cont’d)
n A file
„Divided into fixed-sized chunks
¡ Default chunk size is 64 MB
¡ Labeled with 64-bit unique global IDs (chunk handle)
¡ Stored at chunkservers
¡ Chunk is replicated on multiple chunkservers (by default 3 replicas)
• Files that need frequent access will have a high replication factor
n GFS clients
„Consult the master for metadata
„Access data from chunkservers
„The equivalent client in HDFS is the hdfs shell script, e.g. hdfs dfs -put (see the sketch below)
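Below is a minimal sketch (not from the original slide) of the same client role using the Hadoop FileSystem Java API; the paths are hypothetical and the cluster address is assumed to come from core-site.xml. The shell command hdfs dfs -put local.txt /data does essentially the same upload.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsClientSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();      // picks up fs.defaultFS from core-site.xml
        FileSystem fs = FileSystem.get(conf);

        fs.mkdirs(new Path("/data"));                  // metadata operation: goes to the NameNode
        fs.copyFromLocalFile(new Path("local.txt"),    // data upload, like: hdfs dfs -put local.txt /data
                             new Path("/data/"));

        for (FileStatus st : fs.listStatus(new Path("/data"))) {   // another metadata operation
            System.out.println(st.getPath() + "  " + st.getLen() + " bytes");
        }
        fs.close();
    }
}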
06-11

Architecture diagram
[Diagram: GFS architecture. The master holds the file metadata, chunkservers store the actual files, and master and chunkservers exchange HeartBeat messages.]
06-12

HDFS
n Hadoop Distributed File System is the open source implementation of GFS
n It is part of the Hadoop framework
[Diagram: HDFS architecture (source: http://developer.yahoo.com/hadoop/tutorial/module2.html). The NameNode is equivalent to the Master in GFS; DataNodes are equivalent to ChunkServers in GFS; a block, equivalent to a chunk in GFS, has a default size of 128 MB; each replica resides on another node.]
06-13

GFS vs. HDFS

Naming
  GFS: Master         HDFS: NameNode
  GFS: Chunk Server   HDFS: DataNode
  GFS: Chunk          HDFS: Block

Functionality
  GFS: supports random write and record append; in practical workloads the number of random writes is very small
  HDFS: supports a Write-Once-Read-Many workload
  GFS: the HA strategy requires an external monitoring service, such as the Chubby lock service, for Master recovery
  HDFS: an HA configuration is only available in the latest versions; the Secondary NameNode does part of the job the GFS master is supposed to do, but it is not a shadow master

Example NameNode web UI: http://soit-hdp-pro-1.ucc.usyd.edu.au:50070
06-14

Single-Master Design
n Benefits:
„Simple, master can make sophisticated chunk placement
and replication decisions using global knowledge
n Possible disadvantage:
„Single point of failure (availability)
„Bottleneck (scalability)
n Solution
„Replicate master state on multiple machines; Chubby is used to appoint a master among several shadow masters
„Fast recovery;
„Clients use master only for metadata, not reading/writing actual file
06-15

Metadata
n Three types:
„File and chunk namespaces
„Mapping from files to chunks
„Locations of chunk replicas
n All metadata is in memory.
„Large chunk size ensures small meta data size
„First two are kept persistent in an operations log for recovery.
„Third is obtained by querying chunkservers at startup and periodically thereafter.
¡ Chunkservers may come and go or have partial disk failure
n Ask yourself: In a small Hadoop cluster, you may be able to create a directory on HDFS but cannot upload files; what could be the issue?
06-16

Operation Log and Master Recovery
nMetadata updates are logged
„Example updates: create new directory, new files
„Log replicated on remote machines
nTake global snapshots (checkpoints) to truncate logs
„Memory mapped (no serialization/deserialization)
n Recovery
„Latest checkpoint + subsequent log files
06-17

Scalability
n Single Master is the only possible bottleneck
„Minimize Master storage requirement for each file
„Minimize Master involvement in read/write operations
¡ Number of messages
¡ Size of messages
n Design
„Master is only involved at the beginning of a Read/Write operation
„Only limited chunk information is transferred
„Actual data movement does not involve the master
n A single master is able to serve thousands of chunk servers
06-18

Read and Write in storage system with replication
n Consistency and many other issues
„Freshness of data, request latency, …
n Many design options depending on the targeted apps’ requirements
„How many copies to contact during read/write?
„How many acknowledgements from copies to wait for before replying to the client during read/write?
„Order of applying concurrent write requests
„Fault tolerance mechanism
„Recovery mechanism
06-19

Overview of GFS read/write operation
n Read from a single replica; the client chooses the replica closest to it
n Synchronous write to all replicas
„Coordinated by a primary replica
¡ Determine the sequence of concurrent writes
¡ Request and wait for secondary replicas to apply the writes
„The primary replica is not a fixed role
¡ Any replica may become the primary
06-20

GFS Read
1. The client translates the file name and byte offset into a chunk index and sends a request to the master.
2. The master replies with the chunk handle and the locations of the replicas; the client caches this information.
3. The client sends a request to the closest replica, specifying the chunk handle and a byte range.
4. The chunkserver sends the chunk data to the client.
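An HDFS client sees none of this directly; as a rough sketch (not from the lecture), a positioned read through the FileSystem API might look like the following, with the file path and offset purely hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // open() asks the NameNode for block locations (steps 1-2 above); the bytes
        // themselves are then streamed from a nearby DataNode (steps 3-4).
        try (FSDataInputStream in = fs.open(new Path("/data/large.log"))) {  // hypothetical file
            byte[] buf = new byte[4096];
            in.seek(1024L * 1024L);                // jump to a byte offset inside the file
            int n = in.read(buf, 0, buf.length);   // served by a DataNode holding that block
            System.out.println("read " + n + " bytes");
        }
        fs.close();
    }
}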
06-21

Lease and Mutation Order
n A mutation is an operation that changes the contents or metadata of a chunk (e.g. a write)
n The master grants a chunk lease to a replica
n The replica holding the lease becomes the primary replica; it determines the order of updates to all replicas
n Lease
„60 second timeouts
„Can be extended indefinitely
„Extension requests are piggybacked on heartbeat messages
„After an old lease expires, the master can grant new leases
n Any replica may become primary to coordinate the mutation process
06-22

GFS write
1. The client asks the master which chunkserver holds the current lease for the chunk and the locations of the other replicas. (The master grants a new lease if no replica currently holds one.)
2. The master replies with the identity of the primary and the locations of the other (secondary) replicas. (The client caches this information.)
3. The client pushes the data to all the replicas.
4. Once all the replicas have acknowledged receiving the data, the client sends a write request to the primary. The primary assigns consecutive serial numbers to all the mutations it receives, possibly from multiple clients, which provides the necessary serialization.
5. The primary forwards the write request to all secondary replicas.
6. The secondaries signal completion.
7. The primary replies to the client. Errors are handled by retrying.
06-23

Data Flow
n Data flow is decoupled from control flow
n Data is pushed from the client to a chain of chunkservers
„ Each machine selects the “closest” machine to forward the data
¡ Client pushes data to chunkserver S1 that has replica A and tells it there are two more chunkservers: S2 and S3 in the chain
¡ S1 pushes the data to S2, which is closer than S3 and tells it S3 is in the chain. S2 has primary replica.
¡ S2 pushes the data to S3 and tells it you are the last one. S3 has replica B
n The network topology is simple enough that “distances” can be accurately estimated from IP addresses
n Data is pushed in a pipelined fashion
„Each chunkserver starts forwarding immediately after it receives some data
06-24

High Availability (HA) Strategies
nAny server may be down/unavailable at a given time
nHA of the overall system relies on two simple strategies
„Fast recovery
¡Restore state quickly when a server (master or chunkserver) comes back to life
„Replication
¡Putting extra copies of data
06-25

Fast Recovery
n Key Principle: small metadata
n Chunkservers maintain
„Checksums for 64 KB blocks of data for data integrity checks
„Replica/chunk version numbers for data freshness checks
n Master maintains
„File information (owner, permission, mapping, etc.)
„A memory image and a short operation log enable fast recovery
n It takes only a few seconds to read this metadata from disk before the server is able to answer queries.
n Master may wait a little longer to receive all chunk location information.
06-26

Master Replication
n Master operation log and checkpoints are replicated on multiple machines.
n If the master fails, monitoring infrastructure outside GFS starts a new master process elsewhere (the Chubby lock service, which uses the Paxos algorithm to elect a new master among a group of living shadow masters).
n Shadow masters provide read-only access when the primary master is down.
06-27

Chunk replica
n Chunk replicas are created within the cluster
n The Master is responsible for chunk replica creation and placement
„Decide where to put replica
„Decide if a chunk needs new replica
„Decide if a replica needs to migrate to a new location
06-28

Chunk Replica Placement
n Goal
„Maximize data reliability and availability
„Maximize network bandwidth
nNeed to spread chunk replicas across machines and racks
„Read can exploit the aggregate bandwidth of multiple racks (read only needs to contact one replica)
„Write has to flow through multiple racks (write has to be pushed to all replicas)
06-29

Outline
n Distributed Execution Basics
n GFS and HDFS
nYARN
„Basic component of YARN
„YARN scheduling
„Data locality constraints
„Hadoop Straggler handling mechanism
06-30

YARN Framework
n Yet Another Resource Negotiator
„A general resource management framework.
[Diagram: a YARN cluster. A daemon process runs on every node, and short-term processes are started on the nodes on behalf of user programs.]
06-31

YARN Concepts
n YARN’s world view consists of applications requesting resources
n Resource Manager (one per cluster)
„“Is primarily, a pure scheduler. In essence, it’s strictly limited to arbitrating available resources in the system among the competing applications”
n Node Manager (one per node)
„Responsible for managing resources on an individual node
n Application Master (one per application)
„Responsible for requesting resources from the cluster on behalf of an application
„Monitors the execution of the application as well
„Framework specific
¡ MapReduce application’s AM is different to Spark application’s AM
06-32

YARN Resource Concepts
n Resource Request
„In the current model, resource requirements are expressed mainly in memory and/or CPU cores
„Depends on the configured scheduler
n Container
„Resource allocation is in the form of container
„A Container grants rights to an application to use a specific amount of resources (memory, cpu etc.) on a specific host.
„The MR framework uses containers to run map or reduce tasks
¡ Each container runs a map or a reduce task
„Spark uses containers to run operations
¡ Each container may run many operations
„Container is launched by NodeManager
06-33

Resource Allocation: MapReduce
n MapReduce Framework’s resource requirements can be worked out based on workload features
„ The same application running on the same data set has the same resource requirements
n The number of map tasks depends on the number of partitions of the input data
n The number of reduce tasks is specified by the developer based on an estimate of the map output size
n There is always an AM that needs to run in a container
n Example: A MapReduce Job with 10 map tasks and 3 reduce tasks would need 14 containers. The first container runs its AM, which would request 10 containers for map tasks (mappers) and 3 containers for reduce tasks (reducers).
n Containers are requested at different times (see the job-setup sketch below)
„ Containers for map tasks will be requested early and finish early
„ Containers for reduce tasks will be requested late and also finish late
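A minimal sketch of the corresponding job setup in the MapReduce Java API (not from the lecture; paths are hypothetical, and the identity Mapper and Reducer are used only to keep the sketch self-contained). Only the number of reduce tasks is set explicitly, while the number of map tasks follows from the input splits.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSetupSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "job-setup-sketch");
        job.setJarByClass(JobSetupSketch.class);

        job.setMapperClass(Mapper.class);     // identity mapper, placeholder only
        job.setReducerClass(Reducer.class);   // identity reducer, placeholder only
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // The developer chooses the number of reducers (3 reduce containers here);
        // the number of map containers is determined by the number of input splits.
        job.setNumReduceTasks(3);

        FileInputFormat.addInputPath(job, new Path("/data/input"));     // hypothetical paths
        FileOutputFormat.setOutputPath(job, new Path("/data/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}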
06-34

Resource Allocation: Spark
n Spark’s resource requirements are more flexible and are specified by developers
n Containers are requested at the same time and stay throughout the lifetime of the application
n Each container can run a different number of operations
n Developers can experiment with various settings to optimize performance (see the sketch below)
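As a rough illustration (the values are arbitrary, not a recommendation), the executor resources might be requested like this when building the SparkConf; the same properties can also be passed on the spark-submit command line.

import org.apache.spark.SparkConf;
import scala.Tuple2;

public class SparkResourceSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("ResourceSketch")
            .set("spark.executor.instances", "10")   // number of executor containers to request from YARN
            .set("spark.executor.memory", "4g")      // memory per executor container
            .set("spark.executor.cores", "2");       // cores per executor container

        // Just print what would be requested; a real job would build a SparkContext from this conf.
        for (Tuple2<String, String> kv : conf.getAll()) {
            System.out.println(kv._1() + " = " + kv._2());
        }
    }
}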
06-35

YARN Walkthrough
1. The client submits the application and its resource requirements (see the client-side sketch below)
2. The RM contacts an NM to launch the first container for this app, and runs the app’s AM in it
3. The AM registers itself with the RM; the client also gets information about the AM
4. The AM negotiates with the RM to request containers
5. The AM contacts NMs to launch containers and run app code in each
6. App code inside the containers sends progress information to the AM
7. The client can communicate with the AM to obtain progress information
8. The AM deregisters with the RM and all containers can be removed
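Purely to make the walkthrough concrete, here is a rough, hypothetical client-side sketch of steps 1, 2 and 7 using the YARN client API. The AM command is just a placeholder; a real application (e.g. MapReduce) ships a proper ApplicationMaster that performs steps 3-6 and 8.

import java.util.Collections;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnSubmitSketch {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        // Step 1: ask the RM for a new application and describe the AM container.
        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext ctx = app.getApplicationSubmissionContext();
        ctx.setApplicationName("toy-app");
        ctx.setResource(Resource.newInstance(1024, 1));   // 1 GB, 1 vcore for the AM container

        // Placeholder command; a real AM registers with the RM and requests task containers.
        ContainerLaunchContext amSpec = ContainerLaunchContext.newInstance(
            Collections.emptyMap(), Collections.emptyMap(),
            Collections.singletonList("sleep 30"),
            null, null, null);
        ctx.setAMContainerSpec(amSpec);

        // Step 2: the RM asks an NM to launch this container; step 7: the client polls for status.
        ApplicationId appId = yarnClient.submitApplication(ctx);
        System.out.println("Submitted " + appId + ", state: "
            + yarnClient.getApplicationReport(appId).getYarnApplicationState());
        yarnClient.stop();
    }
}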
06-36

YARN and MR memory configuration
n YARN needs to know
„The total amount of memory YARN can use
„How to break up the total resources into containers
¡ Minimum container size, maximum container size, etc.
n ApplicationMaster for MapReduce application needs to know
„ Memory requirement for Map and Reduce tasks
¡ We usually assume Reduce tasks need more memory
„Each task runs in a JVM; the JVM heap size needs to be set as well
06-37

Memory Configuration Example
n Node capacity
„48 GB RAM, 12 cores
n Memory allocation
„Reserve 8 GB RAM for the OS and others (e.g. HBase)
„YARN can use up to 40 GB
„Set the minimum container size to 2 GB
¡ Each node may have at most 20 containers
„Allow each map task to use 4 GB and each reduce task to use 8 GB
¡ Each node may run 10 map tasks or 5 reduce tasks or a combination of the two (see the configuration sketch below)
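The example above maps onto the standard YARN and MapReduce memory properties roughly as below. These would normally live in yarn-site.xml and mapred-site.xml; they are set programmatically here only to name the keys, and the JVM heap sizes are an assumption (commonly around 80% of the container size).

import org.apache.hadoop.conf.Configuration;

public class MemoryConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // 40 GB of the node's 48 GB is given to YARN; 8 GB stays for the OS and others.
        conf.setInt("yarn.nodemanager.resource.memory-mb", 40960);

        // Container sizing: with a 2 GB minimum, a node can host at most 20 containers.
        conf.setInt("yarn.scheduler.minimum-allocation-mb", 2048);
        conf.setInt("yarn.scheduler.maximum-allocation-mb", 40960);

        // Per-task container requests: 4 GB per map task, 8 GB per reduce task.
        conf.setInt("mapreduce.map.memory.mb", 4096);
        conf.setInt("mapreduce.reduce.memory.mb", 8192);

        // JVM heap inside each container is set somewhat smaller than the container itself (assumed values).
        conf.set("mapreduce.map.java.opts", "-Xmx3276m");
        conf.set("mapreduce.reduce.java.opts", "-Xmx6553m");

        System.out.println("map container MB = " + conf.getInt("mapreduce.map.memory.mb", -1));
    }
}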
06-38

Resource Scheduling
n Resource Scheduling deals with the problem of which requests should get the available resources when there is a queue for resources
n Early MRv1 uses FIFO as the default scheduling algorithm
„A large job that comes first may starve small jobs
„Users may experience long delays as their jobs wait in the queue
n YARN is configured to support a shared multi-tenant cluster
„Capacity Scheduler
„Fair Scheduler
„FIFO Scheduler
06-39

[Figure from Hadoop: The Definitive Guide, 4th edition, page 87]
06-40

Capacity Scheduler
n Capacity Scheduler is designed for multi-tenancy situation
„Each organization has a dedicated queue with a given fraction of the cluster capacity
¡ Queues may be further divided to set up capacity allocation within organization
¡ A single job does not use more resources than the queue capacity, but if there is more than one job in the queue and there are idle resources, spare resources can be allocated to jobs in the queue
¡ How much more resources can be allocated to a given queue is configurable
¡ Which users can submit jobs to which queue is configurable
http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html
06-41

Fair Scheduler
n The principle of fair scheduling is to allocate resources so that all running applications get a similar share of resources
„E.g. if there are R units of resources and J jobs, each job should get around R/J units of resources.
n The YARN fair scheduler works between queues: each queue gets a fair share of the cluster resources
„Within a queue, the scheduling policy is configurable: FIFO or Fair
[Figure from Hadoop: The Definitive Guide, 4th edition, page 91]
06-42

Fair Scheduler – Queue Placement
n The Fair scheduler uses a rule-based system to determine which queue an application should be placed in
„Example queue placement policy


n All queues in the capacity scheduler need to be specified beforehand by the cluster administrator; queues in the fair scheduler can be created on the fly
06-43

Data Locality Constraints
n Data intensive workloads (MapReduce/Spark)
„have storage attached to computers.
„Scheduling tasks near data improves performance.
n The requirements of fairness and locality often conflict
„A strategy that achieves optimal data locality will typically delay a job until its ideal resources are available
„Fairness benefits from allocating the best available resources to a job as soon as possible after they are requested
n A general third-party resource management system, e.g. YARN, might not follow the data locality preference
06-44

Hadoop’s straggler handling mechanism
n Speculative execution
„If a node is available but is performing poorly, it is called a straggler
„MapReduce has a built-in mechanism to run a speculative copy of its task on another machine to finish the computation faster.
„Speculative task attempts could be successful or just a waste
n Who manages this speculative mechanism? (See the configuration sketch below.)
„YARN resource manager
„YARN node manager
„Application Master?
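For reference, these are the standard MRv2 per-job switches for speculative execution (a minimal sketch, not from the lecture; whether turning them off is desirable depends on the workload).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SpeculativeConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Per-job switches for speculative execution of map and reduce tasks.
        conf.setBoolean("mapreduce.map.speculative", true);
        conf.setBoolean("mapreduce.reduce.speculative", false);  // e.g. avoid duplicated reduce work

        Job job = Job.getInstance(conf, "speculative-demo");
        System.out.println("map speculative = "
            + job.getConfiguration().getBoolean("mapreduce.map.speculative", true));
    }
}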
06-45

A successful speculative task attempt
06-46

A wasted speculative task attempt
The first attempt succeeded while the speculative one was killed
06-47

Progress score
n Hadoop monitors task progress using a progress score to select speculative tasks
„ A map task's progress score is the fraction of input data read
„ A reduce task's execution is divided into three phases, each of which accounts for 1/3 of the score. In each phase, the score is the fraction of data processed
n When a task's progress score is less than the average for its category minus 0.2 and the task has run for at least one minute, it is marked as a straggler (see the sketch below)
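A small illustrative sketch of this criterion (not Hadoop's actual scheduler code), assuming we are handed each task's progress score and running time.

import java.util.Arrays;
import java.util.List;

public class StragglerSketch {
    // Minimal view of a running task: a progress score in [0, 1] and elapsed running time.
    static final class Task {
        final String id; final double progress; final long runningMillis;
        Task(String id, double progress, long runningMillis) {
            this.id = id; this.progress = progress; this.runningMillis = runningMillis;
        }
    }

    // True if the task matches the criterion above: behind its category average by more
    // than 0.2 and running for at least one minute.
    static boolean isStraggler(Task t, double categoryAvgProgress) {
        return t.runningMillis >= 60_000 && t.progress < categoryAvgProgress - 0.2;
    }

    public static void main(String[] args) {
        List<Task> maps = Arrays.asList(
            new Task("m1", 0.90, 120_000),
            new Task("m2", 0.85, 120_000),
            new Task("m3", 0.40, 120_000));   // lagging task

        double avg = maps.stream().mapToDouble(t -> t.progress).average().orElse(0.0);
        for (Task t : maps) {
            if (isStraggler(t, avg)) {
                System.out.println(t.id + " looks like a straggler: schedule a speculative attempt");
            }
        }
    }
}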
[Figure: progress score. A map task's progress score is the fraction of input data read; a reduce task's execution is divided into copy, sort and reduce phases, each of which accounts for 1/3 of the score.]
06-48

References
n Sanjay Ghemawat, Howard Gobioff and Shun-Tak Leung. The Google File System. In Proceedings of the 19th ACM Symposium on Operating Systems Principles (SOSP'03), 2003
n Tom White, Hadoop: The Definitive Guide, 4th edition, O'Reilly, 2015
„ Chapter 4, YARN
„ Chapter 9, MapReduce Features
n Apache Hadoop Yarn Introduction
„ http://hortonworks.com/blog/introducing-apache-hadoop-yarn/
„ http://hortonworks.com/blog/apache-hadoop-yarn-background-and-an-overview/
n Determine YARN and MapReduce Memory Configuration Settings
„ http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.0.6.0/bk_installing_manually_book/content/rpm-chap1-11.html
n How to plan and configure YARN and MapReduce 2 in HDP
„ http://hortonworks.com/blog/how-to-plan-and-configure-yarn-in-hdp-2-0/
06-49

References
n Matei Zaharia, Andy Konwinski, Anthony D. Joseph, Randy Katz, Ion Stoica. Improving MapReduce Performance in Heterogeneous Environments. OSDI 2008
n Job Scheduling
„ FairScheduler: http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
„ CapacityScheduler: http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html
06-50