RocketMQ快速入门:集成spring, springboot实现各类消息消费(七)附带源码

0. 引言

rocketmq支持两种消费模式:pull和push,在实际开发中这两种模式分别是如何实现的呢,在spring框架和springboot框架中集成有什么差异?今天我们一起来探究这两个问题。

1. java client实现消息消费

1、添加依赖

		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>

1.1 Push消息消费

rocketmq的push消费是通过pull模式为基础来进行模拟的,就是通过监听器,不断的pull来实现,因此其实现重点就是实现监听器

rocektmq的监听器支持2种:

  • MessageListenerConcurrently 拉取到新消息之后就提交到线程池去消费
  • MessageListenerOrderly 通过加分布式锁和本地锁保证同时只有一条线程去消费一个队列上的数据,以此保证顺序消费

这里虽然还有MessageListener类型,实际上是上述两种的父类,该方法也被弃用了
在这里插入图片描述
所以push模式的的重点就是实现MessageListenerConcurrently监听器,其内部只有一个consumeMessage方法
在这里插入图片描述
那么实现的重点就是consumeMessage方法,这里我们睡眠了10s,用于模拟该监听器运行10s

public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");

        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 集群消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 设置topic
        consumer.subscribe("topic_test", "*");

        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : list) {
                    String topic = msg.getTopic();
                    try {
                        String messageBody = new String(msg.getBody(), "utf-8");
                        System.out.println(topic+":"+messageBody);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者实例
        consumer.start();

        Thread.sleep(10000);
    }

当然,如上的形式只能用于我们单元测试使用,集成在生产中时肯定不能这样用,我们需要将其注册为bean形式,并在项目启动时进行调用,让其注册为监听器

@Component
public class Consumer1PushListener implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt msg : list) {
            String topic = msg.getTopic();
            try {
                String messageBody = new String(msg.getBody(), "utf-8");
                System.out.println(topic+":"+messageBody);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    @PostConstruct
    public void init(){
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 集群消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册监听器
        consumer.registerMessageListener(this);
        try{
            // 设置topic
            consumer.subscribe("topic_test", "*");
            // 启动示例
            consumer.start();
        }catch (Exception e){
            e.printStackTrace();
            System.out.println("rocketmq 消费者启动失败");
        }
    }
}

我们启动项目,发送一条消息,会发现消费者可以实时消费

在这里插入图片描述
消息模式如何调整?
rocektmq 有集群模式和广播模式两种消息模式,如果需要调整的话,通过消费者的setMessageModel方法即可调整

// 集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

1.2 Pull消息消费

pull模式的实现更加简单,直接查看pull消费者类DefaultMQPullConsumer,其下有pull方法
在这里插入图片描述
官方给出的示例代码如下:

public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        try {
            MessageQueue mq = new MessageQueue();
            mq.setQueueId(0);
            mq.setTopic("topic_test");
            mq.setBrokerName("Broker");
            long offset = 26;
            PullResult pullResult = consumer.pull(mq, "*", offset, 32);
            if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
                System.out.printf("%s%n", pullResult.getMsgFoundList());
                consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        consumer.shutdown();
    }

但是截止目前,该类已经被弃用了
在这里插入图片描述
更加推荐的是用DefaultLitePullConsumer类实现,其下的poll方法可以帮助我们更加方便的实现消息消费,这里需要注意,两个类,一个是pull,一个是poll,pull实际上是需要指定偏移量的,而poll则自动帮我们更新了偏移量

public static void main(String[] args) throws MQClientException {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("group2");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("topic_test", "*");
        consumer.start();
        try {
            List<MessageExt> messageList = consumer.poll(3000);
            for (MessageExt message : messageList) {
                System.out.println("pull消费:"+new String(message.getBody()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        consumer.shutdown();
    }

发送几条消息,运行测试
在这里插入图片描述
生产中使用时,大家可以把DefaultLitePullConsumer定义为bean, 以此减少每次资源创建的消耗,具体方式可参考上述push模式的实现代码

1.3 顺序消息消费

rocketmq中提供了两种消费处理形式:并发消费(MessageListenerConcurrently)和顺序消费(MessageListenerOrderly

并发消费消费者会创建多个线程同时消费队列消息,而顺序消费流程跟并发消费最大的区别在于,顺序消费对要处理的队列加锁,确保同一队列,同一时间,只允许一个消费线程处理

我们在之前消息发送的章节已经提前体验过顺序消费代码实现了,通过上述对监听器类型的描述,我们也能知道顺序消费的实现,就是实现MessageListenerOrderly监听器

public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");

        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 集群消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 设置topic
        consumer.subscribe("topic_order", "*");

        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
              @Override
              public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                  byte[] body = list.get(0).getBody();
                  System.out.println("接收消息:"+new String(body, StandardCharsets.UTF_8));
                  return ConsumeOrderlyStatus.SUCCESS;
              }
        });

        // 启动消费者实例
        consumer.start();

        Thread.sleep(10000);
    }

2. springboot实现消息消费

1、添加依赖

		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>

2、修改配置项

rocketmq:
  name-server: localhost:9876
  producer:
    group: group_test # 生产者分组,事务消息会使用
    send-message-timeout: 3000 # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

2.1 push消息消费

通过实现RocketMQListener<T>接口,其中T是泛型,及消息内容的数据类型,可以是String, JSONObject,也可以是自定义数据结构类型

将监听器声明为bean,并实现onMessage方法即可

@Component
@RocketMQMessageListener(topic = "topic_test", consumerGroup = "group_test")
public class MessageListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("消费消息:" + s);
    }
}

