RocketMQ消息重试机制

1 生产者重试

生产者发送消息失败会重试,可以通过参数来设置:

创建producer实例设置参数:

// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);

application配置文件:

rocketmq:
  name-server: 10.3.22.103:9876;10.3.22.115:9876;10.3.22.121:9876
  # Producer 生产者
  producer:
    # 异步消息重试此处,默认2
    retry-times-when-send-async-failed: 2
    # 发送消息失败重试次数,默认2
    retry-times-when-send-failed: 2

注意:

抛出特定的异常才会重试,异常的类型仅包括以下几种,system busy和broker busy这两个错误码不会重试:

2 消费者重试

2.1 顺序消息

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

2.2 无序消息

无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

2.2.1 重试次数

对于无序消息(普通、延时、事务消息),消息队列 RocketMQ 默认允许每条消息最多重试 16 次,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。每次重试的间隔时间如下:

第几次重试与上次重试的间隔时间第几次重试与上次重试的间隔时间
110 秒97 分钟
230 秒108 分钟
31 分钟119 分钟
42 分钟1210 分钟
53 分钟1320 分钟
64 分钟1430 分钟
75 分钟151 小时
86 分钟162 小时


如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。

注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。

可以通过以下两种方式修改重试次数:

2.2.2 设置返回状态

通过设置返回状态达到消息重试的结果,返回枚举类ConsumeConcurrentlyStatus的两个状态,来设置是否需要重试:

public class AbstractRocketMQListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            // 业务逻辑
            doConsume();
            //消费成功
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }catch (Exception e){
            //消费失败,重试
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

    }
}

2.2.3 设置重试参数maxReconsumeTimes

还有另外一种方式,继承RocketMQListener类,增加注解RocketMQMessageListener,通过设置消息的参数来监听消息,maxReconsumeTimes默认为16次,可以通过设置,修改重试的次数。

@Component
@RocketMQMessageListener(
        topic = "ORDER_RECEIVE_TOPIC",
        consumerGroup = "CONSUMER_ORDER_RECEIVE_GROUP",
        // 指定消费者线程数,默认20,生产中请注意配置,避免过大或者过小
        consumeThreadNumber = 60,
        maxReconsumeTimes = 5
)
public class AbstractRocketMQListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        try {
            //业务逻辑
            doConsume();
            return;
        } catch (CommonMsgException ce) {
            //业务异常,不需要重试,直接退出
        } catch (Exception e) {
            //异常处理
            //抛出异常重试
            throw new RuntimeException(e);
        }

    }
}

注意:

如果业务异常不需要重试的,比如参数有误、校验不通过,重试没有用。可以定义业务异常类CommonMsgException,直接退出。

2.2.4死信队列

并发消费失败后并不是投递回原Topic,而是投递到一个 特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。

对于死信的处理方案有多种,有两种方案:

2.2.4.1 消费者监听死信队列

消息监听和普通的消息监听一样。就是如果很多Topic都产生了死信消息,那么我们想要处理这些死信消息就得编写很多个监听各个死信队列的消费者。

2.2.4.2 消费者达到最大重试次数时终止
@Component
@RocketMQMessageListener(
        topic = "ORDER_RECEIVE_TOPIC",
        consumerGroup = "CONSUMER_ORDER_RECEIVE_GROUP",
        // 指定消费者线程数,默认20,生产中请注意配置,避免过大或者过小
        consumeThreadNumber = 60
)
public class AbstractRocketMQListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        try {
            //业务逻辑
            doConsume();
            return;
        } catch (CommonMsgException ce) {
            //业务异常,不需要重试,直接退出
        } catch (Exception e) {
            // 如果重试次数超过2次就不需要重试了  
            if (message.getReconsumeTimes() >= 2) {
                //不要重试了
            } else {
                //重试
                throw new RuntimeException(e);
            }
        }

    }
}

2.2.5 消息重试次数MaxreconsumeTimes

