Flink之SQL客户端与DDL操作

SQL客户端与DDL操作

  • Flink SQL
  • SQL客户端
    • 1.启动Flink
    • 2.启动Flink的SQL客户端
    • 3.HELP命令
    • 4.验证连接
    • 5.结果显示模式
    • 6.执行配置
  • 数据库操作
    • 1.创建数据库
    • 2.查询数据库
    • 3.修改数据库
    • 4.删除数据库
  • 表操作
    • 1.创建表
      • 表列属性
      • 表Watermark属性
      • 列PRIMARY KEY属性
      • 列PARTITIONED BY属性
      • 列WITH选项属性
      • 列LIKE属性
      • 列AS select_statement属性
      • 创建一张基于 Print的简单表
    • 2.查看表
    • 3.修改表
    • 4.删除表
  • 其他
    • 动态表
      • 将流转换成动态表
      • 连续查询
      • 将动态表转换为流
    • 时间属性
      • 处理时间
      • 事件时间

Flink SQL

Flink SQL是Apache Flink框架中的一种查询语言,用于对数据流和批处理作业执行SQL查询和转换操作。它提供了一种声明性的方式来处理数据,使得开发人员能够使用熟悉的SQL语法来操作流式和批处理数据。

Flink的Table API和SQL是流批统一的API,具有相同的语义。

Table API是用于Scala和Java语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join等关系型算子。

Flink SQL是基于Apache Calcite来实现的标准SQL,这两种API中的查询对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。

SQL客户端

Flink的Table & SQL API可以处理SQL语言编写的查询语句,但是这些查询需要嵌入用Java或Scala编写的表程序中。此外,这些程序在提交到集群前需要用构建工具打包。这或多或少限制了Java/Scala程序员对Flink的使用。

SQL客户端的目的是提供一种简单的方式来编写、调试和提交表程序到Flink集群上,而无需写一行Java或Scala代码。SQL客户端命令行界面(CLI)能够在命令行中检索和可视化分布式应用中实时产生的结果。

SQL 客户端捆绑在常规 Flink 发行版中,因此可以直接运行。它仅需要一个正在运行的 Flink 集群就可以在其中执行表程序。

1.启动Flink

# 基于独立模式的会话模式部署
./bin/start-cluster.sh

# 基于YARN运行模式的会话模式部署
./bin/yarn-session.sh -d

2.启动Flink的SQL客户端

SQL Client 脚本也位于 Flink 的 bin 目录中。用户可以通过启动嵌入式 standalone 进程或通过连接到远程 SQL Gateway 来启动 SQL 客户端命令行界面。

# SQL客户端默认使用embedded模式
./bin/sql-client.sh

# 显式使用embedded 模式
./bin/sql-client.sh embedded

# 使用gateway模式
./bin/sql-client.sh gateway --endpoint <gateway address>

# 基于YARN运行模式的embedded模式
./bin/sql-client.sh embedded -s yarn-session

# 启动时,指定sql文件
./bin/sql-client.sh embedded -s yarn-session -i ./sql-client-init.sql

3.HELP命令

命令行界面启动后,使用 HELP 命令列出所有可用的 SQL 语句

HELP                            打印可用命令的帮助信息。
QUIT/EXIT                       退出 SQL CLI 客户端。
CLEAR                           清除当前终端的内容。
SET                             设置会话配置属性。语法:"SET '<key>'='<value>';". 使用 "SET;" 列出所有属性。
RESET                           重置会话配置属性。语法:"RESET '<key>';". 使用 "RESET;" 重置所有会话属性。
INSERT INTOSQL SELECT 查询的结果插入到指定的表中。
INSERT OVERWRITESQL SELECT 查询的结果覆盖插入到指定的表中,覆盖现有数据。
SELECTFlink 集群上执行 SQL SELECT 查询。
EXPLAIN                         描述给定名称的查询或表的执行计划。
BEGIN STATEMENT SET             开始一个语句集合。语法:"BEGIN STATEMENT SET;"
END                             结束一个语句集合。语法:"END;"
ADD JAR                         将指定的 jar 文件添加到提交的作业类加载器中。语法:"ADD JAR '<path_to_filename>.jar'"
REMOVE JAR                      从提交的作业类加载器中移除指定的 jar 文件。语法:"REMOVE JAR '<path_to_filename>.jar'"
SHOW JARS                       显示用户指定的 jar 依赖列表。该列表受到 --jar 和 --library 启动选项以及 ADD/REMOVE JAR 命令的影响。

