目录
1.Flink CEP 原理
2.Flink API开发
2.1 模式 pattern
2.2 模式 pattern属性
2.3 模式间的关系
1.Flink CEP 原理
Flink CEP内部是用NFA(非确定有限自动机)来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为起始状态、中间状态、最终状态三种,边分为take、ignore、proceed三种。
- take:必须存在一个条件判断,当到来的消息满足take边条件判断时,把这个消息放入结果集,将状态转移到下一状态。
- ignore:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。
- proceed:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态。
2.Flink API开发
CEP程序开发主要分为两部分:定义事件pattern和匹配结果处理。
官方demo:
DataStream<Event> input = ...
//定义一个模式
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
//定义一个take操作,先匹配Id = 42的事件
.where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
}
)
//接下来的模式
.next("middle").subtype(SubEvent.class)
// 接下来匹配volume > 10的事件
.where(
new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0;
}
}
)
// 最后匹配name = "end"的事件
.followedBy("end").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
}
);
// 对input流绑定上面定义好的时间pattern
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
// 获取匹配的事件流
DataStream<Alert> result = patternStream.select(
new PatternProcessFunction<Event, Alert>() {
@Override
public void select(
Map<String, List<Event>> pattern,
Context ctx,
Collector<Alert> out) throws Exception {
// pattern中的key是上面状态节点的名字,value是匹配的事件(可能匹配多次)
out.collect(createAlertFrom(pattern));
}
});
上图中,蓝色方框代表的是一个个单独的模式;浅黄色的椭圆代表的是这个模式上可以添加的属性,包括模式可以发生的循环次数,或者这个模式是贪婪的还是可选的;橘色的椭圆代表的是模式间的关系,定义了多个模式之间是怎么样串联起来的。通过定义模式,添加相应的属性,将多个模式串联起来三步,就可以构成了一个完整的Flink CEP程序。
2.1 模式 pattern
定义一个pattern需要包括:
start:模式名称
where:模式的内容
filter:核心处理逻辑
正如上面例子中的:
//名称
Pattern.<Event>begin("start")
//内容
.where(
new SimpleCondition<Event>() {
//核心逻辑
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
}
);
2.2 模式 pattern属性
模式的属性主要分为循环属性和可选属性。
循环属性可以定义模式匹配发生固定次数(times),匹配发生一次以上(oneOrMore),匹配发生多次以上。(timesOrMore)。
可选属性可以设置模式是贪婪的(greedy),即匹配最长的串,或设置为可选的(optional),有则匹配,无则忽略。
另外,由于模式的匹配事件存放在状态中进行管理,所以需要设置一个全局的有效期(within)。
2.3 模式间的关系
主要分为三种:严格连续性(next/notNext),宽松连续性(followedBy/notFollowedBy),和非确定宽松连续性(followedByAny)。
- 严格连续性:需要消息的顺序到达与模式完全一致。
- 宽松连续性:允许忽略不匹配的事件。
- 非确定宽松连性:不仅可以忽略不匹配的事件,也可以忽略已经匹配的事件。
下一篇实战 CEP pattern 动态更新