注解中的messageModel属性可以用来设置消息模式,默认为集群模式

在这里插入图片描述

2.2 pull消息消费

添加消费者配置

rocketmq:
  name-server: localhost:9876
  consumer:
    group: "group_test"
    topic: "topic_test"

通过receive方法实现消费

 @GetMapping(value = "/poll")
    public void poll() {
        List<String> list = rocketMQTemplate.receive(String.class);
        for (String message : list) {
            System.out.println("poll消费:"+message);
        }
    }

2.3 顺序消息消费

与普通消息不同的是,要声明消费模式为顺序消费consumeMode= ConsumeMode.ORDERLY

@Component
@RocketMQMessageListener(topic = "topic_order", consumerGroup = "group_order", consumeMode= ConsumeMode.ORDERLY)
public class MessageOrderListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("顺序消费消息:" + s);
    }
}

3. 总结

消息消费相对更加简单,实际上掌握一种之后,其他类型的也就能够举一反三了,本文也只是针对最常用的类型进行列举,还有更多参数的支持,需要大家在实际应用中探索。

本文演示源码见:https://gitee.com/wuhanxue/wu_study/tree/master/demo/rocketmq_demo

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/728889.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

运维 Tips | IT工程师常用的8个USB引导启动器工具

[ 知识是人生的灯塔&#xff0c;只有不断学习&#xff0c;才能照亮前行的道路 ] 【导语】本指南旨在深入探讨Linux上可用的前六个工具&#xff0c;以及Windows上使用两个U盘启动器生成及刻录工具&#xff0c;创建USB引导启动器用于引导系统ISO文件加载到计算机中&#xff0c;从…

LInux驱动开发笔记(十)SPI子系统及其驱动

文章目录 前言一、SPI驱动框架二、总线驱动2.1 SPI总线的运行机制2.2 重要数据结构2.2.1 spi_controller2.2.2 spi_driver2.2.3 spi_device2.2.4 spi_transfer2.2.5 spi_message 三、设备驱动的编写3.1 设备树的修改3.2 相关API函数3.2.1 spi_setup( )3.2.2 spi_message_init( …

在windows 台式机电脑部署GLM4大模型

参考这篇文章在windows笔记本电脑部署GLM4大模型_16g显卡本地部署glm4-CSDN博客 我的环境&#xff08;PC台式机电脑&#xff1a; 处理器 Intel(R) Core(TM) i9-14900K 3.20 GHz 机带 RAM 32.0 GB (31.8 GB 可用)、32G内存、NVIDIA RTX4080&#xff08;16G&#xff09;…

深入理解Open vSwitch(OVS):原理、架构与操作

一、引言 随着云计算和虚拟化技术的不断发展&#xff0c;网络虚拟化成为了构建灵活、可扩展网络架构的关键技术之一。Open vSwitch&#xff08;OVS&#xff09;作为一种功能强大的开源虚拟交换机&#xff0c;被广泛应用于云计算和虚拟化环境中&#xff0c;为虚拟机提供高效、灵…

前端调试技巧

1、利用console打印日志 2、利用debugger关键字&#xff0c;浏览器f12调用到方法debugger处会断点住&#xff0c;可以利用浏览器调试工具查看变量 a.监视表达式可以添加想要观察的变量 b.调用堆栈可以观察方法调用链 3、xhr断点 请求地址包含v1.0/banner_theme/pagelist&a…

预制舱变电站高压室巡检机器人系统

一、背景 预制舱变电站高压室由于空间狭小、设备紧凑&#xff0c;传统的巡检方式往往需要人工进入高压室进行巡检&#xff0c;不仅存在安全风险&#xff0c;而且巡检效率低下&#xff0c;难以满足日益增长的电力设备运维需求。 二、预制舱高压室巡检机器人系统 预制舱高压室巡…

express+vue在线im实现【四】

往期内容 expressvue在线im实现【一】 expressvue在线im实现【二】 expressvue在线im实现【三】 本期示例 本期总结 支持了音频的录制和发送&#xff0c;如果觉得对你有用&#xff0c;还请点个免费的收藏与关注 下期安排 在线语音 具体实现 <template><kl-dial…

【总结】ui自动化selenium知识点总结

1. 大致原理 首页安装第三方库selenium库&#xff0c; 其次要下载好浏览器驱动文件&#xff0c;比如谷歌的 chromedriver.exe&#xff0c;配置上环境变量。 使用selenium的webdriver类去创建一个浏览器驱动对象赋值叫driver&#xff0c;一个浏览器驱动对象就可以 实现 对浏…

【PHP项目实战训练】——使用thinkphp框架对数据进行增删改查功能

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;开发者-曼亿点 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 曼亿点 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a…

国内怎样使用GPT4 turbo

GPT是当前最为熟知的大模型&#xff0c;它优越的性能一直遥遥领先于其它一众厂商&#xff0c;然而如此优秀的AI在中国境内却是无法正常使用的。本文将告诉你4种使用gpt4的方法&#xff0c;让你突破限制顺利使用。 官方售价是20美元/月&#xff0c;40次提问/3小时&#xff0c;需…

嵌入式系统软件开发环境_2.一般架构

1.Eclipse框架 嵌入式系统软件开发环境是可帮助用户开发嵌入式软件的一组工具的集合&#xff0c;其架构的主要特征离不开“集成”问题&#xff0c;采用什么样的架构框架是决定开发环境优劣主要因素。Eclipse框架是当前嵌入式系统软件开发环境被普遍公认的一种基础环境框架。目…

vscode插件开发之 - TestController

TesController概要介绍 TestController 组件是用于实现自定义测试框架和集成测试结果的。它允许开发者定义自己的测试运行器&#xff0c;以支持在VSCode中运行和展示测试。以下是一些使用 TestController 组件的主要场景&#xff1a; 自定义测试框架&#xff1a;如果你正在开发…

深度学习算法informer(时序预测)(三)(Encoder)

一、EncoderLayer架构如图&#xff08;不改变输入形状&#xff09; 二、ConvLayer架构如图&#xff08;输入形状中特征维度减半&#xff09; 三、Encoder整体 包括三部分 1. 多层EncoderLayer 2. 多层ConvLayer 3. 层归一化 代码如下 class AttentionLayer(nn.Module):de…

世界奇观短视频制作,AI加持,新手也能月入上万

在这个数字化的时代&#xff0c;短视频已经成为了人们获取信息和娱乐的重要途径。特别是那些展示世界奇观的短视频&#xff0c;如极端的气候、危险的动物、美丽的自然景观等&#xff0c;这些主题具有很强的吸引力&#xff0c;能够引起观众的兴趣和好奇心。那么&#xff0c;如何…

运算放大器(运放)反相放大器电路

运算放大器(运放)反相放大器电路 设计目标 输入ViMin输入ViMax输出VoMin输出VoMax频率f电源Vcc电源Vee–7V7V–14V14V3kHz15V–15V 设计说明 该设计将输入信号 Vi 反相并应用 –2V/V 的信号增益。输入信号通常来自低阻抗源&#xff0c;因为该电路的输入阻抗由输入电阻器 R1…

深度学习神经网络协同过滤模型(NCF)与用户协同过滤(UCF)的区别

一、效果图 点我查看在线demo 二、启发式推荐系统 推荐系统的核心是根据用户的兴趣需求&#xff0c;给用户推荐喜欢的内容。常用的推荐算法有启发式推荐算法&#xff0c;可分为基于用户的 协同过滤&#xff0c;基于物品的协同过滤。 1、基于用户的协同过滤&#xff08;UCF…

【云岚到家】-day04-1-数据同步方案-Canal-MQ

【云岚到家】-day04-1-数据同步方案-Canal-MQ 1 服务搜索1.1 服务搜索技术方案1.1.1 需求分析1.1.2 技术方案1.1.2.1 使用Elasticsearch进行全文检索1.1.2.2 索引同步方案 1.1.3 CanalMQ1.1.3.1 MySQL主从数据同步1.1.3.2 Canal工作流程1.1.3.3 具体实现方案 1.2 MQ技术方案1.2…

Linux连接工具MobaXterm详细使用教程

目录 一、MobaXterm的下载 1、访问官网 2、下载便携版 3、启动MobaXterm 二、MobaXterm基本使用设置 1、新建会话 2、使用ssh连接第一个会话 3、设置主密码 4、主界面 5、sftp文件上传下载 6、文件拖拽的上传下载 7.右键粘贴 8、查看服务器监测信息​编辑 9、个…

文件扫描工具哪个好?便捷的文件扫描工具推荐

对于初入职场的大学毕业生&#xff0c;申请就业补贴是一项不可忽视的福利。 它不仅能够为新生活带来经济上的缓解&#xff0c;也有助于职业生涯的顺利起步。面对申请过程中需提交的文件&#xff0c;如纸质劳动合同&#xff0c;不必烦恼。市面上众多文件扫描软件能助你一臂之力…

Oracle最终还是杀死了MySQL

起因 大约15年前&#xff0c;Oracle收购了Sun公司&#xff0c;从而也拥有了MySQL&#xff0c;互联网上关于Oracle何时会“扼杀MySQL”的讨论此起彼伏。 当时流传着各种理论&#xff1a;从彻底扼杀 MySQL 以减少对 Oracle 专有数据库的竞争&#xff0c;到干掉 MySQL 开源项目&…