4.验证连接

输入第一条 SQL 查询语句并按 Enter 键执行,可以验证设置及集群连接是否正确

[root@node01 flink]# ./bin/sql-client.sh

                                   ▒▓██▓██▒
                               ▓████▒▒█▓▒▓███▓▒
                            ▓███▓░░        ▒▒▒▓██▒  ▒
                          ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                          ██▒         ░▒▓███▒    ▒█▒█▒
                            ░▓█            ███   ▓░▒██
                              ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                            █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                            ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                         ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                   ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                  ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
               ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
              ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
           ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
           ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
           ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
           ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
          ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
          █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
          ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
          ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
           ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
           ▓█   ▒█▓   ░     █░                ▒█              █▓
            █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
             █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
              ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
               ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                  ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                      ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░

    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|

        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Command history file path: /root/.flink-sql-history

Flink SQL>

输入SQL:SELECT 'Hello World';,该查询不需要 table source,并且只产生一行结果。CLI 将从集群中检索结果并将其可视化。按 Q 键退出结果视图。
在这里插入图片描述

5.结果显示模式

CLI为维护和可视化结果提供三种模式。默认table,还可以设置为tableau、changelog

1.表格模式(table mode)

在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。

执行如下命令启用:

SET 'sql-client.execution.result-mode' = 'table';

执行SQL:

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
                           name                  cnt
                          Alice                    1
                           Greg                    1
                            Bob                    2

2.变更日志模式(changelog mode

不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。

SET 'sql-client.execution.result-mode' = 'changelog';
 op                           name                  cnt
 +I                            Bob                    1
 +I                          Alice                    1
 +I                           Greg                    1
 -U                            Bob                    1
 +U                            Bob                    2

3.Tableau模式(tableau mode)

更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同:

SET 'sql-client.execution.result-mode' = 'tableau';
+----+--------------------------------+----------------------+
| op |                           name |                  cnt |
+----+--------------------------------+----------------------+
| +I |                            Bob |                    1 |
| +I |                          Alice |                    1 |
| +I |                           Greg |                    1 |
| -U |                            Bob |                    1 |
| +U |                            Bob |                    2 |
+----+--------------------------------+----------------------+
Received a total of 5 rows

6.执行配置

执行环境默认streaming,也可以设置batch

SET execution.runtime-mode=streaming;

设置默认并行度

SET parallelism.default=1;

设置状态TTL

SET table.exec.state.ttl=1000;

数据库操作

1.创建数据库

根据给定的表属性创建数据库。若数据库中已存在同名表会抛出异常。

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
  [COMMENT database_comment]
  WITH (key1=val1, key2=val2, ...)
IF NOT EXISTS:若数据库已经存在,则不会进行任何操作。

WITH OPTIONS:数据库属性一般用于存储关于这个数据库额外的信息。 表达式 key1=val1 中的键和值都需要是字符串文本常量。
Flink SQL> CREATE DATABASE db_flink;
[INFO] Execute statement succeed.

2.查询数据库

查询所有数据库

Flink SQL> SHOW DATABASES;
+------------------+
|    database name |
+------------------+
| default_database |
|         db_flink |
+------------------+
2 rows in set

查询当前数据库

Flink SQL> SHOW CURRENT DATABASE;
+-----------------------+
| current database name |
+-----------------------+
|      default_database |
+-----------------------+
1 row in set

切换数据库

USE database_name;

3.修改数据库

在数据库中设置一个或多个属性。若个别属性已经在数据库中设定,将会使用新值覆盖旧值。

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

4.删除数据库

根据给定的表名删除数据库。若需要删除的数据库不存在会抛出异常 。

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
IF EXISTS:若数据库不存在,不执行任何操作。

RESTRICT:当删除一个非空数据库时,会触发异常。(默认为开)

CASCADE:删除一个非空数据库时,把相关联的表与函数一并删除。
Flink SQL> DROP DATABASE db_flink;
[INFO] Execute statement succeed.

表操作

1.创建表

根据指定的表名创建一个表,如果同名表已经在 catalog 中存在了,则无法注册。

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] | AS select_query ]

