RocketMQ快速入门:如何保证消息不丢失|保证消息可靠性(九)

0. 引言

在金融、电商等对数据完整性要求极高的行业,消息的丢失可能会导致数据不一致,严重影响业务逻辑和数据统计,也影响客户体验,所以在很多业务场景下,我们都要求数据不能丢失。而rocketmq中,如何对消息防丢失进行处理的呢?

1. 原理

1.2 产生消息丢失的场景

首先我们要理解消息的传递过程,在哪些阶段会导致消息丢失,才能知道如何进行防控。

我们之前分析过rabbitmq如何保证消息不丢失, rabbitmq内部有交换机这一转发步骤,所以相对比rocketmq更加复杂,但是两者的分析方法是一致的。
在这里插入图片描述

rocketmq的消息传递分为3个阶段:
(1)生产者发送消息到broker的队列中
(2)broker存储消息
(3)消费者到队列获取消息进行消费
在这里插入图片描述
而这三个阶段可能会导致消息丢失的场景是什么呢?其实由rabbitmq的分析我们可以得到启发。
(1)生产者发送消息到broker的队列中

生产者在发出消息后,可能因为网络异常、broker宕机,导致发出的消息实际并没有到达broker

(2)broker存储消息

broker的存储机制是将消息先存储到内存,存储完成后再发送回执给生产者,然后再异步将数据刷到磁盘,但如果在这个刷盘这个过程中broker宕机了,也会导致消息丢失

(3)消费者到队列获取消息进行消费

broker在将消息发出后,同样可能因为网络异常、消费者宕机或者消息者消费到一半产生错误等因素,导致消息实际并没有被消费者消费,但broker又扣除了这条消息,就会导致消息丢失

1.2 防丢失措施

阶段一:生产者发送消息到broker的队列中

1、因为发送到broker期间网络因素我们很难干预,也很难百分比保证。第一点我们能做的,如果其中一个broker宕机,那能有备用节点顶上,保证可用性。于是第一项就是多节点部署broker

2、但万一节点都挂了呢,或者整个机房网络瘫痪了,如何保证消息不丢失,我们只要从上游控制,如果下游不通时,就不要发了,待会再发。于是也衍生出消息发送失败时的重试机制

3、但如果一直重发不成功怎么办呢,那就需要下游告知上游,这次发送没成功,你记录好状态,这就是broker要有返回状态告知,否则生产者也不知道到底发送成功没有。broker中提供了3种发送方式:同步、异步、单向(详见之前的文章: RocketMQ快速入门:集成java客户端实现各类消息发送|异步、同步、顺序、单向、延迟、事务(五)附带源码)。

这三种方式中单向肯定不行,他是不管返回结果的,最容易丢失消息。而异步需要设置回调函数,在回调函数中处理发送失败时的逻辑,如果对于一些场景回调里很能补救,最常见的就是回调里进行重发,所以最优先保证消息可靠的就是同步发送的方式,一旦获取到发送失败,就进行补救处理,或者不再继续后续的业务逻辑,整个流程直接报错打回

总结一下,生产阶段保证消息可靠的手段包括多节点部署broker, 消息重发、同步发送。这几种方式实际上是可以配合使用的, 比如多节点部署,通过同步发送,发送失败时进行3次重发,都重发失败则记录状态。
在这里插入图片描述

阶段二:broker存储消息

存储阶段导致丢失的原因就是因为broker默认的是异步刷盘机制,如果改成同步刷盘呢,先存储到内存,然后刷新到磁盘,刷新成功后才给生产者返回成功收到的回执,以此保证消息可靠。rocketmq中也是支持同步刷盘的。

但如果只有一个节点的话,即使同步刷盘,当broker宕机后,没有备用的,还是会导致服务不可用,相对可靠性就没有保障了。所以同步刷盘,也可以配合着多节点部署使用

当然如果你的场景对可用性要求不高,即使宕机,只要报错会生产者就行,那同步刷盘也足够了

总结一下,存储阶段主要依赖同步刷盘和多节点部署来保障可靠性,当然多节点部署可以根据业务情况和成本预算选择。
在这里插入图片描述

阶段三:消费者到队列获取消息进行消费

