[Flink]三、Flink1.13

11. Table API 和 SQL

如图 11-1 所示,在 Flink 提供的多层级 API 中,核心是 DataStream API ,这是我们开发流
处理应用的基本途径;底层则是所谓的处理函数( process function ),可以访问事件的时间信
息、注册定时器、自定义状态,进行有状态的流处理。 DataStream API 和处理函数比较通用,
有了这些 API ,理论上我们就可以实现所有场景的需求了。
不过在企业实际应用中,往往会面对大量类似的处理逻辑,所以一般会将底层 API 包装
成更加具体的应用级接口。怎样的接口风格最容易让大家接收呢?作为大数据工程师,我们最
为熟悉的数据统计方式,当然就是写 SQL 了。
SQL 是结构化查询语言( Structured Query Language )的缩写,是我们对关系型数据库进
行查询和修改的通用编程语言。在关系型数据库中,数据是以表( table )的形式组织起来的,
所以也可以认为 SQL 是用来对表进行处理的工具语言。无论是传统架构中进行数据存储的
MySQL PostgreSQL ,还是大数据应用中的 Hive ,都少不了 SQL 的身影;而 Spark 作为大数
据处理引擎,为了更好地支持在 Hive 中的 SQL 查询,也提供了 Spark SQL 作为入口。
Flink 同样提供了对于“表”处理的支持,这就是更高层级的应用 API ,在 Flink 中被称为
Table API SQL Table API 顾名思义,就是基于“表”( Table )的一套 API ,它是内嵌在 Java
Scala 等语言中的一种声明式领域特定语言( DSL ),也就是专门为处理表而设计的;在此基础
上, Flink 还基于 Apache Calcite 实现了对 SQL 的支持。这样一来,我们就可以在 Flink 程序中
直接写 SQL 来实现处理需求了。
                
Flink 中这两种 API 被集成在一起, SQL 执行的对象也是 Flink 中的表( Table ),所以
我们一般会认为它们是一体的,本章会放在一起进行介绍。 Flink 是批流统一的处理框架,无
论是批处理( DataSet API )还是流处理( DataStream API ),在上层应用中都可以直接使用 Table
API 或者 SQL 来实现;这两种 API 对于一张表执行相同的查询操作,得到的结果是完全一样
的。我们主要还是以流处理应用为例进行讲解。
需要说明的是, Table API SQL 最初并不完善,在 Flink 1.9 版本合并阿里巴巴内部版本
Blink 之后发生了非常大的改变,此后也一直处在快速开发和完善的过程中,直到 Flink 1.12
版本才基本上做到了功能上的完善。而即使是在目前最新的 1.13 版本中, Table API SQL
依然不算稳定,接口用法还在不停调整和更新。所以这部分希望大家重在理解原理和基本用法,
具体的 API 调用可以随时关注官网的更新变化。
11.1 快速上手
如果我们对关系型数据库和 SQL 非常熟悉,那么 Table API SQL 的使用其实非常简单:
只要得到一个“表”( Table ),然后对它调用 Table API ,或者直接写 SQL 就可以了。接下来我
们就以一个非常简单的例子上手,初步了解一下这种高层级 API 的使用方法。
11.1.1 需要引入的依赖
我们想要在代码中使用 Table API ,必须引入相关的依赖。
<dependency>
 <groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
这里的依赖是一个 Java 的“桥接器”( bridge ),主要就是负责 Table API 和下层 DataStream
API 的连接支持,按照不同的语言分为 Java 版和 Scala 版。
如果我们希望在本地的集成开发环境( IDE )里运行 Table API SQL ,还需要引入以下
依赖:
<dependency>
 <groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
这里主要添加的依赖是一个“计划器”( planner ),它是 Table API 的核心组件,负责提供
运行时环境,并生成程序的执行计划。这里我们用到的是新版的 blink planner 。由于 Flink
装包的 lib 目录下会自带 planner ,所以在生产集群环境中提交的作业不需要打包这个依赖。
而在 Table API 的内部实现上,部分相关的代码是用 Scala 实现的,所以还需要额外添加
一个 Scala 版流处理的相关依赖。
另外,如果想实现自定义的数据格式来做序列化,可以引入下面的依赖:
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-common</artifactId>
 <version>${flink.version}</version>
</dependency>
11.1.2 一个简单示例
有了基本的依赖,接下来我们就可以尝试在 Flink 代码中使用 Table API SQL 了。比如,
我们可以自定义一些 Event 类型(包含了 user url timestamp 三个字段,参考 5.2.1 小节中
的定义)的用户访问事件,作为输入的数据源;而后从中提取 url 地址和用户名 user 两个字段
作为输出。
如果使用 DataStream API ,我们可以直接读取数据源后,用一个简单转换算子 map 来做字
段的提取。而这个需求直接写 SQL 的话,实现会更加简单:
select url, user from EventTable;
这里我们把流中所有数据组成的表叫作 EventTable 。在 Flink 代码中直接对这个表执行上
面的 SQL ,就可以得到想要提取的数据了。
在代码中具体实现如下:
public class TableExample {
 public static void main(String[] args) throws Exception {
 // 获取流执行环境
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 // 读取数据源
 SingleOutputStreamOperator<Event> eventStream = env
 .fromElements(
 new Event("Alice", "./home", 1000L),
 new Event("Bob", "./cart", 1000L),
 new Event("Alice", "./prod?id=1", 5 * 1000L),
 new Event("Cary", "./home", 60 * 1000L),
 new Event("Bob", "./prod?id=3", 90 * 1000L),
 new Event("Alice", "./prod?id=7", 105 * 1000L)
 );
 // 获取表环境
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 // 将数据流转换成表
 Table eventTable = tableEnv.fromDataStream(eventStream);
 // 用执行 SQL 的方式提取数据
 Table visitTable = tableEnv.sqlQuery("select url, user from " + eventTable);
 // 将表转换成数据流,打印输出
 tableEnv.toDataStream(visitTable).print();
 // 执行程序
 env.execute();
 }
}
这里我们需要创建一个“表环境”( TableEnvironment ),然后将数据流( DataStream )转
换成一个表( Table );之后就可以执行 SQL 在这个表中查询数据了。查询得到的结果依然是
一个表,把它重新转换成流就可以打印输出了。
代码执行的结果如下:
+I[./home, Alice]
+I[./cart, Bob]
+I[./prod?id=1, Alice]
+I[./home, Cary]
+I[./prod?id=3, Bob]
+I[./prod?id=7, Alice]
可以看到,我们将原始的 Event 数据转换成了 (url user) 这样类似二元组的类型。每行输
306 出前面有一个“ +I ”标志,这是表示每条数据都是“插入”( Insert )到表中的新增数据。
Table Table API 中的核心接口类,对应着我们熟悉的“表”的概念。基于 Table 我们也
可以调用一系列查询方法直接进行转换,这就是所谓 Table API 的处理方式:
// Table API 方式提取数据
Table clickTable2 = eventTable.select($("url"), $("user"));
这里的 $ 符号是 Table API 中定义的“表达式”类 Expressions 中的一个方法,传入一个字
段名称,就可以指代数据中对应字段。将得到的表转换成流打印输出,会发现结果与直接执行
SQL 完全一样。
11.2 基本 API
通过上节中的简单示例,我们已经对 Table API SQL 的用法有了大致的了解;本节就继
续展开,对 API 的相关用法做一个详细的说明。
11.2.1 程序架构
Flink 中, Table API SQL 可以看作联结在一起的一套 API ,这套 API 的核心概念就
是“表”( Table )。在我们的程序中,输入数据可以定义成一张表;然后对这张表进行查询,
就可以得到新的表,这相当于就是流数据的转换操作;最后还可以定义一张用于输出的表,负
责将处理结果写入到外部系统。
我们可以看到,程序的整体处理流程与 DataStream API 非常相似,也可以分为读取数据
源( Source )、转换( Transform )、输出数据( Sink )三部分;只不过这里的输入输出操作不需
要额外定义,只需要将用于输入和输出的表定义出来,然后进行转换查询就可以了。
程序基本架构如下:
// 创建表环境
TableEnvironment tableEnv = ...;
// 创建输入表,连接外部系统读取数据
tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ( 'connector' = ... )");
// 注册一个表,连接到外部系统,用于输出
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");
// 执行 SQL 对表进行查询转换,得到一个新的表
Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable... ");
// 使用 Table API 对表进行查询转换,得到一个新的表
Table table2 = tableEnv.from("inputTable").select(...);
// 将得到的结果写入输出表
TableResult tableResult = table1.executeInsert("outputTable");
与上一节中不同,这里不是从一个 DataStream 转换成 Table ,而是通过执行 DDL 来直接
创建一个表。这里执行的 CREATE 语句中用 WITH 指定了外部系统的连接器,于是就可以连
接外部系统读取数据了。这其实是更加一般化的程序架构,因为这样我们就可以完全抛开
DataStream API ,直接用 SQL 语句实现全部的流处理过程。
而后面对于输出表的定义是完全一样的。可以发现,在创建表的过程中,其实并不区分“输
入”还是“输出”,只需要将这个表“注册”进来、连接到外部系统就可以了;这里的 inputTable
outputTable 只是注册的表名,并不代表处理逻辑,可以随意更换。至于表的具体作用,则要
等到执行后面的查询转换操作时才能明确。我们直接从 inputTable 中查询数据,那么 inputTable
就是输入表;而 outputTable 会接收另外表的结果进行写入,那么就是输出表。
在早期的版本中,有专门的用于输入输出的 TableSource TableSink ,这与流处理里的概
念是一一对应的;不过这种方式与关系型表和 SQL 的使用习惯不符,所以已被弃用,不再区
Source Sink
11.2.2 创建表环境
对于 Flink 这样的流处理框架来说,数据流和表在结构上还是有所区别的。所以使用 Table
API SQL 需要一个特别的运行时环境,这就是所谓的“表环境”( TableEnvironment )。它主
要负责:
1 )注册 Catalog 和表;
2 )执行 SQL 查询;
3 )注册用户自定义函数( UDF );
4 DataStream 和表之间的转换。
这里的 Catalog 就是“目录”,与标准 SQL 中的概念是一致的,主要用来管理所有数据库
database )和表( table )的元数据( metadata )。通过 Catalog 可以方便地对数据库和表进行
查询的管理,所以可以认为我们所定义的表都会“挂靠”在某个目录下,这样就可以快速检索。
在表环境中可以由用户自定义 Catalog ,并在其中注册表和自定义函数( UDF )。默认的 Catalog
就叫作 default_catalog
每个表和 SQL 的执行,都必须绑定在一个表环境( TableEnvironment )中。 TableEnvironment
Table API 中提供的基本接口类,可以通过调用静态的 create() 方法来创建一个表环境实例。
方法需要传入一个环境的配置参数 EnvironmentSettings ,它可以指定当前表环境的执行模式和
计划器( planner )。执行模式有批处理和流处理两种选择,默认是流处理模式;计划器默认使
blink planner
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
 .newInstance()
 .inStreamingMode() // 使用流处理模式
 .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
对于流处理场景,其实默认配置就完全够用了。所以我们也可以用另一种更加简单的方式
来创建表环境:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
这 里 我 们 引 入 了 一 个 “ 流 式 表 环 境 ”( StreamTableEnvironment ), 它 是 继 承 自
TableEnvironment 的子接口。调用它的 create() 方法,只需要直接将当前的流执行环境
StreamExecutionEnvironment )传入,就可以创建出对应的流式表环境了。这也正是我们在
上一节简单示例中使用的方式。
11.2.3 创建表
表( Table )是我们非常熟悉的一个概念,它是关系型数据库中数据存储的基本形式,也
SQL 执行的基本对象。 Flink 中的表概念也并不特殊,是由多个 数据构成的,每个行( Row
又可以有定义好的多个列( Column )字段;整体来看,表就是固定类型的数据组成的二维矩
阵。
为了方便地查询表,表环境中会维护一个目录( Catalog )和表的对应关系。所以表都是
通过 Catalog 来进行注册创建的。表在环境中有一个唯一的 ID ,由三部分组成:目录( catalog
名,数据库( database )名,以及表名。在默认情况下,目录名为 default_catalog ,数据库名为
default_database 。所以如果我们直接创建一个叫作 MyTable 的表,它的 ID 就是:
default_catalog.default_database.MyTable
具体创建表的方式,有通过连接器( connector )和虚拟表( virtual tables )两种。
1. 连接器表( Connector Tables
最直观的创建表的方式,就是通过连接器( connector )连接到一个外部系统,然后定义出
对应的表结构。例如我们可以连接到 Kafka 或者文件系统,将存储在这些外部系统的数据以“表”
的形式定义出来,这样对表的读写就可以通过连接器转换成对外部系统的读写了。当我们在表
环境中读取这张表,连接器就会从外部系统读取数据并进行转换;而当我们向这张表写入数据,
连接器就会将数据输出( Sink )到外部系统中。
在代码中,我们可以调用表环境的 executeSql() 方法,可以传入一个 DDL 作为参数执行
SQL 操作。这里我们传入一个 CREATE 语句进行表的创建,并通过 WITH 关键字指定连接到
外部系统的连接器:
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector'
= ... )");
这里的 TEMPORARY 关键字可以省略。关于连接器的具体定义,我们会在 11.8 节中展开
讲解。
这里没有定义 Catalog Database , 所 以 都 是 默 认 的 , 表 的 完 整 ID 就 是
default_catalog.default_database.MyTable 。如果希望使用自定义的目录名和库名,可以在环境中
进行设置:
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
这样我们创建的表完整 ID 就变成了 custom_catalog.custom_database.MyTable 。之后在表
环境中创建的所有表, ID 也会都以 custom_catalog.custom_database 作为前缀。
2. 虚拟表( Virtual Tables
在环境中注册之后,我们就可以在 SQL 中直接使用这张表进行查询转换了。
Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");
这里调用了表环境的 sqlQuery() 方法,直接传入一条 SQL 语句作为参数执行查询,得到的
结果是一个 Table 对象。 Table Table API 中提供的核心接口类,就代表了一个 Java 中定义的
表实例。
得到的 newTable 是一个中间转换结果,如果之后又希望直接使用这个表执行 SQL ,又该
怎么做呢?由于 newTable 是一个 Table 对象,并没有在表环境中注册;所以我们还需要将这
个中间结果表注册到环境中,才能在 SQL 中使用:
tableEnv.createTemporaryView("NewTable", newTable);
我们发现,这里的注册其实是创建了一个“虚拟表”( Virtual Table )。这个概念与 SQL
法中的视图( View )非常类似,所以调用的方法也叫作创建“虚拟视图”( createTemporaryView )。
视图之所以是“虚拟”的,是因为我们并不会直接保存这个表的内容,并没有“实体”;只是
在用到这张表的时候,会将它对应的查询语句嵌入到 SQL 中。
注册为虚拟表之后,我们就又可以在 SQL 中直接使用 NewTable 进行查询转换了。不难看
到,通过虚拟表可以非常方便地让 SQL 分步骤执行得到中间结果,这为代码编写提供了很大
的便利。
另外,虚拟表也可以让我们在 Table API SQL 之间进行自由切换。一个 Java 中的 Table
对象可以直接调用 Table API 中定义好的查询转换方法,得到一个中间结果表;这跟对注册好
的表直接执行 SQL 结果是一样的。具体我们会在下一小节继续讲解。
310 11.2.4 表的查询
创建好了表,接下来自然就是对表进行查询转换了。对一个表的查询( Query )操作,就
对应着流数据的转换( Transform )处理。
Flink 为我们提供了两种查询方式: SQL Table API
1. 执行 SQL 进行查询
基于表执行 SQL 语句,是我们最为熟悉的查询方式。 Flink 基于 Apache Calcite 来提供对
SQL 的支持, Calcite 是一个为不同的计算平台提供标准 SQL 查询的底层工具,很多大数据框
架比如 Apache Hive Apache Kylin 中的 SQL 支持都是通过集成 Calcite 来实现的。
在代码中,我们只要调用表环境的 sqlQuery() 方法,传入一个字符串形式的 SQL 查询语句
就可以了。执行得到的结果,是一个 Table 对象。
// 创建表环境
TableEnvironment tableEnv = ...; 
// 创建表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");
// 查询用户 Alice 的点击事件,并提取表中前两个字段
Table aliceVisitTable = tableEnv.sqlQuery(
 "SELECT user, url " +
 "FROM EventTable " +
 "WHERE user = 'Alice' "
 );
目前 Flink 支持标准 SQL 中的绝大部分用法,并提供了丰富的计算函数。这样我们就可
以把已有的技术迁移过来,像在 MySQL Hive 中那样直接通过编写 SQL 实现自己的处理需
求,从而大大降低了 Flink 上手的难度。
例如,我们也可以通过 GROUP BY 关键字定义分组聚合,调用 COUNT() SUM() 这样的
函数来进行统计计算:
Table urlCountTable = tableEnv.sqlQuery(
 "SELECT user, COUNT(url) " +
 "FROM EventTable " +
 "GROUP BY user "
 );
上面的例子得到的是一个新的 Table 对象,我们可以再次将它注册为虚拟表继续在 SQL
中调用。另外,我们也可以直接将查询的结果写入到已经注册的表中,这需要调用表环境的
executeSql() 方法来执行 DDL ,传入的是一个 INSERT 语句:
// 注册表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
311
// 将查询结果输出到 OutputTable 中
tableEnv.executeSql (
"INSERT INTO OutputTable " +
 "SELECT user, url " +
 "FROM EventTable " +
 "WHERE user = 'Alice' "
 );
2. 调用 Table API 进行查询
另外一种查询方式就是调用 Table API 。这是嵌入在 Java Scala 语言内的查询 API ,核
心就是 Table 接口类,通过一步步链式调用 Table 的方法,就可以定义出所有的查询转换操作。
每一步方法调用的返回结果,都是一个 Table
由于 Table API 是基于 Table Java 实例进行调用的,因此我们首先要得到表的 Java 对象。
基于环境中已注册的表,可以通过表环境的 from() 方法非常容易地得到一个 Table 对象:
Table eventTable = tableEnv.from("EventTable");
传入的参数就是注册好的表名。注意这里 eventTable 是一个 Table 对象,而 EventTable
在环境中注册的表名。得到 Table 对象之后,就可以调用 API 进行各种转换操作了,得到的是
一个新的 Table 对象:
Table maryClickTable = eventTable
.where($("user").isEqual("Alice"))
.select($("url"), $("user"));
这里每个方法的参数都是一个“表达式”( Expression ),用方法调用的形式直观地说明了
想要表达的内容;“ $ ”符号用来指定表中的一个字段。上面的代码和直接执行 SQL 是等效的。
Table API 是嵌入编程语言中的 DSL SQL 中的很多特性和功能必须要有对应的实现才可
以使用,因此跟直接写 SQL 比起来肯定就要麻烦一些。目前 Table API 支持的功能相对更少,
可以预见未来 Flink 社区也会以扩展 SQL 为主,为大家提供更加通用的接口方式;所以我们
接下来也会以介绍 SQL 为主,简略地提及 Table API
3. 两种 API 的结合使用
可以发现,无论是调用 Table API 还是执行 SQL ,得到的结果都是一个 Table 对象;所以
这两种 API 的查询可以很方便地结合在一起。
1 )无论是那种方式得到的 Table 对象,都可以继续调用 Table API 进行查询转换;
2 )如果想要对一个表执行 SQL 操作(用 FROM 关键字引用),必须先在环境中对它进
行注册。所以我们可以通过创建虚拟表的方式实现两者的转换:
tableEnv.createTemporaryView("MyTable", myTable);
注意:这里的第一个参数 "MyTable" 是注册的表名,而第二个参数 myTable Java 中的
Table 对象。
另外要说明的是,在 11.1.2 小节的简单示例中,我们并没有将 Table 对象注册为虚拟表就
直接在 SQL 中使用了:
312 313
Table clickTable = tableEnvironment.sqlQuery("select url, user from " +
eventTable);
这其实是一种简略的写法,我们将 Table 对象名 eventTable 直接以字符串拼接的形式添加
SQL 语句中,在解析时会自动注册一个同名的虚拟表到环境中,这样就省略了创建虚拟视
图的步骤。
两种 API 殊途同归,实际应用中可以按照自己的习惯任意选择。不过由于结合使用容易
引起混淆,而 Table API 功能相对较少、通用性较差,所以企业项目中往往会直接选择 SQL
方式来实现需求。
11.2.5 输出表
表的创建和查询,就对应着流处理中的读取数据源( Source )和转换( Transform );而最
后一个步骤 Sink ,也就是将结果数据输出到外部系统,就对应着表的输出操作。
在代码上,输出一张表最直接的方法,就是调用 Table 的方法 executeInsert() 方法将一个
Table 写入到注册过的表中,方法传入的参数就是注册的表名。
// 注册表,用于输出数据到外部系统
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
// 经过查询转换,得到结果表
Table result = ...
// 将结果表写入已注册的输出表中
result.executeInsert("OutputTable");
在底层,表的输出是通过将数据写入到 TableSink 来实现的。 TableSink Table API 中提
供的一个向外部系统写入数据的通用接口,可以支持不同的文件格式(比如 CSV Parquet )、
存储数据库(比如 JDBC HBase Elasticsearch )和消息队列(比如 Kafka )。它有些类似于
DataStream API 中调用 addSink() 方法时传入的 SinkFunction ,有不同的连接器对它进行了实现。
关于不同外部系统的连接器,我们会在 11.8 节展开介绍。
这里可以发现,我们在环境中注册的 ,其实在写入数据的时候就对应着一个 TableSink
11.2.6 表和流的转换
从创建表环境开始,历经表的创建、查询转换和输出,我们已经可以使用 Table API SQL
进行完整的流处理了。不过在应用的开发过程中,我们测试业务逻辑一般不会直接将结果直接
写入到外部系统,而是在本地控制台打印输出。对于 DataStream 这非常容易,直接调用 print()
方法就可以看到结果数据流的内容了;但对于 Table 就比较悲剧——它没有提供 print() 方法。
这该怎么办呢?
Flink 中我们可以将 Table 再转换成 DataStream ,然后进行打印输出。这就涉及了表和 流的转换。
1. 将表( Table )转换成流( DataStream
1 )调用 toDataStream() 方法
将一个 Table 对象转换成 DataStream 非常简单,只要直接调用表环境的方法 toDataStream()
就可以了。例如,我们可以将 11.2.4 小节经查询转换得到的表 maryClickTable 转换成流打印输
出,这代表了“ Mary 点击的 url 列表”:
Table aliceVisitTable = tableEnv.sqlQuery(
"SELECT user, url " +
"FROM EventTable " +
"WHERE user = 'Alice' "
);
// 将表转换成数据流
tableEnv.toDataStream(aliceVisitTable).print();
这里需要将要转换的 Table 对象作为参数传入。
2 )调用 toChangelogStream() 方法
maryClickTable 转换成流打印输出是很简单的;然而,如果我们同样希望将“用户点击
次数统计”表 urlCountTable 进行打印输出,就会抛出一个 TableException 异常:
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink
'default_catalog.default_database.Unregistered_DataStream_Sink_1' doesn't
support consuming update changes ...
这表示当前的 TableSink 并不支持表的更新( update )操作。这是什么意思呢?
因为 print 本身也可以看作一个 Sink 操作,所以这个异常就是说打印输出的 Sink 操作不
支持对数据进行更新。具体来说, urlCountTable 这个表中进行了分组聚合统计,所以表中的
每一行是会“更新”的。也就是说, Alice 的第一个点击事件到来,表中会有一行 (Alice, 1)
第二个点击事件到来,这一行就要更新为 (Alice, 2) 。但之前的 (Alice, 1) 已经打印输出了,“覆
水难收”,我们怎么能对它进行更改呢?所以就会抛出异常。
解决的思路是,对于这样有更新操作的表,我们不要试图直接把它转换成 DataStream
印输出,而是记录一下它的“更新日志”( change log )。这样一来,对于表的所有更新操作,
就变成了一条更新日志的流,我们就可以转换成流打印输出了。
代码中需要调用的是表环境的 toChangelogStream() 方法:
Table urlCountTable = tableEnv.sqlQuery(
"SELECT user, COUNT(url) " +
"FROM EventTable " +
"GROUP BY user "
);
// 将表转换成更新日志流
tableEnv.toDataStream(urlCountTable).print();
与“更新日志流”( Changelog Streams )对应的,是那些只做了简单转换、没有进行聚合
314 统计的表,例如前面提到的 maryClickTable 。它们的特点是数据只会插入、不会更新,所以也
被叫作“仅插入流”( Insert-Only Streams )。
2. 将流( DataStream )转换成表( Table
1 )调用 fromDataStream() 方法
想要将一个 DataStream 转换成表也很简单,可以通过调用表环境的 fromDataStream() 方法
来实现,返回的就是一个 Table 对象。例如,我们可以直接将事件流 eventStream 转换成一个
表:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 获取表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 读取数据源
SingleOutputStreamOperator<Event> eventStream = env.addSource(...)
// 将数据流转换成表
Table eventTable = tableEnv.fromDataStream(eventStream);
由于流中的数据本身就是定义好的 POJO 类型 Event ,所以我们将流转换成表之后,每一
行数据就对应着一个 Event ,而表中的列名就对应着 Event 中的属性。
另外,我们还可以在 fromDataStream() 方法中增加参数,用来指定提取哪些属性作为表中
的字段名,并可以任意指定位置:
// 提取 Event 中的 timestamp url 作为表中的列
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp"),
$("url"));
需要注意的是, timestamp 本身是 SQL 中的关键字,所以我们在定义表名、列名时要尽量
避免。这时可以通过表达式的 as() 方法对字段进行重命名:
// timestamp 字段重命名为 ts
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"),
$("url"));
2 )调用 createTemporaryView() 方法
调用 fromDataStream() 方法简单直观,可以直接实现 DataStream Table 的转换;不过如
果我们希望直接在 SQL 中引用这张表,就还需要调用表环境的 createTemporaryView() 方法来
创建虚拟视图了。
对于这种场景,也有一种更简洁的调用方式。我们可以直接调用 createTemporaryView()
方法创建虚拟表,传入的两个参数,第一个依然是注册的表名,而第二个可以直接就是
DataStream 。之后仍旧可以传入多个参数,用来指定表中的字段
tableEnv.createTemporaryView("EventTable", eventStream,
$("timestamp").as("ts"),$("url"));
这样,我们接下来就可以直接在 SQL 中引用表 EventTable 了。
315 3 )调用 fromChangelogStream () 方法
表环境还提供了一个方法 fromChangelogStream() ,可以将一个更新日志流转换成表。这
个方法要求流中的数据类型只能是 Row ,而且每一个数据都需要指定当前行的更新类型
RowKind );所以一般是由连接器帮我们实现的,直接应用比较少见,感兴趣的读者可以查
看官网的文档说明。
3. 支持的数据类型
前面示例中的 DataStream ,流中的数据类型都是定义好的 POJO 类。如果 DataStream
的类型是简单的基本类型,还可以直接转换成表吗?这就涉及了 Table 中支持的数据类型。
整体来看, DataStream 中支持的数据类型, Table 中也是都支持的,只不过在进行转换时
需要注意一些细节。
1 )原子类型
Flink 中,基础数据类型( Integer Double String )和通用数据类型(也就是不可再拆
分的数据类型)统一称作 原子类型 。原子类型的 DataStream ,转换之后就成了只有一列的
Table ,列字段( field )的数据类型可以由原子类型推断出。另外,还可以在 fromDataStream()
方法里增加参数,用来重新命名列字段。
StreamTableEnvironment tableEnv = ...;
DataStream<Long> stream = ...;
// 将数据流转换成动态表,动态表只有一个字段,重命名为 myLong
Table table = tableEnv.fromDataStream(stream, $("myLong"));
2 Tuple 类型
当原子类型不做重命名时,默认的字段名就是“ f0 ”,容易想到,这其实就是将原子类型
看作了一元组 Tuple1 的处理结果。
Table 支持 Flink 中定义的元组类型 Tuple ,对应在表中字段名默认就是元组中元素的属性
f0 f1 f2... 。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通
过调用表达式的 as() 方法来进行重命名。
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, Integer>> stream = ...;
// 将数据流转换成只包含 f1 字段的表
Table table = tableEnv.fromDataStream(stream, $("f1"));
// 将数据流转换成包含 f0 f1 字段的表,在表中 f0 f1 位置交换
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));
// f1 字段命名为 myInt f0 命名为 myLong
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"),
$("f0").as("myLong"));
316 3 POJO 类型
Flink 也支持多种数据类型组合成的 复合类型 ,最典型的就是简单 Java 对象( POJO
型)。由于 POJO 中已经定义好了可读性强的字段名,这种类型的数据流转换成 Table 就显得
无比顺畅了。
POJO 类型的 DataStream 转换成 Table ,如果不指定字段名称,就会直接使用原始 POJO
类型中的字段名称。 POJO 中的字段同样可以被重新排序、提却和重命名,这在之前的例子中
已经有过体现。
StreamTableEnvironment tableEnv = ...;
DataStream<Event> stream = ...;
Table table = tableEnv.fromDataStream(stream);
Table table = tableEnv.fromDataStream(stream, $("user"));
Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"),
$("url").as("myUrl"));
4 Row 类型
Flink 中还定义了一个在关系型表中更加通用的数据类型——行( Row ),它是 Table 中数
据的基本组织形式。 Row 类型也是一种复合类型,它的长度固定,而且无法直接推断出每个
字段的类型,所以在使用时必须指明具体的类型信息;我们在创建 Table 时调用的 CREATE
语句就会将所有的字段名称和类型指定,这在 Flink 中被称为表的“模式结构”( Schema )。除
此之外, Row 类型还附加了一个属性 RowKind ,用来表示当前行在更新操作中的类型。这样,
Row 就可以用来表示更新日志流( changelog stream )中的数据,从而架起了 Flink 中流和表的
转换桥梁。
所以在更新日志流中,元素的类型必须是 Row ,而且需要调用 ofKind() 方法来指定更新类
型。下面是一个具体的例子:
DataStream<Row> dataStream =
env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
// 将更新日志流转换为表
Table table = tableEnv.fromChangelogStream(dataStream);
4. 综合应用示例
现在,我们可以将介绍过的所有 API 整合起来,写出一段完整的代码。同样还是用户的
一组点击事件,我们可以查询出某个用户(例如 Alice )点击的 url 列表,也可以统计出每个
用户累计的点击次数,这可以用两句 SQL 来分别实现。具体代码如下:
public class TableToStreamExample {
 public static void main(String[] args) throws Exception {
 // 获取流环境
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 // 读取数据源
 SingleOutputStreamOperator<Event> eventStream = env
 .fromElements(
 new Event("Alice", "./home", 1000L),
 new Event("Bob", "./cart", 1000L),
 new Event("Alice", "./prod?id=1", 5 * 1000L),
 new Event("Cary", "./home", 60 * 1000L),
 new Event("Bob", "./prod?id=3", 90 * 1000L),
 new Event("Alice", "./prod?id=7", 105 * 1000L)
 );
 // 获取表环境
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 // 将数据流转换成表
tableEnv.createTemporaryView("EventTable", eventStream);
 // 查询 Alice 的访问 url 列表
 Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable
WHERE user = 'Alice'");
 
// 统计每个用户的点击次数
 Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) FROM
EventTable GROUP BY user");
 // 将表转换成数据流,在控制台打印输出
 tableEnv.toDataStream(aliceVisitTable).print("alice visit");
 tableEnv.toChangelogStream(urlCountTable).print("count");
 
