RabbitMQ-如何保证消息不丢失

RabbitMQ常用于 异步发送,mysql,redis,es之间的数据同步 ,分布式事务,削峰填谷等.....

在微服务中,rabbitmq是我们经常用到的消息中间件。它能够异步的在各个业务之中进行消息的接受和发送,那么如何保证rabbitmq的消息不丢失就显得尤为重要。

首先要分析问题,我们就要明确rabbitmq在什么时候可能会出现消息丢失的情况呢?

我们直接说结果

RabbitMQ在每个阶段都有可能使消息发生丢失

我们在这里把他们简单归结为三个层面

层面一 :生产者发送消息没有到达交换机或者没有到达绑定的队列。

层面二:RabbitMQ宕机可能导致的消息的丢失。

层面三:消费者宕机导致消息丢失。

层面一的解决方法常见的是

1.生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到Mq的过程中丢失,消息发送到Mq以后,会返回一个结果给发送者,表示消息的发送成功。

情况一:发送成功 生产者正常发送消息到队列之后会返回一个publish-confirm ack 这个意思是告诉生产者已经接收到消息了。

情况二:发送失败 这里的发送失败有两种,一种是生产者发送到交换机失败 此时返回 publish-confirm nack  。第二种是生产者发送到队列失败 返回 publish-return ack。

开启生产者确认机制的代码如下 ,在生产者的配置文件中加入以下配置
 

spring:
  rabbitmq:
    publisher-confirm-type: correlated #开启生产者确认机制
    publisher-returns: true

这里的

publisher-confirm-type:有三种模式可以选择:

第一种是none:代表关闭confirm机制

第二种是 simple:表示同步阻塞并等待mq的回执消息,即发送完消息后不能干其他的事情,只能等待mq的回执,很显然这样效率很低。

第三种是correlated:MQ异步回调方式返回回执消息,即生产者发送完消息后可以干其他的事情,直到接收到mq的回执。很明显这种效率要优于第二种。

配置return callback的代码如下,每个RabbitTemplate只能配置一个 代码如下
 

package com.itheima.publisher.com.it.heima.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

/**
 * @Auther: QuJingChuan
 * @Date: 2024/1/13 10:34
 * @Description:
 */
@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //配置回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.debug("收到消息return的callback,  {},{},{},{},{}",
                        returnedMessage.getExchange(),
                        returnedMessage.getRoutingKey(),
                        returnedMessage.getMessage(),
                        returnedMessage.getReplyCode(),
                        returnedMessage.getReplyText());
            }
        });
    }
}

Confirm Callback需要每次发消息的时候都要配置(要制定发消息的id方便回执的时候直到是谁发的消息)这里写一个测试类方便大家看。

 @Test
    void testConfirmCallback() throws InterruptedException {
        //创建cd 参数为每次发送消息的id
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //添加confirmCallBack
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                //这种情况一般是运行出现bug,一般不会发生。
                log.error("消息回调失败",ex);
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                log.debug("收到confirm callback 回执");
                if (result.isAck()){
                    //消息发送成功
                    log.debug("消息发送成功收到ack");
                }else {
                    //消息发送失败
                    log.debug("消息发送失败收到nack,原因:{}",result.getReason());
                    //TODO 重发消息等业务
                }
            }
        });

        rabbitTemplate.convertAndSend("amqp.test","amqptest","hello qjc",correlationData);

        Thread.sleep(2000);
    }

那么我们如何解决这个问题呢
方案一:重发消息 

方案二:记录日志

方案三:保存到数据库中定时发送,发送成功后删除表中的数据。

方案四:交给人工处理。

~生产者确认机制需要额外的网络和系统的资源开销,尽量不要使用。

~如果业务需要,那么无需开启publisher-return机制,因为一般路由失败都是自己业务的原因。

~对于nack消息可以有限次数的重试,依然失败则记录异常消息。

层面二的解决方法常见的是

2.消息持久化

由于mq是基于内存存储消息的,那么在mq服务宕机等一些情况下可能导致消息的丢失。同时内存空间有限,当消费者出现故障或者处理过慢,会导致消息积压,mq会对消息做迁移(page out 写入磁盘)从而引发mq阻塞。我们将消息存储在磁盘上就避免了这个问题。

一 :持久化交换机。

这里要选择Durable,因为Transient是临时交换机,当mq宕机后会消失。

代码展示
 

 @Bean
    public DirectExchange simpleExchange(){
        //分别是三个参数 交换机名称 是否持久化 当没有队列绑定时是否自动删除
        return new DirectExchange("qjc.exchange",true,false);
    }

