大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink CEP 核心组件
  • CEP 的应用场景
  • CEP 的优势

在这里插入图片描述

超时事件提取

当一个模式通过within关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃,为了能够处理这些超时的部分匹配,select和flatSelectAPI调用允许制定超时处理程序。

FlinkCEP开发流程

  • DataSource中的数据转换为DataStream
  • 定义Pattern,并将DataStream和Pattern组合转换为PatternStream。
  • PatternStream 经过 Select、Process 等算子转换为 DataStream
  • 再次转换为 DataStream 经过处理后,Sink到目标库。

SELECT 方法:

SingleOutputStreamOperator<PayEvent> result =
    patternStream.select(orderTimeoutOutput, new PatternTimeoutFunction<PayEvent, PayEvent>() {
    @Override
    public PayEvent timeout(Map<String, List<PayEvent>> map, long l) throws Exception {
        return map.get("begin").get(0);
    }
}, new PatternSelectFunction<PayEvent, PayEvent>() {
    @Override
    public PayEvent select(Map<String, List<PayEvent>> map) throws Exception {
        return map.get("pay").get(0);
    }
});

对检测到的序列模式序列应用选择函数,对于每个模式序列,调用提供的 PatternSelectFunction,模式选择函数只能产生一个结果元素。
对超时的部分模式序列应用超时函数,对于每个部分模式序列,调用提供的 PatternTimeoutFunction,模式超时函数只能产生一个结果元素。
你可以在使用相同 OutputTag 进行 Select 操作 SingleOutputStreamOperator上获得SingleOutputStreamOperator生成的超时数据流。

非确定有限自动机

FlinkCEP 在运行时会将用户的逻辑转换为这样一个 NFA Graph(NFA对象)
所以有限状态机的工作过程,就是从开始状态,根据不同的输入,自动进行转换的过程。
在这里插入图片描述

上图中的状态机的功能,是检测二进制数是否含有偶数个0。从图上可以看出,输入只有1和0两种。
从S1状态开始,只有输入0才会转换到S2状态,同样S2状态下只有输入0才会转换到S1。所以,二进制输入完毕,如果满足最终状态,也就是最后停在S1状态,那么输入的二进制数就含有偶数个0。

CEP开发流程

FlinkCEP开发流程:

  • DataSource中数据转换为DataStream、Watermark、keyby
  • 定义Pattern,并将DataStream和Pattern组合转换为PatternStream
  • PatternStream经过select、process等算子转换为 DataStream
  • 再次转换为 DataStream 经过处理后,Sink到目标库

添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

案例1:恶意登录检测

找出5秒内,连续登录失败的账号
以下是数据:

new CepLoginBean(1L, "fail", 1597905234000L),
new CepLoginBean(1L, "success", 1597905235000L),
new CepLoginBean(2L, "fail", 1597905236000L),
new CepLoginBean(2L, "fail", 1597905237000L),
new CepLoginBean(2L, "fail", 1597905238000L),
new CepLoginBean(3L, "fail", 1597905239000L),
new CepLoginBean(3L, "success", 1597905240000L)

整体思路

  • 获取到数据
  • 在数据源上做Watermark
  • 在Watermark上根据ID分组keyBy
  • 做出模式Pattern
  • 在数据流上进行模式匹配
  • 提取匹配成功的数据

编写代码

package icu.wzk;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;


public class FlinkCepLoginTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        DataStreamSource<CepLoginBean> data = env.fromElements(
                new CepLoginBean(1L, "fail", 1597905234000L),
                new CepLoginBean(1L, "success", 1597905235000L),
                new CepLoginBean(2L, "fail", 1597905236000L),
                new CepLoginBean(2L, "fail", 1597905237000L),
                new CepLoginBean(2L, "fail", 1597905238000L),
                new CepLoginBean(3L, "fail", 1597905239000L),
                new CepLoginBean(3L, "success", 1597905240000L)
        );
        SingleOutputStreamOperator<CepLoginBean> watermarks = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<CepLoginBean>() {

                    @Override
                    public WatermarkGenerator<CepLoginBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<CepLoginBean>() {

                            long maxTimestamp = Long.MAX_VALUE;
                            long maxOutOfOrderness = 500L;

                            @Override
                            public void onEvent(CepLoginBean event, long eventTimestamp, WatermarkOutput output) {
                                maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                );
        KeyedStream<CepLoginBean, Long> keyed = watermarks
                .keyBy(new KeySelector<CepLoginBean, Long>() {
                    @Override
                    public Long getKey(CepLoginBean value) throws Exception {
                        return value.getUserId();
                    }
                });
        Pattern<CepLoginBean, CepLoginBean> pattern = Pattern
                .<CepLoginBean>begin("start")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .next("next")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .within(Time.seconds(5));
        PatternStream<CepLoginBean> patternStream = CEP.pattern(keyed, pattern);
        SingleOutputStreamOperator<CepLoginBean> process = patternStream
                .process(new PatternProcessFunction<CepLoginBean, CepLoginBean>() {
                    @Override
                    public void processMatch(Map<String, List<CepLoginBean>> map, Context context, Collector<CepLoginBean> collector) throws Exception {
                        System.out.println("map: " + map);
                        List<CepLoginBean> start = map.get("start");
                        collector.collect(start.get(0));
                    }
                });
        process.print();
        env.execute("FlinkCepLoginTest");
    }

}