 // 执行程序
 env.execute();
 }
}
用户 Alice 的点击 url 列表只需要一个简单的条件查询就可以得到,对应的表中只有插入
操作,所以我们可以直接调用 toDataStream() 将它转换成数据流,然后打印输出。控制台输出
的结果如下:
alice visit > +I[./home, Alice]
alice visit > +I[./prod?id=1, Alice]
alice visit > +I[./prod?id=7, Alice]
这里每条数据前缀的 +I 就是 RowKind ,表示 INSERT (插入)。
而由于统计点击次数时用到了分组聚合,造成结果表中数据会有更新操作,所以在打印输
出时需要将表 urlCountTable 转换成更新日志流( changelog stream )。控制台输出的结果如下:
count> +I[Alice, 1]
count> +I[Bob, 1]
count> -U[Alice, 1]
count> +U[Alice, 2]
count> +I[Cary, 1]
count> -U[Bob, 1]
count> +U[Bob, 2]
count> -U[Alice, 2]
count> +U[Alice, 3]
这里数据的前缀出现了 +I -U +U 三种 RowKind ,分别表示 INSERT (插入)、
UPDATE_BEFORE (更新前)和 UPDATE_AFTER (更新后)。当收到每个用户的第一次点击
事件时,会在表中插入一条数据,例如 +I[Alice, 1] +I[Bob, 1] 。而之后每当用户增加一次点击
事件,就会带来一次更新操作,更新日志流( changelog stream )中对应会出现两条数据,分
别表示之前数据的失效和新数据的生效;例如当 Alice 的第二条点击数据到来时,会出现一个
-U[Alice, 1] 和一个 +U[Alice, 2] ,表示 Alice 的点击个数从 1 变成了 2
这种表示更新日志的方式,有点像是声明“撤回”了之前的一条数据、再插入一条更新后
的数据,所以也叫作“撤回流”( Retract Stream )。关于表到流转换过程的编码方式,我们会在
下节进行更深入的讨论。
11.3 流处理中的表
上一节中介绍了 Table API SQL 的基本使用方法。我们会发现,在 Flink 中使用表和 SQL
基本上跟其它场景是一样的;不过对于表和流的转换,却稍显复杂。当我们将一个 Table 转换
DataStream 时,有“仅插入流”( Insert-Only Streams )和“更新日志流”( Changelog Streams
两种不同的方式,具体使用哪种方式取决于表中是否存在更新( update )操作。
这种麻烦其实是不可避免的。我们知道, Table API SQL 本质上都是基于关系型表的操
作方式;而关系型表( Table )本身是有界的,更适合批处理的场景。所以在 MySQL Hive
这样的固定数据集中进行查询,使用 SQL 就会显得得心应手。而对于 Flink 这样的流处理框
架来说,要处理的是源源不断到来的无界数据流,我们无法等到数据都到齐再做查询,每来一
条数据就应该更新一次结果;这时如果一定要使用表和 SQL 进行处理,就会显得有些别扭了,
需要引入一些特殊的概念。
我们可以将关系型表 /SQL 与流处理做一个对比,如表 11-1 所示。
                                        表 11-1 关系型数据库的查询和流查询的对比
关系型表/SQL流处理
处理的数据对象字段元组的有界集合字段元组的无限序列
查询(Query)对数据的访问可以访问到完整的数据输入无法访问到所有数据,必须“持续”等待流式输入
查询终止条件生成固定大小的结果集后终止永不停止,根据持续收到的数据不断更新查询结果
可以看到,其实关系型表和 SQL ,主要就是针对批处理设计的,这和流处理有着天生的
隔阂。既然“八字不合”,那 Flink 中的 Table API SQL 又是怎样做流处理的呢?接下来我
们就来深入探讨一下流处理中表的概念。
11.3.1 动态表和持续查询
流处理面对的数据是连续不断的,这导致了流处理中的“表”跟我们熟悉的关系型数据库
中的表完全不同;而基于表执行的查询操作,也就有了新的含义。
如果我们希望把流数据转换成表的形式,那么这表中的数据就会不断增长;如果进一步基
于表执行 SQL 查询,那么得到的结果就不是一成不变的,而是会随着新数据的到来持续更新。
1. 动态表( Dynamic Tables
当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的 SQL 查询,就应该
在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”( Dynamic
Tables )。
动态表是 Flink Table API SQL 中的核心概念,它为流数据处理提供了表和 SQL 支持。
我们所熟悉的表一般用来做批处理,面向的是固定的数据集,可以认为是“静态表”;而动态
表则完全不同,它里面的数据会随时间变化。
其实动态表的概念,我们在传统的关系型数据库中已经有所接触。数据库中的表,其实是
一系列 INSERT UPDATE DELETE 语句执行的结果;在关系型数据库中,我们一般把它
称为更新日志流( changelog stream )。如果我们保存了表在某一时刻的快照( snapshot ),那么
接下来只要读取更新日志流,就可以得到表之后的变化过程和最终结果了。在很多高级关系型
数据库(比如 Oracle DB2 )中都有“物化视图”( Materialized Views )的概念,可以用来缓
SQL 查询的结果;它的更新其实就是不停地处理更新日志流的过程。
Flink 中的动态表,就借鉴了物化视图的思想。
2. 持续查询( Continuous Query
动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义
SQL 查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不
会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”( Continuous
Query )。对动态表定义的查询操作,都是持续查询;而持续查询的结果也会是一个动态表。
由于每次数据到来都会触发查询操作,因此可以认为一次查询面对的数据集,就是当前输
入动态表中收到的所有数据。这相当于是对输入动态表做了一个“快照”( snapshot ),当作有
限数据集进行批处理;流式数据的到来会触发连续不断的快照查询,像动画一样连贯起来,就
构成了“持续查询”。
如图 11-2 所示,描述了持续查询的过程。这里我们也可以清晰地看到流、动态表和持续
查询的关系:
持续查询的步骤如下:
1 )流( stream )被转换为动态表( dynamic table );
2 )对动态表进行持续查询( continuous query ),生成新的动态表;
3 )生成的动态表被转换成流。
这样,只要 API 将流和动态表的转换封装起来,我们就可以直接在数据流上执行 SQL
询,用处理表的方式来做流处理了。
11.3.2 将流转换成动态表
为了能够使用 SQL 来做流处理,我们必须先把流( stream )转换成动态表。当然,之前
在讲解基本 API 时,已经介绍过代码中的 DataStream Table 如何转换;现在我们则要抛开
具体的数据类型,从原理上理解流和动态表的转换过程。
如果把流看作一张表,那么流中每个数据的到来,都应该看作是对表的一次插入( Insert
操作,会在表的末尾添加一行数据。因为流是连续不断的,而且之前的输出结果无法改变、只
能在后面追加;所以我们其实是通过一个只有插入操作( insert-only )的更新日志( changelog
流,来构建一个表。
为了更好地说明流转换成动态表的过程,我们还是用 11.2 节中举的例子来做分析说明。
// 获取流环境
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取数据源
SingleOutputStreamOperator<Event> eventStream = env
 .fromElements(
 new Event("Alice", "./home", 1000L),
 new Event("Bob", "./cart", 1000L),
 new Event("Alice", "./prod?id=1", 5 * 1000L),
 new Event("Cary", "./home", 60 * 1000L),
 new Event("Bob", "./prod?id=3", 90 * 1000L),
321
 new Event("Alice", "./prod?id=7", 105 * 1000L)
 );
// 获取表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 将数据流转换成表
tableEnv.createTemporaryView("EventTable", eventStream, $("user"), $("url"), 
$("timestamp").as("ts"));
 
// 统计每个用户的点击次数
Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM
EventTable GROUP BY user");
// 将表转换成数据流,在控制台打印输出
tableEnv.toChangelogStream(urlCountTable).print("count");
 
// 执行程序
env.execute();
我们现在的输入数据,就是用户在网站上的点击访问行为,数据类型被包装为 POJO 类型
Event 。我们将它转换成一个动态表,注册为 EventTable 。表中的字段定义如下:
[
 user: VARCHAR, // 用户名
 url: VARCHAR, // 用户访问的 URL
 ts: BIGINT // 时间戳
]
如图 11-3 所示,当用户点击事件到来时,就对应着动态表中的一次插入( Insert )操作,
每条数据就是表中的一行;随着插入更多的点击事件,得到的动态表将不断增长。
11.3.2 SQL 持续查询
1. 更新( Update )查询
我们在代码中定义了一个 SQL 查询。
Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM
EventTable GROUP BY user");
这个查询很简单,主要是分组聚合统计每个用户的点击次数。我们把原始的动态表注册为
EventTable ,经过查询转换后得到 urlCountTable ;这个结果动态表中包含两个字段,具体定义
如下:
[
user: VARCHAR, // 用户名
cnt: BIGINT // 用户访问 url 的次数
]
如图 11-4 所示,当原始动态表不停地插入新的数据时,查询得到的 urlCountTable 会持续
地进行更改。由于 count 数量可能会叠加增长,因此这里的更改操作可以是简单的插入( Insert ),
也可以是对之前数据的更新( Update )。换句话说,用来定义结果表的更新日志( changelog
流中,包含了 INSERT UPDATE 两种操作。这种持续查询被称为更新查询( Update Query ),
更新查询得到的结果表如果想要转换成 DataStream ,必须调用 toChangelogStream() 方法。
具体步骤解释如下:
1 )当查询启动时,原始动态表 EventTable 为空;
2 )当第一行 Alice 的点击数据插入 EventTable 表时,查询开始计算结果表, urlCountTable
323 中插入一行数据 [Alice 1]
3 )当第二行 Bob 点击数据插入 EventTable 表时,查询将更新结果表并插入新行 [Bob
1]
4 )第三行数据到来,同样是 Alice 的点击事件,这时不会插入新行,而是生成一个针
对已有行的更新操作。这样,结果表中第一行 [Alice 1] 就更新为 [Alice 2]
5 )当第四行 Cary 的点击数据插入到 EventTable 表时,查询将第三行 [Cary 1] 插入到
结果表中。
2. 追加( Append )查询
上面的例子中,查询过程用到了分组聚合,结果表中就会产生更新操作。如果我们执行一
个简单的条件查询,结果表中就会像原始表 EventTable 一样,只有插入( Insert )操作了。
Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE
user = 'Cary'");
这样的持续查询,就被称为追加查询( Append Query ),它定义的结果表的更新日志
changelog )流中只有 INSERT 操作。追加查询得到的结果表,转换成 DataStream 调用方法
没有限制,可以直接用 toDataStream() ,也可以像更新查询一样调用 toChangelogStream()
这样看来,我们似乎可以总结一个规律:只要用到了聚合,在之前的结果上有叠加,就会
产生更新操作,就是一个更新查询。但事实上,更新查询的判断标准是结果表中的数据是否会
UPDATE 操作,如果聚合的结果不再改变,那么同样也不是更新查询。
什么时候聚合的结果会保持不变呢?一个典型的例子就是窗口聚合。
我们考虑开一个滚动窗口,统计每一小时内所有用户的点击次数,并在结果表中增加一个
endT 字段,表示当前统计窗口的结束时间。这时结果表的字段定义如下:
[
user: VARCHAR, // 用户名
endT: TIMESTAMP, // 窗口结束时间
cnt: BIGINT // 用户访问 url 的次数
]
如图 11-5 所示,与之前的分组聚合一样,当原始动态表不停地插入新的数据时,查询得
到的结果 result 会持续地进行更改。比如时间戳在 12:00:00 12:59:59 之间的有四条数据,其
Alice 三次点击、 Bob 一次点击;所以当水位线达到 13:00:00 时窗口关闭,输出到结果表中
的就是新增两条数据 [Alice, 13:00:00, 3] [Bob, 13:00:00, 1] 。同理,当下一小时的窗口关闭时,
也会将统计结果追加到 result 表后面,而不会更新之前的数据。
所以我们发现,由于窗口的统计结果是一次性写入结果表的,所以结果表的更新日志流中
只会包含插入 INSERT 操作,而没有更新 UPDATE 操作。所以这里的持续查询,依然是一个
追加( Append )查询。结果表 result 如果转换成 DataStream ,可以直接调用 toDataStream()
法。
需要注意的是,由于涉及时间窗口,我们还需要为事件时间提取时间戳和生成水位线。完
整代码如下:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class AppendQueryExample {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
// 读取数据源,并分配时间戳、生成水位线
 SingleOutputStreamOperator<Event> eventStream = env
 .fromElements(
 new Event("Alice", "./home", 1000L),
 new Event("Bob", "./cart", 1000L),
 new Event("Alice", "./prod?id=1", 25 * 60 * 1000L),
325
 new Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
 new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
 new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
 new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L) 
 )
 .assignTimestampsAndWatermarks(
 WatermarkStrategy.<Event>forMonotonousTimestamps()
 .withTimestampAssigner(new 
SerializableTimestampAssigner<Event>() {
 @Override
 public long extractTimestamp(Event element, long 
recordTimestamp) {
 return element.timestamp;
 }
 })
 );
// 创建表环境
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 将数据流转换成表,并指定时间属性
 Table eventTable = tableEnv.fromDataStream(
 eventStream,
 $("user"),
 $("url"),
 $("timestamp").rowtime().as("ts") 
// 将 timestamp 指定为事件时间,并命名为 ts
 );
// 为方便在 SQL 中引用,在环境中注册表 EventTable
 tableEnv.createTemporaryView("EventTable", eventTable);
// 设置 1 小时滚动窗口,执行 SQL 统计查询
 Table result = tableEnv
 .sqlQuery(
 "SELECT " +
 "user, " +
 "window_end AS endT, " + // 窗口结束时间
 "COUNT(url) AS cnt " + // 统计 url 访问次数
 "FROM TABLE( " +
 "TUMBLE( TABLE EventTable, " + // 1 小时滚动窗口
 "DESCRIPTOR(ts), " +
 "INTERVAL '1' HOUR)) " +
 "GROUP BY user, window_start, window_end "
 );
 tableEnv.toDataStream(result).print();
326
 env.execute();
 }
}
运行结果如下:
+I[Alice, 1970-01-01T01:00, 3]
+I[Bob, 1970-01-01T01:00, 1]
+I[Cary, 1970-01-01T02:00, 2]
+I[Bob, 1970-01-01T02:00, 1]
可以看到,所有输出结果都以 +I 为前缀,表示都是以 INSERT 操作追加到结果表中的;
这是一个追加查询,所以我们直接使用 toDataStream() 转换成流是没有问题的。这里输出的
window_end 是一个 TIMESTAMP 类型;由于我们直接以一个长整型数作为事件发生的时间戳,
所以可以看到对应的都是 1970 1 1 日的时间。
关于 Table API SQL 中窗口和聚合查询的使用,我们会在后面详细讲解。
3. 查询限制
在实际应用中,有些持续查询会因为计算代价太高而受到限制。所谓的 代价太高 ,可能
是由于需要维护的状态持续增长,也可能是由于更新数据的计算太复杂。
状态大小
用持续查询做流处理,往往会运行至少几周到几个月;所以持续查询处理的数据总量可能
非常大。例如我们之前举的更新查询的例子,需要记录每个用户访问 url 的次数。如果随着时
间的推移用户数越来越大,那么要维护的状态也将逐渐增长,最终可能会耗尽存储空间导致查
询失败。
SELECT user, COUNT(url)
FROM clicks
GROUP BY user;
更新计算
对于有些查询来说,更新计算的复杂度可能很高。每来一条新的数据,更新结果的时候可
能需要全部重新计算,并且对很多已经输出的行进行更新。一个典型的例子就是 RANK() 函数,
它会基于一组数据计算当前值的排名。例如下面的 SQL 查询,会根据用户最后一次点击的时
间为每个用户计算一个排名。当我们收到一个新的数据,用户的最后一次点击时间( lastAction
就会更新,进而所有用户必须重新排序计算一个新的排名。当一个用户的排名发生改变时,被
他超过的那些用户的排名也会改变;这样的更新操作无疑代价巨大,而且还会随着用户的增多
越来越严重。
SELECT user, RANK() OVER (ORDER BY lastAction)
FROM (
SELECT user, MAX(ts) AS lastAction FROM EventTable GROUP BY user
);
这样的查询操作,就不太适合作为连续查询在流处理中执行。这里 RANK() 的使用要配合
一个 OVER 子句,这是所谓的 开窗聚合 ,我们会在 11.5 节展开介绍。
327 11.3.3 将动态表转换为流
与关系型数据库中的表一样,动态表也可以通过插入( Insert )、更新( Update )和删除( Delete
操作,进行持续的更改。将动态表转换为流或将其写入外部系统时,就需要对这些更改操作进
行编码,通过发送编码消息的方式告诉外部系统要执行的操作。在 Flink 中, Table API SQL
支持三种编码方式:
仅追加( Append-only )流
仅通过插入( Insert )更改来修改的动态表,可以直接转换为“仅追加”流。这个流中发
出的数据,其实就是动态表中新增的每一行。
撤回( Retract )流
撤回流是包含两类消息的流,添加( add )消息和撤回( retract )消息。
具体的编码规则是: INSERT 插入操作编码为 add 消息; DELETE 删除操作编码为 retract
消息;而 UPDATE 更新操作则编码为被更改行的 retract 消息,和更新后行(新行)的 add
息。这样,我们可以通过编码后的消息指明所有的增删改操作,一个动态表就可以转换为撤回
流了。
可以看到,更新操作对于撤回流来说,对应着两个消息:之前数据的撤回(删除)和新数
据的插入。
如图 11-6 所示,显示了将动态表转换为撤回流的过程。
这里我们用 + 代表 add 消息(对应插入 INSERT 操作),用 - 代表 retract 消息(对应删除
DELETE 操作);当 Alice 的第一个点击事件到来时,结果表新增一条数据 [Alice, 1] ;而当 Alice
的第二个点击事件到来时,结果表会将 [Alice, 1] 更新为 [Alice, 2] ,对应的编码就是删除 [Alice, 1]
插入 [Alice, 2] 。这样当一个外部系统收到这样的两条消息时,就知道是要对 Alice 的点击统计
328 次数进行更新了。
更新插入( Upsert )流
更新插入流中只包含两种类型的消息:更新插入( upsert )消息和删除( delete )消息。
所谓的 “upsert” 其实是 “update” “insert” 的合成词,所以对于更新插入流来说, INSERT
入操作和 UPDATE 更新操作,统一被编码为 upsert 消息;而 DELETE 删除操作则被编码为 delete
消息。
既然更新插入流中不区分插入( insert )和更新( update ),那我们自然会想到一个问题:
如果希望更新一行数据时,怎么保证最后做的操作不是插入呢?
这就需要动态表中必须有唯一的键( key )。通过这个 key 进行查询,如果存在对应的数据
就做更新( update ),如果不存在就直接插入( insert )。这是一个动态表可以转换为更新插入流
的必要条件。当然,收到这条流中数据的外部系统,也需要知道这唯一的键( key ),这样才能
正确地处理消息。
如图 11-7 所示,显示了将动态表转换为更新插入流的过程。
可以看到,更新插入流跟撤回流的主要区别在于,更新( update )操作由于有 key 的存在,
只需要用单条消息编码就可以,因此效率更高。
需要注意的是,在代码里将动态表转换为 DataStream 时,只支持仅追加( append-only
和撤回( retract )流,我们调用 toChangelogStream() 得到的其实就是撤回流;这也很好理解,
DataStream 中并没有 key 的定义,所以只能通过两条消息一减一增来表示更新操作。而连接到
外部系统时,则可以支持不同的编码方法,这取决于外部系统本身的特性。
11.4 时间属性和窗口
基于时间的操作(比如时间窗口),需要定义相关的时间语义和时间数据来源的信息。在
Table API SQL 中,会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示
时间。
所以所谓的时间属性( time attributes ),其实就是每个表模式结构( schema )的一部分。
它可以在创建表的 DDL 里直接定义为一个字段,也可以在 DataStream 转换成表时定义。一旦
定义了时间属性,它就可以作为一个普通字段引用,并且可以在基于时间的操作中使用。
时间属性的数据类型为 TIMESTAMP ,它的行为类似于常规时间戳,可以直接访问并且进
行计算。
按照时间语义的不同,我们可以把时间属性的定义分成事件时间( event time )和处理时
间( processing time )两种情况。
11.4.1 事件时间
我们在实际应用中,最常用的就是事件时间。在事件时间语义下,允许表处理程序根据每
个数据中包含的时间戳(也就是事件发生的时间)来生成结果。
事件时间语义最大的用途就是处理乱序事件或者延迟事件的场景。我们通过设置水位线
watermark )来表示事件时间的进展,而水位线可以根据数据的最大时间戳设置一个延迟时
间。这样即使在出现乱序的情况下,对数据的处理也可以获得正确的结果。
为了处理无序事件,并区分流中的迟到事件。 Flink 需要从事件数据中提取时间戳,并生
成水位线,用来推进事件时间的进展。
事件时间属性可以在创建表 DDL 中定义,也可以在数据流和表的转换中定义。
1. 在创建表的 DDL 中定义
在创建表的 DDL CREATE TABLE 语句)中,可以增加一个字段,通过 WATERMARK
语句来定义事件时间属性。 WATERMARK 语句主要用来定义水位线( watermark )的生成表达
式,这个表达式会将带有事件时间戳的字段标记为事件时间属性,并在它基础上给出水位线的
延迟时间。具体定义方式如下:
CREATE TABLE EventTable(
 user STRING,
 url STRING,
 ts TIMESTAMP(3),
 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
 ...
);
这里我们把 ts 字段定义为事件时间属性,而且基于 ts 设置了 5 秒的水位线延迟。这里的
330 5 秒”是以“时间间隔”的形式定义的,格式是 INTERVAL < 数值 > < 时间单位 >
INTERVAL '5' SECOND
这里的数值必须用单引号引起来,而单位用 SECOND SECONDS 是等效的。
Flink 中支持的事件时间属性数据类型必须为 TIMESTAMP 或者 TIMESTAMP_LTZ 。这里
TIMESTAMP_LTZ 是指带有本地时区信息的时间戳( TIMESTAMP WITH LOCAL TIME
ZONE );一般情况下如果数据中的时间戳是“年 - - - - - 秒”的形式,那就是不带时区信
息的,可以将事件时间属性定义为 TIMESTAMP 类型。
而如果原始的时间戳就是一个长整型的毫秒数,这时就需要另外定义一个字段来表示事件
时间属性,类型定义为 TIMESTAMP_LTZ 会更方便:
CREATE TABLE events (
 user STRING,
 url STRING,
 ts BIGINT,
 ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
 WATERMARK FOR ts_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
 ...
);
这里我们另外定义了一个字段 ts_ltz ,是把长整型的 ts 转换为 TIMESTAMP_LTZ 得到的;
进而使用 WATERMARK 语句将它设为事件时间属性,并设置 5 秒的水位线延迟。
2. 在数据流转换为表时定义
事件时间属性也可以在将 DataStream 转换为表的时候来定义。我们调用 fromDataStream()
方法创建表时,可以追加参数来定义表中的字段结构;这时可以给某个字段加上 .rowtime()
缀,就表示将当前字段指定为事件时间属性。这个字段可以是数据中本不存在、额外追加上去
的“逻辑字段”,就像之前 DDL 中定义的第二种情况;也可以是本身固有的字段,那么这个
字段就会被事件时间属性所覆盖,类型也会被转换为 TIMESTAMP 。不论那种方式,时间属性
字段中保存的都是事件的时间戳( TIMESTAMP 类型)。
需要注意的是,这种方式只负责指定时间属性,而时间戳的提取和水位线的生成应该之前
就在 DataStream 上定义好了。由于 DataStream 中没有时区概念,因此 Flink 会将事件时间属
性解析成不带时区的 TIMESTAMP 类型,所有的时间值都被当作 UTC 标准时间。
在代码中的定义方式如下:
// 方法一:
// 流中数据类型为二元组 Tuple2,包含两个字段;需要自定义提取时间戳并生成水位线
DataStream<Tuple2<String, String>> stream = 
inputStream.assignTimestampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"), 
$("ts").rowtime());
331
// 方法二:
// 流中数据类型为三元组 Tuple3,最后一个字段就是事件时间戳
DataStream<Tuple3<String, String, Long>> stream = 
inputStream.assignTimestampsAndWatermarks(...);
// 不再声明额外字段,直接用最后一个字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"), 
$("ts").rowtime());
11.4.2 处理时间
相比之下处理时间就比较简单了,它就是我们的系统时间,使用时不需要提取时间戳
timestamp )和生成水位线( watermark )。因此在定义处理时间属性时,必须要额外声明一个
字段,专门用来保存当前的处理时间。
类似地,处理时间属性的定义也有两种方式:创建表 DDL 中定义,或者在数据流转换成
表时定义。
1. 在创建表的 DDL 中定义
在创建表的 DDL CREATE TABLE 语句)中,可以增加一个额外的字段,通过调用系统
内置的 PROCTIME() 函数来指定当前的处理时间属性,返回的类型是 TIMESTAMP_LTZ
CREATE TABLE EventTable(
 user STRING,
 url STRING,
 ts AS PROCTIME()
) WITH (
 ...
);
这里的时间属性,其实是以“计算列”( computed column )的形式定义出来的。所谓的计
算列是 Flink SQL 中引入的特殊概念,可以用一个 AS 语句来在表中产生数据中不存在的列,
并且可以利用原有的列、各种运算符及内置函数。在前面事件时间属性的定义中,将 ts 字段
转换成 TIMESTAMP_LTZ 类型的 ts_ltz ,也是计算列的定义方式。
2. 在数据流转换为表时定义
处 理 时 间 属 性 同 样 可 以 在 将 DataStream 转 换 为 表 的 时 候 来 定 义 。 我 们 调 用
fromDataStream() 方法创建表时,可以用 .proctime() 后缀来指定处理时间属性字段。由于处理时
间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个已有字段
上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现。
代码中定义处理时间属性的方法如下:
DataStream<Tuple2<String, String>> stream = ...;
// 声明一个额外的字段作为处理时间属性字段
332 Table table = tEnv.fromDataStream(stream, $("user"), $("url"),
$("ts").proctime());
11.4.3 窗口( Window
有了时间属性,接下来就可以定义窗口进行计算了。我们知道,窗口可以将无界流切割成
大小有限的“桶”( bucket )来做计算,通过截取有限数据集来处理无限的流数据。在 DataStream
API 中提供了对不同类型的窗口进行定义和处理的接口,而在 Table API SQL 中,类似的功
能也都可以实现。
1. 分组窗口( Group Window ,老版本)
Flink 1.12 之前的版本中, Table API SQL 提供了一组“分组窗口”( Group Window
函数,常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现;具体在 SQL 中就
是调用 TUMBLE() HOP() SESSION() ,传入时间属性字段、窗口大小等参数就可以了。以
滚动窗口为例:
TUMBLE(ts, INTERVAL '1' HOUR)
这里的 ts 是定义好的时间属性字段,窗口大小用“时间间隔” INTERVAL 来定义。
在进行窗口计算时,分组窗口是将窗口本身当作一个字段对数据进行分组的,可以对组内
的数据进行聚合。基本使用方式如下:
Table result = tableEnv.sqlQuery(
 "SELECT " +
 "user, " +
"TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " +
 "COUNT(url) AS cnt " +
 "FROM EventTable " +
 "GROUP BY " + // 使用窗口和用户名进行分组
 "user, " +
 "TUMBLE(ts, INTERVAL '1' HOUR)" // 定义 1 小时滚动窗口
 );
这里定义了 1 小时的滚动窗口,将窗口和用户 user 一起作为分组的字段。用聚合函数
COUNT() 对分组数据的个数进行了聚合统计,并将结果字段重命名为 cnt ;用 TUPMBLE_END()
函数获取滚动窗口的结束时间,重命名为 endT 提取出来。
分组窗口的功能比较有限,只支持窗口聚合,所以目前已经处于弃用( deprecated )的状
态。
2. 窗口表值函数( Windowing TVFs ,新版本)
1.13 版本开始, Flink 开始使用窗口表值函数( Windowing table-valued functions
Windowing TVFs )来定义窗口。窗口表值函数是 Flink 定义的多态表函数( PTF ),可以将表
进行扩展后返回。表函数( table function )可以看作是返回一个表的函数,关于这部分内容,
我们会在 11.6 节进行介绍。
333 目前 Flink 提供了以下几个窗口 TVF
滚动窗口( Tumbling Windows );
滑动窗口( Hop Windows ,跳跃窗口);
累积窗口( Cumulate Windows );
会话窗口( Session Windows ,目前尚未完全支持)。
窗口表值函数可以完全替代传统的分组窗口函数。窗口 TVF 更符合 SQL 标准,性能得到
了优化,拥有更强大的功能;可以支持基于窗口的复杂计算,例如窗口 Top-N 、窗口联结( window
join )等等。当然,目前窗口 TVF 的功能还不完善,会话窗口和很多高级功能还不支持,不过
正在快速地更新完善。可以预见在未来的版本中,窗口 TVF 将越来越强大,将会是窗口处理
的唯一入口。
在窗口 TVF 的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外 3 个列:
“窗口起始点”( window_start )、“窗口结束点”( window_end )、“窗口时间”( window_time )。
起始点和结束点比较好理解,这里的“窗口时间”指的是窗口中的时间属性,它的值等于
window_end - 1ms ,所以相当于是窗口中能够包含数据的最大时间戳。
SQL 中的声明方式,与以前的分组窗口是类似的,直接调用 TUMBLE() HOP()
CUMULATE() 就可以实现滚动、滑动和累积窗口,不过传入的参数会有所不同。下面我们就
分别对这几种窗口 TVF 进行介绍。
1 )滚动窗口( TUMBLE
滚动窗口在 SQL 中的概念与 DataStream API 中的定义完全一样,是长度固定、时间对齐、
无重叠的窗口,一般用于周期性的统计计算。
SQL 中通过调用 TUMBLE() 函数就可以声明一个滚动窗口,只有一个核心参数就是窗
口大小( size )。在 SQL 中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要
将当前的时间属性字段传入;另外,窗口 TVF 本质上是表函数,可以对表进行扩展,所以还
应该把当前查询的表作为参数整体传入。具体声明如下:
TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)
这里基于时间字段 ts ,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将
表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中。
2 )滑动窗口( HOP
滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在 SQL
中通过调用 HOP() 来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小( size
和滑动步长( slide )两个参数。
HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));
这里我们基于时间属性 ts ,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟
滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长( slide ),第四个参
数才是窗口大小( size )。
3 )累积窗口( CUMULATE
334 滚动窗口和滑动窗口,可以用来计算大多数周期性的统计指标。不过在实际应用中还会遇
到这样一类需求:我们的统计周期可能较长,因此希望中间每隔一段时间就输出一次当前的统
计值;与滑动窗口不同的是,在一个统计周期内,我们会多次输出统计值,它们应该是不断叠
加累积的。
例如,我们按天来统计网站的 PV Page View ,页面浏览量),如果用 1 天的滚动窗口,
那需要到每天 24 点才会计算一次,输出频率太低;如果用滑动窗口,计算频率可以更高,但
统计的就变成了“过去 24 小时的 PV ”。所以我们真正希望的是,还是按照自然日统计每天的
PV ,不过需要每隔 1 小时就输出一次当天到目前为止的 PV 值。这种特殊的窗口就叫作“累
积窗口”( Cumulate Window )。
        
