探索Flink动态CEP:杭州银行的实战案例

本文撰写自杭州银行大数据工程师唐占峰、欧阳武林老师。将介绍 Flink 动态 CEP的定义与核心概念、应用场景、并深入探讨其技术实现并介绍使用方式。主要分为以下几个内容:

  1. Flink动态CEP简介

  2. Flink动态CEP的应用场景

  3. Flink动态CEP的技术实现

  4. Flink动态CEP的使用方式

  5. 杭州银行应用实践

Tips:点击「阅读原文」跳转阿里云实时计算 Flink~

金融行业大数据技术正在进入成熟期,数据的实时性在金融的实时监控和分析交易数据以识别洗钱行为、欺诈行为、和确保合规性是至关重要的。随着业务环境的快速变化,传统的静态规则引擎已经无法满足这些需求,因为它们在规则变更时需要重启服务,这会导致服务中断和延迟响应。我们引入由 Flink 发展过来的 Flink 动态 CEP 作为行内的动态规则引擎,它能够在不中断服务的情况下动态更新规则,适应不断变化的业务需求。

CEP 是复杂事件处理 Complex Event Processing 的缩写,而 Flink CEP 则是基于 Flink 实现的复杂事件处理库,它可以识别出数据流中符合特定模式(Pattern)的事件序列,并允许用户作出针对性处理。Flink 动态 CEP,作为 Flink CEP 的高级功能,进一步扩展了这一能力,它支持在不重启服务的情况下动态更新规则。这种动态性不仅提高了系统的灵活性和响应速度,还大大降低了维护成本和复杂性。

01

Flink 动态 CEP 简介

1. Flink 动态 CEP 的定义和核心概念

Flink 动态 CEP 是 Apache Flink 流处理框架的一个高级功能,它允许通过DataStream(数据流)作业方式运行支持动态规则更新的 Flink CEP 作业,对 数据流进行动态的捕获、清洗和分析。Flink 动态 CEP 做到了基于 Flink 全托管快速构建动态加载最新规则来处理上游的数据流,让用户有机会实时掌握数据中重要的高阶特征。

关键概念

①pattern(模式):模式是规则,也是定义规则的方式。一个模式可以是单例或者循环模式,单例只接受一个事件,循环模式可以接受多个事件。用户可以使用pattern 来识别匹配到的事件。多个 pattern 可以组成复杂模式,我们把由多个pattern 组成的复杂模式序列称为 patternProcessor(模式处理器)。

②事件流:事件流可以来自异构上游,可以是 kafka 数据,也可以是数据库表数据(如交易流水类的实时事件流)。当 Flink 动态 CEP 作业启动后,遇到实际输入事件流,Flink 会尝试识别定义的 patternProcessor 并进行动态匹配,最终得到匹配结果。

③动态匹配:Flink 动态 CEP 会实时识别事件流变化,并不断发送给下游算子,下游算子接收到发送的事件进行解析和反序列化后生成真正使用的 patternProcessor,根据最新的 patternProcessor 定义的规则进行动态匹配。

2. Flink 动态 CEP 解决的问题

Flink CEP 是一种规则引擎,是通过设置规则模式来匹配事件的。而频繁变化的交易、记账场景要求我们对初始规则进行调整或者对规则进行新增。例如一个 CEP 作业初始规则是转账用户在一分钟内连续进行3次转账后将其认为是风险操作。而在特殊场景,预期转账次数会多一点,一分钟3次的转账次数阈值可能不合适,在当前开源 Flink CEP 实现下,没法做到使用户无感的转换,只能重新编写 Java 代码,然后重启作业,以使最新的规则生效。这样的操作带来时间成本较高和重启作业代价高的问题。因为要走一遍完整的代码开发和打包上线流程对于对时间延迟敏感程度高的银行风控领域是难以接受的,且规则引擎里通常会维护很多不同的规则,如果简单的规则修改都需要较长的时间窗口,会影响其他人的使用,维护起来也比较困难。Flink 动态 CEP 很好的降低了传统规则引擎较高的时间成本并做到无需重启作业就能丝滑更新规则,以下是 Flink 动态 CEP 解决的主要问题:

①动态规则更新:传统规则引擎在规则变更时需要重新部署和启动作业,这会导致服务中断,影响系统的实时性和可用性。而 Flink 动态 CEP 允许在不中断服务的情况下动态加载和更新 CEP 规则,这意味着可以在运行时修改模式匹配逻辑,而无需重启整个 Flink 作业。

②多规则支持:在静态场景下使用多条规则时,传统 Flink CEP 需要创建多个 CepOperator(CEP算子),这会导致数据的额外拷贝,增加处理开销。Flink动态 CEP 支持在一个 Operator(算子)中处理多条规则,减少了数据拷贝,提高了处理效率。

③参数化 Condition 支持:Flink 动态 CEP 支持在 Json 格式规则描述中定义参数化的 Condition,提高了自定义 Condition 的拓展性,解决了动态添加新的 Condition 类实现的需求。

02

Flink 动态 CEP 的应用场景

Flink 动态 CEP 就像是一个智能监控系统,它不仅能在线识别风险行为(比如洗钱或欺诈),还能为实时营销助力,为业务赋能。和金融领域相关的应用场景如下:

1. 反洗钱

Flink 动态 CEP 可以监控银行账户的交易活动,识别出类似洗钱的行为。例如,可以设置规则来识别短时间内频繁的大额存款和取款行为,或者识别出与洗钱交易相关的账户之间的资金流动,从而触发反洗钱调查。也可以结合大数据技术和机器学习技术构建洗钱风险监测模型,更准确地识别可疑交易和潜在洗钱风险客户。还可以运用 Flink 动态 CEP 的流式计算技术实时分析处理客户的全链路交易信息,结合知识图谱、实时智能等技术,构建起全行级别反洗钱领域客户关系网络图,深入融合可疑交易特征,动态完整展现资金流转全貌。

2. 反欺诈

