系列十一(实战)、发送 接收带标签的消息(Java操作RocketMQ)

一、发送 & 接收带标签的消息

1.1、概述

        消息的种类纷繁复杂,不同的业务场景需要不同的消息,基于此RocketMQ提供了消息过滤功能,通过Tag或者Key进行区分,本章介绍Tag,我们再往一个Topic里面发送消息的时候,根据业务逻辑可能需要区分,例如带有tagA的消息被A消费,带有TagB的消息被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,这时我们也需要通过过滤才能区别对待。

        其实这种场景在生活中也很常见,例如大家每天都使用的微信公众号,当关注的博主在公众号发布完消息后,你只会收到自己自己感兴趣的那部分。

1.2、订阅关系一致性

        订阅关系一致性是消息过滤中对【消费者组名-Topic-Tag】的一些要求,如果不能正确的配置,将会出现消费消息紊乱,甚至消息丢失的问题。关于订阅关系一致性问题,请参考

订阅关系一致文档,这里不再赘述。

1.3、Demo07MQTestApp 

/**
 * @Author : 一叶浮萍归大海
 * @Date: 2023/12/25 13:03
 * @Description: 发送 & 接收带标签的消息
 */
@Slf4j
public class Demo07MQTestApp {


    /**
     * 发送带标签的消息
     */
    @Test
    public void demo7Producer() throws Exception {
        // 1、创建一个生产者
        DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");

        // 2、连接NameServer
        producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);

        // 3、启动
        producer.start();

        // 4、创建消息
        String[] tags = new String[]{"NBA", "run", "star","car","mobile","tourism"};
        for (int i = 1; i <= 6; i++) {
            String tag = tags[i % tags.length];
            String content = "";
            switch (tag) {
                case "NBA":
                    content = "this is a message about NBA,消息编号[" + i + "]";
                    break;
                case "run":
                    content = "this is a message about run,消息编号[" + i + "]";
                    break;
                case "star":
                    content = "this is a message about star,消息编号[" + i + "]";
                    break;
                case "mobile":
                    content = "this is a message about mobile,消息编号[" + i + "]";
                    break;
                case "tourism":
                    content = "this is a message about tourism,消息编号[" + i + "]";
                    break;
                default:
                    content = "this is a message about foods,消息编号[" + i + "]";
                    break;
            }
            Message message = new Message("tag-topic",tag,content.getBytes(StandardCharsets.UTF_8));
            // 5、发送消息
            producer.send(message);
            log.info("【demo7Producer】发送消息成功,消息内容:{}",content);
        }

        // 关闭producer
        producer.shutdown();
    }

    /**
     * 接收带标签的消息(Push方式)
     */
    @Test
    public void demo7PushConsumer1() throws Exception {
        // 1、创建一个消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupA");
        // 2、连接NameServer
        consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 3、订阅消息,*表示订阅该主题所有的消息
        consumer.subscribe("tag-topic", "NBA");
        // 4、设置监听器(采用异步回调方式,一直监听)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    log.info("我是消费者【demo7PushConsumer1】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));
                }
                /**
                 * 返回值:消费消息成功与否
                 *      CONSUME_SUCCESS:表明消费成功,消息会从MQ出队
                 *      RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者
                 */
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5、启动
        consumer.start();
        log.info("【demo7PushConsumer1】启动成功,正在等待接收消息...");

        // 6、挂起当前JVM
        System.in.read();
    }

    @Test
    public void demo7PushConsumer2() throws Exception {
        // 1、创建一个消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupB");
        // 2、连接NameServer
        consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 3、订阅消息,*表示订阅该主题所有的消息
        consumer.subscribe("tag-topic", "NBA || star || mobile");
        // 4、设置监听器(采用异步回调方式,一直监听)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    log.info("我是消费者【demo7PushConsumer2】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));
                }
                /**
                 * 返回值:消费消息成功与否
                 *      CONSUME_SUCCESS:表明消费成功,消息会从MQ出队
                 *      RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者
                 */
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5、启动
        consumer.start();
        log.info("【demo7PushConsumer2】启动成功,正在等待接收消息...");

        // 6、挂起当前JVM
        System.in.read();
    }

}

1.4、测试

        先后运行demo7PushConsumer1、demo7PushConsumer1和demo7Producer,观察控制台日志输出信息。

