【Flink SQL】Flink SQL 基础概念(四):SQL 的时间属性

Flink SQL 基础概念》系列,共包含以下 5 篇文章:

  • Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API
  • Flink SQL 基础概念(二):数据类型
  • Flink SQL 基础概念(三):SQL 动态表 & 连续查询
  • Flink SQL 基础概念(四):SQL 的时间属性
  • Flink SQL 基础概念(五):SQL 时区问题

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 基础概念(四):SQL 的时间属性

  • 1.Flink 三种时间属性简介
  • 2.Flink 三种时间属性的应用场景
    • 2.1 事件时间案例
    • 2.2 处理时间案例
    • 2.3 摄入时间案例
  • 3.SQL 指定时间属性的两种方式
  • 4.SQL 事件时间案例
  • 5.SQL 处理时间案例

与离线处理中常见的时间分区字段一样,在实时处理中,时间属性也是一个核心概念。Flink 支持 处理时间事件时间摄入时间 三种时间语义。

三种时间在生产环境的使用频次 事件时间(SQL 常用) > > > 处理时间(SQL 几乎不用,DataStream 少用) > > > 摄入时间(不用)。

1.Flink 三种时间属性简介

  • 事件时间:指的是数据本身携带的时间,这个时间是在事件产生时的时间,而且在 Flink SQL 触发计算时,也使用数据本身携带的时间。这就叫做事件时间。目前生产环境中用的最多。
  • 处理时间:指的是具体算子计算数据执行时的机器时间(例如在算子中 Java 取 System.currentTimeMillis()),在生产环境中用的次多。
  • 摄入时间:指的是数据从数据源进入 Flink 的时间。摄入时间用的最少,可以说基本不使用。

小伙伴们要注意到:

  • 上述的三种时间概念不是由于有了数据而诞生的,而是有了 Flink 之后根据实际的应用场景而诞生的。以事件时间举个例子,如果只是数据携带了时间,Flink 也消费了这个数据,但是在 Flink 中没有使用数据的这个时间作为计算的触发条件,也不能把这个 Flink 任务叫做事件时间的任务。
  • 其次,要认识到,一般一个 Flink 任务只会有一个时间属性,所以时间属性通常认为是一个任务粒度的。举例:我们可以说 A 任务是事件时间语义的任务,B 任务是处理时间语义的任务。当然了,一个任务也可以存在多个时间属性。

2.Flink 三种时间属性的应用场景

讲到这里,有人会问,博主上面写的 3 种时间属性到底对我们的任务有啥影响呢?3 种时间属性的应用场景是啥?

先说结论,在 Flink 中时间的作用:

  • 主要体现在包含时间窗口的计算中:用于标识任务的时间进度,来判断是否需要触发窗口的计算。比如常用的滚动窗口、滑动窗口等都需要时间推动触发。这些窗口的应用场景后续会详细介绍。
  • 次要体现在自定义时间语义的计算中:举个例子,比如用户可以自定义每隔 10s 的本地时间,或者消费到的数据的时间戳每增大 10s,就把计算结果输出一次,时间在此类应用中也是一种标识任务进度的作用。

博主以 滚动窗口 的聚合任务为例来介绍一下事件时间和处理时间的对比区别。

2.1 事件时间案例

还是以之前的 clicks 表拿来举例。

在这里插入图片描述
上面这个案例的窗口大小是 1 小时,需求方需要按照用户点击时间戳 cTime 划分数据(划分滚动窗口),然后计算出 Count 聚合结果(这样计算能反映出事件的真实发生时间),那么就需要把 cTime 设置为窗口的划分时间戳,即代码中 tumble(cTime, interval '1' hour)

上面这种就叫做事件时间。即用数据中自带的时间戳进行窗口的划分(点击操作真实的发生时间)。

后续 Flink SQL 任务在运行的过程中也会实际按照 cTime 的当前时间作为一小时窗口结束触发条件并计算一个小时窗口内的数据。

2.2 处理时间案例

还是以之前的 clicks 表拿来举例。

