本系列包含:
- 【大数据】Flink 详解(一):基础篇
- 【大数据】Flink 详解(二):核心篇 Ⅰ
- 【大数据】Flink 详解(三):核心篇 Ⅱ
- 【大数据】Flink 详解(四):核心篇 Ⅲ
- 【大数据】Flink 详解(五):核心篇 Ⅳ
- 【大数据】Flink 详解(六):源码篇 Ⅰ
- 【大数据】Flink 详解(七):源码篇 Ⅱ
- 【大数据】Flink 详解(八):SQL 篇 Ⅰ
- 【大数据】Flink 详解(九):SQL 篇 Ⅱ
Flink 详解(九):SQL 篇 Ⅱ
- 96.源码中分区提交的 PartitionCommitTrigger 介绍一下?
- 97.PartitionTimeCommitTigger 是如何知道该提交哪些分区的呢?(源码分析)
- 98.如何保证已经写入分区的数据对下游可见的标志问题(源码分析)
- 99.Flink SQL CEP 有没有接触过?
- 100.Flink SQL CEP 了解的参数介绍一下?
- 1️⃣ after match skip past last row
- 2️⃣ after match skip to next row
- 3️⃣ after match skip to last patternItem
- 4️⃣ after match skip to first patternItem
- 101.编写一个 CEP SQL 案例,如银行卡盗刷
96.源码中分区提交的 PartitionCommitTrigger 介绍一下?
在源码中,PartitionCommitTrigger 类图如下所示:
该类中维护了两对必要的信息:
pendingPartitions
/pendingPartitionsState
:等待提交的分区 以及 对应的状态。watermarks
/watermarksState
:watermarks(用 TreeMap 存储以保证有序)以及 对应的状态。
97.PartitionTimeCommitTigger 是如何知道该提交哪些分区的呢?(源码分析)
1️⃣ 检查 checkpoint ID
是否合法。
2️⃣ 取出当前 checkpoint ID
对应的水印,并调用 TreeMap 的 headMap()
和 clear()
方法删掉早于当前 checkpoint ID
的水印数据(没用了)。
3️⃣ 遍历等待提交的分区,调用之前定义的 PartitionTimeExtractor。比如:${year}-${month}-${day} ${hour}:00:00
,抽取分区时间。如果 watermark > partition-time + delay
,说明可以提交,并返回它们。
98.如何保证已经写入分区的数据对下游可见的标志问题(源码分析)
在源码中,主要涉及 PartitionCommitPolicy 类,如下图所示:
99.Flink SQL CEP 有没有接触过?
CEP
(Complex Event Processing
):复杂事件处理,用于识别输入流中符合指定规则的事件,并按照指定方式输出。
- 起床 ➡ 洗漱 ➡ 吃饭 ➡ 上班,一系列串联起来的事件流形成的模式。
- 浏览商品 ➡ 加入购物车 ➡ 创建订单 ➡ 支付完成 ➡ 发货 ➡ 收货,事件流形成的模式。
通过概念可以了解,CEP 主要是 识别输入流中用户指定的一些基本规则的事件,然后将这些事件再通过指定方式输出。
如下图所示: 我们指定 “方块、圆” 为基本规则的事件,在输入的原始流中,将这些事件作为一个结果流输出来。
- 用户异常检测:我们指定异常操作事件为要输出的结果流。
- 策略营销:指定符合要求的事件为结果流。
- 运维监控:指定一定范围的指标为结果流。
- 银行卡盗刷:指定同一时刻在两个地方被刷两次为异常结果流。
Flink CEP SQL 语法是通过 SQL 方式进行复杂事件处理,但是与 Flink SQL 语法也不太相同,其中包含许多规则。
100.Flink SQL CEP 了解的参数介绍一下?
CEP 包含的参数如下:
输出模式(每个找到的匹配项应该输出多少行)
one row per match
:每次检测到完整的匹配后进行汇总输出。all rows per match
(Flink 暂不支持):检测到完整的匹配后会把匹配过程中每条具体记录进行输出。
running
VS final
语义
- 在计算中使用那些匹配的事件:
running
匹配中,final
匹配结束。 define
语句中只可以使用running
,measure
语句中两者都可以。- 输出结果区别
- 对于
one row per match
,输出没区别。 - 对于
all rows per match
,输出不同。
- 对于
匹配后跳转模式介绍
after match
(匹配后,从哪里开始重新匹配)
skip to next row
:从匹配成功的事件序列中的第一个事件的下一个事件开始进行下一次匹配。skip past last row
:从匹配成功的事件序列中的最后一个事件的下一个事件开始进行下一次匹配。skip to first patternItem
:从匹配成功的事件序列中第一个对应于patternItem
的事件开始进行下一次匹配。skip to last patternItem
:从匹配成功的事件序列中最后一个对应于patternItem
的事件开始进行下一次匹配。
注意:使用 skip to first / last patternItem
容易出现循环匹配问题,需要慎重。
针对上面的匹配后跳转模式分别介绍:
1️⃣ after match skip past last row
2️⃣ after match skip to next row
3️⃣ after match skip to last patternItem
4️⃣ after match skip to first patternItem
101.编写一个 CEP SQL 案例,如银行卡盗刷
通过 Flink CEP SQL 写的关于金融场景银行卡盗刷案例。
案例介绍:在金融场景中,有时会出现银行卡盗刷现象,犯罪分子利用互联网等技术,在间隔 10 分钟或者更短时间内,使一张银行卡在不同的两个地方出现多次刷卡记录,这从常规操作来说,在间隔时间很多的情况下,用户是无法同时在两个城市进行刷卡交易的,所以出现这种问题,就需要后台做出触发报警机制。
要求:当相同的 cardId
在十分钟内,从两个不同的 Location 发生刷卡现象,触发报警机制,以便检测信用卡盗刷现象。
(1)编写 CEP SQL 时,包含许多技巧,首先我们编写最基础的查询语句,从一张表中查询需要的字段。
select starttime,endtime,cardId,event from dataStream
(2)match_recognize();
- 该字段是 CEP SQL 的前提条件,用于生成一个追加表,所有的 CEP SQL 都是书写在这里面。
(3)分区,排序
- 由于是对同一 ID,所以需要使用
partition by
,还要根据时间进行排序order by
。
(4)理解 CEP SQL 核心的编写顺序,如上图标的顺序
- 1️⃣ CEP SQL 的类为 Pattern,检测在 10 分钟内两个地方出现刷卡现象,所以定义两个事件。
Pattern (e1 e2+) within interval '10' minute
- 2️⃣ 定义在 Pattern 中要求的判断语句,规定使用
define
。
define
e1 as a1.action = ''
e2 as e2.action = '' and e2.location <> e1.location
- 3️⃣ 根据上述的输入条件构建输出条件,规定使用
measures
。
measures
e2.action as event
e1.timestamp as starttime
last(e2.timestamp) as endtime
- 4️⃣ 输出条件匹配成功,输出一条,规定写法(这块根据不同的规则写不同的语句)。
one row per match
- 5️⃣ 匹配后跳转跳转到下一行(根据不同规则写不同语句)。
after match skip to next row
根据核心编写顺序进行理解,然后在按照书写正确的顺序进行编写。