RabbitMQ如何做到不丢不重

目录

MQTT协议

如何保证消息100%不丢失

生产端可靠性投递

​编辑

RabbitMQ的Broker端投

(1)消息持久化

(2)设置集群镜像模式

(3)消息补偿机制

消费端

ACK机制改为手动

总结


MQTT协议

先来说下MQTT协议中的3种语义,这个非常重要。

在MQTT协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

At most once:至多一次。消息在传递时,最多会被送达一次。也就是说,没什么消息可靠性保证,允许丢消息。
At least once:至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级 这个服务质量标准不仅适用于MQTT,对所有的消息队列都是适用的。现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括RocketMQ、RabbitMQ和Kafka都是这样。也就是说,消息队列很难保证消息不重复。

At least once+幂等消费=Exactly once

如何保证消息100%不丢失

消息从生产端到消费端消费要经过3个步骤:

  1. 生产端发送消息到RabbitMQ;
  2. RabbitMQ发送消息到消费端;
  3. 消费端消费这条消息;  

所以要保证消息不丢,就得从三个方面入手,分别是生产端、RabbitMQ的Broker端、消费端。三个都保证不丢失,才能保证100%不丢。

生产端可靠性投递

事务机制:

// 设置channel开启事务
rabbitTemplate.setChannelTransacted(true);


@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory)
 {
  return new RabbitTransactionManager(connectionFactory);
 }
    
@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
public void publishMessage(String message) throws Exception {
   rabbitTemplate.setMandatory(true);
   rabbitTemplate.convertAndSend("java",message);
    }

confirm消息确认机制:

# 开启发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败回退
spring.rabbitmq.publisher-returns=true	     
@Configuration
@Slf4j
public class RabbitMQConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void enableConfirmCallback() {
        //confirm 监听,当消息成功发到交换机 ack = true,没有发送到交换机 ack = false
        //correlationData 可在发送时指定消息唯一 id
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if(!ack){
                //记录日志、落库定时任务扫描重发,同时对开发人员进行通知
            }
        });
        
        //当消息成功发送到交换机没有路由到队列触发此监听
        rabbitTemplate.setReturnsCallback(returned -> {
            //记录日志、落库、同时对开发人员进行通知
        });
    }
}

一般不推荐事务的模式,因为是同步的会影响性能,所以都会采用异步回调的confirm模式。

RabbitMQ的Broker端投

说三点:

(1)要保证rabbitMQ不丢失消息,那么就需要开启rabbitMQ的持久化机制,即把消息持久化到硬盘上,这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息;

(2)如果rabbitMQ单点故障怎么办,这种情况倒不会造成消息丢失,这里就要提到rabbitMQ的3种安装模式,单机模式、普通集群模式、镜像集群模式,这里要保证rabbitMQ的高可用就要配合HAPROXY做镜像集群模式

(3)如果硬盘坏掉怎么保证消息不丢失

(1)消息持久化

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。

所以就要对消息进行持久化处理。如何持久化,下面具体说明下:

要想做到消息持久化,必须满足以下三个条件,缺一不可。

1) Exchange 设置持久化

2)Queue 设置持久化

3)Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

(2)设置集群镜像模式

我们先来介绍下RabbitMQ三种部署模式:

1)单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。

2)普通模式:消息只会存在与当前节点中,并不会同步到其他节点,当前节点宕机,有影响的业务会瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。

3)镜像模式:消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。属于RabbitMQ的HA方案

为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。

如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。

下面介绍下三种HA策略模式

1)同步至所有的

2)同步最多N个机器

3)只同步至符合指定名称的nodes

命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式 rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}' rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式 rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3)为每个以“node.”开头的队列分配指定的节点做镜像 rabbitmqctl set_policy ha-nodes "^nodes." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 镜像队列有一个很大的缺点就是:系统的吞吐量会有所下降。

(3)消息补偿机制

为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,

但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

1)生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚

2)根据消息表中消息状态,失败则进行消息补偿措施,重新发送消息处理。

消费端
ACK机制改为手动

RabbitMQ的自动ack机制默认在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完。
我们需要进行手动消费

#开启手动ACK,消费消息的时候,就必须发送ack确认,不然消息永远还在队列中
spring.rabbitmq.listener.simple.acknowledge-mode=manual

 basicNack 方法的第三个参数代表是否重回队列,通常代码的报错并不会因为重试就能解决,所以可能这种情况:继续被消费,继续报错,重回队列,继续被消费…死循环。