还是上面那个案例,但是这次需求方不需要按照数据上的时间戳划分数据(划分滚动窗口),只需要数据来了之后, 在 Flink 机器上的时间作为一小时窗口结束的触发条件并计算。

那么这种触发机制就是处理时间。

2.3 摄入时间案例

在 Flink 从外部数据源读取到数据时,给这条数据带上的当前数据源算子的本地时间戳。下游可以用这个时间戳进行窗口聚合,不过这种几乎不使用。

3.SQL 指定时间属性的两种方式

如果要满足 Flink SQL 时间窗口类的聚合操作,SQL 或 Table API 中的 数据源表 就需要提供时间属性(相当于我们把这个时间属性在 数据源表 上面进行声明),以及支持时间相关的操作。

那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式:

  • CREATE TABLE DDL 创建表的时候指定
  • 可以在 DataStream 中指定,在后续的 DataStream 转的 Table 中使用

一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。

4.SQL 事件时间案例

来看看 Flink 中如何指定事件时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒
  -- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

从上面这条语句可以看到,如果想使用事件时间,那么我们的时间戳类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型。很多小伙伴会想到,我们的时间戳一般不都是秒或者是毫秒(BIGINT 类型)嘛,那这种情况怎么办?

解决方案必须要有啊,如下。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  -- 1. 这个 ts 就是常见的毫秒级别时间戳
  ts BIGINT,
  -- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型
  time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  -- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒
  -- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
  • DataStream 中指定事件时间。

之前介绍了 TableDataStream 可以互转,那么 Flink 也提供了一个能力,就是在 Table 转为 DataStream 时,指定时间戳字段。如下案例:

public class DataStreamSourceEventTimeTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 分配 watermark
        DataStream<Row> r = env.addSource(new UserDefinedSource())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(0L)) {
                    @Override
                    public long extractTimestamp(Row element) {
                        return (long) element.getField("f2");
                    }
                });
        // 2. 使用 f2.rowtime 的方式将 f2 字段指为事件时间时间戳
        Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime");

        tEnv.createTemporaryView("source_table", sourceTable);

        // 3. 在 tumble window 中使用 f2
        String tumbleWindowSql =
                "SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
                + "FROM source_table\n"
                + "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)"
                ;

        Table resultTable = tEnv.sqlQuery(tumbleWindowSql);

        tEnv.toDataStream(resultTable, Row.class).print();

        env.execute();
    }


    private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Row> sourceContext) throws Exception {

            int i = 0;

            while (!this.isCancel) {

                sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));

                Thread.sleep(10L);
                i++;
            }

        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }

        @Override
        public TypeInformation<Row> getProducedType() {
            return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
                    TypeInformation.of(Long.class));
        }
    }
}

5.SQL 处理时间案例

来看看 Flink SQL 中如何指定处理时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  -- 使用下面这句来将 user_action_time 声明为处理时间
  user_action_time AS PROCTIME()
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
  • DataStream 中指定处理时间。
public class DataStreamSourceProcessingTimeTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 分配 watermark
        DataStream<Row> r = env.addSource(new UserDefinedSource());

        // 2. 使用 proctime.proctime 的方式将 f2 字段指为处理时间时间戳
        Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime");

        tEnv.createTemporaryView("source_table", sourceTable);

        // 3. 在 tumble window 中使用 f2
        String tumbleWindowSql =
                "SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
                + "FROM source_table\n"
                + "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)"
                ;

        Table resultTable = tEnv.sqlQuery(tumbleWindowSql);

        tEnv.toDataStream(resultTable, Row.class).print();

        env.execute();
    }


    private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Row> sourceContext) throws Exception {

            int i = 0;

            while (!this.isCancel) {

                sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));

                Thread.sleep(10L);
                i++;
            }

        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }

        @Override
        public TypeInformation<Row> getProducedType() {
            return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
                    TypeInformation.of(Long.class));
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/465357.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

算法练习:前缀和

