Hadoop YARN: A Source-Code Walkthrough of the ApplicationMaster and ResourceManager Interaction


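ApplicationMaster and ResourceManager

Roles and interactions in a "generic" YARN application:
RM: ResourceManager
AM: ApplicationMaster
NM: NodeManager
Main communication protocols used in these interactions:
ApplicationClientProtocol
ApplicationMasterProtocol
ContainerManagementProtocol
Client and ResourceManager
The client program interacts with the RM through a YarnClient object.
ApplicationMaster and ResourceManager
The AM interacts with the RM through an AMRMClientAsync object;
an AMRMClientAsync.CallbackHandler processes the resulting events asynchronously.
ApplicationMaster and NodeManager
The AM interacts with the NMs through an NMClientAsync object, mainly to launch Containers;
an NMClientAsync.CallbackHandler processes Container events asynchronously.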
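The proto messages for the interface requests and responses are defined in yarn_service_protos.proto, and the shared record types they use are defined in yarn_protos.proto; both files live under
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/.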
Hadoop version: 3.2.1
Flink version: 1.10
1. Example: the JobManager process (YarnJobClusterEntrypoint) in Flink's YARN per-job mode

// The starting point is the YarnJobClusterEntrypoint#main method

// The destination is YarnResourceManager

/**
 * The yarn implementation of the resource manager. Used when the system is started
 * via the resource framework YARN.
 */
public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode>
        implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {

    // This is the component that plays the ApplicationMaster role

    ...

    /** Client to communicate with the Resource Manager (YARN's master). resourceManagerClient talks to the ResourceManager. */
    private AMRMClientAsync resourceManagerClient;

    /** Client to communicate with the Node manager and launch TaskExecutor processes. nodeManagerClient talks to the NodeManagers. */
    private NMClientAsync nodeManagerClient;

    ...
}

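AMRMClientAsync
An abstract class that handles communication with the ResourceManager and delivers events asynchronously, such as Container allocations and Container completions. It runs a thread that periodically sends heartbeats to the ResourceManager. A YARN application normally does not subclass it; it obtains an instance from the static factory method AMRMClientAsync.createAMRMClientAsync (the bundled implementation is AMRMClientAsyncImpl).
What the application does have to provide is an implementation of the AMRMClientAsync.CallbackHandler callback interface, which works together with AMRMClientAsync.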
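The split described above, a background heartbeat thread that turns RM responses into callbacks, can be sketched in plain Java. This is a simplified model and not Hadoop code: `CallbackHandler`, `FakeRm`, and `AsyncClient` are hypothetical stand-ins that only mimic the shape of AMRMClientAsync and its handler, and the "one container per heartbeat" rule is an assumption made to keep the example small.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatCallbackSketch {

    /** Hypothetical stand-in for AMRMClientAsync.CallbackHandler, reduced to one event. */
    interface CallbackHandler {
        void onContainersAllocated(List<String> containerIds);
    }

    /** Hypothetical stand-in for the RM: grants at most one pending container per heartbeat. */
    static class FakeRm {
        private final AtomicInteger pending = new AtomicInteger();
        private int nextId = 1; // touched only by the heartbeat thread

        void request(int containers) {
            pending.addAndGet(containers);
        }

        List<String> heartbeat() {
            List<String> granted = new ArrayList<>();
            if (pending.get() > 0) {
                pending.decrementAndGet();
                granted.add("container_" + nextId++);
            }
            return granted;
        }
    }

    /** Hypothetical async client: a single thread heartbeats and dispatches callbacks. */
    static class AsyncClient {
        private final ScheduledExecutorService heartbeatThread =
                Executors.newSingleThreadScheduledExecutor();
        private final FakeRm rm;
        private final CallbackHandler handler;
        private final long intervalMs;

        AsyncClient(FakeRm rm, CallbackHandler handler, long intervalMs) {
            this.rm = rm;
            this.handler = handler;
            this.intervalMs = intervalMs;
        }

        void start() {
            heartbeatThread.scheduleAtFixedRate(() -> {
                List<String> allocated = rm.heartbeat();
                if (!allocated.isEmpty()) {
                    handler.onContainersAllocated(allocated);
                }
            }, 0, intervalMs, TimeUnit.MILLISECONDS);
        }

        void addContainerRequest(int containers) {
            rm.request(containers);
        }

        void stop() {
            heartbeatThread.shutdownNow();
        }
    }

    /** Requests n containers and blocks until the handler has been told about all of them. */
    static List<String> run(int n) {
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(n);
        CallbackHandler handler = ids -> {
            seen.addAll(ids);
            ids.forEach(id -> latch.countDown());
        };
        AsyncClient client = new AsyncClient(new FakeRm(), handler, 10);
        client.start();
        client.addContainerRequest(n);
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for containers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            client.stop();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

The caller never polls: it registers a handler, adds a request, and is notified from the heartbeat thread. That is why a real handler has to be thread-safe with respect to the rest of the application.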
2. A simple example: MyCallbackHandler

AMRMClientAsync client lifecycle


3. Excerpt from the AMRMClientAsync source


4. Excerpt from the AMRMClientAsyncImpl source


5. Excerpt from the AMRMClient source

package org.apache.hadoop.yarn.client.api;

import ...

// The abstract class AMRMClient
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends AbstractService {
    ...
}

6. Excerpt from the AMRMClientImpl source


7. Excerpt from the ApplicationMasterProtocol source
The ApplicationMasterProtocol interface is fairly simple; it has only three methods.

package org.apache.hadoop.yarn.api;

import ...

// The ApplicationMasterProtocol interface
public interface ApplicationMasterProtocol {

    // The AM registers itself with the RM
    public RegisterApplicationMasterResponse registerApplicationMaster(
            RegisterApplicationMasterRequest request) throws YarnException, IOException;

    // The AM asks the RM to unregister it; the AM may have finished successfully,
    // or the application may have failed
    public FinishApplicationMasterResponse finishApplicationMaster(
            FinishApplicationMasterRequest request) throws YarnException, IOException;

    // The main interface (method) between the AM and the RM: it handles an AllocateRequest
    // and returns an AllocateResponse.
    // This is the call that requests Containers; requests and responses are batched
    // (for example, a Flink JobManager asking for 3 TaskManagers in one request).
    // It is executed at most once, so allocations are not repeated or exceeded.
    public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException;
}
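To make the order of the three calls concrete, here is a minimal plain-Java sketch of an AM that registers, asks for one batch of containers, and then finishes. It is an illustration under stated assumptions and not the Hadoop API: `AmProtocol` and `InMemoryRm` are hypothetical, the real methods take and return request and response record objects, and the real RM does not necessarily grant a whole batch in a single response.

```java
import java.util.ArrayList;
import java.util.List;

class AmProtocolSketch {

    /** Hypothetical, simplified mirror of the three ApplicationMasterProtocol calls. */
    interface AmProtocol {
        void register(String amHost);
        List<String> allocate(int askedContainers);
        void finish(String finalStatus);
    }

    /** Hypothetical in-memory RM that enforces the order register, allocate, finish. */
    static class InMemoryRm implements AmProtocol {
        private boolean registered;
        private boolean finished;
        private int nextId = 1;
        final List<String> log = new ArrayList<>();

        @Override
        public void register(String amHost) {
            if (registered) {
                throw new IllegalStateException("AM is already registered");
            }
            registered = true;
            log.add("register:" + amHost);
        }

        @Override
        public List<String> allocate(int askedContainers) {
            if (!registered || finished) {
                throw new IllegalStateException("AM is not registered");
            }
            // Toy policy: grant the whole batch immediately.
            List<String> granted = new ArrayList<>();
            for (int i = 0; i < askedContainers; i++) {
                granted.add("container_" + nextId++);
            }
            log.add("allocate:" + askedContainers);
            return granted;
        }

        @Override
        public void finish(String finalStatus) {
            if (!registered || finished) {
                throw new IllegalStateException("AM is not registered");
            }
            finished = true;
            log.add("finish:" + finalStatus);
        }
    }

    /** The AM side: one registration, one batched request, one final report. */
    static List<String> runAm(AmProtocol rm) {
        rm.register("am-host");
        List<String> containers = rm.allocate(3); // a batch of three, as in the Flink example
        rm.finish("SUCCEEDED");
        return containers;
    }

    public static void main(String[] args) {
        InMemoryRm rm = new InMemoryRm();
        System.out.println(runAm(rm));
        System.out.println(rm.log);
    }
}
```

In this sketch, calling `allocate` before `register`, or after `finish`, fails with an `IllegalStateException`. That models the rule that an AM must be registered before it can ask for resources.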

8. Excerpt from the ApplicationMasterProtocolPBClientImpl source

package org.apache.hadoop.yarn.api.impl.pb.client;

import ...

// Client-side implementation of the ApplicationMasterProtocol interface
public class ApplicationMasterProtocolPBClientImpl implements ApplicationMasterProtocol, Closeable {

    private ApplicationMasterProtocolPB proxy;

    public ApplicationMasterProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
            Configuration conf) throws IOException {
        RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, ProtobufRpcEngine.class);
        // Under the hood this calls java.lang.reflect.Proxy#newProxyInstance
        proxy = (ApplicationMasterProtocolPB) RPC.getProxy(ApplicationMasterProtocolPB.class,
                clientVersion, addr, conf);
    }

    ...
}
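The comment in the constructor points at java.lang.reflect.Proxy#newProxyInstance. The sketch below shows that JDK mechanism in isolation: every call on an interface-typed proxy is routed through a single InvocationHandler. The interface `AmProtocolStub` and the handler's behavior are hypothetical. In an RPC engine the handler would serialize the call and send it to the server, whereas here it only records the call and returns a canned string.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class DynamicProxySketch {

    /** Hypothetical protocol interface; it stands in for ApplicationMasterProtocolPB. */
    interface AmProtocolStub {
        String allocate(String request);
    }

    /** Builds a proxy whose handler records each call instead of sending it over the network. */
    static AmProtocolStub newProxy(List<String> callLog) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Only single-argument protocol methods are supported in this sketch.
            if (args == null || args.length != 1) {
                throw new UnsupportedOperationException(method.getName());
            }
            callLog.add(method.getName() + "(" + args[0] + ")");
            return "response-to-" + args[0];
        };
        return (AmProtocolStub) Proxy.newProxyInstance(
                AmProtocolStub.class.getClassLoader(),
                new Class<?>[] {AmProtocolStub.class},
                handler);
    }

    public static void main(String[] args) {
        List<String> callLog = new ArrayList<>();
        AmProtocolStub stub = newProxy(callLog);
        System.out.println(stub.allocate("ask-3-containers"));
        System.out.println(callLog);
    }
}
```

The caller only ever sees the interface type, which is why the client class above can hold its `proxy` field as a plain ApplicationMasterProtocolPB reference.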

9. ApplicationMasterProtocolPB

package org.apache.hadoop.yarn.api;

import ...

@Private
@Unstable
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB",
        protocolVersion = 1)
public interface ApplicationMasterProtocolPB
        extends ApplicationMasterProtocolService.BlockingInterface {
}

10. Definition of ApplicationMasterProtocolService
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationmaster_protocol.proto

option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "ApplicationMasterProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;

import "yarn_service_protos.proto";

service ApplicationMasterProtocolService {
  rpc registerApplicationMaster (RegisterApplicationMasterRequestProto) returns (RegisterApplicationMasterResponseProto);
  rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
  rpc allocate (AllocateRequestProto) returns (AllocateResponseProto);
}

11. Excerpt from the ApplicationMasterProtocolPBServiceImpl source
This is the server-side (RM) implementation of the ApplicationMasterProtocolPB interface.

package org.apache.hadoop.yarn.api.impl.pb.service;

import ...

@Private
public class ApplicationMasterProtocolPBServiceImpl implements ApplicationMasterProtocolPB {

    private ApplicationMasterProtocol real;

    // When the ResourceManager starts, it initializes the real object through this constructor
    public ApplicationMasterProtocolPBServiceImpl(ApplicationMasterProtocol impl) {
        this.real = impl;
    }

    ...
}
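The `real` field suggests an adapter: the PB-level service receives a wire message, converts it to the record-level type, delegates to `real`, and converts the result back. The following plain-Java sketch shows that decode, delegate, encode shape. All names in it are hypothetical, and strings such as "ask=3" stand in for protobuf messages purely for illustration.

```java
class PbServiceDelegationSketch {

    /** Hypothetical record-level protocol; it plays the role of ApplicationMasterProtocol. */
    interface RecordProtocol {
        int allocate(int askedContainers);
    }

    /** Hypothetical wire-level protocol; it plays the role of ApplicationMasterProtocolPB. */
    interface WireProtocol {
        String allocate(String wireRequest);
    }

    /** Adapter in the style of a PB service implementation: decode, delegate, encode. */
    static class WireServiceImpl implements WireProtocol {
        private static final String PREFIX = "ask=";
        private final RecordProtocol real;

        WireServiceImpl(RecordProtocol impl) {
            this.real = impl;
        }

        @Override
        public String allocate(String wireRequest) {
            if (!wireRequest.startsWith(PREFIX)) {
                throw new IllegalArgumentException("malformed request: " + wireRequest);
            }
            int asked = Integer.parseInt(wireRequest.substring(PREFIX.length())); // decode
            int granted = real.allocate(asked);                                   // delegate
            return "granted=" + granted;                                          // encode
        }
    }

    /** Wires a toy scheduler (at most two containers per call) behind the adapter. */
    static String demo() {
        RecordProtocol real = asked -> Math.min(asked, 2);
        return new WireServiceImpl(real).allocate("ask=3");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping the wire format in the adapter means the class behind `real` can be written and tested against plain record types only.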

12. Excerpt from the ApplicationMasterService source

package org.apache.hadoop.yarn.server.resourcemanager;

import ...

@SuppressWarnings("unchecked")
@Private
public class ApplicationMasterService extends AbstractService
        implements ApplicationMasterProtocol {
    // Calls ultimately arrive at the methods of this class, which return the result
    ...
}