表列属性

1.物理/常规列

physical_column_definition:定义物理列,其定义了物理介质中存储的数据中字段的名称、类型和顺序

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING
) WITH (
  ...
);

2.元数据列

metadata_column_definition: 定义元数据列

例如:元数据列可用于从 Kafka 记录中读取和写入时间戳,以进行基于时间的操作。简单说就是将Kafka数据中的一个时间戳作为表的一个字段

创建一个带有引用元数据字段的附加元数据列的表timestamp

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'    -- reads and writes a Kafka record's timestamp
) WITH (
  'connector' = 'kafka'
  ...
);
# 如果自定义的列名称和定义metadata字段的名称一样, FROM子句可省略
`timestamp` TIMESTAMP_LTZ(3) METADATA

# 如果自定义列的数据类型和定义的metadata字段的数据类型不一致,程序运行时会自动强转,但是要求两种数据类型是可以强转
-- 将时间戳强转为 BIGINT
`timestamp` BIGINT METADATA

3.计算列

computed_column_definition:定义计算列,将一些列经过自定义运算生成的新列,在物理上并不存储在表中,只能读不能写。

`money` AS price * quanitity

表Watermark属性

WATERMARK定义了表的事件时间属性,其形式为 :

WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 

rowtime_column_name:把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。

Flink SQL提供了几种 WATERMARK 生产策略:

1.严格升序:

Flink任务认为时间戳只会越来越大,也不存在相等的情况,只要相等或者小于之前的,就认为是迟到的数据。

WATERMARK FOR rowtime_column AS rowtime_column

2.递增:

一般基本不用这种方式。如果设置此类,则允许有相同的时间戳出现。

WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND 

3.有界无序:

用于设置最大乱序时间,此类Watermark 生成策略通常用于有数据乱序的场景,实际场景中,数据也都是会存在乱序,所以使用此类策略。

WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL 'string' timeUnit 

假如设置为WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND ,则生成的是运行 5s 延迟的Watermark。

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );

列PRIMARY KEY属性

主键约束表明表中的一列或一组列是唯一的,并且它们不包含NULL值。主键唯一地标识表中的一行,只支持 not enforced。

键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
PARYMARY KEY(user_id) not enforced
) 

列PARTITIONED BY属性

创建分区表

根据指定的列对已经创建的表进行分区。若表使用 filesystem sink ,则将会为每个分区创建一个目录。

列WITH选项属性

创建表的表属性,表属性用于创建 table source/sink ,一般用于寻找和创建底层的连接器。用于指定外部存储系统的元数据信息。配置属性时,表达式key1=val1的键和值都应该是字符串字面值。

一般with中的配置项由Flink SQL 的 Connector(链接外部存储的连接器) 来定义,每种 Connector 提供的with 配置项都是不同的。

具体参考:Table & SQL Connectors

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

列LIKE属性

LIKE子句可以基于现有表的定义去创建新表,并且可以扩展或排除原始表中的某些部分。

