0%

Flink WordCount 程序

Mac 上搭建 Flink 1.8.0 环境并构建运行简单 WordCount 简单程序入门


前言

18年关注到的 Flink。那是还懵懵懂懂参加了 Flink Forward in China 2018。

因岗位为 SRE 和公司暂无需要。一直没有认真入门过。

这次打算作为 19 年下半年计划正式入坑学习。

开始前建议先阅读下云邪入门教程 5分钟从零构建第一个 Flink 应用

准备工作

Flink 可以运行在 Linux、Mac 和 Windows 上,唯一的要求就是必须安装 Java 8 或以上版本。

可以通过发出以下命令来检查Java的正确安装

1
java -version

有 Java 8,输出将如下所示

1
2
3
java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)

Java 安装后去官网下载 Flink,然后解压即可运行。这里以 flink-1.8.0 为例。

1
2
3
cd ~/Downloads/
tar xzf flink-1.8.0-bin-scala_2.12.tgz
cd flink-1.8.0

配置全局变量。FLINK_HOME 写你自己的 Flink 路径

1
2
3
4
5
vim ~/.bash_profile

# Flink HOME
FLINK_HOME=/Users/tu/Public/SoftWare/flink-1.8.0
export PATH=$FLINK_HOME/bin:$PATH

对于 MacOS 用户,可以选择通过 homebrew 安装 Flink。一键安装就可以用,不需要配置全局变量

1
brew install apache-flink

检查安装

1
2
$ flink --version
Version: 1.8.0, Commit ID: 4caec0d

在 Flink 目录下运行以下命令即可启动

1
2
3
4
[16:29:35] tu flink-1.8.0 $ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host lihuimindeMacBook-Pro.local.
Starting taskexecutor daemon on host lihuimindeMacBook-Pro.local.

接着可以打开浏览器访问 http://localhost:8081/ 查看

通过 jps 可以看到多出来两个JVM进程,运行的主类

1
2
3
4
[16:29:45] tu bin $ jps -l
9169 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
9702 sun.tools.jps.Jps
9658 org.apache.flink.runtime.taskexecutor.TaskManagerRunner

开发

集群启动后就可以开发 Flink 程序

使用 IDEA 新建一个 maven 项目

如果没有看到 flink-quickstart-java 的话通过右上角的 Add Archetype 按钮来添加。

创建一个 SocketTextStreamWordCount Java 文件,加入以下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package cn.lihm.examples.streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* @author lihm
* @date 2019-07-23 16:18
* @description TODO
*/

public class SocketTextStreamWordCount {
public static void main(String[] args) throws Exception {
// the port to connect to
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
hostname = params.get("hostname");
} catch (Exception e) {
System.err.println("USAGE: Please run 'SocketWindowWordCount --hostname <hostname> --port <port>'");
return;
}
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 获取数据
DataStreamSource<String> stream = env.socketTextStream(hostname, port);

// 计数
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
.keyBy(0)
.sum(1);

sum.print();

env.execute("Java WordCount from SocketTextStream Example");
}

public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
String[] tokens = s.toLowerCase().split("\\W+");

for (String token: tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}

官网还有 Scala 版本的。自己学习尝试即可。这里不介绍了

运行

接着进入工程目录,使用以下命令打包。

mvn clean package -Dmaven.test.skip=true

然后开启监听 9000 端口

nc -l 9000

最后进入 flink 安装目录 bin 下执行以下命令提交job任务。注意换成你自己项目的路径

1
flink run -c cn.lihm.examples.streaming.SocketTextStreamWordCount flink-learning-examples-1.0-SNAPSHOT.jar --port 9000 --hostname 127.0.0.1

执行完上述命令后,可以在 webUI 中看到正在运行的程序

可以在 nc 监听端口中输入 text

1
2
3
4
$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye

该任务的输出在 flink 家目录下 log 下以 .out 结尾的文件下。
通过 tail 命令看一下输出的文件,来观察统计结果。

最后测试完可以关闭集群

1
./bin/stop-cluster.sh

总结

整个过程下来还是有些成就感的。提高了自己动手能力。


参考链接