class CepLoginBean {


    private Long userId;

    private String operation;

    private Long timestamp;

    public CepLoginBean(Long userId, String operation, Long timestamp) {
        this.userId = userId;
        this.operation = operation;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getOperation() {
        return operation;
    }

    public void setOperation(String operation) {
        this.operation = operation;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "CepLoginBean{" +
                "userId=" + userId +
                ", operation='" + operation + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

运行结果

可以看到程序输出:

map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}
map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905238000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}

Process finished with exit code 0

运行截图如下所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/874442.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

8.10Laplacian算子

实验原理 Laplacian算子也是一种用于边缘检测的技术&#xff0c;它通过查找二阶导数的零交叉点来定位边缘。 cv::Laplacian()函数是OpenCV库提供的一个用于计算图像拉普拉斯算子的函数。拉普拉斯算子是一个二阶微分算子&#xff0c;常用于图像处理中检测边缘或突变区域。它通…

揭秘!全罐喂养值得吗?高性价比主食罐头推荐

家里的五岁的公猫&#xff0c;已绝育&#xff0c;不爱喝水&#xff0c;医生建议喂湿粮。一开始还是早干晚湿&#xff0c;干粮存货都处理完后&#xff0c;就开始全罐喂养了。身边也有许多铲屎官十分好奇全罐喂养到底值不值&#xff0c;那么今天就来分享一下全罐喂养的感想和经验…

卡诺图的绘制

目录 逻辑函数的卡诺图化简 最小项卡诺图的组成 相邻最小项 卡诺图的组成 二变量卡诺图 三变量卡诺图 四变量卡诺图 卡诺图中的相邻项&#xff08;几何相邻&#xff09; 逻辑函数的卡诺图化简 最小项卡诺图的组成 相邻最小项 互为反变量的那个变量可以消去。 卡诺图的…

.json文件的C#解析,基于Newtonsoft.Json插件

目录 1. 前言 2. 正文 2.1 问题 2.2 解决办法 2.2.1 思路 2.2.2 代码实现 2.2.3 测试结果 3. 备注 1. 前言 天气晚来秋&#xff0c;这几天天气变凉了&#xff0c;各位同学注意好多穿衣服。回归正题 由于需要&#xff0c;需要将json的配置里面的调理解析出来&#xff…

RV1126采集VI视频数据流

本章节内容 这个章节主要是讲解如何通过RKMEDIA的API获取RV1126的VI视频流&#xff0c;虽然这部分在之前的课程里面讲解了很多次&#xff0c;但还是要带着大家回顾一下这部分代码。 采集VI数据的代码实现 2.1. VI模块的初始化并使能 上图是VI模块的初始化&#xff0c;这部分的…

STM32+ESP8266 WiFi连接机智云平台APP远程控制教程

本文档将介绍如何用STM32ESP8266 WiFi模块从零开始连接上机智云&#xff0c;并通过APP进行远程控制。 机智云官网&#xff1a;机智云|智能物联网操作系统 (gizwits.com) 准备&#xff1a;STM32、ESP8266、手机、可上网的WiFi。 1.创建设备 1.1 注册登陆 请自行注册账号并登陆…

ASUS华硕ROG幻16 Air 2024款锐龙AI版GA605WI,GA605WV工厂模式原厂Win11系统,含MyASUS WinRE恢复重置还原功能

适用型号&#xff1a;【GA605WI、GA605WV】&#xff0c;原装出厂Windows11系统工厂包下载 链接&#xff1a;https://pan.baidu.com/s/1IVolLwB7fddGKZY0IxOqaA?pwd62e2 提取码&#xff1a;62e2 华硕原装系统工厂安装包&#xff0c;带有MyASUS WinRE RECOVERY恢复功能、自带…

健身房|基于springboot的健身房管理系统设计与实现(附项目源码+论文+数据库)

私信或留言即免费送开题报告和任务书&#xff08;可指定任意题目&#xff09; 目录 一、摘要 二、相关技术 三、系统设计 四、数据库设计 五、核心代码 六、论文参考 七、源码获取 一、摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理…

基于node.js koa2模拟快递柜存储取出快递微信小程序

本文介绍了一个基于Node.js Koa2框架的快递柜存储和取出快递的微信小程序。首先&#xff0c;我们使用Koa2框架搭建了一个简单的后端服务器&#xff0c;用于处理微信小程序发送的请求。然后&#xff0c;我们实现了快递柜的存储和取出功能&#xff0c;用户可以通过微信小程序扫描…

分布式通信:多计算平台的任务分配

目录 1. 分布式通信 1.1 树莓派配置流程​编辑 1.2 树莓派和laptop处于同一网络​编辑 1.3 laptop配置 1.4 通信测试 1.5 分组通信 ​编辑 1.6 分布式通信测试 ​编辑参考资料 1. 分布式通信 机器人体积较小&#xff0c;采用树莓派作为控制器&#xff0c;实现传感器处…

音视频入门基础:AAC专题(1)——AAC官方文档下载

一、AAC简介 高级音频编码&#xff08;英语&#xff1a;Advanced Audio Coding&#xff0c;AAC&#xff09;是有损音频压缩的专利数字音频编码标准&#xff0c;由Fraunhofer IIS、杜比实验室、贝尔实验室、Sony、Nokia等公司共同开发。出现于1997年&#xff0c;为一种基于MPEG…

HTML生日蛋糕

目录 写在前面 完整代码 下载代码 代码分析 系列文章 写在最后 写在前面 HTML实现的生日蛋糕来喽,小编亲测,发给好友可以直接打开哦。在代码的第183行可以写下对朋友的祝福,快拿去送给你的好朋友吧! 完整代码 <!DOCTYPE html> <html lang="en"…

OAuth 2.0 授权流程详解与 FastAPI 实现

在现代网络应用中&#xff0c;OAuth 2.0 已成为授权和认证的标准协议。它允许用户将访问权限授予第三方应用&#xff0c;而无需暴露自己的用户名和密码。本文将详细介绍 OAuth 2.0 的常见授权流程&#xff0c;并展示如何在 FastAPI 中实现这些流程。 OAuth 2.0 简介 OAuth 2…

N-152基于java贪吃蛇游戏5

开发工具eclipse,jdk1.8 文档截图&#xff1a; N-152基于java贪吃蛇游戏5

强!70.3K star ! 推荐一款功能强大、开源、可视化的性能实时监控系统:Netdata

在当今复杂多变的IT环境中&#xff0c;系统性能的实时监控与分析对于确保业务连续性、系统稳定运行以及快速故障排查至关重要。随着云计算、大数据和微服务架构的普及&#xff0c;对监控系统的要求也日益增高。 今天给大家推荐一款性能监控工具为:Netdata。 它作为一款开源、…

手撕Python之正则

1.正则和re模块的联系 正则表达式是一种通用的用来简洁表达一组字符串的表达式&#xff0c;利用正则表达式可以方便快捷的匹配和筛选字符串 举个例子&#xff1a;在一堆数据中进行电话号码的寻找&#xff0c;我们需要根据电话号码的特征在这一堆数据进行电话的寻找&#xff0…

Nginx之日志切割,正反代理,HTTPS配置

1 nginx日志切割 1.1 日志配置 在./configure --prefixpath指定的path中切换进去&#xff0c;找到log文件夹&#xff0c;进去后找到都是对应的日志文件 其中的nginx.pid是当前nginx的进程号&#xff0c;当使用ps -ef | grep nginx获得就是这个nginx.pid的值 在nginx.conf中…

把哈希表换成 tire 树,居然为公司省下了几千万

你有没有想过,仅仅省下1%的计算资源,能为一家大公司带来多大的影响?你可能觉得,1%听起来微不足道,完全不值得一提。但今天我们聊一下一个技术优化点,就是关于如何通过微小的优化,Cloudflare这样的大型网络公司如何省下了大量的计算资源,背后还有不少值得我们学习的智慧…

OpenCV结构分析与形状描述符(8)点集凸包计算函数convexHull()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 查找一个点集的凸包。 函数 cv::convexHull 使用斯克拉斯基算法&#xff08;Sklansky’s algorithm&#xff09;来查找一个二维点集的凸包&#…

MATLAB实现PID参数自动整定

目录 1、项目说明 2、文件说明 1、项目说明 本项目旨在通过 MATLAB 语言实现 PID 参数的自动整定&#xff0c;并设计了一个直观易用的 GUI 界面。该系统特别适用于实验室环境下的 PID 参数自整定任务。整定的核心原则在于优化系统性能&#xff0c;使系统的衰减比尽可能接近理…