国内电信网络诈骗非常的猖獗,金融领域的反欺诈系统对电信网络诈骗案件能起到非常关键的作用,能及时阻断欺诈案件中的资金流动减少用户资金损失。反欺诈系统对系统本身分布式、实时性、规则灵活、复杂规则匹配能力要求非常高,而 Flink 动态 CEP 在 Flink 的分布式、实时性的特性基础上,增加复杂规则匹配和规则动态配置能力,为反欺诈系统提供一种很好的解决方案。

3. 实时营销

在金融客户申请信用卡的时候,客户通常需要完成填充基本信息、个人身份信息认证等多个步骤完成信用卡的申请。用户在多步骤申请信用卡的过程中,有可能会因为各种原因在其中的任意一个环节退出、失败或超时。针对这种情况,利用客户行为日志作为数据源,Flink 动态 CEP 可以利用多种规则对各个环节客户的行为数据做规则匹配、计算。并可以根据输出结果做多种营销策略的输出,如推送客户优惠券、推送消息给客户经理及时联系客户来提高营销效率,为业务赋能。

03

Flink 动态 CEP 的技术实现

根据以上背景并基于阿里在社区提出的 FLIP-200 方案,ververica-cep 开源demo,数据架构研发团队在部门内实现了一版 Flink 动态 CEP 的支持。下面详细介绍我们是如何实现的。

在 Flink 动态 CEP 中我们复用了 Flink 的 OperatorCoordinator(算子协调器)机制,用它来负责协调FLink作业中的各个 operator(算子)。OperatorCoordinator 在 JobManager 中运行,会给 TaskManager 的 Operator 发送事件,我们实现的 DynamicCEPOperatorCoordinator(动态 CEP 算子协调器)是 OperatorCoordinator 的实现类,它是 JobManager 中运行的线程,负责调用 PatternProcessorDiscoverer(模式处理器探查器)接口拿到最新的 PatternProcessor。Flink 动态 CEP 的整体架构图如下所示:

28e9f5eb864f2dda2f6a927c58f8f682.png

上图展示的是从数据库中读取序列化后的 PatternProcessor 的过程。可以看到OperatorCoordinator 会调用 PatternProcessorDiscoverer 接口从数据库中拿到最新的且序列化后的 PatternProcessor,拿到后它会发送给和它关联的DynamicCEPOp(动态cep算子)。DynamicCEPOp 接收到发送的事件进行解析和反序列化后,最终生成要使用的 PatternProcessor 并构造相应的NFA(非确定有限状态机)。之后即可使用新构造的NFA来处理上游发生的事件,并最终输出到下游。基于这样的方式,可以做到不停机的规则更新,且只有 OperatorCoordinator 和规则数据库交互,可以减少对数据库的访问,并利用Flink 的特性保证下游 sub_task 中使用规则的一致性。

了解了 Flink 动态CEP获取规则的流程,接下来要构建FlinkCEP作业,最重要的方法,就是构建 CEP.dynamicPatterns(),阿里云实时计算 Flink 版已经定义了CEP.dynamicPatterns()Api,该 API 定义代码如下:

public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns(
        DataStream<T> input,
        PatternProcessorDiscovererFactory<T> patternProcessorDiscovererFactory,
        TimeBehaviour timeBehaviour,
        TypeInformation<R> outTypeInfo)

该方法入参说明如下:

 参数  说明  
 DataStream<T>  input  输入事件流  
PatternProcessorDiscovererFactory<T>  patternProcessorDiscovererFactory  工厂对象,负责构造一个探查器(PatternProcessorDiscoverer),探查器负责获取最新规则,即构造一个PatternProcessor接口  
 TimeBehaviour  TimeBehaviour  描述FlinkCEP作业如何处理事件的时间属性。参数取值如下:ProcessingTime:代表按照ProcessingTime处理事件  EventTime:代表按照Event Time处理事件  
 TypeInformation<R>  OutTypeInfo  描述输出流的类型信息  

dynamicPatterns() 方法中 input、OutTypeInfo 分别定义输入和输出流,TimeBehaviour 定义时间属性,这里不需要多做介绍,PatternProcessorDiscovererFactory<T>接口负责构造探查器 PatternProcessorDiscoverer 以拿到最新 PatternProcessor,在实现Flink动态CEP功能中起到关键作用,故本文着重对 patternProcessor、 PatternProcessorDiscoverer 两个接口及其实现类和负责拿到 PatternProcessor 并发送给下游算子的 DynamicCEPOperatorCoordinator 的代码进行详细。

1. patternProcessor接口及其实现

public  interface PatternProcessor<IN> extends Serializable, Versioned{
    String getId();

    default Long getTimestamp(){
        return Long.MIN_VALUE;
    }

    Pattern<IN,?> getPattern(ClassLoader classLoader);

    PatternProcessFunction<IN,?> getPatternProcessFunction();
}

PatternProcessor 接口用于完整定义CEP中的一条规则。一个PatternProcessor 实现类包含一个确定的模式(Pattern)用于描述如何去匹配事件、一个 PatternProcessFunction 用于描述怎么处理一个匹配事件。除此之外还包括id和 version(可选)等用于标识 PatternProcessFunction 的信息属性。因此一个PatternProcessor既包含规则本身,也指明了规则触发时,Flink 作业如何做出响应。

@PublicEvolving
public class DefaultPatternProcessor<T> implements PatternProcessor<T> {

    /** The ID of the pattern processor. */
    private final String id;

    /** The version of the pattern processor. */
    private final Integer version;

    /** The pattern of the pattern processor. */
    private final String patternStr;

    private final @Nullable PatternProcessFunction<T, ?> patternProcessFunction;

    public DefaultPatternProcessor(
            final String id,
            final Integer version,
            final String pattern,
            final @Nullable PatternProcessFunction<T, ?> patternProcessFunction,
            final ClassLoader userCodeClassLoader) {
        this.id = checkNotNull(id);
        this.version = checkNotNull(version);
        this.patternStr = checkNotNull(pattern);
        this.patternProcessFunction = patternProcessFunction;
    }