还可以使用该子句,重用(或改写)指定的连接器配置属性或者可以向外部表添加 watermark 定义,

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH ( 
    'connector' = 'kafka',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE Orders_with_watermark (
    -- 添加 watermark 定义
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    -- 改写 startup-mode 属性
    'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;

结果表 Orders_with_watermark 等效于使用以下语句创建的表:

CREATE TABLE Orders_with_watermark (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);

列AS select_statement属性

通过查询的结果创建和填充表。CTAS是使用单个命令创建数据并向表中插入数据的最简单、最快速的方法。

CREATE TABLE my_ctas_table
WITH (
    'connector' = 'kafka',
    ...
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

结果表 my_ctas_table 等效于使用以下语句创建表并写入数据:

CREATE TABLE my_ctas_table (
    id BIGINT,
    name STRING,
    age INT
) WITH (
    'connector' = 'kafka',
    ...
);
 
INSERT INTO my_ctas_table SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

注意CTAS 有如下约束:

暂不支持创建临时表
暂不支持指定列信息
暂不支持指定 Watermark
暂不支持创建分区表
暂不支持主键约束

创建一张基于 Print的简单表

Print连接器允许将每一行写入标准输出流或者标准错误流。

CREATE TABLE print_table (
 f0 INT,
 f1 INT,
 f2 STRING,
 f3 DOUBLE
) WITH (
 'connector' = 'print'
)

也可以通过 LIKE子句 基于已有表的结构去创建新表

CREATE TABLE print_table 
WITH ('connector' = 'print')
LIKE source_table (EXCLUDING ALL)

2.查看表

查看所有表

展示指定库的所有表,如果没有指定库则展示当前库的所有表。另外返回的结果能被一个可选的匹配字符串过滤。

SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]

注意:如果没有指定数据库,则从当前数据库返回表。

LIKE子句中 sql 正则式的语法与 MySQL 方言中的语法相同

% 匹配任意数量的字符, 也包括0数量字符, \% 匹配一个 % 字符

_ 只匹配一个字符, \_ 匹配一个 _ 字符
Flink SQL> SHOW TABLES;
+-------------+
|  table name |
+-------------+
| print_table |
+-------------+
1 row in set

查看表信息

{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name
Flink SQL> DESC print_table;
+------+--------+------+-----+--------+-----------+
| name |   type | null | key | extras | watermark |
+------+--------+------+-----+--------+-----------+
|   f0 |    INT | TRUE |     |        |           |
|   f1 |    INT | TRUE |     |        |           |
|   f2 | STRING | TRUE |     |        |           |
|   f3 | DOUBLE | TRUE |     |        |           |
+------+--------+------+-----+--------+-----------+
4 rows in set

3.修改表

修改表名

ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name

修改表属性

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

4.删除表

DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

其他

动态表

动态表是Flink的支持流数据的Table API和SQL的核心概念。与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个 连续查询 。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。本质上,动态表上的连续查询非常类似于定义物化视图的查询。

注意:

连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。

流、动态表和连续查询之间的关系:

1.将流转换为动态表

2.在动态表上计算一个连续查询,生成一个新的动态表

3.生成的动态表被转换回流

在这里插入图片描述

将流转换成动态表

为了使用关系查询处理流,必须将其转换成 Table。流的每条记录都被解释为对结果表的 INSERT 操作

如下:单击事件流转换为表。当插入更多的单击流记录时,结果表将不断增长。
在这里插入图片描述

连续查询

在动态表上计算一个连续查询,并生成一个新的动态表。

与批处理查询不同,连续查询从不终止,并根据其输入表上的更新更新其结果表。

在任何时候,连续查询的结果在语义上与以批处理模式在输入表快照上执行的相同查询的结果相同。

在这里插入图片描述

1.更新查询

当原始动态表不停地插入新的数据时,查询得到的结果表会持续地进行更改。

这里的更改操作可以是简单的插入,也可以是对之前数据的更新。这种持续查询被称为更新查询

2.追加查询

上述查询过程用到分组聚合,结果表中就会产生更新操作。

如果执行一个简单的条件查询,结果表中就会像原始表一样,只有插入操作。

那么这样的持续查询,就被称为追加查询,它定义的结果表的更新日志流中只有INSERT操作。

将动态表转换为流

动态表可以通过插入、更新和删除操作,进行持续的更改。在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码,通过发送编码消息的方式告诉外部系统要执行的操作。

Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化:

1.仅追加流 Append-only

仅通过INSERT插入更改来修改的动态表,可以直接转换为仅追加流。这个流中发出的数据,其实就是动态表中新增的每一行。

2.撤回流 Retract

撤回流是包含两类消息的流,添加消息和撤回消息。

INSERT插入操作编码为add消息

DELETE删除操作编码为retract消息

UPDATE更新操作则编码为被更改行的retract消息和更新后行(新行)的add消息。

这样通过编码后的消息指明所有的增删改操作,一个动态表就可以转换为撤回流。

将动态表转换为 retract 流的过程。
在这里插入图片描述

3.更新插入流 Upsert

更新插入流中只包含两种类型的消息:更新插入消息和删除消息。

对于更新插入流来说,INSERT插入操作和UPDATE更新操作,统一被编码为upsert消息,而DELETE删除操作则被编码为delete消息。

在这里插入图片描述

时间属性

Flink 可以基于几种不同的 时间 概念来处理数据。

处理时间 指的是执行具体操作时的机器时间

事件时间 指的是数据本身携带的时间。这个时间是在事件产生时的时间

摄入时间 指的是数据进入 Flink 的时间;在系统内部,会把它当做事件时间来处理

处理时间

处理时间是基于机器的本地时间来处理数据,它是最简单的一种时间概念,但是它不能提供确定性。它既不需要从数据里获取时间,也不需要生成 watermark。

1.在创建表的DDL中定义

处理时间属性可以在创建表的 DDL 中用计算列的方式定义,用 PROCTIME() 就可以定义处理时间,函数 PROCTIME() 的返回类型是 TIMESTAMP_LTZ 。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

2.在DataStream到Table转换时定义

处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它只能定义在 schema 定义的最后。

DataStream<Tuple2<String, String>> stream = ...;

// 声明一个额外的字段作为时间属性字段
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());

WindowedTable windowedTable = table.window(
        Tumble.over(lit(10).minutes())
            .on($("user_action_time"))
            .as("userActionWindow"));

3.使用TableSource定义
逻辑的时间属性会放在 TableSource 已有物理字段的最后

// 定义一个由处理时间属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

	@Override
	public TypeInformation<Row> getReturnType() {
		String[] names = new String[] {"user_name" , "data"};
		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
		return Types.ROW(names, types);
	}

	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
		// create stream
		DataStream<Row> stream = ...;
		return stream;
	}

	@Override
	public String getProctimeAttribute() {
		// 这个名字的列会被追加到最后,作为第三列
		return "user_action_time";
	}
}

