在Broker端进行消息过滤

        在Broker端进行消息过滤,可以减少无效消息发送到Consumer,少占用网络带宽从而提高吞吐量。Broker端有三种方式进行消息过滤。

1.消息的Tag和Key

        对一个应用来说,尽可能只用一个Topic,不同的消息子类型用Tag来标识(每条消息只能有一个Tag),服务器端基于Tag进行过滤,并不需要读取消息体的内容,所以效率很高。发送消息设置了Tag以后,消费方在订阅消息时,才可以利用Tag在Broker端做消息过滤。其次是消息的Key。对发送的消息设置好Key,以后可以根据这个Key来查找消息。所以这个Key一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便。Broker会创建专门的索引文件,来存储Key到消息的映射,由于是哈希索引,应尽量使Key唯一,避免潜在的哈希冲突。Tag和Key的主要差别是使用场景不同,Tag用在Consumer的代码中,用来进行服务端消息过滤,Key主要用于通过命令行查询消息。

2.通过Tag进行过滤

        用Tag方式进行过滤的方法是传入感兴趣的Tag标签,Tag标签是一个普通字符串,是在创建Message的时候添加的,一个Message只能有一个Tag。使用Tag方式过滤非常高效,Broker端可以在ConsumeQueue中做这种过滤,只从CommitLog里读取过滤后被命中的消息。看一下ConsumerQueue的存储格式,如图7-1所示。

图7-1 ConsumerQueue的存储格式

Consume Queue的第三部分存储的是Tag对应的hashcode,是一个定长的字符串,通过Tag过滤的过程就是对比定长的hashcode。经过hashcode对比,符合要求的消息被从CommitLog读取出来,不用担心Hash冲突问题,消息在被消费前,会对比完整的Message Tag字符串,消除Hash冲突造成的误读。

3.用SQL表达式的方式进行过滤

        使用Tag方式过滤虽然高效,但是支持的逻辑比较简单,在构造Message的时候,还可以通过putUserProperty函数来增加多个自定义的属性,基于这些属性可以做复杂的过滤逻辑,如代码清单7-1所示。

代码清单7-1 在消息中增加自定义属性

Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
msg.putUserProperty("b",  “hello”);

 

代码中这个消息就有了两个特殊的属性值a和b,我们用类似SQL表达式的方式对消息进行过滤,用法如下(目前只支持在PushConsumer中实现这种过滤):

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");  // only subsribe messages have property a, also a >=0 and a <= 3

consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently()
{    

@Override    

public ConsumeConcurrentlyStatus consumeMessage
    (List<MessageExt> msgs, ConsumeConcurrentlyContext context)
{        

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    

}

});

consumer.start();
 

类似SQL的过滤表达式,支持如下语法:

·数字对比,比如>、>=、<、<=、BETWEEN、=;

·字符串对比,比如=、<>、IN;

·IS NULL or IS NOT NULL;

·逻辑符号AND、OR、NOT。

支持的数据类型:

·数字型,比如123、3.1415;

·字符型,比如'abc'、注意必须用单引号;

·NULL,这个特殊字符;

·布尔型,TRUEorFALSE。

SQL表达式方式的过滤需要Broker先读出消息里的属性内容,然后做SQL计算,增大磁盘压力,没有Tag方式高效。

4.Filter Server方式过滤

        Filter Server是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据Java函数的逻辑对消息进行过滤。要使用Filter Server,首先要在启动Broker前在配置文件里加上filterServer-Nums=3这样的配置,Broker在启动的时候,就会在本机启动3个Filter Server进程。Filter Server类似一个RocketMQ的Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的Java函数进行过滤,过滤后的消息再传给远端的Consumer。这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用。上传的java代码也要经过检查,不能有申请大内存、创建线程等这样的操作,否则容易造成Broker服务器宕机。实现过滤逻辑的示例如代码清单7-2所示。

代码清单7-2 实现过滤逻辑的代码示例

public class MessageFilterImpl implements MessageFilter {
    @Override
    public boolean match(MessageExt msg) {
        String property = msg.getUserProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            if ((id % 3) == 0 && (id > 10)) {
                return true;
            }
        }
        return false;
    }
}

 

上面代码实现了过滤逻辑,它是根据消息的“SequenceId”这个属性来过滤的,其实不一定要根据消息属性来过滤,也可以根据消息体的内容或其他特征过滤,如代码清单7-3所示。

