A Simple Java Implementation of the yarn logs -applicationId Command
Preface
The requirement: fetch the logs of a YARN application and display them on a frontend page.
My first attempt read the aggregated log files under /tmp/logs directly. The output was poorly formatted, with garbled bytes at the beginning and end of each file, and I wasted a lot of time trying to strip that garbage out, to no avail.
I finally saw a workable approach in 任我行's post yarn logs -applicationId命令java版本简单实现.
任我行's Code
```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.PrintStream;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class GetYarnLog {

    public static void main(String[] args) throws Throwable {
        run("application_1535700682133_0496");
    }

    public static int run(String appIdStr) throws Throwable {
        Configuration conf = new YarnConfiguration();
        conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/core-site.xml"));
        conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/yarn-site.xml"));
        conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/hdfs-site.xml"));

        if (appIdStr == null || appIdStr.equals("")) {
            System.out.println("appId is null!");
            return -1;
        }

        // Note: this writes the logs to a local file named after the application id
        PrintStream out = new PrintStream(appIdStr);
        ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);

        // Locate the aggregated log directory of this application on HDFS
        Path remoteRootLogDir = new Path(conf.get("yarn.nodemanager.remote-app-log-dir", "/tmp/logs"));
        String user = UserGroupInformation.getCurrentUser().getShortUserName();
        String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
        Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, appId, user, logDirSuffix);

        RemoteIterator<FileStatus> nodeFiles;
        try {
            Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
            nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir);
        } catch (FileNotFoundException fnf) {
            logDirNotExist(remoteAppLogDir.toString());
            return -1;
        }

        boolean foundAnyLogs = false;
        while (nodeFiles.hasNext()) {
            FileStatus thisNodeFile = nodeFiles.next();
            // Skip aggregation files that are still being written
            if (!thisNodeFile.getPath().getName().endsWith(".tmp")) {
                System.out.println("NodeFileName = " + thisNodeFile.getPath().getName());
                AggregatedLogFormat.LogReader reader =
                        new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
                try {
                    AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
                    DataInputStream valueStream = reader.next(key);
                    while (valueStream != null) {
                        String containerString = "\n\nContainer: " + key + " on " + thisNodeFile.getPath().getName();
                        out.println(containerString);
                        out.println(StringUtils.repeat("=", containerString.length()));
                        try {
                            // Dump every log type (stdout, stderr, ...) of this container
                            for (;;) {
                                AggregatedLogFormat.LogReader.readAContainerLogsForALogType(
                                        valueStream, out, thisNodeFile.getModificationTime());
                                foundAnyLogs = true;
                            }
                        } catch (EOFException eof) {
                            // End of this container's logs; advance to the next container
                            key = new AggregatedLogFormat.LogKey();
                            valueStream = reader.next(key);
                        }
                    }
                } finally {
                    reader.close();
                }
            }
        }

        if (!foundAnyLogs) {
            emptyLogDir(remoteAppLogDir.toString());
            return -1;
        }
        return 0;
    }

    // Minimal stand-ins for two helpers the original snippet referenced but did not define
    private static void logDirNotExist(String remoteAppLogDir) {
        System.out.println(remoteAppLogDir + " does not exist.");
        System.out.println("Log aggregation has not completed or is not enabled.");
    }

    private static void emptyLogDir(String remoteAppLogDir) {
        System.out.println(remoteAppLogDir + " does not have any log files.");
    }
}
```
My Code
```java
package com.jdb.bigdatams.util;

import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class YarnLogUtil {

    @Value("${hadooprequest.nn1Address}")
    private String nn1;

    @Value("${hadooprequest.nn2Address}")
    private String nn2;

    @Value("${hadooprequest.nameservices}")
    private String hdfsNameservices;

    public String readLog(String applicationId, String userName) throws Exception {
        // Build an HA HDFS client configuration purely from injected properties,
        // so no local Hadoop config files are needed to reach the NameNodes
        Configuration conf = new Configuration(false);
        String nameservices = hdfsNameservices;
        String[] namenodesAddr = {nn1, nn2};
        String[] namenodes = {"nn1", "nn2"};
        conf.set("fs.defaultFS", "hdfs://" + nameservices);
        conf.set("dfs.nameservices", nameservices);
        conf.set("dfs.ha.namenodes." + nameservices, namenodes[0] + "," + namenodes[1]);
        conf.set("dfs.namenode.rpc-address." + nameservices + "." + namenodes[0], namenodesAddr[0]);
        conf.set("dfs.namenode.rpc-address." + nameservices + "." + namenodes[1], namenodesAddr[1]);
        conf.set("dfs.client.failover.proxy.provider." + nameservices,
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        String hdfsRPCUrl = "hdfs://" + nameservices + ":" + 8020;

        // Collect the logs in memory instead of a local file (see "2. Returning a String" below)
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(os);
        try {
            FileSystem fs = FileSystem.get(new URI(hdfsRPCUrl), conf, userName);
            FileStatus[] paths = fs.listStatus(new Path("/tmp/logs/" + userName + "/logs/" + applicationId));

            if (paths == null || paths.length == 0) {
                throw new FileNotFoundException("Cannot access /tmp/logs/" + userName
                        + "/logs/" + applicationId + ": No such file or directory.");
            }

            // Refuse to pull more than 1 GB of logs into memory
            long sizeLength = 0;
            for (FileStatus fileStatus : paths) {
                sizeLength += fs.getContentSummary(fileStatus.getPath()).getLength();
            }
            if (sizeLength > 1024 * 1024 * 1024) {
                return "The logs exceed 1 GB; please view them on the cluster directly.";
            }

            for (int i = 0; i < paths.length; ++i) {
                Configuration yarnConfiguration = new YarnConfiguration();
                yarnConfiguration.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/core-site.xml"));
                yarnConfiguration.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/yarn-site.xml"));
                yarnConfiguration.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/hdfs-site.xml"));
                AggregatedLogFormat.LogReader reader =
                        new AggregatedLogFormat.LogReader(yarnConfiguration, paths[i].getPath());
                try {
                    AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
                    DataInputStream valueStream = reader.next(key);
                    while (valueStream != null) {
                        String containerString = "\n\nContainer: " + key + " on " + paths[i].getPath().getName();
                        out.println(containerString);
                        out.println(StringUtils.repeat("=", containerString.length()));
                        try {
                            // Dump every log type of this container until EOF
                            for (;;) {
                                AggregatedLogFormat.LogReader.readAContainerLogsForALogType(
                                        valueStream, out, paths[i].getModificationTime());
                            }
                        } catch (EOFException eof) {
                            // End of this container's logs; advance to the next container
                            key = new AggregatedLogFormat.LogKey();
                            valueStream = reader.next(key);
                        }
                    }
                } finally {
                    reader.close();
                }
            }
        } catch (FileNotFoundException e) {
            throw e;
        } catch (Exception e) {
            e.printStackTrace();
        }
        out.close();
        return new String(os.toByteArray(), StandardCharsets.UTF_8);
    }
}
```
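For context, here is a hypothetical sketch of how this component might be exposed to the frontend. The controller class, endpoint path, and parameter names below are illustrative assumptions, not taken from the actual project:

```java
package com.jdb.bigdatams.controller; // hypothetical package

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.jdb.bigdatams.util.YarnLogUtil;

@RestController
public class YarnLogController {

    private final YarnLogUtil yarnLogUtil;

    // Spring wires in the YarnLogUtil component via constructor injection
    public YarnLogController(YarnLogUtil yarnLogUtil) {
        this.yarnLogUtil = yarnLogUtil;
    }

    // Returns the aggregated logs of one application as plain text
    @GetMapping("/yarn/logs")
    public String logs(@RequestParam String applicationId,
                       @RequestParam String userName) throws Exception {
        return yarnLogUtil.readLog(applicationId, userName);
    }
}
```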
Problems Encountered
1. Permissions
At first I ran into permission errors. I tried to solve them in code, hoping to read the logs as the application's appOwner, but found no way to make that work. In the end I solved it by granting the account that runs the service read access to /tmp/logs:
```bash
# HDFS has ACLs enabled
hadoop fs -setfacl -R -m default:user:hadoop:rwx /tmp/logs
```
With a default ACL set, the hadoop user can also read every file generated under that path afterwards. See HDFS ACL 权限管理 for more on HDFS ACL management.
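To confirm the ACL took effect (the programmatic equivalent of hadoop fs -getfacl /tmp/logs), a minimal sketch using FileSystem.getAclStatus, assuming the same Cloudera client config paths as above:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;

public class CheckTmpLogsAcl {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/core-site.xml"));
        conf.addResource(new Path("/etc/hadoop/conf.cloudera.yarn/hdfs-site.xml"));
        try (FileSystem fs = FileSystem.get(conf)) {
            // Lists the extended ACL entries on /tmp/logs; after the setfacl above,
            // an entry like "default:user:hadoop:rwx" should appear
            AclStatus acl = fs.getAclStatus(new Path("/tmp/logs"));
            for (AclEntry entry : acl.getEntries()) {
                System.out.println(entry);
            }
        }
    }
}
```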
2. Returning a String
With 任我行's code, the logs were always written to a local file in the project's working directory. After reading the code carefully, I found this comes from the PrintStream: new PrintStream(appIdStr) opens a local file named after the application id.
So the problem became how to capture a PrintStream's output as a String; see PrintStream 里的内容转为 String for details.
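The trick, as used in readLog above, is to back the PrintStream with an in-memory ByteArrayOutputStream instead of a file, then decode the accumulated bytes once printing is done. A minimal self-contained sketch:

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;

public class PrintStreamToString {
    public static void main(String[] args) {
        // Everything printed to `out` accumulates in `os` rather than a file
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(os);
        out.println("Container: container_00001 on node-1");
        out.close();

        // Decode the buffered bytes into the String handed to the frontend
        String logText = new String(os.toByteArray(), StandardCharsets.UTF_8);
        System.out.print(logText);
    }
}
```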
References