1.5、Topic和Tag如何选择

        不同的业务应该使用不同的Topic,如果仅仅是相同的业务里边有不同的表现形式,那么我们要使用Tag进行区分。至于说具体怎么选择,可以从以下几个方面进行区分:

(1)消息类型是否一致:如普通消息、事务消息、延时消息、顺序消息、不同的消息类型使用不同的Topic,无法通过Tag进行区分;

(2)业务是否相关联:没有直接关联的消息,如淘宝交易信息、京东物流消息使用不同的Topic进行区分;而同样是淘宝交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分;

(3)消息优先级是否一致:如同样是物流消息,盒马必须2小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分;

(4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级别的消息使用同一个Topic,则有可能会因为过长的等待时间而"饿死",此时需要将不同量级的消息进行区分,使用不同的Topic;

        总的来说,针对消息分类、可以选择创建多个Topic或者在同一个Topic下创建多个Tag。但是通常情况下,不同Topic之间的消息没有必然的联系。而Tag则用来区分同一个Topic下相互关联的消息,例如:全集和子集的关系,流程先后的关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/269351.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

正式官宣!谈思AutoSec 8周年年会暨中国汽车网络安全及数据安全合规峰会将于明年4月在沪召开

随着智能互联网时代的到来&#xff0c;智能汽车的安全形势变得更加严峻和复杂&#xff0c;网络资产的暴露和安全边界继续扩大。与传统的汽车车身安全问题相比&#xff0c;网络安全、数据安全、用户隐私等安全问题交织叠加&#xff0c;并加速了黑客对智能汽车领域的渗透&#xf…

OpenHarmony之内核层解析~

OpenHarmony简介 技术架构 OpenHarmony整体遵从分层设计&#xff0c;从下向上依次为&#xff1a;内核层、系统服务层、框架层和应用层。系统功能按照“系统 > 子系统 > 组件”逐级展开&#xff0c;在多设备部署场景下&#xff0c;支持根据实际需求裁剪某些非必要的组件…

微服务-springcloud(eureka实践, nacos实践)

Spring 体系图 版本关系 eureka 实践 1 父工程依赖 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.14</version> </parent> <dependencyManage…

VA01/VA02/VA03 销售订单根据定价和步骤校验权限隐藏价格(二)

1、文档说明 1.1、内容回顾 之前发表过相关文章《VA01/VA02/VA03 销售订单根据定价和步骤校验权限隐藏价格&#xff08;一&#xff09;》&#xff0c;本篇文章对上一篇文章做补充说明。 第一篇文章是通过拥有权限&#xff0c;则隐藏价格的模式&#xff0c;即对需要隐藏价格的…

第一届能源电子产业创新大赛太阳能光伏赛道在京顺利完成初赛评审

近日&#xff0c;第一届能源电子产业创新大赛太阳能光伏赛道初赛在北京顺利举行。本次太阳能光伏赛道赛事由工业和信息化部产业发展促进中心、宜宾市人民政府主办&#xff0c;宜宾市经济和信息化局、宜宾高新技术产业园区承办&#xff0c;中国国检测试控股集团股份有限公司协办…

天下第一铭:以此文纪念汤晓鸥

文章目录 1. Introduction2. Main3. Biography4. My ThoughtsReference彩蛋环节 1. Introduction 汤晓鸥的逝世是继孙剑博士逝世之后&#xff0c;华人在计算机视觉领域的又一损失。 以下文章为汤晓鸥教授的一篇旧文&#xff0c;我重发此文以纪念作者。 2. Main 汤晓鸥&#x…

2024 年 10大 AI 趋势

2025 年&#xff0c;全球人工智能市场预计将达到惊人的 1906.1 亿美元&#xff0c;年复合增长率高达 36.62%。 人工智能软件正在迅速改变我们的世界&#xff0c;而且这种趋势在未来几年只会加速。 我们分析了未来有望彻底改变 2024 年的 10 个AI趋势。从生成式人工智能的兴起到…

56.0/DIV+CSS 布局(详细版)

目录 56.1 本章简介 56.2 实例讲解 56.2.1 菜单制作 56.2.2 美化滚动条 56.2.3 DIV+CSS 布局 56.3 综合示例 56.3.1 总体分析 56.3.2 Header 层 56.3.3 最终代码 56.1 本章简介 本章通过几个实例讲解 DIV+CSS 的应用。 采用表格布局的页面内,为了实现设计的布局,制作者往往…

用旧电脑搭建NAS

将旧 PC 改造成家庭服务器或 NAS&#xff08;网络附加存储的缩写&#xff09; 一、使用旧 PC 作为 NAS 服务器的优势 如果您想要快速且易于使用的解决方案&#xff0c;专门构建的 NAS 可能是个不错的选择。但将旧 PC 重新利用为 NAS 服务器有其独特的优势&#xff1a; 黑苹果…

一文道破Java中的深拷贝,浅拷贝,零拷贝

前言 在Java编写代码中&#xff0c;对象的拷贝是一个常见的操作。根据拷贝的层次和方式不同&#xff0c;可以分为深拷贝、浅拷贝和零拷贝。本篇文章我们将详细介绍这三种拷贝方式的概念、实现方法以及使用场景&#xff0c;方便大佬学习及面试。 深拷贝 深拷贝是一种创建对象副…

【数据结构】——期末复习题题库(1)

&#x1f383;个人专栏&#xff1a; &#x1f42c; 算法设计与分析&#xff1a;算法设计与分析_IT闫的博客-CSDN博客 &#x1f433;Java基础&#xff1a;Java基础_IT闫的博客-CSDN博客 &#x1f40b;c语言&#xff1a;c语言_IT闫的博客-CSDN博客 &#x1f41f;MySQL&#xff1a…

数据治理之数据标准管理

目录 一、概述什么是数据标准数据标准的作用什么是数据标准化数据标准的意义业务方面技术方面管理方面 二、数据标准管理的内容数据模型标准基础数据标准主数据和参考数据标准指标数据标准 三、数据标准管理流程数据标准梳理数据标准制定数据标准审查数据标准发布数据标准贯彻 …

Redis的安装以及使用

第一步&#xff0c;去官网下载一个压缩包到本地解压即用&#xff0c;绿色软件&#xff0c;不用其他操作&#xff0c;点击Download下载即可&#xff1a; Introduction to Redis | RedisLearn about the Redis open source projecthttps://redis.io/docs/about/第二步&#xff0…

用邮件群发软件开拓外贸客户:有效的方法与技巧

随着跨境电商业务的发展&#xff0c;企业在研发外贸客户时面临如何有效地与潜在用户沟通的挑战。电子邮件群发软件已经成为一种时兴的工具&#xff0c;帮助企业迅速推送很多电子邮件。本文将探讨电子邮件群发软件在外贸客户开发中的实际应用效果&#xff0c;并从专业角度分析其…

简单的喷淋实验--嵌入式实训

目录 喷淋实验--嵌入式实训 1.MQTT通信原理 2.MQTT库的移植 3.代码流程 运行视频如下: 喷淋实验--嵌入式实训 1.MQTT通信原理 MQTT&#xff08;Message Queuing Telemetry Transport&#xff09;是一种轻量级的发布/订阅消息传输协议&#xff0c;旨在提供可靠、高效的通信…

计算机视觉基础(12)——图像恢复

前言 我们将学习图像恢复相关知识。主要有图像恢复的定义、评价标准和实现图像恢复的方法。图像恢复任务包括图像去噪、去模糊、图像超分辨率、图像修复等&#xff1b;评价标准有峰值信噪比和结构相似性&#xff1b;图像超分辨的方法有传统方法和基于深度学习的方法&#xff1a…

three.js使用精灵模型Sprite渲染森林

效果&#xff1a; 源码&#xff1a; <template><div><el-container><el-main><div class"box-card-left"><div id"threejs" style"border: 1px solid red"></div><div class"box-right&quo…

Elasticsearch基本使用

文章目录 概要一、核心概念二、索引操作2.1 创建索引2.2 判断索引是否存在2.3 查看索引2.4 打开、关闭索引2.5 删除索引 三、映射操作3.1 创建映射字段3.2 映射属性详解3.3 查看映射关系 四、文档增删改查4.1 新增文档4.2 查看单个文档4.3 查看所有文档4.4 _source定制返回字段…

Linux进阶系列(二)——lscpu、htop、seq、shuf、sort

1. lscpu lscpu 命令是Linux系统中用来显示关于CPU架构的信息的工具。它详细展示了CPU的相关信息&#xff0c;包括型号、核心数、架构类型、缓存大小等等。 1.1 物理CPU与逻辑CPU 物理CPU指的是实际存在于硬件系统上的中央处理单元。每个物理CPU都是一个独立的处理器芯片或处…