确保数据一致性:RabbitMQ 消息传递中的丢失与重复问题详解

前言

RabbitMQ 是一个常用的消息队列工具,虽然它能帮助高并发环境下实现高效协同,但我们也曾遇到过因网络波动确认机制失效系统故障代码异常等原因导致消息丢失重复消费的问题,本文将探讨原因及解决方案,希望能为大家提供一点帮助。


一、RabbitMQ 消息丢失问题分析与解决方案

1. 生产者消息丢失

原因分析

生产者在发送消息到 RabbitMQ 时,可能会因以下原因导致消息丢失:

  • 网络故障:消息未能成功到达 RabbitMQ。
  • RabbitMQ 崩溃:生产者未确认消息是否成功送达。
  • 生产者代码异常:消息未正确发送。
解决方案
  1. 使用事务模式(不推荐)
    • 通过 channel.txSelect() 开启事务,channel.basicPublish() 发送消息,channel.txCommit() 提交事务。
    • 缺点:事务模式会显著影响性能,因此不推荐在高并发场景下使用。
  2. 使用 Publisher Confirm 模式(推荐)
    • 生产者开启 confirm 模式,每次发送消息后等待 RabbitMQ 的确认。
    • 示例代码
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.basicPublish("exchange", "routingKey", null, "message".getBytes());
if (!channel.waitForConfirms()) {
    System.out.println("消息可能丢失");
}

优点:确保消息成功写入 RabbitMQ,性能优于事务模式。

  1. 使用 Mandatory 参数或备份交换机
    • 设置 mandatory=true,当消息无法被路由时,RabbitMQ 会将消息返回给生产者。
    • 配置备份交换机,当消息无法投递时,存入备份队列,避免消息丢失。

2. RabbitMQ 内部消息丢失

原因分析

RabbitMQ 内部消息存储在内存或磁盘中,若未进行持久化,可能会导致消息丢失:

  • 队列未持久化:RabbitMQ 重启后,队列中的消息丢失。
  • 消息未持久化:RabbitMQ 崩溃时,内存中的消息丢失。
解决方案
  1. 开启队列持久化
    • 在声明队列时,设置 durable=true,确保 RabbitMQ 重启后队列不会丢失。
    • 示例代码
boolean durable = true;
channel.queueDeclare("queue", durable, false, false, null);
  1. 开启消息持久化
    • 在发送消息时,设置 deliveryMode=2,确保消息持久化到磁盘。
    • 示例代码
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .deliveryMode(2)  // 1:非持久化, 2:持久化
    .build();
channel.basicPublish("exchange", "routingKey", properties, "message".getBytes());

最佳实践:结合队列持久化和消息持久化,并使用 Publisher Confirm 模式,确保消息不丢失。


3. 消费者消息丢失

原因分析

消费者在处理消息时,可能会因以下原因导致消息丢失:

  • 消息未正确 ACK:RabbitMQ 误以为消息已被消费并删除,但实际上消费者未处理完毕。
  • 消费者进程崩溃:消费者在处理消息时崩溃,导致消息未完成处理。
解决方案
  1. 手动 ACK
    • 避免使用 autoAck=true,改为手动确认消息处理完毕后再发送 ACK。
    • 示例代码
boolean autoAck = false;
channel.basicConsume("queue", autoAck, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
        System.out.println("Received: " + new String(body));
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});
  1. 死信队列(DLX)处理异常消息
    • 当消息被拒绝(basicNackbasicReject)时,可以将其转入死信队列(DLX),避免消息直接丢失。
    • 适用场景:处理消费者无法正常处理的消息,确保消息不会丢失。

二、RabbitMQ 重复消费问题分析与解决方案

1. 重复消费的原因

  • 消费者 ACK 丢失:RabbitMQ 未收到 ACK,导致消息重新投递。
  • 网络问题:消费者 ACK 后,网络中断,RabbitMQ 未收到确认,重新投递。
  • 业务逻辑未实现幂等性:即使消息被重复投递,业务层仍需保证最终一致性。

2. 解决方案

1. 确保消息 ACK 成功
  • 在代码中确保消息处理完毕后再发送 ACK。
  • 避免使用 autoAck=true,使用 basicAck 确保 RabbitMQ 收到确认。
