【Flink SQL】Flink SQL 基础概念:SQL 的时间属性

Flink SQL 基础概念:SQL 的时间属性

  • 1.Flink 三种时间属性简介
  • 2.Flink 三种时间属性的应用场景
    • 2.1 事件时间案例
    • 2.2 处理时间案例
    • 2.3 摄入时间案例
  • 3.SQL 指定时间属性的两种方式
  • 4.SQL 事件时间案例
  • 5.SQL 处理时间案例

与离线处理中常见的时间分区字段一样,在实时处理中,时间属性也是一个核心概念。Flink 支持 处理时间事件时间摄入时间 三种时间语义。

三种时间在生产环境的使用频次 事件时间(SQL 常用) > > > 处理时间(SQL 几乎不用,DataStream 少用) > > > 摄入时间(不用)。

1.Flink 三种时间属性简介

  • 事件时间:指的是数据本身携带的时间,这个时间是在事件产生时的时间,而且在 Flink SQL 触发计算时,也使用数据本身携带的时间。这就叫做事件时间。目前生产环境中用的最多。
  • 处理时间:指的是具体算子计算数据执行时的机器时间(例如在算子中 Java 取 System.currentTimeMillis()),在生产环境中用的次多。
  • 摄入时间:指的是数据从数据源进入 Flink 的时间。摄入时间用的最少,可以说基本不使用。

小伙伴们要注意到:

  • 上述的三种时间概念不是由于有了数据而诞生的,而是有了 Flink 之后根据实际的应用场景而诞生的。以事件时间举个例子,如果只是数据携带了时间,Flink 也消费了这个数据,但是在 Flink 中没有使用数据的这个时间作为计算的触发条件,也不能把这个 Flink 任务叫做事件时间的任务。
  • 其次,要认识到,一般一个 Flink 任务只会有一个时间属性,所以时间属性通常认为是一个任务粒度的。举例:我们可以说 A 任务是事件时间语义的任务,B 任务是处理时间语义的任务。当然了,一个任务也可以存在多个时间属性。

2.Flink 三种时间属性的应用场景

讲到这里,有人会问,博主上面写的 3 种时间属性到底对我们的任务有啥影响呢?3 种时间属性的应用场景是啥?

先说结论,在 Flink 中时间的作用:

  • 主要体现在包含时间窗口的计算中:用于标识任务的时间进度,来判断是否需要触发窗口的计算。比如常用的滚动窗口、滑动窗口等都需要时间推动触发。这些窗口的应用场景后续会详细介绍。
  • 次要体现在自定义时间语义的计算中:举个例子,比如用户可以自定义每隔 10s 的本地时间,或者消费到的数据的时间戳每增大 10s,就把计算结果输出一次,时间在此类应用中也是一种标识任务进度的作用。

博主以 滚动窗口 的聚合任务为例来介绍一下事件时间和处理时间的对比区别。

2.1 事件时间案例

还是以之前的 clicks 表拿来举例。

在这里插入图片描述
上面这个案例的窗口大小是 1 小时,需求方需要按照用户点击时间戳 cTime 划分数据(划分滚动窗口),然后计算出 Count 聚合结果(这样计算能反映出事件的真实发生时间),那么就需要把 cTime 设置为窗口的划分时间戳,即代码中 tumble(cTime, interval '1' hour)

上面这种就叫做事件时间。即用数据中自带的时间戳进行窗口的划分(点击操作真实的发生时间)。

后续 Flink SQL 任务在运行的过程中也会实际按照 cTime 的当前时间作为一小时窗口结束触发条件并计算一个小时窗口内的数据。

2.2 处理时间案例

还是以之前的 clicks 表拿来举例。

还是上面那个案例,但是这次需求方不需要按照数据上的时间戳划分数据(划分滚动窗口),只需要数据来了之后, 在 Flink 机器上的时间作为一小时窗口结束的触发条件并计算。

那么这种触发机制就是处理时间。