消费阶段的丢失可能性主要来源于消费者没处理好之前就宕机或则异常了,首先一点能想到的那就多个消费者呢,但实际上多点部署在消费阶段并不能解决问题,因为rocketmq消费模式有广播模式和集群模式,广播模式下每个节点都会收到消息,这个模式下的天然就是多节点部署。而集群模式本身也是基于多消费者的情况,但两者都无法保障当消息发送给某一节点后,这个节点拉去了消息,但没实际处理完就异常的场景。

所以就考虑当消费者消费完成后,再给broker发送成功消费的回执,这时broker才更新消息偏移量,将消息标识为被消费。如此才能保障消息的可靠性。

在这里插入图片描述

2. 实现

2.1 生产阶段

1、多节点部署,可以部署主从或集群模式,这里不讲解详细的搭建流程,后续单独讲解

2、同步发送,主要通过producer.send来实现同步发送

public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        // 声明group
        DefaultMQProducer producer = new DefaultMQProducer("group_test");

        // 声明namesrv地址
        producer.setNamesrvAddr("localhost:9876");

        // 设置重试次数
        producer.setRetryTimesWhenSendFailed(2);
        // 启动实例
        producer.start();

        // 设置消息的topic,tag以及消息体
        Message msg = new Message("topic_test", "tag_test", "消息内容1".getBytes(StandardCharsets.UTF_8));

        // 发送消息,并设置10s连接超时
        SendResult send = producer.send(msg, 10000);
        System.out.println("发送结果:"+send);

        // 关闭实例
        producer.shutdown();
    }

如果是springboot集成的,可以通过SendResult sendResult = rocketMQTemplate.syncSend("topic_test:tag_test", message);来实现。通过发送结果SendResult对象,来判断发送失败后的处理逻辑

3、发送重试
(1)首先手动重发的实现很简单,只需要根据send.getSendStatus()状态来判断,如果需要重发多次的,可以结合guava-retry 等重发组件来更方便的实现

// 发送消息,并设置10s连接超时
        SendResult send = producer.send(msg, 10000);
        System.out.println("发送结果:"+send);
        
        if(!send.getSendStatus().equals(SendStatus.SEND_OK)){
            // 发送失败,手动重发
            send = producer.send(msg, 10000);
            
        }

(2)当然rocketmq也封装好了重试机制给我们使用,其重试机制采用衰减的形式,也就是重试间隔时间会逐渐增加
在这里插入图片描述
我们只需要通过producer.setRetryTimesWhenSendFailed(2);方法即可设置,会在发送失败时自动触发重新发送,同时如果超过设置的超时时间还未接收到成功的结果也会触发重发机制,就不需要我们手动书写重发逻辑了,更加推荐这种方式。

        // 声明group
        DefaultMQProducer producer = new DefaultMQProducer("group_test");

        // 声明namesrv地址
        producer.setNamesrvAddr("localhost:9876");

        // 设置重试次数
        producer.setRetryTimesWhenSendFailed(2);
        // 启动实例
        producer.start();

2.2 存储阶段

1、主要将broker的刷盘策略设置为同步刷盘,需要修改broker.conf配置文件

# 设置为同步刷盘模式
flushDiskType = SYNC_FLUSH

2、如果配置的是多节点,一般是主从模式,为了防止主节点有数据,从节点没刷到数据的情况,就需要开启从节点刷盘后再返回ACK回执给生产者,需要修改从节点broker配置文件

# 默认为 ASYNC_MASTER
brokerRole=SYNC_MASTER

broker提供了两种主从同步模式:ASYNC_MASTER异步 和 SYNC_MASTER同步

  • ASYNC_MASTER:

消息发送到master节点后,开启一个异步线程更新给从节点,这个过程中有消息同步丢失的风险,优点是性能高

  • SYNC_MASTER:

消息发送到master节点后,同步更新到从节点,当从节点更新完再返回成功的ACK回执给生产者,表示消息发送成功,可靠性高,但性能会有所下降

3、关于多节点部署,我们在后续单独讲解

2.3 消费阶段