    @Override
    public String toString() {
        return "DefaultPatternProcessor{"
                + "id='"
                + id
                + '\''
                + ", version="
                + version
                + ", pattern="
                + patternStr
                + ", patternProcessFunction="
                + patternProcessFunction
                + '}';
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public int getVersion() {
        return version;
    }

    @Override
    public Pattern<T, ?> getPattern(ClassLoader classLoader) {
        try {
            return (Pattern<T, ?>) CepJsonUtils.convertJSONStringToPattern(patternStr, classLoader);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

        @Override
        public PatternProcessFunction<T,?> getPatternProcessFunction(){
         return patternProcessFunction;
    }
}

DefaultPatternProcessor 类是 PatternProcessor 的默认实现,它接收 id, version, pattern 字符串, PatternProcessFunction 和 ClassLoader 作为参数。并使用 checkNotNull 确保除了 patternProcessFunction 外的参数不为 null。它的 getPattern 方法中包括转换json字符串到CEP能识别的 pattern 的方法 convertJSONStringToPattern(),我们重写了 convertJSONStringToPattern() 方法,接受入参为我们指定的 classloader (类加载器)如下所示:

public static Pattern<?, ?> convertJSONStringToPattern(
        String jsonString, ClassLoader userCodeClassLoader) throws Exception {
    if (userCodeClassLoader == null) {
        LOG.warn(
                "The given userCodeClassLoader is null. Will try to use ContextClassLoader of current thread.");
        return convertJSONStringToPattern(jsonString);
    }
    GraphSpec deserializedGraphSpec = objectMapper.readValue(jsonString, GraphSpec.class);
    return deserializedGraphSpec.toPattern(userCodeClassLoader);
}

它的核心方法 toPattern() 涉及到 GraphSpec 类和方法本身,GraphSpec 类是Flink 中用于描述 Pattern 序列化和反序列化的工具,它用于处理由节点 (Nodes) 和边 (Edges) 组成的图形结构。这里的节点可以是单独的 Pattern 或者是嵌套的 GraphSpec,边则定义了节点之间的关系和数据流的方向,这和数据库中存储的规则Dag紧密相关,这里不做过多解释,具体来看 toPattern() 方法的实现:

public Pattern<?, ?> toPattern(final ClassLoader classLoader) throws Exception {
    // Construct cache of nodes and edges for later use
    final Map<String, NodeSpec> nodeCache = new HashMap<>();
    for (NodeSpec node : nodes) {
        nodeCache.put(node.getName(), node);
    }
    final Map<String, EdgeSpec> edgeCache = new HashMap<>();
    for (EdgeSpec edgeSpec : edges) {
        edgeCache.put(edgeSpec.getSource(), edgeSpec);
    }
    String currentNodeName = findBeginPatternName();
    Pattern<?, ?> prevPattern = null;
    String prevNodeName = null;
    while (currentNodeName != null) {
        NodeSpec currentNodeSpec = nodeCache.get(currentNodeName);
        EdgeSpec edgeToCurrentNode = edgeCache.get(prevNodeName);
        Pattern<?, ?> currentPattern =
                currentNodeSpec.toPattern(
                        prevPattern,
                        afterMatchStrategy.toAfterMatchSkipStrategy(),
                        prevNodeName == null
                                ? ConsumingStrategy.STRICT
                                : edgeToCurrentNode.getType(),
                        classLoader);
        if (currentNodeSpec instanceof GraphSpec) {
            ConsumingStrategy strategy =
                    prevNodeName == null
                            ? ConsumingStrategy.STRICT
                            : edgeToCurrentNode.getType();
            prevPattern =
                    buildGroupPattern(
                            strategy, currentPattern, prevPattern, prevNodeName == null);
        } else {
            prevPattern = currentPattern;
        }
        prevNodeName = currentNodeName;
        currentNodeName =
                edgeCache.get(currentNodeName) == null
                        ? null
                        : edgeCache.get(currentNodeName).getTarget();
    }
    // Add window semantics
    if (window != null && prevPattern != null) {
        prevPattern.within(this.window.getTime(), this.window.getType());
    }

    return prevPattern;
}

toPattern方法是 GraphSpec 类中的核心方法之一,它负责将 GraphSpec 对象序列化信息反序列化回 Pattern 对象。它的内部逻辑包含几个步骤:

①构建节点和边缓存:创建 nodeCache 和 edgeCache 映射,分别存储NodeSpec和 EdgeSpec 实例。这有助于在后续处理中快速查找和使用节点和边的信息

②确定开始节点:初始化 currentNodeName 变量,它表示当前处理的节点名称。这个值通过调用 findBeginPatternName() 方法获得,该方法确保从图中的开始节点开始处理。

③构建 Pattern 迭代:

使用循环迭代所有节点,从开始节点开始,根据边的信息向前构建模式。在每次迭代中:从 nodeCache 获取当前节点的 NodeSpec。从 edgeCache 获取从上一个节点到当前节点的 EdgeSpec(如果存在)。使用 NodeSpec 和 EdgeSpec 构建或更新当前的 Pattern。这涉及到根据消耗策略(ConsumingStrategy)来使用不同的 Pattern 方法,如 Pattern.begin(), Pattern.next(),Pattern.followedBy(), 或 Pattern.followedByAny()。最后更新 prevPattern 和 prevNodeName 为下一个迭代做准备。最终返回构建完成的Pattern对象。

以上详细介绍了 patternProcessor 接口实现和其中的关键方法,描述了可用的Pattern 构建过程。下面介绍 PatternProcessorDiscoverer 接口及其实现。

2. PatternProcessorDiscoverer接口及其实现  

public abstract interface PatternProcessorDiscoverer<T> extends Closeable
{
    public abstract void discoverPatternProcessorUpdates(PatternProcessorManager<T> paramPatternProcessorManager);
}

PatternProcessorDiscoverer 接口用于描述如何发现 Processor。

我们基于阿里云默认周期性扫描外部存储的抽象类periodicPatternProcessorDiscoverer,提供了一个用于从支持 JDBC 协议的数据库中拉取最新规则的实现:JDBCPeriodicPatternProcessorDiscoverer

public class JDBCPeriodicPatternProcessorDiscoverer<T>
        extends PeriodicPatternProcessorDiscoverer<T> {

    private static final Logger LOG =
            LoggerFactory.getLogger(JDBCPeriodicPatternProcessorDiscoverer.class);

    private final String tableName;
    private final String userName;
    private final String password;
    private final String jdbcUrl;
    private final String tenant;
    private final List<PatternProcessor<T>> initialPatternProcessors;
    private final ClassLoader userCodeClassLoader;
    private Connection connection;
    private Statement statement;
    private ResultSet resultSet;
    private Map<String, Tuple4<String, Integer, String, String>> latestPatternProcessors = new ConcurrentHashMap<>();

    /**
     * Creates a new using the given initial {@link PatternProcessor} and the time interval how
     * often to check the pattern processor updates.> *
     *
     * @param jdbcUrl The JDBC url of the database.> * @param jdbcDriver The JDBC driver of the database.> * @param initialPatternProcessors The list of the initial {@link PatternProcessor}.> * @param intervalMillis Time interval in milliseconds how often to check updates.>
     */
    public JDBCPeriodicPatternProcessorDiscoverer(
            final String jdbcUrl,
            final String jdbcDriver,
            final String tableName,
            final String userName,
            final String password,
            @Nullable final String tenant,
            final ClassLoader userCodeClassLoader,
            @Nullable final List<PatternProcessor<T>> initialPatternProcessors,
            @Nullable final Long intervalMillis)
            throws Exception {
        super(intervalMillis);
        this.tableName = requireNonNull(tableName);
        this.initialPatternProcessors = initialPatternProcessors;
        this.userCodeClassLoader = userCodeClassLoader;
        this.userName = userName;
        this.password = password;
        this.jdbcUrl = jdbcUrl;
        this.tenant = tenant;
        Class.forName(requireNonNull(jdbcDriver));
        this.connection = DriverManager.getConnection(requireNonNull(jdbcUrl), userName, password);
        this.statement = this.connection.createStatement();
    }

JDBCPeriodicPatternProcessorDiscoverer 包括两个关键方法 arePatternProcessorsUpdated() 和 getLatestPatternProcessors(),分别用于判断 PatternProcessors 是否被更新和获取最新的 PatternProcessors。

@Override
public boolean arePatternProcessorsUpdated() throws SQLException {
    if (latestPatternProcessors == null
            && !CollectionUtil.isNullOrEmpty(initialPatternProcessors)) {
        return true;
    }
    LOG.info("Start check is pattern processor updated.");

    if (statement == null) {
        try {
            this.connection = DriverManager.getConnection(requireNonNull(jdbcUrl), userName, password);
            this.statement = this.connection.createStatement();
        } catch (SQLException e) {
            LOG.error("Connect to database error!", e);
            throw e;
        }
    }
    try {
        String sql = buildQuerySql();
        LOG.info("Statement execute sql is {}", sql);
        resultSet = statement.executeQuery(sql);
        Map<String, Tuple4<String, Integer, String, String>> currentPatternProcessors = new ConcurrentHashMap<>();
        while (resultSet.next()) {
            LOG.debug("check getLatestPatternProcessors start :{}", resultSet.getString(1));
            String id = resultSet.getString("id");
            if (currentPatternProcessors.containsKey(id)
                    && currentPatternProcessors.get(id).f1 >= resultSet.getInt("version")) {
                continue;
            }
            currentPatternProcessors.put(
                    id,
                    new Tuple4<>(
                            requireNonNull(resultSet.getString("id")),
                            resultSet.getInt("version"),
                            requireNonNull(resultSet.getString("pattern")),
                            resultSet.getString("function")));
        }
        if (latestPatternProcessors == null
                || isPatternProcessorUpdated(currentPatternProcessors)) {
            LOG.debug("latest pattern processors size is {}", currentPatternProcessors.size());
            latestPatternProcessors = currentPatternProcessors;
            return true;
        } else {
            return false;
        }
    } catch (SQLException e) {
        LOG.error(
                "Pattern processor discoverer failed to check rule changes, will recreate connection.", e);
        try {
            statement.close();
            connection.close();
            connection = DriverManager.getConnection(requireNonNull(this.jdbcUrl), this.userName, this.password);
            statement = connection.createStatement();
        } catch (SQLException ex) {
            LOG.error("Connect pattern processor discovery database error.", ex);
            throw new RuntimeException("Cannot recreate connection to database.");
        }
    }
    return false;
}

arePatternProcessorsUpdated() 用于检查数据库中存储的模式处理器是否发生了更新,它首先会检查是否有尚未处理的初始模式处理器列表(initialPatternProcessors),如果存在未被处理的 PatternProcessor,则直接返回true。接着建立数据库连接,调用 buildQuerySql() 来执行 sql,用于从 tableName 指定的表中获取所有或特定租户 (tenant) 的模式处理器信息。然后处理sql的执行结果,对每一个 currentPatternProcessors,检查是否已存在或版本是否更旧。如果存在更旧的版本则跳过,否则更新 currentPatternProcessors 映射。如果 latestPatternProcessors 为空或存在更新,则用 currentPatternProcessors 更新 latestPatternProcessors,并返回 true,表示有更新。

@Override
public List<PatternProcessor<T>> getLatestPatternProcessors() throws Exception {
    LOG.debug("Start convert pattern processors to default pattern processor.");
    return latestPatternProcessors.values().stream()
            .map(
                    patternProcessor -> {
                        try {
                            String patternStr = patternProcessor.f2;
                            GraphSpec graphSpec =
                                    CepJsonUtils.convertJSONStringToGraphSpec(patternStr);
                            LOG.debug("Latest pattern processor is {}",
                                    CepJsonUtils.convertGraphSpecToJSONString(graphSpec));
                            PatternProcessFunction<T, ?> patternProcessFunction = null;
                            String id = patternProcessor.f0;
                            int version = patternProcessor.f1;
                            if (!StringUtils.isNullOrWhitespaceOnly(patternProcessor.f3)) {
                                patternProcessFunction =
                                        (PatternProcessFunction<T, ?>)
                                                this.userCodeClassLoader
                                                        .loadClass(patternProcessor.f3)
                                                        .getConstructor(String.class, int.class, String.class)
                                                        .newInstance(id, version, tenant);
                            }
                            return new DefaultPatternProcessor<>(
                                    patternProcessor.f0,
                                    patternProcessor.f1,
                                    patternStr,
                                    patternProcessFunction,
                                    this.userCodeClassLoader);
                        } catch (Exception e) {
                            LOG.error(
                                    "Get the latest pattern processors of the discoverer failure. - ", e);
                            e.printStackTrace();
                        }
                        return null;
                    }).filter(pre -> pre != null).collect(Collectors.toList());
}

getLatestPatternProcessors() 方法涉及从数据库获取最新 PatternProcessors的过程,利用 StreamAPI,将存储在 ConcurrentHashMap 中的模式处理器信息转换为 PatternProcessor 列表。这里涉及到实例化的过程:根据模式处理器信息中的类名(patternProcessor.f3),通过类加载器加载并实例化自定义的 PatternProcessFunction。如果类名不为空或非空字符串,将其转换为对应的 Java 类,并调用构造函数,传入处理器的 id、version 和租户 tenant 信息。使用上述信息,创建一个 DefaultPatternProcessor 实例,封装模式字符串、自定义的处理器函数、类加载器等信息,最后返回一个PatternProcessor 列表,其中包含了从数据库中获取的所有模式处理器的最新实例。这些实例可以被 Flink 的 CEP 功能直接使用,以处理复杂事件模式匹配。

3. PatternProcessorDiscoverer接口及其实现

接下来介绍 DynamicCepOperatorCoordinator(动态CEP算子协调器),它承担着调用上文 PatternProcessorDiscoverer 接口从数据库中拿到最新的且序列化后的 PatternProcessor,并发送给和它关联的 DynamicCEPOp 的任务如下所示:

public class DynamicCepOperatorCoordinator<T> implements OperatorCoordinator {
    private static final Logger LOG =
            LoggerFactory.getLogger(DynamicCepOperatorCoordinator.class);

    private final DynamicCepOperatorCoordinatorContext cepCoordinatorContext;
    private final PatternProcessorDiscovererFactory discovererFactory;
    private final String operatorName;
    private boolean started;
    private volatile boolean closed;

    public DynamicCepOperatorCoordinator(String operatorName, PatternProcessorDiscovererFactory discovererFactory, DynamicCepOperatorCoordinatorContext context) {
        this.cepCoordinatorContext = context;
        this.discovererFactory = discovererFactory;
        this.operatorName = operatorName;
        this.started = false;
        this.closed = false;
    }

    @Override
    public void start() throws Exception {
        Preconditions.checkState(!started, "Dynamic Cep Operator Coordinator Started!");
        LOG.info("Starting Coordinator for {}:{}", this.getClass().getSimpleName(), operatorName);
        cepCoordinatorContext.runInCoordinatorThreadWithFixedRate(()->{
            if (discovererFactory instanceof PeriodicPatternProcessorDiscovererFactory) {
                try {
                PeriodicPatternProcessorDiscoverer patternProcessorDiscoverer =
                        (PeriodicPatternProcessorDiscoverer) discovererFactory
                                .createPatternProcessorDiscoverer(cepCoordinatorContext.getUserCodeClassloader());
                boolean updated = patternProcessorDiscoverer.arePatternProcessorsUpdated();
                    if (updated && started) {
                    Set<Integer> subtasks = cepCoordinatorContext.getSubtasks();
                    if (!patternProcessorDiscoverer.getLatestPatternProcessors().isEmpty()) {
                        UpdatePatternProcessorEvent updatePatternProcessorEvent =
                                new UpdatePatternProcessorEvent(patternProcessorDiscoverer.getLatestPatternProcessors());
                        subtasks.forEach(subtaskId -> {
                            cepCoordinatorContext.sendEventToOperator(subtaskId, updatePatternProcessorEvent);
                        });
                    }
                }
                } catch (Exception e) {
                    LOG.error("Starting Coordinator failed", e);
                }
            }
        });
        started = true;
    }

    @Override
    public void close() throws Exception {
        closed = true;
        cepCoordinatorContext.close();
    }

    @Override
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {
        LOG.info("Received event {} from operator {}.", event, subtask);
    }

    @Override
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {
//        cepCoordinatorContext.runInCoordinatorThread(() -> {
            LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", operatorName, checkpointId);
            try {
                resultFuture.complete("Dynamic cep".getBytes(StandardCharsets.UTF_8));
            } catch (Throwable e) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM(e);
                resultFuture.completeExceptionally(
                        new CompletionException(
                                String.format(
                                        "Failed to checkpoint for dynamic cep %s",
                                        operatorName),
                                e));
            }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
    }

    @Override
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {

    }

    @Override
    public void subtaskReset(int subtask, long checkpointId) {
    }

    @Override
    public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
        cepCoordinatorContext.subtaskNotReady(subtask);
    }

    @Override
    public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
        cepCoordinatorContext.subtaskReady(gateway);
    }
}

下面只介绍它的关键方法start(),用于负责初始化和激活协调器的运行流程:

start() 方法调用 cepCoordinatorContext.runInCoordinatorThreadWithFixedRate 来安排一个周期性执行的任务。这个方法将在框架的协调器线程中执行一个 lambda 表达式定义的任务,定期检查模式处理器更新。在这里我们定义的时间是10s,也就是每10s检查和执行一次 patternProcessors 的更新逻辑。然后构建UpdatePatternProcessorEvent,由 cepCoordinatorContext 来广播它给下游算子。需要注意的是,DynamicCepOperatorCoordinator 是 jobmanager 运行的线程,和 taskmanager 中 PatternProcessor 的产生过程是异步的。

04

Flink 动态 CEP 的使用方式

本章介绍如何编写 Flink 动态 CEP 作业,具体操作流程如下(以Kafka源为例):

1. 连接数据源(数据源也可以是来自数据库,配置不同的连接器即可)

public static void main(String[] args) throws Exception {
// Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Classloader initial
        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// Process args
// Build Kafka source with new Source API based on FLIP-27
        Properties prop =new Properties();
        prop.setProperty("security.protocol","SASL_PLAINTEXT");
        prop.setProperty("sasl.mechanism","SCRAM-SHA-256");
        prop.setProperty("sasl.jaas.config",
                "org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule" +
                " required username=\"100670\" password=\"000000000\";");
        KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder()
                .setBootstrapServers("123.4.50.105:9292,123.4.60.106:9292,123.4.50.107:9292")
                .setTopics("cep_test1").setGroupId("test").setStartingOffsets(OffsetsInitializer.earliest())
                .setProperties(prop).setValueOnlyDeserializer((new KafkaJsonDeserializer())).build();
        env.setParallelism(1);
        DataStream<Event> input = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "source");
// keyBy userId and productionId
// Notes, only events with the same key will be processd to see if there is a match
        KeyedStream<Event, Tuple2<String, String>> keyedStream =
                input.keyBy(
                        new KeySelector<Event, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> getKey(Event value) throws Exception {
                                return Tuple2.of(value.getName(), value.getName());
                            }
                        });

