Flink CEP(二) 运行源码解析

通过DemoApp学习一下,CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。

1. Pattern的定义

Pattern<Tuple3<String, Long, String>,?> pattern = Pattern
                .<Tuple3<String, Long, String>>begin("begin")
                .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                    @Override
                    public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)
                            throws Exception {
                        return value.f2.equals("success");
                    }
                })
                .followedByAny("middle")
                .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                    @Override
                    public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)
                            throws Exception {
                        return value.f2.equals("fail");
                    }
                })
                .followedBy("end")
                .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                    @Override
                    public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)
                            throws Exception {
                        return value.f2.equals("end");
                    }
                });

 在执行中,我们可以看到pattern的几个属性,进入Pattern类中查看。

public class Pattern<T, F extends T> {

    /** Name of the pattern. */
    private final String name;

    /** Previous pattern. */
    private final Pattern<T, ? extends T> previous;

    /** The condition an event has to satisfy to be considered a matched. */
    private IterativeCondition<F> condition;

    /** Window length in which the pattern match has to occur. */
    private final Map<WithinType, Time> windowTimes = new HashMap<>();

    /**
     * A quantifier for the pattern. By default set to {@link Quantifier#one(ConsumingStrategy)}.
     */
    private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);

    /** The condition an event has to satisfy to stop collecting events into looping state. */
    private IterativeCondition<F> untilCondition;

    /** Applicable to a {@code times} pattern, and holds the number of times it has to appear. */
    private Times times;

    private final AfterMatchSkipStrategy afterMatchSkipStrategy;
}

可以看到每一个Pattern都会存在以下属性:

  • Name:Pattern的Name
  • previous:之前的Pattern
  • condition:Pattern的匹配逻辑
  • windowTimes:限制窗口的时长
  • Quantifier:Pattern的属性,包括配置Pattern的模式可以发生的循环次数,或者这个模式是贪婪的还是可选的。
    • /**
       * A quantifier describing the Pattern. There are three main groups of {@link Quantifier}.
       *
       * <ol>
       *   <li>Single
       *   <li>Looping
       *   <li>Times
       * </ol>
       *
       * <p>Each {@link Pattern} can be optional and have a {@link ConsumingStrategy}. Looping and Times
       * also hava an additional inner consuming strategy that is applied between accepted events in the
       * pattern.
       */
      public class Quantifier {
      
          private final EnumSet<QuantifierProperty> properties;
      
          private final ConsumingStrategy consumingStrategy;
      
          private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT;
      }
  • untilCondition:Pattern的循环匹配的结束条件
  • times:连续匹配次数
  • afterMatchSkipStrategy:匹配后的跳过策略

2.PatternStream的构建

        对Pattern定义完成,会通过PatternStreamBuilder,将1中定义好的Pattern应用到输入流中,返回对应的PatternStream。

    static <IN> PatternStreamBuilder<IN> forStreamAndPattern(
            final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {
        return new PatternStreamBuilder<>(
                inputStream, pattern, TimeBehaviour.EventTime, null, null);
    }

    PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
        this(PatternStreamBuilder.forStreamAndPattern(inputStream, pattern));
    }

继续执行代码,进入Select()。

    public <R> SingleOutputStreamOperator<R> select(
            final PatternSelectFunction<T, R> patternSelectFunction,
            final TypeInformation<R> outTypeInfo) {

        final PatternProcessFunction<T, R> processFunction =
                fromSelect(builder.clean(patternSelectFunction)).build();

        return process(processFunction, outTypeInfo);
    }

进入process可以看到PatternStream.select会调用builder.build函数。

    public <R> SingleOutputStreamOperator<R> process(
            final PatternProcessFunction<T, R> patternProcessFunction,
            final TypeInformation<R> outTypeInfo) {

        return builder.build(outTypeInfo, builder.clean(patternProcessFunction));
    }

在build函数中会完成NFAFactory的定义,随后构建CepOperator。inputstream随之运行CepOperator即pattern定义的处理逻辑,并返回结果流PatternStream。

    <OUT, K> SingleOutputStreamOperator<OUT> build(
            final TypeInformation<OUT> outTypeInfo,
            final PatternProcessFunction<IN, OUT> processFunction) {

        checkNotNull(outTypeInfo);
        checkNotNull(processFunction);

        final TypeSerializer<IN> inputSerializer =
                inputStream.getType().createSerializer(inputStream.getExecutionConfig());
        final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;

        final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
        final NFACompiler.NFAFactory<IN> nfaFactory =
                NFACompiler.compileFactory(pattern, timeoutHandling);

        CepOperator<IN, K, OUT> operator = new CepOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy(),
                    processFunction,
                    lateDataOutputTag);
  

        final SingleOutputStreamOperator<OUT> patternStream;
        if (inputStream instanceof KeyedStream) {
            KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;

            patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);
        } else {
            KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

            patternStream =
                    inputStream
                            .keyBy(keySelector)
                            .transform("GlobalCepOperator", outTypeInfo, operator)
                            .forceNonParallel();
        }

        return patternStream;
    }