代码清单7-3 使用FilterServer的Consumer示例

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer-GroupNamecc4");
    // 使用Java代码,在服务器做消息过滤
    String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
        consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl", filterCode);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

 

在使用Filter Server的Consumer例子中,主要是把实现过滤逻辑的类作为参数传到Broker端,Broker端的Filter Server会解析这个类,然后根据match函数里的逻辑进行过滤。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/156405.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

扩散模型实战(十):Stable Diffusion文本条件生成图像大模型

推荐阅读列表&#xff1a; 扩散模型实战&#xff08;一&#xff09;&#xff1a;基本原理介绍 扩散模型实战&#xff08;二&#xff09;&#xff1a;扩散模型的发展 扩散模型实战&#xff08;三&#xff09;&#xff1a;扩散模型的应用 扩散模型实战&#xff08;四&#xff…

MySQL集群高可用架构之MMM

目录 一、MMM概述 1.1 MMM 简介 1.2 MMM高可用架构 1.3 MMM工作原理 1.4 工作流程图 二、MMM高可用双主双从架构部署 1、架构&#xff1a; 2、搭建 MySQL 多主多从模式 3、安装配置 MySQL-MMM 4、故障测试 一、MMM概述 1.1 MMM 简介 MMM&#xff08;Master-Master re…

设计模式(5)-使用设计模式实现简易版springIoc

自定义简易版springIoc 1 spring使用回顾 自定义spring框架前&#xff0c;先回顾一下spring框架的使用&#xff0c;从而分析spring的核心&#xff0c;并对核心功能进行模拟。 数据访问层。定义UserDao接口及其子实现类 public interface UserDao {public void add(); }public…

Redis7.2.3集群安装,新增节点,删除节点,分配哈希槽,常见问题

概念&#xff1a; 【Redis】高可用之三&#xff1a;集群&#xff08;cluster&#xff09; - 知乎 实操&#xff1a; Redis集群三种模式 主从模式 优势&#xff1a; 主节点可读可写 从节点只能读&#xff08;从节点从主节点同步数据&#xff09; 缺点&#xff1a; 当主节点…

U盘不能访问不一定是坏了,可能还有其他原因!U盘无法访问修复详解

当你将USB驱动器连接到计算机时,系统会将其识别为可移动驱动器,并启动其文件管理过程。 然而,用户现在注意到,即使可以检测到他们的USB驱动器,也无法访问它们。 如果幸运的话,拔下插头就能解决问题,但如果不是,请继续阅读以了解更多故障排除选项。 USB闪存驱动器是便…

开源电子画册源码系统 可重复利用 适合任何行业 带完整的搭建教程

电子画册&#xff0c;又称电子样本、电子商刊、电子杂志&#xff0c;是一种集合图片处理、文案策划、音乐加工、视频、统计调查、虚拟现实、三维动画等多种技术和表现形式为一体的多媒体画册&#xff0c;电子杂志是纸质印刷画册&#xff08;样本&#xff09;的升级版本&#xf…

千兆光模块和万兆光模块需要注意哪些事项

随着网络通信技术的发展&#xff0c;千兆光模块和万兆光模块已经成为了网络设备中不可或缺的关键组件。光模块的制造涉及到许多技术和工艺问题&#xff0c;需要严格的控制和管理。本文将从工艺流程、材料选用、测试认证等方面&#xff0c;详细介绍制造千兆光模块和万兆光模块需…

2023年中国恒温蜡疗仪发展趋势分析:应用前景存有很大发展与探索空间[图]

恒温电蜡疗仪可将蜡熔化&#xff0c;利用蜡自身特点&#xff0c;能阻止热的传导、散热慢、气体和水分不易消失&#xff0c;保温性能优越。利用蜡能紧密贴于体表的可塑性&#xff0c;可加入其他药物协同进行治疗&#xff0c;也可将中药与蜡疗有机地结合在一起&#xff0c;产生柔…

为什么嵌入式没有35岁危机?

为什么嵌入式没有35岁危机? 在当今数字化时代&#xff0c;IT行业变化迅速&#xff0c;技术的更新迭代速度惊人。然而&#xff0c;有一个技术领域却能够在这个竞争激烈的行业中稳步前行&#xff0c;而且不受35岁危机所困扰&#xff0c;那就是嵌入式技术。 嵌入式技术是指将计算…

三羊马-001317 三季报分析(20231116)

