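Having studied Flink, I use the following case study as a quick review of the Flink API.
Contents
Case requirements
Preparation
Writing the main program
Run screenshots
Case requirements
The data below is the access log of a website. The task is to find the N most popular pages (that is, urls) over the last 10 s, refreshed every 5 s; in other words, the Top N pages by view count.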
{user='Alice', url='./home', time=1679043205254}
{user='Alice', url='./prod?id=1', time=1679043206254}
{user='Mary', url='./prod?id=2', time=1679043207256}
{user='Cary', url='./fav', time=1679043208257}
{user='Mary', url='./home', time=1679043209258}
{user='Cary', url='./prod?id=2', time=1679043210259}
......
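To make the "10 s window, refreshed every 5 s" requirement concrete, here is a minimal plain-Java sketch of which sliding windows a given timestamp falls into. It does not use Flink. The class name SlidingWindowDemo and the method windowStarts are made up for illustration, and the sketch assumes non-negative timestamps and windows aligned to the epoch with no offset.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowDemo {
    // Returns the start timestamps of every sliding window that contains `timestamp`.
    // Assumption: timestamps are non-negative and windows have no offset.
    static List<Long> windowStarts(long timestamp, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window that begins at or before the timestamp.
        long lastStart = timestamp - (timestamp % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 10_000L;
        long slideMs = 5_000L;
        for (long start : windowStarts(1679043205254L, sizeMs, slideMs)) {
            System.out.println("[" + start + ", " + (start + sizeMs) + ")");
        }
    }
}
```

For the first sample record (time=1679043205254) this prints [1679043205000, 1679043215000) and [1679043200000, 1679043210000). With a 10 s size and a 5 s slide, every record is counted in two overlapping windows.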
Preparation
To make the program easier to write, we wrap the records above in an Event class.
Event.java
package com.flink.wc.myflink.bean;

public class Event {
    public String user;
    public String url;
    public long time;

    public Event() {
    }

    public Event(String user, String url, long time) {
        this.user = user;
        this.url = url;
        this.time = time;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", time=" + time +
                '}';
    }
}
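If you want to replay the sample log lines shown earlier instead of generating random data, the text has to be turned back into Event objects. The following is only a sketch of one way to do that: EventParser and its regular expression are my own assumptions about the line format, and the nested Event class is a minimal stand-in so that the snippet compiles on its own.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class EventParser {
    // Minimal stand-in for the Event class above, so the sketch is self-contained.
    static class Event {
        final String user;
        final String url;
        final long time;

        Event(String user, String url, long time) {
            this.user = user;
            this.url = url;
            this.time = time;
        }
    }

    // Assumed line format: {user='Alice', url='./home', time=1679043205254}
    private static final Pattern LINE =
            Pattern.compile("\\{user='([^']*)', url='([^']*)', time=(\\d+)\\}");

    static Event parse(String line) {
        Matcher m = LINE.matcher(line.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unrecognized log line: " + line);
        }
        return new Event(m.group(1), m.group(2), Long.parseLong(m.group(3)));
    }

    public static void main(String[] args) {
        Event e = parse("{user='Alice', url='./home', time=1679043205254}");
        System.out.println(e.user + " visited " + e.url + " at " + e.time);
    }
}
```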
We also need a custom source operator that simulates the site's real-time access log.
ClickSource.java
package com.flink.wc.myflink.source_sink;

import com.flink.wc.myflink.bean.Event;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

public class ClickSource implements SourceFunction<Event> {
    // cancel() is called from a different thread than run(), so the flag must be volatile
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
        while (running) {
            ctx.collect(new Event(
                    users[random.nextInt(users.length)], // user and url are combined at random
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis() // current time in milliseconds
            ));
            // Emit (collect) one record per second
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
To simplify the output, we wrap each result in a UrlViewCount class.
UrlViewCount.java
package com.flink.wc.myflink.bean;

public class UrlViewCount {
    public String url;
    public Long count;
    public Long windowStart;
    public Long windowEnd;

    public UrlViewCount() {
    }

    public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
        this.url = url;
        this.count = count;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    @Override
    public String toString() {
        return "UrlViewCount{" +
                "url='" + url + '\'' +
                ", count=" + count +
                ", windowStart=" + windowStart +
                ", windowEnd=" + windowEnd +
                '}';
    }
}
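With the preparation done, we can now write the main program that computes the Top N pages by page views (PV).
Writing the main program
The work breaks down into three broad steps:
- Read the source data and assign watermarks
- Aggregate the view counts per url
- Sort the aggregated results to obtain the Top N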
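Before bringing Flink in, the count-then-rank logic of the last two steps can be tried on a fixed list of records. The sketch below is plain Java and processes everything in one batch, so there are no watermarks or timers. TopNSketch, countPerWindow and topN are hypothetical names, and breaking ties by url is my own choice to make the result deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TopNSketch {
    static final long SIZE_MS = 10_000L;
    static final long SLIDE_MS = 5_000L;

    // Count views per url inside every sliding window, keyed by window end.
    static Map<Long, Map<String, Long>> countPerWindow(String[] urls, long[] times) {
        Map<Long, Map<String, Long>> counts = new TreeMap<>();
        for (int i = 0; i < urls.length; i++) {
            long lastStart = times[i] - (times[i] % SLIDE_MS);
            for (long start = lastStart; start > times[i] - SIZE_MS; start -= SLIDE_MS) {
                counts.computeIfAbsent(start + SIZE_MS, k -> new TreeMap<>())
                        .merge(urls[i], 1L, Long::sum);
            }
        }
        return counts;
    }

    // Sort one window's counts in descending order and keep at most n entries.
    static List<Map.Entry<String, Long>> topN(Map<String, Long> windowCounts, int n) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(windowCounts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Long.compare(b.getValue(), a.getValue());
            // Assumption: ties are broken alphabetically by url.
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
    }

    public static void main(String[] args) {
        // The six sample records from the case requirements.
        String[] urls = {"./home", "./prod?id=1", "./prod?id=2", "./fav", "./home", "./prod?id=2"};
        long[] times = {1679043205254L, 1679043206254L, 1679043207256L,
                1679043208257L, 1679043209258L, 1679043210259L};
        countPerWindow(urls, times).forEach((windowEnd, counts) ->
                System.out.println(windowEnd + " -> " + topN(counts, 2)));
    }
}
```

For the window ending at 1679043210000, the first five sample records fall inside it, so ./home leads with 2 views. Unlike a streaming job, this batch version also reports windows that the watermark has not closed yet.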
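The detailed steps are:
- Read the data and assign watermarks
- Key the stream by url with keyBy
- Open a sliding window of 10 s that slides every 5 s, and apply window functions that count the visits to each url
  - Aggregate incrementally with an AggregateFunction
  - Use a ProcessWindowFunction to emit the aggregated count together with the window information needed downstream
- Key the results by window end time with keyBy
- Sort the records that belong to the same window
  - Store each incoming record in state (sorting needs the records that arrived earlier, hence the state)
  - For each incoming record, register an event-time timer based on windowEnd. Registering the same timestamp again for the same key has no effect, so each window ends up with exactly one timer, which fires once
  - When the timer fires
    - the watermark has passed the end of the window, so every result for that window has arrived and is held in state
    - copy the state list into a Java List and sort it
    - emit the sorted result with out.collect()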
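The timer behaviour in the steps above can be imitated without Flink. The sketch below models the timers of a single key as a sorted set, so a repeated registration is a no-op, and it fires every timer whose timestamp the watermark has reached. TimerSketch and its methods are hypothetical names. The rule "watermark = largest timestamp seen minus 1 ms" is my understanding of a bounded-out-of-orderness strategy with a zero delay, not something taken from the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

class TimerSketch {
    // Timers registered for one key; a set, so the same timestamp is stored only once.
    private final TreeSet<Long> timers = new TreeSet<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    int pendingTimers() {
        return timers.size();
    }

    // Assumed watermark rule for zero out-of-orderness: max event timestamp - 1 ms.
    static long watermarkFor(long maxEventTimestamp) {
        return maxEventTimestamp - 1;
    }

    // Fires (removes and returns) every timer whose timestamp is <= the watermark.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        long windowEnd = 1679043210000L;
        TimerSketch key = new TimerSketch();
        // Three url counts of the same window each ask for the same timer.
        for (int i = 0; i < 3; i++) {
            key.registerEventTimeTimer(windowEnd + 1);
        }
        System.out.println("pending timers: " + key.pendingTimers());
        // A record with time=1679043210259 moves the watermark past windowEnd + 1.
        System.out.println("fired: " + key.advanceWatermark(watermarkFor(1679043210259L)));
    }
}
```

Here the three registrations leave one pending timer, and it fires only after a record later than the window end has pushed the watermark forward.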
The code is as follows:
TopnTest.java
package com.flink.wc.myflink.process;
// These three are the custom Java classes defined above
import com.flink.wc.myflink.bean.Event;
import com.flink.wc.myflink.bean.UrlViewCount;
import com.flink.wc.myflink.source_sink.ClickSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
public class TopnTest {
    public static void main(String[] args) throws Exception {
        // Step 1: read the data and assign watermarks
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> streamSource = env.addSource(new ClickSource());
        SingleOutputStreamOperator<Event> stream01 = streamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((SerializableTimestampAssigner<Event>) (x, l) -> x.time)
        );

        // Step 2: count the visits to each url per window
        SingleOutputStreamOperator<UrlViewCount> stream02 = stream01
                .keyBy(x -> x.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());
        stream02.print("stream02");

        // Step 3: gather the per-url counts of each window, sort them, and output the Top N
        stream02.keyBy(x -> x.windowEnd)
                .process(new TopN(2))
                .print();

        env.execute();
    }

    // Incremental aggregator: AggregateFunction<IN, ACC, OUT> extends Function, Serializable
    private static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event event, Long aLong) {
            return aLong + 1;
        }

        @Override
        public Long getResult(Long aLong) {
            return aLong; // the result of UrlViewCountAgg, i.e. the view count of the url
        }

        @Override
        public Long merge(Long aLong, Long acc1) {
            // Only needed for merging windows (e.g. session windows); sum the partial counts rather than returning null
            return aLong + acc1;
        }
    }

    // The getResult() output of the incremental aggregate function is the input of the full-window function.
    // This class exists only to attach the window information:
    // ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction
    private static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
        // The Iterable<Long> holds the result accumulated by UrlViewCountAgg, i.e. the count for this url
        @Override
        public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            // The iterable contains a single element: the incrementally aggregated count
            out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
        }
    }

    private static class TopN extends KeyedProcessFunction<Long, UrlViewCount, String> {
        private final Integer n;
        private ListState<UrlViewCount> urlViewCountListState;

        public TopN(Integer n) {
            this.n = n;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Obtain the list state handle from the runtime context
            urlViewCountListState =
                    getRuntimeContext()
                            .getListState(new ListStateDescriptor<UrlViewCount>("url-view-count-list", Types.POJO(UrlViewCount.class)));
        }

        @Override
        public void processElement(UrlViewCount value, KeyedProcessFunction<Long, UrlViewCount, String>.Context ctx, Collector<String> out) throws Exception {
            // Add every incoming record to the list state, e.g.
            // UrlViewCount{url='./prod?id=2', count=3, windowStart=1678676045000, windowEnd=1678676055000}
            // System.out.println("value: " + value);
            urlViewCountListState.add(value);
            // The key is windowEnd, so records with the same key belong to the same window and share one timer,
            // which fires once the watermark reaches windowEnd + 1
            ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
            // System.out.println("ctx.getCurrentKey():" + ctx.getCurrentKey());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();
            for (UrlViewCount urlViewCount : urlViewCountListState.get()) {
                urlViewCountArrayList.add(urlViewCount);
            }
            // All records of this window are now in the local list, so the state kept for this key can be cleared
            urlViewCountListState.clear();

            // Sort the window's records by count in descending order (this could also be written as a lambda)
            urlViewCountArrayList.sort(new Comparator<UrlViewCount>() {
                @Override
                public int compare(UrlViewCount o1, UrlViewCount o2) {
                    // Long.compare avoids the truncation and overflow of subtracting int values
                    return Long.compare(o2.count, o1.count);
                }
            });
            // System.out.println("urlViewCountArrayList:" + urlViewCountArrayList);

            StringBuilder result = new StringBuilder();
            result.append("=================================================\n");
            result.append("Window end time: ").append(new Timestamp(timestamp - 1)).append("\n");
            // A window may contain fewer than n distinct urls, so bound the loop by the list size
            // (the original loop ran to n and could throw IndexOutOfBoundsException)
            for (int i = 0; i < Math.min(this.n, urlViewCountArrayList.size()); i++) {
                UrlViewCount urlViewCount = urlViewCountArrayList.get(i);
                String info = "No." + (i + 1) + " "
                        + "url:" + urlViewCount.url + " "
                        + "views:" + urlViewCount.count + "\n";
                result.append(info);
            }
            result.append("=================================================\n");
            out.collect(result.toString());
        }
    }
}
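Two details of the ranking step are easy to get wrong, so here is a small plain-Java sketch that isolates them: a window may hold fewer than n urls, and comparing Long counts by subtracting their int values can give the wrong order once a count no longer fits in an int. That is unlikely at one record per second, but the safe form costs nothing. RankingPitfalls and its members are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class RankingPitfalls {
    // Descending order on long counts without narrowing to int.
    static final Comparator<Long> DESC = (a, b) -> Long.compare(b, a);

    // The subtraction-based comparison, kept only to show where it goes wrong.
    static final Comparator<Long> DESC_BY_INT_SUBTRACTION = (a, b) -> b.intValue() - a.intValue();

    // Returns the n largest counts, or all of them if fewer than n are present.
    static List<Long> topN(List<Long> counts, int n) {
        List<Long> sorted = new ArrayList<>(counts);
        sorted.sort(DESC);
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    public static void main(String[] args) {
        // Only one url in the window although n = 2: no exception, one entry back.
        System.out.println(topN(Arrays.asList(5L), 2));

        // 3_000_000_000 does not fit in an int, so intValue() turns it negative.
        long big = 3_000_000_000L;
        System.out.println("Long.compare puts big first: " + (DESC.compare(big, 1L) < 0));
        System.out.println("subtraction puts big first: " + (DESC_BY_INT_SUBTRACTION.compare(big, 1L) < 0));
    }
}
```

The first comparison reports true and the second false, which is why the sort is better expressed with Long.compare.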