RocketMQ顺序消息消费重试DEMO

Producer - 加入了id为key,msg为bean的json字符

public class AddProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("a-group");
        producer.setNamesrvAddr("192.168.0.211:9876");
        producer.start();

        Alarm alarm1 = new Alarm(1, "add", new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-01"));
        Alarm alarm2 = new Alarm(2, "add", new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-02"));
        Alarm alarm3 = new Alarm(3, "add", new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-03"));
        Alarm alarm4 = new Alarm(4, "add", new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-04"));
        Alarm alarm5 = new Alarm(5, "add", new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-05"));
        Alarm alarm6 = new Alarm(6, "add", new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-06"));
        Alarm alarm7 = new Alarm(7, "add", new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-07"));
        Alarm alarm8 = new Alarm(8, "add", new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-08"));
        Alarm alarm9 = new Alarm(9, "add", new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-09"));
        Alarm alarm10 = new Alarm(10, "add", new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-10"));

        ArrayList<Alarm> list = new ArrayList<>();
        list.add(alarm1);
        list.add(alarm2);
        list.add(alarm3);
        list.add(alarm4);
        list.add(alarm5);
        list.add(alarm6);
        list.add(alarm7);
        list.add(alarm8);
        list.add(alarm9);
        list.add(alarm10);




        try {

            for (Alarm alarm : list) {
                Message msg = new Message("ALARM_RECORD", "add",String.valueOf(alarm.getId()), JSONUtil.toJsonStr(alarm).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg,new CustomMessageQueueSelector(),alarm.getId(),new CustomSendCallback());
                System.out.println(alarm.getId() + " Continue execution ");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        // 6. 关闭生产者
//        producer.shutdown();
    }
}

Selector

public class CustomMessageQueueSelector implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
        Integer id = (Integer) o;
        long index = id % list.size();
        return list.get((int) index);
    }
}

那么都知道 id 1和id9 用的是queue1

Consumer

public class ConsumerAdd {
 
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("record_add_group");
        // 指定Namesrv地址
        consumer.setNamesrvAddr("192.168.0.211:9876");
        // 订阅主题和标签
        consumer.subscribe("ALARM_RECORD", "add");
        // 设置Consumer第一次启动是从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
        // 注册消息监听器
        consumer.registerMessageListener(new CustomMessageListenerOrderly());
 
        // 启动消费者
        consumer.start();
        System.out.printf("Consumer tag add Started.%n");
    }
}

监听器

@Slf4j
public class CustomMessageListenerOrderly implements MessageListenerOrderly {

    /**
     * 保持消息消费的次数
     */
    Map<String, Integer> map  = new HashMap<>();

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        try {
            for (MessageExt msg : list) {
                String data = new String(msg.getBody());
                Alarm alarm = JSONUtil.toBean(data, Alarm.class);
                if(alarm.getId() == 1){
                    throw new Exception();
                }
                System.out.println("正常消费:" + alarm);
            }

            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            MessageExt msg = list.get(0);
            map.put(msg.getKeys(),msg.getReconsumeTimes()+1);
            log.info(msg.getKeys()+":"+map.get(msg.getKeys()));
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }

    }
}

id为1和9同queue,所以1报错,id9一定不会消费,那么返回SUSPEND_CURRENT_QUEUE_A_MOMENT   表示等一会,再继续处理这批消息

根据试验,确实如此,重试默认时间间隔可以看出是1S。默认次数Integer.MAX

当然可以通过

consumer.setSuspendCurrentQueueTimeMillis(2000);

修改重试时间间隔

也可以通过

consumer.setMaxReconsumeTimes(10);

修改顺序消费重试次数

再试一次

可以看到这条有问题的消息直接抛掉,消费ID9的了

重试topic也是没有的

那么去哪了呢?我们在死信队列中找到

总结:

如果业务需求对于顺序消费 - 强要求,那么推荐在监听器中根据map中一定要根据重试次数采取一定措施,通知到人。不然会造成消息堆积

如果顺序要求不高,或者不想这么麻烦,可以设置重试次数,使队列可以正常下去,做好记录及抛掉此消息后对后续消息影响的判断。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/554948.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

损失函数:Cross Entropy Loss (交叉熵损失函数)

损失函数&#xff1a;Cross Entropy Loss &#xff08;交叉熵损失函数&#xff09; 前言相关介绍Softmax函数代码实例 Cross Entropy Loss &#xff08;交叉熵损失函数&#xff09;Cross Entropy Loss与BCE loss区别代码实例 前言 由于本人水平有限&#xff0c;难免出现错漏&am…

密码学 | 椭圆曲线密码学 ECC 入门(二)

目录 4 椭圆曲线&#xff1a;更好的陷门函数 5 奇异的对称性 6 让我们变得奇特 ⚠️ 原文地址&#xff1a;A (Relatively Easy To Understand) Primer on Elliptic Curve Cryptography ⚠️ 写在前面&#xff1a;本文属搬运博客&#xff0c;自己留着学习。如果你和我一样…

【Linux】应用层协议:HTTP

URL 在之前的文章中我们实现了一个网络版本的计算器&#xff0c;在那个计算器中揉合了协议定制以及序列化反序列化的内容&#xff0c;我们当时也自己定制了一套协议标准&#xff0c;比如请求和响应的格式应该是什么&#xff1f;如何读到一个完整的报文&#xff1f;支持的运算符…

【XR806开发板试用】在 xr806 上移植 LVGL

本文参与极术社区的《基于安谋科技STAR-MC1的XR806开发板试用》活动。 不多废话&#xff0c;直接开搞&#xff0c;先上效果图 准备 开发环境啥的&#xff0c;已经有很多文章了&#xff0c;这里就不再提搭建开发环境的相关内容了。 一个屏幕(1.8’ 128x160) LVGL源码(v8.0.2…

京东微服务microApp使用总结

前言 基于现有业务门户进行微服务基础平台搭建 主应用框架&#xff1a;vue3vite 子应用框架&#xff1a;vue2webpack / vue3vite在这里插入代码片 本地调试即可&#xff1a;主应用子应用进行打通&#xff08;注意&#xff1a;两者都是vue3vite&#xff09; 问题总结 1.嵌入…

压缩感知的概述梳理(3)

参考文献 Adaptive embedding: A novel meaningful image encryption scheme based on parallel compressive sensing and slant transform 文献内容 梳理 列表形式 并行压缩感知核心元素与流程 信号 x 长度&#xff1a;N表示&#xff1a;(x \sum_{i1}^{N} a_i\psi_i \su…

密码学 | 椭圆曲线密码学 ECC 入门(三)

目录 7 这一切意味着什么&#xff1f; 8 椭圆曲线密码学的应用 9 椭圆曲线密码学的缺点 10 展望未来 ⚠️ 原文地址&#xff1a;A (Relatively Easy To Understand) Primer on Elliptic Curve Cryptography ⚠️ 写在前面&#xff1a;本文属搬运博客&#xff0c;自己留…

Argus DBM 一款开源的数据库监控工具,无需部署Agent

开箱即用 无需部署Agent&#xff0c;开箱即用。我们使用JDBC直连您的数据库&#xff0c;输入IP端口账户密码即可。 全平台支持 Argus目前支持对Mysql, PostgreSQL, Oracle等数据库类型的监控&#xff0c;我们也会尽快适配其它数据库&#xff0c;致力于监控所有数据库。我们提…

ctfshow web入门 SQl注入web171--web179

从这里开始SQl建议大家去看这篇文章学习一下先 MySQl web171 法一联合查询 题目 $sql "select username,password from user where username !flag and id ".$_GET[id]." limit 1;";爆数据库名 -1 union select 1,database(),3 -- 爆表名 -1 union s…

el-input在type=“textarea“中文字换行,导出成word文档中也显示换行

el-input在type"textarea"文字中换行&#xff0c;是\n转义的&#xff0c;文字导出成word文档没有换行&#xff0c;只需要把\n替换成\r&#xff0c;这样el-input和word文档都能正常显示换行 val.replace.replace("\n", "\r")

UWB技术应用:UWB模块在室内定位系统中的关键角色

随着室内定位技术的不断发展&#xff0c;UWB模块在室内定位系统中扮演着关键角色&#xff0c;其高精度、抗干扰和多路径传输等特点&#xff0c;为室内定位提供了可靠的技术支持。本文将探讨UWB模块在室内定位系统中的关键角色&#xff0c;包括其原理、应用场景和技术优势&#…

Java项目 苍穹外卖 黑马程序员

目录 day1一、项目效果展示二、项目开发整体介绍三、项目介绍3.1 定位3.2 功能架构3.3 产品原型3.4 技术选型 四、开发环境搭建4.1 前端环境4.2 后端环境 五、导入接口文档六、Swagger6.1 介绍6.2 使用方式6.3 常用注解 day2一、新增员工二、员工分页查询三、启用禁用员工账号四…

【无标题】PHP-parse_str变量覆盖

[题目信息]&#xff1a; 题目名称题目难度PHP-parse_str变量覆盖1 [题目考点]&#xff1a; 变量覆盖指的是用我们自定义的参数值替换程序原有的变量值&#xff0c;一般变量覆盖漏洞需要结合程序的其它功能来实现完整的攻击。 经常导致变量覆盖漏洞场景有&#xff1a;$$&…

paddle gpu install

官方教程 https://www.paddlepaddle.org.cn/documentation/docs/zh/install/pip/linux-pip.html#sanyanzhenganzhuang 创建一个新的环境安装 python3 -m pip install paddlepaddle-gpu2.6.1.post112 -f https://www.paddlepaddle.org.cn/whl/linux/cudnnin/stable.htmlpython …

网络基础-TCP和UDP协议区别

什么是 TCP 协议&#xff1f; TCP 全称是 Transmission Control Protocol&#xff08;传输控制协议&#xff09;&#xff0c;它由 IETF 的 RFC 793 定义&#xff0c;是一种面向连接的点对点的传输层通信协议。 TCP 通过使用序列号和确认消息&#xff0c;从发送节点提供有关传输…

请陪伴Kimi和GPT成长

经验的闪光汤圆 但是我想要写实的 你有吗&#xff1f; 岁数大了&#xff0c;希望如何学习新知识呢&#xff1f;又觉得自己哪些能力亟需补强呢&#xff1f; 看论文自然得用Kimi&#xff0c;主要是肝不动了&#xff0c;眼睛也顶不住了。 正好昨天跟专业人士学会了用工作流的办法跟…

【动态规划 状态机dp】3082. 求出所有子序列的能量和

算法可以发掘本质&#xff0c;如&#xff1a; 一&#xff0c;若干师傅和徒弟互有好感&#xff0c;有好感的师徒可以结对学习。师傅和徒弟都只能参加一个对子。如何让对子最多。 二&#xff0c;有无限多1X2和2X1的骨牌&#xff0c;某个棋盘若干格子坏了&#xff0c;如何在没有坏…

idea连接Docker数据库

我们在docker下创建了数据库&#xff0c;想要更方便的查看和操作该数据库&#xff0c;idea和DataGrip或者其他人家都可以。在数据库连接时需要填写数据库名字&#xff0c;主机&#xff0c;端口&#xff0c;数据库用户名和密码。 输入之后先不要点击OK和按Enter键&#xff0c;我…

vue+springboot实验个人信息,修改密码,忘记密码功能实现

前端部分 新增Person&#xff08;个人页面&#xff09;&#xff0c;Password&#xff08;修改密码页面&#xff09;&#xff0c;还需要对Manager&#xff0c;login页面进行修改 router文件夹下的index.js&#xff1a; import Vue from vue import VueRouter from vue-router i…

【运输层】TCP 的流量控制和拥塞控制

目录 1、流量控制 2、TCP 的拥塞控制 &#xff08;1&#xff09;拥塞控制的原理 &#xff08;2&#xff09;拥塞控制的具体方法 1、流量控制 一般说来&#xff0c;我们总是希望数据传输得更快一些。但如果发送方把数据发送得过快&#xff0c;接收方就可能来不及接收&#x…