目录 1. 一维前缀和2. 二维前缀和3. 寻找数组中心下标4. 除自身以外数组的乘积5. !和为k的子数字6. !和可被k整除的子数组7. !连续数组8. 矩阵区域和 1. 一维前缀和 题目信息&#xff1a; 题目链接&#xff1a; 一维前缀和思路&#xff1a;求前缀和数组&#xff0c;sum dp[r] …

R语言:microeco:一个用于微生物群落生态学数据挖掘的R包,第六:trans_nullmodel class

近几十年来&#xff0c;系统发育分析和零模型的整合通过增加系统发育维度&#xff0c;更有力地促进了生态位和中性影响对群落聚集的推断。trans_nullmodel类提供了一个封装&#xff0c;包括系统发育信号、beta平均成对系统发育距离(betaMPD)、beta平均最近分类单元距离(betaMNT…

解决后端传给前端的日期问题

解决方式&#xff1a; 1). 方式一 在属性上加上注解&#xff0c;对日期进行格式化 但这种方式&#xff0c;需要在每个时间属性上都要加上该注解&#xff0c;使用较麻烦&#xff0c;不能全局处理。 2). 方式二&#xff08;推荐 ) 在WebMvcConfiguration中扩展SpringMVC的消息转…

专业120+总400+北京理工大学826信号处理导论考研经验北理工电子信息与通信工程,真题,大纲,参考书。

**今年专业课826信号处理导论&#xff08;信号系统和数字信号处理&#xff09;120&#xff0c;总分400&#xff0c;应群里同学需要&#xff0c;自己总结一下去年的复习经历&#xff0c;希望对大家复习有帮助。**专业课&#xff1a; 北京理工大学专业826是两门合一&#xff0c;…

Flutter开发进阶之使用工具效率开发

Flutter开发进阶之使用工具效率开发 软件开发团队使用Flutter开发的原因通常是因为Flutter开发性能高、效率高、兼容性好、可拓展性高&#xff0c;作为软件PM来说主要考虑的是范围管理、进度管理、成本管理、资源管理、质量管理、风险管理和沟通管理等&#xff0c;可以看到Flu…

微信小程序调用百度智能云API(菜品识别)

一、注册后生成应用列表创建应用 二、找到当前所需使用的api菜品识别文档 三、点链接看实例代码 这里需要使用到如下几个参数&#xff08;如下&#xff09;&#xff0c;其他的参数可以不管 client_id &#xff1a; 就是创建应用后的API Keyclient_secret&#xff1a; 就是创建…

Vue mqtt 附在线mqtt客户端地址 + 完整示例

mqtt&#xff1a;轻量级物联网消息推送协议。 目录 一、介绍 1、官方文档 1&#xff09;npm网 2) 中文网 MQTT中文网_MQTT 物联网接入平台-MQTT.CN 2、官方示例 二、准备工作 1、安装依赖包 2、示例版本 三、使用步骤 1、在单页面引入 mqtt 四、完整示例 tips 一、介…

正则表达式与re模块

目录 正则表达式 简介 语法&#xff1a; 常用元字符&#xff1a; 量词: 贪婪匹配和惰性匹配&#xff1a; re模块 简介&#xff1a; 常用的几个模块&#xff1a; 1.findall 2.search 3.finditer 4.compile 案例展示&#xff1a; 需求&#xff1a; 思路分析&#…

《论文阅读》E-CORE:情感相关性增强的移情对话生成 EMNLP 2023

《论文阅读》E-CORE:情感相关性增强的移情对话生成 EMNLP 2023 前言摘要模型架构图构建边的构建和初始化节点的初始化图更新情感相关性加强解码损失函数总结前言 亲身阅读感受分享,细节画图解释,再也不用担心看不懂论文啦~ 无抄袭,无复制,纯手工敲击键盘~ 今天为大家带来…

IDEA 多个git仓库项目放一个窗口

1、多个项目先通过新建module或者CtrlAltShiftS 添加module引入 2、重点是右下角有时候git 分支视图只有一个module的Repositories。这时候需要去设置把多个git仓库添加到同一个窗口才能方便提交代码。 3、如果Directory Mappings已经有相关项目配置&#xff0c;但是灰色的&…