@RocketMQMessageListener 设置了MaxreconsumeTimes=20,但还是MessageExt中reconsumeTimes为什么有时候还是0,有以下几个原因:

  1. 消息尚未被消费失败reconsumeTimes 只会在消息消费失败时增加。如果消息首次被成功消费(即没有抛出异常),则 reconsumeTimes 的值将保持为 0。

  2. 消息被其他消费者实例成功消费:在消费者组(Consumer Group)中,可能有多个消费者实例同时运行。如果一个实例消费失败,而另一个实例成功消费了这条消息,那么 reconsumeTimes 不会增加。

  3. 配置未生效:请确保您的 maxReconsumeTimes 配置已经正确设置并且已经生效。检查消费者配置是否正确,以及是否重新启动了消费者以使配置生效。

  4. 消息未到达重试阶段:如果消息在首次尝试时就被成功消费,那么它不会进入重试阶段。只有当消息首次消费失败时,才会开始重试,并增加 reconsumeTimes

  5. 消息被其他消费者组消费:请确保没有其他消费者组也订阅了相同的主题(Topic)和标签(Tag),并且可能在这些消费者组中成功消费了消息。

如果消息失败过,重试次数加1。如果之前消息消费成功过,不会重试。实例断开,即使服务器重启,重试次数累加的。messageId不变

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/546230.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CentOS7用convert2rhel转Redhat7

CentOS 停更时间表 版本停更时间CentOS 62020/11/30CentOS 72024/6/30CentOS 82021/12/1 支持的转换路径&#xff0c;表示官方测试过 CentOS 转换 RHEL 示例 转换示例环境 本示例模拟以下环境&#xff0c;使用 RHEL 7.9 ISO 文件作为转换使用的 yum repository 源&#xff…

3A大流电输出低压差线性稳压器TO-236封装

概述 PCD3933 是一款低噪声、低压差线性稳压器 (LDO)&#xff0c;可提供 3A 输出电流&#xff0c;最大压降仅为 210mV。该器件提供两种输出电压范围。 PCD3933 的输出电压可通过外部电阻分压器在 0.5V 至 5.15V 范围内进行调节&#xff0c;同时还提供固定输出电压版本。 PCD3…

【Entity Framework】聊一聊EF中继承关系

【Entity Framework】聊一聊EF中继承关系 文章目录 【Entity Framework】聊一聊EF中继承关系一、概述二、实体类型层次结构映射三、每个层次结构一张表和鉴别器配置四、共享列五、每个类型一张表配置六、每个具体类型一张表配置七、TPC数据库架构八、总结 一、概述 Entity Fra…

Unity TMP Inputfield 输入框 框选 富文本 获取真实定位

一、带富文本标签的框选是什么 UGUI的InputField提供了selectionAnchorPosition和selectionFocusPosition&#xff0c;开始选择时的光标下标和当前光标下标 对于未添加富文本标签时&#xff0c;直接通过以上两个值&#xff0c;判断一下框选方向&#xff08;前向后/后向前&…

【热门话题】PyTorch:深度学习领域的强大工具

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 PyTorch&#xff1a;深度学习领域的强大工具一、PyTorch概述二、PyTorch核心特性…

C语言洛谷题目分享(9)奇怪的电梯

目录 1.前言 2.题目&#xff1a;奇怪的电梯 1.题目描述 2.输入格式 3.输出格式 4.输入输出样例 5.说明 6.题解 3.小结 1.前言 哈喽大家好啊&#xff0c;前一段时间小编去备战蓝桥杯所以博客的更新就暂停了几天&#xff0c;今天继续为大家带来题解分享&#xff0c;希望大…

性能再升级!UNet+注意力机制,新SOTA分割准确率高达99%

UNet结合注意力机制能够有效提升图像分割任务的性能。 具体来说&#xff0c;通过将注意力模块集成到UNet的架构中&#xff0c;动态地重新分配网络的焦点&#xff0c;让其更集中在图像中对于分割任务关键的部分。这样UNet可以更有效地利用其跳跃连接特性&#xff0c;以精细的局…

2024-4-15-ARM作业

实现字符串数据收发函数的封装 源代码&#xff1a; main.c #include "gpio.h"#include "uart4.h"int main(){uart4_config();while (1){// char agetchar();// putchar(a1);char s[20];gets(s);puts(s);//putchar(\n);putchar(\r);}return 0;}uart4.c …

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之十二 简单把视频的水印去掉效果

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之十二 简单把视频的水印去掉效果 目录 Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之十二 简单把视频的水印去掉效果 一、简单介绍 二、简单把视频的水印去掉效果实现原理 …

