未加星标

跟我一起学 Hadoop YARN(二)

字体大小 | |
[数据库(综合) 所属分类 数据库(综合) | 发布者 店小二05 | 时间 2016 | 作者 红领巾 ] 0人收藏点击收藏
重要提示:文章比较长而且有大段代码,请尽可能在PC上阅读。
上一篇看的是不是想瞌睡?这一篇我保证都是干货,手把手教你写一个YARN版本的Hello World。 准备环境

我建议的开发环境是Mac或者linux,不建议使用windows作为开发环境(Hadoop压根就不支持Windows),为了便于测试我用Hadoop minicluster作为本机的模拟测试环境。

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

所以只需要依赖minicluster、junit。 对minicluster的scope设置为provider ,所有的hadoop依赖在运行时都会由外部提供;junit只是作为测试所以我们设置为test。

编写Client YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();

首先拿到YarnClient的实例,整个Client都是围绕这个对象进行的操作。还记得上篇我们提到的吗?Client的工作是向ResourceManager请求提交一个Application,ResourceManager恩准我们的请求之后会告诉我们集群现在还剩下多少可用资源。

YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
Resource clusterMax = appResponse.getMaximumResourceCapability(); //集群最大资源

createApplication这句就是在请求ResourceManager;如果请求被恩准函数就会成功返回,通过返回值可以拿到集群当前剩余的资源。接下来Client需要向ResourceManager再次请求――我需要 一个 计算资源(container)用来执行 某个命令行 ;ResourceManager如果恩准了请求会在NodeManager上分配一个计算资源然后执行 某个命令行 。

ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
//填充resource和AMContainerSpec
Resource amResource = Records.newRecord(Resource.class);
amResource.setMemory(Math.min(clusterMax.getMemory(), 1024));
amResource.setVirtualCores(Math.min(clusterMax.getVirtualCores(), 4));
appContext.setResource(amResource);
//AMContainerSpec
ContainerLaunchContext clc = Records.newRecord(ContainerLaunchContext.class);
StringBuilder cmd = new StringBuilder();
cmd.append("\"" + ApplicationConstants.Environment.JAVA_HOME.$() + "/bin/java\"")
.append(" ")
.append(appMasterMainClass)
.append(" ")
.append("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
Path.SEPARATOR + ApplicationConstants.STDOUT)
.append(" ")
.append("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
Path.SEPARATOR + ApplicationConstants.STDERR);
clc.setCommands(Collections.singletonList(cmd.toString()));
appContext.setAMContainerSpec(clc);

这段应该是整个交互中最复杂的部分,ApplicationSubmissionContext包括两个部分

需要的资源(Resource),VirtualCores是CPU数量、Memroy是内存大小(单位MB)。每一份资源都被称为container,它是NodeManager管理的资源最小单元。(NodeManager会通过不同的Executor来实现不同隔离程度的container――基于JVM的DefaultContainerExecutor(只能隔离内存)或CGroup(可以控制CPU和内存) LinuxContainerExecutor)

需要执行的命令行(ContainerLaunchContext), Commands表示Linux命令

上面的程序其实就是让NodeManager启动一个Container(1G内存、4vCPU),执行 java im.lsn.learnyarn.am.ApplicationMaster 1> stdou 2> stderr 。当然如果没有设置ClassPath,没有把ApplicationMaster所在的Jar放到ClassPath肯定是会出错的。所以我们需要下面两部分代码

//添加执行的Jar
Map<String, LocalResource> localResourceMap = new HashMap<String, LocalResource>();
File appMasterJarFile = new File(appMasterJar);
localResourceMap.put(appMasterJarFile.getName(), toLocalResource(fs, appResponse.getApplicationId().toString(),
appMasterJarFile));
clc.setLocalResources(localResourceMap);
//设置环境变量
Map<String, String> envMap = new HashMap<String, String>();
envMap.put("CLASSPATH", hadoopClassPath());
envMap.put("LANG", "en_US.UTF-8");
clc.setEnvironment(envMap);

为了防止出现乱码,我们把命令行环境变量设置为英文模式,这样命令出错的时候可以看到错误信息。需要解释的是第一条, 添加的JAR必须放到HDFS上,这样NodeManager才可以下载到这些JAR 。 总结一下这一部分

Client申请提交Application,ResourceManager返回集群剩余资源

