Flink CEP(三)pattern动态更新

        线上运行的CEP中肯定经常遇到规则变更的情况,如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景,如果规则窗口过长(一两个星期),状态过大,就会导致重启时间延长,期间就会造成一些想要处理的异常行为不能及时发现。

1.实现分析

  • 外部加载:通常规则引擎会有专门的规则管理模块,提供用户去创建自己的规则,对于Flink任务来说需要到外部去加载规则
  • 动态更新:需要提供定时去检测规则是否变更
  • 历史状态清理:在模式匹配中是一系列NFAState 的不断变更,如果规则发生变更,需要清理历史状态
  • API:需要对外提供易用的API

2.代码实现

       首先实现一个用户API。

package cep.functions;

import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;

import cep.pattern.Pattern;

/**
 * @author StephenYou
 * Created on 2023-07-23
 * Description: 动态Pattern接口(用户调用API)不区分key
 */
public interface DynamicPatternFunction<T> extends Function, Serializable {

    /***
     * 初始化
     * @throws Exception
     */
    public void init() throws Exception;

    /**
     * 注入新的pattern
     * @return
     */
    public Pattern<T,T> inject() throws Exception;

    /**
     * 一个扫描周期:ms
     * @return
     */
    public long getPeriod() throws Exception;

    /**
     * 规则是否发生变更
     * @return
     */
    public boolean isChanged() throws Exception;
}

        希望上述API的调用方式如下。

//正常调用

CEP.pattern(dataStream,pattern);

//动态Pattern

CEP.injectionPattern(dataStream, new UserDynamicPatternFunction())

        所以需要修改CEP-Lib源码

        b.增加injectionPattern函数。

public class CEP {
    /***
     * Dynamic injection pattern function 
     * @param input
     * @param dynamicPatternFunction
     * @return
     * @param <T>
     */
    public static <T> PatternStream<T> injectionPattern throws Exception (
            DataStream<T> input,
            DynamicPatternFunction<T> dynamicPatternFunction){
        return new PatternStream<>(input, dynamicPatternFunction); 
    }
}

        增加PatternStream构造函数,因为需要动态更新,所以有必要传进去整个函数。