// register table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
	.from("user_actions")
	.window(Tumble
	    .over(lit(10).minutes())
	    .on($("user_action_time"))
	    .as("userActionWindow"));

事件时间

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现的结果。

事件时间属性也有类似于处理时间的三种定义方式:

1.在DDL中定义

2.在DataStream到Table转换时定义

3.用TableSource定义

1.在DDL中定义

事件时间属性可以在创建表DDL中定义,增加一个字段,通过WATERMARK语句来定义事件时间属性。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

时间戳类型必须是 TIMESTAMP 或者TIMESTAMP_LTZ 类型。但是时间戳一般都是秒或者是毫秒(BIGINT 类型),这种情况可以通过如下方式转换

ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),

2.在DataStream到Table转换时定义

在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:

在schema的结尾追加一个新的字段

替换一个已经存在的字段
// Option 1:
// 基于 stream 中的事件产生时间戳和 watermark
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());


// Option 2:
// 从第一个字段获取事件时间,并且产生 watermark
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));

// Usage:
WindowedTable windowedTable = table.window(Tumble
       .over(lit(10).minutes())
       .on($("user_action_time"))
       .as("userActionWindow"));

3.使用TableSource定义

// 定义一个有事件时间属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

	@Override
	public TypeInformation<Row> getReturnType() {
		String[] names = new String[] {"user_name", "data", "user_action_time"};
		TypeInformation[] types =
		    new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
		return Types.ROW(names, types);
	}

	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
		// 构造 DataStream
		// ...
		// 基于 "user_action_time" 定义 watermark
		DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
		return stream;
	}

	@Override
	public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
		// 标记 "user_action_time" 字段是事件时间字段
		// 给 "user_action_time" 构造一个时间属性描述符
		RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
			"user_action_time",
			new ExistingField("user_action_time"),
			new AscendingTimestamps());
		List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
		return listRowtimeAttrDescr;
	}
}

// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
	.from("user_actions")
	.window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/129228.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

