前言
在flink程序开发或者调试过程中,每次部署到集群上都需要不断打包部署,其实是比较麻烦的事情,其实flink一直就提供了一种比较好的方式使得开发同学不用部署就可以观察到flink执行情况。
上代码
第一步:开发之前需要引入在本机支持相关的包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
第二步:
其实只要在生成环境的时候增加webUI部分
Configuration conf = new Configuration();
//设置WebUI绑定的本地端口
conf.setString(RestOptions.BIND_PORT,"8081");
//使用配置
StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
当我们跑起来的时候,我们在浏览器上面输入http://localhost:8081/ 就可以访问:
下面是我的Idea程序:
当然:很多小伙伴肯定想速度操作一下,咱讲究一个服务到位,示例代码也给出来:
public class WordCountStreamUnboundedDemo {
public static void main(String[] args) throws Exception {
//StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
Configuration conf = new Configuration();
//设置WebUI绑定的本地端口
conf.setString(RestOptions.BIND_PORT,"8081");
//使用配置
StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 间隔
));
DataStreamSource<String> inputDS = env.addSource(new ClickParallelSource());
inputDS.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (text, collector) -> {
String[] words=text.split("\\s+");
for ( String word:words){
collector.collect(Tuple2.of(word,1));
}
}).returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy((KeySelector<Tuple2<String,Integer>,String>) entry -> entry.f0)
.sum(1).print();
env.execute();
}
后记
其实做这个事情是因为前文Docker部署Flink的关系,我注意到各个环节上侧重的事情不同,开发环境对我们理解流的一些设计思想很有用,还有各种参数并行的调试都非常有帮助,奈何没有直观可见的东西,有了开发环境的UI界面,丝滑了N个数量级了。