Spring Boot整合RocketMQ实现延迟消息消费

导包
     <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
    </dependency>
添加配置信息
application配置文件
# rocketMq地址
rocketmq.name-server=106.52.60.215:9876
# 生产者分组
rocketmq.producer.group=myGroup
rocketmq.producer.topics=topic1
# 消费者分组
rocketmq.consumer.group=myGroup
# topic
rocketmq.consumer.topics=topic1
# 表示顺序消费模式
rocketmq.consumer.consume-mode=ORDERLY
# 消费者的最大线程数,即消费消息的线程池大小。默认值为20,如果不需要处理大量的消息,可以将其调小。
rocketmq.consumer.consume-thread-max=1
# 表示每次消费消息的最大数量,即一次性消费的最大消息数。默认值为1,即每次只消费一条消息。如果需要批量消费消息,可以将其调大。但是需要注意的是,批量消费消息可能会影响消费的效率和消息的顺序性。
rocketmq.consumer.consume-message-batch-max-size=1
yml配置文件
rocketmq:
  consumer:
    consume-message-batch-max-size: 1
    consume-mode: ORDERLY
    consume-thread-max: 1
    group: myGroup
    topics: topic1
  name-server: 106.52.60.215:9876
  producer:
    group: myGroup
    topics: topic1
生产者发送消息
同步发现消息

在Spring Boot中,可以使用RocketMQTemplate来发送消息。设置消息的延迟级别,可以使用RocketMQTemplatesend(Message message, long timeout, int delayLevel)方法,其中delayLevel为延迟级别,单位为秒

RocketMQ支持18个级别的延迟时间,分别为1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h

import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * <p>
 * Description: 消息生产者
 * </p>
 *
 * @author 
 * @version 
 * @create 
 * @see 
 */
@Slf4j
@RestController
public class MyProducer1 {

    @Value("${rocketmq.producer.topics}")
    private String topic;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * description 同步发送延迟消息
     *
     * @param:  []
     * @return
     * @Date   2023/3/11
     */
    @GetMapping("syncSendTest")
    public void sendDelayMsg() {
        Blog blog = new Blog();
        blog.setBlogName("余十步");
        blog.setUrl("yushibu");

        // delayTimeLevel代表延迟级别  messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        int delayTimeLevel = 3;
        Message<Blog> message = MessageBuilder.withPayload(blog)
                .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel)
                .build();

        SendResult sendResult = rocketMQTemplate.syncSend(topic, message, 3000, delayTimeLevel);
        log.info("消息发送成功,时间:{} 发送内容:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
        log.info("发送结果:{}", sendResult);
    }
}
异步发送消息(推荐)
import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * <p>
 * Description: 消息生产者
 * </p>
 *
 * @author 
 * @version 
 * @create 
 * @see 
 */
