引言
KeyedProcessFunction是Flink用于处理KeyedStream的数据集合,它比ProcessFunction拥有更多特性,例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧
正文
了解一个函数怎么用最权威的地方就是 官方文档 以及注解,KeyedProcessFunction的注解如下
/**
* A keyed function that processes elements of a stream.
*
* <p>For every element in the input stream {@link #processElement(Object, Context, Collector)} is
* invoked. This can produce zero or more elements as output. Implementations can also query the
* time and set timers through the provided {@link Context}. For firing timers {@link #onTimer(long,
* OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as
* output and register further timers.
*
* <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
* available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.
*
* <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a {@link
* org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {@link
* org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown
* methods can be implemented. See {@link
* org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
* and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
*/
上面简单来说就是以下四点
- Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用
- 通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法
- 由于KeyedProcessFunction实现了RichFunction接口,因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放
- 需要注意的是,只有在KeyedStream里才能够访问state和定时器,通俗点来说就是这个函数要用在keyBy这个函数的后面
processElement方法解析
- Flink会调用processElement方法处理输入流中的每一条数据
- KeyedProcessFunction.Context参数可以用来读取以及更新内部状态state
- 这个KeyedProcessFunction跟其他function一样通过参数中的Collector对象以回写的方式返回数据
onTimer方法解析:在启用TimerService服务时会定时触发此方法,一般会在processElement方法中开启TimerService服务
以上就是这个函数的基本知识,接下来就通过实战来熟悉下它的使用
实战简介
本次实战的目标是学习KeyedProcessFunction,内容如下:
- 监听本机7777端口读取字符串
- 将每个字符串用空格分隔,转成Tuple2实例,f0是分隔后的单词,f1等于1
- 将Tuple2实例集合通过f0字段分区,得到KeyedStream
- KeyedSteam通过自定义KeyedProcessFunction处理
- 自定义KeyedProcessFunction的作用,是记录每个单词最新一次出现的时间,然后建一个十秒的定时器进行触发
使用代码例子
首先定义pojo类
public class CountWithTimestampNew {
private String key;
private long count;
private long lastQuestTimestamp;
public long getAndIncrementCount() {
return ++count;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
public long getLastQuestTimestamp() {
return lastQuestTimestamp;
}
public void setLastQuestTimestamp(long lastQuestTimestamp) {
this.lastQuestTimestamp = lastQuestTimestamp;
}
}
接着实现KeyedProcessFunction类
public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {
private ValueState<CountWithTimestampNew> state;
@Override
public void open(Configuration parameters) throws Exception {
state = getRuntimeContext().getState(new ValueStateDescriptor<CountWithTimestampNew>("sherlock-state", CountWithTimestampNew.class));
}
// 实现数据处理逻辑的地方
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
Tuple currentKey = ctx.getCurrentKey();
CountWithTimestampNew countWithTimestampNew = state.value();
if (countWithTimestampNew == null) {
countWithTimestampNew = new CountWithTimestampNew();
countWithTimestampNew.setKey(value.f0);
}
countWithTimestampNew.getAndIncrementCount();
//更新这个单词最后一次出现的时间
countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());
//单词之间不会互相覆盖吗?推测state对象是跟key绑定,针对每一个不同的key KeyedProcessFunction会创建其对应的state对象
state.update(countWithTimestampNew);
//给当前单词创建定时器,十秒后触发
long timer = countWithTimestampNew.getLastQuestTimestamp()+10000;
//尝试注释掉看看是否还会触发onTimer方法
ctx.timerService().registerProcessingTimeTimer(timer);
//打印所有信息,用于确保数据准确性
System.out.println(String.format(" 触发processElement方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s",
currentKey.getField(0),
countWithTimestampNew.getCount(),
time(countWithTimestampNew.getLastQuestTimestamp()),
time(timer)
));
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
Tuple currentKey = ctx.getCurrentKey();
CountWithTimestampNew countWithTimestampNew = state.value();
//标记当前元素是否已经连续10s未出现
boolean isTimeout = false;
if (timestamp >= countWithTimestampNew.getLastQuestTimestamp()+10000 ) {
//out.collect(new Tuple2<>(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));
isTimeout = true;
}
//打印所有信息,用于确保数据准确性
System.out.println(String.format(" 触发onTimer方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s, 当前单词是否已超过10秒没有再请求: %s",
currentKey.getField(0),
countWithTimestampNew.getCount(),
time(countWithTimestampNew.getLastQuestTimestamp()),
time(timestamp),
String.valueOf(isTimeout)
));
}
public static String time(long timeStamp) {
return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));
}
}
最后是启动类
public class KeyedProcessFunctionDemo2 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 并行度1
env.setParallelism(1);
// 处理时间
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 监听本地9999端口,读取字符串
DataStream<String> socketDataStream = env.socketTextStream("localhost", 7777);
// 所有输入的单词,如果超过10秒没有再次出现,都可以通过CountWithTimeoutFunction得到
DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream
// 对收到的字符串用空格做分割,得到多个单词
.flatMap(new SplitterFlatMapFunction())
// 设置时间戳分配器,用当前时间作为时间戳
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
@Override
public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
// 使用当前系统时间作为时间戳
return System.currentTimeMillis();
}
@Override
public Watermark getCurrentWatermark() {
// 本例不需要watermark,返回null
return null;
}
})
// 将单词作为key分区
.keyBy(0)
// 按单词分区后的数据,交给自定义KeyedProcessFunction处理
.process(new CountWithTimeoutKeyProcessFunctionNew());
// 所有输入的单词,如果超过10秒没有再次出现,就在此打印出来
timeOutWord.print();
env.execute("ProcessFunction demo : KeyedProcessFunction");
}
}
演示
在启动服务前,先通过linux指令监听端口 nc -lk 7777
-
启动Flink服务后,往7777端口里面发送数据
-
通过IDEA的终端可以看到有日志输出,可以看到在发送消息的时候第一条日志立马打印出来并在10秒后输出第二条日志
-
那么咱们尝试连续发送两条Hello呢,可以看到累加器会持续累加,并且会触发两次onTimer方法,也就是每一条消息都会触发一次。由于连续发送两条,因此可以看得到第三行日志的末尾是false,说明收到第一条后的10秒内又有相同的消息进来。第二条是ture说明在收到第二条消息后的10秒内没有消息进来
-
再输入点其他的试试
-
通过输出可以看到这些单词的计数器又从0开始,说明每一个Key都对应一个状态
思考题
- open方法会在哪里进行调用,KeyedProcessFunction整个类的完整调用逻辑是怎么样的
- registerProcessingTimeTimer和registerEventTimeTimer的差异是什么
参考资料
- https://blog.csdn.net/boling_cavalry/article/details/106299167
- https://blog.csdn.net/lujisen/article/details/105510532
- https://blog.csdn.net/qq_31866793/article/details/102831731