2. 消息去重(业务幂等性)
  • 数据库去重(适用于写操作):
    • 设计唯一约束,如 orderId 唯一。
    • 消费时,先检查 orderId 是否已处理。
  • Redis 去重(适用于高并发场景):
    • 使用 SETNX 存储 msgId,若已存在,则丢弃。
    • 示例代码
String msgId = getMessageId(message);
if (redis.setnx(msgId, "1") == 0) {
    System.out.println("重复消息,丢弃");
    return;
}
3. RabbitMQ 唯一消息 ID
  • 使用 Message Deduplication 插件:让 RabbitMQ 自动去重。
  • 在消息属性中增加唯一 ID,如 UUID,消费者根据唯一 ID 进行去重。

三、总结

问题主要原因解决方案
生产者消息丢失网络故障、RabbitMQ 崩溃开启 Confirm 模式、Mandatory 参数
RabbitMQ 内部丢失未持久化队列或消息开启持久化 + Confirm 模式
消费者消息丢失ACK 机制错误手动 ACK + 死信队列
消息重复消费ACK 丢失、业务未幂等手动 ACK + 幂等处理

通过以上措施,可以有效减少 RabbitMQ 消息丢失和重复消费问题,确保系统的可靠性和一致性。在实际开发中,应根据业务需求选择合适的方案,结合业务需求优化RabbitMQ的使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/965877.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

《云夹:高效便捷的书签管理利器》

在信息爆炸的时代,我们每天都会浏览大量的网页,遇到许多有价值的内容。如何高效地管理这些网页书签,以便随时快速访问,成为了一个重要的问题。云夹作为一款出色的书签管理工具,为我们提供了完美的解决方案。 强大的功能…

学习数据结构(6)链表OJ

1.移除链表元素 解法一:(我的做法)在遍历的同时移除,代码写法比较复杂 解法二:创建新的链表,遍历原链表,将非val的节点尾插到新链表,注意,如果原链表结尾是val节点需要将…

MongoDB开发规范

分级名称定义P0核心系统需7*24不间断运行,一旦发生不可用,会直接影响核心业务的连续性,或影响公司名誉、品牌、集团战略、营销计划等,可能会造成P0-P2级事故发生。P1次核心系统这些系统降级或不可用,会间接影响用户使用…

设计模式.

设计模式 一、介绍二、六大原则1、单一职责原则(Single Responsibility Principle, SRP)2、开闭原则(Open-Closed Principle, OCP)3、里氏替换原则(Liskov Substitution Principle, LSP)4、接口隔离原则&am…

STM32的HAL库开发-通用定时器输入捕获实验

一、通用定时器输入捕获部分框图介绍 1、捕获/比较通道的输入部分(通道1) 首先设置 TIM_CCMR1的CC1S[1:0]位,设置成01,那么IC1来自于TI1,也就是说连接到TI1FP1上边。设置成10,那个IC1来自于TI2,连接到TI2FP1上。设置成…

JavaScript 复习

文章目录 语法前置语法组成引入位置内部引入外部引入 基础语法输出变量变量声明规则变量赋值变量的作用范围 数据类型强制类型转换强转为 Number强转为 Boolean强转为 String强转为 整数 | 浮点数 运算符流程控制隐式转换函数常用内置对象String 对象Array 数组对象Math 数学对…

【C】链表算法题5 -- 相交链表

leetcode链接https://leetcode.cn/problems/intersection-of-two-linked-lists/description/https://leetcode.cn/problems/intersection-of-two-linked-lists/description/ 题目描述 给你两个单链表的头节点 headA 和 headB ,请你找出并返回两个单链表相交的起始节…

蓝桥杯准备 【入门3】循环结构

素数小算法&#xff08;埃氏筛&&欧拉筛&#xff09; 以下四段代码都是求20以内的所有素数 1.0版求素数 #include<iostream> using namespace std;int main() {int n 20;for(int i2;i<n;i){int j0;for(j2;j<i;j)//遍历i{if(i%j0){break;}}if(ij){cout&l…

寒假2.6--SQL注入之布尔盲注

知识点 原理&#xff1a;通过发送不同的SQL查询来观察应用程序的响应&#xff0c;进而判断查询的真假&#xff0c;并逐步推断出有用的信息 适用情况&#xff1a;一个界面存在注入&#xff0c;但是没有显示位&#xff0c;没有SQL语句执行错误信息&#xff0c;通常用于在无法直接…

