RabbitMQ高可用延迟消息惰性队列

目录

生产者确认

消息持久化

消费者确认

TTL延迟队列

TTL延迟消息

惰性队列


生产者确认

生产者确认就是:发送消息的人,要确保消息发送给了消息队列,分别是确保到了交换机,确保到了消息队列这两步。

1、在发送消息服务的application.yml中添加配置

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 异步回调
    publisher-returns: true
    template:
      mandatory: true

2、确保消息到交换机

package cn.zsh.mq.spring;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.UUID;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testConfirmCallBack() {
        // 1、定义消息
        String message = "ABC";

        // 设置一个消息的唯一ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        // 3、confirm-ack
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("消息发送异常:" + ex.toString());
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                if (result.isAck()) {
                    // 说明到了交换机
                    System.out.println("publish-confirm:ack==消息发送成功:" + correlationData.getId());
                } else {
                    // 消息没有到交换机
                    System.out.println("publish-confirm:nack==消息发送失败:" + correlationData.getId());
                }
            }
        });

        // 4、消息发送
        rabbitTemplate.convertAndSend("191exchange","191",message,correlationData);
    }


}

3、确保消息从交换机路由到队列

创建公开CommonConfig类

package cn.zsh.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

/**
 * 发送消息到交换机没有到消息队列
 */
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 1、获取RabbitTemplate(获取启动中的Bean的方式)
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        // 2、设置回调函数
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.error("发送消息失败,没到队列===消息:{}, 交换机:{}, 路由Key:{}, 响应CODE:{}, 相应内容:{}", message,exchange,routingKey,replyCode,replyText);
            }
        });
    }
}

消息持久化

消息持久化就是:确保消息不会在交换机或者队列中丢失。

案例:

使用SpringAMQP创建出来的交换机和队列,默认就是做了持久化的

package cn.zsh.mq.config;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * 创建交换机与队列
 */
@Component
public class FoundQueue {

    @Bean
    public DirectExchange qiuExchange(){
        // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
        return new DirectExchange("qiu.deirect",true,false);
    }

    @Bean
    public Queue piqiuQueue(){
        // 使用QueueBuilder构建队列,durable就是持久化的
        return QueueBuilder.durable("piqiu.queue").build();
    }
}

消费者确认

消费者确认就是:消费者把消息从队列中获取出来,然后要消费成功,队列中的消息才能被删除掉。

方案一:消费者确认

加入这个配置以后,消费者消费失败,会直接重试或者删除,具体取决于设置的是none还是auto。

默认是none,不建议设置为auto模式因为会一直不断地尝试,这样会导致服务器压力很大。

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # none:投递完立马删除  auto:失败后让你再次重试(重新投递到队列)知道成功

方案二:消费者失败重试,重试固定次数后,则删除当前消息

加入这个配置以后,消费者消费失败会重试固定的次数,然后将消息删除。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000  # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * initial-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # ture:无状态    false:有状态。如果业务中包含事务,这里改成false

方案三:消费者失败重试,重试固定次数后,将当前消息发送给error交换机路由给error队列

加入这个配置之后,重试固定次数后,会将这条消费失败的消息发送给error交换机,路由给error队列。

1、在消费者(消息接收者)中加入配置

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000  # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * initial-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # ture:无状态    false:有状态。如果业务中包含事务,这里改成false

2、创建error交换机和队列并绑定

package cn.zsh.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class ErrorConfig {

    /**
     * 定义交换机
     * @return
     */
    @Bean
    public DirectExchange errorExchange2(){
        return new DirectExchange("error.direct");
    }

    /**
     * 定义队列
     * @return
     */
    @Bean
    public Queue errorQueue2(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding bindErrorQueue(DirectExchange errorExchange2,Queue errorQueue2){
        return BindingBuilder.bind(errorQueue2).to(errorExchange2).with("error");
    }
}

3、在启动类或者配置类中加入配置

package cn.zsh.mq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct","error");
    }
}