MVVM架构模式

目录 MVVM 数据绑定方式 实现方式 Model View ViewModel 数据绑定方式 vue&#xff1a;&#xff1a; 数据劫持和发布-订阅模式&#xff1a; Object.defineProperty() 方法来劫持&#xff08;监控&#xff09;各属性的 getter 、setter &#xff0c;并在数据&#xff08;对…

centos7上安装python3.10

1 安装依赖 使用yum程序安装所需依赖。打开终端并运行以下命令&#xff1a; yum install wget zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make zlib zlib-devel libffi-devel -y 这些依赖项是编译和安装Python 3.10所必…

结合文本的目标检测:Open-GroundingDino训练自己的数据集

1、简单介绍 Open-GroundingDino是GroundingDino的第三方实现训练流程的代码&#xff0c;因为官方GroundingDino没有提供训练代码&#xff0c;只提供了demo推理代码。 关于GroundingDino的介绍可以看论文&#xff1a;https://arxiv.org/pdf/2303.05499.pdf GroundingDino的G…

没有算法大佬,都是草台班子

没有算法大佬&#xff0c;都是草台班子。 最近除了工作之外&#xff0c;还有一些时间在和加我微信的小伙伴沟通&#xff0c;聊的内容大部分集中在如何快速有效的学习人工智能、入门人工智能的技巧。 其中&#xff0c;一个知乎过来加我微信的小伙伴的经历更是让我感触很深。 …

openGauss学习笔记-261 openGauss性能调优-使用Plan Hint进行调优-将部分Error降级为Warning的Hint

文章目录 openGauss学习笔记-261 openGauss性能调优-使用Plan Hint进行调优-将部分Error降级为Warning的Hint261.1 功能描述261.2 语法格式261.3 示例261.3.1 忽略非空约束261.3.2 忽略唯一约束261.3.3 忽略分区表无法匹配到合法分区261.3.4 更新/插入值向目标列类型转换失败 o…

AI 编程助手汇总

文章目录 AI编程助手~~GitHub学生认证申请&#xff08;无效&#xff0c;申请不了了&#xff09;~~GitHub双重身份验证 AI编程助手 Baidu Comate &#xff08;强推✔&#xff09; 阿里通义灵码 清华CodeGeeX Amazon CodeWhisperer &#xff08;需要注册账号&#xff0c;绑定信…

秋招算法刷题7

20240410 1.接雨水 方法一&#xff0c;动态规划&#xff0c;时间复杂度O&#xff08;n^2&#xff09;&#xff0c;空间复杂度O&#xff08;n&#xff09; public int trap(int[] height) { int nheight.length; if(n0){ return 0; } …

python 海龟画图tutle螺旋线

目录 初识turtle模块 基本绘图概念 示例&#xff1a;绘制一个正方形 示例&#xff1a;绘制彩色螺旋线 附录 常用命令 其它命令 在Python编程中&#xff0c;使用turtle模块进行图形绘制是一种非常有趣和富有教育意义的活动。通过控制一个小海龟&#xff08;Turtle&#x…

RabbitMQ消息模型之Direct消息模型

Direct消息模型 * 路由模型&#xff1a; * 一个交换机可以绑定多个队列 * 生产者给交换机发送消息时&#xff0c;需要指定消息的路由键 * 消费者绑定队列到交换机时&#xff0c;需要指定所需要消费的信息的路由键 * 交换机会根据消息的路由键将消息转发到对应的队…

ModuleNotFoundError: No module named ‘llama_index.readers“解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

“We Need Structured Output”: 以用户为中心的大模型输出

发表机构&#xff1a;Google Research 这篇论文的核心是设计了一种系统&#xff0c;可以让开发者和用户对大型语言模型的输出施加结构性约束。系统的主要部分包括&#xff1a; 1. 用户界面&#xff08;GUI&#xff09;&#xff1a;允许用户通过图形界面来定义他们希望LLM遵守…