一、yarn任务执行流程图
在分析任务之前先走一下yarn内部的流程细节。
二、RM 内部处理提交阶段运行流程
如上图流程所示:
1.client 提交任务给yarn,yarn 这边会获取任务的AM相关资源,client在提交阶段会上传job.split (数据切分,相应的map数量), job.xml (配置相关)等信息,以及认证信息,封装成AM启动的资源,AM启动过程会去下载。
接下来会执行 rmAppManager.submitApplication ,内部调用createAndPopulateNewRMApp ,yarn内部会创建一个RMAppImpl内存对象,并维护在内存中。这里会判断是否是CS调度器,如果是CS调度器还会检查用户是否有队列提交权限。
并且做初始的认证工作,并renew token ,并触发一个RMAppEvent事件,时间类型 RMAppEventType.START。
2.上面这个start事件会通过RM第一层的异步事件分发器进行分发:如下8种事件类型对应8种事件处理器。RMAppEventType 对应 ApplicationEventDispatcher 处理器,这个事件处理器也是一个事件分发器,进行第二次转发。
scheduler.event.SchedulerEventType" -> {EventDispatcher@8556} "Service SchedulerEventDispatcher rmapp.RMAppEventType" -> {ResourceManager$ApplicationEventDispatcher@8557} rmapp.attempt.RMAppAttemptEventType" -> {AsyncDispatcher$MultiListenerHandler@8558} rmnode.RMNodeEventType" -> {ResourceManager$NodeEventDispatcher@8559} RMFatalEventType" -> {ResourceManager$RMFatalEventDispatcher@8560} RMAppManagerEventType" -> {RMAppManager@8561} NodesListManagerEventType" -> {NodesListManager@8562} "Service NodesListManager amlauncher.AMLauncherEventType" -> {ApplicationMasterLauncher@8563} "Service ApplicationMasterLauncher
3. 经过上面这层转发,调用的是刚刚创建的 RMAppImpl handle方法。RMAppImpl 本身维护了一个状态机,根据当前状态调用相应的状态转换方法。此时 调用该状态转换过程:
Transition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppNewlySavingTransition()) ,执行 RMAppNewlySavingTransition.transition 方法。
4.上面方法会将app相关提交信息持久化,以便于恢复,这一步也会产生一个事件 :RMStateStoreAppEvent,事件类型:RMStateStoreEventType.STORE_APP,该事件会进入RMStateStore 的 AsyncDispatcher 事件分发器,
dispatcher = new AsyncDispatcher("RM StateStore dispatcher");
dispatcher.init(conf);
rmStateStoreEventHandler = new ForwardingEventHandler();
dispatcher.register(RMStateStoreEventType.class, rmStateStoreEventHandler);
最终会调用 ForwardingEventHandler 的handle方法。
RMStateStore 的 Transition(RMStateStoreState.ACTIVE,EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.STORE_APP, new StoreAppTransition())
StoreAppTransition.transition 方法。 存储完app信息,产生RMAppEvent 事件,事件类型 RMAppEventType.APP_NEW_SAVED。
这个事件会进入第一层事件分发器 rmDispatcher , 最终会执行RMAppImpl 的状态转换方法,此时app 状态 APP_NEW_SAVED 。
HADOOP-878 输出Application Report信息到本地磁盘 任务监控的数据逻辑如下框所示,所以说,只有在改app有相关事件处理时,才会记录app运行状态的数据。
根据这个事件类型 ,就会进行app的调度执行 :Transition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
AddApplicationToSchedulerTransition 直接产生 AppAddedSchedulerEvent 这个事件,事件类型:SchedulerEventType.APP_ADDED 。这个事件也会进入到第一层的事件分发器,根据上面的8种事件类型,最终 EventDispatcher 内部handler :ResourceScheduler 进行处理。
CS 与 FS 调度器提交到队列的流程基本一致,区别看后面的分配流程,队列监控组件会添加到内部内存对象里,同时会在调度器内部构建一个调度的app 对象SchedulerApplication。
判断相应的队列 ,以及队列的提交权限,都符合,就会getMetrics 增加提交的应用,并产生 RMAppEvent ,事件类型:RMAppEventType.APP_ACCEPTED 。
Transition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
StartAppAttemptTransition.transition 会创建 RMAppAttemptImpl ,维护在当前 RMAppImpl 内,后续如果有重试,就会产生多个。 并且会产生 RMAppStartAttemptEvent 事件 ,事件类型 :RMAppAttemptEventType.START 。
ApplicationAttemptEventDispatcher 处理,最终调用 RMAppAttemptImpl 的handle。 RMAppAttemptImpl 也维护了一个状态机。 根据事件类型,执行的是这个转换过程:
Transition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED, RMAppAttemptEventType.START, new AttemptStartedTransition())
AttemptStartedTransition.transition . 此时会注册 到内部
appAttempt.masterService.registerAppAttempt(appAttempt.applicationAttemptId);
并会 产生 AppAttemptAddedSchedulerEvent 事件,事件类型:SchedulerEventType.APP_ATTEMPT_ADDED。
FairScheduler handle
FSLeafQueue queue = (FSLeafQueue) application.getQueue();
queue.addApp(attempt, runnable);
if (runnable) {
runnableApps.add(app);
} else {
nonRunnableApps.add(app);
}
incUsedResource(app.getResourceUsage());
产生 RMAppAttemptEvent事件,事件类型:RMAppAttemptEventType.ATTEMPT_ADDED
Transition(RMAppAttemptState.SUBMITTED,
EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.ATTEMPT_ADDED,
new ScheduleTransition())
ScheduleTransition.transition 进行 AM资源的申请
这里会调用具体的调度器进行处理,这里面就会更新 CS 调度器内部维护的app 等待被调度的资源。
具体看看修改的内存对象
到此为止,RM 内部处理提交阶段的流程走完了。后面就是NM RM心跳交互触发调度的流程。
三、NM 与 RM 心跳流程
在分析RM 与NM 心跳流程前,先看看经过RM处理提交阶段后RM内存内部关键的内存对象
RM本身会维护一个Application的对象RMAppimpl, 可以看到CS调度器内存中维护一个调度的app SchedulerApplication对象 , 这个SchedulerApplication等待的资源是 memory:2048 , Cores:1 。
接下来看的是NM 与 RM 心跳,触发资源分配的逻辑,NM 与 RM 通信的协议是 ResourceTracker,只有这三个方法,最主要是nodeHeartbeat方法。
继续跟踪代码 看看RM怎么处理NM心跳
上面的代码会产生一个 nodeStatusEvent
看看具体的处理器 NodeEventDispatcher
它的处理逻辑跟RMApp的处理逻辑一样,找到具体的RMNodeImpl进行处理
RMNodeImpl 本身也维护了一个状态机,根据当前是Running 状态,调用具体的转换方法
最终会触发一个调度事件
事件类型为 SchedulerEventType.NODE_UPDATE。
它的处理器 SchedulerEventDispatcher
他这边也是一个生产者与消费者模型,放到一个队列
有一个线程进行消费,这个handle就是RM里面的调度器CapacityScheduler 。
看看CapacityScheduler 具体的处理逻辑
到这里 allocateContainersToNode ,基本就是进行资源的分配。
继续往下看 ,看看具体的资源分配逻辑 ,前面会做很多复杂的判断,省略,最终会调用到,ParentQueue,这里会判断是否有等待分配的资源
最终还是要分配到叶子队列,会一直递归找到叶子队列,具体调度器的分配流程,后面再详细分析。
同样通过NM心跳就能获取分配的结果。