①初始化执行环境

②Kafka 源配置,并将事件流 Event 根据 name 字段进行 keyby

2. 构建动态规则匹配

long time = 1000;
SingleOutputStreamOperator<String> output = CEP.dynamicPatterns(
        keyedStream,
        new JDBCPeriodicPatternProcessorDiscovererFactory<>(
        "jdbc:mysql//123.45.6.789:3306/cep_demo_db",
        "com.mysql.cj.jdbc.Driver",
        "rds_demo",
        "riskcollateral",
        "riskcollateral",
        null,
        null,
        timer),
        TimeBehaviour.ProcessingTime,
        TypeInformation.of(new TypeHint<String>()){
        }));
    output.addSink(new PrintSinkFunction<>().name("cep"));
    env.excute("CEPDemo");
  }
}

3. 构建并运行

我们使用 Streampark 作为 Flink 作业的运维管控平台,根据以下步骤创建 Flink jar 包作业:

①添加jar包资源:

02d301498115fbbfeeec74fa21e3996f.png

②添加作业:

847d2819a9aa21892bbf84ecdff6015a.png

③添加作业相关配置:

84b4e0476974b0a66d1bd52c59707623.png

④发布及启动作业:

7789d032824da2123783cafcbe1eb030.png

847d666775508007dc6a175e6c1cd249.png