TTL延迟队列

延迟队列:

延迟队列就是消息发送到当前队列,会延迟一段时间,然后进行处理,具体如下图:

发送消息给指定队列,然后消息回延迟固定的时间,这个延迟时间是在对应的延迟消息队列中设置的。经过延迟以后,会将消息发送给其他的交换机,然后再路由给对应的消息队列,再进行消费,实现延迟的效果。

使用案例:

1、创建处理延迟消息的队列和交换机

当前交换机名称为:dl.direct

当前消息队列名称为:dl.queue

RoutingKey是:dl

package cn.zsh.mq.config;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 创建,处理延迟消息的,交换机和队列
 */
@Component
public class TtlConfig {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue",durable = "true"),    // 处理延迟消息的队列名称
            exchange = @Exchange(name = "dl.direct",durable = "true"),    // 处理延迟消息的交换机名称
            key = "dl"    // 当前的RoutingKey
    ))
    public void consumerDdlMessage(String message){
        System.out.println("接收到延迟消息:" + message);
    }
}

2、创建延迟交换机、队列,并绑定

package cn.zsh.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class publisherTtlConfig {

    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange("ttl.direct");    // 延迟交换机名称
    }

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder.durable("ttl.queue")    // 延迟队列名称
                .ttl(10000) // 延迟队列的延迟时间(当前为10秒)
                .deadLetterExchange("dl.direct") // 设置延时时间到了以后发送到哪个交换机(处理延迟消息的交换机)
                .deadLetterRoutingKey("dl") // 设置具体到那个交换机的具体队列(处理延迟消息队列的RoutingKey)
                .build();
    }

    /**
     * 绑定延迟队列与交换机
     * @return
     */
    @Bean
    public Binding bingTtlQueue(){
        return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");    // 将延迟队列与交换机绑定,并设置RoutingKey(这个RoutingKey是当前延迟消息队列的RoutingKey)
    }
}

3、发送任意消息到当前延迟队列(ttl.queue)即可实现延迟效果。

延时时间到了以后,会将消息发送给(dl.direct)交换机,路由给RoutingKey为(dl)的消息队列dl.queue。有绑定了dl.queue的队列进行消息的最终处理。

TTL延迟消息

延迟消息:

延迟消息是给消息设置延迟时间,然后将消息发送给延迟队列,可以实现延迟。

注意!!!延迟消息的延迟时间,与延迟队列的延迟时间,哪个时间短,就使用哪个延迟时间。

例1:延迟消息设置延迟时间为5秒,延迟队列设置延迟时间为10秒,则消息发送给延迟队列后,延迟5秒然后就会被处理。

例2:延迟消息设置延迟时间为20秒,延迟队列设置延迟时间为10秒,则消息发送给延迟队列后,延迟10秒然后就会被处理。

使用案例:

延迟消息必须发送给延迟队列,因为延迟时间按最短的执行。发送给没有设置延迟时间的消息队列,会直接被消费。

package cn.zsh.mq.spring;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testTTLMessage(){
        // 创建消息
        Message message = MessageBuilder
                .withBody("这是一条延时5秒后执行的消息".getBytes(StandardCharsets.UTF_8))
                .setExpiration("5000")// 延时时间
                .build();

        // 消息ID,需要封装到CorrelationData中
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        // 发送消息(这个消息必须要发给延时的消息队列)
        rabbitTemplate.convertAndSend("ttl.direct","ttl",message,correlationData);
    }

}

惰性队列

惰性队列是为了防止消息大量积压的一种队列。

消息队列中的消息一般都存在内存中,而消息大量积压,就会产生很多问题,这时候可以使用惰性队列,惰性队列的消息保存在磁盘中。

创建惰性队列:

方案一:

在代码中声明

package cn.itcast.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CommonConfig {

    @Bean
    public Queue lazyQueue(){
        return QueueBuilder
                .durable("lazy.queue")
                .lazy()
                .build();
    }
}

方案二:

