Hadoop YARN: A Source Code Walkthrough of the ApplicationMaster and ResourceManager Interaction
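ApplicationMaster and ResourceManager
Roles involved in a "generic" YARN application, and how they interact:
RM: ResourceManager
AM: ApplicationMaster
NM: NodeManager
The main communication protocols used in these interactions: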
ApplicationClientProtocol
ApplicationMasterProtocol
ContainerManagementProtocol
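Client and ResourceManager
The client program interacts with the RM through a YarnClient object.
ApplicationMaster and ResourceManager
The AM interacts with the RM through an AMRMClientAsync object;
an AMRMClientAsync.CallbackHandler processes the resulting events asynchronously.
ApplicationMaster and NodeManager
The AM interacts with the NM through an NMClientAsync object, mainly to launch containers;
an NMClientAsync.CallbackHandler processes container events asynchronously.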
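The three interaction pairs above can be summarized as a small lookup table. The sketch below is plain Java with no Hadoop dependency; the enum `YarnInteraction`, its fields, and the `between` helper are hypothetical names introduced only for illustration, while the protocol and client class names are the ones listed above.

```java
import java.util.Arrays;
import java.util.Optional;

/** Hypothetical lookup table: which protocol and client class serve each pair of roles. */
enum YarnInteraction {
    CLIENT_TO_RM("Client", "ResourceManager", "ApplicationClientProtocol", "YarnClient"),
    AM_TO_RM("ApplicationMaster", "ResourceManager", "ApplicationMasterProtocol", "AMRMClientAsync"),
    AM_TO_NM("ApplicationMaster", "NodeManager", "ContainerManagementProtocol", "NMClientAsync");

    final String from;
    final String to;
    final String protocol;
    final String clientClass;

    YarnInteraction(String from, String to, String protocol, String clientClass) {
        this.from = from;
        this.to = to;
        this.protocol = protocol;
        this.clientClass = clientClass;
    }

    /** Finds the interaction initiated by one role towards another, if the table has one. */
    static Optional<YarnInteraction> between(String from, String to) {
        return Arrays.stream(values())
                .filter(i -> i.from.equals(from) && i.to.equals(to))
                .findFirst();
    }
}

class YarnInteractionDemo {
    public static void main(String[] args) {
        for (YarnInteraction i : YarnInteraction.values()) {
            System.out.println(i.from + " -> " + i.to + ": " + i.protocol + " via " + i.clientClass);
        }
    }
}
```

The rest of this walkthrough follows the `AM_TO_RM` row only.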
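The proto messages for the interface requests and responses are defined under
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/: yarn_service_protos.proto holds the request and response messages, and it builds on the shared record types in yarn_protos.proto.
Hadoop version: 3.2.1
Flink version: 1.10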
1. Example: the JobManager process (YarnJobClusterEntrypoint) in Flink's YARN per-job mode
// Starting point: the YarnJobClusterEntrypoint#main method
// End point: YarnResourceManager
/**
 * The yarn implementation of the resource manager. Used when the system is started
 * via the resource framework YARN.
 */
public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode>
    implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
  // This class is the ApplicationMaster
  ...
  /** Interacts with the ResourceManager. Client to communicate with the Resource Manager (YARN's master). */
  private AMRMClientAsync resourceManagerClient;

  /** Interacts with the NodeManager. Client to communicate with the Node manager and launch TaskExecutor processes. */
  private NMClientAsync nodeManagerClient;
  ...
}
AMRMClientAsync
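An abstract class that handles communication with the ResourceManager. Applications normally obtain an instance through its static factory method createAMRMClientAsync and supply their own callback handler, rather than subclassing it. It delivers events asynchronously, such as container allocations and container completions, and it runs a thread that periodically sends heartbeats to the ResourceManager.
AMRMClientAsync works together with an application-provided implementation of the AMRMClientAsync.CallbackHandler callback interface.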
2. A simple example: MyCallbackHandler
AMRMClientAsync client lifecycle
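In outline, the lifecycle is: create the client with a heartbeat interval and a callback handler, init and start it, register the AM, add container requests, wait for the application to finish, unregister, and stop. The following is a dependency-free model of the callback part of that pattern, not the Hadoop classes: `MiniCallbackHandler`, `MiniAsyncClient`, `heartbeatOnce` and `simulateCompletion` are hypothetical names, containers are plain strings, the "RM" is faked inside the client and simply grants every request, and registration is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Stand-in for a subset of AMRMClientAsync.CallbackHandler; containers are plain strings. */
interface MiniCallbackHandler {
    void onContainersAllocated(List<String> containers);
    void onContainersCompleted(List<String> containers);
}

/** Hypothetical model of an asynchronous AM-to-RM client. Not the Hadoop class. */
class MiniAsyncClient {
    private final long intervalMs;
    private final MiniCallbackHandler handler;
    private final Queue<String> pendingAsks = new ArrayDeque<>();
    private final Queue<String> finished = new ArrayDeque<>();
    private ScheduledExecutorService heartbeater;
    private int nextId = 1;

    MiniAsyncClient(long intervalMs, MiniCallbackHandler handler) {
        this.intervalMs = intervalMs;
        this.handler = handler;
    }

    /** Queues a request; nothing is granted until the next heartbeat. */
    synchronized void addContainerRequest(String role) {
        pendingAsks.add(role);
    }

    /** Test hook: pretend the fake RM saw this container exit. */
    synchronized void simulateCompletion(String containerId) {
        finished.add(containerId);
    }

    /** One heartbeat: the fake RM grants every pending ask and reports finished containers. */
    synchronized void heartbeatOnce() {
        List<String> allocated = new ArrayList<>();
        while (!pendingAsks.isEmpty()) {
            pendingAsks.poll();
            allocated.add("container_" + nextId++);
        }
        List<String> completed = new ArrayList<>(finished);
        finished.clear();
        if (!allocated.isEmpty()) handler.onContainersAllocated(allocated);
        if (!completed.isEmpty()) handler.onContainersCompleted(completed);
    }

    /** Starts the periodic heartbeat thread. */
    void start() {
        heartbeater = Executors.newSingleThreadScheduledExecutor();
        heartbeater.scheduleAtFixedRate(this::heartbeatOnce, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    /** Stops the heartbeat thread if it was started. */
    void stop() {
        if (heartbeater != null) {
            heartbeater.shutdownNow();
        }
    }
}

class MiniAsyncClientDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch gotContainers = new CountDownLatch(1);
        MiniAsyncClient client = new MiniAsyncClient(50, new MiniCallbackHandler() {
            @Override public void onContainersAllocated(List<String> containers) {
                System.out.println("allocated: " + containers);
                gotContainers.countDown();
            }
            @Override public void onContainersCompleted(List<String> containers) {
                System.out.println("completed: " + containers);
            }
        });
        client.start();                           // heartbeats begin
        client.addContainerRequest("taskmanager");
        gotContainers.await(2, TimeUnit.SECONDS); // wait for the callback
        client.stop();
    }
}
```

The point of the model is that `addContainerRequest` returns immediately and the result only arrives later through the handler, driven by the heartbeat. In this sketch the callbacks run on the heartbeat thread for brevity; a production client would typically dispatch them on a separate thread.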
3. AMRMClientAsync source excerpt
4. AMRMClientAsyncImpl source excerpt
5. AMRMClient source excerpt
package org.apache.hadoop.yarn.client.api;
import ...
// Abstract class AMRMClient
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends AbstractService {
  ...
}
6. AMRMClientImpl source excerpt
7. ApplicationMasterProtocol source excerpt
The ApplicationMasterProtocol interface is fairly simple, with only three methods:
package org.apache.hadoop.yarn.api;
import ...
// The ApplicationMasterProtocol interface
public interface ApplicationMasterProtocol {
  // Register this AM with the RM
  public RegisterApplicationMasterResponse registerApplicationMaster(
      RegisterApplicationMasterRequest request) throws YarnException, IOException;

  // Ask the RM to unregister this AM; the AM may have finished successfully,
  // or the application may have failed
  public FinishApplicationMasterResponse finishApplicationMaster(
      FinishApplicationMasterRequest request) throws YarnException, IOException;

  // The main AM-to-RM call: takes an AllocateRequest and returns an AllocateResponse.
  // This is the container request call; requests and responses are batched
  // (for example, a Flink JobManager asking for 3 TaskManagers at once).
  // Executed at most once, so a retried call does not cause duplicate or excess allocation.
  public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException;
}
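To make the register, allocate, finish sequence and the "no duplicate allocation" comment concrete, here is a toy in-memory model. It is a sketch under stated assumptions, not the Hadoop interface: `MiniMasterProtocol`, `MiniAllocateResponse` and `ToyResourceManager` are hypothetical names, requests are reduced to a container count, and the duplicate check (replay the previous answer when the caller echoes a response id that is one step old) is a simplification of how a response id can be used for this purpose.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified allocate reply: the id to echo back next time, plus the containers granted. */
final class MiniAllocateResponse {
    final int responseId;
    final List<String> containers;

    MiniAllocateResponse(int responseId, List<String> containers) {
        this.responseId = responseId;
        this.containers = containers;
    }
}

/** Stand-in for the three-method AM-to-RM contract (not the Hadoop interface). */
interface MiniMasterProtocol {
    void registerApplicationMaster(String host);
    MiniAllocateResponse allocate(int lastResponseId, int containersWanted);
    void finishApplicationMaster(String finalStatus);
}

/** Toy in-memory ResourceManager side of the protocol. */
class ToyResourceManager implements MiniMasterProtocol {
    private boolean registered;
    private int granted;
    private MiniAllocateResponse last = new MiniAllocateResponse(0, new ArrayList<>());

    @Override
    public void registerApplicationMaster(String host) {
        registered = true;
    }

    @Override
    public MiniAllocateResponse allocate(int lastResponseId, int containersWanted) {
        if (!registered) {
            throw new IllegalStateException("AM must register before calling allocate");
        }
        if (lastResponseId + 1 == last.responseId) {
            // The caller retransmitted its previous request: replay the answer, grant nothing new.
            return last;
        }
        if (lastResponseId != last.responseId) {
            throw new IllegalStateException("unexpected response id " + lastResponseId);
        }
        // Fresh request: grant the whole batch in one response.
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < containersWanted; i++) {
            batch.add("container_" + (++granted));
        }
        last = new MiniAllocateResponse(last.responseId + 1, batch);
        return last;
    }

    @Override
    public void finishApplicationMaster(String finalStatus) {
        registered = false;
    }

    int totalGranted() {
        return granted;
    }
}

class ToyResourceManagerDemo {
    public static void main(String[] args) {
        ToyResourceManager rm = new ToyResourceManager();
        rm.registerApplicationMaster("jobmanager-host");
        MiniAllocateResponse first = rm.allocate(0, 3);  // ask for 3 containers in one batch
        MiniAllocateResponse retry = rm.allocate(0, 3);  // same request sent again
        System.out.println(first.containers + " retry replayed: " + (retry == first));
        rm.finishApplicationMaster("SUCCEEDED");
    }
}
```

Because the retried call carries the old response id, the toy RM recognizes it and hands back the earlier response, so the total number of containers granted stays at three.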
8. ApplicationMasterProtocolPBClientImpl source excerpt
package org.apache.hadoop.yarn.api.impl.pb.client;
import ...
// Client-side implementation of the ApplicationMasterProtocol interface
public class ApplicationMasterProtocolPBClientImpl implements ApplicationMasterProtocol, Closeable {

  private ApplicationMasterProtocolPB proxy;

  public ApplicationMasterProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
      Configuration conf) throws IOException {
    RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, ProtobufRpcEngine.class);
    // Under the hood this calls java.lang.reflect.Proxy#newProxyInstance
    proxy = (ApplicationMasterProtocolPB) RPC.getProxy(ApplicationMasterProtocolPB.class,
        clientVersion, addr, conf);
  }
  ...
}
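The comment in the constructor points at the JDK dynamic proxy mechanism. A minimal sketch of that mechanism on its own, assuming nothing about Hadoop's RPC engine: `MiniProtocol`, `RecordingHandler` and `ProxySketch` are hypothetical names, and the handler only records each call where a real RPC engine would serialize it and send it over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical protocol interface, standing in for something like ApplicationMasterProtocolPB. */
interface MiniProtocol {
    String register(String host);
}

/** Records each call instead of sending it; a real RPC engine would serialize and transmit it. */
class RecordingHandler implements InvocationHandler {
    final List<String> sent = new ArrayList<>();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            // toString/equals/hashCode are answered locally by the handler itself
            return method.invoke(this, args);
        }
        sent.add(method.getName() + Arrays.toString(args));
        return "ack:" + method.getName();
    }
}

class ProxySketch {
    /** Builds an object implementing the given interface by routing every call to the handler. */
    static <T> T getProxy(Class<T> protocol, InvocationHandler handler) {
        Object proxy = Proxy.newProxyInstance(
                protocol.getClassLoader(), new Class<?>[] {protocol}, handler);
        return protocol.cast(proxy);
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        MiniProtocol client = getProxy(MiniProtocol.class, handler);
        System.out.println(client.register("host-1"));
        System.out.println(handler.sent);
    }
}
```

The caller only ever sees the interface; no hand-written class implements `MiniProtocol`. That is why the client class above can hold a `proxy` field typed as the protocol interface and call it like a local object.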
9. ApplicationMasterProtocolPB
package org.apache.hadoop.yarn.api;
import ...
@Private
@Unstable
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB",
    protocolVersion = 1)
public interface ApplicationMasterProtocolPB
    extends ApplicationMasterProtocolService.BlockingInterface {
}
10. Definition of ApplicationMasterProtocolService
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationmaster_protocol.proto
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "ApplicationMasterProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;

import "yarn_service_protos.proto";

service ApplicationMasterProtocolService {
  rpc registerApplicationMaster (RegisterApplicationMasterRequestProto) returns (RegisterApplicationMasterResponseProto);
  rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
  rpc allocate (AllocateRequestProto) returns (AllocateResponseProto);
}
11. ApplicationMasterProtocolPBServiceImpl source excerpt
The server-side (RM) implementation of the ApplicationMasterProtocolPB interface.
package org.apache.hadoop.yarn.api.impl.pb.service;
import ...
@Private
public class ApplicationMasterProtocolPBServiceImpl implements ApplicationMasterProtocolPB {

  private ApplicationMasterProtocol real;

  // When the ResourceManager starts, it initializes the real object through this constructor
  public ApplicationMasterProtocolPBServiceImpl(ApplicationMasterProtocol impl) {
    this.real = impl;
  }
  ...
}
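The `real` field is the heart of this class: the wire-level service decodes an incoming message, delegates to the domain-level implementation, and encodes the result. A dependency-free sketch of that adapter shape follows. `DomainProtocol`, `WireProtocol` and `WireServiceAdapter` are hypothetical names, and the `ask=N` / `granted=N` string messages are an invented stand-in for protobuf messages.

```java
/** Domain-level interface, standing in for ApplicationMasterProtocol. */
interface DomainProtocol {
    /** Returns how many of the requested containers were granted. */
    int allocate(int containersWanted);
}

/** Wire-level interface, standing in for ApplicationMasterProtocolPB; messages are strings here. */
interface WireProtocol {
    String allocate(String requestMessage);
}

/** Adapter: decode the wire message, delegate to the real implementation, encode the reply. */
class WireServiceAdapter implements WireProtocol {
    private static final String ASK_PREFIX = "ask=";

    private final DomainProtocol real;

    WireServiceAdapter(DomainProtocol impl) {
        this.real = impl;
    }

    @Override
    public String allocate(String requestMessage) {
        if (!requestMessage.startsWith(ASK_PREFIX)) {
            throw new IllegalArgumentException("malformed request: " + requestMessage);
        }
        int wanted = Integer.parseInt(requestMessage.substring(ASK_PREFIX.length()));
        int granted = real.allocate(wanted);   // the actual decision is made by the delegate
        return "granted=" + granted;
    }
}

class WireServiceAdapterDemo {
    public static void main(String[] args) {
        // A delegate that never grants more than two containers per call.
        DomainProtocol scheduler = wanted -> Math.min(wanted, 2);
        WireProtocol service = new WireServiceAdapter(scheduler);
        System.out.println(service.allocate("ask=3"));
    }
}
```

Keeping the translation in the adapter means the delegate never sees the wire format, so the same domain implementation can be called directly in tests or behind a different transport.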
12. ApplicationMasterService source excerpt
package org.apache.hadoop.yarn.server.resourcemanager;
import ...
@SuppressWarnings("unchecked")
@Private
public class ApplicationMasterService extends AbstractService implements ApplicationMasterProtocol {
  // Calls ultimately land in the methods of this class, which produce the result that is returned
  ...
}