c: struct sort descending and ascending in windows and Ubuntu

/*** file StudentStructSort.h* author geovindu,Geovin Du,涂聚文 (geovindu163.com)* ide: vscode c11,c17 Ubuntu 22.4* brief 结构体排序示例* date 2023-11-05* version 0.1* copyright geovindu 站在巨人的肩膀上 Standing on the Shoulders of Giants**/#ifnd…

matplotlib从起点出发(11)_Tutorial_11_TightLayout

如何使用紧凑的而已来干净利落地将绘图放入图形中。 tight_layout会自动调整子图参数&#xff0c;使子图适合图区域。这是一项实验性功能&#xff0c;在某些情况下可能不起作用。它仅检查刻度标签、轴标签和标题的范围。 tight_layout的替代方法是constrained_layout。 1 简…

10. GPIO中断

10. GPIO中断 回顾stm32中断系统STM32中断向量表中断向量偏移NVIC中断控制器 Cortex_A7 中断系统中断向量表GIC控制器中断IDGIC逻辑分块CP15协处理器c0寄存器c1寄存器c12寄存器c15寄存器 中断使能中断优先级设置优先级数配置 GICC_PMR抢占优先级和子优先级位数设置 GICC_BPR优先…

【C++】异常 智能指针

C异常 & 智能指针 1.C异常1.1.异常的抛出与捕获1.2.异常体系1.3.异常安全与规范1.4.异常优缺点 2.智能指针2.1.RAII2.2.智能指针的使用及原理2.2.1.auto_ptr2.2.2.unique_ptr2.2.3.shared_ptr2.2.4.shared_ptr的循环引用问题 & weak_ptr 2.3.定制删除器 1.C异常 C异常…

UML类图绘制指南

目录 类图简介 什么是类图 类图的作用 应用场景 类图中的元素 类和接口 六大关系 强弱关系 依赖关系&#xff1a; 关联关系&#xff1a; 聚合关系&#xff1a; 组合关系&#xff1a; 实现关系&#xff1a; 继承关系&#xff1a; 画图注意事项 总结 类图的重要…

NIO讲解

一&#xff1a;什么是NIO? 二&#xff1a;NIO三大组件 1. channel channel 有一点类似于 stream&#xff0c;它就是读写数据的双向通道&#xff0c;可以从 channel 将数据读入 buffer&#xff0c;也可以将 buffer 的数据写入 channel&#xff0c;而之前的 stream 要么是输入…

P1547 [USACO05MAR] Out of Hay S 题解

文章目录 题目描述输入格式输出格式样例样例输入样例输出 完整代码 题目描述 Bessie 计划调查 N N N&#xff08; 2 ≤ N ≤ 2 000 2 \leq N \leq 2\,000 2≤N≤2000&#xff09;个农场的干草情况&#xff0c;它从 1 1 1 号农场出发。农场之间总共有 M M M&#xff08; 1 ≤…

阻塞队列+定时器+常见的锁策略

1)阻塞队列:是一个线程安全的队列&#xff0c;是可以保证线程安全的 1.1)如果当前队列为空&#xff0c;尝试出队列&#xff0c;进入阻塞状态&#xff0c;一直阻塞到队列里面的元素不为空 1.2)如果当前队列满了&#xff0c;尝试入队列&#xff0c;也会产生阻塞&#xff0c;一直阻…

(论文阅读24/100)Visual Tracking with Fully Convolutional Networks

文献阅读笔记&#xff08;sel - CNN&#xff09; 简介 题目 Visual Tracking with Fully Convolutional Networks 作者 Lijun Wang, Wanli Ouyang, Xiaogang Wang, and Huchuan Lu 原文链接 http://202.118.75.4/lu/Paper/ICCV2015/iccv15_lijun.pdf 【DeepLearning】…

easyexcel==省市区三级联动

省市区三级联动&#xff0c;不选前面的就没法选后面的 package com.example.demoeasyexcel.jilian2; import com.alibaba.excel.write.metadata.holder.WriteSheetHolder; import com.alibaba.excel.write.metadata.holder.WriteWorkbookHolder; import org.apache.poi.ss.use…

