
Monitoring Job Status on YARN

Monitoring the liveness of Spark or MR applications running on YARN


Preface

A developer asked how to monitor whether a Spark or MR application running on YARN is still alive.

This can be done directly through the API that YARN provides.

Walkthrough

In the pom.xml file, add the relevant repository and dependencies:

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.0.0-cdh6.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.0.0-cdh6.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-api</artifactId>
        <version>3.0.0-cdh6.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-client</artifactId>
        <version>3.0.0-cdh6.3.1</version>
    </dependency>
</dependencies>

The implementation is actually quite simple: use YarnClient to fetch the Application reports from the ResourceManager.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;

import java.io.IOException;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;

public class Client {

    public static ApplicationReport getAppReport(String applicationName) {
        Configuration conf = new YarnConfiguration();
        // Change these to your ResourceManager's addresses
        conf.set("yarn.resourcemanager.address", "master-240:8032");
        conf.set("yarn.resourcemanager.admin.address", "master-240:8033");
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // Optional filter: pass this set to getApplications(appStates)
        // to restrict the query to specific application states
        EnumSet<YarnApplicationState> appStates = EnumSet.of(
                YarnApplicationState.RUNNING,
                YarnApplicationState.ACCEPTED,
                YarnApplicationState.SUBMITTED,
                YarnApplicationState.FINISHED,
                YarnApplicationState.FAILED,
                YarnApplicationState.KILLED);

        List<ApplicationReport> appsReports = null;
        try {
            // Without a state filter, getApplications() returns applications in all states
            // appsReports = yarnClient.getApplications(appStates);
            appsReports = yarnClient.getApplications();
            // Sort by applicationId in descending order so the newest application comes first
            Comparator<ApplicationReport> comp =
                    Comparator.comparing(ApplicationReport::getApplicationId);
            appsReports.sort(comp.reversed());
        } catch (YarnException | IOException e) {
            e.printStackTrace();
        }

        ApplicationReport aimAppReport = null;
        if (appsReports != null) {
            for (ApplicationReport appReport : appsReports) {
                // Match the application by its name
                if (appReport.getName().equals(applicationName)) {
                    aimAppReport = appReport;
                    break;
                }
            }
        }

        yarnClient.stop();
        return aimAppReport;
    }

    public static void main(String[] args) {
        ApplicationReport applicationReport = getAppReport("Spark shell");
        if (applicationReport == null) {
            System.out.println("No application named 'Spark shell' was found");
            return;
        }
        System.out.println("ApplicationId ============> " + applicationReport.getApplicationId());
        System.out.println("YarnApplicationState ============> " + applicationReport.getYarnApplicationState());
        System.out.println("name ============> " + applicationReport.getName());
        System.out.println("queue ============> " + applicationReport.getQueue());
        System.out.println("user ============> " + applicationReport.getUser());
    }
}

With this, we can look up a live Spark (or other) job by its application name. A small helper can turn the report into a liveness check, as sketched below (a minimal sketch built on the getAppReport method above; the isAlive name and the choice of "alive" states are my own assumptions):
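
// Minimal sketch: treat an application as "alive" while it is still submitted/accepted/running.
// The set of states considered "alive" is an assumption for illustration.
public static boolean isAlive(String applicationName) {
    ApplicationReport report = getAppReport(applicationName);
    if (report == null) {
        return false; // no application with that name was found
    }
    YarnApplicationState state = report.getYarnApplicationState();
    return state == YarnApplicationState.SUBMITTED
            || state == YarnApplicationState.ACCEPTED
            || state == YarnApplicationState.RUNNING;
}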

Alternatively, the client can connect by placing yarn-site.xml in the resources directory:

Configuration conf = new YarnConfiguration();
// This loads the YARN configuration; if the addresses in it use hostnames,
// make sure the host-to-IP mappings exist in the client machine's hosts file
conf.addResource(new Path("src/main/resources/yarn-site.xml"));

If the cluster uses ResourceManager high availability, you can also connect by setting the HA parameters directly:

Configuration conf = new YarnConfiguration();
conf.set("yarn.resourcemanager.ha.enabled", "true");
// The three parameters below differ per YARN cluster; look them up in yarn-site.xml
conf.set("yarn.resourcemanager.ha.rm-ids", "rm61,rm54");
conf.set("yarn.resourcemanager.address.rm54", "192.168.1.229:8032");
conf.set("yarn.resourcemanager.address.rm61", "192.168.1.228:8032");

References