public class PatternStream<T> {
    PatternStream(final DataStream<T> inputStream, DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {
        this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));
    }
}

        修改PatternStreamBuilder.build, 增加调用函数的过程。

        final CepOperator<IN, K, OUT> operator = null;
        if (patternFunction == null ) {
            operator = new CepOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy(),
                    processFunction,
                    lateDataOutputTag);
        } else {
            operator = new CepOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    patternFunction,
                    comparator,
                    null,
                    processFunction,
                    lateDataOutputTag);
        }

        增加对应的CepOperator构造函数。

    public CepOperator(
            final TypeSerializer<IN> inputSerializer,
            final boolean isProcessingTime,
            final DynamicPatternFunction patternFunction,
            @Nullable final EventComparator<IN> comparator,
            @Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
            final PatternProcessFunction<IN, OUT> function,
            @Nullable final OutputTag<IN> lateDataOutputTag) {
        super(function);

        this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
        this.patternFunction = patternFunction;
        this.isProcessingTime = isProcessingTime;
        this.comparator = comparator;
        this.lateDataOutputTag = lateDataOutputTag;

        if (afterMatchSkipStrategy == null) {
            this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
        } else {
            this.afterMatchSkipStrategy = afterMatchSkipStrategy;
        }
        this.nfaFactory = null;
    }

        加载Pattern,构造NFA

    @Override
    public void open() throws Exception {
        super.open();
        timerService =
                getInternalTimerService(
                        "watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);

        //初始化
        if (patternFunction != null) {
            patternFunction.init();
            Pattern pattern = patternFunction.inject();
            afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();
            boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;
            nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
            long period = patternFunction.getPeriod();
            // 注册定时器检测规则是否变更
            if (period > 0) {
                getProcessingTimeService().registerTimer(timerService.currentProcessingTime() + period, this::onProcessingTime);
            }
        }


        nfa = nfaFactory.createNFA();
        nfa.open(cepRuntimeContext, new Configuration());

        context = new ContextFunctionImpl();
        collector = new TimestampedCollector<>(output);
        cepTimerService = new TimerServiceImpl();

        // metrics
        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    }

        状态清理一共分为两块: 匹配状态数据清理、定时器清理;

        进行状态清理:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {

        if (patternFunction != null) {
            // 规则版本更新
            if (needRefresh.value() < refreshVersion.get()) {
                //清除状态
                computationStates.clear();
                elementQueueState.clear();
                partialMatches.releaseCacheStatisticsTimer();
                //清除定时器
                Iterable<Long> registerTime = registerTimeState.get();
                if (registerTime != null) {
                    Iterator<Long> iterator = registerTime.iterator();
                    while (iterator.hasNext()) {
                        Long l = iterator.next();
                        //删除定时器
                        timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);
                        timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);
                        //状态清理
                        iterator.remove();
                    }
                }
                //更新当前的版本
                needRefresh.update(refreshVersion.get());
            }
        }
}

        上面是在处理每条数据时,清除状态和版本。接下来要进行状态和版本的初始化。

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        //初始化状态
        if (patternFunction != null) {
            /**
             * 两个标识位状态
             */
            refreshFlagState = context.getOperatorStateStore()
                    .getUnionListState(new ListStateDescriptor<Integer>("refreshFlagState", Integer.class));
            if (context.isRestored()) {
                if (refreshFlagState.get().iterator().hasNext()) {
                    refreshVersion = new AtomicInteger(refreshFlagState.get().iterator().next());
                }
            } else {
                refreshVersion = new AtomicInteger(0);
            }
            needRefresh = context.getKeyedStateStore()
                    .getState(new ValueStateDescriptor<Integer>("needRefreshState", Integer.class, 0));
        }
}

3.测试验证

        设置每10s变更一次Pattern。

 PatternStream patternStream = CEP.injectionPattern(source, new TestDynamicPatternFunction());
        patternStream.select(new PatternSelectFunction<Tuple3<String, Long, String>, Map>() {
            @Override
            public Map select(Map map) throws Exception {
                map.put("processingTime", System.currentTimeMillis());
                return map;
            }
        }).print();
        env.execute("SyCep");

    }

    public static class TestDynamicPatternFunction implements DynamicPatternFunction<Tuple3<String, Long, String>> {

        public TestDynamicPatternFunction() {
            this.flag = true;
        }

        boolean flag;
        int time = 0;
        @Override
        public void init() throws Exception {
            flag = true;
        }

        @Override
        public Pattern<Tuple3<String, Long, String>, Tuple3<String, Long, String>> inject()
                            throws Exception {

            // 2种pattern
            if (flag) {
                Pattern pattern = Pattern
                        .<Tuple3<String, Long, String>>begin("start")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("success");
                            }
                        })
                        .times(1)
                        .followedBy("middle")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("fail");
                            }
                        })
                        .times(1)
                        .next("end");
                return pattern;
            } else {

                Pattern pattern = Pattern
                        .<Tuple3<String, Long, String>>begin("start2")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("success2");
                            }
                        })
                        .times(2)
                        .next("middle2")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("fail2");
                            }
                        })
                        .times(2)
                        .next("end2");
                return pattern;
            }
        }

        @Override
        public long getPeriod() throws Exception {
            return 10000;
        }

        @Override
        public boolean isChanged() throws Exception {
            flag = !flag ;
            time += getPeriod();
            System.out.println("change pattern : " + time);
            return true;
        }
    }

打印结果:符合预期

4.源码地址

感觉有用的话,帮忙点个小星星。^_^

 GitHub - StephenYou520/SyCep: CEP 动态Pattern

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/62278.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue-virtual-scroller的使用,展示巨量数据,长列表优化,虚拟列表