一定要有重发消息次数的限制,或者干脆不入队,发送到Redis进行下记录也行。一般就不会再次入队了,而是记录并通知开发人员,进行手动处理



 @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
    public void process(String msg, Message message, Channel channel) {
        long tag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.SUCCESS;
        try {
            System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + msg);
            if ("bad".equals(msg)) {
                throw new IllegalArgumentException("测试:抛出可重回队列的异常");
            }
            if ("error".equals(msg)) {
                throw new Exception("测试:抛出无需重回队列的异常");
            }
        } catch (IllegalArgumentException e1) {
            e1.printStackTrace();
            //根据异常的类型判断,设置action是可重试的,还是无需重试的
            action = Action.RETRY;
        } catch (Exception e2) {
            //打印异常
            e2.printStackTrace();
            //根据异常的类型判断,设置action是可重试的,还是无需重试的
            action = Action.REJECT;
        } finally {
            try {
                if (action == Action.SUCCESS) {
                    //multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息
                    channel.basicAck(tag, false);
                } else if (action == Action.RETRY) {
                    //Nack,拒绝策略,消息重回队列
                    channel.basicNack(tag, false, true);
                } else {
                    //Nack,拒绝策略,并且从队列中删除
                    channel.basicNack(tag, false, false);
                }
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
总结

如果需要保证消息在整条链路中不丢失,那就需要生产端、mq自身与消费端共同去保障。

生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。

mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。

消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。

通过以上的处理,理论上不存在消息丢失的情况,但是系统的吞吐量以及性能有所下降。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/270346.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

阿里云经济型e实例2核2G3M99元1年,性价比超高的入门级云服务器

产品简介 经济型e实例是阿里云面向个人开发者、学生、小微企业,在中小型网站建设、开发测试、轻量级应用等场景推出的全新入门级云服务器,采用Intel Xeon Platinum架构处理器,支持1:1、1:2、1:4多种处理器内存配比,采用非绑定CPU…

IntelliJ IDEA快捷键及调试

文章目录 一、IntelliJ IDEA 常用快捷键一览表1-IDEA的日常快捷键第1组:通用型第2组:提高编写速度(上)第3组:提高编写速度(下)第4组:类结构、查找和查看源码第5组:查找、…

ES8生产实践——Kibana对接Azure AD实现单点登录

基本概念介绍 什么是单点登录 单点登录(Single Sign-On,SSO)是一种身份验证和访问控制机制,允许用户使用一组凭据(通常是用户名和密码)仅需登录一次,即可访问多个应用程序或系统,而…

DDIM详解

DDIM详解 参考:https://www.bilibili.com/video/BV1VP411u71p/ 虽然 DDIM 现在主要用于加速采样,但他的实际意义远不止于此。本文将首先回顾 DDPM 的训练和采样过程,再讨论 DDPM 与 DDIM 的关系,然后推导 DDIM 的采样公式&#xf…

MySQL的事务-隔离级别

上篇,整理了MySQL事务的原子性,这篇继续整理MySQL事务的一致性、隔离性和持久性。 2. 一致性指的是事务开始前和结束后,数据库的完整性约束没有被破坏,这保证了数据的完整性和一致性。一致性必须确保数据库从一个一致的状态转换到…

【无标题】【一周安全资讯1223】一图读懂《工业和信息化部办公厅关于组织开展网络安全保险服务试点工作的通知》;15亿条纽约房产记录泄露

要闻速览 1、一图读懂《工业和信息化部办公厅关于组织开展网络安全保险服务试点工作的通知》 2、国家数据局《“数据要素”三年行动计划 (2024—2026年)》公开征求意见 3、中国信息通信研究院发布《公共数据授权运营发展洞察 (2023年)》 4、15亿条纽约房产记录泄露&#xff0c…

c# OpenCvSharp透视矫正六步实现透视矫正(八)

透视矫正,引用文档拍照扫描,相片矫正这块。 读取图像Cv2.ImRead();预处理(灰度化,高斯滤波、边缘检测)轮廓检测(获取到最大轮廓)获取最大面积轮廓的四个顶点标识最小矩形坐标透视矫正显示 完整代码 // 1、…

结构体的对齐规则

1.引入 我们在掌握了结构体的基本使⽤后。 现在我们深⼊讨论⼀个问题:计算结构体的大小。 这也是⼀个特别热门的考点: 结构体内存对齐。 2.具体分析 ⾸先我们得掌握结构体的对⻬规则: 1. 结构体的第⼀个成员对⻬到和结构体变量起始位置偏移量…

直通车定义、功能以及扣费原则

1.直通车是天猫付费搜索广告,即时需求,是消费者主动来搜索的,cpc扣费原则,一般用来拉新或者收割客户; 2.一般关键词优先,人群溢价是用来更精准投放的,可以不投溢价人群; 3.溢价人群…

移动开发新的风口?Harmony4.0鸿蒙应用开发基础+实践案例

前段时间鸿蒙4.0引发了很多讨论,不少业内人士认为,鸿蒙将与iOS、安卓鼎足而三了。 事实上,从如今手机操作系统竞赛中不难看出,安卓与iOS的形态、功能逐渐趋同化,两大系统互相取长补短,综合性能等差距越来越…

详细学习Java注解Annotation、元注解(通俗易懂,一学就会)

概述 底层原理 自定义注解 示例代码: 1. 2.只有属性名为value的才可以,java对它进行了标识,如果是其他别名如value1是不行的 3.多个属性,必须用键值对形式,不能少写,也不能多写,除非有default修…

Apache Commons BeanUtils: JavaBean操作的艺术

第1部分:Apache Commons BeanUtils 简介 咱们今天聊聊Apache Commons BeanUtils。这货简直就是处理JavaBean的利器,用起来又方便又快捷。不管是属性拷贝、类型转换,还是动态访问,BeanUtils都能轻松应对。 BeanUtils是啥&#xf…

H266/VVC帧内预测编码技术概述

预测编码技术 预测编码(Prediction Coding)是指利用已编码的一个或多个样本值,根据某种模型或方法,对当前的样本值进行预测,并对样本真实值和预测值之间的差值进行编码。 视频中的每个像素看成一个信源符号&#xff…

Java并发(二十一)----wait notify介绍

1、小故事 - 为什么需要 wait 由于条件不满足(没烟干不了活啊,等小M把烟送过来),小南不能继续进行计算 但小南如果一直占用着锁,其它人就得一直阻塞,效率太低 于是老王单开了一间休息室(调…

时间Date

你有没有思考过时间问题: 前端为什么可以直接看见时间格式的数据 后端怎么接受的数据,怎么处理的 一般来说:前端传输来数据都是时间格式的字符串,那么后端需要能够解析时间格式的字符串,归功于JSONFormat ,可以解析…

node封装一个图片拼接插件

说在前面 平时我们拼接图片的时候一般都要通过ps或者其他图片处理工具来进行处理合成,这次有个需求就需要进行图片拼接,而且我希望是可以直接使用代码进行拼接,于是就有了这么一个工具包。 插件效果 通过该插件,我们可以将图片进…

SUS-Chat-34B领先一步:高效双语AI模型的突破

引言 在人工智能领域,模型的规模和效能一直是衡量其先进性的关键指标。南方科技大学联合IDEA研究院CCNL团队最新开源的SUS-Chat-34B模型,以其340亿参数的庞大规模和卓越的双语处理能力,在AI界引起了广泛关注。 模型概述 SUS-Chat-34B是基于…

07 Vue3框架简介

文章目录 一、Vue3简介1. 简介2. 相关网站3. 前端技术对比4. JS前端框架5. Vue核心内容6. 使用方式 二、基础概念1. 创建一个应用2. 变量双向绑定(v-model)3. 条件控制(v-if)4. 数组遍历(v-for)5. 绑定事件…

Vue核心语法、脚手架与组件化开发、VueRouterVuex、综合案例(待办事项工具)

学习源码可以看我的个人前端学习笔记 (github.com):qdxzw/frontlearningNotes 觉得有帮助的同学&#xff0c;可以点心心支持一下哈 一、Vue核心语法 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name…

leetcode2两数加和问题(链表)

题目思路&#xff1a; ①创建一个int类型的局部变量&#xff0c;用来存储两个结点的Val值。 ②判断该Val值与10求余(mod)后是否大于0,如果大于0, 则需要在下一个结点进位。 ③最关键的步骤&#xff1a;实现l1&#xff0c;l2结点数值相加后构建新的存储求和后的结点&#xff0…