1、其实现就是消费后返回成功状态即可

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");

        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 集群消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 设置topic
        consumer.subscribe("topic_test", "*");

        // 设置重试次数
        consumer.setMaxReconsumeTimes(2);

        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : list) {
                    String topic = msg.getTopic();
                    try {
                        String messageBody = new String(msg.getBody(), "utf-8");
                        System.out.println(topic+":"+messageBody);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者实例
        consumer.start();

2、如果返回的是ConsumeConcurrentlyStatus.RECONSUME_LATER状态,消费者就会触发稍后重试机制进行重新消费,同样的可以通过consumer.setMaxReconsumeTimes设置最大重试次数

// 设置重试次数
consumer.setMaxReconsumeTimes(2);

其重试次数和时延等级与生产重试是一致的
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/726984.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

集合进阶(泛型、泛型通配符、数据结构(二叉树、平衡二叉树、红黑树

一、泛型类、泛型方法、泛型接口 1、泛型概述 泛型&#xff1a;是JDK5中引入的特性&#xff0c;可以在编译阶段约束操作的数据类型&#xff0c;并进行检查。泛型的格式&#xff1a;<数据类型>注意&#xff1a;泛型只能支持引用数据类型。 泛型的好处 1、统一数据类型。 …

建筑主体沉降观测规范详解

随着城市化进程的加速&#xff0c;高层建筑和大型建筑项目日益增多&#xff0c;建筑主体的沉降观测工作显得尤为重要。沉降观测是确保建筑安全稳定的关键环节&#xff0c;对于预防建筑安全事故、保障人民生命财产安全具有重要意义。本文将详细解析建筑主体沉降观测的规范和要求…

【机器学习】线性回归:从基础到实践的深度解析

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 线性回归&#xff1a;从基础到实践的深度解析引言一、线性回归基础1.1 定义与目…

Word和Excel如何快速对齐姓名

日常工作经常遇到整理参会人员名单时&#xff0c;有2字姓名、3字姓名&#xff0c;为保证文档美观&#xff0c;你是否还在一个一个空格在敲空格&#xff1f; 今天刘小生分享如何在Word和Excel中快速对齐姓名&#xff0c;快来练起来吧&#xff01; 1. Word姓名对齐 【第一步】…

看见未来社区:视频孪生技术打造智慧社区

智慧社区的建设需要创新的技术支撑。智汇云舟创新升级数字孪生为视频孪生技术&#xff0c;通过将真实世界的视频监控与数字模型实时融合&#xff0c;实现了对物理空间的实时实景动态模拟。 针对智慧社区管理业务&#xff0c;以智汇云舟视频孪生平台为支撑&#xff0c;综合承载…

一起笨笨的学C——014grep特别版

目录 前言 正文 原文&#xff1a; 要求总结&#xff1a; 一点一点来&#xff1a; grep学习&#xff1a; glob理解&#xff1a; dirent 目录函数&#xff1a; 加载日志文件&#xff1a; strstr与strcmp&#xff1a; 非首次尝试&#xff1a; 非二次 &#xff1a; 老师…

易兆微电子_嵌入式软件工程师笔试题

易先电子 嵌入式软件工程师笔试题(十七) 1.关键字 extern是什么含义, 请举例说明。 修饰符extern用在变量或者函数的声明前&#xff0c;用来说明 “ 此变量 / 函数是在别处定义的&#xff0c;要在此处引用 ”。 //main.c #include <stdio.h>int main() {extern int num…

英国牛津大学基因组学方向博士后职位

英国牛津大学基因组学方向博士后职位 牛津大学&#xff08;University of Oxford&#xff09;&#xff0c;简称“牛津”&#xff08;Oxford&#xff09;&#xff0c;位于英国牛津&#xff0c;是一所公立研究型大学&#xff0c;采用传统学院制。是罗素大学集团成员&#xff0c;被…

商超智能守护:AI监控技术在零售安全中的应用

结合思通数科大模型的图像处理、图像识别、目标检测和知识图谱技术&#xff0c;以下是详细的商超合规监测应用场景描述&#xff1a; 1. 员工仪容仪表监测&#xff1a; 利用图像识别技术&#xff0c;系统可以自动检测员工是否按照规范整理妆容、穿着工作服&#xff0c;以及是否…

管理咨询公司的五个招聘秘密

在管理咨询中&#xff0c;人是业务&#xff1b;客户支付数百万美元&#xff0c;要求管理顾问确认问题&#xff0c;并推荐解决方案。由于收入和合规性受到威胁&#xff0c;招聘错误的成本可能非常昂贵&#xff0c;一些公司更倾向于谨慎而不是效率。然而&#xff0c;在当今竞争激…

Nexus安卓木马分析报告

概述 2023年3月21日晚上&#xff0c;链安与中睿天下联合研发的监控系统检测到一种新型安卓木马。在经过睿士沙箱系统捕获样本之后&#xff0c;发现该安卓木马极有可能是原安卓网银盗号木马SOVA的变种。与此同时&#xff0c;意大利安全公司Cleafy发布了一篇题为《Nexus&#xf…

API接口对接的步骤流程?有哪些注意事项?

API接口对接自动化的实现方法&#xff1f;如何调试API接口发信&#xff1f; 在现代软件开发中&#xff0c;API接口对接已成为各个系统和应用之间进行通信和数据交换的关键技术。AokSend将详细介绍API接口对接的步骤流程&#xff0c;帮助开发者更好地理解和实现这一过程。 API…

VScode基本使用

VScode下载安装&#xff1a; Visual Studio Code - Code Editing. Redefined MinGW的下载安装&#xff1a; MinGW-w64 - for 32 and 64 bit Windows - Browse Files at SourceForge.net x86是64位处理器架构&#xff0c;i686是32为处理器架构。 POSIX和Win32是两种不同的操…

Spring Cloud Gateway网关下的文档聚合(knife4j)

文章目录 引言I 服务发现自动聚合(discover)1.1 配置1.2 服务发现的路由聚合策略-数据来源1.3 编写动态路由实现类II 其他2.1 网关动态加载swagger路由和配置2.2 无法处理 lb://URI,返回503错误。2.3 SpringBoot3 解决NoResourceFoundException: No static resource favicon.i…

数据结构与算法-差分数组及应用

差分数组 差分数组&#xff1a; 其实差分数组是创建一个一个辅助数组&#xff0c;用来表示给定数组的变化&#xff0c;一般用来对数组进行区间修改的操作。 频繁操作数组区间的问题 假设我们要对一个数组进行区间操作。数组为 a {10,10, 20,20,50,… 100}。数组数据比较多。 对…

羊大师:培养儿童配得感,从自我认知开始

在儿童的成长过程中&#xff0c;配得感的培养是至关重要的。配得感&#xff0c;即孩子认为自己值得拥有美好事物和得到他人关爱的一种心理状态&#xff0c;是孩子自信心和自尊心的基石。而自我认知&#xff0c;则是培养配得感的第一步。 我们要引导孩子正确地认识自己。每个孩子…

vant组件 顶部下拉刷新和页面底部下拉获取数据

1.html部分&#xff08;顶部tab切换无&#xff0c;只有主体list部分&#xff09; <div class"yd" ><!-- yd端 --><van-pull-refresh v-model"refreshing" refresh"onRefresh"><van-listv-model"ydloading":finis…

【SpringCloud】Eureka的简单使用

本文使用的是jdk17&#xff0c;mysql8。 以下用两个服务做演示&#xff1a; 订单服务&#xff1a;提供订单ID&#xff0c;获取订单详细信息。 商品服务&#xff1a;提供商品ID&#xff0c;获取商品详细信息。 对于上篇http://t.csdnimg.cn/vcWpo 订单服务调用商品服务的时候&a…

Markdown 生成 Epub (Typora + pandoc)

文章目录 一、安装 pandoc二、Typora pandoc 导出 Pandoc 文件三、看看效果 一、安装 pandoc macOS 上使用 brew 安装 brew install pandoc其他系统可见&#xff1a;https://pandoc.org/installing.html 安装成功后查看版本 pandoc --version$ pandoc --version pandoc 2.1…

PS选不了颜色和路径描边?PS不知为何才能描边任意路径,这个办法让你秒懂

在选中路径的情况下&#xff0c;按图下操作&#xff0c;即可制作路径&#xff08;不会让你选不了颜色和路径描边&#xff09;