Client准备好一个计算任务(ApplicationSubmissionContext),包括需要的计算资源(Resource)、需要执行的Linux命令和外部文件(ContainerLaunchContext)――比如Jar

Client把外部文件上传到HDFS上,之后把准备好的计算任务发送给ResourceManager

ResourceManager选择一个合适的NodeManager,把计算任务下发给NodeManager

NodeManager设置好环境变量、下载外部文件、 执行Linux命令 (注意是执行Linux命令)

至此,所有的工作都已经完成了。

编写ApplicationMaster

这部分我们按照最简单的实现

while (true) {
System.out.println("(stdout)Hello World");
System.err.println("(stderr)Hello World");
Thread.sleep(1000);
}

不停的循环,分别输出到标准输出,标准错误输出

调试 @Before
public void before() throws Exception {
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032");
conf.set(YarnConfiguration.RM_HOSTNAME, "localhost");
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, "localhost:8030");
conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "localhost:8031");
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:8088");
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
MiniYARNCluster yrCluster = new MiniYARNCluster("test", 1, 1, 1);
yrCluster.init(conf);
yrCluster.start();
}

启动测试集群,我们可以通过localhost:8080看到熟悉的Hadoop界面

DemoApplicationClient client = new DemoApplicationClient(conf);
client.setAppMasterJar("/Users/fireflyc/Source/TempSource/learn-yarn/target/learn-yarn-1.0.0-SNAPSHOT.jar");
ApplicationId applicationId = client.submit();
while (report.getYarnApplicationState() != YarnApplicationState.FINISHED) {
ApplicationReport report = client.getApplicationReport(applicationId);
System.out.println(String.format("%f %s", report.getProgress(), report.getYarnApplicationState()));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(report.getFinalApplicationStatus());

提交作业,因为我是在intellij中调试不是通过jar运行。所以是没有办法获取到当前所在的jar文件的,只能通过手工设置这个路径。 这个jar是通过maven生成的,所以我们在intellij的Run/Debug Configuration中设置一下,以便自动生成


跟我一起学 Hadoop YARN(二)
执行结果

打开localhost:8088


跟我一起学 Hadoop YARN(二)
跟我一起学 Hadoop YARN(二)

点开sterr和stdout我们会发现程序正常输出了提示信息。

分析结果

打开工程目录中target/test目录


跟我一起学 Hadoop YARN(二)
我们注意到了有两个文件夹

test-localDir-nm-00,这个用来存放的外部资源和YARN生成的脚本。打开nmPrivate/application_xxxx/container_xxxxxx下中的launch_container.sh文件

没错,这就是YARN为我们生成的脚本。里面包含了YARN自己运行需要的环境变量和我们自己定义的环境变量(比如CLASSPATH)注意最后一句

exec /bin/bash -c ""$JAVA_HOME/bin/java" im.lsn.learnyarn.am.ApplicationMaster 1>/Users/fireflyc/Source/TempSource/learn-yarn/target/test/test-logDir-nm-0_0/application_1476095831541_0001/container_1476095831541_0001_01_000001/stdout 2>/Users/fireflyc/Source/TempSource/learn-yarn/target/test/test-logDir-nm-0_0/application_1476095831541_0001/container_1476095831541_0001_01_000001/stderr"

没错,这就是我们要执行的Linux命令。

test-logDir-nm-0_0,这个用来存放执行日志。打开application_xxxx/container_xxxxxx,看到了吗?stderr和stdout就在这里。

我们再看看一下进程

57837 57766 0 6:37下午 ?? 0:00.00 bash /Users/fireflyc/Source/TempSource/learn-yarn/target/test/test-localDir-nm-0_0/usercache/fireflyc/appcache/application_1476095831541_0001/container_1476095831541_0001_01_000001/default_container_executor.sh
57838 57837 0 6:37下午 ?? 0:00.01 /bin/bash -c /Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/bin/java im.lsn.learnyarn.am.ApplicationMaster 1>/Users/fireflyc/Source/TempSource/learn-yarn/target/test/test-logDir-nm-0_0/application_1476095831541_0001/container_1476095831541_0001_01_000001/stdout 2>/Users/fireflyc/Source/TempSource/learn-yarn/target/test/test-logDir-nm-0_0/application_1476095831541_0001/container_1476095831541_0001_01_000001/stderr
57846 57838 0 6:37下午 ?? 0:00.18 /Library/Java/JavaVirtualMachines/jdk1.8.0_25.jdk/Contents/Home/bin/java im.lsn.learnyarn.am.ApplicationMaster

第一个数字是进程号,第二个数字是父进程号,我们的例子中57846就是ApplicationMaster;它是由57838创建,而57838则是由57837创建。 当NodeManager收到一个计算任务会生成一个脚本然后执行,脚本里面触发了一个新的bash进程,这个进程则会执行java im.lsn.learnyarn.am.ApplicationMaster。一共三个进程,而资源限制则是在第一个进程中(57846),为了便于导出标准输出和标准错误输出脚本执行了一个新的bash(57838),最后才是我们真正要执行的命令的进程(57837)。如果我们用docker作为executor,其实就是启动一个docker容器然后把脚本扔到容器里面跑。 一幅图来解释上面所有的过程,所有的有下家的三个淡蓝色的部分标识最终生成的三个进程。


跟我一起学 Hadoop YARN(二)
遗留问题

如果你仔细观察输出会发现Application的状态一直处于 ACCEPTED 状态,正常情况下应该处于 RUNNING 状态。如果想实现这个效果就必须修改ApplicationMaster,启动成功之后通知ResourceManager自己已经启动成功(注册)。

AMRMClientAsync<AMRMClient.ContainerRequest> resourceManager = AMRMClientAsync.createAMRMClientAsync(1000,
new ApplicationMasterCallback());
resourceManager.init(conf);
resourceManager.start();
RegisterApplicationMasterResponse registration = resourceManager.registerApplicationMaster("", -1, "");

首先构造一个ApplicationMaster和ResourceManager通讯的Client(AMRMClient),第一个参数是二者之间的心跳,这个心跳信息我们不必关系YARN会帮我们实现。第二个参数是一个Callback函数,当ResourceManager向ApplicationMaster发送消息的时候回触发这些事件,具体内容我们下一部分解释。 最后一句 registerApplicationMaster 需要传递三个参数,这三个参数组成了一个完整的HTTP访问地址(地址、端口、连接路径)。它就是出现在Hadoop Web界面中的 ApplicationMaster 链接。(还记得M/R或者Spark执行的时候通过点击这个可以进入到一个Web界面吗?没错这个就是Application自己提供的,后面我会带着大家一起做一个这样的界面) 还有一个问题是关于安全的,如果你把程序打包扔到集群上运行而你的环境刚好开启了kerberos,程序会执行失败。我们ApplicationMaster执行的时候需要和ResourceManager通讯,这个通讯过程必须经过kerberos认证和加密。所以我们必须把Token也带过去。 Hadoop引入了代理Token的做法,通常是存放在HDFS上的。所以我们必须在Client中去HDFS上拿到这个Token带到ApplicaitonMaster中

if (UserGroupInformation.isSecurityEnabled()) {
String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
Credentials credentials = new Credentials();
fs.addDelegationTokens(tokenRenewer, credentials);
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
clc.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
}

addDelegationTokens 这个方法非常诡异,它没有返回值其实是通过修改参数(credentials)的方式返回数据的。

写在最后

完整代码在github上,https://github.com/fireflyc/learnyarn/tree/v0.1

欢迎关注公众账号了解更多信息“写程序的康德――思考、批判、理性”


跟我一起学 Hadoop YARN(二)

本文数据库(综合)相关术语:系统安全软件

主题: HadoopLinuxJavaHDFSCPUWindowsSparkJVM其实UT
分页:12
转载请注明
本文标题:跟我一起学 Hadoop YARN(二)
本站链接:http://www.codesec.net/view/483344.html
分享请点击:


1.凡CodeSecTeam转载的文章,均出自其它媒体或其他官网介绍,目的在于传递更多的信息,并不代表本站赞同其观点和其真实性负责;
2.转载的文章仅代表原创作者观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,本站对该文以及其中全部或者部分内容、文字的真实性、完整性、及时性,不作出任何保证或承若;
3.如本站转载稿涉及版权等问题,请作者及时联系本站,我们会及时处理。
登录后可拥有收藏文章、关注作者等权限...
技术大类 技术大类 | 数据库(综合) | 评论(0) | 阅读(31)