4. 插入规则

①建表 rds_demo 用于存储 cep 规则:

22fd317ca9cdda7e247d18f23ccbb261.png

②插入动态更新规则:

将表示 Pattern 的 JSON 字符串与 id、version、function 类名一起插入 rds_demo 表中(阿里云实时计算Flink版定义了一套 JSON 格式的规则描述,详情请参加阿里云文档——动态 CEP 中规则的 JSON 格式定义):

 id   version   pattern   function  
 1   1   {"name":"end","quantifier":{"consumingStrategy}...   xxxpackage.dynamic.cep.core.DemoPatternProcessFunction    

将 pattern 的 JSON 字符串解析后,展示如下:

{
    "name": "end",
    "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
            "SINGLE"
        ],
        "times": null,
        "untilCondition": null
    },
    "condition": null,
    "nodes": [
        {
            "name": "end",
            "quantifier": {
                "consumingStrategy": "SKIP_TILL_NEXT",
                "properties": [
                    "SINGLE"
                ],
                "times": null,
                "untilCondition": null
            },
            "condition": {
                "className": "xxxpackage.dynamic.cep.core.EndCondition",
                "type": "CLASS"
            },
            "type": "ATOMIC"
        },
        {
            "name": "start",
            "quantifier": {
                "consumingStrategy": "SKIP_TILL_NEXT",
                "properties": [
                    "LOOPING"
                ],
                "times": null,
                "untilCondition": null
  
            },
            "type": "ATOMIC"
        }
    ],
    "edges": [
        {
            "source": "start",
            "target": "end",
            "type": "SKIP_TILL_NEXT"
        }
    ],
    "window": null,
    "afterMatchStrategy": {
        "type": "SKIP_PAST_LAST_EVENT",
        "patternName": null
    },
    "type": "COMPOSITE",
    "version": 1
}