一、原理 计算显示区域的高度&#xff08;或宽度&#xff09; 和显示区域的起始位置&#xff08;scrollTop或scrollLeft&#xff09;根据每个元素的尺寸和总数目&#xff0c;计算出整个列表的高度&#xff08;或宽度&#xff09;显示区域的高度&#xff08;或宽度&#xff09…

基于Orangepi 3 lts 的云台相机

利用orangepi 3 lts 和arduino nano 制作了一个云台相机&#xff0c;可用于室内监控。 硬件&#xff1a; orangepi 3 ,arduino nano ,usb相机&#xff0c;180度舵机两个 WeChat_20230806213004 软件&#xff1a; 整体采用mqtt进行消息的中转。 相机采用python 利用opencv…

LeetCode每日一题Day5——21. 合并两个有序链表

✨博主&#xff1a;命运之光 &#x1f984;专栏&#xff1a;算法修炼之练气篇&#xff08;C\C版&#xff09; &#x1f353;专栏&#xff1a;算法修炼之筑基篇&#xff08;C\C版&#xff09; &#x1f433;专栏&#xff1a;算法修炼之练气篇&#xff08;Python版&#xff09; …

第三章 图论 No.2单源最短路之虚拟源点,状压最短路与最短路次短路条数

文章目录 1137. 选择最佳线路1131. 拯救大兵瑞恩1134. 最短路计数383. 观光 dp是特殊的最短路&#xff0c;是无环图&#xff08;拓扑图&#xff09;上的最短路问题 1137. 选择最佳线路 1137. 选择最佳线路 - AcWing题库 // 反向建图就行 #include <iostream> #include…

C++ 类型兼容规则

类型兼容规则是指在需要基类对象的任何地方&#xff0c;都可以使用公有派生类的对象来替代。 通过公有继承&#xff0c;派生类得到了基类中除构造函数和析构函数之外的所有成员。这样&#xff0c;公有派生类实际就具备了基类的所有功能&#xff0c;凡是基类能解决的问题&#x…

vcomp100.dll丢失怎样修复?总结三个修复方法

在使用Windows系统的电脑的过程中&#xff0c;我们有时会遇到一些错误提示&#xff0c;其中之一就是关于vcomp100.dll文件缺失或损坏的问题。我第一次看到这个错误提示时&#xff0c;我并不知道vcomp100.dll是什么文件&#xff0c;也不了解它在电脑中的作用。我猜测它可能是一个…

​币安或面临「美司法部」欺诈指控

作者&#xff1a;维特根斯坦他弟 美国媒体semafor独家报道&#xff0c;知情人士透露&#xff0c;美国司法部正计划对币安提出欺诈指控&#xff0c;但又担心消费者会为此付出的巨大代价。 知情人士表示&#xff0c;联邦检察官担心他们起诉币安&#xff0c;可能会引发该交易所发生…

linux服务器之-nethogs命令

文章目录 NetHogs 工具安装安装依赖包安装epel源安装Nethogs 使用 NetHogs 工具 NetHogs是一个小型的net top工具&#xff0c;不像大多数工具那样拖慢每个协议或者是每个子网的速度而是依照进程进行带宽分组。 安装 安装依赖包 yum install libpcap libpcap-devel epel-rel…

企业架构NOSQL数据库之MongoDB

目录 一、背景描述及其方案设计 (一)业务背景描述 &#xff08;二&#xff09;模拟运维设计方案 二、Mongodb介绍 &#xff08;一&#xff09;nosql介绍 &#xff08;二&#xff09;产品特点 1、存储性 2、 效率性 3、结构 三、安装和配置 &#xff08;一&#xff09…

Statefulset 实战 3

上一部分我们说到如何使用 Statefulset 部署有状态的应用&#xff0c;Statefulset 可以做到部署的 每一个 pod 能够独立的拥有一个持久卷声明和持久卷 之前我们 用 Statefulset 和 ReplicaSet 对比&#xff0c;自然他们是有相似之处和不同之处&#xff0c;不同之处前面的文章已…