三羊马-001317 基本情况 公司名称&#xff1a;三羊马(重庆)物流股份有限公司 A股简称&#xff1a;三羊马 成立日期&#xff1a;2005-09-06 上市日期&#xff1a;2021-11-30 所属行业&#xff1a;装卸搬运和运输代理业 周期性&#xff1a;0 主营业务&#xff1a;通过公铁联运方式…

11.16堆的一些性质与操作

1016 7&#xff0c;5&#xff0c;4&#xff0c;3&#xff0c;2&#xff0c;6&#xff0c;1 7&#xff0c;4&#xff0c;6&#xff0c;1&#xff0c;3&#xff0c;2&#xff0c;5 没有度为1的结点说明为满树 A.哈夫曼树一定没有度为1的结点。最大堆可能有度为1的结点 D.哈夫曼…

GAT里面的sofamax函数的实现:

1.sofamx 公式&#xff1a; 2. GAT里的sofamax函数的实现&#xff1a; 1. 因为指数在x轴正轴爆炸式地快速增长&#xff0c;如果zi比较大&#xff0c;exp⁡(zi)也会非常大&#xff0c;得到的数值可能会溢出。溢出又分为下溢出&#xff08;Underflow&#xff09;和上溢出&#x…

计算机毕设 深度学习 机器学习 酒店评价情感分析算法实现

文章目录 0 前言概述项目所需模块数据数据说明字段说明 数据处理分词处理停用词处理样本均衡建立多层感知机分类模型训练模型网络检测率以及检测结果 最后 0 前言 &#x1f525; 这两年开始毕业设计和毕业答辩的要求和难度不断提升&#xff0c;传统的毕设题目缺少创新和亮点&a…

2023最新版JavaSE教程——第8天:面向对象编程(高级)

目录 一、关键字&#xff1a;static1.1 类属性、类方法的设计思想1.2 static关键字1.3 静态变量1.3.1 语法格式1.3.2 静态变量的特点1.3.3 举例1.3.4 内存解析 1.4 静态方法1.4.1 语法格式1.4.2 静态方法的特点1.4.3 举例 1.5 练习 二、单例(Singleton)设计模式2.1 设计模式概述…

Kubernetes学习-概念2

参考&#xff1a;关于 cgroup v2 | Kubernetes 关于 cgroup v2 在 Linux 上&#xff0c;控制组约束分配给进程的资源。 kubelet 和底层容器运行时都需要对接 cgroup 来强制执行为 Pod 和容器管理资源&#xff0c; 这包括为容器化工作负载配置 CPU/内存请求和限制。 Linux 中…

⑧【MySQL】数据库查询:内连接、外连接、自连接、子查询、多表查询

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ 内连接、外连接、自连接、子查询、多表查询 ⑧…

蓝凌OA sysUiComponent 任意文件上传漏洞复现

0x01 产品简介 蓝凌核心产品EKP平台定位为新一代数字化生态OA平台&#xff0c;数字化向纵深发展&#xff0c;正加速构建产业互联网&#xff0c;对企业协作能力提出更高要求&#xff0c;蓝凌新一代生态型OA平台能够支撑办公数字化、管理智能化、应用平台化、组织生态化&#xff…

Azure 机器学习:使用 Azure 机器学习 CLI、SDK 和 REST API 训练模型

目录 环境准备克隆示例存储库 示例案例在云中训练1.连接到工作区PythonAzure CLIREST API 2. 创建用于训练的计算资源4. 提交训练作业PythonAzure CLIREST API 注册已训练的模型PythonAzure CLIREST API Azure 机器学习提供了多种提交 ML 训练作业的方法。 在本文中&#xff0c…

利用 Kubernetes 降本增效?EasyMR 基于 Kubernetes 部署的探索实践

Kubernetes 是用于编排容器化应用程序的云原生系统。最初由 Google 创建&#xff0c;如今由 Cloud Native Computing Foundation&#xff08;CNCF&#xff09;维护更新。 Kubernetes 是市面上最受欢迎的集群管理解决方案之一。它自动化容器化应用程序的部署、扩展和管理&#…

解决公网下,k8s calico master节点无法访问node节点创建的pod

目的&#xff1a;解决pod部署成功后&#xff0c;只能在node节点访问&#xff0c;而master节点无法访问 原因&#xff1a;集群搭建时&#xff0c;没有配置公网进行kubectl操作&#xff0c;从而导致系统默认node节点&#xff0c;使用内网IP加入k8s集群&#xff01;如下&#xff…