3.CepOperator的执行

        初始化。

    @Override
    public void open() throws Exception {
        super.open();
        timerService =
                getInternalTimerService(
                        "watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);


        nfa = nfaFactory.createNFA();
        nfa.open(cepRuntimeContext, new Configuration());

        context = new ContextFunctionImpl();
        collector = new TimestampedCollector<>(output);
        cepTimerService = new TimerServiceImpl();

        // metrics
        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    }

 可以看到,nfaFactory.createNFA();会解析pattern组合,并为每一个pattern创建一个state。

CepOperator会在processElement中处理流中的每条数据。

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {


        if (isProcessingTime) {
            if (comparator == null) {
                // there can be no out of order elements in processing time
                NFAState nfaState = getNFAState();
                long timestamp = getProcessingTimeService().getCurrentProcessingTime();
                advanceTime(nfaState, timestamp);
                processEvent(nfaState, element.getValue(), timestamp);
                updateNFA(nfaState);
            } else {
                long currentTime = timerService.currentProcessingTime();
                bufferEvent(element.getValue(), currentTime);
            }

        } else {

            long timestamp = element.getTimestamp();
            IN value = element.getValue();

            // In event-time processing we assume correctness of the watermark.
            // Events with timestamp smaller than or equal with the last seen watermark are
            // considered late.
            // Late events are put in a dedicated side output, if the user has specified one.

            if (timestamp > timerService.currentWatermark()) {

                // we have an event with a valid timestamp, so
                // we buffer it until we receive the proper watermark.

                bufferEvent(value, timestamp);

            } else if (lateDataOutputTag != null) {
                output.collect(lateDataOutputTag, element);
            } else {
                numLateRecordsDropped.inc();
            }
        }
    }

        可以看到,如果使用的是处理时间,需要先对数据根据当前处理时间将乱序的数据做一次处理,保证数据的有序。

        如果使用的事件时间,如果事件时间戳小于等于watermark会被认为是迟到数据。

        正常数据会先被缓存起来,等待处理。

    private void bufferEvent(IN event, long currentTime) throws Exception {
        List<IN> elementsForTimestamp = elementQueueState.get(currentTime);
        if (elementsForTimestamp == null) {
            elementsForTimestamp = new ArrayList<>();
            registerTimer(currentTime);
        }

        elementsForTimestamp.add(event);
        elementQueueState.put(currentTime, elementsForTimestamp);
    }

       elementQueueState 会以时间戳为key保存对应的数据。在onEventTime()函数中通过processEvent中处理缓存的匹配数据。

    @Override
    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {

        // 1) get the queue of pending elements for the key and the corresponding NFA,
        // 2) process the pending elements in event time order and custom comparator if exists
        //		by feeding them in the NFA
        // 3) advance the time to the current watermark, so that expired patterns are discarded.
        // 4) update the stored state for the key, by only storing the new NFA and MapState iff they
        //		have state to be used later.
        // 5) update the last seen watermark.

        // STEP 1
        PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
        NFAState nfaState = getNFAState();

        // STEP 2
        while (!sortedTimestamps.isEmpty()
                && sortedTimestamps.peek() <= timerService.currentWatermark()) {
            long timestamp = sortedTimestamps.poll();
            advanceTime(nfaState, timestamp);
            // 对事件按时间进行排序
            try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
                elements.forEachOrdered(
                        event -> {
                            try {
                                processEvent(nfaState, event, timestamp);
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        });
            }
            elementQueueState.remove(timestamp);
        }

        // STEP 3
        advanceTime(nfaState, timerService.currentWatermark());

        // STEP 4
        updateNFA(nfaState);
    }
   private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
        try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
            Collection<Map<String, List<IN>>> patterns =
                    nfa.process(
                            sharedBufferAccessor,
                            nfaState,
                            event,
                            timestamp,
                            afterMatchSkipStrategy,
                            cepTimerService);
            if (nfa.getWindowTime() > 0 && nfaState.isNewStartPartialMatch()) {
                registerTimer(timestamp + nfa.getWindowTime());
            }
            processMatchedSequences(patterns, timestamp);
        }
    }


    private void processMatchedSequences(
            Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
        PatternProcessFunction<IN, OUT> function = getUserFunction();
        setTimestamp(timestamp);
        for (Map<String, List<IN>> matchingSequence : matchingSequences) {
            function.processMatch(matchingSequence, context, collector);
        }
    }

        nfa.process()最后会调用doProcess进行处理。

        computer

         可以看到每来一个新的Event,就会从上一个数据停留的状态开始遍历。判断新事件Event匹配之前已经匹配过的哪个状态,并为其版本号+1