【嵌入式环境下linux内核及驱动学习笔记-(18)LCD驱动框架1-LCD控制原理】

目录 1、LCD显示系统介绍1.1 LCD显示基本原理1.1.1 颜色的显示原理&#xff1a;1.1.2 图像的构成 1.2 LCD接口介绍1.2.1 驱动接口 - MCU接口1.2.2 驱动接口 - RGB接口1.2.3 驱动接口 - LVDS接口1.2.4 驱动接口 - MIPI接口1.2.5 RGB / MIPI / LVDS三种接口方式的区别&#xff1a…

【雕爷学编程】MicroPython动手做(30)——物联网之Blynk

知识点&#xff1a;什么是掌控板&#xff1f; 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片&#xff0c;支持WiFi和蓝牙双模通信&#xff0c;可作为物联网节点&#xff0c;实现物联网应用。同时掌控板上集成了OLED…

《MySQL高级篇》十五、其他数据库日志

文章目录 1. MySQL支持的日志1.1 日志类型1.2 日志的弊端 2. 慢查询日志(slow query log)3. 通用查询日志3.1 问题场景3.2 查看当前状态3.3 启动日志3.4 查看日志3.5 停止日志3.6 删除\刷新日志 4. 错误日志(error log)4.1 启动日志4.2 查看日志4.3 删除\刷新日志4.4 MySQL8.0新…

el-table 去掉边框(修改颜色)

原始&#xff1a; 去掉表格的border属性&#xff0c;每一行下面还会有一条线&#xff0c;并且不能再拖拽表头 为了满足在隐藏表格边框的情况下还能拖动表头&#xff0c;修改相关css即可&#xff0c;如下代码 <style lang"less"> .table {//避免单元格之间出现白…

计算机网络(3) --- 网络套接字TCP

计算机网络&#xff08;2&#xff09; --- 网络套接字UDP_哈里沃克的博客-CSDN博客https://blog.csdn.net/m0_63488627/article/details/131977544?spm1001.2014.3001.5501 目录 1.TCP 1.服务端接口介绍 1.listen状态 2.accept获取链接 2.客户端接口介绍 2.TCP的服务器…

【TypeScript】TS接口interface类型(三)

【TypeScript】TS接口interface类型&#xff08;三&#xff09; 【TypeScript】TS接口interface类型&#xff08;三&#xff09;一、接口类型二、实践使用2.1 常规类型2.2 设置属性只读 readonly2.3 设置索引签名2.4 设置可选属性2.5 函数类型接口 一、接口类型 TypeScript中的…

QtWebApp同时开启http服务和https服务,接受来自客户端的不同请求并进行相应的处理

零、前言 在 QtWebApp开发https服务器&#xff0c;完成客户端与服务器基于ssl的双向认证&#xff0c;纯代码操作 一文中已经用纯代码的形式完成了客户端和服务端的 https 协议交互。 不过&#xff0c;只是开放了https服务&#xff0c;更多情况下&#xff0c;http服务和https服…

奥威BI—数字化转型首选,以数据驱动企业发展

奥威BI系统BI方案可以迅速构建企业级大数据分析平台&#xff0c;可以将大量数据转化为直观、易于理解的图表和图形&#xff0c;推动和促进数字化转型的进程&#xff0c;帮助企业更好地了解自身的运营状况&#xff0c;及时发现问题并采取相应的措施&#xff0c;提高运营效率和质…

多线程(JavaEE初阶系列7)

目录 前言&#xff1a; 1.常见的锁策略 1.1乐观锁和悲观锁 1.2轻量级锁和重量级锁 1.3自旋锁和挂起等待锁 1.4互斥锁与读写锁 1.5可重入锁与不可重入锁 1.6公平锁与非公平锁 2.CAS 2.1什么是CAS 2.2自旋锁的实现 2.3原子类 3.synchronized 3.1synchronized的原理以…