Preview

  • the whole process of allocating, updating and reclaiming resources
  • mechanism from the view of locks and threads
  • FS Metric
  • performance limitations and what causes them
  • the process of RM startup
  • artifact: a flow chart that shows everyone the important details of FS: performance bottlenecks, the scope of influence of each configuration, metric and log, and how the sub-components collaborate

Fair scheduler inside

done updateThread

done updateCall

done Preemption

done Allocate

done NMUpdate

doing FS Algorithms and related configuration

doing RM command

YARN HA & Zookeeper
the class hierarchy of FS
Yarn Preserve
Resource Preserved
Queue Structure and configuration
Policy, Priority, local&rack local&any
how to extract useful info from the RM log? # make a flow chart and show what kind of log is generated at each point in the flow.
summarize the design patterns of RM and its class diagram
introduce how to implement a state machine in Java & Python; how does the state machine take great effect in YARN?
introduce Java concurrency knowledge: lock levels, classes and examples, performance, use cases (inspired by the different types of lock usage in RM)
timeline server in YARN?
introduce DRF

The process of allocating containers for the AM

ResourceRequest:

priority
resourceName
capacity
numContainers # decreases by 1 every time this request is satisfied by an allocation; when it reaches 0, the request is removed
relaxLocality: Boolean
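
The fields above, and the way numContainers shrinks as containers are granted, can be sketched roughly as follows. This is a minimal illustration and not YARN's actual ResourceRequest class: SimpleResourceRequest, PendingRequests, memoryMb and onContainerAllocated are hypothetical names chosen for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for YARN's ResourceRequest. */
class SimpleResourceRequest {
    final int priority;
    final String resourceName;   // a node name, a rack name, or "*" for any
    final int memoryMb;          // stands in for "capacity"
    int numContainers;
    final boolean relaxLocality;

    SimpleResourceRequest(int priority, String resourceName, int memoryMb,
                          int numContainers, boolean relaxLocality) {
        this.priority = priority;
        this.resourceName = resourceName;
        this.memoryMb = memoryMb;
        this.numContainers = numContainers;
        this.relaxLocality = relaxLocality;
    }
}

/** Hypothetical table of outstanding requests for a single priority. */
class PendingRequests {
    private final Map<String, SimpleResourceRequest> byResourceName = new HashMap<>();

    void put(SimpleResourceRequest request) {
        byResourceName.put(request.resourceName, request);
    }

    /** Called once per satisfied container: decrement, and drop the request at 0. */
    void onContainerAllocated(String resourceName) {
        SimpleResourceRequest request = byResourceName.get(resourceName);
        if (request == null) {
            return;
        }
        request.numContainers--;
        if (request.numContainers <= 0) {
            byResourceName.remove(resourceName);
        }
    }

    /** Number of containers still wanted for this resource name (0 if none). */
    int outstanding(String resourceName) {
        SimpleResourceRequest request = byResourceName.get(resourceName);
        return request == null ? 0 : request.numContainers;
    }
}
```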

Every AllocateRequest and AllocateResponse carries a monotonically increasing response ID, which lets the RM detect duplicate or out-of-order allocate calls.
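
One way such a counter can protect an allocate call is sketched below. It is a simplified illustration under assumptions, not the RM's code: AllocateSequencer and its String payloads are hypothetical, and the rule used (resend the last response for a repeated ID, reject anything else that does not match) is only one plausible policy.

```java
/** Hypothetical sketch of duplicate detection with an increasing response ID. */
class AllocateSequencer {
    private int lastResponseId = 0;
    private String lastResponse = "initial";

    /**
     * @param requestResponseId the ID of the last response the caller has seen
     * @param payload           stands in for the real request body
     */
    synchronized String allocate(int requestResponseId, String payload) {
        if (requestResponseId + 1 == lastResponseId) {
            // The caller repeated its previous call: resend the previous response
            // instead of applying the request twice.
            return lastResponse;
        }
        if (requestResponseId != lastResponseId) {
            // Neither the current nor the previous ID: the caller is out of sync.
            throw new IllegalStateException(
                "unexpected response id " + requestResponseId
                    + ", expected " + lastResponseId);
        }
        lastResponseId++;
        lastResponse = "response#" + lastResponseId + " for " + payload;
        return lastResponse;
    }
}
```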

  1. FS.releaseContainers(List, SchedulerApplicationAttempt)
    • FS.completedContainer() in a for loop

  2. lock(FSAppAttempt, current application)
    • upsert request in FSAppAttempt.appSchedulingInfo

      an app has many priorities; each priority has requests for different types of resources, and each request has a number of containers

    • FSAppAttempt.updateBlackList
    • FSAppAttempt.pullNewlyAllocatedContainersAndNMTokens() # it means containers are allocated asynchronously.
  1. AM update interval
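
The per-attempt part of the allocate call (step 2 above) can be pictured with the sketch below. It is a rough model under assumptions: AttemptSketch and its methods are hypothetical, a new ask is assumed to replace the old one for the same priority and resource name, and container IDs are plain strings. It mainly shows why allocation looks asynchronous to the AM: the scheduler fills a list at its own pace, and allocate only drains what has accumulated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the per-attempt state touched by allocate(). */
class AttemptSketch {
    // priority -> (resourceName -> number of containers still wanted)
    private final Map<Integer, Map<String, Integer>> asks = new TreeMap<>();
    // Filled by the scheduler when it assigns containers, drained by allocate().
    private final List<String> newlyAllocated = new ArrayList<>();

    /** Insert or replace the ask for (priority, resourceName); 0 removes it. */
    synchronized void upsertAsk(int priority, String resourceName, int numContainers) {
        Map<String, Integer> byName =
            asks.computeIfAbsent(priority, p -> new HashMap<>());
        if (numContainers > 0) {
            byName.put(resourceName, numContainers);
        } else {
            byName.remove(resourceName);
        }
        if (byName.isEmpty()) {
            asks.remove(priority);
        }
    }

    synchronized int pending(int priority, String resourceName) {
        Map<String, Integer> byName = asks.get(priority);
        if (byName == null) {
            return 0;
        }
        return byName.getOrDefault(resourceName, 0);
    }

    /** Called from the scheduling side, not from the AM's allocate call. */
    synchronized void containerAllocated(String containerId) {
        newlyAllocated.add(containerId);
    }

    /** Hand over everything allocated since the previous pull, then reset. */
    synchronized List<String> pullNewlyAllocated() {
        List<String> result = new ArrayList<>(newlyAllocated);
        newlyAllocated.clear();
        return result;
    }
}
```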

How and when is a container released?

entry point: (FS.lock) FS.completedContainer(RMContainer, ContainerStatus, RMContainerEventType) # unreserve or release container

  1. FSAppAttempt.containerCompleted()
    • remove from list of newly allocated containers
    • send event to StateMachine to update the container state
    • remove from list of containers
    • update queue resource; update app resource consumption
    • remove the container from preemption cache
  2. remove container from SchedulerNode and add its available resource
  3. update root queue resource (subqueue metrics might be slightly out of date until the next update interval)
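
The bookkeeping in these steps can be sketched as below. This is a simplified model, not the FairScheduler code: ReleaseSketch and its nested classes are hypothetical, resources are reduced to a single memory figure, and the state machine event and the preemption cache are left out.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the accounting done when a container completes. */
class ReleaseSketch {
    static class Node { int availableMb; }
    static class Queue { int usedMb; }
    static class Attempt {
        final Set<String> newlyAllocated = new HashSet<>();
        final Set<String> liveContainers = new HashSet<>();
        int usedMb;
    }

    // Stands in for the scheduler-wide lock taken at the entry point.
    private final Object schedulerLock = new Object();

    void completedContainer(String containerId, int containerMb,
                            Attempt attempt, Queue leaf, Queue root, Node node) {
        synchronized (schedulerLock) {
            if (!attempt.liveContainers.remove(containerId)) {
                return; // unknown or already released: nothing to undo
            }
            attempt.newlyAllocated.remove(containerId); // step 1: attempt-level lists
            attempt.usedMb -= containerMb;              // step 1: app consumption
            leaf.usedMb -= containerMb;                 // step 1: queue usage
            node.availableMb += containerMb;            // step 2: give it back to the node
            root.usedMb -= containerMb;                 // step 3: root queue usage
        }
    }
}
```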

The process of NodeUpdate

// node resource is updated only on node registration
// what is NodeHeartbeatResponse.nextHeartbeatInterval?

ResourceTracker.nodeHeartbeat(NodeHeartbeatRequest):
Here is the node heartbeat sequence…

  1. Check if it’s a registered node
  2. Check if it’s a valid (i.e. not excluded) node
  3. Check if it’s a ‘fresh’ heartbeat i.e. not duplicate heartbeat
  4. Send healthStatus to RMNode, update containers info of RMNode
    //update response: (RMNode.writeLock.lock()) remove container,application from NM
  5. dispatch the NodeHeartbeatRequest to update the container status
    [E]STATUS_UPDATE –> RMNodeImpl –> [E]NODE_UPDATE –> (FS.lock )FS.nodeUpdate(RMNode)
  6. FS.nodeUpdate()
    • update the information of newly launched containers in AppAttempt
    • completedContainer
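
The first three checks of the heartbeat sequence can be sketched like this. It is an illustration under assumptions rather than the ResourceTrackerService code: HeartbeatSketch and the Outcome values are hypothetical, nodes are identified by host name only, and answering an unregistered or out-of-sync node with RESYNC is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the validation steps at the start of a node heartbeat. */
class HeartbeatSketch {
    enum Outcome { RESYNC, SHUTDOWN, DUPLICATE, PROCESSED }

    private final Set<String> excludedHosts = new HashSet<>();
    // registered host -> ID of the last response sent to it
    private final Map<String, Integer> lastResponseId = new HashMap<>();

    void register(String host) { lastResponseId.put(host, 0); }

    void exclude(String host) { excludedHosts.add(host); }

    Outcome nodeHeartbeat(String host, int responseIdFromNode) {
        Integer last = lastResponseId.get(host);
        if (last == null) {
            return Outcome.RESYNC;                 // step 1: not a registered node
        }
        if (excludedHosts.contains(host)) {
            return Outcome.SHUTDOWN;               // step 2: not a valid node
        }
        if (responseIdFromNode + 1 == last) {
            return Outcome.DUPLICATE;              // step 3: heartbeat already handled
        }
        if (responseIdFromNode != last) {
            return Outcome.RESYNC;                 // assumption: too far out of sync
        }
        // Steps 4 to 6 (health status, container updates, scheduler event) go here.
        lastResponseId.put(host, last + 1);
        return Outcome.PROCESSED;
    }
}
```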

//TODO: does nodeHeartbeat return only after FS.nodeUpdate finishes? Is the dispatch sync or async?

// TODO: highlight the content of NMResponse to show what the NM will do

  1. FS.nodeUpdate
  2. FS.continuousScheduling

core: FSParentQueue.assignContainer(FSSchedulerNode)

1. 

// if FSSchedulerNode.getReservedContainer != null, this node doesn’t participate in the assignment.
ParentQueue.assignContainer –> LeafQueue.assignContainer –> FSApp.assignContainer # depth first search based on priority
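
The descent from parent queue to leaf queue to app can be sketched as a depth-first walk over a tree of schedulables. The sketch below is hypothetical throughout (DfsAssignSketch, Schedulable, attemptScheduling are illustrative names), resources are a single memory figure, and children are ordered by lowest current usage as a stand-in for whatever ordering the real scheduling policy applies.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of depth-first container assignment over a queue tree. */
class DfsAssignSketch {
    static class Node {
        int availableMb;
        String reservedContainer; // non-null means the node holds a reservation
        Node(int availableMb) { this.availableMb = availableMb; }
    }

    interface Schedulable {
        int usedMb();
        int assignContainer(Node node); // returns the MB assigned, 0 if none
    }

    static class App implements Schedulable {
        final int askMb;
        int pendingContainers;
        int used;
        App(int askMb, int pendingContainers) {
            this.askMb = askMb;
            this.pendingContainers = pendingContainers;
        }
        public int usedMb() { return used; }
        public int assignContainer(Node node) {
            if (pendingContainers == 0 || node.availableMb < askMb) {
                return 0;
            }
            node.availableMb -= askMb;
            used += askMb;
            pendingContainers--;
            return askMb;
        }
    }

    /** Used for both parent and leaf queues; children are queues or apps. */
    static class Queue implements Schedulable {
        final List<Schedulable> children = new ArrayList<>();
        public int usedMb() {
            int sum = 0;
            for (Schedulable child : children) {
                sum += child.usedMb();
            }
            return sum;
        }
        public int assignContainer(Node node) {
            List<Schedulable> ordered = new ArrayList<>(children);
            ordered.sort(Comparator.comparingInt(Schedulable::usedMb));
            for (Schedulable child : ordered) {
                int assigned = child.assignContainer(node); // go one level deeper
                if (assigned > 0) {
                    return assigned;
                }
            }
            return 0;
        }
    }

    static int attemptScheduling(Queue root, Node node) {
        if (node.reservedContainer != null) {
            return 0; // a node with a reservation does not take part
        }
        return root.assignContainer(node);
    }
}
```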

The RM command

NodeAction.SHUTDOWN

  • nodeManager version is invalid
  • node not in hostsList or in excludeList # I find it ok for both hostname and ip
  • node doesn’t have enough resources for the minimum allocation
    NodeAction.NORMAL
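
The registration outcomes listed above can be sketched as a small decision function. This is a hedged illustration, not the RM's implementation: RegistrationSketch and its parameters are hypothetical, the version check is omitted, an empty include list is assumed to mean "all hosts allowed", and resources are reduced to memory.

```java
import java.util.Set;

/** Hypothetical sketch of the SHUTDOWN / NORMAL decision at node registration. */
class RegistrationSketch {
    enum Action { NORMAL, SHUTDOWN }

    static Action register(String host, String ip, int nodeMemoryMb,
                           Set<String> includeHosts, Set<String> excludeHosts,
                           int minAllocationMb) {
        // Either the host name or the IP may appear in the lists.
        boolean included = includeHosts.isEmpty()
            || includeHosts.contains(host) || includeHosts.contains(ip);
        boolean excluded = excludeHosts.contains(host) || excludeHosts.contains(ip);
        if (!included || excluded) {
            return Action.SHUTDOWN;
        }
        if (nodeMemoryMb < minAllocationMb) {
            return Action.SHUTDOWN; // cannot hold even one minimum-size container
        }
        return Action.NORMAL;
    }
}
```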

FAQ

  1. what is the AM Command?
  2. what are a reserved container and reserved resource? How are they treated differently? # one node can only reserve one container; one app can reserve multiple containers on different nodes for different priorities; a node’s reserved container can be overridden by that of an app with higher precedence
  3. what is pending resource in AppSchedulingInfo?
  4. why deactivate the application? activeUsersManager.deactivateApplication(user, applicationId)
  5. what is an unmanaged AM?
  6. what is the content of the blackList of FSAppAttempt?
  7. what is the importance of FSAppAttempt headroom which is updated on allocation?
  8. what is clusterTimeStamp?

    if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
        < nmExpireInterval) {
      LOG.info(containerId + " doesn't exist. Add the container"
          + " to the release request cache as it maybe on recovery.");
      synchronized (attempt) {
        attempt.getPendingRelease().add(containerId);
      }
    }
  9. nodeLocality, rackLocality

  10. summarize all the event between FS, NM and AM as well as how those events are handled
  11. important: list all the resource names in FSAppAttempt; what is the difference between request locality and LocalityThreshold? (vcore, memory? ANY, rackName, nodeName, off switch?) what is the locality logic in FSApp.assignContainer?
  12. configuration tuning

    /** Whether to assign multiple containers in one check-in. */
    public static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
    protected static final boolean DEFAULT_ASSIGN_MULTIPLE = false;

    /** Whether to give more weight to apps requiring many resources. */
    protected static final String SIZE_BASED_WEIGHT = CONF_PREFIX + "sizebasedweight";
    protected static final boolean DEFAULT_SIZE_BASED_WEIGHT = false;

    /** Maximum number of containers to assign on each check-in. */
    protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
    protected static final int DEFAULT_MAX_ASSIGN = -1;

    /** The update interval for calculating resources in FairScheduler .*/
    public static final String UPDATE_INTERVAL_MS =
    CONF_PREFIX + "update-interval-ms";
    public static final int DEFAULT_UPDATE_INTERVAL_MS = 500;
  13. what is applicableDepth and how to change it?

  14. is assignContainerPreCheck(node) in FSLeafQueue.assignContainer necessary? It has already been checked in the previous step
  15. how to determine the minShare, fairShare and weight of FSApp?
  16. replace the comparison method with a heap sort in FS?
  17. an unmanaged AM (unmanagedAM) is an AM that runs in independent mode.
  18. what is the essential meaning of fairness: the chance of being scheduled? I find that some apps use more resources than others; is that because of weight or usage-based weight? Will the resources of a lower-weight app be preempted by a higher-weight app in the same queue?
  19. what is the initial fair share of app?
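
For the configuration keys quoted in item 12, a small sketch of how they could be read with their defaults is given below. It uses java.util.Properties instead of Hadoop's Configuration so that it stays self-contained. It assumes that CONF_PREFIX is "yarn.scheduler.fair.", and the helper containersPerCheckIn (including the reading of a non-positive max.assign as "no limit") is a hypothetical interpretation of the comments, not scheduler code.

```java
import java.util.Properties;

/** Hypothetical reader for the FairScheduler settings quoted in item 12. */
class FairSchedulerConfSketch {
    static final String CONF_PREFIX = "yarn.scheduler.fair."; // assumed prefix
    static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
    static final String SIZE_BASED_WEIGHT = CONF_PREFIX + "sizebasedweight";
    static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
    static final String UPDATE_INTERVAL_MS = CONF_PREFIX + "update-interval-ms";

    final boolean assignMultiple;
    final boolean sizeBasedWeight;
    final int maxAssign;
    final int updateIntervalMs;

    FairSchedulerConfSketch(Properties props) {
        // Defaults mirror the DEFAULT_* constants in the snippet from item 12.
        assignMultiple = Boolean.parseBoolean(props.getProperty(ASSIGN_MULTIPLE, "false"));
        sizeBasedWeight = Boolean.parseBoolean(props.getProperty(SIZE_BASED_WEIGHT, "false"));
        maxAssign = Integer.parseInt(props.getProperty(MAX_ASSIGN, "-1"));
        updateIntervalMs = Integer.parseInt(props.getProperty(UPDATE_INTERVAL_MS, "500"));
    }

    /** Upper bound on containers handed to one node per check-in (sketch only). */
    int containersPerCheckIn() {
        if (!assignMultiple) {
            return 1;
        }
        return maxAssign <= 0 ? Integer.MAX_VALUE : maxAssign;
    }
}
```

With the defaults, a node receives at most one container per check-in, so raising throughput on a large cluster would mean enabling assignmultiple and, if desired, capping it with max.assign.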