前5条数据是success->fail->fail->success->fail,我们可以观察到partialMatches的变化如下:

success事件到达,因为之前没有事件,所以当前停留的状态是 begin。success匹配,预期会停留在middle状态

 fail事件到达,可以看到上面的success事件停留在了middle状态,并且begin的版本+1.

判断这个fail事件可以匹配后续的patern,状态从middle转移到end。存在newComputationStates中。最终更新到partialMatch中。

 第二个fail事件到达,只能匹配之前的middle状态,所以partialMatch中会新增一个end状态,并且middle的版本+1;

 

 最后如果状态到达终态,输出到potentialMatches中存储。

打印结果,可以看到每个事件都会试图去匹配所有的历史状态,nfa会存储所有匹配上的历史状态,直到到达终态。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/50645.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

详解python中的垃圾回收机制

目录 什么是垃圾回收机制 垃圾回收的工作流程 为什么要进行垃圾回收 详解python中的垃圾回收机制 总结 什么是垃圾回收机制 垃圾回收&#xff08;Garbage Collection&#xff09;是一种自动内存管理机制&#xff0c;用于检测和释放不再被程序使用的内存资源&#xff0c;以…

基于开源IM即时通讯框架MobileIMSDK:RainbowChat v9.0版已发布

关于MobileIMSDK MobileIMSDK 是一套专门为移动端开发的开源IM即时通讯框架&#xff0c;超轻量级、高度提炼&#xff0c;一套API优雅支持UDP 、TCP 、WebSocket 三种协议&#xff0c;支持iOS、Android、H5、标准Java平台&#xff0c;服务端基于Netty编写。 工程开源地址是&am…

Linux 终端生成二维码

1、安装qrencode [rootnode1 script]# yum -y install qrencode2、输出正常的 [rootnode1 ~]# echo https://www.github.com|qrencode -o - -t utf83、输出彩色的 [rootnode1 ~]# qrencode -t utf8 -s 1 https://www.github.com|lolcatPS&#xff1a;没有lolcat命令 #由于…

【计算机视觉中的 GAN 】 - 生成学习简介(1)

一、说明 在阅读本文之前&#xff0c;强烈建议先阅读预备知识&#xff0c;否则缺乏必要的推理基础。本文是相同理论GAN原理的具体化范例&#xff0c;阅读后有两个好处&#xff1a;1 巩固了已经建立的GAN基本概念 2 对具体应用的过程和套路进行常识学习&#xff0c;这种练习题一…

数据结构——单链表

不能毁灭我的&#xff0c;终将使我更强大 文章目录 一、链表 二、单链表 三、实现单链表 1.定义节点 2.由数据生成节点 3.连接并打印链表 4.单链表的基本接口 头插 头删 尾插 尾删 由数据Data找节点 在pos之前插入节点 在pos之后插入节点 删除pos节点 删除po…

Matplotlib_绘制柱状图

绘制柱状图 &#x1f9e9;bar方法 bar()是Matplotlib.pyplot库中用于绘制条形图&#xff08;bar chart&#xff09;的函数。条形图是一种常见的数据可视化图表&#xff0c;用于显示不同类别之间的比较。 函数签名&#xff1a; matplotlib.pyplot.bar(x, height, width0.8, …

【KO】vite使用 git bash here创建vue3项目时方向键失败!

文章目录 起因过程结果 起因 今天使用vite创建ue3项目&#xff0c;因为git使用习惯了就直接用git运行创建命令&#xff0c;前两步都没啥问题&#xff0c;到选择框架的时候问题来了&#xff0c;方向键无效。如图&#xff1a; 过程 常理来说是直接用方向键↑和↓进行选择&…

3d激光slam建图与定位(1)_基于ndt算法定位

一.代码实现流程 二.ndt算法原理 一.该算法定位有三个进程文件 1.map_loader.cpp用于点云地图的读取&#xff0c;从文件中读取点云后对这个点云地图进行旋转平移后发布点云地图到ros #include "map_loader.h"MapLoader::MapLoader(ros::NodeHandle &nh){std::st…

【深度学习笔记】动量梯度下降法

