如何在 Spring Boot 中利用 RocketMQ 实现批量消息消费

文章目录

      • 准备工作
      • 项目依赖
      • 配置 RocketMQ
      • 生产批量消息
      • 消费批量消息
      • 测试批量消息发送和消费
      • 总结
      • 推荐阅读文章

RocketMQ 是一款分布式消息队列,支持高吞吐、低延迟的消息传递。对于需要一次处理多条消息的场景,RocketMQ 提供了批量消费的机制,这篇文章将展示如何在 Spring Boot 中实现这一功能。

准备工作

在开始之前,请确保你已经安装和配置好 RocketMQ。如果还没安装,请参考 RocketMQ 官网 获取安装指南。

项目依赖

首先,我们需要在 Spring Boot 项目中添加 RocketMQ 的依赖。打开 pom.xml 文件,添加以下内容:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

这个依赖包包含了与 RocketMQ 集成所需的所有内容。

配置 RocketMQ

application.yml 文件中添加 RocketMQ 的相关配置:

rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: batchConsumerGroup
  producer:
    group: batchProducerGroup
  • name-server:RocketMQ 服务的地址
  • consumer.group:消息消费的分组
  • producer.group:消息生产的分组

确保 name-server 地址是正确的,指向你的 RocketMQ 服务。

生产批量消息

创建一个消息生产者,用于发送批量消息。以下是 BatchProducer.java 的示例代码:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
public class BatchProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendBatchMessages() {
        List<Message<String>> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message<String> message = MessageBuilder.withPayload("Hello RocketMQ " + i).build();
            messages.add(message);
        }
        
        rocketMQTemplate.syncSend("BatchTopic", messages, 10000);
        System.out.println("批量消息发送成功!");
    }
}
  • 这里,我们创建了 10 条消息并将它们添加到列表 messages 中。
  • 调用 rocketMQTemplate.syncSend 方法将消息批量发送到主题 BatchTopic

消费批量消息

接下来,我们创建一个消息消费者,用于批量消费消息。以下是 BatchConsumer.java 的示例代码:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
@RocketMQMessageListener(topic = "BatchTopic", consumerGroup = "batchConsumerGroup", selectorExpression = "*", consumeMessageBatchMaxSize = 10)
public class BatchConsumer implements RocketMQListener<List<String>> {

    @Override
    public void onMessage(List<String> messages) {
        System.out.println("批量接收到消息:");
        messages.forEach(message -> System.out.println("消息内容:" + message));
    }
}

在这段代码中:

  • @RocketMQMessageListener 注解用于标识这是一个 RocketMQ 的消息监听器,指定了监听的主题 BatchTopic 和消费分组 batchConsumerGroup
  • consumeMessageBatchMaxSize = 10 表示每次批量消费最多 10 条消息。
  • onMessage 方法会处理接收到的消息列表,并逐条打印出消息内容。

测试批量消息发送和消费

创建一个简单的 Spring Boot 控制器,用于触发批量消息发送。以下是 MessageController.java 的代码:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private BatchProducer batchProducer;

    @GetMapping("/sendBatchMessages")
    public String sendBatchMessages() {
        batchProducer.sendBatchMessages();
        return "批量消息已发送";
    }
}

通过访问 http://localhost:8080/sendBatchMessages 触发消息发送。

  • 调用这个接口会将批量消息发送到 RocketMQ 主题 BatchTopic
  • BatchConsumer 会自动接收并批量处理这些消息。

总结

我们成功在 Spring Boot 中实现了 RocketMQ 的批量消息发送与消费:

  1. 使用 BatchProducer 类批量发送消息。
  2. 使用 BatchConsumer 类批量消费消息,并设置最大批量大小。
  3. 通过简单的 REST API 控制消息发送,确保一切顺利。

批量消息处理可以提高消息传递的效率,适合高并发场景。这种方式可以减少网络开销,并有效利用系统资源。