Servlet笔记(下)

HttpServletRequest对象相关API 获取请求行信息相关(方式,请求的url,协议及版本) | API | 功能解释 | | ----------------------------- | ------------------------------ | | StringBuffer getRequestURL(); | 获取客户端…

QQ自动发送消息

QQ自动发送消息 python包导入 import time import pandas as pd import pyautogui import pyperclip图像识别函数封装 本程序使用pyautogui模块控制鼠标和键盘来实现QQ自动发送消息&#xff0c;因此必须得到需要点击位置的坐标&#xff08;当然也可以在程序中将位置写死&…

5.1计算机网络基本知识

5.1.1计算机网络概述 目前&#xff0c;三网融合(电信网络、有线电视网络和计算机网络)和宽带化是网络技术的发展的大方向&#xff0c;其应用广泛&#xff0c;遍及智能交通、环境保护、政府工作、公共安全、平安家居等多个领域&#xff0c;其中发展最快的并起到核心作用的则是计…

51单片机之冯·诺依曼结构

一、概述 8051系列单片机将作为控制应用最基本的内容集成在一个硅片上&#xff0c;其内部结构如图4-1所示。作为单一芯片的计算机&#xff0c;它的内部结构与一台计算机的主机非常相似。其中微处理器相当于计算机中的CPU&#xff0c;由运算器和控制器两个部分构成&#xff1b;…

13.PPT:诺贝尔奖【28】

目录 NO1234 NO567 NO8/9/10 NO11/12 NO1234 设计→变体→字体→自定义字体 SmartArt超链接新增加节 NO567 版式删除图片中的白色背景&#xff1a;选中图片→格式→删除背景→拖拉整个图片→保留更改插入→图表→散点图 &#xff1a;图表图例、网格线、坐标轴和图表标题…

RabbitMQ 从入门到精通:从工作模式到集群部署实战(一)

#作者&#xff1a;闫乾苓 文章目录 RabbitMQ简介RabbitMQ与VMware的关系架构工作流程RabbitMQ 队列工作模式及适用场景简单队列模式&#xff08;Simple Queue&#xff09;工作队列模式&#xff08;Work Queue&#xff09;发布/订阅模式&#xff08;Publish/Subscribe&#xff…

DFX(Design for eXcellence)架构设计全解析:理论、实战、案例与面试指南*

一、什么是 DFX &#xff1f;为什么重要&#xff1f; DFX&#xff08;Design for eXcellence&#xff0c;卓越设计&#xff09;是一种面向产品全生命周期的设计理念&#xff0c;旨在确保产品在设计阶段就具备**良好的制造性&#xff08;DFM&#xff09;、可测试性&#xff08;…

【Elasticsearch】diversified sampler

作用就是聚合前的采样&#xff0c;主要是采样 它就是用来采样的&#xff0c;采完样后在进行聚合操作 random_sampler和diversified_sampler是 Elasticsearch 中用于聚合查询的两种采样方法&#xff0c;它们的主要区别如下&#xff1a; 采样方式 • random_sampler&#xff1a…

2月7号.

二叉树是一种特殊的树形数据结构&#xff0c;具有以下特点&#xff1a; 基本定义 节点的度&#xff1a;二叉树中每个节点最多有两个子节点&#xff0c;分别称为左子节点和右子节点。 子树的顺序性&#xff1a;二叉树的子树有左右之分&#xff0c;且顺序不能颠倒。 递归定义&…

openpnp2.2 - 环境搭建 - 编译 + 调试 + 打包

文章目录 openpnp2.2 - 环境搭建 - 编译 调试 打包概述笔记前置任务克隆代码库切到最新的tag清理干净编译工程关掉旧工程打开已经克隆好的openpnp2.2工程将IDEA的SDK配置为openjdk23 切换中英文UI设置JAVA编译器 构建工程跑测试用例单步调试下断点导出工程的JAR包安装install…

【复现论文】DAVE

网站&#xff1a; GitHub - jerpelhan/DAVE 下载完以后&#xff0c;阅读 readme文件 新建终端&#xff0c;打印文件树&#xff0c;不包含隐藏文件&#xff1a; 命令&#xff1a;tree -I .* . ├── LICENSE ├── README.md ├── demo.py ├── demo_zero.py ├── mai…