Flink CEP实现10秒内连续登录失败用户分析

1、什么是CEP?

Flink CEP即 Flink Complex Event Processing,是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型,即对于无界流中的各种数据(称为事件),提供一种组合匹配的功能。

在这里插入图片描述
上图中,以不同形状代表一个DataStream中不同属性的事件。以一个圆圈和一个三角组成一个Pattern后,就可以快速过滤出原来的DataStream中符合规律的数据。举个例子,比如很多网站需要对恶意登录的用户进行屏蔽,如果用户连续三次输入错误的密码,那就要锁定当前用户。在这个场景下,所有用户的登录行为就构成了一个无界的数据流DataStream。而连续三次登录失败就是一个匹配模型Pattern。CEP编程模型的功能就是从用户登录行为这个无界数据流DataStream中,找出符合这个匹配模Pattern的所有数据。这种场景下,使用我们前面介绍的各种DataStream API其实也是可以实现的,不过相对就麻烦很多。而CEP编程模型则提供了非常简单灵活的功能实现方式。

2、代码实现

2.1 引入maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.roy</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0</version>

    <properties>
        <flink.version>1.12.5</flink.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- CEP主要是下面这个依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.8.3-10.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

2.2 基本流程

//1、获取原始事件流
DataStream<Event> input = ......; 
//2、定义匹配器
Pattern<Event,?> pattern = .......; 
//3、获取匹配流
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
//4、将匹配流中的数据处理形成结果数据流
DataStream<Result> resultStream = patternStream.process(
	new PatternProcessFunction<Event, Result>() {
	@Override
	public void processMatch(
		Map<String, List<Event>> pattern,
		Context ctx,
		Collector<Result> out) throws Exception {
	}
});

2.3 完整代码

注意:代码运行前,先启动2.4 nlk socket服务

package com.roy.flink.project.userlogin;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * @desc 十秒内连续登录失败的用户分析。使用Flink CEP进行快速模式匹配
 */
public class MyUserLoginAna {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // //BoundedOutOfOrdernessWatermarks定时提交Watermark的间隔
        env.getConfig().setAutoWatermarkInterval(1000L);

        // 使用Socket测试
        env.setParallelism(1);
        // 1、获取原始事件流(10.86.97.206改为实际地址)
        final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206",7777);

        final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource.map(new MapFunction<String, UserLoginRecord>() {
            @Override
            public UserLoginRecord map(String s) throws Exception {
                final String[] splitVal = s.split(",");
                return new UserLoginRecord(splitVal[0], Integer.parseInt(splitVal[1]), Long.parseLong(splitVal[2]));
            }
        }).assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))// 主要针对乱序流,由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间
                        .withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>) (element, recordTimestamp) -> element.getLoginTime())
        );

        // 2、定义匹配器
        // 2.1:10秒内出现3次登录失败的记录(不一定连续)
        // Flink CEP定义消息匹配器。
//        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
//            @Override
//            public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
//                return 1 == userLoginRecord.getLoginRes();
//            }
//        }).times(3).within(Time.seconds(10));

        // 2.2:连续三次登录失败。next表示连续匹配。 不连续匹配使用followBy
        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("one").where(new SimpleCondition<UserLoginRecord>() {
            @Override
            public boolean filter(UserLoginRecord value) throws Exception {
                return 1 == value.getLoginRes();
            }
        }).next("two").where(new SimpleCondition<UserLoginRecord>() {
            @Override
            public boolean filter(UserLoginRecord value) throws Exception {
                return 1 == value.getLoginRes();
            }
        }).next("three").where(new SimpleCondition<UserLoginRecord>() {
            @Override
            public boolean filter(UserLoginRecord value) throws Exception {
                return 1 == value.getLoginRes();
            }
        }).within(Time.seconds(10));

        // 3、获取匹配流
        final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream, pattern);

        final MyProcessFunction myProcessFunction = new MyProcessFunction();
        // 4、将匹配流中的数据处理成结果数据流
        final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);
        badUserStream.print("badUser");
        env.execute("UserLoginAna");

    }// main

    public static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord,UserLoginRecord>{

        @Override
        public void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx, Collector<UserLoginRecord> out) throws Exception {
            // 针对2.1 连续3次登录失败
//            final List<UserLoginRecord> records = match.get("start");
//            for(UserLoginRecord record : records){
//                out.collect(record);
//            }

            // 针对2.2 非连续3次登录失败
            final List<UserLoginRecord> records = match.get("three");
            for(UserLoginRecord record : records){
                out.collect(record);
            }

        }// processMarch
    }// MyProcessFunction
}

UserLoginRecord对象,如下:


public class UserLoginRecord {
    private String userId;
    private int loginRes; // 0-成功, 1-失败
    private long loginTime;

    public UserLoginRecord() {
    }

    public UserLoginRecord(String userId, int loginRes, long loginTime) {
        this.userId = userId;
        this.loginRes = loginRes;
        this.loginTime = loginTime;
    }

    @Override
    public String toString() {
        return "UserLoginRecord{" +
                "userId='" + userId + '\'' +
                ", loginRes=" + loginRes +
                ", loginTime=" + loginTime +
                '}';
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public int getLoginRes() {
        return loginRes;
    }

    public void setLoginRes(int loginRes) {
        this.loginRes = loginRes;
    }

    public long getLoginTime() {
        return loginTime;
    }

    public void setLoginTime(long loginTime) {
        this.loginTime = loginTime;
    }
}

2.4 nlk模拟socket服务端

在这里插入图片描述

2.5 IDEA控制台打印

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/361367.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python完善APC netbotz 250报告功能实现主动式运维。

首先介绍一下APC netbotz 250, 这是施耐德推出的一款机架式监控主机&#xff0c;能够对所有IT环境进行经济有效而且灵活的监控&#xff0c;号称APC史上性价比最高的环境监测方案&#xff0c;这可不是我吹的&#xff0c;是APC官网的介绍&#xff0c;可参考下面的官网截图。 我们…

【原创】VMware创建子网,并使用软路由获得访问互联网的能力,并通过静态路由让上层网络访问位于虚拟机的子网

前言 一看标题就很离谱&#xff0c;确实内容也有点复杂&#xff0c;我的初衷是为后面搞软路由做准备&#xff0c;先通过VMware进行可行性验证&#xff0c;确定方案是否可行&#xff0c;再做下一步的计划。结论当然可以的&#xff0c;能通能访问&#xff0c;强的不行。 网络拓…

重大进展:国产200层存储芯片实现量产,超越国际领先企业

近日&#xff0c;中国芯片技术领域迎来了一项历史性突破&#xff1a;200层以上的存储芯片率先实现量产&#xff0c;这一成就不仅超越了国外存储芯片巨头&#xff0c;更预示着中国有望成为全球行业的领军者。 后起之秀&#xff0c;鱼跃龙门 在这场技术的赛跑中&#xff0c;中国…

“死“社群先不要扔,想办法激活一下,隔壁的运营都馋哭了

私域运营已成为当下很多企业寻求增长的标配。在这过程中&#xff0c;社群运营就是极为重要的一个环节。过去我们为了流量&#xff0c;疯狂建群拉人。但建社群容易活跃难&#xff0c;活跃一段时间后&#xff0c;社群会越来越安静。 不仅如此&#xff0c;群主和管理员也渐渐疏于…

行为型设计模式—迭代器模式

迭代器模式&#xff1a;也叫作游标模式&#xff0c;能在不暴露复杂数据结构内部细节的情况下遍历其中所有的元素。在迭代器的帮助下&#xff0c; 客户端可以用一个迭代器接口以相似的方式遍历不同集合中的元素。 当集合背后为复杂的数据结构&#xff0c;且希望对客户端隐藏其复…

双非本科准备秋招(12.2)—— 力扣栈与队列

复习一下栈和队列的基础知识&#xff0c;刷几道题上上手。 1、102. 二叉树的层序遍历 广度优先遍历嘛&#xff0c;每次拓展一个新结点&#xff0c;就把新结点加入队列&#xff0c;这样遍历完队列中的元素&#xff0c;顺序就是层序遍历。 class Solution {public List<Lis…

分布式搜索引擎_学习笔记_3

分布式搜索引擎03 0.学习目标 1.数据聚合 **聚合&#xff08;aggregations&#xff09;**可以让我们极其方便的实现对数据的统计、分析、运算。例如&#xff1a; 什么品牌的手机最受欢迎&#xff1f;这些手机的平均价格、最高价格、最低价格&#xff1f;这些手机每月的销售…

91 C++对象模型探索。RTTI运行时类型识别回顾 与 存储位置介绍

一&#xff0c;RTTI 运行时类型识别&#xff0c;简单回顾 C运行时类型识别RTTI&#xff0c;要求父类这种必须 至少有一个虚函数&#xff0c;如果父类中没有虚函数&#xff0c;那么得到的RTTI就不准确&#xff1b; RTTI就可以在执行期间查询一个多态指针&#xff0c;或者多态应…

C++:第十四讲动态规划初步

每日C知识 想要在做C小游戏里实现等待效果&#xff0c;可以用Sleep。 Sleep函数可以使计算机程序&#xff08;进程&#xff0c;任务或线程&#xff09;进入休眠&#xff0c;使其在一段时间内处于非活动状态。 一般需要头文件windows.h。 注意"Sleep"首字母要大写…

k8s Sidecar filebeat 收集容器中的trace日志和app日志

目录 一、背景 二、设计 三、具体实现 Filebeat配置 K8S SideCar yaml Logstash配置 一、背景 将容器中服务的trace日志和应用日志收集到KAFKA&#xff0c;需要注意的是 trace 日志和app 日志需要存放在同一个KAFKA两个不同的topic中。分别为APP_TOPIC和TRACE_TOPIC 二、…

LLM之Agent(十)| 本地安装Microsoft AutoGen Studio 2.0教程

2021年3月&#xff0c;微软发布了AutoGen[2]&#xff0c;这是一个使用多个代理开发LLM应用程序的框架&#xff0c;这些代理可以协作解决任务。 2024年1月&#xff0c;微软推出了一款新的应用程序&#xff0c;它名为AutoGen Studio[3]&#xff0c;可以简化AI Agent执行过程。 一…

JavaScript入门

第二个知识点&#xff1a;javascript的基本语法 定义变量 在JavaScript里面&#xff0c;没有int&#xff0c;string 之类的数据类型&#xff0c;只有 var var num 1; var string "天玄地号"; 在javascript中&#xff0c;写完一句语句之后可以不加分号&#xff…

MyBatis概述与MyBatis入门程序

MyBatis概述与MyBatis入门程序 一、MyBatis概述二、入门程序1.准备开发环境&#xff08;1&#xff09;准备数据库&#xff08;2&#xff09;创建一个maven项目 2.编写代码&#xff08;1&#xff09;打包方式和引入依赖&#xff08;2&#xff09;新建mybatis-config.xml配置⽂件…

直方图均衡化原理与代码实现

1. 简介 直方图均衡化是一种用于增强图像对比度的图像处理技术。通过调整图像的灰度级别分布&#xff0c;直方图均衡化能够使图像中的像素值更加均匀分布&#xff0c;从而增强图像的细节和对比度。 2. 原理 直方图均衡化的原理是通过调整图像的累积分布函数&#xff08;CDF&…

H2数据库学习总结

H2数据库-简介 H2 是开源的轻量级Java数据库。它可以嵌入Java应用程序中或以客户端-服务器模式运行。 H2 数据库主要可以配置为作为内存数据库运行&#xff0c;这意味着数据将不会持久存储在磁盘上。 由于具有嵌入式数据库&#xff0c;因此它不用于生产开发&#xff0c;而主要…

017 JavaDoc生成文档

什么是JavaDoc 示例 package se;/*** 用于学习Java* author Admin* version 1.0* since 1.8*/ public class Test20240119 {/*** 主方法* param args*/public static void main(String[] args) {// 你好&#xff0c;世界System.out.println("Hello world");} } 写一…

C语言实战项目<贪吃蛇>

我们这篇会使用C语言在Windows环境的控制台中模拟实现经典小游戏贪吃蛇 实现基本的功能&#xff1a; 结果如下: 1.一些Win32 API知识 本次实现呢我们会用到一些Win32 API的知识(WIN32 API也就是Microsoft Windows 32位平台的应用程序编程接口): 1)控制窗口大小 我们可以使用…

43 漏洞发现-WEB应用之漏洞探针类型利用修复

目录 已知CMS开发框架末知CMS演示案例:开发框架类源码渗透测试报告-资讯-thinkphp开发框架类源码渗透测试-咨讯-spring已知CMS非框架类渗透测试报告-工具脚本-wordpress已知CMS非框架类渗透测试报告-代码审计-qqyewu_php未知CMS非框架类渗透测试报告-人工-你我都爱的wg哦~ 已知…

Unity 自动轮播、滑动轮播

如图所示&#xff0c;可设置轮播间隔&#xff0c;可左右滑动进行轮播 1.在UGUI创建个Image&#xff0c;添加自动水平组件 2.添加并配置脚本 3.代码如下&#xff0c;都有注释 using UnityEngine; using UnityEngine.UI;public class IndicatorManager : MonoBehaviour {public …

【IM】如何保证消息可用性(二)

请先阅读第一篇&#xff1a;【IM】如何保证消息可用性&#xff08;一&#xff09; 在第一篇文章中我们了解了保证消息可用性的挑战与目标&#xff0c;现在我们来对于具体的技术方案进行探讨。 1. 上行消息 消息上行过程指的是客户端发送消息给服务端 我们需要先辨析几个概念…