本专栏是网易云课堂人工智能课程《神经网络与深度学习》的学习笔记&#xff0c;视频由网易云课堂与 deeplearning.ai 联合出品&#xff0c;主讲人是吴恩达 Andrew Ng 教授。感兴趣的网友可以观看网易云课堂的视频进行深入学习&#xff0c;视频的链接如下&#xff1a; 神经网络和…

Python开发之手动实现一维线性插值

Python开发之手动实现一维线性插值 1.线性插值法介绍2.手动实现线性插值3.案例一&#xff1a;手动实现线性插值4.使用pandas的插值方法实现要求(推荐)5.案例二&#xff1a;对一组数据进行线性插值和SG滤波处理 前言&#xff1a;主要介绍手动实现一维线性插值以及pandas里面的in…

【Docker】容器的数据卷

目录 一、数据卷的概念与作用 二、数据卷的配置 三、数据卷容器的配置 一、数据卷的概念与作用 在了解什么是数据卷之前我们先来思考以下这些问题&#xff1a; 1.如果我们一个容器在使用后被删除&#xff0c;那么他里面的数据是否也会丢失呢&#xff1f;比如容器内的MySQL的…

2023年的深度学习入门指南(21) - 百川大模型

2023年的深度学习入门指南(21) - 百川大模型 前面我们用了三节的篇幅介绍了目前最强大的开源模型LLaMA2。这一节我们说一说国产大模型的一个代表&#xff0c;百川大模型。 使用百川大模型 第一步我们先把百川用起来&#xff0c;然后再研究如何训练和其原理如何。 百川的使用…

Mybatis使用collection映射一对多查询分页问题

场景&#xff1a;页面展示列表&#xff0c;需要查询多的字段&#xff0c;和一的字段。并且还要分页。 这时候直接想到的是手写sql。 /*** 标签*/private List<BasicResidentTags> tags;Data TableName("basic_resident_tags") public class BasicResidentTag…

vue3 - element-plus 上传各种 word pdf 文件、图片视频并上传到服务器功能效果,示例代码开箱即用。

效果图 在 vue3 项目中,使用 element plus 组件库的 el-upload 上传组件,进行文件、图片图像的上传功能示例。 完整代码 可直接复制,再改个接口地址。 在这里上传图片和文件是分成

新产品:Stimulsoft Forms 2023.3.1 Crack

Stimulsoft Forms 是一个用于交互式收集和处理用户数据的组件。表单工具可以轻松集成到您的项目或应用程序中&#xff0c;具有直观且用户友好的界面&#xff0c;并允许您创建丰富的表单模板。Stimulsoft Forms 是应用程序中与用户交互的新水平 什么是 Stimulsoft Forms&#xf…

华为数通HCIP-EVPN基础

MP-BGP MP-BGP&#xff08;Multiprotocol Extensions for BGP-4&#xff09;在RFC4760中被定义&#xff0c;用于实现BGP-4的扩展以允许BGP携带多种网络层协议&#xff08;例如IPv6、L3VPN、EVPN等&#xff09;。这种扩展有很好的后向兼容性&#xff0c;即一个支持MP-BGP的路由…

cad丢失mfc140u.dll怎么办?找不到mfc140u.dll的解决方法

第一&#xff1a;mfc140u.dll有什么用途&#xff1f; mfc140u.dll是Windows操作系统中的一个动态链接库文件&#xff0c;它是Microsoft Foundation Class (MFC)库的一部分。MFC是 C中的一个框架&#xff0c;用于构建Windows应用程序的用户界面和功能。mfc140u.dll包含了MFC库中…

Flowable-中间事件-信号中间抛出事件

定义 当流程执行到达信号抛出事件时&#xff0c;流程引擎会直接抛出信号&#xff0c;其他引用了与其相同的信号捕获 事件会被触发&#xff0c;信号发出后事件结束&#xff0c;流程沿后继路线继续执行。其抛出的信号可以被信号开始事 件&#xff08;Signal Start Event&#xf…

从官网认识 JDK,JRE,JVM 三者的关系

点击下方关注我&#xff0c;然后右上角点击...“设为星标”&#xff0c;就能第一时间收到更新推送啦~~~ JVM 是一些大厂面试必问点&#xff0c;要想解决 OOM、性能调优方面的问题&#xff0c;掌握 JVM 知识必不可少&#xff0c;从今天开始&#xff0c;将为大家介绍 JVM 的常用知…

css实现有缺口的border

css实现有缺口的border 1.问题回溯2.css实现有缺口的border 1.问题回溯 通常会有那种两个div都有border重叠在一起就会有种加粗的效果。 div1,div2,div3都有个1px的border&#xff0c;箭头标记的地方是没有处理解决的&#xff0c;很明显看着是有加粗效果的。其实这种感觉把di…