在浏览器中MQ的控制台声明

第一步:

第二步:

额外补充:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/921715.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式面试八股文(十)·RS485特性分析、CAN硬件同步和再同步遵从规则、SPI四种工作模式、错误帧基本概念

目录 1. 相较于传统的RS232接口&#xff0c;RS485的接口特性有哪些&#xff1f; 2. 在CAN接口协议中硬件同步和再同步需要遵从哪些规则&#xff1f; 3. 为什么位错误不能用于帧间隔&#xff1f; 4. SPI四种工作模式&#xff1f; 5. 关于错误帧&#xff0c;基本概念&a…

librdns一个开源DNS解析库

原文地址&#xff1a;librdns一个开源DNS解析库 – 无敌牛 欢迎参观我的个人博客&#xff1a;无敌牛 – 技术/著作/典籍/分享等 介绍 librdns是一个开源的异步多功能插件式的解析器&#xff0c;用于DNS解析。 源代码地址&#xff1a;GitHub - vstakhov/librdns: Asynchrono…

cookie反爬----普通服务器,阿里系

目录 一.常见COOKIE反爬 普通&#xff1a; 1. 简介 2. 加密原理 二.实战案例 1. 服务器响应cookie信息 1. 逆向目标 2. 逆向分析 2. 阿里系cookie逆向 1. 逆向目标 2. 逆向分析 实战&#xff1a; 无限debugger原理 1. Function("debugger").call() 2. …

大数据新视界 -- 大数据大厂之 Impala 性能优化:跨数据中心环境下的挑战与对策(上)(27 / 30)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…

width设置100vh但出现横向滚动条的问题

在去做flex左右固定,中间自适应宽度的布局时, 发现这样一个问题: 就是我明明是宽度占据整个视口, 但是却多出了横向的滚动条 效果是这样的 把width改成100%,就没有滚动条了 原因: body是有默认样式的, 会有一定的默认边距, 把默认边距清除就是正常的了 同时, 如果把高度设…

百度在下一盘大棋

这两天世界互联网大会在乌镇又召开了。 我看到一条新闻&#xff0c;今年世界互联网大会乌镇峰会发布“2024 年度中国互联网企业创新发展十大典型案例”&#xff0c;百度文心智能体平台入选。 这个智能体平台我最近也有所关注&#xff0c;接下来我就来讲讲它。 百度在下一盘大棋…

探索 RocketMQ:企业级消息中间件的选择与应用

一、关于RocketMQ RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件&#xff0c;它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递&#xff0c;它是一个轻量级的、功能强大的消息队列系统&…

Android 基于Camera2 API进行摄像机图像预览

前言 近期博主准备编写一个基于Android Camera2的图像采集并编码为h.264的应用&#xff0c;准备分为三个阶段来完成&#xff0c;第一阶段实现Camera2的摄像机预览&#xff0c;第二阶段完成基于MediaCodec H.264编码&#xff0c;第三阶段完成基于MediaCodec H.264解码,针对不同…

QT 线程 QThread QT5.12.3环境 C++实现

一、线程 QT主线程称为GUI线程&#xff0c;负责初始化界面并监听事件循环&#xff0c;并根据事件处理做出界面上的反馈。如果把一些比较复杂或者费时的操作放在主线程中&#xff0c;界面就会出现卡顿或者无响应的现象。一般主线程负责影响界面上的操作&#xff0c; 子线程负责负…

【LLM】一文学会SPPO

博客昵称&#xff1a;沈小农学编程 作者简介&#xff1a;一名在读硕士&#xff0c;定期更新相关算法面试题&#xff0c;欢迎关注小弟&#xff01; PS&#xff1a;哈喽&#xff01;各位CSDN的uu们&#xff0c;我是你的小弟沈小农&#xff0c;希望我的文章能帮助到你。欢迎大家在…

Vue3-后台管理系统

