02. Flink Quick Start
1. Create a Project and Import Dependencies
Add the Flink dependencies to the project's pom.xml:
<properties>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
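These two snippets sit inside the usual Maven skeleton. For reference, a minimal sketch of the surrounding pom.xml (the groupId, artifactId, and version here are placeholders, not from the original project):

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.company</groupId>         <!-- placeholder -->
    <artifactId>one-day-flink</artifactId> <!-- placeholder; matches the module name used in the file paths below -->
    <version>1.0-SNAPSHOT</version>        <!-- placeholder -->

    <!-- the <properties> and <dependencies> blocks above go here -->
</project>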
2. Requirements
The basic idea of the batch job: read the text file line by line, split each line into words on spaces, and then count how many times each word occurs.
(1) Data preparation
Create a folder named input under the project directory and create a text file words.txt inside it.
File contents:
hello world
hello flink
hello java
2.1 Batch Processing
Code (implemented with the DataSet API; note that the DataSet API is legacy, and the DataStream API shown in section 2.2 is recommended for new programs):
package com.company.onedayflink.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkBatchWords {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Read data from the file
        DataSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");
        // 3. Split and transform
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            /**
             * @param value one line of input text
             * @param out collector for downstream records; Tuple2 is a two-element tuple of (word, count)
             * @throws Exception
             */
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                // 3.1 Split the line on spaces
                for (String s : value.split(" ")) {
                    // 3.2 Convert each word into a (word, 1) tuple
                    Tuple2<String, Integer> tuple = Tuple2.of(s, 1);
                    // 3.3 Emit the tuple downstream
                    out.collect(tuple);
                }
            }
        });
        // 4. Group by word
        UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroup = wordAndOne.groupBy(0); // 0 is the tuple field index, i.e. the String word
        // 5. Aggregate within each group
        AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroup.sum(1); // 1 is the field index of the count
        // 6. Print the result
        sum.print();
    }
}
Run result: with the sample input above, the job prints each word with its total count, i.e. (hello,3), (world,1), (flink,1), (java,1) (the order may vary between runs).
2.2 Stream Processing
2.2.1 Bounded Streams
Code (implemented with the DataStream API; reading from a file is a bounded stream):
package com.company.onedayflink.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkStreamWords {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read data from the file
        DataStreamSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");
        // 3. Process the data (split, transform, group, aggregate)
        // 3.1 Split and transform
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String s : value.split(" ")) {
                    // Build a (word, 1) tuple
                    Tuple2<String, Integer> tuple = Tuple2.of(s, 1);
                    // Emit the tuple downstream through the collector
                    out.collect(tuple);
                }
            }
        });
        // 3.2 Group: in KeySelector<IN, KEY>, IN is the input type and KEY is the type of the grouping key
        KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOne
                .keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0); // value.f0 is the first element of the tuple
        // 3.3 Aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOneKS.sum(1); // 1 refers to the second tuple field, the count
        // 4. Print the result
        sum.print();
        // 5. Trigger execution (required for DataStream jobs)
        env.execute();
    }
}
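Since Flink 1.12 the DataStream API can also execute bounded pipelines with batch semantics, which is one reason the DataSet API above is considered legacy. A minimal sketch of switching to batch execution mode (the class name is a placeholder; the mode can also be chosen at submission time via the execution.runtime-mode option):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkStreamWordsBatchMode {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With BATCH mode, a bounded source such as a file is processed with batch semantics:
        // each key's final count is emitted once instead of a running update per record.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // ... build the same flatMap / keyBy / sum pipeline as in FlinkStreamWords ...
        env.execute();
    }
}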
Execution result:
2> (java,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
6> (world,1)
8> (flink,1)
The N> prefix on each record is the index of the parallel subtask (thread) that produced it.
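If you want unprefixed, deterministic output for a demo, you can force the whole job to run with a single subtask; a minimal sketch, placed right after creating the environment:

// Run every operator with one parallel subtask; print() omits the "N>" prefix when parallelism is 1.
env.setParallelism(1);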
2.2.2 Unbounded Streams
(1) Use netcat to listen on port 7777 and produce an input stream
Install netcat (on macOS, via Homebrew):
brew install netcat
Listen on port 7777 (-l listens for incoming connections, -k keeps listening after a client disconnects):
nc -lk 7777
(2) Code (implemented with the DataStream API; reading from a socket is an unbounded stream):
package com.company.onedayflink.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkStreamSocketWords {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read data (the first argument is the host running netcat; on macOS you can
        //    check your hostname with the hostname command, or simply use "localhost")
        DataStreamSource<String> socketDS = env.socketTextStream("zgyMacBook-Pro.local", 7777);
        // 3. Process the data (split, transform, group, aggregate)
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
                    // 3.1 Split the line on spaces
                    for (String s : value.split(" ")) {
                        // 3.2 Convert each word into a (word, 1) tuple
                        Tuple2<String, Integer> tuple = Tuple2.of(s, 1);
                        // 3.3 Emit the tuple downstream
                        out.collect(tuple);
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT)) // lambdas erase generic types, so declare the output type explicitly
                .keyBy(value -> value.f0)
                .sum(1);
        // 4. Print the result
        sum.print();
        // 5. Trigger execution
        env.execute();
    }
}
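The returns(Types.TUPLE(Types.STRING, Types.INT)) call is needed because Java erases the lambda's generic types, so Flink cannot infer the flatMap output type on its own. An anonymous FlatMapFunction preserves that type information, so the same pipeline can be written without returns(); a sketch reusing socketDS from the listing above:

SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                for (String s : value.split(" ")) {
                    out.collect(Tuple2.of(s, 1));
                }
            }
        }) // no returns() needed: the anonymous class carries the output type
        .keyBy(value -> value.f0)
        .sum(1);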
(3) Test
Send messages from the netcat terminal:
hello flink
hello world
Observe the program's console output; the keyed sum emits an updated running total for every incoming record, which is why hello appears with count 1 and then 2:
8> (flink,1)
3> (hello,1)
6> (world,1)
3> (hello,2)