2.3 摄入时间案例

在 Flink 从外部数据源读取到数据时,给这条数据带上的当前数据源算子的本地时间戳。下游可以用这个时间戳进行窗口聚合,不过这种几乎不使用。

3.SQL 指定时间属性的两种方式

如果要满足 Flink SQL 时间窗口类的聚合操作,SQL 或 Table API 中的 数据源表 就需要提供时间属性(相当于我们把这个时间属性在 数据源表 上面进行声明),以及支持时间相关的操作。

那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式:

  • CREATE TABLE DDL 创建表的时候指定
  • 可以在 DataStream 中指定,在后续的 DataStream 转的 Table 中使用

一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。

4.SQL 事件时间案例

来看看 Flink 中如何指定事件时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒
  -- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

从上面这条语句可以看到,如果想使用事件时间,那么我们的时间戳类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型。很多小伙伴会想到,我们的时间戳一般不都是秒或者是毫秒(BIGINT 类型)嘛,那这种情况怎么办?

解决方案必须要有啊,如下。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  -- 1. 这个 ts 就是常见的毫秒级别时间戳
  ts BIGINT,
  -- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型
  time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  -- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒
  -- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
  WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
  • DataStream 中指定事件时间。

之前介绍了 TableDataStream 可以互转,那么 Flink 也提供了一个能力,就是在 Table 转为 DataStream 时,指定时间戳字段。如下案例:

public class DataStreamSourceEventTimeTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 分配 watermark
        DataStream<Row> r = env.addSource(new UserDefinedSource())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(0L)) {
                    @Override
                    public long extractTimestamp(Row element) {
                        return (long) element.getField("f2");
                    }
                });
        // 2. 使用 f2.rowtime 的方式将 f2 字段指为事件时间时间戳
        Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime");

        tEnv.createTemporaryView("source_table", sourceTable);

        // 3. 在 tumble window 中使用 f2
        String tumbleWindowSql =
                "SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
                + "FROM source_table\n"
                + "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)"
                ;

        Table resultTable = tEnv.sqlQuery(tumbleWindowSql);

        tEnv.toDataStream(resultTable, Row.class).print();

        env.execute();
    }


    private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Row> sourceContext) throws Exception {

            int i = 0;

            while (!this.isCancel) {

                sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));

                Thread.sleep(10L);
                i++;
            }

        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }

        @Override
        public TypeInformation<Row> getProducedType() {
            return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
                    TypeInformation.of(Long.class));
        }
    }
}

5.SQL 处理时间案例

来看看 Flink SQL 中如何指定处理时间。

  • CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  -- 使用下面这句来将 user_action_time 声明为处理时间
  user_action_time AS PROCTIME()
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
  • DataStream 中指定处理时间。
public class DataStreamSourceProcessingTimeTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 分配 watermark
        DataStream<Row> r = env.addSource(new UserDefinedSource());

        // 2. 使用 proctime.proctime 的方式将 f2 字段指为处理时间时间戳
        Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime");

        tEnv.createTemporaryView("source_table", sourceTable);

        // 3. 在 tumble window 中使用 f2
        String tumbleWindowSql =
                "SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"
                + "FROM source_table\n"
                + "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)"
                ;

        Table resultTable = tEnv.sqlQuery(tumbleWindowSql);

        tEnv.toDataStream(resultTable, Row.class).print();

        env.execute();
    }


    private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Row> sourceContext) throws Exception {

            int i = 0;

            while (!this.isCancel) {

                sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));

                Thread.sleep(10L);
                i++;
            }

        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }

        @Override
        public TypeInformation<Row> getProducedType() {
            return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),
                    TypeInformation.of(Long.class));
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/457543.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

重新认识BIO、NIO、IO多路复用、Select、Poll、Epollo它们之间的关系