@Slf4j
@RestController
public class DemoProducers {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * description 延迟消息发送
     *
     * @param:  [user]
     * @return
     * @Date   2023/3/11
    */
    @RequestMapping("/asyncSendTest")
     public  String asyncSendTest(){
        Blog blog = new Blog();
        blog.setBlogName("余十步");
        blog.setUrl("yushibu");
        // 构建消息体
        Message<Blog> msg = MessageBuilder.withPayload(blog).build();
        rocketMQTemplate.asyncSend("topic1", msg, new SendCallback() {
            // 发送成功
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("消息发送成功,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
            }
            // 发送失败
            @Override
            public void onException(Throwable throwable) {
                log.info("消息发送失败,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
            }
            // ps:3 代表第三个延迟10s   延迟级别:"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        },3000,3);

        return "发送成功";
    }
}
消息消费者
import com.songzixain.Blog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * <p>
 * Description: 消息生产者
 * </p>
 *
 * @author 
 * @version 
 * @create 
 * @see 
@Slf4j
@RestController
public class MyProducer2 {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.producer.topics}")
    private String topic;

    /**
     * description 延迟消息发送
     * 在上面的代码中,我们使用了RocketMQTemplate的syncSend方法来发送消息。
     * 其中,第一个参数是消息的主题,第二个参数是消息内容,第三个参数是延迟时间(单位为毫秒)
     * ,第四个参数是发送消息的重试次数。
     * @param:  [user]
     * @return
     * @Date   2023/3/11
    */
    @RequestMapping("/asyncSendTest")
     public  String asyncSendTest(){
        Blog blog = new Blog();
        blog.setBlogName("余十步");
        blog.setUrl("yushibu");
        // 构建消息体
        Message<Blog> msg = MessageBuilder.withPayload(blog).build();
        rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
            // 发送成功
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("消息发送成功,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
            }
            // 发送失败
            @Override
            public void onException(Throwable throwable) {
                log.info("消息发送失败,时间:{} 发送内容:{}",  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), blog.toString());
            }
            // ps:3 代表第三个延迟10s   延迟级别:"messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        },3000,4);

        return "发送成功";
    }
}
启动测试

启动请求:http://localhost:8081/asyncSendTest

控制台打印

可以看到,消息生产者设置的延迟级别是3,对应延迟了10秒钟

延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

需要注意的是,顺序消费模式下,同一个消费者组内的消费者只会有一个线程消费同一个队列中的消息,这样才能保证消息的顺序性。

通过以上步骤,就可以使用RocketMQ实现消息延迟功能了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/747369.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据采集与预处理复习资料

目录 第一章 简答 1.简述Hadoop各个组件及其功能 2.Hadoop在大数据技术体系中的地位和作用&#xff08;来自文心一言&#xff09; 3.Hadoop 启动命令&#xff0c;停止命令 4.pig 加载HDFS 数据 5.数据采集的方法&#xff08;来自ppt&#xff09; 6.数据分析过程&#xf…

GO sync包——读写锁

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

引导过程与服务器控制

一、引导过程 1.开机自检 服务器主机开机以后&#xff0c;将根据主板 BIOS 中的设置对 CPU&#xff08;Central Processing Unit&#xff0c; 中央处理器&#xff09;、内存、显卡、键盘等设备进行初步检测&#xff0c;检测成功后根据预设的启动顺序移 交系统控制权&#xff0c…

ChatGPT在程序开发中的应用:提升生产力的秘密武器

在当今飞速发展的科技时代&#xff0c;程序开发已经成为许多企业和个人必不可少的技能。然而&#xff0c;编写代码并非总是顺风顺水&#xff0c;面对复杂的算法、繁琐的调试、持续不断的需求变更&#xff0c;程序员们常常感到压力山大。在这种情况下&#xff0c;ChatGPT应运而生…

C#学习系列之DataGrid无故添加空行

C#学习系列之DataGrid无故添加空行 前言解决前解决后总结 前言 采用别人的轮子&#xff0c;想在基础上改界面&#xff0c;但是copy后&#xff0c;无论怎么样都会有空行&#xff0c;实在是绑定数据的输入没有任何赋值。 解决前 绑定的数据中输入三组数据&#xff0c;但是没有第…

【osgEarth】Ubuntu 22.04 源码编译osgEarth 3.5

下载源代码 git clone --depth1 https://dgithub.xyz/gwaldron/osgearth -b osgearth-3.5 下载子模块 git submodule update --init 如果下载不过来&#xff0c;就手动修改下.git/config文件&#xff0c;将子模块的地址替换成加速地址 (base) yeqiangyeqiang-Default-string…

openlayers性能优化——开启图层预加载、减少空白等待时间

使用切片图层时、地图拖拽会有空白图片&#xff0c;为了减少空白等待时间&#xff0c;我们可以开始图层预加载。 const map_top new Map({layers: [new TileLayer({preload:Infinity, //预加载source: new StadiaMaps({layer: "outdoors",}),}),],target: "ma…

点云处理实战 PCL求解点云表面曲率

目录 一、什么是曲率 二、曲率计算过程 三、pcl 求解点云局部曲率 四、思考?为何曲率计算会使用协方差矩阵? 五、推荐阅读 一、什么是曲率 曲率是几何学中用来描述曲线或曲面形状变化的一个量。它反映了曲线或曲面的弯曲程度。在不同的上下文中,曲率的定义和计算方式有…

uniapp 微信小程序端使用百度地图API

1、登录百度地图开放平台 https://lbsyun.baidu.com/&#xff08;没有账号则先去创建一个百度账号&#xff09; 2、进入百度地图开放平台控制台&#xff08;导航栏“控制台”&#xff09;&#xff0c;点击“应用管理”-“我的应用” 3、选择“创建应用”&#xff0c;应用模块选…

LLM大模型算法学习资源持续整理

文章目录 waytoagiLLM101llm-coursellm-cookbook waytoagi 飞书文档写的AGI知识库。 https://www.waytoagi.com/ LLM101 karpathy更新中的大模型教程&#xff1a; https://github.com/karpathy/LLM101n llm-course Course to get into Large Language Models (LLMs) wi…

摄影师危!AI绘画即将降维打击摄影行业

你还以为AI绘画影响的只是插画师行业吗&#xff1f;错了&#xff0c;摄影行业也即将面临技术洗牌 话不多说&#xff0c;先看一下这几张图 你能一眼看出这是AI画的迪丽热巴吗&#xff1f; 你是不是还以为AI绘画只能画点动漫艺术风格&#xff1f;那你就低估了AI的发展速度&…

实战|记一次java协同办公OA系统源码审计

前言 因为笔者也是代码审计初学者&#xff0c;写得不好的地方请见谅。该文章是以项目实战角度出发&#xff0c;希望能给大家带来启发。 审计过程 审计思路 1、拿到一个项目首先要看它使用了什么技术框架&#xff0c;是使用了ssh框架&#xff0c;还是使用了ssm框架&#xff…

Redis 学习笔记(2)

目录 1 Redis的持久化1.1 RDB持久化方案1.2 AOF持久化方案 2 Redis架构2.1 主从复制架构2.2 哨兵集群设计2.3 哨兵集群设计 3 Redis事务机制4 Redis过期策略与内存淘汰机制4.1 过期策略4.2 内存淘汰机制 5 Redis高频面试题4.1 缓存穿透4.2 缓存击穿4.3 缓存雪崩 1 Redis的持久化…

防火墙虚拟系统

防火墙虚拟系统 防火墙虚拟系统的应用场景 大中型企业的网络隔离 通过防火墙的虚拟系统将网络隔离为研发部门、财经部门和行政部门。各部门之间可以根据权限互相访问&#xff0c;不同部门的管理员权限区分明确。 云计算中心的安全网关 通过配置虚拟系统&#xff0c;可让部署…

论文生成新纪元:探索顶尖AI写作工具的高效秘诀

在学术探索的征途中&#xff0c;AI论文工具本应是助力前行的风帆&#xff0c;而非让人陷入困境的漩涡。我完全理解大家在面对论文压力的同时&#xff0c;遭遇不靠谱AI工具的沮丧与无奈。毕竟&#xff0c;时间可以被浪费&#xff0c;但金钱和信任却不可轻弃。 作为一名资深的AI…

昇思25天学习打卡营第3天|onereal

前几天不能运行代码&#xff0c;经过排查是因为我的浏览器是搜狗的&#xff0c;换成Chrome问题解决了。按照提示学习了《应用实践/计算机视觉/FCN图像语义分割.ipynb》并且尝试运行代码&#xff0c;开始训练&#xff0c;最后看到图片变化。 网络流程 FCN网络的流程如下图所示&…

机器学习算法(二):1 逻辑回归的从零实现(普通实现+多项式特征实现非线性分类+正则化实现三个版本)

文章目录 前言一、普通实现1 数据集准备2 逻辑回归模型3 损失函数4 计算损失函数的梯度5 梯度下降算法6 训练模型二、多项式特征实现非线性分类1 数据准备与多项式特征构造2 逻辑回归模型三、逻辑回归 --- 正则化实现1 数据准备2 逻辑回归模型3 正则化损失函数4 计算损失函数的…

关于服务器的一些知识

1. 云服务器 和 轻量应用服务器 腾讯云中的"云服务器"&#xff08;Cloud Virtual Machine, CVM&#xff09;和"轻量应用服务器"&#xff08;Lite Cloud Server&#xff09;都是提供云端计算资源的服务&#xff0c;但它们在定位、特性和使用场景上存在一些差…

超越边界:探索深度学习的泛化力量

深度学习的泛化能力 一. 简介1.1 深度学习的定义1.2 什么是泛化能力1.3 深度学习模型的泛化能力1.4 提升深度学习模型的泛化能力 二. 泛化能力的重要性2.1 深度学习中泛化能力的作用2.1.1 防止过拟合2.1.2 处理噪声和不完整数据2.1.3 对于数据分布的变化具有适应性 2.2 泛化能力…

双指针dd d df f

像二分这样的算法&#xff0c;我们甚至可以不用管&#xff0c;直接在问题空间之内搜索&#xff0c;但是双指针也非常好用&#xff0c;帮助我们来减少枚举对象&#xff0c;我们来总结一下这经典的三个题目&#xff1a; 最长上升不重复子序列活动 - AcWings 首先一定要写…