推荐阅读文章

  • 由 Spring 静态注入引发的一个线上T0级别事故(真的以后得避坑)

  • 如何理解 HTTP 是无状态的,以及它与 Cookie 和 Session 之间的联系

  • HTTP、HTTPS、Cookie 和 Session 之间的关系

  • 什么是 Cookie?简单介绍与使用方法

  • 什么是 Session?如何应用?

  • 使用 Spring 框架构建 MVC 应用程序:初学者教程

  • 有缺陷的 Java 代码:Java 开发人员最常犯的 10 大错误

  • 如何理解应用 Java 多线程与并发编程?

  • 把握Java泛型的艺术:协变、逆变与不可变性一网打尽

  • Java Spring 中常用的 @PostConstruct 注解使用总结

  • 如何理解线程安全这个概念?

  • 理解 Java 桥接方法

  • Spring 整合嵌入式 Tomcat 容器

  • Tomcat 如何加载 SpringMVC 组件

  • “在什么情况下类需要实现 Serializable,什么情况下又不需要(一)?”

  • “避免序列化灾难:掌握实现 Serializable 的真相!(二)”

  • 如何自定义一个自己的 Spring Boot Starter 组件(从入门到实践)

  • 解密 Redis:如何通过 IO 多路复用征服高并发挑战!

  • 线程 vs 虚拟线程:深入理解及区别

  • 深度解读 JDK 8、JDK 11、JDK 17 和 JDK 21 的区别

  • 10大程序员提升代码优雅度的必杀技,瞬间让你成为团队宠儿!

  • “打破重复代码的魔咒:使用 Function 接口在 Java 8 中实现优雅重构!”

  • Java 中消除 If-else 技巧总结

  • 线程池的核心参数配置(仅供参考)

  • 【人工智能】聊聊Transformer,深度学习的一股清流(13)

  • Java 枚举的几个常用技巧,你可以试着用用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/912894.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

软件缺陷等级评定综述

1. 前言 正确评估软件缺陷等级&#xff0c;在项目的生命周期中有着重要的作用&#xff1a; 指导缺陷修复的优先级和资源分配 在软件开发和维护过程中&#xff0c;资源&#xff08;包括人力、时间和资金&#xff09;是有限的。通过明确缺陷的危险等级&#xff0c;可以帮助团队合…

Linux:vim命令总结及环境配置

文章目录 前言一、vim的基本概念二、vim模式命令解析1. 命令模式1&#xff09;命令模式到其他模式的转换&#xff1a;2&#xff09;光标定位&#xff1a;3&#xff09;其他命令&#xff1a; 2. 插入模式3. 底行模式4. 替换模式5. 视图模式6. 外部命令 三、vim环境的配置1. 环境…

Obsidian的Git插件设置配置全流程 -- 简单的电脑端多平台同步方案及常见问题

Obsidian的Git插件设置配置全流程 -- 简单的电脑端多平台同步方案及常见问题 参考文章引言1. git 介绍及安装2. git 本地配置及远程仓库链接3. obsidian 的 git 插件4. 常用的使用场景和对应的命令4.1. 本地仓库已推送到远端&#xff0c;如何在另一个电脑上第一次同步4.2 多端同…

【优选算法篇】微位至简,数之恢宏——解构 C++ 位运算中的理与美

文章目录 C 位运算详解&#xff1a;基础题解与思维分析前言第一章&#xff1a;位运算基础应用1.1 判断字符是否唯一&#xff08;easy&#xff09;解法&#xff08;位图的思想&#xff09;C 代码实现易错点提示时间复杂度和空间复杂度 1.2 丢失的数字&#xff08;easy&#xff0…

Redis(3):持久化

一、Redis高可用概述 在介绍Redis高可用之前&#xff0c;先说明一下在Redis的语境中高可用的含义。   我们知道&#xff0c;在web服务器中&#xff0c;高可用是指服务器可以正常访问的时间&#xff0c;衡量的标准是在多长时间内可以提供正常服务&#xff08;99.9%、99.99%、9…

高亚科技签约酸动力,助力研发管理数字化升级

近日&#xff0c;中国企业管理软件资深服务商高亚科技与广东酸动力生物科技有限公司&#xff08;以下简称“酸动力”&#xff09;正式签署合作协议。借助高亚科技的8Manage PM项目管理软件&#xff0c;酸动力将进一步优化项目过程跟踪与节点监控&#xff0c;提升研发成果的高效…

大模型领域最值得看的 9 本新书,找到了

在人工智能革命的浪潮中&#xff0c;程序员们正站在技术变革的最前沿。本书单精选了关于人工智能在各行业应用的最新著作&#xff0c;从医疗诊断到金融风控&#xff0c;从智能制造到智慧城市&#xff0c;全面展现AI如何重塑行业生态&#xff0c;推动社会进步。通过阅读这些书籍…

加入GitHub Spark需要申请

目录 加入GitHub Spark需要申请 GitHub Spark 一、产品定位与特点 二、核心组件与功能 三、支持的AI模型 四、应用场景与示例 五、未来展望 六、申请体验 加入GitHub Spark需要申请 GitHub Spark 是微软旗下GitHub在2024年10月30日的GitHub Universe大会上推出的一款革…

Rust移动开发:Rust在iOS端集成使用介绍