这段 JSON 规则描述了一个复合模式 (COMPOSITE),它由两个原子节点(ATOMIC)组成:“start”和“end”。

这个模式目的是匹配一个特定的事件序列,其中“start”节点匹配 action 等于0的输入事件,而“end”节点匹配“xxxpackage.dynamic.cep.core.EndCondition”这个类定义的事件,这个条件由开发者定义,例如:

public class EndCondition extends SimpleCondition<Event> {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getAction() != 1;
    }
}

这个 EndCondition 用于检查事件的 action 属性是否不等于1.如果事件的 action 属性不等于1,那么 filter 方法将返回 true,表示事件满足 end 节点的条件。

结合起来,这个模式的匹配的事件序列满足:“start”节点匹配所有 action 等于0的事件,一旦遇到一个 action 不等于1的事件,“end”节点的条件被满足,整个模式匹配完成。

function 字段用 DemoPatternProcessFunction 类的全路径加类名指定,记录了匹配到记录以后的处理方法如下:

public class DemoPatternProcessFunction<IN> extends PatternProcessFunction<IN, String> {
    String id;
    int version;
    String tenant;

    public DemoPatternProcessFunction(String id, int version, String tenant) {
        this.id = id;
        this.version = version;
        this.tenant = tenant;
    }

    @Override
    public void processMatch(
            final Map<String, List<IN>> match, final Context ctx, final Collector<String> out) {
        StringBuilder sb = new StringBuilder();
        sb.append("A match for Pattern of (id, version): (")
                .append(id)
                .append(", ")
                .append(version)
                .append(") is found. The event sequence: ").append("\n");
        for (Map.Entry<String, List<IN>> entry : match.entrySet()) {
            sb.append(entry.getKey()).append(": ").append(entry.getValue().get(0).toString()).append("\n");
        }
        out.collect(sb.toString());
    }
}