累积窗口是窗口 TVF 中新增的窗口功能,它会在一定的统计周期内进行累积计算。累积
窗口中有两个核心的参数:最大窗口长度( max window size )和累积步长( step )。所谓的最
大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据。如图 11-8
所示,开始时,创建的第一个窗口大小就是步长 step ;之后的每个窗口都会在之前的基础上再
扩展 step 的长度,直到达到最大窗口长度。在 SQL 中可以用 CUMULATE() 函数来定义,具体
如下:
CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))
这里我们基于时间属性 ts ,在表 EventTable 上定义了一个统计周期为 1 天、累积步长为 1
小时的累积窗口。注意第三个参数为步长 step ,第四个参数则是最大窗口长度。
上面所有的语句只是定义了窗口,类似于 DataStream API 中的窗口分配器;在 SQL 中窗
口的完整调用,还需要配合聚合操作和其它操作。我们会在下一节详细讲解窗口的聚合。
11.5 聚合( Aggregation )查询
SQL 中,一个很常见的功能就是对某一列的多条数据做一个合并统计,得到一个或多
个结果值;比如求和、最大最小值、平均值等等,这种操作叫作聚合( Aggregation )查询。
Flink 中的 SQL 是流处理与标准 SQL 结合的产物,所以聚合查询也可以分成两种:流处理中
特有的聚合(主要指窗口聚合),以及 SQL 原生的聚合查询方式。
11.5.1 分组聚合
SQL 中一般所说的聚合我们都很熟悉,主要是通过内置的一些聚合函数来实现的,比如
SUM() MAX() MIN() AVG() 以及 COUNT() 。它们的特点是对多条输入数据进行计算,得
到一个唯一的值,属于“多对一”的转换。比如我们可以通过下面的代码计算输入数据的个数:
Table eventCountTable = tableEnv.sqlQuery("select COUNT(*) from EventTable");
而更多的情况下,我们可以通过 GROUP BY 子句来指定分组的键( key ),从而对数据按
照某个字段做一个分组统计。例如之前我们举的例子,可以按照用户名进行分组,统计每个用
户点击 url 的次数:
SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user
这种聚合方式,就叫作“分组聚合”( group aggregation )。从概念上讲, SQL 中的分组聚
合可以对应 DataStream API keyBy 之后的聚合转换,它们都是按照某个 key 对数据进行了
划分,各自维护状态来进行聚合统计的。在流处理中,分组聚合同样是一个持续查询,而且是
一个更新查询,得到的是一个动态表;每当流中有一个新的数据到来时,都会导致结果表的更
新操作。因此,想要将结果表转换成流或输出到外部系统,必须采用撤回流( retract stream
或更新插入流( upsert stream )的编码方式;如果在代码中直接转换成 DataStream 打印输出,
需要调用 toChangelogStream()
另外,在持续查询的过程中,由于用于分组的 key 可能会不断增加,因此计算结果所需要
维护的状态也会持续增长。为了防止状态无限增长耗尽资源, Flink Table API SQL 可以在表
环境中配置状态的生存时间( TTL ):
TableEnvironment tableEnv = ...
// 获取表环境的配置
TableConfig tableConfig = tableEnv.getConfig();
// 配置状态保持时间
tableConfig.setIdleStateRetention(Duration.ofMinutes(60));
或者也可以直接设置配置项 table.exec.state.ttl
TableEnvironment tableEnv = ...
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "60 min");
这两种方式是等效的。需要注意,配置 TTL 有可能会导致统计结果不准确,这其实是以
336 牺牲正确性为代价换取了资源的释放。
此外,在 Flink SQL 的分组聚合中同样可以使用 DISTINCT 进行去重的聚合处理;可以使
HAVING 对聚合结果进行条件筛选;还可以使用 GROUPING SETS (分组集)设置多个分
组情况分别统计。这些语法跟标准 SQL 中的用法一致,这里就不再详细展开了。
可以看到,分组聚合既是 SQL 原生的聚合查询,也是流处理中的聚合操作,这是实际应
用中最常见的聚合方式。当然,使用的聚合函数一般都是系统内置的,如果希望实现特殊需求
也可以进行自定义。关于自定义函数( UDF ),我们会在 11.7 节中详细介绍。
11.5.2 窗口聚合
在流处理中,往往需要将无限数据流划分成有界数据集,这就是所谓的 窗口 。在 11.4.3
小节中已经介绍了窗口的声明方式,这相当于 DataStream API 中的窗口分配器( window
assigner ),只是明确了窗口的形式以及数据如何分配;而窗口具体的计算处理操作,在
DataStream API 中还需要窗口函数( window function )来进行定义。
Flink Table API SQL 中,窗口的计算是通过“窗口聚合”( window aggregation
来实现的。与分组聚合类似,窗口聚合也需要调用 SUM() MAX() MIN() COUNT() 一类的
聚合函数,通过 GROUP BY 子句来指定分组的字段。只不过窗口聚合时,需要将窗口信息作
为分组 key 的一部分定义出来。在 Flink 1.12 版本之前,是直接把窗口自身作为分组 key 放在
GROUP BY 之后的,所以也叫“分组窗口聚合”(参见 11.4.3 小节);而 1.13 版本开始使用了
“窗口表值函数”( Windowing TVF ),窗口本身返回的是就是一个表,所以窗口会出现在 FROM
后面, GROUP BY 后面的则是窗口新增的字段 window_start window_end
比如,我们将 11.4.3 中分组窗口的聚合,用窗口 TVF 重新实现一下:
Table result = tableEnv.sqlQuery(
 "SELECT " +
 "user, " +
 "window_end AS endT, " +
 "COUNT(url) AS cnt " +
 "FROM TABLE( " +
 "TUMBLE( TABLE EventTable, " +
 "DESCRIPTOR(ts), " +
 "INTERVAL '1' HOUR)) " +
 "GROUP BY user, window_start, window_end "
 );
这里我们以 ts 作为时间属性字段、基于 EventTable 定义了 1 小时的滚动窗口,希望统计
出每小时每个用户点击 url 的次数。用来分组的字段是用户名 user ,以及表示窗口的
window_start window_end ;而 TUMBLE() 是表值函数,所以得到的是一个表( Table ),我们
的聚合查询就是在这个 Table 中进行的。这就是 11.3.2 小节中窗口聚合的实现方式。
Flink SQL 目前提供了滚动窗口 TUMBLE() 、滑动窗口 HOP() 和累积窗口( CUMULATE
三种表值函数( TVF )。在具体应用中,我们还需要提前定义好时间属性。下面是一段窗口聚
合的完整代码,以累积窗口为例:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class CumulateWindowExample {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
// 读取数据源,并分配时间戳、生成水位线
 SingleOutputStreamOperator<Event> eventStream = env
 .fromElements(
 new Event("Alice", "./home", 1000L),
 new Event("Bob", "./cart", 1000L),
 new Event("Alice", "./prod?id=1", 25 * 60 * 1000L),
 new Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
 new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
 new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
 new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
 )
 .assignTimestampsAndWatermarks(
 WatermarkStrategy.<Event>forMonotonousTimestamps()
 .withTimestampAssigner(new 
SerializableTimestampAssigner<Event>() {
 @Override
 public long extractTimestamp(Event element, long 
recordTimestamp) {
 return element.timestamp;
 }
 })
 );
// 创建表环境
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 将数据流转换成表,并指定时间属性
 Table eventTable = tableEnv.fromDataStream(
 eventStream,
 $("user"),
 $("url"),
338
 $("timestamp").rowtime().as("ts") 
 );
// 为方便在 SQL 中引用,在环境中注册表 EventTable
 tableEnv.createTemporaryView("EventTable", eventTable);
// 设置累积窗口,执行 SQL 统计查询
 Table result = tableEnv
 .sqlQuery(
 "SELECT " +
 "user, " +
 "window_end AS endT, " +
 "COUNT(url) AS cnt " +
 "FROM TABLE( " +
 "CUMULATE( TABLE EventTable, " + // 定义累积窗口
 "DESCRIPTOR(ts), " +
 "INTERVAL '30' MINUTE, " +
 "INTERVAL '1' HOUR)) " +
 "GROUP BY user, window_start, window_end "
 );
 tableEnv.toDataStream(result).print();
 env.execute();
 }
}
这里我们使用了统计周期为 1 小时、累积间隔为 30 分钟的累积窗口。可以看到,代码的
架构和处理逻辑与 11.3.2 小节中的实现完全一致,只是将滚动窗口 TUMBLE() 换成了累积窗口
CUMULATE() 。代码执行结果如下:
+I[Alice, 1970-01-01T00:30, 2]
+I[Bob, 1970-01-01T00:30, 1]
+I[Alice, 1970-01-01T01:00, 3]
+I[Bob, 1970-01-01T01:00, 1]
+I[Bob, 1970-01-01T01:30, 1]
+I[Cary, 1970-01-01T02:00, 2]
+I[Bob, 1970-01-01T02:00, 1]
与分组聚合不同,窗口聚合不会将中间聚合的状态输出,只会最后输出一个结果。我们可
以看到,所有数据都是以 INSERT 操作追加到结果动态表中的,因此输出每行前面都有 +I
前缀。所以窗口聚合查询都属于追加查询,没有更新操作,代码中可以直接用 toDataStream()
将结果表转换成流。
具体来看,上面代码输入的前三条数据属于第一个半小时的累积窗口,其中 Alice 的访问
数据有两条, Bob 的访问数据有 1 条,所以输出了两条结果 [Alice, 1970-01-01T00:30, 2] [Bob,
1970-01-01T00:30, 1] ;而之后又到来的一条 Alice 访问数据属于第二个半小时范围,同时也属
于第一个 1 小时的统计周期 ,所以会在之前两条的基础上进行叠加,输出 [Alice,
339 1970-01-01T00:30, 3] ,而 Bob 没有新的访问数据,因此依然输出 [Bob, 1970-01-01T00:30, 1]
从第二个小时起,数据属于新的统计周期,就全部从零开始重新计数了。
相比之前的分组窗口聚合, Flink 1.13 版本的窗口表值函数( TVF )聚合有更强大的功能。
除了应用简单的聚合函数、提取窗口开始时间( window_start )和结束时间 (window_end) 之外,
窗口 TVF 还提供了一个 window_time 字段,用于表示窗口中的时间属性;这样就可以方便地
进行窗口的级联( cascading window )和计算了。另外,窗口 TVF 还支持 GROUPING SETS
极大地扩展了窗口的应用范围。
基于窗口的聚合,是流处理中聚合统计的一个特色,也是与标准 SQL 最大的不同之处。
在实际项目中,很多统计指标其实都是基于时间窗口来进行计算的,所以窗口聚合是 Flink SQL
中非常重要的功能;基于窗口 TVF 的聚合未来也会有更多功能的扩展支持,比如窗口 Top N
会话窗口、窗口联结等等。
11.5.3 开窗( Over )聚合
在标准 SQL 中还有另外一类比较特殊的聚合方式,可以针对每一行计算一个聚合值。比
如说,我们可以以每一行数据为基准,计算它之前 1 小时内所有数据的平均值;也可以计算它
之前 10 个数的平均值。就好像是在每一行上打开了一扇窗户、收集数据进行统计一样,这就
是所谓的“开窗函数”。开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口 TVF
聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果;而开窗函数是对每
行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系。
与标准 SQL 中一致, Flink SQL 中的开窗函数也是通过 OVER 子句来实现的,所以有时
开窗聚合也叫作“ OVER 聚合”( Over Aggregation )。基本语法如下:
SELECT
< 聚合函数 > OVER (
[PARTITION BY < 字段 1>[, < 字段 2>, ...]]
ORDER BY < 时间属性字段 >
< 开窗范围 >),
...
FROM ...
这里 OVER 关键字前面是一个聚合函数,它会应用在后面 OVER 定义的窗口上。在 OVER
子句中主要有以下几个部分:
PARTITION BY (可选)
用来指定分区的键( key ),类似于 GROUP BY 的分组,这部分是可选的;
ORDER BY
OVER 窗口是基于当前行扩展出的一段数据范围,选择的标准可以基于时间也可以基于数
量。不论那种定义,数据都应该是以某种顺序排列好的;而表中的数据本身是无序的。所以在
OVER 子句中必须用 ORDER BY 明确地指出数据基于那个字段排序。在 Flink 的流处理中,
目前只支持按照时间属性的升序排列,所以这里 ORDER BY 后面的字段必须是定义好的时间
340 属性。
开窗范围
对于开窗函数而言,还有一个必须要指定的就是开窗的范围,也就是到底要扩展多少行来
做聚合。这个范围是由 BETWEEN < 下界 > AND < 上界 > 来定义的,也就是“从下界到上界”
的范围。目前支持的上界只能是 CURRENT ROW ,也就是定义一个“从之前某一行到当前行”
的范围,所以一般的形式为:
BETWEEN ... PRECEDING AND CURRENT ROW
前面我们提到,开窗选择的范围可以基于时间,也可以基于数据的数量。所以开窗范围还
应该在两种模式之间做出选择:范围间隔( RANGE intervals )和行间隔( ROW intervals )。
范围间隔
范围间隔以 RANGE 为前缀,就是基于 ORDER BY 指定的时间字段去选取一个范围,一
般就是当前行时间戳之前的一段时间。例如开窗范围选择当前行之前 1 小时的数据:
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
行间隔
行间隔以 ROWS 为前缀,就是直接确定要选多少行,由当前行出发向前选取就可以了。
例如开窗范围选择当前行之前的 5 行数据(最终聚合会包括当前行,所以一共 6 条数据):
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
下面是一个具体示例:
SELECT user, ts,
COUNT(url) OVER (
PARTITION BY user
ORDER BY ts
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS cnt
FROM EventTable
这里我们以 ts 作为时间属性字段,对 EventTable 中的每行数据都选取它之前 1 小时的所
有数据进行聚合,统计每个用户访问 url 的总次数,并重命名为 cnt 。最终将表中每行的 user
ts 以及扩展出 cnt 提取出来。
可以看到,整个开窗聚合的结果,是对每一行数据都有一个对应的聚合值,因此就像将表
中扩展出了一个新的列一样。由于聚合范围上界只能到当前行,新到的数据一般不会影响之前
数据的聚合结果,所以结果表只需要不断插入( INSERT )就可以了。执行上面 SQL 得到的结
果表,可以用 toDataStream() 直接转换成流打印输出。
开窗聚合与窗口聚合(窗口 TVF 聚合)本质上不同,不过也还是有一些相似之处的:它
们都是在无界的数据流上划定了一个范围,截取出有限数据集进行聚合统计;这其实都是
的思路。事实上,在 Table API 中确实就定义了两类窗口:分组窗口( GroupWindow )和开
窗窗口( OverWindow );而在 SQL 中,也可以用 WINDOW 子句来在 SELECT 外部单独定义
一个 OVER 窗口:
SELECT user, ts,
COUNT(url) OVER w AS cnt,
341 MAX(CHAR_LENGTH(url)) OVER w AS max_url
FROM EventTable
WINDOW w AS (
PARTITION BY user
ORDER BY ts
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
上面的 SQL 中定义了一个选取之前 2 行数据的 OVER 窗口,并重命名为 w ;接下来就可
以基于它调用多个聚合函数,扩展出更多的列提取出来。比如这里除统计 url 的个数外,还统
计了 url 的最大长度:首先用 CHAR_LENGTH() 函数计算出 url 的长度,再调用聚合函数 MAX()
进行聚合统计。这样,我们就可以方便重复引用定义好的 OVER 窗口了,大大增强了代码的
可读性。
11.5.4 应用实例 —— Top N
灵活使用各种类型的窗口以及聚合函数,可以实现不同的需求。一般的聚合函数,比如
SUM() MAX() MIN() COUNT() 等,往往只是针对一组数据聚合得到一个唯一的值;所谓
OVER 聚合的 多对多 模式,也是针对每行数据都进行一次聚合才得到了多行的结果,对于每
次聚合计算实际上得到的还是唯一的值。而有时我们可能不仅仅需要统计数据中的最大 / 最小
值,还希望得到前 N 个最大 / 最小值;这时每次聚合的结果就不是一行,而是 N 行了。这就是
经典的“ Top N ”应用场景。
Top N 聚合字面意思是“最大 N 个”,这只是一个泛称,它不仅包括查询最大的 N 个值、
也包括了查询最小的 N 个值的场景。
理想的状态下,我们应该有一个 TOPN() 聚合函数,调用它对表进行聚合就可以得到想要
选取的前 N 个值了。不过仔细一想就会发现,这个聚合函数并不容易实现:对于每一次聚合
计算,都应该都有多行数据输入,并得到 N 行结果输出,这是一个真正意义上的“多对多”
转换。这种函数相当于把一个表聚合成了另一个表,所以叫作“表聚合函数”( Table Aggregate
Function )。表聚合函数的抽象比较困难,目前只有窗口 TVF 有能力提供直接的 Top N 聚合,
不过也尚未实现。
所以目前在 Flink SQL 中没有能够直接调用的 Top N 函数,而是提供了稍微复杂些的变通
实现方法。
1. 普通 Top N
Flink SQL 中,是通过 OVER 聚合和一个条件筛选来实现 Top N 的。具体来说,是通过
将一个特殊的聚合函数 ROW_NUMBER() 应用到 OVER 窗口上,统计出每一行排序后的行号,
作为一个字段提取出来;然后再用 WHERE 子句筛选行号小于等于 N 的那些行返回。
基本语法如下:
SELECT ...
FROM (
 SELECT ...,
 ROW_NUMBER() OVER (
[PARTITION BY <字段 1>[, <字段 1>...]]
 ORDER BY <排序字段 1> [asc|desc][, <排序字段 2> [asc|desc]...]
) AS row_num
 FROM ...)
WHERE row_num <= N [AND <其它条件>]
这里的 OVER 窗口定义与之前的介绍基本一致,目的就是利用 ROW_NUMBER() 函数为
每一行数据聚合得到一个排序之后的行号。行号重命名为 row_num ,并在外层的查询中以
row_num <= N 作为条件进行筛选,就可以得到根据排序字段统计的 Top N 结果了。
需要对关键字额外做一些说明:
WHERE
用来指定 Top N 选取的条件,这里必须通过 row_num <= N 或者 row_num < N + 1 指定一
个“排名结束点”( rank end ),以保证结果有界。
PARTITION BY
是可选的,用来指定分区的字段,这样我们就可以针对不同的分组分别统计 Top N 了。
ORDER BY
指定了排序的字段,因为只有排序之后,才能进行前 N 个最大 / 最小的选取。每个排序字
段后可以用 asc 或者 desc 来指定排序规则: asc 为升序排列,取出的就是最小的 N 个值; desc
为降序排序,对应的就是最大的 N 个值。默认情况下为升序, asc 可以省略。
细心的读者可能会发现,之前介绍的 OVER 窗口不是说了,目前 ORDER BY 后面只能跟
时间字段、并且只支持升序吗?这里怎么又可以任意指定字段进行排序了呢?
这是因为 OVER 窗口目前并不完善,不过针对 Top N 这样一个经典应用场景, Flink SQL
专门用 OVER 聚合做了优化实现。所以只有在 Top N 的应用场景中, OVER 窗口 ORDER BY
后才可以指定其它排序字段;而要想实现 Top N ,就必须按照上面的格式进行定义,否则 Flink
SQL 的优化器将无法正常解析。而且,目前 Table API 中并不支持 ROW_NUMBER() 函数,所
以也只有 SQL 中这一种通用的 Top N 实现方式。
另外要注意, Top N 的实现必须写成上面的嵌套查询形式。这是因为行号 row_num 是内
部子查询聚合的结果,不可能在内部作为筛选条件,只能放在外层的 WHERE 子句中。
下面是一个具体的示例,我们统计每个用户的访问事件中,按照字符长度排序的前两个
url
SELECT user, url, ts, row_num
FROM (
 SELECT *,
 ROW_NUMBER() OVER (
PARTITION BY user
 ORDER BY CHAR_LENGTH(url) desc 
) AS row_num
 FROM EventTable)
WHERE row_num <= 2
这里我们以用户来分组,以访问 url 的字符长度作为排序的字段,降序排列后用聚合统计
出每一行的行号,这样就相当于在 EventTable 基础上扩展出了一列 row_num 。而后筛选出行
号小于等于 2 的所有数据,就得到了每个用户访问的长度最长的两个 url
需要特别说明的是,这里的 Top N 聚合是一个更新查询。新数据到来后,可能会改变之
前数据的排名,所以会有更新( UPDATE )操作。这是 ROW_NUMBER() 聚合函数的特性决定
的。因此,如果执行上面的 SQL 得到结果表,需要调用 toChangelogStream() 才能转换成流打
印输出。
2. 窗口 Top N
除了直接对数据进行 Top N 的选取,我们也可以针对窗口来做 Top N
例如电商行业,实际应用中往往有这样的需求:统计一段时间内的热门商品。这就需要先
开窗口,在窗口中统计每个商品的点击量;然后将统计数据收集起来,按窗口进行分组,并按
点击量大小降序排序,选取前 N 个作为结果返回。
我们已经知道, Top N 聚合本质上是一个表聚合函数,这和窗口表值函数( TVF )有天然
的联系。尽管如此,想要基于窗口 TVF 实现一个通用的 Top N 聚合函数还是比较麻烦的,目
Flink SQL 尚不支持。不过我们同样可以借鉴之前的思路,使用 OVER 窗口统计行号来实现。
具体来说,可以先做一个窗口聚合,将窗口信息 window_start window_end 连同每个商
品的点击量一并返回,这样就得到了聚合的结果表,包含了窗口信息、商品和统计的点击量。
接下来就可以像一般的 Top N 那样定义 OVER 窗口了,按窗口分组,按点击量排序,用
ROW_NUMBER() 统计行号并筛选前 N 行就可以得到结果。所以窗口 Top N 的实现就是窗口
聚合与 OVER 聚合的结合使用。
下面是一个具体案例的代码实现。由于用户访问事件 Event 中没有商品相关信息,因此我
们统计的是每小时内有最多访问行为的用户,取前两名,相当于是一个每小时活跃用户的查询。
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class WindowTopNExample {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
// 读取数据源,并分配时间戳、生成水位线
 SingleOutputStreamOperator<Event> eventStream = env
 .fromElements(
 new Event("Alice", "./home", 1000L),
 new Event("Bob", "./cart", 1000L),
 new Event("Alice", "./prod?id=1", 25 * 60 * 1000L),
 new Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
 new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
 new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
 new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
 )
 .assignTimestampsAndWatermarks(
 WatermarkStrategy.<Event>forMonotonousTimestamps()
 .withTimestampAssigner(new 
SerializableTimestampAssigner<Event>() {
 @Override
 public long extractTimestamp(Event element, long 
recordTimestamp) {
 return element.timestamp;
 }
 })
 );
 // 创建表环境
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 // 将数据流转换成表,并指定时间属性
 Table eventTable = tableEnv.fromDataStream(
 eventStream,
 $("user"),
 $("url"),
 $("timestamp").rowtime().as("ts") 
// 将 timestamp 指定为事件时间,并命名为 ts
 );
 // 为方便在 SQL 中引用,在环境中注册表 EventTable
 tableEnv.createTemporaryView("EventTable", eventTable);
 // 定义子查询,进行窗口聚合,得到包含窗口信息、用户以及访问次数的结果表
 String subQuery =
 "SELECT window_start, window_end, user, COUNT(url) as cnt " +
 "FROM TABLE ( " +
 "TUMBLE( TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR )) 
" +
 "GROUP BY window_start, window_end, user ";
 // 定义 Top N 的外层查询
 String topNQuery =
 "SELECT * " +
 "FROM (" +
 "SELECT *, " +
345
 "ROW_NUMBER() OVER ( " +
 "PARTITION BY window_start, window_end " +
 "ORDER BY cnt desc " +
 ") AS row_num " +
 "FROM (" + subQuery + ")) " +
 "WHERE row_num <= 2";
 // 执行 SQL 得到结果表
 Table result = tableEnv.sqlQuery(topNQuery);
 tableEnv.toDataStream(result).print();
 env.execute();
 }
}
这里为了更好的代码可读性,我们将 SQL 拆分成了用来做窗口聚合的内部子查询,和套
Top N 模板的外层查询。
1 )首先基于 ts 时间字段定义 1 小时滚动窗口,统计 EventTable 中每个用户的访问次数,
重命名为 cnt ;为了方便后面做排序,我们将窗口信息 window_start window_end 也提取出
来,与 user cnt 一起作为聚合结果表中的字段。
2 )然后套用 Top N 模板,对窗口聚合的结果表中每一行数据进行 OVER 聚合统计行号。
这里以窗口信息进行分组,按访问次数 cnt 进行排序,并筛选行号小于等于 2 的数据,就可以
得到每个窗口内访问次数最多的前两个用户了。
运行结果如下:
+I[1970-01-01T00:00, 1970-01-01T01:00, Alice, 3, 1]
+I[1970-01-01T00:00, 1970-01-01T01:00, Bob, 1, 2]
+I[1970-01-01T01:00, 1970-01-01T02:00, Cary, 2, 1]
+I[1970-01-01T01:00, 1970-01-01T02:00, Bob, 1, 2]
可以看到,第一个 1 小时窗口中, Alice 3 次访问排名第一, Bob 1 次访问排名第二;
而第二小时内, Cary 2 次访问占据活跃榜首, Bob 仍以 1 次访问排名第二。由于窗口的统
计结果只会最终输出一次,所以排名也是确定的,这里结果表中只有插入( INSERT )操作。
也就是说,窗口 Top N 是追加查询,可以直接用 toDataStream() 将结果表转换成流打印输出。
11.6 联结( Join )查询
按照数据库理论,关系型表的设计往往至少需要满足第三范式( 3NF ),表中的列都直接
依赖于主键,这样就可以避免数据冗余和更新异常。例如商品的订单信息,我们会保存在一个
“订单表”中,而这个表中只有商品 ID ,详情则需要到“商品表”按照 ID 去查询;这样的好
处是当商品信息发生变化时,只要更新商品表即可,而不需要在订单表中对所有这个商品的所
有订单进行修改。不过这样一来,我们就无法从一个单独的表中提取所有想要的数据了。
346 在标准 SQL 中,可以将多个表连接合并起来,从中查询出想要的信息;这种操作就是表
的联结( Join )。在 Flink SQL 中,同样支持各种灵活的联结( Join )查询,操作的对象是动态
表。
在流处理中,动态表的 Join 对应着两条数据流的 Join 操作。与上一节的聚合查询类似,
Flink SQL 中的联结查询大体上也可以分为两类: SQL 原生的联结查询方式,和流处理中特有
的联结查询。
11.6.1 常规联结查询
常规联结( Regular Join )是 SQL 中原生定义的 Join 方式,是最通用的一类联结操作。它
的具体语法与标准 SQL 的联结完全相同,通过关键字 JOIN 来联结两个表,后面用关键字 ON
来指明联结条件。按照习惯,我们一般以 左侧 右侧 来区分联结操作的两个表。
在两个动态表的联结中,任何一侧表的插入( INSERT )或更改( UPDATE )操作都会让
联结的结果表发生改变。例如,如果左侧有新数据到来,那么它会与右侧表中所有之前的数据
进行联结合并,右侧表之后到来的新数据也会与这条数据连接合并。所以,常规联结查询一般
是更新( Update )查询。
与标准 SQL 一致, Flink SQL 的常规联结也可以分为内联结( INNER JOIN )和外联结
OUTER JOIN ),区别在于结果中是否包含不符合联结条件的行。目前仅支持“等值条件”
作为联结条件,也就是关键字 ON 后面必须是判断两表中字段相等的逻辑表达式。
1. 等值内联结( INNER Equi-JOIN
内联结用 INNER JOIN 来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓
的笛卡尔积( Cartesian product )。目前仅支持等值联结条件。
例如之前提到的“订单表”(定义为 Order )和“商品表”(定义为 Product )的联结查询,
就可以用以下 SQL 实现:
SELECT *
FROM Order
INNER JOIN Product
ON Order.product_id = Product.id
这里是一个内联结,联结条件是订单数据的 product_id 和商品数据的 id 相等。由于订单
表中出现的商品 id 一定会在商品表中出现,因此这样得到的联结结果表,就包含了订单表 Order
中所有订单数据对应的详细信息。
2. 等值外联结( OUTER Equi-JOIN
与内联结类似,外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一
侧表中找不到任何匹配的行也单独返回。 Flink SQL 支持左外( LEFT JOIN )、右外( RIGHT JOIN
和全外( FULL OUTER JOIN ),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行
返回。例如,订单表中未必包含了商品表中的所有 ID ,为了将哪些没有任何订单的商品信息
347 也查询出来,我们就可以使用右外联结( RIGHT JOIN )。当然,外联结查询目前也仅支持等值
联结条件。具体用法如下:
SELECT *
FROM Order
LEFT JOIN Product
ON Order.product_id = Product.id
SELECT *
FROM Order
RIGHT JOIN Product
ON Order.product_id = Product.id
SELECT *
FROM Order
FULL OUTER JOIN Product
ON Order.product_id = Product.id
这部分知识与标准 SQL 中是完全一样的,这里不再赘述。
11.6.2 间隔联结查询
8.3 节中,我们曾经学习过 DataStream API 中的双流 Join ,包括窗口联结( window join
和间隔联结( interval join )。两条流的 Join 就对应着 SQL 中两个表的 Join ,这是流处理中特有
的联结方式。目前 Flink SQL 还不支持窗口联结,而间隔联结则已经实现。
间隔联结( Interval Join )返回的,同样是符合约束条件的两条中数据的笛卡尔积。只不
过这里的 约束条件 除了常规的联结条件外,还多了一个时间间隔的限制。具体语法有以下要
点:
两表的联结
间隔联结不需要用 JOIN 关键字,直接在 FROM 后将要联结的两表列出来就可以,用逗
号分隔。这与标准 SQL 中的语法一致,表示一个 交叉联结 Cross Join ),会返回两表中所有
行的笛卡尔积。
联结条件
联结条件用 WHERE 子句来定义,用一个等值表达式描述。交叉联结之后再用 WHERE
进行条件筛选,效果跟内联结 INNER JOIN ... ON ... 非常类似。
时间间隔限制
我们可以在 WHERE 子句中,联结条件后用 AND 追加一个时间间隔的限制条件;做法是
提取左右两侧表中的时间字段,然后用一个表达式来指明两者需要满足的间隔限制。具体定义
方式有下面三种,这里分别用 ltime rtime 表示左右表中的时间字段:
1 ltime = rtime
2 ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
348 3 ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5'
SECOND
判断两者相等,这是最强的时间约束,要求两表中数据的时间必须完全一致才能匹配;一
般情况下,我们还是会放宽一些,给出一个间隔。间隔的定义可以用 < <= >= > 这一类的
关系不等式,也可以用 BETWEEN ... AND ... 这样的表达式。
例如,我们现在除了订单表 Order 外,还有一个 发货表 ”Shipment ,要求在收到订单后四
个小时内发货。那么我们就可以用一个间隔联结查询,把所有订单与它对应的发货信息连接合
并在一起返回。
SELECT *
FROM Order o, Shipment s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
在流处理中,间隔联结查询只支持具有时间属性的 仅追加 Append-only )表。
那对于有更新操作的表,又怎么办呢?除了间隔联结之外, Flink SQL 还支持时间联结
Temporal Join ),这主要是针对“版本表”( versioned table )而言的。所谓版本表,就是记录
了数据随着时间推移版本变化的表,可以理解成一个“更新日志”( change log ),它就是具有
时间属性、还会进行更新操作的表。当我们联结某个版本表时,并不是把当前的数据连接合并
起来就行了,而是希望能够根据数据发生的时间,找到当时的“版本”;这种根据更新时间提
取当时的值进行联结的操作,就叫作“时间联结”( Temporal Join )。这部分内容由于涉及版本
表的定义,我们就不详细展开了,感兴趣的读者可以查阅官网资料。
11.7 函数
SQL 中,我们可以把一些数据的转换操作包装起来,嵌入到 SQL 查询中统一调用,这
就是 函数 functions )。
Flink Table API SQL 同样提供了函数的功能。两者在调用时略有不同: Table API
的函数是通过数据对象的方法调用来实现的;而 SQL 则是直接引用函数名称,传入数据作为
参数。例如,要把一个字符串 str 转换成全大写的形式, Table API 的写法是调用 str 这个 String
对象的 upperCase() 方法:
str.upperCase();
SQL 中的写法就是直接引用 UPPER() 函数,将 str 作为参数传入:
UPPER(str)
由于 Table API 是内嵌在 Java 语言中的,很多方法需要在类中额外添加,因此扩展功能比
较麻烦,目前支持的函数比较少;而且 Table API 也不如 SQL 的通用性强,所以一般情况下较
少使用。下面我们主要介绍 Flink SQL 中函数的使用。
Flink SQL 中的函数可以分为两类:一类是 SQL 中内置的系统函数,直接通过函数名调用
就可以,能够实现一些常用的转换操作,比如之前我们用到的 COUNT() CHAR_LENGTH()
349 UPPER() 等等;而另一类函数则是用户自定义的函数( UDF ),需要在表环境中注册才能使用。
接下来我们就对这两类函数分别进行介绍。
11.7.1 系统函数
系统函数( System Functions )也叫内置函数( Built-in Functions ),是在系统中预先实现好
的功能模块。我们可以通过固定的函数名直接调用,实现想要的转换操作。 Flink SQL 提供了
大量的系统函数,几乎支持所有的标准 SQL 中的操作,这为我们使用 SQL 编写流处理程序提
供了极大的方便。
Flink SQL 中的系统函数又主要可以分为两大类:标量函数( Scalar Functions )和聚合函
数( Aggregate Functions )。
1. 标量函数( Scalar Functions
所谓的“标量”,是指只有数值大小、没有方向的量;所以标量函数指的就是只对输入数
据做转换操作、返回一个值的函数。这里的输入数据对应在表中,一般就是一行数据中 1 个或
多个字段,因此这种操作有点像流处理转换算子中的 map 。另外,对于一些没有输入参数、直
接可以得到唯一结果的函数,也属于标量函数。
标量函数是最常见、也最简单的一类系统函数,数量非常庞大,很多在标准 SQL 中也有
定义。所以我们这里只对一些常见类型列举部分函数,做一个简单概述,具体应用可以查看官
网的完整函数列表。
比较函数( Comparison Functions
比较函数其实就是一个比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。
这个比较表达式可以是用 < > = 等符号连接两个值,也可以是用关键字定义的某种判断。
例如:
1 value1 = value2 判断两个值相等;
2 value1 <> value2 判断两个值不相等
3 value IS NOT NULL 判断 value 不为空
逻辑函数( Logical Functions
逻辑函数就是一个逻辑表达式,也就是用与( AND )、或( OR )、非( NOT )将布尔类型
的值连接起来,也可以用判断语句( IS IS NOT )进行真值判断;返回的还是一个布尔类型
的值。例如:
1 boolean1 OR boolean2 布尔值 boolean1 与布尔值 boolean2 取逻辑或
2 boolean IS FALSE 判断布尔值 boolean 是否为 false
3 NOT boolean 布尔值 boolean 取逻辑非
算术函数( Arithmetic Functions
进行算术计算的函数,包括用算术符号连接的运算,和复杂的数学运算。例如:
350 1 numeric1 + numeric2 两数相加
2 POWER(numeric1, numeric2) 幂运算,取数 numeric1 numeric2 次方
3 RAND() 返回( 0.0, 1.0 )区间内的一个 double 类型的伪随机数
字符串函数( String Functions
进行字符串处理的函数。例如:
1 string1 || string2 两个字符串的连接
2 UPPER(string) 将字符串 string 转为全部大写
3 CHAR_LENGTH(string) 计算字符串 string 的长度
时间函数( Temporal Functions
进行与时间相关操作的函数。例如:
1 DATE string 按格式 "yyyy-MM-dd" 解析字符串 string ,返回类型为 SQL Date
2 TIMESTAMP string 按格式 "yyyy-MM-dd HH:mm:ss[.SSS]" 解析,返回类型为 SQL
timestamp
3 CURRENT_TIME 返回本地时区的当前时间,类型为 SQL time (与 LOCALTIME
等价)
4 INTERVAL string range 返回一个时间间隔。 string 表示数值; range 可以是 DAY
MINUTE DAT TO HOUR 等单位,也可以是 YEAR TO MONTH 这样的复合单位。如“ 2
10 个月”可以写成: INTERVAL '2-10' YEAR TO MONTH
2. 聚合函数( Aggregate Functions
聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作
为结果返回。聚合函数应用非常广泛,不论分组聚合、窗口聚合还是开窗( Over )聚合,对数
据的聚合操作都可以用相同的函数来定义。
标准 SQL 中常见的聚合函数 Flink SQL 都是支持的,目前也在不断扩展,为流处理应用
提供更强大的功能。例如:
COUNT(*) 返回所有行的数量,统计个数
SUM([ ALL | DISTINCT ] expression) 对某个字段进行求和操作。默认情况
下省略了关键字 ALL ,表示对所有行求和;如果指定 DISTINCT ,则会对数据进行去
重,每个值只叠加一次。
RANK() 返回当前值在一组值中的排名
ROW_NUMBER() 对一组值排序后,返回当前值的行号。与 RANK()
功能相似
其中, RANK() ROW_NUMBER() 一般用在 OVER 窗口中,在之前 11.5.4 小节实现 Top
N 的过程中起到了非常重要的作用。
351 11.7.2 自定义函数( UDF
系统函数尽管庞大,也不可能涵盖所有的功能;如果有系统函数不支持的需求,我们就需
要用自定义函数( User Defined Functions UDF )来实现了。事实上,系统内置函数仍然在不
断扩充,如果我们认为自己实现的自定义函数足够通用、应用非常广泛,也可以在项目跟踪工
JIRA 上向 Flink 开发团队提出“议题”( issue ),请求将新的函数添加到系统函数中。
Flink Table API SQL 提供了多种自定义函数的接口,以抽象类的形式定义。当前 UDF
主要有以下几类:
标量函数( Scalar Functions ):将输入的标量值转换成一个新的标量值;
表函数( Table Functions ):将标量值转换成一个或多个新的行数据,也就是
扩展成一个表;
聚合函数( Aggregate Functions ):将多行数据里的标量值转换成一个新的标
量值;
表聚合函数( Table Aggregate Functions ):将多行数据里的标量值转换成一
个或多个新的行数据。
1. 整体调用流程
要想在代码中使用自定义的函数,我们需要首先自定义对应 UDF 抽象类的实现,并在表
环境中注册这个函数,然后就可以在 Table API SQL 中调用了。
1 )注册函数
注册函数时需要调用表环境的 createTemporarySystemFunction() 方法,传入注册的函数名
以及 UDF 类的 Class 对象:
// 注册函数
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
我们自定义的 UDF 类叫作 MyFunction ,它应该是上面四种 UDF 抽象类中某一个的具体
实现;在环境中将它注册为名叫 MyFunction 的函数。
这里 createTemporarySystemFunction() 方法的意思是创建了一个“临时系统函数”,所以
MyFunction 函 数 名 是 全 局 的 , 可 以 当 作 系 统 函 数 来 使 用 ; 我 们 也 可 以 用
createTemporaryFunction() 方法,注册的函数就依赖于当前的数据库( database )和目录( catalog
了,所以这就不是系统函数,而是“目录函数”( catalog function ),它的完整名称应该包括所
属的 database catalog
一般情况下,我们直接用 createTemporarySystemFunction() 方法将 UDF 注册为系统函数就
可以了。
2 )使用 Table API 调用函数
Table API 中,需要使用 call() 方法来调用自定义函数:
tableEnv.from("MyTable").select(call("MyFunction", $("myField")));
这里 call() 方法有两个参数,一个是注册好的函数名 MyFunction ,另一个则是函数调用时
352 本身的参数。这里我们定义 MyFunction 在调用时,需要传入的参数是 myField 字段。
此外,在 Table API 中也可以不注册函数,直接用“内联”( inline )的方式调用 UDF
tableEnv.from("MyTable").select(call(SubstringFunction.class, $("myField")));
区别只是在于 call() 方法第一个参数不再是注册好的函数名,而直接就是函数类的 Class
对象了。
3 )在 SQL 中调用函数
当我们将函数注册为系统函数之后,在 SQL 中的调用就与内置系统函数完全一样了:
tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");
可见, SQL 的调用方式更加方便,我们后续依然会以 SQL 为例介绍 UDF 的用法。
接下来我们就对不同类型的 UDF 进行展开介绍。
2. 标量函数( Scalar Functions
自定义标量函数可以把 0 个、 1 个或多个标量值转换成一个标量值,它对应的输入是一
行数据中的字段,输出则是唯一的值。所以从输入和输出表中行数据的对应关系看,标量函数
一对一 的转换。
想要实现自定义的标量函数,我们需要自定义一个类来继承抽象类 ScalarFunction ,并实
现叫作 eval() 的求值方法。标量函数的行为就取决于求值方法的定义,它必须是公有的( public ),
而且名字必须是 eval 。求值方法 eval 可以重载多次,任何数据类型都可作为求值方法的参数
和返回值类型。
这里需要特别说明的是, ScalarFunction 抽象类中并没有定义 eval() 方法,所以我们不能直
接在代码中重写( override );但 Table API 的框架底层又要求了求值方法必须名字为 eval() 。这
Table API SQL 目前还显得不够完善的地方,未来的版本应该会有所改进。
ScalarFunction 以及其它所有的 UDF 接口,都在 org.apache.flink.table.functions 中。
下面我们来看一个具体的例子。我们实现一个自定义的哈希( hash )函数 HashFunction
用来求传入对象的哈希值。
public static class HashFunction extends ScalarFunction {
// 接受任意类型输入,返回 INT 型输出
public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
return o.hashCode();
}
}
// 注册函数
tableEnv.createTemporarySystemFunction("HashFunction", HashFunction.class);
// SQL 里调用注册好的函数
tableEnv.sqlQuery("SELECT HashFunction(myField) FROM MyTable");
这里我们自定义了一个 ScalarFunction ,实现了 eval() 求值方法,将任意类型的对象传入,
得到一个 Int 类型的哈希值返回。当然,具体的求哈希操作就省略了,直接调用对象的 hashCode()
方法即可。
353 另外注意,由于 Table API 在对函数进行解析时需要提取求值方法参数的类型引用,所以
我们用 DataTypeHint(inputGroup = InputGroup.ANY) 对输入参数的类型做了标注,表示 eval
参数可以是任意类型。
3. 表函数( Table Functions
跟标量函数一样,表函数的输入参数也可以是 0 个、 1 个或多个标量值;不同的是,它可
以返回任意多行数据。 多行数据 事实上就构成了一个表,所以 表函数 可以认为就是返回一
个表的函数,这是一个 一对多 的转换关系。之前我们介绍过的窗口 TVF ,本质上就是表函
数。
类似地,要实现自定义的表函数,需要自定义类来继承抽象类 TableFunction ,内部必须
要实现的也是一个名为 eval 的求值方法。与标量函数不同的是, TableFunction 类本身是有一
个泛型参数 T 的,这就是表函数返回数据的类型;而 eval() 方法没有返回类型,内部也没有 return
语句,是通过调用 collect() 方法来发送想要输出的行数据的。多么熟悉的感觉——回忆一下
DataStream API 中的 FlatMapFunction ProcessFunction ,它们的 flatMap processElement
法也没有返回值,也是通过 out.collect() 来向下游发送数据的。
我们使用表函数,可以对一行数据得到一个表,这和 Hive 中的 UDTF 非常相似。那对于
原先输入的整张表来说,又该得到什么呢?一个简单的想法是,就让输入表中的每一行,与它
转换得到的表进行联结( join ),然后再拼成一个完整的大表,这就相当于对原来的表进行了
扩展。在 Hive SQL 语法中,提供了 侧向视图 lateral view ,也叫横向视图)的功能,可
以将表中的一行数据拆分成多行; Flink SQL 也有类似的功能,是用 LATERAL TABLE 语法来
实现的。
SQL 中调用表函数,需要使用 LATERAL TABLE(<TableFunction>) 来生成扩展的 侧向
,然后与原始表进行联结( Join )。这里的 Join 操作可以是直接做交叉联结( cross join ),
FROM 后用逗号分隔两个表就可以;也可以是以 ON TRUE 为条件的左联结( LEFT JOIN )。
下面是表函数的一个具体示例。我们实现了一个分隔字符串的函数 SplitFunction ,可以将
一个字符串转换成(字符串,长度)的二元组。
// 注意这里的类型标注,输出是 Row 类型,Row 中包含两个字段:word 和 length。
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {
 public void eval(String str) {
 for (String s : str.split(" ")) {
 // 使用 collect()方法发送一行数据
 collect(Row.of(s, s.length()));
 }
 }
}
// 注册函数
tableEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
// 在 SQL 里调用注册好的函数
// 1. 交叉联结
tableEnv.sqlQuery(
 "SELECT myField, word, length " +
 "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
// 2. 带 ON TRUE 条件的左联结
tableEnv.sqlQuery(
 "SELECT myField, word, length " +
 "FROM MyTable " +
 "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE");
// 重命名侧向表中的字段
tableEnv.sqlQuery(
 "SELECT myField, newWord, newLength " +
 "FROM MyTable " +
 "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON 
TRUE");
这里我们直接将表函数的输出类型定义成了 ROW ,这就是得到的侧向表中的数据类型;
每行数据转换后也只有一行。我们分别用交叉联结和左联结两种方式在 SQL 中进行了调用,
还可以对侧向表的中字段进行重命名。
4. 聚合函数( Aggregate Functions
用户自定义聚合函数( User Defined AGGregate function UDAGG )会把一行或多行数据
(也就是一个表)聚合成一个标量值。这是一个标准的 多对一 的转换。
聚合函数的概念我们之前已经接触过多次,如 SUM() MAX() MIN() AVG() COUNT()
都是常见的系统内置聚合函数。而如果有些需求无法直接调用系统函数解决,我们就必须自定
义聚合函数来实现功能了。
自定义聚合函数需要继承抽象类 AggregateFunction AggregateFunction 有两个泛型参数
<T, ACC> T 表示聚合输出的结果类型, ACC 则表示聚合的中间状态类型。
Flink SQL 中的聚合函数的工作原理如下:
1 )首先,它需要创建一个累加器( accumulator ),用来存储聚合的中间结果。这与
DataStream API 中的 AggregateFunction 非常类似,累加器就可以看作是一个聚合状态。调用
createAccumulator() 方法可以创建一个空的累加器。
2 )对于输入的每一行数据,都会调用 accumulate() 方法来更新累加器,这是聚合的核心
过程。
3 )当所有的数据都处理完之后,通过调用 getValue() 方法来计算并返回最终的结果。
所以,每个 AggregateFunction 都必须实现以下几个方法:
createAccumulator()
这是创建累加器的方法。没有输入参数,返回类型为累加器类型 ACC
accumulate()
355 这是进行聚合计算的核心方法,每来一行数据都会调用。它的第一个参数是确定的,就是
当前的累加器,类型为 ACC ,表示当前聚合的中间状态;后面的参数则是聚合函数调用时传
入的参数,可以有多个,类型也可以不同。这个方法主要是更新聚合状态,所以没有返回类型。
需要注意的是, accumulate() 与之前的求值方法 eval() 类似,也是底层架构要求的,必须为 public
方法名必须为 accumulate ,且无法直接 override 、只能手动实现。
getValue()
这是得到最终返回结果的方法。输入参数是 ACC 类型的累加器,输出类型为 T
在遇到复杂类型时, Flink 的类型推导可能会无法得到正确的结果。所以 AggregateFunction
也可以专门对累加器和返回结果的类型进行声明,这是通过 getAccumulatorType()
getResultType() 两个方法来指定的。
除了上面的方法,还有几个方法是可选的。这些方法有些可以让查询更加高效,有些是在
某些特定场景下必须要实现的。比如,如果是对会话窗口进行聚合, merge() 方法就是必须要
实现的,它会定义累加器的合并操作,而且这个方法对一些场景的优化也很有用;而如果聚合
函数用在 OVER 窗口聚合中,就必须实现 retract() 方法,保证数据可以进行撤回操作;
resetAccumulator() 方法则是重置累加器,这在一些批处理场景中会比较有用。
AggregateFunction 的所有方法都必须是 公有的( public ),不能是静态的( static ),而且
名字必须跟上面写的完全一样。 createAccumulator getValue getResultType 以 及
getAccumulatorType 这几个方法是在抽象类 AggregateFunction 中定义的,可以 override ;而
其他则都是底层架构约定的方法。
下面举一个具体的示例。在常用的系统内置聚合函数里,可以用 AVG() 来计算平均值;如
果我们现在希望计算的是某个字段的“加权平均值”,又该怎么做呢?系统函数里没有现成的
实现,所以只能自定义一个聚合函数 WeightedAvg 来计算了。
比如我们要从学生的分数表 ScoreTable 中计算每个学生的加权平均分。为了计算加权平均
值,应该从输入的每行数据中提取两个值作为参数:要计算的分数值 score ,以及它的权重
weight 。而在聚合过程中,累加器( accumulator )需要存储当前的加权总和 sum ,以及目前数
据的个数 count 。这可以用一个二元组来表示,也可以单独定义一个类 WeightedAvgAccum
里面包含 sum count 两个属性,用它的对象实例来作为聚合的累加器。
具体代码如下:
// 累加器类型定义
public static class WeightedAvgAccumulator {
 public long sum = 0; // 加权和
 public int count = 0; // 数据个数
}
// 自定义聚合函数,输出为长整型的平均值,累加器类型为 WeightedAvgAccumulator
public static class WeightedAvg extends AggregateFunction<Long, 
WeightedAvgAccumulator> {
 @Override
 public WeightedAvgAccumulator createAccumulator() {
 return new WeightedAvgAccumulator(); // 创建累加器
 }
 @Override
 public Long getValue(WeightedAvgAccumulator acc) {
 if (acc.count == 0) {
 return null; // 防止除数为 0
 } else {
 return acc.sum / acc.count; // 计算平均值并返回
 }
 }
 // 累加计算方法,每来一行数据都会调用
 public void accumulate(WeightedAvgAccumulator acc, Long iValue, Integer 
iWeight) {
 acc.sum += iValue * iWeight;
 acc.count += iWeight;
 }
}
// 注册自定义聚合函数
tableEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);
// 调用函数计算加权平均值
Table result = tableEnv.sqlQuery(
 "SELECT student, WeightedAvg(score, weight) FROM ScoreTable GROUP BY 
student"
);
聚合函数的 accumulate() 方法有三个输入参数。第一个是 WeightedAvgAccum 类型的累加
器;另外两个则是函数调用时输入的字段:要计算的值 ivalue 和 对应的权重 iweight 。这里
我们并不考虑其它方法的实现,只要有必须的三个方法就可以了。
5. 表聚合函数( Table Aggregate Functions
用户自定义表聚合函数( UDTAGG )可以把一行或多行数据(也就是一个表)聚合成另
一张表,结果表中可以有多行多列。很明显,这就像表函数和聚合函数的结合体,是一个“多
对多”的转换。
自定义表聚合函数需要继承抽象类 TableAggregateFunction TableAggregateFunction 的结
构和原理与 AggregateFunction 非常类似,同样有两个泛型参数 <T, ACC> ,用一个 ACC 类型的
累加器( accumulator )来存储聚合的中间结果。聚合函数中必须实现的三个方法,在
TableAggregateFunction 中也必须对应实现:
createAccumulator()
创建累加器的方法,与 AggregateFunction 中用法相同。
357 accumulate()
聚合计算的核心方法,与 AggregateFunction 中用法相同。
emitValue()
所有输入行处理完成后,输出最终计算结果的方法。这个方法对应着 AggregateFunction
中的 getValue() 方法;区别在于 emitValue 没有输出类型,而输入参数有两个:第一个是 ACC
类型的累加器,第二个则是用于输出数据的“收集器” out ,它的类型为 Collect<T> 。所以很
明显,表聚合函数输出数据不是直接 return ,而是调用 out.collect() 方法,调用多次就可以输出
多行数据了;这一点与表函数非常相似。另外, emitValue() 在抽象类中也没有定义,无法 override
必须手动实现。
表聚合函数得到的是一张表;在流处理中做持续查询,应该每次都会把这个表重新计算输
出。如果输入一条数据后,只是对结果表里一行或几行进行了更新( Update ),这时我们重新
计算整个表、全部输出显然就不够高效了。为了提高处理效率, TableAggregateFunction 还提
供了一个 emitUpdateWithRetract() 方法,它可以在结果表发生变化时,以 撤回 retract )老数
据、发送新数据的方式增量地进行更新。如果同时定义了 emitValue() emitUpdateWithRetract()
两个方法,在进行更新操作时会优先调用 emitUpdateWithRetract()
表聚合函数相对比较复杂,它的一个典型应用场景就是 Top N 查询。比如我们希望选出
一组数据排序后的前两名,这就是最简单的 TOP-2 查询。没有线程的系统函数,那么我们就
可以自定义一个表聚合函数来实现这个功能。在累加器中应该能够保存当前最大的两个值,每
当来一条新数据就在 accumulate() 方法中进行比较更新,最终在 emitValue() 中调用两次
out.collect() 将前两名数据输出。
具体代码如下:
// 聚合累加器的类型定义,包含最大的第一和第二两个数据
public static class Top2Accumulator {
public Integer first;
public Integer second;
}
// 自定义表聚合函数,查询一组数中最大的两个,返回值为 ( 数值,排名 ) 的二元组
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>,
Top2Accumulator> {
@Override
public Top2Accumulator createAccumulator() {
Top2Accumulator acc = new Top2Accumulator();
acc.first = Integer.MIN_VALUE; // 为方便比较,初始值给最小值
acc.second = Integer.MIN_VALUE;
return acc;
}
// 每来一个数据调用一次,判断是否更新累加器
public void accumulate(Top2Accumulator acc, Integer value) {
358 if (value > acc.first) {
acc.second = acc.first;
acc.first = value;
} else if (value > acc.second) {
acc.second = value;
}
}
// 输出 ( 数值,排名 ) 的二元组,输出两行数据
public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>>
out) {
if (acc.first != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
}
目前 SQL 中没有直接使用表聚合函数的方式,所以需要使用 Table API 的方式来调用:
// 注册表聚合函数函数
tableEnv.createTemporarySystemFunction("Top2", Top2.class);
// Table API 中调用函数
tableEnv.from("MyTable")
.groupBy($("myField"))
.flatAggregate(call("Top2", $("value")).as("value", "rank"))
.select($("myField"), $("value"), $("rank"));
这里使用了 flatAggregate() 方法,它就是专门用来调用表聚合函数的接口。对 MyTable
数据按 myField 字段进行分组聚合,统计 value 值最大的两个;并将聚合结果的两个字段重命
名为 value rank ,之后就可以使用 select() 将它们提取出来了。
11.8 SQL 客户端
有了 Table API SQL ,我们就可以使用熟悉的 SQL 来编写查询语句进行流处理了。不
过,这种方式还是将 SQL 语句嵌入到 Java/Scala 代码中进行的;另外,写完的代码后想要提
交作业还需要使用工具进行打包。这都给 Flink 的使用设置了门槛,如果不是 Java/Scala 程序
员,即使是非常熟悉 SQL 的工程师恐怕也会望而生畏了。
基于这样的考虑, Flink 为我们提供了一个工具来进行 Flink 程序的编写、测试和提交,这
工具叫作 “SQL 客户端 SQL 客户端提供了一个命令行交互界面( CLI ),我们可以在里面非
常容易地编写 SQL 进行查询,就像使用 MySQL 一样;整个 Flink 应用编写、提交的过程全变
成了写 SQL ,不需要写一行 Java/Scala 代码。
359 具体使用流程如下:
1 )首先启动本地集群
./bin/start-cluster.sh
2 )启动 Flink SQL 客户端
./bin/sql-client.sh
SQL 客户端的启动脚本同样位于 Flink bin 目录下。默认的启动模式是 embedded ,也就
是说客户端是一个嵌入在本地的进程,这是目前唯一支持的模式。未来会支持连接到远程 SQL
客户端的模式。
3 )设置运行模式
启动客户端后,就进入了命令行界面,这时就可以开始写 SQL 了。一般我们会在开始之
前对环境做一些设置,比较重要的就是运行模式。
首先是表环境的运行时模式,有流处理和批处理两个选项。默认为流处理:
Flink SQL> SET 'execution.runtime-mode' = 'streaming';
其次是 SQL 客户端的“执行结果模式”,主要有 table changelog tableau 三种,默认为
table 模式:
Flink SQL> SET 'sql-client.execution.result-mode' = 'table';
table 模式就是最普通的表处理模式,结果会以逗号分隔每个字段; changelog 则是更新日
志模式,会在数据前加上“ + ”(表示插入)或“ - ”(表示撤回)的前缀;而 tableau 则是经典
的可视化表模式,结果会是一个虚线框的表格。
此外我们还可以做一些其它可选的设置,比如之前提到的空闲状态生存时间( TTL ):
Flink SQL> SET 'table.exec.state.ttl' = '1000';
除了在命令行进行设置,我们也可以直接在 SQL 客户端的配置文件 sql-cli-defaults.yaml
中进行各种配置,甚至还可以在这个 yaml 文件里预定义表、函数和 catalog 。关于配置文件的
更多用法,大家可以查阅官网的详细说明。
4 )执行 SQL 查询
接下来就可以愉快的编写 SQL 语句了,这跟操作 MySQL Oracle 等关系型数据库没什么
区别。
我们可以尝试把一开始举的简单聚合例子写一下:
Flink SQL> CREATE TABLE EventTable(
> user STRING,
> url STRING,
> `timestamp` BIGINT
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 'events.csv',
> 'format' = 'csv'
> );
Flink SQL> CREATE TABLE ResultTable (
> user STRING,
> cnt BIGINT
> ) WITH (
> 'connector' = 'print'
> );
Flink SQL> INSERT INTO ResultTable SELECT user, COUNT(url) as cnt FROM EventTable
GROUP BY user;
这里我们直接用 DDL 创建两张表,注意需要有 WITH 定义的外部连接。一张表叫作
EventTable ,是从外部文件 events.csv 中读取数据的,这是输入数据表;另一张叫作 ResultTable
连接器为“ print ”,其实就是标准控制台打印,当然就是输出表了。所以接下来就可以直接执
SQL 查询,并将查询结果 INSERT 写入结果表中了。
SQL 客户端中,每定义一个 SQL 查询,就会把它作为一个 Flink 作业提交到集群上执
行。所以通过这种方式,我们可以快速地对流处理程序进行开发测试。
11.9 连接到外部系统
Table API SQL 编写的 Flink 程序中,可以在创建表的时候用 WITH 子句指定连接器
connector ),这样就可以连接到外部系统进行数据交互了。
架构中的 TableSource 负责从外部系统中读取数据并转换成表, TableSink 则负责将结果表
写入外部系统。在 Flink 1.13 API 调用中,已经不去区分 TableSource TableSink ,我们只
要建立到外部系统的连接并创建表就可以, Flink 自动会从程序的处理逻辑中解析出它们的用
途。
Flink Table API SQL 支持了各种不同的连接器。当然,最简单的其实就是上一节中
提到的连接到控制台打印输出:
CREATE TABLE ResultTable (
user STRING,
cnt BIGINT
WITH (
'connector' = 'print'
);
这里只需要在 WITH 中定义 connector print 就可以了。而对于其它的外部系统,则需要
增加一些配置项。下面我们就分别进行讲解。
11.9.1 Kafka
Kafka SQL 连接器可以从 Kafka 的主题( topic )读取数据转换成表,也可以将表数据
写入 Kafka 的主题。换句话说,创建表的时候指定连接器为 Kafka ,则这个表既可以作为输入
表,也可以作为输出表。
1. 引入依赖
想要在 Flink 程序中使用 Kafka 连接器,需要引入如下依赖:
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
这里我们引入的 Flink Kafka 的连接器,与之前 DataStream API 中引入的连接器是一样
的。如果想在 SQL 客户端里使用 Kafka 连接器,还需要下载对应的 jar 包放到 lib 目录下。
另外, Flink 为各种连接器提供了一系列的“表格式”( table formats ),比如 CSV JSON
Avro Parquet 等等。这些表格式定义了底层存储的二进制数据和表的列之间的转换方式,相
当于表的序列化工具。对于 Kafka 而言, CSV JSON Avro 等主要格式都是支持的,
根据 Kafka 连接器中配置的格式,我们可能需要引入对应的依赖支持。以 CSV 为例:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
由于 SQL 客户端中已经内置了 CSV JSON 的支持,因此使用时无需专门引入;而对于
没有内置支持的格式(比如 Avro ),则仍然要下载相应的 jar 包。关于连接器的格式细节详见
官网说明,我们后面就不再讨论了。
2. 创建连接到 Kafka 的表
创建一个连接到 Kafka 表,需要在 CREATE TABLE DDL 中在 WITH 子句里指定连接
器为 Kafka ,并定义必要的配置参数。
下面是一个具体示例:
CREATE TABLE KafkaTable (
`user` STRING,
 `url` STRING,
 `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
 'connector' = 'kafka',
 'topic' = 'events',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'csv'
)
这里定义了 Kafka 连接器对应的主题( topic ), Kafka 服务器,消费者组 ID ,消费者起始
模式以及表格式。需要特别说明的是,在 KafkaTable 的字段中有一个 ts ,它的声明中用到了
METADATA FROM ,这是表示一个“元数据列”( metadata column ),它是由 Kafka 连接器的
元数据“ timestamp ”生成的。这里的 timestamp 其实就是 Kafka 中数据自带的时间戳,我们把
它直接作为元数据提取出来,转换成一个新的字段 ts
3. Upsert Kafka
正常情况下, Kafka 作为保持数据顺序的消息队列,读取和写入都应该是流式的数据,对
应在表中就是仅追加( append-only )模式。如果我们想要将有更新操作(比如分组聚合)的结
果表写入 Kafka ,就会因为 Kafka 无法识别撤回( retract )或更新插入( upsert )消息而导致异
常。
为了解决这个问题, Flink 专门增加了一个 更新插入 Kafka” Upsert Kafka )连接器。这
个连接器支持以更新插入( UPSERT )的方式向 Kafka topic 中读写数据。
具体来说, Upsert Kafka 连接器处理的是更新日志( changlog )流。如果作为 TableSource
连接器会将读取到的 topic 中的数据( key, value ),解释为对当前 key 的数据值的更新( UPDATE ),
也就是查找动态表中 key 对应的一行数据,将 value 更新为最新的值;因为是 Upsert 操作,所
以如果没有 key 对应的行,那么也会执行插入( INSERT )操作。另外,如果遇到 value 为空
null ),连接器就把这条数据理解为对相应 key 那一行的删除( DELETE )操作。
如果作为 TableSink Upsert Kafka 连接器会将有更新操作的结果表,转换成更新日志
changelog )流。如果遇到插入( INSERT )或者更新后( UPDATE_AFTER )的数据,对应
的是一个添加( add )消息,那么就直接正常写入 Kafka 主题;如果是删除( DELETE )或者
更新前的数据,对应是一个撤回( retract )消息,那么就把 value 为空( null )的数据写入 Kafka
由于 Flink 是根据键( key )的值对数据进行分区的,这样就可以保证同一个 key 上的更新和删
除消息都会落到同一个分区中。
下面是一个创建和使用 Upsert Kafka 表的例子:
CREATE TABLE pageviews_per_region (
 user_region STRING,
 pv BIGINT,
 uv BIGINT,
 PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
 'connector' = 'upsert-kafka',
 'topic' = 'pageviews_per_region',
 'properties.bootstrap.servers' = '...',
 'key.format' = 'avro',
 'value.format' = 'avro'
);
CREATE TABLE pageviews (
 user_id BIGINT,
 page_id BIGINT,
 viewtime TIMESTAMP,
 user_region STRING,
 WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
 'connector' = 'kafka',
 'topic' = 'pageviews',
363
 'properties.bootstrap.servers' = '...',
 'format' = 'json'
);
-- 计算 pv、uv 并插入到 upsert-kafka 表中
INSERT INTO pageviews_per_region
SELECT
 user_region,
 COUNT(*),
 COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
这里我们从 Kafka pageviews 中读取数据,统计每个区域的 PV (全部浏览量)和 UV
(对用户去重),这是一个分组聚合的更新查询,得到的结果表会不停地更新数据。为了将结
果表写入 Kafka pageviews_per_region 主题,我们定义了一个 Upsert Kafka 表,它的字段中
需要用 PRIMARY KEY 来指定主键,并且在 WITH 子句中分别指定 key value 的序列化格式。
11.9.2 文件系统
另一类非常常见的外部系统就是文件系统( File System )了。 Flink 提供了文件系统的连
接器,支持从本地或者分布式的文件系统中读写数据。这个连接器是内置在 Flink 中的,所以
使用它并不需要额外引入依赖。
下面是一个连接到文件系统的示例:
CREATE TABLE MyTable (
 column_name1 INT,
 column_name2 STRING,
 ...
 part_name1 INT,
 part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
 'connector' = 'filesystem', -- 连接器类型
 'path' = '...', -- 文件路径
 'format' = '...' -- 文件格式
)
这里在 WITH 前使用了 PARTITIONED BY 对数据进行了分区操作。文件系统连接器支持
对分区文件的访问。
11.9.3 JDBC
关系型数据表本身就是 SQL 最初应用的地方,所以我们也会希望能直接向关系型数据库
中读写表数据。 Flink 提供的 JDBC 连接器可以通过 JDBC 驱动程序( driver )向任意的关系型
数据库读写数据,比如 MySQL PostgreSQL Derby 等。
364 作为 TableSink 向数据库写入数据时,运行的模式取决于创建表的 DDL 是否定义了主键
primary key )。如果有主键,那么 JDBC 连接器就将以更新插入( Upsert )模式运行,可以向
外部数据库发送按照指定键( key )的更新( UPDATE )和删除( DELETE )操作;如果没有
定义主键,那么就将在追加( Append )模式下运行,不支持更新和删除操作。
1. 引入依赖
想要在 Flink 程序中使用 JDBC 连接器,需要引入如下依赖:
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
此外,为了连接到特定的数据库,我们还用引入相关的驱动器依赖,比如 MySQL
<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.38</version>
</dependency>
这里引入的驱动器版本是 5.1.38 ,读者可以依据自己的 MySQL 版本来进行选择。
2. 创建 JDBC
创建 JDBC 表的方法与前面 Upsert Kafka 大同小异。下面是一个具体示例:
-- 创建一张连接到 MySQL 的 表
CREATE TABLE MyTable (
 id BIGINT,
 name STRING,
 age INT,
 status BOOLEAN,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://localhost:3306/mydatabase',
 'table-name' = 'users'
);
-- 将另一张表 T 的数据写入到 MyTable 表中
INSERT INTO MyTable
SELECT id, name, age, status FROM T;
这里创建表的 DDL 中定义了主键,所以数据会以 Upsert 模式写入到 MySQL 表中;而到
MySQL 的连接,是通过 WITH 子句中的 url 定义的。要注意写入 MySQL 中真正的表名称是
users ,而 MyTable 是注册在 Flink 表环境中的表。
11.9.4 Elasticsearch
Elasticsearch 作为分布式搜索分析引擎,在大数据应用中有非常多的场景。 Flink 提供的
Elasticsearch SQL 连接器只能作为 TableSink ,可以将表数据写入 Elasticsearch 的索引( index )。
Elasticsearch 连接器的使用与 JDBC 连接器非常相似,写入数据的模式同样是由创建表的 DDL
中是否有主键定义决定的。
1. 引入依赖
想要在 Flink 程序中使用 Elasticsearch 连接器,需要引入对应的依赖。具体的依赖与
Elasticsearch 服务器的版本有关,对于 6.x 版本引入依赖如下:
<dependency>
 <groupId>org.apache.flink</groupId> 
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
对于 Elasticsearch 7 以上的版本,引入的依赖则是:
<dependency>
 <groupId>org.apache.flink</groupId> 
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
2. 创建连接到 Elasticsearch 的表
创建 Elasticsearch 表的方法与 JDBC 表基本一致。下面是一个具体示例:
-- 创建一张连接到 Elasticsearch 的 表
CREATE TABLE MyTable (
 user_id STRING,
 user_name STRING
 uv BIGINT,
 pv BIGINT,
 PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
 'connector' = 'elasticsearch-7',
 'hosts' = 'http://localhost:9200',
 'index' = 'users'
);
这里定义了主键,所以会以更新插入( Upsert )模式向 Elasticsearch 写入数据。
11.9.5 HBase
作为高性能、可伸缩的分布式列存储数据库, HBase 在大数据分析中是一个非常重要的工
具。 Flink 提供的 HBase 连接器支持面向 HBase 集群的读写操作。
366 在流处理场景下,连接器作为 TableSink HBase 写入数据时,采用的始终是更新插入
Upsert )模式。也就是说, HBase 要求连接器必须通过定义的主键( primary key )来发送更
新日志( changelog )。所以在创建表的 DDL 中,我们必须要定义行键( rowkey )字段,并将
它声明为主键;如果没有用 PRIMARY KEY 子句声明主键,连接器会默认把 rowkey 作为主键。
1. 引入依赖
想要在 Flink 程序中使用 HBase 连接器,需要引入对应的依赖。目前 Flink 只对 HBase
1.4.x 2.2.x 版本提供了连接器支持,而引入的依赖也应该与具体的 HBase 版本有关。对于
1.4 版本引入依赖如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-1.4_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
对于 HBase 2.2 版本,引入的依赖则是:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
2. 创建连接到 HBase 的表
由于 HBase 并不是关系型数据库,因此转换为 Flink SQL 中的表会稍有一些麻烦。在 DDL
创建出的 HBase 表中,所有的列族( column family )都必须声明为 ROW 类型,在表中占据一
个字段;而每个 family 中的列( column qualifier )则对应着 ROW 里的嵌套字段。我们不需要
HBase 中所有的 family qualifier 都在 Flink SQL 的表中声明出来,只要把那些在查询中用
到的声明出来就可以了。
除了所有 ROW 类型的字段(对应着 HBase 中的 family ),表中还应有一个原子类型的字
段,它就会被识别为 HBase rowkey 。在表中这个字段可以任意取名,不一定非要叫 rowkey
下面是一个具体示例:
-- 创建一张连接到 HBase 的 表
CREATE TABLE MyTable (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'mytable',
'zookeeper.quorum' = 'localhost:2181'
);

-- 假设表 T 的字段结构是 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
INSERT INTO MyTable
SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
我们将另一张 T 中的数据提取出来,并用 ROW() 函数来构造出对应的 column family ,最
终写入 HBase 中名为 mytable 的表。
11.9.6 Hive
Apache Hive 作为一个基于 Hadoop 的数据仓库基础框架,可以说已经成为了进行海量数
据分析的核心组件。 Hive 支持类 SQL 的查询语言,可以用来方便对数据进行处理和统计分析,
而且基于 HDFS 的数据存储有非常好的可扩展性,是存储分析超大量数据集的唯一选择。 Hive
的主要缺点在于查询的延迟很高,几乎成了离线分析的代言人。而 Flink 的特点就是实时性强,
所以 Flink SQL Hive 的结合势在必行。
Flink Hive 的集成比较特别。 Flink 提供了“ Hive 目录”( HiveCatalog )功能,允许使用
Hive 的“元存储”( Metastore )来管理 Flink 的元数据。这带来的好处体现在两个方面:
1 Metastore 可以作为一个持久化的目录,因此使用 HiveCatalog 可以跨会话存储 Flink
特定的元数据。这样一来,我们在 HiveCatalog 中执行执行创建 Kafka 表或者 ElasticSearch 表,
就可以把它们的元数据持久化存储在 Hive Metastore 中;对于不同的作业会话就不需要重复
创建了,直接在 SQL 查询中重用就可以。
2 )使用 HiveCatalog Flink 可以作为读写 Hive 表的替代分析引擎。这样一来,在 Hive
中进行批处理会更加高效;与此同时,也有了连续在 Hive 中读写数据、进行流处理的能力,
这也使得“实时数仓”( real-time data warehouse )成为了可能。
HiveCatalog 被设计为“开箱即用”,与现有的 Hive 配置完全兼容,我们不需要做任何的
修改与调整就可以直接使用。注意只有 Blink 的计划器( planner )提供了 Hive 集成的支持,
所以需要在使用 Flink SQL 时选择 Blink planner 。下面我们就来看以下与 Hive 集成的具体步骤。
1. 引入依赖
Hive 各版本特性变化比较大,所以使用时需要注意版本的兼容性。目前 Flink 支持的 Hive
版本包括:
Hive 1.x 1.0.0~1.2.2
Hive 2.x 2.0.0~2.2.0 2.3.0~2.3.6
Hive 3.x 3.0.0~3.1.2
目前 Flink Hive 的集成程度在持续加强,支持的版本信息也会不停变化和调整,大家
可以随着关注官网的更新信息。
由于 Hive 是基于 Hadoop 的组件,因此我们首先需要提供 Hadoop 的相关支持,在环境变
量中设置 HADOOP_CLASSPATH
export HADOOP_CLASSPATH=`hadoop classpath` 369
Flink 程序中可以引入以下依赖:
<!-- Flink 的 Hive 连接器-->
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>

<!-- Hive 依赖 -->
<dependency>
 <groupId>org.apache.hive</groupId>
 <artifactId>hive-exec</artifactId>
 <version>${hive.version}</version>
</dependency>
建议不要把这些依赖打包到结果 jar 文件中,而是在运行时的集群环境中为不同的 Hive
版本添加不同的依赖支持。具体版本对应的依赖关系,可以查询官网说明。
2. 连接到 Hive
Flink 中连接 Hive ,是通过在表环境中配置 HiveCatalog 来实现的。需要说明的是,配
HiveCatalog 本身并不需要限定使用哪个 planner ,不过对 Hive 表的读写操作只有 Blink
planner 才支持。所以一般我们需要将表环境的 planner 设置为 Blink
下面是代码中配置 Catalog 的示例:
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
// 创建一个 HiveCatalog,并在表环境中注册
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
// 使用 HiveCatalog 作为当前会话的 catalog
tableEnv.useCatalog("myhive");
当然,我们也可以直接启动 SQL 客户端,用 CREATE CATALOG 语句直接创建 HiveCatalog
Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = 
'/opt/hive-conf');
[INFO] Execute statement succeed.
Flink SQL> use catalog myhive;
[INFO] Execute statement succeed.
3. 设置 SQL 方言 我们知道, Hive 内部提供了类 SQL 的查询语言,不过语法细节与标准 SQL 会有一些出入,
相当于是 SQL 的一种“方言”( dialect )。为了提高与 Hive 集成时的兼容性, Flink SQL 提供了
一个非常有趣而强大的功能:可以使用方言来编写 SQL 语句。换句话说,我们可以直接在 Flink
中写 Hive SQL 来操作 Hive 表,这无疑给我们的读写处理带来了极大的方便。
Flink 目前支持两种 SQL 方言的配置: default hive 。所谓的 default 就是 Flink SQL 默认
SQL 语法了。我们需要先切换到 hive 方言,然后才能使用 Hive SQL 的语法。具体设置可
以分为 SQL Table API 两种方式。
1 SQL 中设置
我们可以通过配置 table.sql-dialect 属性来设置 SQL 方言:
set table.sql-dialect=hive;
当然,我们可以在代码中执行上面的 SET 语句,也可以直接启动 SQL 客户端来运行。如
果使用 SQL 客户端,我们还可以在配置文件 sql-cli-defaults.yaml 中通过 “configuration” 模块来
设置:
execution:
planner: blink
type: batch
result-mode: table
configuration:
table.sql-dialect: hive
2 Table API 中设置
另外一种方式就是在代码中,直接使用 Table API 获取表环境的配置项来进行设置:
// 配置 hive 方言
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 配置 default 方言
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
4. 读写 Hive
有了 SQL 方言的设置,我们就可以很方便的在 Flink 中创建 Hive 表并进行读写操作了。
Flink 支持以批处理和流处理模式向 Hive 中读写数据。在批处理模式下, Flink 会在执行查询
语句时对 Hive 表进行一次性读取,在作业完成时将结果数据向 Hive 表进行一次性写入;而在
流处理模式下, Flink 会持续监控 Hive 表,在新数据可用时增量读取,也可以持续写入新数据
并增量式地让它们可见。
更灵活的是,我们可以随时切换 SQL 方言,从其它数据源(例如 Kafka )读取数据、经
转换后再写入 Hive 。下面是以纯 SQL 形式编写的一个示例,我们可以启动 SQL 客户端来运行:
-- 设置 SQL 方言为 hive,创建 Hive 表
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
 user_id STRING,
 order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
370
 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
 'sink.partition-commit.trigger'='partition-time',
 'sink.partition-commit.delay'='1 h',
 'sink.partition-commit.policy.kind'='metastore,success-file'
);
-- 设置 SQL 方言为 default,创建 Kafka 表
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
 user_id STRING,
 order_amount DOUBLE,
 log_ts TIMESTAMP(3),
 WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND – 定义水位线
) WITH (...);
-- 将 Kafka 中读取的数据经转换后写入 Hive 
INSERT INTO TABLE hive_table 
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), 
DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;
这里我们创建 Hive 表时设置了通过分区时间来触发提交的策略。将 Kafka 中读取的数据
经转换后写入 Hive ,这是一个流处理的 Flink SQL 程序。
11.10 本章总结
在本章中,我们从一个简单示例入手,由浅入深地介绍了 Flink Table API SQL 的用法。
由于这两套 API 底层原理一致,而 Table API 功能不够完善、应用不够方便,实际项目开发往
往写 SQL 居多;因此本章内容是以 Flink SQL 的各种功能特性为主线贯穿始终,对 Table API
只做原理性讲解。
11.2 节主要介绍 Table API SQL 的基本用法,有了这部分知识,就可以写出完整的 Flink
SQL 程序了。 11.3 节深入讲解了表和 SQL 在流处理中的一些核心概念,比如动态表和持续查
询,更新查询和追加查询等;这些知识或许对于应用逻辑没有太大帮助,然而却是深入理解流
式处理架构的关键,也是从程序员向着架构师迈进的路上必须跨越的门槛。 11.4~11.6 节主要
介绍 Flink SQL 中的高级特性:窗口、聚合查询和联结查询,在这一部分中标准 SQL 语法和
Flink DataStream API 彼此渗透融合,在流处理中使用 SQL 查询的特色体现得淋漓尽致;另
一方面,这几节也是对 SQL DataStream API 知识的一个总结。 11.7 节详细讲解了函数的用
法,这部分主要是一个知识扩展,实际应用的场景较少,一般只需要知道系统函数的用法就够
了。 11.8 11.9 两节介绍了 SQL 客户端工具和外部系统的连接器,内容相对比较简单,主要
侧重于实际应用场景。
本章内容较多,如果仅以快速应用为目的,读者可以主要浏览 11.1 11.2 11.4 11.5
371 11.9 这五节内容;当然如果时间精力充沛,还是建议完整通读,并在官网详细浏览相关资料。
Table API SQL Flink 最上层的应用接口,目前尚不完善,但发展非常迅速,每个小版本
都会有底层优化和功能扩展。可以想到不久的将来, Flink SQL 将会是最为高效、最为普遍的
开发手段,我们应该时刻保持跟进,随着框架的发展完善不断提升自己的技术能力。

12. Flink CEP

Flink 的学习过程中,从基本原理和核心层 DataStream API 到底层的处理函数、再到应
用层的 Table API SQL ,我们已经掌握了 Flink 编程的各种手段,可以应对实际应用开发的
各种需求了。
在大数据分析领域,一大类需求就是诸如 PV UV 这样的统计指标,我们往往可以直接
SQL 搞定;对于比较复杂的业务逻辑, SQL 中可能没有对应功能的内置函数,那么我们也
可以使用 DataStream API ,利用状态编程来进行实现。不过在实际应用中,还有一类需求是要
检测以特定顺序先后发生的一组事件,进行统计或做报警提示,这就比较麻烦了。例如,网站
做用户管理,可能需要检测“连续登录失败”事件的发生,这是个组合事件,其实就是“登录
失败”和“登录失败”的组合;电商网站可能需要检测用户“下单支付”行为,这也是组合事
件,“下单”事件之后一段时间内又会有“支付”事件到来,还包括了时间上的限制。
类似的多个事件的组合,我们把它叫作“复杂事件”。对于复杂时间的处理,由于涉及到
事件的严格顺序,有时还有时间约束,我们很难直接用 SQL 或者 DataStream API 来完成。于
是只好放大招——派底层的处理函数( process function )上阵了。处理函数确实可以搞定这些
需求,不过对于非常复杂的组合事件,我们可能需要设置很多状态、定时器,并在代码中定义
各种条件分支( if-else )逻辑来处理,复杂度会非常高,很可能会使代码失去可读性。怎样处
理这类复杂事件呢? Flink 为我们提供了专门用于处理复杂事件的库—— CEP ,可以让我们更
加轻松地解决这类棘手的问题。这在企业的实时风险控制中有非常重要的作用。
本章我们就来了解一下 Flink CEP 的用法。
12.1 基本概念
在写代码之前,我们首先需要了解一些基本概念,这要从 CEP 的基本定义和特点说起。
12.1.1 CEP 是什么
所谓 CEP ,其实就是“复杂事件处理( Complex Event Processing )”的缩写;而 Flink CEP
就是 Flink 实现的一个用于复杂事件处理的库( library )。
那到底什么是“复杂事件处理”呢?就是可以在事件流里,检测到特定的事件组合并进行
处理,比如说“连续登录失败”,或者“订单支付超时”等等。
具体的处理过程是,把事件流中的一个个简单事件,通过一定的规则匹配组合起来,这就
复杂事件 ;然后基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行
输出。
总结起来,复杂事件处理( CEP )的流程可以分成三个步骤:
1 )定义一个匹配规则
2 )将匹配规则应用到事件流上,检测满足规则的复杂事件
3 )对检测到的复杂事件进行处理,得到结果进行输出
如图 12-1 所示,输入是不同形状的事件流,我们可以定义一个匹配规则:在圆形后面紧
跟着三角形。那么将这个规则应用到输入流上,就可以检测到三组匹配的复杂事件。它们构成
了一个新的“复杂事件流”,流中的数据就变成了一组一组的复杂事件,每个数据都包含了一
个圆形和一个三角形。接下来,我们就可以针对检测到的复杂事件,处理之后输出一个提示或
报警信息了。
所以, CEP 是针对流处理而言的,分析的是低延迟、频繁产生的事件流。它的主要目的,
就是在无界流中检测出特定的数据组合,让我们有机会掌握数据中重要的高阶特征。
12.1.2 模式( Pattern
CEP 的第一步所定义的匹配规则,我们可以把它叫作“模式”( Pattern )。模式的定义主要
就是两部分内容:
每个简单事件的特征
简单事件之间的组合关系
当然,我们也可以进一步扩展模式的功能。比如,匹配检测的时间限制;每个简单事件是
否可以重复出现;对于事件可重复出现的模式,遇到一个匹配后是否跳过后面的匹配;等等。
所谓“事件之间的组合关系”,一般就是定义“谁后面接着是谁”,也就是事件发生的顺序。
我们把它叫作“近邻关系”。可以定义严格的近邻关系,也就是两个事件之前不能有任何其他
事件;也可以定义宽松的近邻关系,即只要前后顺序正确即可,中间可以有其他事件。另外,
还可以反向定义,也就是“谁后面不能跟着谁”。
CEP 做的事其实就是在流上进行模式匹配。根据模式的近邻关系条件不同,可以检测连
续的事件或不连续但先后发生的事件;模式还可能有时间的限制,如果在设定时间范围内没有
满足匹配条件,就会导致模式匹配超时( timeout )。
Flink CEP 为我们提供了丰富的 API ,可以实现上面关于模式的所有功能,这套 API 就叫
作“模式 API ”( Pattern API )。关于 Pattern API ,我们会在后面的 12.3 节中详细介绍。
12.1.3 应用场景
CEP 主要用于实时流数据的分析处理。 CEP 可以帮助在复杂的、看似不相关的事件流中
找出那些有意义的事件组合,进而可以接近实时地进行分析判断、输出通知信息或报警。这在
企业项目的风控管理、用户画像和运维监控中,都有非常重要的应用。
风险控制
设定一些行为模式,可以对用户的异常行为进行实时检测。当一个用户行为符合了异常行
为模式,比如短时间内频繁登录并失败、大量下单却不支付(刷单),就可以向用户发送通知
信息,或是进行报警提示、由人工进一步判定用户是否有违规操作的嫌疑。这样就可以有效地
控制用户个人和平台的风险。
用户画像
利用 CEP 可以用预先定义好的规则,对用户的行为轨迹进行实时跟踪,从而检测出具有
特定行为习惯的一些用户,做出相应的用户画像。基于用户画像可以进行精准营销,即对行为
匹配预定义规则的用户实时发送相应的营销推广;这与目前很多企业所做的精准推荐原理是一
样的。
运维监控
对于企业服务的运维管理,可以利用 CEP 灵活配置多指标、多依赖来实现更复杂的监控
模式。 CEP 的应用场景非常丰富。很多大数据框架,如 Spark Samza Beam 等都提供了不同的
CEP 解决方案,但没有专门的库( library )。而 Flink 提供了专门的 CEP 库用于复杂事件处理,
可以说是目前 CEP 的最佳解决方案。
12.2 快速上手
了解了 CEP 是什么,接下来我们就可以在代码中进行调用,尝试用 Flink CEP 来实现具
体的需求了。
12.2.1 需要引入的依赖
想要在代码中使用 Flink CEP ,需要在项目的 pom 文件中添加相关依赖:
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-cep_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
为了精简和避免依赖冲突, Flink 会保持尽量少的核心依赖。所以核心依赖中并不包括任
何的连接器( conncetor )和库,这里的库就包括了 SQL CEP 以及 ML 等等。所以如果想要
Flink 集群中提交运行 CEP 作业,应该向 Flink SQL 那样将依赖的 jar 包放在 /lib 目录下。
从这个角度来看, Flink CEP Flink SQL 一样,都是最顶层的应用级 API
12.2.2 一个简单实例
接下来我们考虑一个具体的需求:检测用户行为,如果连续三次登录失败,就输出报警信
息。很显然,这是一个复杂事件的检测处理,我们可以使用 Flink CEP 来实现。
我们首先定义数据的类型。这里的用户行为不再是之前的访问事件 Event 了,所以应该单
独定义一个登录事件 POJO 类。具体实现如下:
public class LoginEvent {
 public String userId;
 public String ipAddress;
 public String eventType;
 public Long timestamp;
 public LoginEvent(String userId, String ipAddress, String eventType, Long 
timestamp) {
 this.userId = userId;
 this.ipAddress = ipAddress;
 this.eventType = eventType;
 this.timestamp = timestamp;
 }
 public LoginEvent() {}
 @Override
 public String toString() {
 return "LoginEvent{" +
 "userId='" + userId + '\'' +
 ", ipAddress='" + ipAddress + '\'' +
 ", eventType='" + eventType + '\'' +
 ", timestamp=" + timestamp +
 '}';
 }
}
接下来就是业务逻辑的编写。 Flink CEP 在代码中主要通过 Pattern API 来实现。之前我们
已经介绍过, CEP 的主要处理流程分为三步,对应到 Pattern API 中就是:
1 )定义一个模式( Pattern );
2 )将 Pattern 应用到 DataStream 上,检测满足规则的复杂事件,得到一个 PatternStream
3 )对 PatternStream 进行转换处理,将检测到的复杂事件提取出来,包装成报警信息输
出。
具体代码实现如下:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
public class LoginFailDetect {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 // 获取登录事件流,并提取时间戳、生成水位线
 KeyedStream<LoginEvent, String> stream = env
 .fromElements(
 new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
 )
 .assignTimestampsAndWatermarks(
 WatermarkStrategy.<LoginEvent>forMonotonousTimestamps()
 .withTimestampAssigner(
 new SerializableTimestampAssigner<LoginEvent>() {
 @Override
 public long extractTimestamp(LoginEvent loginEvent, long l) 
{
 return loginEvent.timestamp;
 }
 }
 )
 )
 .keyBy(r -> r.userId);
 // 1. 定义 Pattern,连续的三个登录失败事件
 Pattern<LoginEvent, LoginEvent> pattern = Pattern
 .<LoginEvent>begin("first") // 以第一个登录失败事件开始
 .where(new SimpleCondition<LoginEvent>() {
 @Override
 public boolean filter(LoginEvent loginEvent) throws Exception {
 return loginEvent.eventType.equals("fail");
 }
 })
 .next("second") // 接着是第二个登录失败事件
 .where(new SimpleCondition<LoginEvent>() {
 @Override
 public boolean filter(LoginEvent loginEvent) throws Exception {
 return loginEvent.eventType.equals("fail");
 }
 })
 .next("third") // 接着是第三个登录失败事件
 .where(new SimpleCondition<LoginEvent>() {
 @Override
 public boolean filter(LoginEvent loginEvent) throws Exception {
 return loginEvent.eventType.equals("fail");
 }
 });
 // 2. 将 Pattern 应用到流上,检测匹配的复杂事件,得到一个 PatternStream
 PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);
377
 // 3. 将匹配到的复杂事件选择出来,然后包装成字符串报警信息输出
 patternStream
 .select(new PatternSelectFunction<LoginEvent, String>() {
 @Override
 public String select(Map<String, List<LoginEvent>> map) throws 
Exception {
 LoginEvent first = map.get("first").get(0);
 LoginEvent second = map.get("second").get(0);
 LoginEvent third = map.get("third").get(0);
 return first.userId + " 连续三次登录失败!登录时间:" + 
first.timestamp + ", " + second.timestamp + ", " + third.timestamp;
 }
 })
 .print("warning");
 env.execute();
 }
}
在上面的程序中,模式中的每个简单事件,会用一个 .where() 方法来指定一个约束条件,
指明每个事件的特征,这里就是 eventType 为“ fail ”。
而模式里表示事件之间的关系时,使用了 .next() 方法。 next 下一个 的意思,表示紧挨
着、中间不能有其他事件(比如登录成功),这是一个严格近邻关系。第一个事件用 .begin()
法表示开始。所有这些“连接词”都可以有一个字符串作为参数,这个字符串就可以认为是当
前简单事件的名称。所以我们如果检测到一组匹配的复杂事件,里面就会有连续的三个登录失
败事件,它们的名称分别叫作“ first ”“ second ”和“ third ”。
在 第 三 步 处 理 复 杂 事 件 时 , 调 用 了 PatternStream .select() 方 法 , 传 入 一 个
PatternSelectFunction 对检测到的复杂事件进行处理。而检测到的复杂事件,会放在一个 Map
中; PatternSelectFunction .select() 方法有一个类型为 Map<String, List<LoginEvent>> 的参数
map ,里面就保存了检测到的匹配事件。这里的 key 是一个字符串,对应着事件的名称,而 value
LoginEvent 的一个列表,匹配到的登录失败事件就保存在这个列表里。最终我们提取 userId
和三次登录的时间戳,包装成字符串输出一个报警信息。
运行代码可以得到结果如下:
warning> user_1 连续三次登录失败!登录时间: 2000, 3000, 5000
可以看到, user_1 连续三次登录失败被检测到了;而 user_2 尽管也有三次登录失败,但
中间有一次登录成功,所以不会被匹配到。
12.3 模式 API Pattern API
Flink CEP 的核心是复杂事件的模式匹配。 Flink CEP 库中提供了 Pattern 类,基于它可以
调用一系列方法来定义匹配模式,这就是所谓的模式 API Pattern API )。 Pattern API 可以让我
378 们定义各种复杂的事件组合规则,用于从事件流中提取复杂事件。在上节中我们已经对 Pattern
API 有了初步的认识,接下来就对其中的一些概念和用法进行展开讲解。
12.3.1 个体模式
12.1.2 小节中我们已经知道,模式( Pattern )其实就是将一组简单事件组合成复杂事件
匹配规则 。由于流中事件的匹配是有先后顺序的,因此一个匹配规则就可以表达成先后发
生的一个个简单事件,按顺序串联组合在一起。
这里的每一个简单事件并不是任意选取的,也需要有一定的条件规则;所以我们就把每个
简单事件的匹配规则,叫作“个体模式”( Individual Pattern )。
1. 基本形式
12.2.2 小节中,每一个登录失败事件的选取规则,就都是一个个体模式。比如:
.<LoginEvent>begin("first") // 以第一个登录失败事件开始
 .where(new SimpleCondition<LoginEvent>() {
 @Override
 public boolean filter(LoginEvent loginEvent) throws Exception {
 return loginEvent.eventType.equals("fail");
 }
 })
或者后面的:
.next("second") // 接着是第二个登录失败事件
 .where(new SimpleCondition<LoginEvent>() {
 @Override
 public boolean filter(LoginEvent loginEvent) throws Exception {
 return loginEvent.eventType.equals("fail");
 }
 })
这些都是个体模式。个体模式一般都会匹配接收一个事件。
每个个体模式都以一个“连接词”开始定义的,比如 begin next 等等,这是 Pattern 对象
的一个方法( begin Pattern 类的静态方法),返回的还是一个 Pattern 。这些“连接词”方法
有一个 String 类型参数,这就是当前个体模式唯一的名字,比如这里的“ first ”、“ second ”。在
之后检测到匹配事件时,就会以这个名字来指代匹配事件。
个体模式需要一个“过滤条件”,用来指定具体的匹配规则。这个条件一般是通过调
.where() 方法来实现的,具体的过滤逻辑则通过传入的 SimpleCondition 内的 .filter() 方法来定
义。
另外,个体模式可以匹配接收一个事件,也可以接收多个事件。这听起来有点奇怪,一个
单独的匹配规则可能匹配到多个事件吗?这是可能的,我们可以给个体模式增加一个“量词”
quantifier ),就能够让它进行循环匹配,接收多个事件。接下来我们就对量词和条件( condition
进行展开说明。
2. 量词( Quantifiers
个体模式后面可以跟一个“量词”,用来指定循环的次数。从这个角度分类,个体模式可
以包括“单例( singleton )模式”和“循环( looping )模式”。默认情况下,个体模式是单例
模式,匹配接收一个事件;当定义了量词之后,就变成了循环模式,可以匹配接收多个事件。
在循环模式中,对同样特征的事件可以匹配多次。比如我们定义个体模式为“匹配形状为
三角形的事件”,再让它循环多次,就变成了“匹配连续多个三角形的事件”。注意这里的“连
续”,只要保证前后顺序即可,中间可以有其他事件,所以是“宽松近邻”关系。
Flink CEP 中,可以使用不同的方法指定循环模式,主要有:
.oneOrMore ()
匹配事件出现一次或多次,假设 a 是一个个体模式, a.oneOrMore() 表示可以匹配 1 个或多
a 的事件组合。我们有时会用 a+ 来简单表示。
.times times
匹配事件发生特定次数( times ),例如 a.times(3) 表示 aaa
.times fromTimes toTimes
指定匹配事件出现的次数范围,最小次数为 fromTimes ,最大次数为 toTimes 。例如 a.times(2,
4) 可以匹配 aa aaa aaaa
.greedy()
只能用在循环模式后,使当前循环模式变得“贪心”( greedy ),也就是总是尽可能多地去
匹配。例如 a.times(2, 4).greedy() ,如果出现了连续 4 a ,那么会直接把 aaaa 检测出来进行处
理,其他任意 2 a 是不算匹配事件的。
.optional()
使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。
对于一个个体模式 pattern 来说,后面所有可以添加的量词如下:
// 匹配事件出现 4 次
pattern.times(4);
// 匹配事件出现 4 次,或者不出现
pattern.times(4).optional();
// 匹配事件出现 2, 3 或者 4 次
pattern.times(2, 4);
// 匹配事件出现 2, 3 或者 4 次,并且尽可能多地匹配
pattern.times(2, 4).greedy();
// 匹配事件出现 2, 3, 4 次,或者不出现
pattern.times(2, 4).optional();
// 匹配事件出现 2, 3, 4 次,或者不出现;并且尽可能多地匹配
pattern.times(2, 4).optional().greedy();
// 匹配事件出现 1 次或多次
pattern.oneOrMore();
// 匹配事件出现 1 次或多次,并且尽可能多地匹配
pattern.oneOrMore().greedy();
// 匹配事件出现 1 次或多次,或者不出现
pattern.oneOrMore().optional();
// 匹配事件出现 1 次或多次,或者不出现;并且尽可能多地匹配
pattern.oneOrMore().optional().greedy();
// 匹配事件出现 2 次或多次
pattern.timesOrMore(2);
// 匹配事件出现 2 次或多次,并且尽可能多地匹配
pattern.timesOrMore(2).greedy();
// 匹配事件出现 2 次或多次,或者不出现
pattern.timesOrMore(2).optional()
// 匹配事件出现 2 次或多次,或者不出现;并且尽可能多地匹配
pattern.timesOrMore(2).optional().greedy();
正是因为个体模式可以通过量词定义为循环模式,一个模式能够匹配到多个事件,所以之
前代码中事件的检测接收才会用 Map 中的一个列表( List )来保存。而之前代码中没有定义量
词,都是单例模式,所以只会匹配一个事件,每个 List 中也只有一个元素:
LoginEvent first = map.get("first").get(0);
3. 条件( Conditions
对于每个个体模式,匹配事件的核心在于定义匹配条件,也就是选取事件的规则。 Flink
CEP 会按照这个规则对流中的事件进行筛选,判断是否接受当前的事件。
对于条件的定义,主要是通过调用 Pattern 对象的 .where() 方法来实现的,主要可以分为简
单条件、迭代条件、复合条件、终止条件几种类型。此外,也可以调用 Pattern 对象的 .subtype()
方法来限定匹配事件的子类型。接下来我们就分别进行介绍。
限定子类型
调用 .subtype() 方法可以为当前模式增加子类型限制条件。例如:
pattern .subtype( SubEvent .class);
这里 SubEvent 是流中数据类型 Event 的子类型。这时,只有当事件是 SubEvent 类型时,
才可以满足当前模式 pattern 的匹配条件。
简单条件( Simple Conditions
简单条件是最简单的匹配规则,只根据当前事件的特征来决定是否接受它。这在本质上其
实就是一个 filter 操作。
381 代码中我们为 .where() 方法传入一个 SimpleCondition 的实例作为参数。 SimpleCondition
表示“简单条件”的抽象类,内部有一个 .filter() 方法,唯一的参数就是当前事件。所以它可以
当作 FilterFunction 来使用。
下面是一个具体示例:
pattern.where(new SimpleCondition<Event>() {
 @Override
 public boolean filter(Event value) {
 return value.user.startsWith("A");
 }
});
这里我们要求匹配事件的 user 属性以“ A ”开头。
迭代条件( Iterative Conditions
简单条件只能基于当前事件做判断,能够处理的逻辑比较有限。在实际应用中,我们可能
需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。这种需要依靠之前事
件来做判断的条件,就叫作“迭代条件”( Iterative Condition )。
Flink CEP 中,提供了 IterativeCondition 抽象类。这其实是更加通用的条件表达,查看
源码可以发现, .where() 方法本身要求的参数类型就是 IterativeCondition ;而之前 的
SimpleCondition 是它的一个子类。
IterativeCondition 中同样需要实现一个 filter() 方法,不过与 SimpleCondition 中不同的
是,这个方法有两个参数:除了当前事件之外,还有一个上下文 Context 。调用这个上下文
.getEventsForPattern() 方法,传入一个模式名称,就可以拿到这个模式中已匹配到的所有数
据了。
下面是一个具体示例:
middle.oneOrMore()
 .where(new IterativeCondition<Event>() {
 @Override
 public boolean filter(Event value, Context<Event> ctx) throws Exception {
 // 事件中的 user 必须以 A 开头
 if (!value.user.startsWith("A")) {
 return false;
 }
 
 int sum = value.amount;
 // 获取当前模式之前已经匹配的事件,求所有事件 amount 之和
 for (Event event : ctx.getEventsForPattern("middle")) {
 sum += event.amount;
 }
 // 在总数量小于 100 时,当前事件满足匹配规则,可以匹配成功
 return sum < 100;
 }
 });
上面代码中当前模式名称就叫作“ middle ”,这是一个循环模式,可以接受事件发生一次
382 或多次。于是下面的迭代条件中,我们通过 ctx.getEventsForPattern("middle") 获取当前模式已
经接受的事件,计算它们的数量( amount )之和;再加上当前事件中的数量,如果总和小于
100 ,就接受当前事件,否则就不匹配。当然,在迭代条件中我们也可以基于当前事件做出判
断,比如代码中要求 user 必须以 A 开头。最终我们的匹配规则就是:事件的 user 必须以 A
头;并且循环匹配的所有事件 amount 之和必须小于 100 。这里的 Event 与之前定义的 POJO
同,增加了 amount 属性。
可以看到,迭代条件能够获取已经匹配的事件,如果自身又是循环模式(比如量词
oneOrMore ),那么两者结合就可以捕获自身之前接收的数据,据此来判断是否接受当前事件。
这个功能非常强大,我们可以由此实现更加复杂的需求,比如可以要求“只有大于之前数据的
平均值,才接受当前事件”。
另外迭代条件中的上下文 Context 也可以获取到时间相关的信息,比如事件的时间戳和当
前的处理时间( processing time )。
组合条件( Combining Conditions
如果一个个体模式有多个限定条件,又该怎么定义呢?
最直接的想法是,可以在简单条件或者迭代条件的 .filter() 方法中,增加多个判断逻辑。可
以通过 if-else 的条件分支分别定义多个条件,也可以直接在 return 返回时给一个多条件的逻辑
组合(与、或、非)。不过这样会让代码变得臃肿,可读性降低。更好的方式是独立定义多个
条件,然后在外部把它们连接起来,构成一个“组合条件”( Combining Condition )。
最简单的组合条件,就是 .where() 后面再接一个 .where() 。因为前面提到过,一个条件就像
是一个 filter 操作,所以每次调用 .where() 方法都相当于做了一次过滤,连续多次调用就表示多
重过滤,最终匹配的事件自然就会同时满足所有条件。这相当于就是多个条件的“逻辑与”
AND )。
而多个条件的逻辑或( OR ),则可以通过 .where() 后加一个 .or() 来实现。这里的 .or() 方法
.where() 一样,传入一个 IterativeCondition 作为参数,定义一个独立的条件;它和之前 .where()
定义的条件只要满足一个,当前事件就可以成功匹配。
当然,子类型限定条件( subtype )也可以和其他条件结合起来,成为组合条件,如下所
示:
pattern.subtype(SubEvent.class)
.where(new SimpleCondition<SubEvent>() {
 @Override
 public boolean filter(SubEvent value) {
 return ... // some condition
 }
});
这里可以看到, SimpleCondition 的泛型参数也变成了 SubEvent ,所以匹配出的事件就既
满足子类型限制,又符合过滤筛选的简单条件;这也是一个逻辑与的关系。
终止条件( Stop Conditions
对于循环模式而言,还可以指定一个 终止条件 Stop Condition ),表示遇到某个特定事
件时当前模式就不再继续循环匹配了。
终 止 条 件 的 定 义 是 通 过 调 用 模 式 对 象 的 .until() 方 法 来 实 现 的 , 同 样 传 入 一 个
IterativeCondition 作为参数。需要注意的是,终止条件只与 oneOrMore() 或 者
oneOrMore().optional() 结合使用。因为在这种循环模式下,我们不知道后面还有没有事件可以
匹配,只好把之前匹配的事件作为状态缓存起来继续等待,这等待无穷无尽;如果一直等下去,
缓存的状态越来越多,最终会耗尽内存。所以这种循环模式必须有个终点,当 .until() 指定的条
件满足时,循环终止,这样就可以清空状态释放内存了。
12.3.2 组合模式
有了定义好的个体模式,就可以尝试按一定的顺序把它们连接起来,定义一个完整的复杂
事件匹配规则了。这种将多个个体模式组合起来的完整模式,就叫作“组合模式”( Combining
Pattern ),为了跟个体模式区分有时也叫作“模式序列”( Pattern Sequence )。
一个组合模式有以下形式:
Pattern<Event, ?> pattern = Pattern
.<Event>begin("start").where(...)
 .next("next").where(...)
 .followedBy("follow").where(...)
 ...
可以看到,组合模式确实就是一个“模式序列”,是用诸如 begin next followedBy 等表
示先后顺序的“连接词”将个体模式串连起来得到的。在这样的语法调用中,每个事件匹配的
条件是什么、各个事件之间谁先谁后、近邻关系如何都定义得一目了然。每一个“连接词”方
法调用之后,得到的都仍然是一个 Pattern 的对象;所以从 Java 对象的角度看,组合模式与个
体模式是一样的,都是 Pattern
1. 初始模式( Initial Pattern
所有的组合模式,都必须以一个“初始模式”开头;而初始模式必须通过调用 Pattern
静态方法 .begin() 来创建。如下所示:
Pattern<Event, ?> start = Pattern.<Event>begin("start");
这里我们调用 Pattern .begin() 方法创建了一个初始模式。传入的 String 类型的参数就是
模式的名称;而 begin 方法需要传入一个类型参数,这就是模式要检测流中事件的基本类型,
这里我们定义为 Event 。调用的结果返回一个 Pattern 的对象实例。 Pattern 有两个泛型参数,第
一个就是检测事件的基本类型 Event ,跟 begin 指定的类型一致;第二个则是当前模式里事件
的子类型,由子类型限制条件指定。我们这里用类型通配符(?)代替,就可以从上下文直接
推断了。
2. 近邻条件( Contiguity Conditions
在初始模式之后,我们就可以按照复杂事件的顺序追加模式,组合成模式序列了。模式之
384 间的组合是通过一些“连接词”方法实现的,这些连接词指明了先后事件之间有着怎样的近邻
关系,这就是所谓的“近邻条件”( Contiguity Conditions ,也叫“连续性条件”)。
Flink CEP 中提供了三种近邻关系:
严格近邻( Strict Contiguity
如图 12-2 所示,匹配的事件严格地按顺序一个接一个出现,中间不会有任何其他事件。
代码中对应的就是 Pattern .next() 方法,名称上就能看出来,“下一个”自然就是紧挨着的。
宽松近邻( Relaxed Contiguity
如图 12-2 所示,宽松近邻只关心事件发生的顺序,而放宽了对匹配事件的“距离”要求,
也就是说两个匹配的事件之间可以有其他不匹配的事件出现。代码中对应 .followedBy() 方法,
很明显这表示“跟在后面”就可以,不需要紧紧相邻。
        
非确定性宽松近邻( Non-Deterministic Relaxed Contiguity
这种近邻关系更加宽松。所谓“非确定性”是指可以重复使用之前已经匹配过的事件;这
种近邻条件下匹配到的不同复杂事件,可以以同一个事件作为开始,所以匹配结果一般会比宽
松近邻更多,如图 11-3 所示。代码中对应 .followedByAny() 方法。
                
从图中可以看到,我们定义的模式序列中有两个个体模式:一是 选择圆形事件 ,一是
择三角形事件 ;这时它们之间的近邻条件就会导致匹配出的复杂事件有所不同。很明显,严
格近邻由于条件苛刻,匹配的事件最少;宽松近邻可以匹配不紧邻的事件,匹配结果会多一些;
而非确定性宽松近邻条件最为宽松,可以匹配到最多的复杂事件。
3. 其他限制条件
除了上面提到的 next() followedBy() followedByAny() 可以分别表示三种近邻条件,我
们还可以用否定的“连接词”来组合个体模式。主要包括:
.notNext()
表示前一个模式匹配到的事件后面,不能紧跟着某种事件。
.notFollowedBy()
表示前一个模式匹配到的事件后面,不会出现某种事件。这里需要注意,由于
notFollowedBy() 是没有严格限定的;流数据不停地到来,我们永远不能保证之后“不会出现某
种事件”。所以一个模式序列不能以 notFollowedBy() 结尾,这个限定条件主要用来表示“两个
事件中间不会出现某种事件”。
另外, Flink CEP 中还可以为模式指定一个时间限制,这是通过调用 .within() 方法实现的。
方法传入一个时间参数,这是模式序列中第一个事件到最后一个事件之间的最大时间间隔,只
有在这期间成功匹配的复杂事件才是有效的。一个模式序列中只能有一个时间限制,调
.within() 的位置不限;如果多次调用则会以最小的那个时间间隔为准。
下面是模式序列中所有限制条件在代码中的定义:
// 严格近邻条件
Pattern<Event, ?> strict = start.next("middle").where(...);
// 宽松近邻条件
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// 非确定性宽松近邻条件
Pattern<Event, ?> nonDetermin = 
start.followedByAny("middle").where(...);
// 不能严格近邻条件
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// 不能宽松近邻条件
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
// 时间限制条件
middle.within(Time.seconds(10));
4. 循环模式中的近邻条件
之前我们讨论的都是模式序列中限制条件,主要用来指定前后发生的事件之间的近邻关系。
而循环模式虽说是个体模式,却也可以匹配多个事件;那这些事件之间自然也会有近邻关系的
讨论。
在循环模式中,近邻关系同样有三种:严格近邻、宽松近邻以及非确定性宽松近邻。对于
定义了量词(如 oneOrMore() times() )的循环模式,默认内部采用的是宽松近邻。也就是说,
当循环匹配多个事件时,它们中间是可以有其他不匹配事件的;相当于用单例模式分别定义、
再用 followedBy() 连接起来。这就解释了在 12.2.2 小节的示例代码中,为什么我们检测连续三
次登录失败用了三个单例模式来分别定义,而没有直接指定 times(3) :因为我们需要三次登录
失败必须是严格连续的,中间不能有登录成功的事件,而 times() 默认是宽松近邻关系。
不过把多个同样的单例模式组合在一起,这种方式还是显得有些笨拙了。连续三次登录失
败看起来不太复杂,那如果要检测连续 100 次登录失败呢?显然使用 times() 是更明智的选择。
不过它默认匹配事件之间是宽松近邻关系,我们可以通过调用额外的方法来改变这一点。
.consecutive()
为循环模式中的匹配事件增加严格的近邻条件,保证所有匹配事件是严格连续的。也就是
说,一旦中间出现了不匹配的事件,当前循环检测就会终止。这起到的效果跟模式序列中的
next() 一样,需要与循环量词 times() oneOrMore() 配合使用。
于是, 12.2.2 小节中检测连续三次登录失败的代码可以改成:
// 1. 定义 Pattern,登录失败事件,循环检测 3 次
Pattern<LoginEvent, LoginEvent> pattern = Pattern
 .<LoginEvent>begin("fails")
 .where(new SimpleCondition<LoginEvent>() {
 @Override
 public boolean filter(LoginEvent loginEvent) throws Exception {
 return loginEvent.eventType.equals("fail");
 }
 }).times(3).consecutive();
这样显得更加简洁;而且即使要扩展到连续 100 次登录失败,也只需要改动一个参数而已。
不过这样一来,后续提取匹配事件的方式也会有所不同,我们将在稍后的 12.4.2 节继续讲解。
.allowCombinations()
除严格近邻外,也可以为循环模式中的事件指定非确定性宽松近邻条件,表示可以重复使
用 已 经 匹 配 的 事 件 。 这 需 要 调 用 .allowCombinations() 方 法 来 实 现 , 实 现 的 效 果
.followedByAny() 相同。
12.3.3 模式组
一般来说,代码中定义的模式序列,就是我们在业务逻辑中匹配复杂事件的规则。不过在
有些非常复杂的场景中,可能需要划分多个 阶段 ,每个 阶段 又有一连串的匹配规则。为了
应对这样的需求, Flink CEP 允许我们以“嵌套”的方式来定义模式。 之前在模式序列中,我们用 begin() next() followedBy() followedByAny() 这样的“连
接词”来组合个体模式,这些方法的参数就是一个个体模式的名称;而现在它们可以直接以一
个模式序列作为参数,就将模式序列又一次连接组合起来了。这样得到的就是一个“模式组”
Groups of Patterns )。
在模式组中,每一个模式序列就被当作了某一阶段的匹配条件,返回的类型是一个
GroupPattern 。而 GroupPattern 本身是 Pattern 的子类;所以个体模式和组合模式能调用的方法,
比如 times() oneOrMore() optional() 之类的量词,模式组一般也是可以用的。
具体在代码中的应用如下所示:
// 以模式序列作为初始模式
Pattern<Event, ?> start = Pattern.begin(
Pattern.<Event>begin("start_start").where(...)
.followedBy("start_middle").where(...)
);
// 在 start 后定义严格近邻的模式序列,并重复匹配两次
Pattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...)
.followedBy("next_middle").where(...)
).times(2);
// 在 start 后定义宽松近邻的模式序列,并重复匹配一次或多次
Pattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedby_start").where(...)
.followedBy("followedby_middle").where(...)
).oneOrMore();
//在 start 后定义非确定性宽松近邻的模式序列,可以匹配一次,也可以不匹配
Pattern<Event, ?> nonDeterminRelaxed = start.followedByAny(
Pattern.<Event>begin("followedbyany_start").where(...)
.followedBy("followedbyany_middle").where(...)
).optional();
12.3.4 匹配后跳过策略
Flink CEP 中,由于有循环模式和非确定性宽松近邻的存在,同一个事件有可能会重复
利用,被分配到不同的匹配结果中。这样会导致匹配结果规模增大,有时会显得非常冗余。当
然,非确定性宽松近邻条件,本来就是为了放宽限制、扩充匹配结果而设计的;我们主要是针
对循环模式来考虑匹配结果的精简。
之前已经讲过,如果对循环模式增加了 .greedy() 的限制,那么就会“尽可能多地”匹配事
件,这样就可以砍掉那些子集上的匹配了。不过这种方式还是略显简单粗暴,如果我们想要精
确控制事件的匹配应该跳过哪些情况,那就需要制定另外的策略了。
388 Flink CEP 中,提供了模式的“匹配后跳过策略”( After Match Skip Strategy ),专门用
来精准控制循环模式的匹配结果。这个策略可以在 Pattern 的初始模式定义中,作为 begin()
第二个参数传入:
Pattern.begin("start", AfterMatchSkipStrategy.noSkip())
.where(...)
...
匹配后跳过策略 AfterMatchSkipStrategy 是一个抽象类,它有多个具体的实现,可以通过
调用对应的静态方法来返回对应的策略实例。这里我们配置的是不做跳过处理,这也是默认策
略。
下面我们举例来说明不同的跳过策略。例如我们要检测的复杂事件模式为:开始是用户名
a 的事件(简写为事件 a ,下同),可以重复一次或多次;然后跟着一个用户名为 b 的事件,
a 事件和 b 事件之间可以有其他事件(宽松近邻)。用简写形式可以直接写作: “a+ followedBy
b” 。在代码中定义 Pattern 如下:
Pattern.<Event>begin("a").where(new SimpleCondition<Event>() {
 @Override
 public boolean filter(Event value) throws Exception {
 return value.user.equals("a");
 }
}).oneOrMore()
.followedBy("b").where(new SimpleCondition<Event>() {
 @Override
 public boolean filter(Event value) throws Exception {
 return value.user.equals("b");
 }
});
我们如果输入事件序列“ a a a b ”——这里为了区分前后不同的 a 事件,可以记作“ a1 a2
a3 b ”——那么应该检测到 6 个匹配结果:( a1 a2 a3 b ),( a1 a2 b ),( a1 b ),( a2 a3 b ),( a2 b ),
a3 b )。如果在初始模式的量词 .oneOrMore() 后加上 .greedy() 定义为贪心匹配,那么结果就是:
a1 a2 a3 b ),( a2 a3 b ),( a3 b ),每个事件作为开头只会出现一次。
接下来我们讨论不同跳过策略对匹配结果的影响:
不跳过( NO_SKIP
代码调用 AfterMatchSkipStrategy.noSkip() 。这是默认策略,所有可能的匹配都会输出。所
以这里会输出完整的 6 个匹配。
跳至下一个( SKIP_TO_NEXT
代码调用 AfterMatchSkipStrategy.skipToNext() 。找到一个 a1 开始的最大匹配之后,跳过
a1 开始的所有其他匹配,直接从下一个 a2 开始匹配起。当然 a2 也是如此跳过其他匹配。最
终得到( a1 a2 a3 b ),( a2 a3 b ),( a3 b )。可以看到,这种跳过策略跟使用 .greedy() 效果是相同
的。
跳过所有子匹配( SKIP_PAST_LAST_EVENT
389 390
代码调用 AfterMatchSkipStrategy.skipPastLastEvent() 。找到 a1 开始的匹配( a1 a2 a3 b )之
后,直接跳过所有 a1 直到 a3 开头的匹配,相当于把这些子匹配都跳过了。最终得到( a1 a2 a3
b ),这是最为精简的跳过策略。
跳至第一个( SKIP_TO_FIRST[a]
代码调用 AfterMatchSkipStrategy.skipToFirst(“a”) ,这里传入一个参数,指明跳至哪个模式
的第一个匹配事件。找到 a1 开始的匹配( a1 a2 a3 b )后,跳到以最开始一个 a (也就是 a1
为开始的匹配,相当于只留下 a1 开始的匹配。最终得到( a1 a2 a3 b ),( a1 a2 b ),( a1 b )。
跳至最后一个( SKIP_TO_LAST[a]
代码调用 AfterMatchSkipStrategy.skipToLast(“a”) ,同样传入一个参数,指明跳至哪个模式
的最后一个匹配事件。找到 a1 开始的匹配( a1 a2 a3 b )后,跳过所有 a1 a2 开始的匹配,跳
到以最后一个 a (也就是 a3 )为开始的匹配。最终得到( a1 a2 a3 b ),( a3 b )。
12.4 模式的检测处理
Pattern API Flink CEP 的核心,也是最复杂的一部分。不过利用 Pattern API 定义好模式
还只是整个复杂事件处理的第一步,接下来还需要将模式应用到事件流上、检测提取匹配的复
杂事件并定义处理转换的方法,最终得到想要的输出信息。
12.4.1 将模式应用到流上
将模式应用到事件流上的代码非常简单,只要调用 CEP 类的静态方法 .pattern() ,将数据
流( DataStream )和模式( Pattern )作为两个参数传入就可以了。最终得到的是一个 PatternStream
DataStream<Event> inputStream = ...
Pattern<Event, ?> pattern = ...
PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);
这里的 DataStream ,也可以通过 keyBy 进行按键分区得到 KeyedStream ,接下来对复杂事
件的检测就会针对不同的 key 单独进行了。
模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时
间语义。默认情况下采用事件时间语义,那么事件会以各自的时间戳进行排序;如果是处理时
间语义,那么所谓先后就是数据到达的顺序。对于时间戳相同或是同时到达的事件,我们还可
以在 CEP.pattern() 中传入一个比较器作为第三个参数,用来进行更精确的排序:
// 可选的事件比较器
EventComparator<Event> comparator = ...
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
得到 PatternStream 后,接下来要做的就是对匹配事件的检测处理了。 391
12.4.2 处理匹配事件
基于 PatternStream 可以调用一些转换方法,对匹配的复杂事件进行检测和处理,并最终
得到一个正常的 DataStream 。这个转换的过程与窗口的处理类似:将模式应用到流上得到
PatternStream ,就像在流上添加窗口分配器得到 WindowedStream ;而之后的转换操作,就像
定义具体处理操作的窗口函数,对收集到的数据进行分析计算,得到结果进行输出,最后回到
DataStream 的类型来。
PatternStream 的转换操作主要可以分成两种:简单便捷的选择提取( select )操作,和更
加通用、更加强大的处理( process )操作。与 DataStream 的转换类似,具体实现也是在调用
API 时传入一个函数类:选择操作传入的是一个 PatternSelectFunction ,处理操作传入的则是一
PatternProcessFunction
1. 匹配事件的选择提取( select
处理匹配事件最简单的方式,就是从 PatternStream 中直接把匹配的复杂事件提取出来,
包装成想要的信息输出,这个操作就是“选择”( select )。
PatternSelectFunction
代码中基于 PatternStream 直接调用 .select() 方法,传入一个 PatternSelectFunction 作为参数。
PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);
DataStream<String> result = patternStream.select(new MyPatternSelectFunction());
这 里 的 MyPatternSelectFunction PatternSelectFunction 的 一 个 具 体 实 现 。
PatternSelectFunction Flink CEP 提供的一个函数类接口,它会将检测到的匹配事件保存在一
Map 里,对应的 key 就是这些事件的名称。这里的“事件名称”就对应着在模式中定义的
每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存
Map 里的 value 就是一个事件的列表( List )。
下面是 MyPatternSelectFunction 的一个具体实现:
class MyPatternSelectFunction implements PatternSelectFunction<Event, String>{
@Override
public String select(Map<String, List<Event>> pattern) throws Exception {
Event startEvent = pattern.get("start").get(0);
Event middleEvent = pattern.get("middle").get(0);
return startEvent.toString() + " " + middleEvent.toString();
}
}
PatternSelectFunction 里需要实现一个 select() 方法,这个方法每当检测到一组匹配的复杂
事件时都会调用一次。它以保存了匹配复杂事件的 Map 作为输入,经自定义转换后得到输出
信息返回。这里我们假设之前定义的模式序列中,有名为“ start” “middle” 的两个个体模式,
于是可以通过这个名称从 Map 中选择提取出对应的事件。注意调用 Map .get(key) 方法后得
到的是一个事件的 List ;如果个体模式是单例的,那么 List 中只有一个元素,直接调用 .get(0)
就可以把它取出。 当然,如果个体模式是循环的, List 中就有可能有多个元素了。例如我们在 12.3.2 小节中
对连续登录失败检测的改进,我们可以将匹配到的事件包装成 String 类型的报警信息输出,代
码如下:
// 1. 定义 Pattern,登录失败事件,循环检测 3 次
Pattern<LoginEvent, LoginEvent> pattern = Pattern
 .<LoginEvent>begin("fails")
 .where(new SimpleCondition<LoginEvent>() {
 @Override
 public boolean filter(LoginEvent loginEvent) throws Exception {
 return loginEvent.eventType.equals("fail");
 }
 }).times(3).consecutive();
// 2. 将 Pattern 应用到流上,检测匹配的复杂事件,得到一个 PatternStream
PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);
// 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出
patternStream
 .select(new PatternSelectFunction<LoginEvent, String>() {
 @Override
 public String select(Map<String, List<LoginEvent>> map) throws 
Exception {
// 只有一个模式,匹配到了 3 个事件,放在 List 中
 LoginEvent first = map.get("fails").get(0);
 LoginEvent second = map.get("fails").get(1);
 LoginEvent third = map.get("fails").get(2);
 return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp 
+ ", " + second.timestamp + ", " + third.timestamp;
 }
 })
 .print("warning");
我们定义的模式序列中只有一个循环模式 fails ,它会将检测到的 3 个登录失败事件保存到
一个列表( List )中。所以第三步处理匹配的复杂事件时,我们从 map 中获取模式名 fails 对应
的事件,拿到的是一个 List ,从中按位置索引依次获取元素就可以得到匹配的三个登录失败事
件。
运行程序进行测试,会发现结果与之前完全一样。
PatternFlatSelectFunction
除此之外, PatternStream 还有一个类似的方法是 .flatSelect() ,传入的参数是一个
PatternFlatSelectFunction 。从名字上就能看出,这是 PatternSelectFunction 扁平化 版本;内
部需要实现一个 flatSelect() 方法,它与之前 select() 的不同就在于没有返回值,而是多了一个收
集器( Collector )参数 out ,通过调用 out.collet() 方法就可以实现多次发送输出数据了。
例如上面的代码可以写成:
// 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出
patternStream.flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {
@Override
public void flatSelect(Map<String, List<LoginEvent>> map,
Collector<String> out) throws Exception {
LoginEvent first = map.get("fails").get(0);
LoginEvent second = map.get("fails").get(1);
LoginEvent third = map.get("fails").get(2);
out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + 
", " + second.timestamp + ", " + third.timestamp);
}
}).print("warning");
可见 PatternFlatSelectFunction 使用更加灵活,完全能够覆盖 PatternSelectFunction 的功能。
这跟 FlatMapFunction MapFunction 的区别是一样的。
2. 匹配事件的通用处理( process
1.8 版本之后, Flink CEP 引入了对于匹配事件的通用检测处理方式,那就是直接调用
PatternStream .process() 方法,传入一个 PatternProcessFunction 。这看起来就像是我们熟悉的
处理函数( process function ),它也可以访问一个上下文( Context ),进行更多的操作。
所以 PatternProcessFunction 功能更加丰富、调用更加灵活,可以完全覆盖其他接口,也就
成为了目前官方推荐的处理方式。事实上, PatternSelectFunction PatternFlatSelectFunction
CEP 内部执行时也会被转换成 PatternProcessFunction
我们可以使用 PatternProcessFunction 将之前的代码重写如下:
// 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出
patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
@Override
public void processMatch(Map<String, List<LoginEvent>> map, Context ctx, 
Collector<String> out) throws Exception {
LoginEvent first = map.get("fails").get(0);
LoginEvent second = map.get("fails").get(1);
LoginEvent third = map.get("fails").get(2);
out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + 
", " + second.timestamp + ", " + third.timestamp);
}
}).print("warning");
可以看到, PatternProcessFunction 中必须实现一个 processMatch() 方法;这个方法与之前
flatSelect() 类似,只是多了一个上下文 Context 参数。利用这个上下文可以获取当前的时间
信息,比如事件的时间戳( timestamp )或者处理时间( processing time );还可以调用 .output()
方法将数据输出到侧输出流。侧输出流的功能是处理函数的一大特性,我们已经非常熟悉;而
CEP 中,侧输出流一般被用来处理超时事件,我们会在下一小节详细讨论。
393 394
12.4.3 处理超时事件
复杂事件的检测结果一般只有两种:要么匹配,要么不匹配。检测处理的过程具体如下:
1 )如果当前事件符合模式匹配的条件,就接受该事件,保存到对应的 Map 中;
2 )如果在模式序列定义中,当前事件后面还应该有其他事件,就继续读取事件流进行
检测;如果模式序列的定义已经全部满足,那么就成功检测到了一组匹配的复杂事件,调用
PatternProcessFunction processMatch() 方法进行处理;
3 )如果当前事件不符合模式匹配的条件,就丢弃该事件;
4 )如果当前事件破坏了模式序列中定义的限制条件,比如不满足严格近邻要求,那么
当前已检测的一组部分匹配事件都被丢弃,重新开始检测。
不过在有时间限制的情况下,需要考虑的问题会有一点特别。比如我们用 .within() 指定了
模式检测的时间间隔,超出这个时间当前这组检测就应该失败了。然而这种“超时失败”跟真
正的“匹配失败”不同,它其实是一种“部分成功匹配”;因为只有在开头能够正常匹配的前
提下,没有等到后续的匹配事件才会超时。所以往往不应该直接丢弃,而是要输出一个提示或
报警信息。这就要求我们有能力捕获并处理超时事件。
1. 使用 PatternProcessFunction 的侧输出流
Flink CEP 中 , 提 供 了 一 个 专 门 捕 捉 超 时 的 部 分 匹 配 事 件 的 接 口 , 叫 作
TimedOutPartialMatchHandler 。这个接口需要实现一个 processTimedOutMatch() 方法,可以将
超时的、已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个
参数则是 PatternProcessFunction 的上下文 Context 。所以这个接口必须与 PatternProcessFunction
结合使用,对处理结果的输出则需要利用侧输出流来进行。
代码中的调用方式如下:
class MyPatternProcessFunction extends PatternProcessFunction<Event, String> 
implements TimedOutPartialMatchHandler<Event> {
 // 正常匹配事件的处理
@Override
 public void processMatch(Map<String, List<Event>> match, Context ctx, 
Collector<String> out) throws Exception{
 ...
 }
 // 超时部分匹配事件的处理
 @Override
 public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) 
throws Exception{
 Event startEvent = match.get("start").get(0);
 OutputTag<Event> outputTag = new OutputTag<Event>("time-out"){};
 ctx.output(outputTag, startEvent);
 }
}
我们在 processTimedOutMatch() 方法中定义了一个输出标签( OutputTag )。调用 ctx.output()
方法,就可以将超时的部分匹配事件输出到标签所标识的侧输出流了。
2. 使用 PatternTimeoutFunction
上文提到的 PatternProcessFunction 通过实现 TimedOutPartialMatchHandler 接口扩展出了处
理超时事件的能力,这是官方推荐的做法。此外, Flink CEP 中也保留了简化版的
PatternSelectFunction ,它无法直接处理超时事件,不过我们可以通过调用 PatternStream
.select() 方法时多传入一个 PatternTimeoutFunction 参数来实现这一点。
PatternTimeoutFunction 是早期版本中用于捕获超时事件的接口。它需要实现一个 timeout()
方法,同样会将部分匹配的事件放在一个 Map 中作为参数传入,此外还有一个参数是当前的
时间戳。提取部分匹配事件进行处理转换后,可以将通知或报警信息输出。
由于调用 .select() 方法后会得到唯一的 DataStream ,所以正常匹配事件和超时事件的处理
结果不应该放在同一条流中。正常匹配事件的处理结果会进入转换后得到的 DataStream ,而超
时事件的处理结果则会进入侧输出流;这个侧输出流需要另外传入一个侧输出标签( OutputTag
来指定。
所以最终我们在调用 PatternStream .select() 方法时需要传入三个参数:侧输出流标签
OutputTag ), 超 时 事 件 处 理 函 数 PatternTimeoutFunction , 匹 配 事 件 提 取 函 数
PatternSelectFunction 。下面是一个代码中的调用方式:
// 定义一个侧输出流标签,用于标识超时侧输出流
OutputTag<String> timeoutTag = new OutputTag<String>("timeout"){};
// 将匹配到的,和超时部分匹配的复杂事件提取出来,然后包装成提示信息输出
SingleOutputStreamOperator<String> resultStream = patternStream
.select(timeoutTag,
// 超时部分匹配事件的处理
 new PatternTimeoutFunction<Event, String>() {
 @Override
 public String timeout(Map<String, List<Event>> pattern, long 
timeoutTimestamp) throws Exception {
 Event event = pattern.get("start").get(0);
 return "超时:" + event.toString();
 }
 },
// 正常匹配事件的处理
 new PatternSelectFunction<Event, String>() {
 @Override
 public String select(Map<String, List<Event>> pattern) throws Exception 
{
...
 }
 }
);
// 将正常匹配和超时部分匹配的处理结果流打印输出
resultStream.print("matched");
resultStream.getSideOutput(timeoutTag).print("timeout");
这里需要注意的是,在超时事件处理的过程中,从 Map 里只能取到已经检测到匹配的那
些事件;如果取可能未匹配的事件并调用它的对象方法,则可能会报空指针异常
NullPointerException )。另外,超时事件处理的结果进入侧输出流,正常匹配事件的处理结
果进入主流,两者的数据类型可以不同。
3. 应用实例
接下来我们看一个具体的应用场景。
在电商平台中,最终创造收入和利润的是用户下单购买的环节。用户下单的行为可以表明
用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,
用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订
单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15
分钟),如果下单后一段时间仍未支付,订单就会被取消。
首先定义出要处理的数据类型。我们面对的是订单事件,主要包括用户对订单的创建(下
单)和支付两种行为。因此可以定义 POJO OrderEvent 如下,其中属性字段包括用户 ID
订单 ID 、事件类型(操作类型)以及时间戳。
public class OrderEvent {
 public String userId;
 public String orderId;
 public String eventType;
 public Long timestamp;
 public OrderEvent() {
 }
 public OrderEvent(String userId, String orderId, String eventType, Long 
timestamp) {
 this.userId = userId;
 this.orderId = orderId;
 this.eventType = eventType;
 this.timestamp = timestamp;
 }
 @Override
 public String toString() {
 return "OrderEvent{" +
"userId='" + userId + '\'' +
 "orderId='" + orderId + '\'' +
 ", eventType='" + eventType + '\'' +
396
 ", timestamp=" + timestamp +
 '}';
 }
}
当前需求的重点在于对超时未支付的用户进行监控提醒,也就是需要检测有下单行为、但
15 分钟内没有支付行为的复杂事件。在下单和支付之间,可以有其他操作(比如对订单的修
改),所以两者之间是宽松近邻关系。可以定义 Pattern 如下:
Pattern<OrderEvent, ?> pattern = Pattern
 .<OrderEvent>begin("create") // 首先是下单事件
 .where(new SimpleCondition<OrderEvent>() {
 @Override
 public boolean filter(OrderEvent value) throws Exception {
 return value.eventType.equals("create");
 }
 })
 .followedBy("pay") // 之后是支付事件;中间可以修改订单,宽松近邻
 .where(new SimpleCondition<OrderEvent>() {
 @Override
 public boolean filter(OrderEvent value) throws Exception {
 return value.eventType.equals("pay");
 }
 })
 .within(Time.minutes(15)); // 限制在 15 分钟之内
很明显,我们重点要处理的是超时的部分匹配事件。对原始的订单事件流按照订单 ID
行分组,然后检测每个订单的“下单 - 支付”复杂事件,如果出现超时事件需要输出报警提示
信息。
整体代码实现如下:
import 
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
397
import java.util.List;
import java.util.Map;
public class OrderTimeoutDetect {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 // 获取订单事件流,并提取时间戳、生成水位线
 KeyedStream<OrderEvent, String> stream = env
 .fromElements(
 new OrderEvent("user_1", "order_1", "create", 1000L),
 new OrderEvent("user_2", "order_2", "create", 2000L),
 new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),
 new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),
 new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),
 new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)
 )
 .assignTimestampsAndWatermarks(
 WatermarkStrategy.<OrderEvent>forMonotonousTimestamps()
 .withTimestampAssigner(
 new 
SerializableTimestampAssigner<OrderEvent>() {
 @Override
 public long extractTimestamp(OrderEvent 
event, long l) {
 return event.timestamp;
 }
 }
 )
 )
 .keyBy(order -> order.orderId); // 按照订单 ID 分组
 // 1. 定义 Pattern
 Pattern<OrderEvent, ?> pattern = Pattern
 .<OrderEvent>begin("create") // 首先是下单事件
 .where(new SimpleCondition<OrderEvent>() {
 @Override
 public boolean filter(OrderEvent value) throws Exception {
 return value.eventType.equals("create");
 }
 })
 .followedBy("pay") // 之后是支付事件;中间可以修改订单,宽松近邻
 .where(new SimpleCondition<OrderEvent>() {
 @Override
398
 public boolean filter(OrderEvent value) throws Exception {
 return value.eventType.equals("pay");
 }
 })
 .within(Time.minutes(15)); // 限制在 15 分钟之内
 // 2. 将 Pattern 应用到流上,检测匹配的复杂事件,得到一个 PatternStream
 PatternStream<OrderEvent> patternStream = CEP.pattern(stream, pattern);
 // 3. 将匹配到的,和超时部分匹配的复杂事件提取出来,然后包装成提示信息输出
 SingleOutputStreamOperator<String> payedOrderStream = 
patternStream.process(new OrderPayPatternProcessFunction());
 // 将正常匹配和超时部分匹配的处理结果流打印输出
 payedOrderStream.print("payed");
 payedOrderStream.getSideOutput(timeoutTag).print("timeout");
 env.execute();
 }
// 实现自定义的 PatternProcessFunction,需实现 TimedOutPartialMatchHandler 接口
 public static class OrderPayPatternProcessFunction extends 
PatternProcessFunction<OrderEvent, String> implements 
TimedOutPartialMatchHandler<OrderEvent> {
 // 处理正常匹配事件
@Override
 public void processMatch(Map<String, List<OrderEvent>> match, Context ctx, 
Collector<String> out) throws Exception {
 OrderEvent payEvent = match.get("pay").get(0);
 out.collect("订单 " + payEvent.orderId + " 已支付!");
 }
// 处理超时未支付事件
 @Override
 public void processTimedOutMatch(Map<String, List<OrderEvent>> match, 
Context ctx) throws Exception {
 OrderEvent createEvent = match.get("create").get(0);
 ctx.output(new OutputTag<String>("timeout"){}, "订单 " + 
createEvent.orderId + " 超时未支付!用户为:" + createEvent.userId);
 }
 }
}
运行代码,控制台打印结果如下:
payed> 订单 order_1 已支付!
payed> 订单 order_3 已支付!
timeout> 订单 order_2 超时未支付!用户为: user_2
分析测试数据可以很直观地发现,订单 1 和订单 3 都在 15 分钟进行了支付,订单 1 中间
的修改行为不会影响结果;而订单 2 未能支付,因此侧输出流输出了一条报警信息。且同一用
户可以下多个订单,最后的判断只是基于同一订单做出的。这与我们预期的效果完全一致。用
处理函数进行状态编程,结合定时器也可以实现同样的功能,但明显 CEP 的实现更加方便,
也更容易迁移和扩展。
12.4.4 处理迟到数据
CEP 主要处理的是先后发生的一组复杂事件,所以事件的顺序非常关键。前面已经说过,
事件先后顺序的具体定义与时间语义有关。如果是处理时间语义,那比较简单,只要按照数据
处理的系统时间算就可以了;而如果是事件时间语义,需要按照事件自身的时间戳来排序。这
就有可能出现时间戳大的事件先到、时间戳小的事件后到的现象,也就是所谓的 乱序数据
迟到数据
Flink CEP 中沿用了通过设置水位线( watermark )延迟来处理乱序数据的做法。当一个
事件到来时,并不会立即做检测匹配处理,而是先放入一个缓冲区( buffer )。缓冲区内的数据,
会按照时间戳由小到大排序;当一个水位线到来时,就会将缓冲区中所有时间戳小于水位线的
事件依次取出,进行检测匹配。这样就保证了匹配事件的顺序和事件时间的进展一致,处理的
顺序就一定是正确的。这里水位线的延迟时间,也就是事件在缓冲区等待的最大时间。
这样又会带来另一个问题:水位线延迟时间不可能保证将所有乱序数据完美包括进来,总
会有一些事件延迟比较大,以至于等它到来的时候水位线早已超过了它的时间戳。这时之前的
数据都已处理完毕,这样的“迟到数据”就只能被直接丢弃了——这与窗口对迟到数据的默认
处理一致。
我们自然想到,如果不希望迟到数据丢掉,应该也可以借鉴窗口的做法。 Flink CEP 同样
提 供 了 将 迟 到 事 件 输 出 到 侧 输 出 流 的 方 式 : 我 们 可 以 基 于 PatternStream 直接调
.sideOutputLateData() 方法,传入一个 OutputTag ,将迟到数据放入侧输出流另行处理。代码
中调用方式如下:
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
// 定义一个侧输出流的标签
OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};
SingleOutputStreamOperator<ComplexEvent> result = patternStream
 .sideOutputLateData(lateDataOutputTag) // 将迟到数据输出到侧输出流
 .select( 
// 处理正常匹配数据
 new PatternSelectFunction<Event, ComplexEvent>() {...}
 );
// 从结果中提取侧输出流
DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);
可以看到,整个处理流程与窗口非常相似。经处理匹配数据得到结果数据流之后,可以调
.getSideOutput() 方法来提取侧输出流,捕获迟到数据进行额外处理。
12.5 CEP 的状态机实现
Flink CEP 中对复杂事件的检测,关键在模式的定义。我们会发现 CEP 中模式的定义方式
比较复杂,而且与正则表达式非常相似:正则表达式在字符串上匹配符合模板的字符序列,而
Flink CEP 则是在事件流上匹配符合模式定义的复杂事件。
前面我们分析过 CEP 检测处理的流程,可以认为检测匹配事件的过程中会有“初始(没
有任何匹配)”“检测中(部分匹配成功)”“匹配成功”“匹配失败”等不同的“状态”。随着每
个事件的到来,都会改变当前检测的“状态”;而这种改变跟当前事件的特性有关、也跟当前
所处的状态有关。这样的系统,其实就是一个“状态机”( state machine )。这也正是正则表达
式底层引擎的实现原理。
所以 Flink CEP 的底层工作原理其实与正则表达式是一致的,是一个“非确定有限状态自
动机”( Nondeterministic Finite Automaton NFA )。 NFA 的原理涉及到较多数学知识,我们这
里不做详细展开,而是用一个具体的例子来说明一下状态机的工作方式,以更好地理解 CEP
的原理。
我们回顾一下 12.2.2 小节中的应用案例,检测用户连续三次登录失败的复杂事件。用 Flink
CEP 中的 Pattern API 可以很方便地把它定义出来;如果我们现在不用 CEP ,而是用 DataStream
API 和处理函数来实现,应该怎么做呢?
这需要设置状态,并根据输入的事件不断更新状态。当然因为这个需求不是很复杂,我们
也可以用嵌套的 if-else 条件判断将它实现,不过这样做的代码可读性和扩展性都会很差。更好
的方式,就是实现一个状态机。
        
如图 12-4 所示,即为状态转移的过程,从初始状态( INITIAL )出发,遇到一个类型为
fail 的登录失败事件,就开始进入部分匹配的状态;目前只有一个 fail 事件,我们把当前状态
记作 S1 。基于 S1 状态,如果继续遇到 fail 事件,那么就有两个 fail 事件,记作 S2 。基于 S2
状态如果再次遇到 fail 事件,那么就找到了一组匹配的复杂事件,把当前状态记作 Matched
就可以输出报警信息了。需要注意的是,报警完毕,需要立即重置状态回 S2 ;因为如果接下
来再遇到 fail 事件,就又满足了新的连续三次登录失败,需要再次报警。
而不论是初始状态,还是 S1 S2 状态,只要遇到类型为 success 的登录成功事件,就会
跳转到结束状态,记作 Terminal 。此时当前检测完毕,之前的部分匹配应该全部清空,所以需
要立即重置状态到 Initial ,重新开始下一轮检测。所以这里我们真正参与状态转移的,其实只
Initial S1 S2 三个状态, Matched Terminal 是为了方便我们做其他操作(比如输出报警、
清空状态)的“临时标记状态”,不等新事件到来马上就会跳转。
完整代码如下:
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.io.Serializable;
402
import static org.apache.flink.util.Preconditions.checkNotNull;
public class NFAExample {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 // 获取登录事件流,这里与时间无关,就不生成水位线了
 KeyedStream<LoginEvent, String> stream = env.fromElements(
 new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
 new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
 new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
 new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
 new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
 new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
 new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
 )
 .keyBy(r -> r.userId);
 // 将数据依次输入状态机进行处理
 DataStream<String> alertStream = stream
 .flatMap(new StateMachineMapper());
 alertStream.print("warning");
 env.execute();
 }
 @SuppressWarnings("serial")
 public static class StateMachineMapper extends RichFlatMapFunction<LoginEvent, 
String> {
 // 声明当前用户对应的状态
 private ValueState<State> currentState;
 @Override
 public void open(Configuration conf) {
 // 获取状态对象
 currentState = getRuntimeContext().getState(new 
ValueStateDescriptor<>("state", State.class));
 }
 @Override
 public void flatMap(LoginEvent event, Collector<String> out) throws 
Exception {
403
 // 获取状态,如果状态为空,置为初始状态
 State state = currentState.value();
 if (state == null) {
 state = State.Initial;
 }
 // 基于当前状态,输入当前事件时跳转到下一状态
 State nextState = state.transition(event.eventType);
 if (nextState == State.Matched) {
 // 如果检测到匹配的复杂事件,输出报警信息
 out.collect(event.userId + " 连续三次登录失败");
 // 需要跳转回 S2 状态,这里直接不更新状态就可以了
 }
 else if (nextState == State.Terminal) {
 // 如果到了终止状态,就重置状态,准备重新开始
 currentState.update(State.Initial);
 } else {
 // 如果还没结束,更新状态(状态跳转),继续读取事件
 currentState.update(nextState);
 }
 }
 }
 // 状态机实现
 public enum State {
 Terminal, // 匹配失败,当前匹配终止
 Matched, // 匹配成功
 // S2 状态
 S2(new Transition("fail", Matched), new Transition("success", Terminal)),
 // S1 状态
 S1(new Transition("fail", S2), new Transition("success", Terminal)),
 // 初始状态
 Initial(new Transition("fail", S1), new Transition("success", Terminal));
 private final Transition[] transitions; // 状态转移规则
 // 状态的构造方法,可以传入一组状态转移规则来定义状态
 State(Transition... transitions) {
 this.transitions = transitions;
 }
404
 // 状态的转移方法,根据当前输入事件类型,从定义好的转移规则中找到下一个状态
 public State transition(String eventType) {
 for (Transition t : transitions) {
 if (t.getEventType().equals(eventType)) {
 return t.getTargetState();
 }
 }
 // 如果没有找到转移规则,说明已经结束,回到初始状态
 return Initial;
 }
 }
 // 定义状态转移类,包括两个属性:当前事件类型和目标状态
 public static class Transition implements Serializable {
 private static final long serialVersionUID = 1L;
 // 触发状态转移的当前事件类型
 private final String eventType;
 // 转移的目标状态
 private final State targetState;
 public Transition(String eventType, State targetState) {
 this.eventType = checkNotNull(eventType);
 this.targetState = checkNotNull(targetState);
 }
 public String getEventType() {
 return eventType;
 }
 public State getTargetState() {
 return targetState;
 }
 }
}
运行代码,可以看到输出与之前 CEP 的实现是完全一样的。显然,如果所有的复杂事件
处理都自己设计状态机来实现是非常繁琐的,而且中间逻辑非常容易出错;所以 Flink CEP
底层 NFA 全部实现好并封装起来,这样我们处理复杂事件时只要调上层的 Pattern API 就可以,
无疑大大降低了代码的复杂度,提高了编程的效率。
405 12.6 本章总结
Flink CEP Flink 对复杂事件处理提供的强大而高效的应用库。本章中我们从一个简单
的应用实例出发,详细讲解了 CEP 的核心内容—— Pattern API 和模式的检测处理,并以案例
说明了对超时事件和迟到数据的处理。最后进行了深度扩展,举例讲解了 CEP 的状态机实现,
这部分大家可以只做原理了解,不要求完全实现状态机的代码。
CEP 在实际生产中有非常广泛的应用。对于大数据分析而言,应用场景主要可以分为统
计分析和逻辑分析。企业的报表统计、商业决策都离不开统计分析,这部分需求在目前企业的
分析指标中占了很大的比重,实时的流数据统计可以通过 Flink SQL 方便地实现;而逻辑分析
可以进一步细分为风险控制、数据挖掘、用户画像、精准推荐等各个应用场景,如今对实时性
要求也越来越高, Flink CEP 就可以作为对流数据进行逻辑分析、进行实时风控和推荐的有力
工具。
所以 DataStream API 和处理函数是 Flink 应用的基石,而 SQL CEP 就是 Flink 大厦顶层
扩展的两大工具。 Flink SQL 也提供了与 CEP 相结合的“模式识别”( Pattern Recognition )语
句—— MATCH_RECOGNIZE ,可以支持在 SQL 语句中进行复杂事件处理。尽管目前还不完
善,不过相信随着 Flink 的进一步发展, Flink SQL CEP 将对程序员更加友好,功能也将更
加强大,全方位实现大数据实时流处理的各种应用需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/782380.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Android 简单快速实现 下弧形刻度尺(滑动事件)

效果图&#xff1a; 直接上代码&#xff1a; package com.my.view;import android.content.Context; import android.graphics.Bitmap; import android.graphics.BitmapFactory; import android.graphics.Canvas; import android.graphics.Color; import android.graphics.Pai…

iptables实现端口转发ssh

iptables实现端口转发 实现使用防火墙9898端口访问内网front主机的22端口&#xff08;ssh连接&#xff09; 1. 防火墙配置(lb01) # 配置iptables # 这条命令的作用是将所有目的地为192.168.100.155且目标端口为19898的TCP数据包的目标IP地址改为10.0.0.148&#xff0c;并将目标…

基于Java+SpringMvc+Vue技术的在线学习交流平台的设计与实现---60页论文参考

博主介绍&#xff1a;硕士研究生&#xff0c;专注于Java技术领域开发与管理&#xff0c;以及毕业项目实战✌ 从事基于java BS架构、CS架构、c/c 编程工作近16年&#xff0c;拥有近12年的管理工作经验&#xff0c;拥有较丰富的技术架构思想、较扎实的技术功底和资深的项目管理经…

【PB案例学习笔记】-29制作一个调用帮助文档的小功能

写在前面 这是PB案例学习笔记系列文章的第29篇&#xff0c;该系列文章适合具有一定PB基础的读者。 通过一个个由浅入深的编程实战案例学习&#xff0c;提高编程技巧&#xff0c;以保证小伙伴们能应付公司的各种开发需求。 文章中设计到的源码&#xff0c;小凡都上传到了gite…

【Spring Boot】关系映射开发(三):多对多映射

《JPA 从入门到精通》系列包含以下文章&#xff1a; Java 持久层 API&#xff1a;JPA认识 JPA 的接口JPA 的查询方式基于 JPA 开发的文章管理系统&#xff08;CRUD&#xff09;关系映射开发&#xff08;一&#xff09;&#xff1a;一对一映射关系映射开发&#xff08;二&#…

Java_网络编程

网络通信的关键三要素 IP、端口号、协议 IP地址 IP地址&#xff08;Internet Protocol&#xff09;&#xff1a;全程“互联网协议地址”&#xff0c;是分配给上网设备的唯一标志。 IP地址有两种形式&#xff1a;IPv4、IPv6 InetAddress 代表IP地址 InetAddress 的常用方法…

【算法训练记录——Day42】

Day42——动态规划Ⅳ 1.leetcode_1049最后一块石头的重量II2.leetcode_494目标和3.leetcode_474一和零 1.leetcode_1049最后一块石头的重量II 思路&#xff1a;石头只能用一次。。。怎么才能让碰撞后重量最小呢&#xff0c;还要转换成动态规划&#xff0c;难以理解。。 看题解&…

J024_打印电影的全部信息

一、需求描述 展示多部电影的信息。 电影信息包括&#xff1a;电影名称、电影得分、电影票价格。 二、代码实现 2.1 Movie类 package com.itheima.collection;public class Movie {//电影名称private String name;//电影得分private int score;//电影票价格private double…

【Excel】输入内容自动添加边框线

1. 选中表格区域 → 新建条件规则 2. 设置公式 3. 设置格式 测试生效

[吃瓜教程]南瓜书第6章支持向量机

0.补充知识 0.1 超平面 定义&#xff1a; 超平面是指在&#x1d45b;维空间中&#xff0c;维度为 &#x1d45b;−1的子空间。它是分割空间的一个平面。 性质&#xff1a; n维空间的超平面 ( w T x b 0 , 其中 w , x ∈ R n ) (w^Tx_b0,其中w,x\in \mathbb R^n) (wTxb​0,其…

C++的set / multiset容器

一、介绍 C的set容器又被称为集合&#xff0c;所有元素在被插入后都会自动排序。 二、数据结构 set / multiset属于关联式容器&#xff0c;底层数据结构是用二叉树实现的。 其余的容器比如vector、deque和list等为序列式容器&#xff0c;因为他们底层使用线性序列结构&#xf…

Windows环境安装Redis和Redis Desktop Manager图文详解教程

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl Redis概述 Redis是一个开源的高性能键值对数据库&#xff0c;以其卓越的读写速度而著称&#xff0c;广泛用于数据库、缓存和消息代理。它主要将数据存储在内存中&#xff0…

CISC和RISC指令集

文章目录 1. 指令集 2. CISC&#xff08;复杂指令集计算&#xff09; 3. RISC&#xff08;精简指令集计算&#xff09; 4. RISC的设计初衷 5. CISC和RISC流程对比 CISC&#xff08;复杂指令集计算&#xff09;的实现 RISC&#xff08;精简指令集计算&#xff09;的实现 …

【高中数学之函数】四种幂函数图线(二次、三次、开方、开立方)

【图像】 【代码】 <!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>UNASSIGNED</title><style type"text/css">.c…

【智能算法应用】灰狼算法求解二维栅格路径规划问题

目录 1.算法原理2.二维路径规划数学模型3.结果展示4.参考文献5.代码获取 1.算法原理 【智能算法】灰狼算法&#xff08;GWO&#xff09;原理及实现 2.二维路径规划数学模型 栅格法模型最早由 W.E. Howden 于 1968 年提出&#xff0c;障碍物的栅格用黑色表示&#xff0c;可通…

基于pi控制的数字锁相环simulink建模与仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 (完整程序运行后无水印) 2.算法运行软件版本 matlab2022a 3.部分核心程序 &#xff08;完整版代码包含详细中文注释和操作步骤视频&#xff09…

基于MATLAB的PEF湍流风场生成器模拟与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 5.完整工程文件 1.课题概述 基于MATLAB的PEF湍流风场生成器模拟与仿真。PEF&#xff08;Primitive Equations Formulation&#xff09;湍流风场模型&#xff0c;是大气科学和气象学中用来描述大气流动和气…

咬文嚼字:词元是当今生成式人工智能失败的一个重要原因

生成式人工智能模型处理文本的方式与人类不同。了解它们基于"标记"的内部环境可能有助于解释它们的一些奇怪行为和顽固的局限性。从 Gemma 这样的小型设备上模型到 OpenAI 业界领先的 GPT-4o 模型&#xff0c;大多数模型都建立在一种称为转换器的架构上。由于转换器在…

subset使用

在R语言中&#xff0c;subset()函数用于从数据框中选择满足特定条件的观测。其语法如下&#xff1a; subset(x, subset, select, drop FALSE) 参数说明&#xff1a; x&#xff1a;数据框或矩阵。 subset&#xff1a;逻辑条件&#xff0c;用于筛选满足特定条件的行。 select…

Linux Bridge - Part 2

概览 在前一篇文章中&#xff0c;我描述了Linux 网桥&#xff08;bridge&#xff09;的配置&#xff0c;并展示了一个实验&#xff0c;其中使用Wireshark来分析流量。在本文中&#xff0c;我将讨论当创建一个网桥时会发生什么&#xff0c;以及Linux 网桥&#xff08;bridge&am…