RocketMQ复杂过滤尝试

需求

消息实体,根据实体中的一个字段,决定推给多个业务系统。

例:一个点位信息Bean,这个点位信息,设备、能源、安全都有用,那么点位信息表中有适用模块标识。

点位新增 需要通知所有勾选业务系统  tag - add

点位编辑 需要新增勾选业务系统标识 tag - add ,移除勾选 tag - delete ,不变 tag - update

点位删除 通知所有勾选的系统 tag - delete

分析

1、MQ不支持同个消息,一下子发送到不同的topic。发多次也可以实现,但是多少有点抵触

2、那么发送到同一个topic下,让各个业务系统来取,那么必定需要去过滤,不然拿到不属于本业务系统的点位信息了,仅仅靠tag明显是不够的,服务端过滤可以采用SQL92方式

3、那么我随之就想到也可以在各个业务系统中过滤了,不是本业务系统的标识,直接返回。不执行相关逻辑。

本人更倾向于第二种实现

编码

尝试一:SQL92过滤实现

再broker.conf 新增配置 enablePropertyFilter=true

生产者

public class AddProducer {
 
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("a-group");
        producer.setNamesrvAddr("192.168.0.211:9876");
        producer.start();

        Point point0 = new Point(0, "point0");
        Point point1 = new Point(1, "point1");
        Point point2 = new Point(2, "point2");
        Point point3 = new Point(3, "point3");
        Point point4 = new Point(4, "point4");
        Point point5 = new Point(5, "point5");
        Point point6 = new Point(6, "point6");
        Point point7 = new Point(7, "point7");
        Point point8 = new Point(8, "point8");
        Point point9 = new Point(9, "point9");

        ArrayList<Point> list = new ArrayList<>();
        list.add(point0);
        list.add(point1);
        list.add(point2);
        list.add(point3);
        list.add(point4);
        list.add(point5);
        list.add(point6);
        list.add(point7);
        list.add(point8);
        list.add(point9);
 
 
 
 
        try {
 
            for (Point bean : list) {
                Message msg = new Message("UNIFIED_POINT", "add", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
                msg.putUserProperty("testValue",String.valueOf(bean.getId()));
                producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
                System.out.println(bean.getId() + " Continue execution ");
            }

            Thread.sleep(20000);

            for (Point bean : list) {
                Message msg = new Message("UNIFIED_POINT", "update", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
                msg.putUserProperty("testValue",String.valueOf(bean.getId()));
                producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
                System.out.println(bean.getId() + " Continue execution ");
            }
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者1 - 模拟业务系统1

    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("calc_rules_point");
        // 指定Namesrv地址
        consumer.setNamesrvAddr("192.168.0.211:9876");
        // 订阅主题和标签
        consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and testValue between 0 and 3)"));
        // 设置Consumer第一次启动是从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.setSuspendCurrentQueueTimeMillis(2000);
 
        // 注册消息监听器
        consumer.registerMessageListener(new CustomMessageListenerOrderly());
 
        // 启动消费者
        consumer.start();
        System.out.printf("Group1ConsumerTagA Started.%n");
    }

消费者2 - 业务系统2

public class consumer2 {
 
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("a_point");
        // 指定Namesrv地址
        consumer.setNamesrvAddr("192.168.0.211:9876");
        // 订阅主题和标签
        consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and testValue between 4 and 6)"));
        // 设置Consumer第一次启动是从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.setSuspendCurrentQueueTimeMillis(2000);
 
        // 注册消息监听器
        consumer.registerMessageListener(new CustomMessageListenerOrderly());
 
        // 启动消费者
        consumer.start();
        System.out.printf("Group1ConsumerTagA Started.%n");
    }
}

测试结果

消费者1

消费者2

sql92 确实可以实现我的需求。

那么我把testValue换成各业务系统唯一标识,逗号拼接

把生产者调整一下

msg.putUserProperty("testValue","aaaa,bbbb,cccc");

客户端试了 in 不行

consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and 'aaaa' in (testValue))"));

like

consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and testValue LIKE '%aaaa%')"));

不行,只支持这些关键字

in 关键字 只能变量in 特定字符集合这样表达。

思路转变一下,也不是说不能实现

msg.putUserProperty("aaaa","aaaa");
msg.putUserProperty("bbbb","bbbb");
msg.putUserProperty("cccc","cccc");