二 :持久化队列。

这个与交换机类似,在此不做赘述。

代码展示

@Bean
    public Queue simpleQueue(){
        //springamqp在使用QueueBuilder来创建队列的时候,默认就是持久化的
        return QueueBuilder.durable("qjc.queue").build();
    }

三 :持久化消息。

这里选择delivery mode 选择2 ,1是不持久的。

代码展示

 Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
如果不选择持久化队列,交换机,消息的话我们还有另一种方案

Lazy Queue(惰性队列)

惰性队列的特征如下

~接受到消息的时候直接存入磁盘而非内存(内存中只保留最近的消息)

~消费者需要消息的时候才会从磁盘中取出数据加载到内存

~支持数百万条的消息存储

在mq3.12版本后,所有的队列都是Lazy Queue模式,无法更改。

如果各位小伙伴的版本低于3.12那我这里提供了两种方式创建惰性队列

或用注解声明

    @RabbitListener(queuesToDeclare = @Queue(
            name = "lazy.queue",
            durable = "true",
            arguments = @Argument(name = "x-queue-mode",value = "lazy")
    ))
    public void listenLazyQueue(String msg){
        log.debug("接收到lazyqueue的消息" + msg);
    }

3.消费者确认机制

RabbitMQ支持消费者确认机制,即:当消费者处理消息后可以向mq发送ack回执,mq收到消息后会在队列中删除该消息。

SpringAMQP已经实现了消息确认的功能,并且允许我们通过配置文件选择ack的处理方式,有三种方式。

- none: 不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用  
- manual: 手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活  
- auto: 自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.  
当业务出现异常时,根据异常判断返回不同结果:  
- 如果是业务异常,会自动返回nack  
- 如果是消息处理或校验异常,自动返回reject

注意我们需要再消费者的配置文件中加入参数

这就是mq保证消息不丢失的一些方式和解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/356720.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

sqli.labs靶场(第18~22关)

18、第十八关 经过测试发现User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0加引号报错 这里我们闭合一下试试 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0,127.0.0.1,adm…