目录 一、完成项目历程 1、构建项目 2、项目的自定义选项 3、 封装组件 4、配置对应页面的路由 5、从后端调接口的方式 二、引入Element Plus、Echarts、国际化组件 1、Element Plus安装 2、Echarts安装 3、国际化 三、介绍项目以及展示 1、项目是基于Vue3、Element …

C0030.Clion中运行提示Process finished with exit code -1073741515 (0xC0000135)解决办法

1.错误提示 2.解决办法 添加环境变量完成之后&#xff0c;重启Clion软件&#xff0c;然后就可以正常调用由mingw编译的opencv库了。

【es6进阶】vue3中的数据劫持的最新实现方案的proxy的详解

vuejs中实现数据的劫持,v2中使用的是Object.defineProperty()来实现的&#xff0c;在大版本v3中彻底重写了这部分&#xff0c;使用了proxy这个数据代理的方式&#xff0c;来修复了v2中对数组和对象的劫持的遗留问题。 proxy是什么 Proxy 用于修改某些操作的默认行为&#xff0…

Python浪漫之画明亮的月亮

目录 1、效果展示 2、完整版代码 1、效果展示 2、完整版代码 import turtledef draw_moon():# 设置画布turtle.bgcolor("black") # 背景颜色为黑色turtle.speed(10) # 设置绘制速度# 绘制月亮的外圈turtle.penup()turtle.goto(0, -100) # 移动到起始…

《线性代数的本质》

之前收藏的一门课&#xff0c;刚好期末复习&#xff0c;顺便看一看哈哈 课程链接&#xff1a;【线性代数的本质】合集-转载于3Blue1Brown官方双语】 向量究竟是什么 线性代数中最基础、最根源的组成部分就是向量&#xff0c;需要先明白什么是向量 不同专业对向量的看法 物理专…

鸿蒙系统ubuntu开发环境搭建

在RISC-V等平台移植鸿蒙系统OpenHarmony&#xff0c;需要使用linux环境进行代码的编译&#xff0c;为兼顾日常办公需要&#xff0c;可采用WindowsUbuntu虚拟机的混合开发的环境&#xff0c;通过网络及文件夹共享&#xff0c;在主机和虚拟机之间共享文件数据。 工具准备&#x…

智能停车解决方案之停车场室内导航系统(二):核心技术与系统架构构建

hello~这里是维小帮&#xff0c;如有项目需求和技术交流欢迎大家私聊我们&#xff01;点击文章最下方获取智慧停车场方案~撒花&#xff01; 随着城市化进程的加速&#xff0c;停车难问题日益凸显。智能停车系统作为缓解停车压力的有效手段&#xff0c;其核心技术与架构的构建至…

(免费送源码)计算机毕业设计原创定制:Java+JSP+HTML+JQUERY+AJAX+MySQL springboot计算机类专业考研学习网站管理系统

摘 要 大数据时代下&#xff0c;数据呈爆炸式地增长。为了迎合信息化时代的潮流和信息化安全的要求&#xff0c;利用互联网服务于其他行业&#xff0c;促进生产&#xff0c;已经是成为一种势不可挡的趋势。在大学生在线计算机类专业考研学习网站管理的要求下&#xff0c;开发一…

IDEA2023版本中如何启动项目的多个实例

假设现在要启动多个服务&#xff0c;例如简单的客户端和服务端&#xff0c;默认的idea是只能启动一个的&#xff0c;那么我们需要进行配置允许多个项目的同时启动&#xff0c;现在进行多实例的配置。 第一步 点击Edit Configurations 第二步 点击Modify options 第三步 勾选…

图的邻接矩阵和邻接表存储

目录 邻接矩阵存储法 简介 ​编辑 邻接矩阵举例 无向图邻接矩阵 有向图邻接矩阵 当各条边带有权值时 邻接矩阵算法实现 结构体定义和函数声明 函数的实现 邻接表存储法 简介 邻接表的算法实现 结构体定义和函数声明 函数的实现 邻接矩阵和邻接表的差别 邻接矩阵存…