Steps for an ApplicationMaster to launch a Container
1. Request a Container
1) Connect to the ResourceManager
    Configuration conf = new Configuration();
    YarnRPC rpc = YarnRPC.create(conf);
    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    // Look up the address of the ResourceManager's scheduler interface
    InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
        YarnConfiguration.RM_SCHEDULER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
    AMRMProtocol resourceManager =
        (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf);
2) Register with the ResourceManager as the ApplicationMaster
    RegisterApplicationMasterRequest appMasterRequest =
        Records.newRecord(RegisterApplicationMasterRequest.class);
    // Fill in the details of this Application
    appMasterRequest.setApplicationAttemptId(appAttemptID);
    appMasterRequest.setHost(appMasterHostname);
    appMasterRequest.setRpcPort(appMasterRpcPort);
    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
    RegisterApplicationMasterResponse response =
        resourceManager.registerApplicationMaster(appMasterRequest);
3) Request Containers from the ResourceManager
    // Build the Container request (the host to run on, the priority, and the memory required)
    ResourceRequest request = Records.newRecord(ResourceRequest.class);
    request.setHostName("*"); // "*" means any host
    request.setNumContainers(numContainers);
    Priority pri = Records.newRecord(Priority.class);
    pri.setPriority(requestPriority);
    request.setPriority(pri);
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(containerMemory);
    request.setCapability(capability);
    List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
    resourceReq.add(request);

    // Send the allocate request to the ResourceManager. It carries the response ID
    // (a counter of allocate calls), the Container requests, the list of Containers
    // to release, and the progress (here, completed Containers over total Containers).
    AllocateRequest req = Records.newRecord(AllocateRequest.class);
    CopyOnWriteArrayList<ContainerId> releasedContainers =
        new CopyOnWriteArrayList<ContainerId>();
    AtomicInteger rmRequestID = new AtomicInteger();
    req.setResponseId(rmRequestID.incrementAndGet());
    req.setApplicationAttemptId(appAttemptID);
    req.addAllAsks(resourceReq);
    req.addAllReleases(releasedContainers);
    req.setProgress((float) numCompletedContainers.get() / numTotalContainers);
    AllocateResponse resp = resourceManager.allocate(req);
    AMResponse amResp = resp.getAMResponse();
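The ResourceManager typically does not grant every Container in the response to the first allocate call, so an ApplicationMaster usually repeats the call until it has what it asked for. The following is a minimal sketch of such a loop in plain Java. The class name AllocateLoopSketch, the allocateOnce function (a stand-in for building an AllocateRequest with the given response ID and calling resourceManager.allocate), and the use of strings to represent Containers are all hypothetical and not part of the YARN API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

// Hypothetical helper; not part of YARN.
final class AllocateLoopSketch {

    /**
     * Repeats the allocate call until `needed` Containers have been granted
     * or `maxRounds` calls have been made, whichever comes first.
     *
     * @param allocateOnce stand-in for one resourceManager.allocate(...) call;
     *                     receives the response ID, returns newly granted Containers
     */
    static List<String> collect(int needed, int maxRounds,
                                IntFunction<List<String>> allocateOnce,
                                long sleepMillis) {
        AtomicInteger responseId = new AtomicInteger();
        List<String> granted = new ArrayList<String>();
        for (int round = 0; round < maxRounds && granted.size() < needed; round++) {
            // Each call uses the next response ID, as in the snippet above
            granted.addAll(allocateOnce.apply(responseId.incrementAndGet()));
            if (granted.size() < needed) {
                try {
                    Thread.sleep(sleepMillis); // wait before asking again
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break; // stop asking if the thread was interrupted
                }
            }
        }
        return granted;
    }
}
```

The maxRounds bound is a design choice of this sketch: it keeps the ApplicationMaster from spinning forever when the cluster has no free capacity.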
2. Assign work to the allocated Containers
1) Get the Containers allocated above
    List<Container> allocatedContainers = amResp.getAllocatedContainers();
    for (Container container : allocatedContainers) {
        // Steps 2) to 4) below run here, once for each Container
    }
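2) Initialize the launch context for the Container. The context holds the following fields:
ContainerId
User: the user the Container runs as, that is, the user running the current Application
Resource: the resources the ResourceManager allocated to this Container
ContainerToken: the security tokens used in secure mode
LocalResources: the resources needed by the program the Container runs, such as the jar that contains the program
ServiceData:
Environment: the environment variables needed by the program the Container runs, as key-value pairs
Commands: the command that launches the program; for a Java program, for example, $JAVA_HOME/bin/java org.yourclass
ApplicationACLs: the access control lists of the Application this Container belongs to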
    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
    ctx.setContainerId(container.getId());
    ctx.setResource(container.getResource());
    ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
    Map<String, String> childEnv = new HashMap<String, String>();
    ctx.setEnvironment(childEnv);

    // Set up the LocalResource
    // Upload the jar to HDFS
    FileSystem fs = FileSystem.get(conf);
    // localJarPath is a placeholder: the local path of the program the Container will run
    Path src = new Path(localJarPath);
    String pathSuffix = appName + "/" + appId.getId() + "/ChildProgram.jar";
    Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
    fs.copyFromLocalFile(false, true, src, dst);
    FileStatus destStatus = fs.getFileStatus(dst);
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
    LocalResource childRsrc = Records.newRecord(LocalResource.class);
    childRsrc.setType(LocalResourceType.FILE);
    childRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
    childRsrc.setTimestamp(destStatus.getModificationTime());
    childRsrc.setSize(destStatus.getLen());
    childRsrc.setResource(ConverterUtils.getYarnUrlFromURI(dst.toUri()));
    localResources.put("Child.jar", childRsrc);
    ctx.setLocalResources(localResources);

    // Set up the command
    Vector<CharSequence> vargs = new Vector<CharSequence>(5);
    vargs.add(JavaCommand);
    vargs.add(JavaArgs);
    // Redirect stdout and stderr into the Container's log directory
    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
    StringBuilder command = new StringBuilder();
    for (CharSequence str : vargs) {
        command.append(str).append(" ");
    }
    List<String> commands = new ArrayList<String>();
    commands.add(command.toString());
    ctx.setCommands(commands);

    StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
    startReq.setContainerLaunchContext(ctx);
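The command assembly above can be isolated into a small helper, which makes it easy to check the resulting string before handing it to the launch context. The sketch below is plain Java; the class LaunchCommandSketch and the method build are hypothetical names, and the literal "<LOG_DIR>" is an assumption about the value of ApplicationConstants.LOG_DIR_EXPANSION_VAR, which the NodeManager is expected to replace with the Container's log directory at launch time.

```java
import java.util.List;

// Hypothetical helper; not part of YARN.
final class LaunchCommandSketch {

    // Assumed value of ApplicationConstants.LOG_DIR_EXPANSION_VAR
    static final String LOG_DIR = "<LOG_DIR>";

    /** Joins the arguments with spaces and appends stdout/stderr redirection. */
    static String build(List<String> args) {
        return String.join(" ", args)
            + " 1>" + LOG_DIR + "/stdout"
            + " 2>" + LOG_DIR + "/stderr";
    }
}
```

For example, build(Arrays.asList("$JAVA_HOME/bin/java", "org.yourclass")) yields a single command line that can be added to the commands list passed to ctx.setCommands.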
3) Connect to the ContainerManager that the Container belongs to
    String cmIpPortStr = container.getNodeId().getHost() + ":"
        + container.getNodeId().getPort();
    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
    ContainerManager cm =
        (ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf);
4) Start the Container through the ContainerManager
cm.startContainer(startReq);
3. Poll for the Container's status
1) Query the ContainerManager for the Container's status
    GetContainerStatusRequest statusReq =
        Records.newRecord(GetContainerStatusRequest.class);
    statusReq.setContainerId(container.getId());
    GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
    ContainerStatus containerStatus = statusResp.getStatus();
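The snippet above fetches the status once; polling means repeating that query until the Container finishes. Below is a minimal sketch of such a loop in plain Java. The class PollSketch, its State enum (modeled on the NEW, RUNNING, and COMPLETE container states), and the fetch supplier (a stand-in for the cm.getContainerStatus call) are hypothetical and only illustrate the control flow.

```java
import java.util.function.Supplier;

// Hypothetical helper; not part of YARN.
final class PollSketch {

    // Stand-in for the Container states reported by the ContainerManager
    enum State { NEW, RUNNING, COMPLETE }

    /**
     * Polls until the state is COMPLETE.
     *
     * @param fetch stand-in for one status query against the ContainerManager
     * @return the number of queries made, or -1 if the Container did not
     *         complete within maxPolls queries or the thread was interrupted
     */
    static int pollUntilComplete(Supplier<State> fetch, int maxPolls, long intervalMillis) {
        for (int polls = 1; polls <= maxPolls; polls++) {
            if (fetch.get() == State.COMPLETE) {
                return polls;
            }
            try {
                Thread.sleep(intervalMillis); // wait before the next query
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return -1;
    }
}
```

Once the state is COMPLETE, the ContainerStatus from the real API also carries an exit status and diagnostics that can be used to decide whether the Container succeeded.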
4. Update the Application status
1) Once all Containers have succeeded or failed, notify the ResourceManager that the Application succeeded or failed
    FinishApplicationMasterRequest finishReq =
        Records.newRecord(FinishApplicationMasterRequest.class);
    finishReq.setAppAttemptId(appAttemptID);
    finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
    // On failure, report FAILED instead:
    // finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
    finishReq.setDiagnostics(diagnostics);
    resourceManager.finishApplicationMaster(finishReq);
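How the ApplicationMaster chooses between SUCCEEDED and FAILED is up to the application. One possible rule, sketched below in plain Java, is to report success only when every expected Container has finished with exit code 0. The class FinalStatusSketch, its FinalStatus enum (mirroring two values of FinalApplicationStatus), and the exit-code convention are assumptions made for illustration.

```java
import java.util.List;

// Hypothetical helper; not part of YARN.
final class FinalStatusSketch {

    // Mirrors the two FinalApplicationStatus values used above
    enum FinalStatus { SUCCEEDED, FAILED }

    /**
     * @param exitCodes          exit codes of the Containers that have finished
     * @param expectedContainers number of Containers the Application launched
     */
    static FinalStatus decide(List<Integer> exitCodes, int expectedContainers) {
        if (exitCodes.size() != expectedContainers) {
            return FinalStatus.FAILED; // some Containers never reported back
        }
        for (int code : exitCodes) {
            if (code != 0) {
                return FinalStatus.FAILED; // a nonzero exit code counts as failure
            }
        }
        return FinalStatus.SUCCEEDED;
    }
}
```

The result would then be mapped to the matching FinalApplicationStatus value before calling finishApplicationMaster.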