IT 基础设施监控工具

IT 基础架构监控作为一个整体&#xff0c;是关于跟踪网络环境中所有 IT 资产的运行状况和性能&#xff0c;网络管理系统收集有关各种指标的数据&#xff0c;例如可用性、运行状况、性能和利用率&#xff0c;然后&#xff0c;IT 基础架构监控将这些数据转换为有用的统计数据&…

【Python】数据分析案例:世界杯数据可视化 | 文末送书

文章目录 前期数据准备导入数据 分析&#xff1a;世界杯中各队赢得的比赛数分析&#xff1a;先打或后打的比赛获胜次数分析&#xff1a;世界杯中的抛硬币决策分析&#xff1a;2022年T20世界杯的最高得分者分析&#xff1a;世界杯比赛最佳球员奖分析&#xff1a;最适合先击球或追…

Python+reuqests自动化接口测试

1.最近自己在摸索Pythonreuqests自动化接口测试&#xff0c;要实现某个功能&#xff0c;首先自己得有清晰的逻辑思路&#xff01;这样效率才会很快&#xff01; 思路--1.通过python读取Excel中的接口用例&#xff0c;2.通过python的函数调用&#xff0c;get/Post 进行测试&…

【QT】qt打包程序后无法正常启动

本人在自己电脑上打包Qt程序后可以正常运行&#xff0c;但换了个电脑就无法运行了&#xff0c;显示应用程序无法正常启动&#xff08;0xc000007b&#xff09;。 造成这种情况的原因是因为系统变量的原因&#xff0c;我用的win10自带的cmd。 应该采用Qt自带的cmd&#xff0c;打开…

五种常见的IO模型

目录 一. IO的概述 1.1 什么是IO 1.2 IO的效率问题 1.3 同步IO和异步IO的概念 二. 阻塞式IO 三. 非阻塞式IO 四. 信号驱动式IO 五. IO多路复用 六. 异步IO 七. 总结 一. IO的概述 1.1 什么是IO IO&#xff0c;表示输入输出&#xff0c;即&#xff1a;InPut / OutPut…

Day22力扣打卡

打卡记录 替换子串得到平衡字符串&#xff08;滑动窗口&#xff09; 链接 由于是以后统计替换的子串&#xff0c;不可以直接使用hash表统计的每个次数大于 n / 4 的字符&#xff0c;再将其次数减去平衡数来得到答案&#xff0c;根据字符串的连贯性&#xff0c;使用 滑动窗口 …

Linux服务器配置信息查询命令

Linux服务器配置信息查询命令 一、查看CPU信息 查询系统的CPU的详细信息&#xff0c;包括每个处理器的型号、频率、缓存等级以及每个核心的数量。cat /proc/cpuinfo二、查看内存信息 查询系统的内存信息&#xff0c;包括可用内存、已用内存和缓存等。cat /proc/meminfo三、查…

飞控硬件介绍及其主要传感器特性解析

飞行控制器是无人机的关键组件之一&#xff0c;它主要由主控单片机、IMU传感器、电源和输出IO等部分构成。这些硬件和传感器的特性对于无人机的二次开发至关重要&#xff0c;其性能和质量直接关系到无人机的稳定性、飞行性能和功能扩展能力。 本文将带领新手开发者深入了解飞行…

【文件IO】认识文件

文章目录 认识文件文件的结构和目录文件路径 认识文件 我们先来认识狭义上的文件(file)&#xff0c;针对硬盘这种持久化存储的I/O设备&#xff0c;当我们想要进行数据保存时&#xff0c;往往不是保存一个整体&#xff0c;而是独立成一个个单位进行保存&#xff0c;这个独立的单…

jbase代码生成器(成型篇)

上一篇说到通用码表可以解决百分之八十的基础维护功能&#xff0c;剩下的百分二十的需要级联维护的界面可以用代码生成器生成代码&#xff0c;基于生成的代码拷贝再组装界面&#xff0c;来解决这百分之二十的工作量里的百分之八十工作量。 首先实现代码生成器 Class Jbase.Ma…