iOS调用Rust 上篇介绍了 Rust移动开发&#xff1a;Rust在Android端集成使用介绍, 这篇主要看下iOS上如何使用Rust&#xff0c;Rust可以给移动端开发提供跨平台&#xff0c;通用组件支持。 该篇适合对iOS、Rust了解&#xff0c;想知道如何整合调用和编译的&#xff0c;如果想要…

【月之暗面kimi-注册/登录安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞 …

2024 CSS保姆级教程四

CSS中的动画 CSS动画&#xff08;CSS Animations&#xff09;是为层叠样式表建议的允许可扩展标记语言&#xff08;XML&#xff09;元素使用CSS的动画的模块​ 即指元素从一种样式逐渐过渡为另一种样式的过程​ 常见的动画效果有很多&#xff0c;如平移、旋转、缩放等等&#…

[C++11] Lambda 表达式

lambda 表达式&#xff08;Lambda Expressions&#xff09;作为一种匿名函数&#xff0c;为开发者提供了简洁、灵活的函数定义方式。相比传统的函数指针和仿函数&#xff0c;lambda 表达式在简化代码结构、提升代码可读性和编程效率方面表现出色。 Lambda 表达式的基本语法 在…

AI4SCIENSE(鄂维南院士:再谈AI for Science)

鄂维南院士&#xff1a;再谈AI for Science_哔哩哔哩_bilibili 以往处理高维问题 量子力学&#xff1a;单变量乘积 统计学&#xff1a;旋转 AI4S 处理数据 蛋白质折叠&#xff1f; 不是纯粹的数据驱动 物理学等学科基本原理 例&#xff1a;分子动力学 数据模型 流程图 这…

接收nVisual中rabbitmq数据不成功问题排查

rabbitmq服务部署成功的情况下&#xff0c;消息对接不成功一般原因为消息发送失败&#xff0c;发送失败大多数可能为global_settings表配置错误。下面从两个方面解决消息对接不成功问题。 1.数据是否成功发送 检查global_settings表中rabbitmq发送消息配置信息是否正确 #MQS…

SpringBoot框架学习总结 及 整合 JDBC Mybatis-plus JPA Redis 我的学习笔记

SpringBoot框架学习总结 及 整合 JDBC Mybatis-plus JPA Redis 我的学习笔记 一、SpringBoot概述二、创建SpringBoot程序1. 使用maven方式创构建2. 使用Spring Initializr构建3. SpringBoot热部署4. SpringBoot的跨域处理 三、基础配置1.配置文件的作用2.配置文件格式2.yaml3.S…

【AIGC】如何通过ChatGPT轻松制作个性化GPTs应用

创建个性化的GPTs应用是一个涉及技术、设计和用户体验的过程。以下是详细步骤&#xff1a; ###1.确定应用目标和用户群体 在开始之前&#xff0c;你需要明确你的应用的目标和目标用户。这将帮助你在设计、开发和个性化方面做出相应的决策。例如&#xff0c;如果你的应用是为了…

strtok函数详解

strtok函数 strtok 函数是一个字符串分割函数&#xff0c;用于将字符串分割成一系列的标记。这个函数通过一组分隔符字符来确定标记的边界&#xff0c;每次调用都会返回字符串中的下一个标记&#xff0c;并且将原始字符串中的分隔符替换为空字符‘\0’&#xff0c;从而实际上是…

【Linux】Linux入门实操——vim、目录结构、远程登录、重启注销

一、Linux 概述 1. 应用领域 服务器领域 linux在服务器领域是最强的&#xff0c;因为它免费、开源、稳定。 嵌入式领域 它的内核最小可以达到几百KB, 可根据需求对软件剪裁&#xff0c;近些年在嵌入式领域得到了很大的应用。 主要应用&#xff1a;机顶盒、数字电视、网络…

系统管理与规划师

综合 工业化、信息化两化融合&#xff1a;战略、资源、经济、设备和技术的融合 诺兰6时期&#xff1a;&#xff08;初普控&#xff0c;整数成&#xff09;初始、普及、控制、整合、数据管理、成熟期&#xff1b;技术转型期介于控制和整合间 IT战略规划 IT战略制定&#xff1a;使…

tcpdump 是一款功能强大的网络数据包分析工具

功能概述 tcpdump 可以捕获和分析网络上传输的数据包。它允许用户在网络接口上监听经过的流量&#xff0c;并根据指定的条件&#xff08;如协议类型、源 IP 地址、目的 IP 地址、端口号等&#xff09;对数据包进行过滤和显示&#xff0c;帮助网络管理员、安全分析师和开发人员排…