Flink时间语义和时间窗口

前言

在实际的流计算业务场景中,我们会发现,数据和数据的计算往往都和时间具有相关性。

举几个例子:

  • 直播间右上角通常会显示观看直播的人数,并且这个数字每隔一段时间就会更新一次,比如10秒。
  • 电商平台的商品列表,会显示商品过去24小时的销量、或者总销量
  • 阅读CSDN博客会显示总的阅读量,并且会持续更新

归纳总结可以发现,这些和时间相关的数据计算可以统一用一个计算模型来描述:每隔一段时间,计算过去一段时间内的数据,并输出结果。这个计算模型,就是时间窗口。

时间窗口类型

时间窗口计算模型具备三个重要的属性:

  • 时间窗口的计算频次,即 隔多久计算一次
  • 时间窗口的大小,即 计算过去多久的数据
  • 时间窗口内数据的处理逻辑

举例来说,每隔1分钟计算商品过去24小时的销量。时间窗口的计算频次就是1分钟,时间窗口的大小是24小时,窗口数据的处理逻辑是 对商品销量求和。

Flink 提供了三种时间窗口的类型

滚动窗口(Tumble Window)

滚动窗口的特点是:时间窗口大小和计算频次相同!

顾名思义,滚动窗口就像一个车轮一样滚滚向前,因为窗口大小和计算频次相同,所以窗口是紧密相连的,窗口内的数据不会重复计算。

举个例子,每隔1分钟计算商品过去1分钟的销量。

如下示例程序,每隔5秒计算过去5秒的订单销售额:

public class TumblingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.addSource(new SourceFunction<Order>() {
                    @Override
                    public void run(SourceContext<Order> sourceContext) throws Exception {
                        while (true) {
                            Threads.sleep(1000);
                            Order order = Order.mock();
                            sourceContext.collectWithTimestamp(order, order.createTime);
                            sourceContext.emitWatermark(new Watermark(System.currentTimeMillis()));
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                }).keyBy(i -> i.itemId)
                .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5L)))
                .sum("orderAmount")
                .print();
        environment.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public String itemId;
        public long orderAmount;
        public long createTime;

        static Order mock() {
            return new Order("001", ThreadLocalRandom.current().nextLong(100), System.currentTimeMillis());
        }
    }
}

这里采用滚动窗口计算模型,窗口大小和计算频次均是5秒,运行作业后,控制台会每隔5秒输出一次总销售额

1> TumblingWindow.Order(itemId=001, orderAmount=250, createTime=1722344630342)
1> TumblingWindow.Order(itemId=001, orderAmount=270, createTime=1722344635388)
1> TumblingWindow.Order(itemId=001, orderAmount=147, createTime=1722344640407)
1> TumblingWindow.Order(itemId=001, orderAmount=253, createTime=1722344645430)
......

滑动窗口(Sliding Window)

滑动窗口的特点是:时间窗口大小和计算频次不相同,如果窗口大小大于计算频次,就会导致数据被重复计算;如果窗口大小小于计算频次,就会导致数据被漏计算;如果二者相等,那就是滚动窗口了。

举个例子,每隔1分钟计算商品过去1小时的销量。窗口大小为1小时,计算频次为1分钟,因此数据会被重复计算多次。

如下示例程序,每隔1秒计算过去5秒的订单销售额,部分订单会被重复计算多次:

public class SlidingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.addSource(new SourceFunction<TumblingWindow.Order>() {
                    @Override
                    public void run(SourceContext<TumblingWindow.Order> sourceContext) throws Exception {
                        while (true) {
                            Threads.sleep(1000);
                            TumblingWindow.Order order = TumblingWindow.Order.mock();
                            sourceContext.collectWithTimestamp(order, order.createTime);
                            sourceContext.emitWatermark(new Watermark(System.currentTimeMillis()));
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                }).keyBy(i -> i.itemId)
                .window(SlidingEventTimeWindows.of(Duration.ofSeconds(5L), Duration.ofSeconds(1L)))
                .sum("orderAmount")
                .print();
        environment.execute();
    }
}

作业运行后,控制台每秒会输出一次过去5秒的销售额。

会话窗口(Session Window)

会话窗口的窗口大小和计算频次非常灵活,可以动态改变,每次都不一样。当窗口隔一段时间没有接收到新的数据,Flink就认为会话可以关闭并计算了,等下一次有新的数据进来,就会开启一个新的会话。这里的“隔一段时间”就是值会话窗口的间隔(Gap),这个间隔可以固定设置也可以动态设置。