目录 一、背景 二、名词理解 &#xff08;1&#xff09;BIO &#xff08;2&#xff09;NIO &#xff08;3&#xff09;IO多路复用 &#xff08;4&#xff09;Select、Poll、Epollo 三、他们之间的关系总结 一、背景 最近又在学习网络IO相关知识&#xff0c;对我们常说的…

【软件测试基础篇】第一节.软件测试基础1

文章目录 前言⼀、了解软件测试行业二、主流测试技能三、测试常用分类四、模型 4.1 质量模型 4.2 w模型五、测试流程六、测试用例总结 前言 一、了解软件测试行业 1.概念&#xff1a; 使用技术手段验证软件功能是否符合需求 2.特点&#xff1a; 岗位缺口&#xff1a…

【OceanBase诊断调优 】 —— 合并问题如何排查?

最近总结一些诊断OCeanBase的一些经验&#xff0c;出一个【OceanBase诊断调优】专题&#xff0c;也欢迎大家贡献自己的诊断OceanBase的方法。 1. 前言 OceanBase 数据库的存储引擎基于 LSM-Tree 架构&#xff0c;将数据分为静态基线数据&#xff08;放在 SSTable 中&#xff…

如何利用POI导出报表

一、报表格式 二、依赖坐标 <dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>3.16</version> </dependency> <dependency><groupId>org.apache.poi</groupId><art…

Spring, SpringBoot, SpringCloud,微服务

1,SSM (Spring+SpringMVC+MyBatis) SSM框架集由Spring、MyBatis两个开源框架整合而成(SpringMVC是Spring中的部分内容),常作为数据源较简单的web项目的框架。 Spring MVC 是 Spring 提供的一个基于 MVC 设计模式的轻量级 Web 开发框架,本质上相当于 Servlet,Controlle…

浅谈路由器基本结构与工作原理

目录 一、结构 1.1 输入端口 1.2 交换结构 1.3 输出端口 1.4 路由选择处理器 二、输入端口处理和基于目的地转发 三、交换 四、输出端口处理 五、何时出现排队 5.1 输入排队 5.2 输出排队 一、结构 下图是一个通用路由器体系结构的总体试图视图&#xff0c;其主要由…

亚信安慧AntDB在数据可靠性和系统安全中的实践

亚信安慧AntDB以持续创新和技术进步为理念&#xff0c;不断优化性能和功能&#xff0c;至今已经保持了15年的平稳运行。这一漫长的历程并非偶然&#xff0c;而是源于AntDB团队对技术的不懈探索和追求。他们始终秉承着“永不停歇&#xff0c;永不满足”的信念&#xff0c;将技术…

openwrt中时间同步ntp使用

前言 openwrt开发中&#xff0c;我们可能遇到这样需求&#xff0c;使用路由器支持局域网内设备ntp授时功能。 作者&#xff1a;羽林君 转载授权以及围观&#xff1a;欢迎关注微信公众号&#xff1a;羽林君 或者添加作者个人微信&#xff1a;become_me ntp是什么 NTP&#…

STM32-V5开发板和树莓派的区别介绍

STM32-V5开发板和树莓派是两种不同类型的嵌入式开发平台&#xff0c;它们在设计理念、性能、用途和编程方式上都有所区别。 STM32-V5开发板 STM32-V5开发板是基于STM32微控制器的开发平台&#xff0c;通常用于嵌入式系统开发、教学和实验。 区别&#xff1a; - **核心*…

冒泡排序,详详解解

目录 基本概念&#xff1a; 上图&#xff1a; 核心思路&#xff1a; 基本步骤&#xff1a; 关键&#xff1a; 代码核心&#xff1a; 补充&#xff1a; 代码&#xff08;规范&#xff09; &#xff1a; 代码&#xff08;优化&#xff09;&#xff1a; 今天我们不刷力扣了&…

【Node.js从基础到高级运用】十二、身份验证与授权:JWT