浅谈JPA框架

JPA 前言概述ORM 映射元数据JPQLJPA API附Spring Data JPA 前言 了解 JPA 框架对后续使用 Spring Boot 是有很大帮助的&#xff0c;下面简单介绍 JPA 框架的基础知识。 概述 JPA&#xff08; Java 对象持久化 API &#xff0c;Java Persistence API &#xff09;&#xff0c…

物联网数据驾驶舱

在信息化时代&#xff0c;数据已经成为驱动企业发展的核心动力。特别是在物联网领域&#xff0c;海量数据的实时采集、分析和监控&#xff0c;对于企业的运营决策和业务优化具有至关重要的作用。HiWoo Cloud作为领先的物联网云平台&#xff0c;其数据监控功能以“物联网数据驾驶…

npm包、全局数据共享、分包

使用 npm 包 小程序对 npm 的支持与限制 目前&#xff0c;小程序中已经支持使用 npm 安装第三方包&#xff0c;从而来提高小程序的开发效率。但是&#xff0c;在小程序中使用npm 包有如下 3 个限制&#xff1a; ① 不支持依赖于 Node.js 内置库的包 ② 不支持依赖于浏览器内置…

C++——string

一学习string的原因 1.从个人理解角度上&#xff1a; 在刚开始学习之前&#xff0c;我只知道学习完string在以后的刷题中能提高做题效率&#xff0c;在对字符串的处理string库中也许有对应的接口去实现需求&#xff0c;不用自己去写函数的实现。 但在学string中改变了之前的…

安卓安装Magisk面具以及激活EdXposed

模拟器&#xff1a;雷电模拟器 安卓版本: Android9 文中工具下载链接合集&#xff1a;https://pan.baidu.com/s/1c1X3XFlO2WZhqWx0oE11bA?pwdr08s 前提准备 模拟器需要开启system可写入和root权限 一、安装Magisk 1. 安装magisk 将magisk安装包拖入模拟器 点击&#xff1a…

UnityShader(十六)凹凸映射

前言&#xff1a; 纹理的一种常见应用就是凹凸映射&#xff08;bump mapping&#xff09;。凹凸映射目的就是用一张纹理图来修改模型表面的法线&#xff0c;让模型看起来更加细节&#xff0c;这种方法不会改变模型原本的顶点位置&#xff08;也就是不会修改模型的形状&#xf…

《硬件历险》之Mac抢救出现问题的时间机器硬盘中的数据

本文虽然使用“抢救”一词&#xff0c;但是运气比较好&#xff0c;远没有达到访问和修改底层的信息来抢救的地步。如果你是需要通过访问和修改底层信息来抢救数据&#xff0c;建议阅读刘伟的《数据恢复技术深度揭秘&#xff08;第二版&#xff09;》或者寻找专业人士的帮助。 《…

卷积篇 | YOLOv8改进之C2f模块融合SCConv | 即插即用的空间和通道维度重构卷积

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。SCConv是一种用于减少特征冗余的卷积神经网络模块。相对于其他流行的SOTA方法&#xff0c;SCConv可以以更低的计算成本获得更高的准确率。它通过在空间和通道维度上进行重构&#xff0c;从而减少了特征图中的冗余信息。这种…

留学生课设|R语言|研究方法课设

目录 INSTRUCTIONS Question 1. Understanding Quantitative Research Question 2. Inputting data into Jamovi and creating variables (using the dataset) Question 3. Outliers Question 4. Tests for mean difference Question 5. Correlation Analysis INSTRUCTIO…

深度学习 精选笔记(13.1)卷积神经网络-LeNet模型

学习参考&#xff1a; 动手学深度学习2.0Deep-Learning-with-TensorFlow-bookpytorchlightning ①如有冒犯、请联系侵删。 ②已写完的笔记文章会不定时一直修订修改(删、改、增)&#xff0c;以达到集多方教程的精华于一文的目的。 ③非常推荐上面&#xff08;学习参考&#x…