【爬坑】临时修复To connect to xxx insecurely, use `--no-check-certificate‘报错

解决方案&#xff1a;wget请求时跳过证书验证。 sudo vim /etc/wgetrc 插入一行&#xff1a; check_certificate off 重新运行wget命令即可。

STM32-电动车报警器

STM32-电动车报警器 1.振动传感器点亮LED灯 需求:当振动传感器接收到振动信号时&#xff0c;使用中断方式点亮LED1 //重写中断服务函数&#xff0c;如果检测到EXTI中断请求&#xff0c;则进入此函数 void HAL_GPIO_EXTI_Callback(uint16_t GPIO_Pin) {//一根中断线上接有多个…

RHCE练习3

1.基于域名www.openlab.com可以访问网站内容为 welcome to openlab 2.给该公司创建三个子界面分别显示学生信息&#xff0c;教学资料和缴费网站&#xff0c;基于www.openlab.com/student 网站访问学生信息&#xff0c;www.openlab.com/data网站访问教学资料www.openlab.com/mo…

Unity Editor 获取Screen.width, Screen.height 与Game视图对不上问题

之前在项目中写测试代码时 获取Screen.width 发现的跟game视图不一致 最后发现 通过在其他面板去触发函数 获取Screen.width 拿到的是其他面板的大小 而不是Game视图的大小

SpringCloud-高级篇(十八)

前面我们已经实现了多级缓存架构&#xff0c;大大提高了查询商品的性能&#xff0c;缓存在提高性能的同时&#xff0c;也带来了一致性的问题&#xff0c;比如说数据库发生了修改&#xff0c;这个时候&#xff0c;如果缓存依然是旧的数据&#xff0c;两者就产生了不一致&#xf…

任务修复实例(1)

实例1 任务名&#xff1a;增强防御&#xff08;quest_template.id 8490&#xff09; 涉及的两个数据表分别为 smart_script 和 creature_summon_groups smart_script Reactstate 取值参考源码 UnitDefines.h 的 ReactStates 定义&#xff0c;其中&#xff1a;0为被动&#…

【React教程】(3) React之表单、组件、事件处理详细代码示例

目录 事件处理示例1示例2示例3&#xff08;this 绑定问题&#xff09;示例4&#xff08;传递参数&#xff09;Class 和 Style 表单处理组件组件规则注意事项函数式组件&#xff08;无状态&#xff09;类方式组件&#xff08;有状态&#xff09;组件传值 Propsthis.props.childr…

【劳德巴赫 Trace32 高阶系列 1 -- svf 文件介绍】

文章目录 SVF 文件概述SVF文件的格式以及头Trace32 如何识别和使用SVF文件如何使用SVF文件SVF 命令支持总结小结总结SVF 文件概述 SVF 文件是一种ASCII文本文件,用于描述JTAG(Joint Test Action Group)测试动作的串行向量。这些文件包含了对JTAG TAP(Test Access Port)的…

寒假思维训练计划day16 A. Did We Get Everything Covered?

今天更新一道1月27号晚上div2的C题作为素材&#xff0c;感觉用到了我的构造题总结模型&#xff0c;我总结了一系列的模型和例题。 摘要&#xff1a; Part1 定义"边界贪心法" Part2 题意 Part3 题解 Part4 代码 Part5 思维构造题模型和例题 Part1 边界贪心…

生产解决方案:实现上传图片至主机文件夹下

1 需求&#xff1a; 目前需要实现&#xff0c;上传图片时候&#xff0c;自动根据图片上传地址&#xff0c;创建对应文件夹&#xff0c;例如&#xff1a;上传文件地址为&#xff0c;/2024/1/29/楼层1/1713.jpg&#xff0c;则存储结构应如下图所示。

[杂项:AD画板]B站_01

一、PCBA 1、快捷方式 CTRL鼠标滚轮&#xff1a;缩放界面 鼠标右键&#xff1a;拖拽界面 SHIFT鼠标滚轮&#xff1a;左右移动界面 CAPSLK鼠标滚轮&#xff1a;上下移动界面 CTRL鼠标左键&#xff1a;高亮选中接线网络 【】&#xff1a;调节高亮亮度&#xff0c;需要处于…

Sqli靶场 11--->22Less

打靶场&#xff0c;打靶场&#xff0c;打靶场&#xff0c;打靶场......靶场你别打我 球球 11.不用密码&#xff08;狂喜) 这一关知不知道账号密码都无所谓 那么我们就尝试一下报错类型&#xff0c;单引号报错&#xff0c;好&#xff0c;字符型 构造poc I_don_t_know_t…

C++ 之LeetCode刷题记录(二十二)

&#x1f604;&#x1f60a;&#x1f606;&#x1f603;&#x1f604;&#x1f60a;&#x1f606;&#x1f603; 开始cpp刷题之旅。 目标&#xff1a;执行用时击败90%以上使用 C 的用户。 112. 路径总和 给你二叉树的根节点 root 和一个表示目标和的整数 targetSum 。判断该…

[PHP]严格类型

PHP: 类型声明 - Manual

数字身份保护:Web3如何改变个人隐私观念​

随着Web3时代的来临&#xff0c;数字身份保护成为人们关注的焦点之一。Web3技术的引入不仅为个人隐私带来了新的挑战&#xff0c;同时也为我们重新思考和改变个人隐私观念提供了契机。本文将深入探讨Web3如何改变个人隐私观念&#xff0c;以及在数字身份保护方面的创新举措。 1…

计算机网络-H3C设备型号简介

H3C(新华三)的设备主要有&#xff1a;交换机、路由器、防火墙、无线AC、无线AP等组成。基本所有H3C设备都采用同一版本&#xff0c;基本命令都差不多&#xff0c;有V5、V7系列。 一、交换机系列 交换机有傻瓜式二层交换机&#xff0c;三层交换机&#xff0c;高端框式交换机。 交…

MATLAB环境下基于信号处理的EEG信号的睡眠纺锤波和K-复合波检测

睡眠纺锤波是正常人浅 - 中度睡眠脑电图的一种表现&#xff0c;随着睡眠深浅的变化而改变。睡眠纺锤波可以作为检测中枢神经机能正常与否的一个指标&#xff0c;对评估大脑发育与脑功能有重要意义。睡眠纺锤波在丘脑的后外侧腹侧核形成&#xff0c;通过投射系统投射到大脑皮层&…

CH395Q之CH395Q简介(一)

本节主要介绍以下内容&#xff1a; 1、TCP/IP协议栈是什么&#xff08;了解&#xff09; 2、CH395Q是什么&#xff08;了解&#xff09; 3、CH395Q工作命令&#xff08;熟悉&#xff09; 4、CH395Q & W5500 一、TCP/IP协议栈是什么 是一系列网络协议的总和&#xff0…