After our first post about re-prioritizing already submitted and running jobs on different queues, we have received many questions and a lot of feedback about the Capacity Scheduler internals. While there is some documentation available, there is no extensive, in-depth documentation about how it actually works internally. Since it's all event based, it's pretty hard to understand the flow, let alone debug it. At SequenceIQ we are working on a heuristic cluster scheduler, and understanding how the YARN schedulers work was essential. This is part of a larger piece of work which will lead to a fully dynamic Hadoop cluster, orchestrating Cloudbreak, the first open source and Docker based Hadoop as a Service API. As usual, this work and what we have already done around the Capacity and Fair schedulers will be open sourced (or has already been contributed back to the Apache YARN project).
The Capacity Scheduler internals
The CapacityScheduler is the default scheduler used with Hadoop 2.x. Its purpose is to allow multi-tenancy and to share resources among multiple organizations and applications on the same cluster. You can read about the high level abstraction here. In this blog entry we'll examine it from a deep technical point of view (the implementation can be found here as part of the ResourceManager). I'll try to keep it short and stick to the most important aspects, so as not to write a book about it (which would still be better than Twilight).
The animation shows a basic application submission event flow, but don't worry if you don't understand it yet; hopefully you will by the time you're done reading.
It all begins with the configuration. The scheduler is built around a queue hierarchy, something like the following (expressed as XML in capacity-scheduler.xml):
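Here is a minimal, illustrative sketch of such a hierarchy; the queue names and percentages are made up for this example (out of the box only root and its default child exist):

```xml
<!-- capacity-scheduler.xml: a made-up two-queue hierarchy under root.
     The capacities of the queues under a common parent must add up to 100. -->
<configuration>
  <property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default,highprio</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>70</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.highprio.capacity</name>
    <value>30</value>
  </property>
  <property>
    <!-- an elastic hard limit the default queue may grow to when the cluster is idle -->
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <value>90</value>
  </property>
</configuration>
```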
Generally, queues are there to prevent applications from consuming more resources than they should. Be careful when determining the capacities: at every level of the hierarchy the children's capacities must add up to 100%, and if you mess this up the ResourceManager won't start.
Although the hierarchy above does not show it explicitly, applications can only be submitted to leaf queues. By default every application is submitted to the queue called default. One interesting property is schedule-asynchronously, which I'll talk about later.
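To make the leaf-queue rule concrete, here is a rough sketch of how a client chooses the target queue when submitting an application; the queue name, resources and empty launch context are placeholders, and error handling is omitted:

```java
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;

public class SubmitToQueue {
  public static void main(String[] args) throws IOException, YarnException {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new YarnConfiguration());
    yarnClient.start();

    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext ctx = app.getApplicationSubmissionContext();

    ctx.setApplicationName("queue-demo");
    // Must be a leaf queue; "default" is where applications land if nothing is set.
    ctx.setQueue("default");
    // Resources requested for the ApplicationMaster container itself.
    ctx.setResource(Resource.newInstance(1024, 1));
    // The real AM launch command would go into this record; left empty for brevity.
    ctx.setAMContainerSpec(Records.newRecord(ContainerLaunchContext.class));

    // This kicks off the event chain that eventually reaches the scheduler as APP_ADDED.
    yarnClient.submitApplication(ctx);
    yarnClient.stop();
  }
}
```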
Once the ResourceManager is up and running, the messaging starts. Almost everything happens via events, which are distributed by the ResourceManager's central dispatcher among the registered event handlers. The downside of an event-driven architecture is that it's hard to follow the flow, because events can come from anywhere. The CapacityScheduler itself is registered for many event types and acts upon them.
Code snippets are from branch
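To get oriented, here is a heavily condensed sketch of that dispatch; this is a paraphrase for illustration, not the actual CapacityScheduler.handle() source, which casts each event to its concrete class and does the work described in the sections below:

```java
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;

// Illustrative only: the real scheduler switches on SchedulerEventType like this and
// delegates to internal methods that do the work summarized in the comments.
class SchedulerEventDispatchSketch implements EventHandler<SchedulerEvent> {
  @Override
  public void handle(SchedulerEvent event) {
    switch (event.getType()) {
      case NODE_ADDED:          // add the node's resources to the cluster total
        break;
      case NODE_REMOVED:        // kill its containers, drop reservations, shrink the cluster
        break;
      case NODE_UPDATE:         // record container status changes, then try to allocate
        break;
      case APP_ADDED:           // leaf-queue and ACL checks, register a SchedulerApplication
        break;
      case APP_REMOVED:         // clean up the application and set its final state
        break;
      case APP_ATTEMPT_ADDED:   // put the attempt into its queue (pending, then active)
        break;
      case APP_ATTEMPT_REMOVED: // release the attempt's containers and reservations
        break;
      case CONTAINER_EXPIRED:   // tell the application to drop the expired container
        break;
      default:
        break;
    }
  }
}
```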
Each time a node joins the cluster, the ResourceTrackerService registers the NodeManager and, as part of the transition, sends a NodeAddedSchedulerEvent. The scheduler keeps track of the global cluster resource and adds the node's resources to it. All the queue metrics also need to be updated, because the cluster just got bigger and the queues' absolute capacities change with it; it also becomes more likely that a pending application can be scheduled. If work-preserving recovery (isWorkPreservingRecoveryEnabled) is enabled on the ResourceManager, it can also recover the containers already running on a re-joining node.
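Turning this on is a yarn-site.xml matter; a minimal fragment, with the property names as I know them (defaults vary between Hadoop versions):

```xml
<!-- yarn-site.xml: enable RM state recovery plus work-preserving recovery, so containers
     on a re-joining NodeManager are resynced with the scheduler instead of being lost. -->
<property>
  <name>yarn.resourcemanager.recovery.enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.resourcemanager.work-preserving-recovery.enabled</name>
  <value>true</value>
</property>
```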
There can be many reasons why a node is removed from the cluster, but the scenario is almost the mirror image of adding one. A NodeRemovedSchedulerEvent is sent, the scheduler subtracts the node's resources from the global cluster resource and updates all the queue metrics. Things can get a little more complicated, since the node was an active participant in resource scheduling and may have running containers and reserved resources. The scheduler will kill those containers and notify the affected applications so they can request new containers, and it unreserves the reserved resources.
On application submission an AppAddedSchedulerEvent is created and the scheduler decides whether or not to accept the application. That depends on whether it was submitted to a leaf queue, whether the user has the appropriate rights (ACLs) to submit to that queue, and whether the queue can take more applications. If any of these checks fails, the scheduler rejects the application by sending an RMAppRejectedEvent. Otherwise it registers a new SchedulerApplication, notifies the target queue's parents about it and updates the queue metrics.
The analogy here is the same as between NODE_ADDED and NODE_REMOVED: on application removal the scheduler updates the queue metrics, notifies the queue's parents that an application has finished, removes the application and sets its final state. Note that after the APP_ADDED event the application is still in inactive mode. This means it won't get any resources scheduled for it directly; only an attempt of the application will. One application can have many attempts, since an attempt can fail for many reasons.
Attempts move from one state to another. It is the AppAttemptAddedSchedulerEvent that makes the scheduler actually try to allocate resources. First, the application attempt goes into the pending applications list of the queue, and if the queue limits allow it, it moves into the active applications list. This active applications list is the one the queue walks through when trying to allocate resources. It works in FIFO order, but I'll elaborate on that in the NODE_UPDATE section below. On an AppAttemptRemovedSchedulerEvent the scheduler cleans up after the application: it releases all the allocated, acquired and running containers (in case of an ApplicationMaster restart the running containers won't get killed), releases all reserved containers, cleans up the pending requests and informs the queues.
NodeUpdateSchedulerEvents arrive every second from every node. By setting the earlier-mentioned schedule-asynchronously property to true, the behavior of this event handling can be altered so that container allocation happens asynchronously from these events: the CapacityScheduler then tries to allocate new containers every 100 ms on a separate thread.
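To the best of my knowledge this is the capacity-scheduler.xml property the setting refers to; the roughly 100 ms allocation interval is the built-in behavior of the asynchronous thread:

```xml
<!-- capacity-scheduler.xml: let the CapacityScheduler allocate containers on its own
     thread instead of only reacting to node heartbeats. -->
<property>
  <name>yarn.scheduler.capacity.schedule-asynchronously.enable</name>
  <value>true</value>
</property>
```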
Before going into the details, let's discuss another important aspect. The allocate() method is used as a heartbeat; this is the most important call between the ApplicationMaster and the scheduler. Instead of being a simple empty message, the heartbeat can carry the application's resource requests. The reason it is important here is that when the scheduler (more precisely, the queue) tries to allocate resources, it checks the active applications in FIFO order and looks at their resource requests.
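A sketch of the ApplicationMaster side of that heartbeat, using the AMRMClient helper; the host, port, tracking URL, priority and the busy-wait loop are simplified placeholders:

```java
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class HeartbeatSketch {
  public static void main(String[] args) throws Exception {
    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(new YarnConfiguration());
    rmClient.start();
    rmClient.registerApplicationMaster("localhost", 0, "");

    // Ask for one 1 GB / 1 vcore container anywhere in the cluster (an off-switch ask).
    ContainerRequest ask = new ContainerRequest(
        Resource.newInstance(1024, 1), null, null, Priority.newInstance(20));
    rmClient.addContainerRequest(ask);

    // Every allocate() call is a heartbeat: it carries the outstanding requests and
    // returns whatever the scheduler managed to allocate since the previous call.
    List<Container> granted = Collections.emptyList();
    while (granted.isEmpty()) {
      AllocateResponse response = rmClient.allocate(0.0f);
      granted = response.getAllocatedContainers();
      Thread.sleep(1000);
    }

    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
    rmClient.stop();
  }
}
```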
Let's examine these requests (a sketch of how an ApplicationMaster expresses them follows the list):
- Priority: requests with higher priority get served first (in MapReduce, mappers use priority 20, reducers 10 and the AM 0)
- Capability: denotes the required container's size (memory and vcores)
- Location: where the AM would like to get the containers. There are 3 types of locations: node-local, rack-local and off-switch. The location is based on where the blocks of the input data live, and it is the ApplicationMaster's duty to figure that out. Off-switch means that any node in the cluster is good enough; typically the request for the ApplicationMaster's own container is an off-switch request.
- Relax locality: if it is set to false, only node-local containers can be allocated for that request. By default it is set to true.
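For illustration, here is roughly what a single "give me a map container near its data" ask looks like when expressed with the raw ResourceRequest records; the host and rack names are placeholders, and in practice the AM libraries build these three locality levels for you:

```java
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public class LocalityRequestSketch {
  public static void main(String[] args) {
    Priority mapPriority = Priority.newInstance(20);   // the MapReduce map priority
    Resource oneGig = Resource.newInstance(1024, 1);   // capability: 1 GB, 1 vcore

    List<ResourceRequest> asks = Arrays.asList(
        // node-local: the input block lives on this host (placeholder name)
        ResourceRequest.newInstance(mapPriority, "datanode-17.example.com", oneGig, 1),
        // rack-local fallback (placeholder rack name)
        ResourceRequest.newInstance(mapPriority, "/rack-3", oneGig, 1),
        // off-switch fallback: ResourceRequest.ANY ("*") means any node will do;
        // relaxLocality=true allows falling back to it after the locality delay
        ResourceRequest.newInstance(mapPriority, ResourceRequest.ANY, oneGig, 1, true));

    // These records travel to the scheduler inside the allocate() heartbeat shown earlier.
    System.out.println(asks);
  }
}
```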
Let's get back to NODE_UPDATE. What happens at every node update, and why does it happen so frequently? First of all, the nodes are the ones running the containers. Containers start and stop all the time, so the scheduler needs regular updates about the state of every node, and the nodes' available resources are updated as well. After the updates are recorded, the scheduler tries to allocate containers on the reporting node; the same applies to every node in the cluster.

At every NODE_UPDATE the scheduler first checks whether an application has reserved a container on this node, and if there is a reservation it tries to fulfill it. Every node can have at most one reserved container. After that it tries to allocate more containers by going through all the queues, starting from root. The node can host one more container if it has at least the minimum allocation's worth of free capacity. Walking through the child queues of root, the scheduler checks each queue's active applications and their resource requests in priority order. If an application has a request for this node, the scheduler tries to allocate it. If relax locality is set to true, a container could also be allocated even though there is no explicit request for this node, but that's not what happens first: there is another mechanism called delay scheduling. The scheduler tries to delay any non-data-local request, but it cannot delay it forever; the localityWaitFactor determines how long to wait before falling back to rack-local and, eventually, to off-switch requests (a related configuration knob is shown below). If everything works out, a container is allocated and its resource usage is tracked. If there is not enough free capacity, an application can reserve a container on this node and may use that reservation at a later update. After the allocation is made, the ApplicationMaster submits a request to the node to launch the container and assigns a task to run in it.
The scheduler does not need to know what task the AM will run in the container.
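One related, user-facing knob for the locality delay is the CapacityScheduler's node-locality delay; a capacity-scheduler.xml fragment as I understand it (the value of 40 is just an example, roughly meaning "skip about 40 scheduling opportunities before giving up on node-locality"):

```xml
<!-- capacity-scheduler.xml: number of missed scheduling opportunities after which the
     CapacityScheduler stops insisting on node-local placement and tries rack-local. -->
<property>
  <name>yarn.scheduler.capacity.node-locality-delay</name>
  <value>40</value>
</property>
```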
It is the ResourceManager's responsibility to check whether an allocated container expires (for example because the ApplicationMaster never actually launched it); when it does, a ContainerExpiredSchedulerEvent is sent and the scheduler notifies the application to remove the container. How long to wait until a container is considered dead can be configured.
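The relevant yarn-site.xml property, to the best of my knowledge (the default is 10 minutes):

```xml
<!-- yarn-site.xml: how long the RM waits for an allocated container to actually be
     launched before it is expired and handed back to the scheduler. -->
<property>
  <name>yarn.resourcemanager.rm.container-allocation.expiry-interval-ms</name>
  <value>600000</value>
</property>
```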
The animation at the top shows a basic event flow, starting with adding 2 nodes and submitting 2 applications with their attempts; eventually the node updates trigger resource allocation for those applications. I hope it makes sense now that you have read this post.
In the next part of this series I'll compare it with the FairScheduler to see the differences.