举个例子,读书类APP都会有的一个功能,就是统计用户的阅读时长。用户必须有持续的动作,APP才会认为用户是真的在阅读,反之用户长时间没有操作,APP会认为用户已经离开,此时不会再统计阅读时长。

如下示例,随机5秒内模拟一次用户行为,会话窗口间隔设置为3秒,超过3秒认为用户离开,关闭窗口并统计用户阅读时长。

public class SessionWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.addSource(new SourceFunction<UserAction>() {
                    @Override
                    public void run(SourceContext<UserAction> ctx) throws Exception {
                        while (true) {
                            UserAction userAction = UserAction.mock();
                            ctx.collectWithTimestamp(userAction, userAction.time);
                            ctx.emitWatermark(new Watermark(System.currentTimeMillis()));
                            // 随机5秒内 用户才会有新的操作
                            Threads.sleep(ThreadLocalRandom.current().nextLong(0L, 5000L));
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                })
                // 超过三秒没有收到用户新的动作,认为用户离开,关闭窗口并计算
                .windowAll(EventTimeSessionWindows.withGap(Duration.ofSeconds(3L)))
                .aggregate(new AggregateFunction<UserAction, UserReadingTime, UserReadingTime>() {
                    @Override
                    public UserReadingTime createAccumulator() {
                        return new UserReadingTime();
                    }

                    @Override
                    public UserReadingTime add(UserAction userAction, UserReadingTime userReadingTime) {
                        // 记录窗口内的用户阅读开始和结束时间
                        userReadingTime.userId = userAction.userId;
                        if (userReadingTime.startTime == 0L) {
                            userReadingTime.startTime = userAction.time;
                        }
                        userReadingTime.endTime = userAction.time;
                        return userReadingTime;
                    }

                    @Override
                    public UserReadingTime getResult(UserReadingTime userReadingTime) {
                        return userReadingTime;
                    }

                    @Override
                    public UserReadingTime merge(UserReadingTime userReadingTime, UserReadingTime acc1) {
                        return null;
                    }
                }).addSink(new SinkFunction<UserReadingTime>() {
                    @Override
                    public void invoke(UserReadingTime value, Context context) throws Exception {
                        System.err.println("用户" + value.userId + " 阅读了 " + (value.endTime - value.startTime) + " ms");
                    }
                });
        environment.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserAction {
        public Long userId;
        public long time;

        public static UserAction mock() {
            return new UserAction(1L, System.currentTimeMillis());
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserReadingTime {
        public Long userId;
        public long startTime;
        public long endTime;
    }
}

运行Flink作业,控制台随机输出用户的阅读时长

用户1 阅读了 3240 ms
用户1 阅读了 9414 ms
用户1 阅读了 138 ms
用户1 阅读了 2960 ms

时间语义

时间语义和时间窗口息息相关。

Flink 提供了三种不同的时间语义,分别是:处理时间、事件时间、摄入时间。

在不同的时间语义下,针对同样的数据,Flink 分配的时间窗口是不一样的。

举个例子,我们要统计某个商品过去1分钟的销量,这是个典型的一分钟大小的时间窗口。用户在 09:00:50 下了一笔订单,中间由于网络延时等原因,Flink 在 09:01:01 才收到这笔订单数据,恰巧此时 Flink 因为自身作业压力宕机崩溃,在 09:02:10 才恢复作业,该笔订单数据随即被 keyBy 分组发送给下游算子处理。

这个例子中的三个时间点,刚好对应了 Flink 的三种时间语义:

  • 事件时间:事件发生的时间,通常数据本身会携带一个时间戳,即例子中的 09:00:50
  • 摄入时间:Flink 数据源接收数据的subTask算子本地时间,即例子中的 09:01:01
  • 处理时间:Flink 算子处理数据的机器本地时间,即例子中的 09:02:10

事件时间

事件时间是最常用的,在事件时间语义下,数据本身通常会携带一个时间戳,Flink 会根据该时间戳为数据分配正确的时间窗口。

因为事件时间是不会改变的,所以在事件时间语义下,Flink 窗口计算的结果始终是一致的,数据是清晰明确的。

但是,事件时间语义 会带来另一个问题。事件的产生是顺序的,但是数据在传输过程中,可能会因为网络拥塞等种种原因,到达 Flink 时乱序了。此时,Flink 如何处理这些乱序数据就是个麻烦事儿了。

举个例子,还是统计商品过去1分钟的销量,Flink 先是接收到事件时间为 09:00:30 的订单数据,此时将其分配到 [09:00,09:01] 窗口缓存起来,接着接收到了 09:01:30 的订单数据,此时 [09:00,09:01] 窗口可以关闭并计算了吗?显然不能,因为数据乱序到达的原因,谁也不能保证 Flink 待会不会收到 09:00 分钟产生的订单。

那怎么办呢?[09:00,09:01] 窗口总不能一直不关闭吧。为了解决这个问题,Flink 引入了 Watermark 机制,这里不做介绍。

使用事件时间对应的窗口分配器是:

  • TumblingEventTimeWindows 基于事件时间的滚动窗口
  • SlidingEventTimeWindows 基于事件时间的滑动窗口
  • EventTimeSessionWindows 基于事件时间的会话窗口

如下示例,每秒生成一个带时间戳的随机数,数据用 Flink 自带的 Tuple2 封装,同时用 TumblingEventTimeWindows 让 Flink 基于事件时间语义来分配 5秒 的滚动窗口。运行 Flink 作业,控制台每隔5秒会输出前5秒的随机数之和。

public class TumblingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.addSource(new SourceFunction<Tuple2<Long, Long>>() {
                    @Override
                    public void run(SourceContext<Tuple2<Long, Long>> sourceContext) throws Exception {
                        while (true) {
                            Threads.sleep(1000);
                            // f0是随机数 f1是时间戳
                            Tuple2<Long, Long> tuple2 = new Tuple2<>(ThreadLocalRandom.current().nextLong(100), System.currentTimeMillis());
                            sourceContext.collectWithTimestamp(tuple2, tuple2.f1);
                            sourceContext.emitWatermark(new Watermark(System.currentTimeMillis()));
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                }).windowAll(TumblingEventTimeWindows.of(Duration.ofSeconds(5L)))
                .sum(0)
                .print();
        environment.execute();
    }
}

控制台输出

// subTask任务ID 数字和 时间戳
19> (108,1722432788302)
20> (308,1722432790305)
21> (324,1722432795346)

总结一下,如果业务要按照事件发生的时间计算结果或分析数据,那么只能选事件时间语义。通常情况下,事件时间也确实更有价值。例如,利用Flink分析用户的行为日志,用户具体在什么时间点做了哪些行为,会更有分析价值,至于 Flink 是什么时候处理这些日志的,对业务方来说并不重要。因为事件时间具有不变性,所以基于事件时间统计的结果总是清晰明确的,缺点是数据到达Flink是乱序的,处理迟到数据会给Flink带来一定的压力。

摄入时间

摄入时间是指数据到达 Flink Source 算子的本地机器时间,它为处理数据流提供了一种相对简单而直观的时间参考,算是在 事件时间 和 处理时间 中间做了一个折中。

摄入时间具备一定的优势。一方面,它避免了事件时间的乱序问题,相较于事件时间具备更高的处理效率;另一方面,相较于处理时间而言,它具备不变性,计算产生的结果也会更加准确。

摄入时间适用于那些对时间精度要求不是特别高,但又希望时间能够相对反映数据进入系统先后顺序的场景。

如下示例,使用摄入时间语义计算过去5秒窗口生成的随机数之和。因为用的是摄入时间,所以无须发送 Watermark,数据本身也无须携带时间戳。

public class IngestionTimeFeature {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 采用摄入时间语义
        environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        environment.addSource(new SourceFunction<Tuple1<Long>>() {
                    @Override
                    public void run(SourceContext<Tuple1<Long>> sourceContext) throws Exception {
                        while (true) {
                            Threads.sleep(1000);
                            sourceContext.collect(new Tuple1<>(ThreadLocalRandom.current().nextLong(100)));
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                }).keyBy(IN -> "all").timeWindow(Time.of(5L, TimeUnit.SECONDS))
                .sum(0)
                .print();
        environment.execute();
    }
}

处理时间

处理时间语义是指数据实际被处理的时间,也就是数据到达Window算子时subTask机器的本地时间。

因为 处理时间语义 完全依靠算子的机器本地时间,所以时间窗口在划分数据和触发计算,都只需要依靠本地时间来驱动,性能是最好的,延迟低,适用于对高性能和延迟敏感的业务。

同样的,处理时间语义也有它的劣势。因为采用的是subTask算子的本地时间,所以数据的时间其实是具备不确定性的。举个例子,订单数据在 09:00:01 被算子接收,它会被分配到 [09:00,09:01]窗口,假设此时该subTask作业故障宕机,等到 09:10:00 才恢复,Flink 重新消费这条数据,它又会被分配到 [09:10,09:11] 窗口,产出的数据就会不一致。因此在使用处理时间语义时,要保证业务方能接受这种因为异常情况导致的计算结果不符合预期的场景。

如下示例,采用处理时间语义,因为是采用subTask本地时间,所以同样也不需要发送 Watermark。

public class ProcessTimeFeature {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 采用处理时间语义
        environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        environment.addSource(new SourceFunction<Tuple1<Long>>() {
                    @Override
                    public void run(SourceContext<Tuple1<Long>> sourceContext) throws Exception {
                        while (true) {
                            Threads.sleep(1000);
                            sourceContext.collect(new Tuple1<>(ThreadLocalRandom.current().nextLong(100)));
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                }).windowAll(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5L)))
                .sum(0)
                .print();
        environment.execute();
    }
}

尾巴

Flink 具有丰富的时间语义,包括事件时间、处理时间和摄入时间。事件时间基于数据本身携带的时间戳,处理时间基于系统处理数据的本地时钟,摄入时间则是数据进入 Flink Source算子的时间。

时间窗口是 Flink 处理流式数据的重要方式,Flink 提供了 滚动窗口、滑动窗口、会话窗口 三种窗口类型。滚动窗口有固定大小且不重叠,滑动窗口大小固定且可重叠,会话窗口根据数据间隔来划分。合理选择时间语义和时间窗口,能更准确有效地处理和分析流式数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/894909.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【大数据技术基础 | 实验一】配置SSH免密登录

文章目录 一、实验目的二、实验要求三、实验原理&#xff08;一&#xff09;大数据实验一体机&#xff08;二&#xff09;SSH免密认证 四、实验环境五、实验内容和步骤&#xff08;一&#xff09;搭建集群服务器&#xff08;二&#xff09;添加域名映射&#xff08;三&#xff…

基于SpringBoot+Vue+MySQL的智慧博物馆管理系统

系统展示 用户前台界面 管理员后台界面 系统背景 随着信息技术的飞速发展&#xff0c;智慧化已成为博物馆发展的新趋势。然而&#xff0c;当前许多博物馆仍面临着预约困难、参观体验不佳等问题&#xff0c;严重影响了博物馆的服务质量和公众形象。传统的预约和票务管理方式已难…

mac安装brew时踩坑解决方案

安装包 mac上如果按照git等工具可能会使用brew&#xff0c;例如使用&#xff1a;$ brew install git命令&#xff0c;如果电脑没有按照brew&#xff0c;则会提示&#xff1a;zsh: command not found: brew 解决方案 需要我们打开brew的官网https://brew.sh/&#xff0c;复制…

C语言 | Leetcode C语言题解之第476题数字的补数

题目&#xff1a; 题解&#xff1a; class Solution { public:int findComplement(int num) {int pos;for (int i 30; i > 0; i--) {if (num & (1 << i)) {pos i;break;}}return (((1LL << (pos 1)) - 1) ^ (num));} };

mysql的重置

今天用Navicat16去连接mysql突然就连不上了。一直报错 连接本地mysql时出现2003-Can‘t connect to MySql server on ‘localhost‘(10061)错误。 以为是Navicat过期了。正好Navicat推出了Lite 17免费版本&#xff0c;心想正好可以尝尝鲜&#xff0c;而且还支持连接Redis&#…

Windows git 配置

需要在git-bash的目录下,配置.ssh 的配置文件 要 .ssh 目录下的配置无法使用

企业或设计师如何使用ComfyUI轻松构建项目AI工作流

ComfyUI是一个为Stable Diffusion专门设计的基于节点的图形用户界面&#xff08;GUI&#xff09;。它使用户能够通过链接不同的块&#xff08;称为节点&#xff09;来构建复杂的图像生成工作流程。这些节点可以包括各种任务&#xff0c;如加载检查点模型、输入提示、指定采样器…

CCS字体、字号更改+CCS下载官方链接

Step1、 按照图示箭头操作 step2 Step3 点击确定&#xff0c;点击Apply(应用)&#xff0c;点击Apply and close(应用和关闭) 4、历代版本下载链接 CCS下载&#xff1a;官方链接https://www.ti.com/tool/CCSTUDIO The last but not least 如果成功的解决了你的问题&#x…

基于SpringBoot+Vue+uniapp的在线招聘平台的详细设计和实现

详细视频演示 请联系我获取更详细的演示视频 项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不…

基于SSM服装定制系统的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;服装类型管理&#xff0c;服装信息管理&#xff0c;服装定制管理&#xff0c;留言反馈&#xff0c;系统管理 前台账号功能包括&#xff1a;系统首页&#xff0c;个人中心&#xf…

Vue3工程基本创建模板

创建vue工程 执行这两个绿色的命令 输入 code . 打开vscode 安装依赖 Element - plus npm install element-plus --save 在vscode的 main.js 换这个代码 // main.ts import { createApp } from vue import ElementPlus from element-plus import element-plus/dist/inde…

Datawhale 组队学习 文生图 Prompt攻防 task03随笔

这期我们从不同角度切入探讨赛题的进阶思路 思路1&#xff1a;对比不同大模型 首先我们可以选择尝试不同的大模型&#xff0c;使用更复杂的大模型可以提高文本改写的质量和效果。随着模型大小的增加&#xff0c;其表示能力也随之增强&#xff0c;能够捕捉更细微的语言特征和语…

计算机是如何输入存储输出汉字、图片、音频、视频的

计算机是如何输入存储输出汉字、图片、音频、视频的 为了便于理解&#xff0c;先了解一下计算机的组成。 冯诺依曼计算机的五大组成部分。分别是运算器、控制器、存储器、输入设备和输出设备。参见下图&#xff1a; 一、运算器 运算器又称“算术逻辑单元”&#xff0c;是计算…

空间解析几何 4:空间中线段到圆的距离【附MATLAB代码】

目录 理论公式 matlab代码 理论公式 对于解一元4次方程&#xff0c;请详见我的博客 一元四次方程求解 -【附MATLAB代码】-CSDN博客文章浏览阅读1.4k次&#xff0c;点赞41次&#xff0c;收藏4次。最近在研究机器人的干涉&#xff08;碰撞&#xff09;检测&#xff0c;遇到了一…

《数字图像处理基础》学习02-BMP位图文件

目录 一&#xff0c;BMP文件组成 二&#xff0c;使用ultra edit软件查看图像结构 1&#xff0c;ultra edit软件的下载和安装 2&#xff0c;ultra edit打开图像 三&#xff0c;使用matlab显示RGB图像 在之前的文章学习到&#xff0c;计算机只能处理数字图像&#xff0c;因…

【动手学深度学习】8.2. 文本预处理(个人向笔记)

本节将解析文本的常见预处理步骤包括&#xff1a;将文本作为字符串加载到内存中。将字符串拆分为词元&#xff08;如单词和字符&#xff09;。建立一个词表&#xff0c;将拆分的词元映射到数字索引。将文本转换为数字索引序列&#xff0c;方便模型操作。 1. 读取数据集 我们下…

C++中的vector介绍(常用函数)

目录 vector的介绍及使用1.vector的介绍2.vector的使用2.1vector的定义2.2 vector iterator 的使用2.3vector 空间增长问题2.4 vector 增删查改2.5 vector 迭代器失效问题。&#xff08;重点&#xff09; 3.动态二维数组理解4.模拟实现reserve vector的介绍及使用 1.vector的介…

基于SpringBoot的“社区医院管理服务系统”的设计与实现(源码+数据库+文档+PPT)

基于SpringBoot的“社区医院管理服务系统”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBoot 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统首页界面图 用户注册界面图 医生界面…

基于x86_64汇编语言简单教程1: 环境预备与尝试

目录 前言 环境配置 基本硬件与操作系统要求 WSL VSCode基本配置(For Windows) 安装基本的依赖 为您的VSCode安装插件&#xff1a; 学习要求 入门 先试试味道 前言 笔者最近正在梭哈使用NASM汇编器的x86 32位汇编&#xff0c;笔者这里记录一下一个晚上的成果。 环境…

chrome清除https状态

莫名其妙的http跳转到https的url了。 解决办法 浏览器地址栏输入&#xff1a;chrome://net-internals/#hsts 输入你需要删除的域名即可&#xff01;&#xff01;&#xff01;