Mac 上搭建 Flink 1.8.0 环境并构建运行简单 WordCount 简单程序入门
前言 18年关注到的 Flink。那是还懵懵懂懂参加了 Flink Forward in China 2018。
因岗位为 SRE 和公司暂无需要。一直没有认真入门过。
这次打算作为 19 年下半年计划正式入坑学习。
开始前建议先阅读下云邪入门教程 5分钟从零构建第一个 Flink 应用
准备工作 Flink 可以运行在 Linux、Mac 和 Windows 上,唯一的要求就是必须安装 Java 8 或以上版本。
可以通过发出以下命令来检查Java的正确安装
有 Java 8,输出将如下所示
1 2 3 java version "1.8.0_201" Java(TM) SE Runtime Environment (build 1.8.0_201-b09) Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)
Java 安装后去官网 下载 Flink,然后解压即可运行。这里以 flink-1.8.0 为例。
1 2 3 cd ~/Downloads/ tar xzf flink-1.8.0-bin-scala_2.12.tgz cd flink-1.8.0
配置全局变量。FLINK_HOME 写你自己的 Flink 路径
1 2 3 4 5 vim ~/.bash_profile # Flink HOME FLINK_HOME=/Users/tu/Public/SoftWare/flink-1.8.0 export PATH=$FLINK_HOME/bin:$PATH
对于 MacOS 用户,可以选择通过 homebrew 安装 Flink。一键安装就可以用,不需要配置全局变量
1 brew install apache-flink
检查安装
1 2 $ flink --version Version: 1.8.0, Commit ID: 4caec0d
在 Flink 目录下运行以下命令即可启动
1 2 3 4 [16:29:35] tu flink-1.8.0 $ ./bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host lihuimindeMacBook-Pro.local. Starting taskexecutor daemon on host lihuimindeMacBook-Pro.local.
接着可以打开浏览器访问 http://localhost:8081/ 查看
通过 jps 可以看到多出来两个JVM进程,运行的主类
1 2 3 4 [16:29:45] tu bin $ jps -l 9169 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint 9702 sun.tools.jps.Jps 9658 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
开发 集群启动后就可以开发 Flink 程序
使用 IDEA 新建一个 maven 项目
如果没有看到 flink-quickstart-java 的话通过右上角的 Add Archetype 按钮来添加。
创建一个 SocketTextStreamWordCount Java 文件,加入以下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 package cn.lihm.examples.streaming;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;public class SocketTextStreamWordCount { public static void main (String[] args) throws Exception { final String hostname; final int port; try { final ParameterTool params = ParameterTool.fromArgs(args); port = params.getInt("port" ); hostname = params.get("hostname" ); } catch (Exception e) { System.err.println("USAGE: Please run 'SocketWindowWordCount --hostname <hostname> --port <port>'" ); return ; } final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> stream = env.socketTextStream(hostname, port); SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter()) .keyBy(0 ) .sum(1 ); sum.print(); env.execute("Java WordCount from SocketTextStream Example" ); } public static final class LineSplitter implements FlatMapFunction <String , Tuple2 <String , Integer >> { @Override public void flatMap (String s, Collector<Tuple2<String, Integer>> collector) { String[] tokens = s.toLowerCase().split("\\W+" ); for (String token: tokens) { if (token.length() > 0 ) { collector.collect(new Tuple2<String, Integer>(token, 1 )); } } } } }
官网还有 Scala 版本的。自己学习尝试即可。这里不介绍了
运行 接着进入工程目录,使用以下命令打包。
mvn clean package -Dmaven.test.skip=true
然后开启监听 9000 端口
nc -l 9000
最后进入 flink 安装目录 bin 下执行以下命令提交job任务。注意换成你自己项目的路径
1 flink run -c cn.lihm.examples.streaming.SocketTextStreamWordCount flink-learning-examples-1.0-SNAPSHOT.jar --port 9000 --hostname 127.0.0.1
执行完上述命令后,可以在 webUI 中看到正在运行的程序
可以在 nc 监听端口中输入 text
1 2 3 4 $ nc -l 9000 lorem ipsum ipsum ipsum ipsum bye
该任务的输出在 flink 家目录下 log 下以 .out 结尾的文件下。 通过 tail 命令看一下输出的文件,来观察统计结果。
最后测试完可以关闭集群
总结 整个过程下来还是有些成就感的。提高了自己动手能力。
参考链接