身份验证与授权是现代Web应用中不可或缺的部分。了解如何在Node.js应用中实施这些机制&#xff0c;将使你能够构建更安全、更可靠的应用程序。本文将引导你通过使用JWT实现用户注册、登录和权限控制的过程。 JWT&#xff08;Json Web Token&#xff09; JWT是一种用于双方之间…

使用HttpRequest工具类调用第三方URL传入普通以及文件参数并转换MultipartFile成File

使用HttpRequest工具类调用第三方URL传入普通以及文件参数 一、依赖及配置二、代码1、模拟第三方服务2、调用服务3、效果实现 一、依赖及配置 <!--工具依赖--><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId&g…

IP在网络通信中的重要作用

IP&#xff0c;全称Internet Protocol&#xff0c;即网际互连协议&#xff0c;是TCP/IP体系中的网络层协议。IP作为整个TCP/IP协议族的核心&#xff0c;是构成互联网的基础。它的作用重大且深远&#xff0c;下面将详细阐述IP的定义及其在网络通信中的重要作用。 首先&#xff0…

.NET高级面试指南专题十七【 策略模式模式介绍,允许在运行时选择算法的行为】

介绍&#xff1a; 策略模式是一种行为设计模式&#xff0c;它允许在运行时选择算法的行为。它定义了一系列算法&#xff0c;将每个算法封装到一个对象中&#xff0c;并使它们可以互相替换。这使得算法可独立于使用它的客户端变化。 原理&#xff1a; 策略接口&#xff08;Strat…

【JavaScript 漫游】【036】CORS 通信总结

文章简介 CORS 是一个 W3C 标准&#xff0c;全称是“跨域资源共享”&#xff08;Cross-origin resource sharing&#xff09;。它允许浏览器向跨域的服务器&#xff0c;发出 XMLHttpRequest 请求&#xff0c;从而克服了 AJAX 只能同源使用的限制。 本篇文章为【JavaScript 漫…

Frustum PointNets for 3D Object Detection from RGB-D Data(2018)

3D空间的几何和拓扑结构 直接在3D空间操作可以更自然的参数化以及捕捉 重复、平面、对称等几何结构 2. Related Work 3D Object Detection from RGB-D Data Front view image based methods&#xff08;只是介绍了一种表示方法&#xff09; Bird’s eye view based methods&a…

【Ubuntu20.04】Clion 配置 Libtorch + OpenCV

首先根据自己的CUDA版本安装正确对应的cuda和cudnn并进行配置。 这里安装的是cuda-11.3版本&#xff0c;以下基于这个版本进行安装。 1. 安装 Clion 因为Clion更容易直接编写CMakelists.txt&#xff0c;所以使用Clion作为IDE。 需要在File -> Setting -> CMake的CMake…

C# wpf 使用GDI实现截屏

wpf截屏系列 第一章 使用GDI实现截屏&#xff08;本章&#xff09; 第二章 使用GDI实现截屏 第三章 使用DockPanel制作截屏框 第四章 实现截屏框热键截屏 第五章 实现截屏框实时截屏 第六章 使用ffmpeg命令行实现录屏 文章目录 wpf截屏系列前言一、导入gdi32方法一、NuGet获取…

ChatGPT赋能遥感研究:精准分析处理遥感影像数据,推动科研新突破

遥感技术主要通过卫星和飞机从远处观察和测量我们的环境&#xff0c;是理解和监测地球物理、化学和生物系统的基石。ChatGPT是由OpenAI开发的最先进的语言模型&#xff0c;在理解和生成人类语言方面表现出了非凡的能力。重点介绍ChatGPT在遥感中的应用&#xff0c;人工智能在解…

数字排列 - 华为OD统一考试(C卷)

OD统一考试&#xff08;C卷&#xff09; 分值&#xff1a; 200分 题解&#xff1a; Java / Python / C 题目描述 小明负责公司年会&#xff0c;想出一个趣味游戏: 屏幕给出 1−9 中任意 4 个不重复的数字,大家以最快时间给出这几个数字可拼成的数字从小到大排列位于第 n 位置…