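A CEP job running in production will regularly face rule changes, and restarting and redeploying the job for every change is inelegant. This matters most in latency-sensitive scenarios such as marketing or risk control: if the rule window is long (one or two weeks) and the state is large, a restart takes longer, and abnormal behavior that should be handled during that time is not detected promptly.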
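1. Implementation analysis
- External loading: a rule engine usually has a dedicated rule-management module where users create their own rules, so the Flink job has to load rules from an external source.
- Dynamic update: the job needs to check periodically whether the rules have changed.
- Historical state cleanup: pattern matching is a continuous sequence of NFAState transitions, so when a rule changes the historical state must be cleared.
- API: an easy-to-use API has to be exposed.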
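The first two requirements can be illustrated without Flink. The following is a minimal sketch, not the code from this project: `RuleRefresher`, its `Supplier<String>` rule source, and the use of a plain string as the "rule" are all hypothetical stand-ins for the external rule-management module and the compiled Pattern.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper: reloads a rule from an external source and tracks its version. */
final class RuleRefresher {
    private final Supplier<String> ruleSource;                 // stand-in for the rule-management module
    private final AtomicInteger version = new AtomicInteger(0); // bumped on every detected change
    private volatile String currentRule;

    RuleRefresher(Supplier<String> ruleSource) {
        this.ruleSource = ruleSource;
        this.currentRule = ruleSource.get();                    // external loading at startup
    }

    /** One scan cycle: reload the rule and bump the version if it differs. */
    boolean tick() {
        String latest = ruleSource.get();
        if (latest.equals(currentRule)) {
            return false;                                       // nothing changed, keep state
        }
        currentRule = latest;
        version.incrementAndGet();                              // signals that old state is stale
        return true;
    }

    int version() { return version.get(); }

    String currentRule() { return currentRule; }
}

final class RuleRefresherDemo {
    public static void main(String[] args) {
        String[] store = {"A followedBy B"};                    // pretend external rule store
        RuleRefresher refresher = new RuleRefresher(() -> store[0]);
        System.out.println(refresher.tick() + " v" + refresher.version());
        store[0] = "A next B";                                  // the rule is edited externally
        System.out.println(refresher.tick() + " v" + refresher.version());
    }
}
```

The version counter is what connects detection to cleanup: anything holding state for an older version knows it has to discard that state.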
2. Code implementation
First, implement a user-facing API.
package cep.functions;
import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
import cep.pattern.Pattern;
/**
 * @author StephenYou
 * Created on 2023-07-23
 * Description: dynamic Pattern interface (user-facing API); not key-specific
 */
public interface DynamicPatternFunction<T> extends Function, Serializable {
    /***
     * Initialization
     * @throws Exception
     */
    public void init() throws Exception;
    /**
     * Inject a new pattern
     * @return
     */
    public Pattern<T,T> inject() throws Exception;
    /**
     * Length of one scan period, in ms
     * @return
     */
    public long getPeriod() throws Exception;
    /**
     * Whether the rule has changed
     * @return
     */
    public boolean isChanged() throws Exception;
}
The intended usage of this API is as follows.
// Regular usage
CEP.pattern(dataStream,pattern);
// Dynamic Pattern
CEP.injectionPattern(dataStream, new UserDynamicPatternFunction());
This requires modifying the CEP library source code.
Add the injectionPattern function.
public class CEP {
    /***
     * Dynamic injection pattern function
     * @param input
     * @param dynamicPatternFunction
     * @return
     * @param <T>
     */
    public static <T> PatternStream<T> injectionPattern(
            DataStream<T> input,
            DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {
        return new PatternStream<>(input, dynamicPatternFunction);
    }
}
Add a PatternStream constructor. Because the pattern has to be updated dynamically, the whole function needs to be passed in.
public class PatternStream<T> {
    PatternStream(final DataStream<T> inputStream, DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {
        this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));
    }
}
Modify PatternStreamBuilder.build so that it passes the function on to the operator.
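// Declared without an initializer: a final local cannot be assigned again after "= null"
final CepOperator<IN, K, OUT> operator;
if (patternFunction == null) {
    // Static pattern: the NFA factory was compiled up front
    operator = new CepOperator<>(
            inputSerializer,
            isProcessingTime,
            nfaFactory,
            comparator,
            pattern.getAfterMatchSkipStrategy(),
            processFunction,
            lateDataOutputTag);
} else {
    // Dynamic pattern: the operator compiles the NFA itself in open()
    operator = new CepOperator<>(
            inputSerializer,
            isProcessingTime,
            patternFunction,
            comparator,
            null,
            processFunction,
            lateDataOutputTag);
}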
Add the corresponding CepOperator constructor.
public CepOperator(
        final TypeSerializer<IN> inputSerializer,
        final boolean isProcessingTime,
        final DynamicPatternFunction patternFunction,
        @Nullable final EventComparator<IN> comparator,
        @Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
        final PatternProcessFunction<IN, OUT> function,
        @Nullable final OutputTag<IN> lateDataOutputTag) {
    super(function);
    this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
    this.patternFunction = patternFunction;
    this.isProcessingTime = isProcessingTime;
    this.comparator = comparator;
    this.lateDataOutputTag = lateDataOutputTag;
    if (afterMatchSkipStrategy == null) {
        this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
    } else {
        this.afterMatchSkipStrategy = afterMatchSkipStrategy;
    }
    this.nfaFactory = null;
}
Load the Pattern and build the NFA.
@Override
public void open() throws Exception {
    super.open();
    timerService =
            getInternalTimerService(
                    "watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);
    // Initialization
    if (patternFunction != null) {
        patternFunction.init();
        Pattern pattern = patternFunction.inject();
        afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();
        boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;
        nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
        long period = patternFunction.getPeriod();
        // Register a timer to check whether the rule has changed
        if (period > 0) {
            getProcessingTimeService().registerTimer(timerService.currentProcessingTime() + period, this::onProcessingTime);
        }
    }
    nfa = nfaFactory.createNFA();
    nfa.open(cepRuntimeContext, new Configuration());
    context = new ContextFunctionImpl();
    collector = new TimestampedCollector<>(output);
    cepTimerService = new TimerServiceImpl();
    // metrics
    this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
}
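The open() method above registers the check timer once. The callback itself is not shown in this post, but a one-shot timer has to be re-registered from inside its callback for the check to keep running every period. The following is a sketch of that re-arming pattern only, under stated assumptions: `OneShotTimers` is a hypothetical in-memory stand-in for a processing-time timer service, and `PeriodicRuleCheck` stands in for the operator; neither is Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongConsumer;

/** Hypothetical one-shot timer service driven by a manually advanced clock. */
final class OneShotTimers {
    private final TreeMap<Long, List<LongConsumer>> timers = new TreeMap<>();
    private long now = 0L;

    long now() { return now; }

    void registerTimer(long timestamp, LongConsumer callback) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(callback);
    }

    /** Fires every timer due up to target, including timers registered by callbacks. */
    void advanceTo(long target) {
        while (!timers.isEmpty() && timers.firstKey() <= target) {
            Map.Entry<Long, List<LongConsumer>> due = timers.pollFirstEntry();
            now = due.getKey();
            for (LongConsumer callback : due.getValue()) {
                callback.accept(now);
            }
        }
        now = target;
    }
}

/** Stand-in for the operator: counts rule checks and re-arms its own timer. */
final class PeriodicRuleCheck {
    private final OneShotTimers timers;
    private final long period;
    private int checks = 0;

    PeriodicRuleCheck(OneShotTimers timers, long period) {
        this.timers = timers;
        this.period = period;
        if (period > 0) {
            timers.registerTimer(timers.now() + period, this::onTimer);
        }
    }

    private void onTimer(long timestamp) {
        checks++;                                              // isChanged()/inject() would run here
        timers.registerTimer(timestamp + period, this::onTimer); // re-arm for the next period
    }

    int checks() { return checks; }
}

final class PeriodicRuleCheckDemo {
    public static void main(String[] args) {
        OneShotTimers timers = new OneShotTimers();
        PeriodicRuleCheck check = new PeriodicRuleCheck(timers, 10_000L);
        timers.advanceTo(35_000L);
        System.out.println("checks performed: " + check.checks());
    }
}
```

Re-arming from the callback's own timestamp rather than from the current clock keeps the checks evenly spaced even if a callback runs late.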
State cleanup has two parts: clearing the match state and clearing the timers.
Performing the state cleanup:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    if (patternFunction != null) {
        // The rule version has been updated
        if (needRefresh.value() < refreshVersion.get()) {
            // Clear the match state
            computationStates.clear();
            elementQueueState.clear();
            partialMatches.releaseCacheStatisticsTimer();
            // Clear the timers
            Iterable<Long> registerTime = registerTimeState.get();
            if (registerTime != null) {
                Iterator<Long> iterator = registerTime.iterator();
                while (iterator.hasNext()) {
                    Long l = iterator.next();
                    // Delete the timer
                    timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);
                    timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);
                    // Remove the entry from state
                    iterator.remove();
                }
            }
            // Record the version this key is now on
            needRefresh.update(refreshVersion.get());
        }
    }
}
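The cleanup above is lazy: a key compares its own stored version with the operator-wide version only when one of its elements arrives. A minimal sketch of that idea in plain Java follows; `LazyKeyedCleanup` and its maps are hypothetical stand-ins for the operator, the `needRefresh` keyed state, and the NFA state, and the "state" is reduced to a list of buffered events.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of per-key lazy cleanup driven by a global rule version. */
final class LazyKeyedCleanup {
    private final AtomicInteger refreshVersion = new AtomicInteger(0);    // operator-wide version
    private final Map<String, Integer> seenVersion = new HashMap<>();      // stand-in for needRefresh
    private final Map<String, List<String>> buffered = new HashMap<>();    // stand-in for NFA state

    /** Called when the rule check detects a change. */
    void onRuleChanged() {
        refreshVersion.incrementAndGet();
    }

    void processElement(String key, String event) {
        int current = refreshVersion.get();
        if (seenVersion.getOrDefault(key, 0) < current) {
            buffered.remove(key);            // drop state built under the old rule
            seenVersion.put(key, current);   // this key is now on the new version
        }
        buffered.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
    }

    List<String> stateOf(String key) {
        return buffered.getOrDefault(key, Collections.emptyList());
    }
}

final class LazyKeyedCleanupDemo {
    public static void main(String[] args) {
        LazyKeyedCleanup op = new LazyKeyedCleanup();
        op.processElement("u1", "a");
        op.processElement("u2", "b");
        op.onRuleChanged();
        op.processElement("u1", "c");
        System.out.println("u1=" + op.stateOf("u1") + " u2=" + op.stateOf("u2"));
    }
}
```

One consequence of this design, at least in the sketch: a key that receives no further elements keeps its old state until its next element arrives.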
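The code above clears the state and updates the version while each element is processed. Next, the state and the version need to be initialized.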
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    // Initialize state
    if (patternFunction != null) {
        /**
         * Two flag states
         */
        refreshFlagState = context.getOperatorStateStore()
                .getUnionListState(new ListStateDescriptor<Integer>("refreshFlagState", Integer.class));
        // Default to version 0 so refreshVersion is never null, even if the restored state is empty
        refreshVersion = new AtomicInteger(0);
        if (context.isRestored()) {
            Iterator<Integer> restored = refreshFlagState.get().iterator();
            if (restored.hasNext()) {
                refreshVersion.set(restored.next());
            }
        }
        needRefresh = context.getKeyedStateStore()
                .getState(new ValueStateDescriptor<Integer>("needRefreshState", Integer.class, 0));
    }
}
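With union list state, each subtask receives the entries of all subtasks on restore, so the restored list may hold several version numbers. The code above takes the first one. As an alternative worth considering, and purely as an assumption on my part rather than what the project does, taking the maximum would avoid depending on iteration order. `VersionRestore` below is a hypothetical helper that shows the selection logic on a plain `Iterable<Integer>`.

```java
import java.util.Arrays;
import java.util.Collections;

/** Hypothetical helper: pick the rule version to resume from after a restore. */
final class VersionRestore {
    private VersionRestore() {}

    /** Returns the highest restored version, or 0 when nothing was restored. */
    static int restoreVersion(Iterable<Integer> restored) {
        int version = 0;
        if (restored == null) {
            return version;
        }
        for (Integer entry : restored) {
            if (entry != null) {
                version = Math.max(version, entry);
            }
        }
        return version;
    }
}

final class VersionRestoreDemo {
    public static void main(String[] args) {
        System.out.println(VersionRestore.restoreVersion(Collections.<Integer>emptyList()));
        System.out.println(VersionRestore.restoreVersion(Arrays.asList(2, 5, 3)));
    }
}
```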
3. Testing and verification
Set the Pattern to change every 10 s.
PatternStream patternStream = CEP.injectionPattern(source, new TestDynamicPatternFunction());
patternStream.select(new PatternSelectFunction<Tuple3<String, Long, String>, Map>() {
    @Override
    public Map select(Map map) throws Exception {
        map.put("processingTime", System.currentTimeMillis());
        return map;
    }
}).print();
env.execute("SyCep");
}
public static class TestDynamicPatternFunction implements DynamicPatternFunction<Tuple3<String, Long, String>> {
    public TestDynamicPatternFunction() {
        this.flag = true;
    }
    boolean flag;
    int time = 0;
    @Override
    public void init() throws Exception {
        flag = true;
    }
    @Override
    public Pattern<Tuple3<String, Long, String>, Tuple3<String, Long, String>> inject()
            throws Exception {
        // Alternate between two patterns
        if (flag) {
            Pattern pattern = Pattern
                    .<Tuple3<String, Long, String>>begin("start")
                    .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                        @Override
                        public boolean filter(Tuple3<String, Long, String> value,
                                Context<Tuple3<String, Long, String>> ctx) throws Exception {
                            return value.f2.equals("success");
                        }
                    })
                    .times(1)
                    .followedBy("middle")
                    .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                        @Override
                        public boolean filter(Tuple3<String, Long, String> value,
                                Context<Tuple3<String, Long, String>> ctx) throws Exception {
                            return value.f2.equals("fail");
                        }
                    })
                    .times(1)
                    .next("end");
            return pattern;
        } else {
            Pattern pattern = Pattern
                    .<Tuple3<String, Long, String>>begin("start2")
                    .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                        @Override
                        public boolean filter(Tuple3<String, Long, String> value,
                                Context<Tuple3<String, Long, String>> ctx) throws Exception {
                            return value.f2.equals("success2");
                        }
                    })
                    .times(2)
                    .next("middle2")
                    .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                        @Override
                        public boolean filter(Tuple3<String, Long, String> value,
                                Context<Tuple3<String, Long, String>> ctx) throws Exception {
                            return value.f2.equals("fail2");
                        }
                    })
                    .times(2)
                    .next("end2");
            return pattern;
        }
    }
    @Override
    public long getPeriod() throws Exception {
        return 10000;
    }
    @Override
    public boolean isChanged() throws Exception {
        // Flip the flag on every check so that each period reports a change
        flag = !flag;
        time += getPeriod();
        System.out.println("change pattern : " + time);
        return true;
    }
}
Printed output: as expected.
4. Source code
If you find this useful, please give it a star. ^_^
GitHub - StephenYou520/SyCep: CEP dynamic Pattern