这个处理方法是如果 PatternProcessor 匹配到一个事件序列,processMatch 方法将生成对应的描述性字符串,并由下游算子通过 Collector 将其输出。

5. 输入事件流

假如有一个事件序列如下:

private static void sendEvents(Producer<String, String> producer, String topic) {
    ObjectMapper objectMapper = new ObjectMapper();

    Event[] events = {
            new Event("ken", 1, 1, 0, 1662022777000L),
            new Event("ken", 2, 1, 0, 1662022778000L),
            new Event("ken", 3, 1, 1, 1662022779000L),
            new Event("ken", 4, 1, 2, 1662022780000L),
            new Event("ken", 5, 1, 1, 1662022780000L)
    };
    while (true) {
        try {
            for (Event event : events) {
                String json = objectMapper.writeValueAsString(event);
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, json);
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        LOG.error("Failed to send data to Kafka: ", exception);
                    } else {
                        System.out.println(metadata.topic());
                        LOG.info("Data sent successfully to topic {} at offset {}",
                                metadata.topic(), metadata.offset());
                    }
                });
            }
        } catch (Exception e) {
            LOG.error("Error while sending events to Kafka: ", e);
        }
    }
}

我们往 Kafka Topic 插入 events,我们将会观察到 “start” 节点会匹配前两个事件,因为它们的 action 属性为0。第四个事件 action 不等于1,因此“end”节点的条件被满足,模式匹配完成。第五个事件不会影响已经完成的模式匹配。

05

杭州银行应用实践

杭州银行在我们开发的 Flink 动态 CEP 规则引擎下,也有实际的业务场景落地和应用,如事件中心-行为序列事件模块。

事件中心是以用户行为埋点数据作为数据源,对他们进行处理和分析,并输出结果辅助业务决策的平台。其中行为序列事件模块应用了行内开发的 Flink 动态 CEP 技术。事件中心-行为序列事件模块如下:

新增一个行为序列事件,填好基础信息后,用户可在行为序列配置里可以新增事件或事件组,并配置事件过期时间。

0ace84bfe155477ed215aba89f5dea5c.jpeg

一个行为序列事件模板如下:

1648494f3010dd37113ac3fed0387d19.jpeg

如下图所示,1-5原子事件表示某用户的埋点行为序列,作为 Flink 动态 CEP 的输入流 event 按照埋点顺序进入动态规则匹配,而匹配的规则是事件过期时间,这里为 20分钟。例如某输入流在 20分钟内还未完成全部五个原子事件,而只完成到事件4,这样则视为模式匹配完成,匹配到的事件为事件1到事件4,可以通过配置输出流输出自定义的规则匹配结果(如用户名字、错误原因、用户手机号码等)到 kafka、rocketMQ 等消息队列。如此,就能给业务更有价值的数据支持,做针对性的用户推荐。

4c95a1b51b522890b9410c925b9caf6b.jpeg

Flink 动态 CEP 在事件中心实践中的优势体现在,修改或新增规则或事件序列,完全无需启停服务,只需直接编辑并保存。web 端修改会同步修改数据库中保存的规则,然后选择上线,动态规则转换就完成了。

55ffdc53daa7c37dc01d4bd4c8da4e72.jpeg

【参考文献】

[1]阿里云开发者社区.(2023−02−10).Flink CEP 新特性进展与在实时风控场景的落地.阿里云开发者社区.https://developer.aliyun.com/article/1157197

[2]阿里云帮助中心. (2023-11-07). Flink 动态 CEP 快速入门_实时计算 Flink版(Flink). 阿里云帮助中心. https://help.aliyun.com/zh/flink/getting-started/getting-started-with-dynamic-flink-cep

[3]Apache Flink. (2022-09-16). FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP). Apache Flink. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195730308

[4]Apache Flink. (v1.15.4). FlinkCEP-Flink的复杂事件处理 . https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/libs/cep/

[5]https://github.com/RealtimeCompute/ververica-cep-demohttps://github.com/RealtimeCompute/ververica-cep-demo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/944439.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

STM32F103RCT6学习之三:串口

1.串口基础 2.串口发送 1&#xff09;基本配置 注意&#xff1a;实现串口通信功能需在keil中设置打开Use Micro LIB&#xff0c;才能通过串口助手观察到串口信息 2)编辑代码 int main(void) {/* USER CODE BEGIN 1 *//* USER CODE END 1 *//* MCU Configuration-------------…

Python中构建终端应用界面利器——Blessed模块

在现代开发中&#xff0c;命令行应用已经不再仅仅是一个简单的文本输入输出工具。随着需求的复杂化和用户体验的重视&#xff0c;终端界面也逐渐成为一个不可忽视的设计环节。 如果你曾经尝试过开发终端UI&#xff0c;可能对传统的 print() 或者 input() 函数感到不满足&#…

OpenHarmony-5.PM 子系统(2)

电池服务组件OpenHarmony-4.1-Release 1.电池服务组件 Battery Manager 提供了电池信息查询的接口&#xff0c;同时开发者也可以通过公共事件监听电池状态和充放电状态的变化。电池服务组件提供如下功能&#xff1a; 电池信息查询。充放电状态查询。关机充电。 电池服务组件架…

Java 网络原理 ①-IO多路复用 || 自定义协议 || XML || JSON

这里是Themberfue 在学习完简单的网络编程后&#xff0c;我们将更加深入网络的学习——HTTP协议、TCP协议、UDP协议、IP协议........... IO多路复用 ✨在上一节基于 TCP 协议 编写应用层代码时&#xff0c;我们通过一个线程处理连接的申请&#xff0c;随后通过多线程或者线程…

基于规则的系统架构:理论与实践

在当今信息化快速发展的时代&#xff0c;企业面临着日益复杂和多变的市场环境&#xff0c;传统的静态系统架构已难以满足快速响应业务变化的需求。基于规则的系统架构&#xff08;Rule-Based System Architecture, RBSA&#xff09;作为一种灵活、可扩展的架构模式&#xff0c;…