生产者 setProperty 每个业务系统一个区分开来。

消费者即可实现

consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("aaaa is not null"));

尝试二:消费端逻辑处理

修改生产者

测试的点位信息加上适用系统标识(逗号分隔)

public class AddProducer {
 
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("a-group");
        producer.setNamesrvAddr("192.168.0.211:9876");
        producer.start();

        Point point1 = new Point(1, "point1","1,2");
        Point point2 = new Point(2, "point2","1,2");
        Point point3 = new Point(3, "point3","1,4");
        Point point4 = new Point(4, "point4","1,4");
        Point point5 = new Point(5, "point5","1,5");

        ArrayList<Point> list = new ArrayList<>();
        list.add(point1);
        list.add(point2);
        list.add(point3);
        list.add(point4);
        list.add(point5);

 
 
 
        try {
 
            for (Point bean : list) {
                Message msg = new Message("UNIFIED_POINT", "add", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
                System.out.println(bean.getId() + " Continue execution ");
            }

            Thread.sleep(20000);

            for (Point bean : list) {
                Message msg = new Message("UNIFIED_POINT", "update", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
                System.out.println(bean.getId() + " Continue execution ");
            }
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

模拟消费者一:业务系统标识为1

public class consumer {
 
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("calc_rules_point");
        // 指定Namesrv地址
        consumer.setNamesrvAddr("192.168.0.211:9876");
        // 订阅主题和标签
        consumer.subscribe("UNIFIED_POINT", "*");
        // 设置Consumer第一次启动是从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.setSuspendCurrentQueueTimeMillis(2000);
 
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                try {
                    for (MessageExt msg : list) {
                        String data = new String(msg.getBody());
                        Point p = JSONUtil.toBean(data, Point.class);

                        List<String> ids = Stream.of(p.getApplyModuleIds().split(","))
                                .map(String::trim)
                                .collect(Collectors.toList());
                        if(!ids.contains("1")){
                            //非本业务系统 直接返回
                            return ConsumeOrderlyStatus.SUCCESS;
                        }
                        if(msg.getTags().equals("add")){
                            System.out.println("新增消费:" + p + msg.getQueueId());
                        }else if(msg.getTags().equals("update")){
                            System.out.println("修改消费:" + p + msg.getQueueId());
                        }

                    }

                    return ConsumeOrderlyStatus.SUCCESS;
                } catch (Exception e) {
                    MessageExt msg = list.get(0);
                    log.error("consumer news error " + new String(msg.getBody()));
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
 
        // 启动消费者
        consumer.start();
        System.out.printf("Group1ConsumerTagA Started.%n");
    }
}

模拟消费者二:业务系统标识2

public class consumer2 {
 
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("a_point");
        // 指定Namesrv地址
        consumer.setNamesrvAddr("192.168.0.211:9876");
        // 订阅主题和标签
        consumer.subscribe("UNIFIED_POINT", "*");
        // 设置Consumer第一次启动是从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.setSuspendCurrentQueueTimeMillis(2000);
 
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                try {
                    for (MessageExt msg : list) {
                        String data = new String(msg.getBody());
                        Point p = JSONUtil.toBean(data, Point.class);

                        List<String> ids = Stream.of(p.getApplyModuleIds().split(","))
                                .map(String::trim)
                                .collect(Collectors.toList());
                        if(!ids.contains("2")){
                            //非本业务系统 直接返回
                            return ConsumeOrderlyStatus.SUCCESS;
                        }
                        if(msg.getTags().equals("add")){
                            System.out.println("新增消费:" + p + msg.getQueueId());
                        }else if(msg.getTags().equals("update")){
                            System.out.println("修改消费:" + p + msg.getQueueId());
                        }

                    }

                    return ConsumeOrderlyStatus.SUCCESS;
                } catch (Exception e) {
                    MessageExt msg = list.get(0);
                    log.error("consumer news error " + new String(msg.getBody()));
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
 
        // 启动消费者
        consumer.start();
        System.out.printf("Group1ConsumerTagA Started.%n");
    }
}

测试结果 - 消费者1

测试结果 - 消费者2

结论

SQL92方式:过滤在服务端,但是功能还是局限的。如果服务器性能好压力不大,且过滤方式能满足。个人任务还是可用的

消费端逻辑处理方式:略费带宽,服务器压力小。过滤在消费端

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/772728.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

摄像机反求跟踪软件/插件 Mocha Pro 2024 v11.0.2 CE Win

AE/PR/OFX/达芬奇/AVX插件 | 摄像机反求跟踪软件Mocha Pro 2024 v11.0.2 CE Win-PR模板网 Mocha Pro 软件(插件)&#xff0c;用于平面运动跟踪、3D跟踪、动态观察、对象移除、图像稳定和PowerMesh有机扭曲跟踪等功能。整合了SynthEyes核心的3D跟踪算法&#xff0c;能够快速自动…

Pluck-CMS-Pluck-4.7.16 远程代码执行漏洞(CVE-2022-26965)

前言 CVE-2022-26965 是一个影响 Pluck CMS 4.7.16 版本的远程代码执行&#xff08;RCE&#xff09;漏洞。该漏洞允许经过身份验证的用户通过 /admin.php?actionthemeinstall 的主题上传功能执行任意代码。 漏洞细节 在 Pluck CMS 的管理界面中&#xff0c;管理员可以上传主…

【数据结构】(C语言):堆(二叉树的应用)

堆&#xff1a; 此处堆为二叉树的应用&#xff0c;不是计算机中用于管理动态内存的堆。形状是完全二叉树。堆分两种&#xff1a;最大堆&#xff0c;最小堆。最大堆&#xff1a;每个节点比子树所有节点的数值都大&#xff0c;根节点为最大值。最小堆&#xff1a;每个节点比子树…

千万不要用国产BI,不然你会发现它性价比奇高——以奥威BI软件为例

在信息技术日新月异的今天&#xff0c;企业对于商业智能&#xff08;BI&#xff09;软件的选择往往陷入了一个误区&#xff1a;盲目追求国际品牌&#xff0c;却忽视了身边那些性价比极高的国产精品。如果你不慎踏入了“千万不要用国产BI”的陷阱&#xff0c;那么奥威BI软件将是…

PHP家政服务预约单开版微信小程序系统源码

&#x1f3e0; —— 便捷生活&#xff0c;从指尖开始&#x1f4aa; &#x1f308;【开篇&#xff1a;家政新风尚&#xff0c;一键触达】 在忙碌的生活节奏中&#xff0c;你是否渴望拥有一个温馨、整洁的家&#xff0c;却又苦于找不到合适的家政服务&#xff1f;现在&#xff…

C++_03

1、构造函数 1.1 什么是构造函数 类的构造函数是类的一种特殊的成员函数&#xff0c;它会在每次创建类的新对象时执行。 每次构造的是构造成员变量的初始化值&#xff0c;内存空间等。 构造函数的名称与类的名称是完全相同的&#xff0c;并且不会返回任何类型&#xff0c;也不…

对标GPT-4o!不锁区、支持手机、免费使用,Moshi来啦!

7月4日凌晨&#xff0c;法国知名开源AI研究实验室Kyutai在官网发布了&#xff0c;具备看、听、说多模态大模型——Moshi。 Moshi功能与OpenAI在5月14日展示的最新模型GPT-4o差不多&#xff0c;可以听取人的语音提问后进行实时推理回答内容。但GPT-4o的语音模式要在秋天才能全面…

适合弱电行业的项目管理软件!找企智汇软件!

随着科技的不断发展&#xff0c;弱电行业对于项目管理的需求日益增强。为满足这一需求&#xff0c;企智汇推出了一款专为弱电行业打造的工程项目管理系统。 企智汇弱电行业工程项目管理系统以其专业性、高效性和智能性&#xff0c;赢得了业界的广泛认可。该系统深入融合了弱电…

仓库管理系统

create database Warehouse_management;//建库 use Warehouse_management; 一、建表 1、管理员信息表 CREATE TABLE ManagerInfo (Mno CHAR(3) PRIMARY KEY,Mname VARCHAR(10) NOT NULL,Mgender CHAR(1) DEFAULT 男,Mbirhdate DATE,Mtelephone CHAR(11) NOT NULL,Mhiredate …

数据预处理:统计关联性分析/数据清洗/数据增强/特征工程实例

专栏介绍 1.专栏面向零基础或基础较差的机器学习入门的读者朋友,旨在利用实际代码案例和通俗化文字说明,使读者朋友快速上手机器学习及其相关知识体系。 2.专栏内容上包括数据采集、数据读写、数据预处理、分类\回归\聚类算法、可视化等技术。 3.需要强调的是,专栏仅介绍主…

C++(第四天----拷贝函数、类的组合、类的继承)

一、拷贝构造函数&#xff08;复制构造函数&#xff09; 1、概念 拷贝构造函数&#xff0c;它只有一个参数&#xff0c;参数类型是本类的引用。如果类的设计者不写拷贝构造函数&#xff0c;编译器就会自动生成拷贝构造函数。大多数情况下&#xff0c;其作用是实现从源对象到目…

Access,Trunk,Hybrid网络设备链接类型详解

带着问题找答案&#xff1a;网络链路上的数据包怎么看&#xff0c;是否携带vlan-id如何看&#xff0c;以及如何设计链接类型满足用户要求&#xff0c;请看如下解析。 第一种&#xff1a;链接类型access 无标记数据帧 第二种&#xff1a;链接类型trunk 第三种&#xf…

最新mysql打开远程访问和修改最大连接数

这里写目录标题 1.使用navicat进入命令控制板,进入use mysql;2.查询用户表3.更新user表中root用户域属性&#xff0c;%表示允许外部访问4.执行以上语句之后再执行&#xff0c;FLUSH PRIVILEGES;5. 执行授权语句修改最大连接数 1.使用navicat进入命令控制板,进入use mysql; use…

为什么写Python脚本时要加上if __name__ == ‘__main__‘?

目录 一、__name__ 的秘密 二、if __name__ __main__: 的作用 三、代码示例与案例分析 示例一&#xff1a;简单的数学工具模块 示例二&#xff1a;命令行工具 四、实际应用场景 五、进阶应用 1. 插件开发 2. 动态加载模块 3. 交互式与脚本模式切换 六、结论 在Pyth…

电商API对接流程丨从零开始快速打通电商平台数据通道

开发电商业务管理系统时&#xff0c;怎么对接电商接口呢&#xff1f;有两种方式可供选择&#xff0c;一种方式就是自己入驻想要对接的电商平台对应的开放平台&#xff0c;按照要求与流程与电商接口进行对接&#xff0c;还有一种方式就是寻找电商中台&#xff0c;通过第三方接口…

吴恩达深度学习笔记:机器学习策略(2)(ML Strategy (2)) 2.5-2.6

目录 第三门课 结构化机器学习项目&#xff08;Structuring Machine Learning Projects&#xff09;第二周&#xff1a;机器学习策略&#xff08;2&#xff09;(ML Strategy (2))2.5 数据分布不匹配时的偏差与方差的分析&#xff08;Bias and Variance with mismatched data di…

玩机进阶教程----MTK芯片杂牌机 小品牌机型解除bl锁以及root的操作步骤解析

在玩机过程中会遇到很多小品牌机型或者杂牌机类的。大多都使用mtk芯片。而且基本很少有官方线刷包。在这些机型中玩机首先我们要想办法导出系统来制作线刷包。以免后续解锁bl或者root出现未知故障可以恢复原系统。 那么对于这些机型该如何进行备份固件和root呢。通过博文可以初…

选微调、RAG还是微调+RAG?

RAG技术是一种结合了检索与生成的方法。它通常依赖于两个核心组件&#xff1a;一个大型语言模型&#xff08;如GPT-3&#xff09;和一个检索系统&#xff08;如向量数据库&#xff09;。RAG先使用检索系统从大量数据中检索出相关信息&#xff0c;然后将这些信息提供给语言模型&…

一文带你看懂什么是营销归因模型及SaaS企业的应用

在数字化时代&#xff0c;营销活动的多样性和复杂性使得评估其效果成为一项挑战。营销归因模型应运而生&#xff0c;为SaaS企业等提供了科学、系统的评估工具。本文将简要介绍什么是营销归因模型&#xff0c;阐述其带来的好处&#xff0c;并探讨SaaS企业可以采用的营销归因系统…

编译rust程序,并让它依赖低版本的GLIBC库

在linux环境下编译rust程序&#xff0c;编译好的程序会依赖你当前系统的GLIBC库&#xff0c;也就是说你的程序无法在使用更低版本GLIBC库的linux系统中运行。 查看当前系统的GLIBC版本&#xff1a; strings /lib64/libc.so.6 | grep GLIBC 为了让编译的程序依赖比较低版本的GL…