记一个itertools排列组合和列表随机排序的例子

朋友不知道哪里弄来了一长串单词列表&#xff0c;一定要搞个单词不重复的组合。那么这个时候我们就可以想到读书时所学的排列组合知识了&#xff0c;而这个在Python中可以怎么实现呢&#xff1f;我记录如下&#xff1a; 使用itertools模块实现排列组合 在 Python 中&#xff…

从0入门自主空中机器人-4-【PX4与Gazebo入门】

前言: 从上一篇的文章 从0入门自主空中机器人-3-【环境与常用软件安装】 | MGodmonkeyの世界 中我们的机载电脑已经安装了系统和常用的软件&#xff0c;这一篇文章中我们入门一下无人机常用的开源飞控PX4&#xff0c;以及ROS中无人机的仿真 1. PX4的安装 1.1 PX4固件代码的下载…

搭建vue项目

一、环境准备 1、安装node node官网&#xff1a;https://nodejs.org/zh-cn 1.1、打开官网&#xff0c;选择“下载”。 1.2、选择版本号&#xff0c;选择系统&#xff0c;根据需要自行选择&#xff0c;上面是命令安装方式&#xff0c;下载是下载安装包。 1.3、检查node安装…

深度学习笔记(5)——目标检测和图像分割

目标检测与图像分割 语义分割:如果没有语义信息,很难正确分类每个像素 解决方案:感知像素周围的语义,帮助正确分类像素 滑窗计算:计算非常低效,图像块的重叠部分会被重复计算很多次 解决方案:转向全卷积 全卷积问题:分类模型会大幅降低特征的分辨率,难以满足分割所需的高分辨…

go语言的成神之路-筑基篇-gin常用功能

第一节-gin参数绑定 目录 第一节-?gin参数绑定 ShouldBind简要概述 功能&#xff1a; 使用场景&#xff1a; 可能的错误&#xff1a; 实例代码 效果展示 第二节-gin文件上传 选择要上传的文件 选择要上传的文件。 效果展示? 代码部分 第三节-gin请求重定向 第…

【Leecode】Leecode刷题之路第93天之复原IP地址

题目出处 93-复原IP地址-题目描述 题目描述 个人解法 思路&#xff1a; todo代码示例&#xff1a;&#xff08;Java&#xff09; todo复杂度分析 todo官方解法 93-复原IP地址-官方解法 方法1&#xff1a;回溯 思路&#xff1a; 代码示例&#xff1a;&#xff08;Java&…

【新方法】通过清华镜像源加速 PyTorch GPU 2.5安装及 CUDA 版本选择指南

下面详细介绍所提到的两条命令&#xff0c;它们的作用及如何在你的 Python 环境中加速 PyTorch 等库的安装。 1. 设置清华镜像源 pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple这条命令的作用是将 pip &#xff08;Python 的包管理工具&#xf…

CES Asia 2025的低空经济展区有哪些亮点?

CES Asia 2025&#xff08;赛逸展&#xff09;的低空经济展区有以下亮点&#xff1a; • 前沿科技产品展示&#xff1a; 多款新型无人机将亮相&#xff0c;如固定翼无人机和系留无人机的最新型号&#xff0c;其在监测、救援和货物运输等方面功能强大。此外&#xff0c;还有可能…

python数据分析之爬虫基础:selenium详细讲解

目录 1、selenium介绍 2、selenium的作用&#xff1a; 3、配置浏览器驱动环境及selenium安装 4、selenium基本语法 4.1、selenium元素的定位 4.2、selenium元素的信息 4.3、selenium元素的交互 5、Phantomjs介绍 6、chrome handless模式 1、selenium介绍 &#xff08;1…

Python学生管理系统(MySQL)

上篇文章介绍的Python学生管理系统GUI有不少同学觉得不错来找博主要源码&#xff0c;也有同学提到老师要增加数据库管理数据的功能&#xff0c;本篇文章就来介绍下python操作数据库&#xff0c;同时也对上次分享的学生管理系统进行了改进了&#xff0c;增加了数据库&#xff0c…

二,Python常用库(共16个)

二&#xff0c;常用库(共15个 二&#xff0c;Python常用库(共15个)1&#xff0c;os模块2&#xff0c;json模块2.1 猴子补丁S 3&#xff0c;random模块4&#xff0c;string模块5&#xff0c;异常处理5.1 错误类型5.1 逻辑错误两种处理方式5.1.1 错误时可以预知的5.1.2 错误时不可…

Linux第99步_Linux之点亮LCD

主要学习如何在Linux开发板点亮屏&#xff0c;以及modetest命令的实现。 很多人踩坑&#xff0c;我也是一样。关键是踩坑后还是实现不了&#xff0c;这样的人确实很多&#xff0c;从群里可以知道。也许其他人没有遇到这个问题&#xff0c;我想是他运气好。 1、修改设备树 1)、…

解密MQTT协议:从QOS到消息传递的全方位解析

1、QoS介绍 1.1、QoS简介 使用MQTT协议的设备大部分都是运行在网络受限的环境下&#xff0c;而只依靠底层的TCP传输协议&#xff0c;并不 能完全保证消息的可靠到达。 MQTT提供了QoS机制&#xff0c;其核心是设计了多种消息交互机制来提供不同的服务质量&#xff0c;来满足…

网络安全 | 5G网络安全:未来无线通信的风险与对策

网络安全 | 5G网络安全&#xff1a;未来无线通信的风险与对策 一、前言二、5G 网络的技术特点2.1 超高速率与低延迟2.2 大容量连接与网络切片 三、5G 网络面临的安全风险3.1 网络架构安全风险3.2 设备终端安全风险3.3 应用场景安全风险3.4 用户隐私安全风险 四、5G 网络安全对策…

MyBatis知识点笔记

目录 mybatis mapper-locations的作用&#xff1f; mybatis configuration log-impl 作用&#xff1f; resultType和resultMap的区别&#xff1f; 参数 useGeneratedKeys &#xff0c;keyColumn&#xff0c;keyProperty作用和用法 取值方式#和$区别 动态标签有哪些 MyBat…