RabbitMQ高级(MQ的问题,消息可靠性,死信交换机,惰性队列,MQ集群)【详解】

目录

一、MQ的问题

1. 问题说明

2. 准备代码环境

1 创建project

2 创建生产者模块

3 创建消费者模块

二、消息可靠性

1. 介绍

2. 生产者确认机制

3. MQ消息持久化

4. 消费者确认机制

5. 消费者auto模式的失败重试

6. 小结

三、死信交换机和延迟消息

1. 介绍

2. 消费失败成为死信

3. 延迟消息-通过死信交换机实现【了解】

4. 延迟消息-通过延迟消息插件实现【重点】

5. 小结

四、惰性队列

1. 消息堆积问题

2. 惰性队列

3. 小结

五、MQ集群【了解】

1. 集群分类

2. 普通集群

3. 镜像集群

4. 仲裁队列

5. RabbitTemplate连接MQ集群

大总结


一、MQ的问题

1. 问题说明

MQ在分布式项目中是非常重要的,它可以实现异步、削峰、解耦,但是在项目中引入MQ也会带来一系列的问题。

今天我们要解决以下几个常见的问题:

  • 消息可靠性问题:如何确保消息被成功送达消费者,并且被消费者成功消费掉

  • 延迟消息问题:如果一个消息,需要延迟15分钟再消费,像12306超时取消订单,如何实现消息的延迟投递

  • 消息堆积问题:如果消息无法被及时消费而堆积,如何解决百万级消息堆积的问题

  • MQ的高可用问题:如何避免MQ因为单点故障而不可用的问题

2. 准备代码环境

注意:为了后续的演示效果,暂不声明交换机、队列、绑定关系

在虚拟机里把旧的mq容器删除掉,重新创建:

#删除容器
docker stop mq
docker rm mq
#删除旧的数据卷
docker volume rm mq-plugins

#创建新容器
docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 --restart=always \
 rabbitmq:3-management

1 创建project

  1. 删除project里的src文件夹

  2. 添加依赖坐标

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.9.RELEASE</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--单元测试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

2 创建生产者模块

依赖

不需要添加,直接继承父工程的依赖

配置

修改application.yaml,添加配置:

spring:
  rabbitmq:
    host: 192.168.200.137
    port: 5672
    virtual-host: /
    username: itcast
    password: 123321

引导类

package com.itheima;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

发消息测试类

package com.itheima;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;


@SpringBootTest
public class Demo01SimpleTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test(){
        rabbitTemplate.convertAndSend("demo.exchange", "demo", "hello");
    }
}

3 创建消费者模块

依赖

不需要添加,直接继承父工程的依赖

配置

修改application.yaml,添加配置:

spring:
  rabbitmq:
    host: 192.168.200.137
    port: 5672
    virtual-host: /
    username: itcast
    password: 123321
    listener:
      simple:
        prefetch: 1

引导类

package com.itheima;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

创建Listener

package com.itheima.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class DemoListener {

    @RabbitListener(queuesToDeclare = @Queue("demo.queue"))
    public void handleDemoQueueMsg(String msg){
        log.info("从{}队列接收到消息:{}", "demo.queue", msg);

        System.out.println("模拟:处理消息中……");

        log.info("消息处理完毕");
    }
}

二、消息可靠性

1. 介绍

当我们的生产者发送一条消息后,这条消息最终会到达消费者。那么在这整个过程中任何一个环境出错,都可能会导致消息的丢失,而导致不够可靠。

可能出问题的环节有:

  • 生产者发送消息到Broker时 丢失:

    消息未送达Exchange

    消息到达了Exchange,但未到达Queue

  • Broker收到消息后丢失:

    MQ宕机,导致未持久化保存消息

  • 消费者从Broker接收消息丢失:

    消费者接收消息后,尚未消费就宕机

针对这些问题,RabbitMQ给出了对应的解决方案

  • 生产者发送消息丢失:使用生产者确认机制

  • Broker接收消息丢失:MQ消息持久化

  • 消费者接收消息丢失:消费者确认机制与失败重试机制

2. 生产者确认机制

1 介绍

在了解生产者确认机制之前,我们需要先明确一件事:生产者发送的消息,怎么样才算是发送成功了?

消息发送成功,有两个标准

  • 消息被成功送达Exchange

  • 消息被成功送达匹配的Queue

以上两个过程任何一步失败,都认为消息发送失败了。

生产者确认机制,可以确保生产者明确知道消息是否成功发出,如果未成功的话,是哪一步出现问题。然后开发人员就可以根据投递结果做进一步处理。

2 Confirm Callback机制

说明

使用发送者的ConfirmCallback机制,用于让生产者确认 消息是否送达交换机:如果消息成功送达交换机,MQ会给生产者返回一个ack(确认)。当生产者得到ack之后,就可以确定消息成功送达交换机了

使用步骤,在生产者一方做如下操作:

  1. 修改配置文件,指定 confirm确认的处理方式,使用异步方式

  2. 发送消息时,配置CorrelationData,用于处理确认结果

示例

1. 修改配置文件

修改生产者一方的配置文件application.yaml,增加如下配置

  • 如果配置为simple,表示使用同步方式处理确认的结果

  • 如果配置为correlated,表示使用异步方式处理确认的结果,但是发送消息时需要我们准备一个CorrelationData对象,用于接收确认结果

spring:
  rabbitmq:
    #生产者确认机制类型。simple同步方式确认;correlated异步方式确认,将使用CorrelationData接收确认结果
    publisher-confirm-type: correlated

2. 发送消息

package com.itheima;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.UUID;


@SpringBootTest
public class DemoMessageTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        //准备一个CorrelationData对象
        CorrelationData data = new CorrelationData();
        //设置消息的id。为了防止ACK结果混乱,我们给每条消息指定一个唯一标识
        data.setId(UUID.randomUUID().toString());
        //设置ConfirmCallback回调。当消息发送后,对应的回调方法将会执行
        data.getFuture().addCallback(
                result -> {
                    if (result.isAck()) {
                        log.info("消息已发出,成功到达交换机。消息id={}", data.getId());
                    }else{
                        log.warn("消息已发出,但未到达交换机。消息id={},原因是:{}", data.getId(), result.getReason());
                    }
                },
                ex -> {
                    log.error("消息未发出,出现异常", ex);
                }
        );

        rabbitTemplate.convertAndSend("demo.exchange", "demo", "hello", data);
    }
}

 

3. 测试结果-未送达交换机的结果

首先,我们先要保证 demo.exchange交换机不存在,再运行单元测试方法,发送消息。可看到如下结果

4. 测试结果-成功送达交换机

然后,我们再创建配置类,声明一个名称为demo.exchange的交换机

package com.itheima.config;

import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitBindingConfig {

    @Bean
    public TopicExchange demoTopicExchange(){
        return ExchangeBuilder.topicExchange("demo.exchange").build();
    }
}

然后重新发送消息,可看到如下结果:

3 Return Callback机制

说明

使用生产者的Confirm Callback机制,可以确保消息成功送达交换机。但是消息是否被送达队列呢?我们同样需要进行确认。为了解决这个问题,RabbitMQ提供了Return Callback机制:

  • 如果消息被交换机成功路由到队列,一切正常

  • 如果消息路由到队列时失败了,Return回调会把消息回退给生产者。生产者可以自行决定后续要如何处理

使用步骤,在生产者一方操作:

  1. 修改配置文件,开启return callback机制,并设置强制return back

  2. 创建配置类,预先设置Return回调函数  

示例

1. 修改配置文件

修改生产者的配置文件application.yaml,开启return回调机制

spring:
  rabbitmq:
    publisher-returns: true #开启生产者return回调机制
    template:
      mandatory: true #开启强制回调。如果为true,消息路由失败时会调用ReturnCallback回退消息;如果为false,消息路由失败时会丢弃消息

2. 设置Return回调

当消息未被路由到Queue时,Return回调会执行

注意:

  • 只要给RabbitTemplate对象设置一次回调函数即可,并不需要每次发送消息都设置Return回调。所以我们在配置类里给RabbitTemplate设置一次即可

  • 给单例的RabbitTemplate对象设置Return回调的方式有多种,使用哪种都行,只要能够设置成功即可

创建一个配置类,在配置类里设置Return回调函数:

package com.itheima.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RabbitConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.warn("消息未被路由到队列,replyCode={}, replyText={}, exchange={}, routingKey={}, msg={}",
                    replyCode, replyText, exchange, routingKey, message);
        });
    }
}

3. 测试结果-未路由到队列

首先,我们要先保证交换机没有绑定队列demo.queue,再运行单元测试方法,发送消息,可看到如下结果:

4. 测试结果-成功路由到队列

然后,我们再找到RabbitBindingConfig配置类,

增加一个队列demo.queue,并绑定给交换机demo.exchange,最终代码如下:

package com.itheima.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitBindingConfig {

    @Bean
    public TopicExchange demoTopicExchange(){
        return ExchangeBuilder.topicExchange("demo.exchange").build();
    }
    
    @Bean
    public Queue demoQueue(){
        return QueueBuilder.durable("demo.queue").build();
    }
    
    @Bean
    public Binding demoQueueBinding(Queue demoQueue, TopicExchange demoTopicExchange){
        return BindingBuilder.bind(demoQueue).to(demoTopicExchange).with("demo");
    }
}

然后再发送消息,不报错,就说明路由成功了。可以去RabbitMQ控制台上查看消息

3. MQ消息持久化

1 介绍

通过生产者确认机制,我们可以把消息投递到队列中。但是如果这时候MQ宕机了,队列里的消息同样有可能会丢失。这是因为:

  • 交换机可能是非持久化的。MQ一重启,交换机就消失了

  • 队列可能是非持久化的。MQ一重启,队列就消失了

  • 消息可能是非持久化的(在RabbitMQ内存中)。MQ一重启,消息就丢失了

所以我们必须要保证:交换机、队列、消息都是持久化的。

但实际上,我们创建交换机、队列、消息的方式都是持久化创建的,所以以下内容我们仅仅了解即可

2 交换机持久化

@Bean
public TopicExchange demoTopicExchange(){
    return ExchangeBuilder
            .topicExchange("demo.exchange")
        	//设置交换机为持久化的,重启也不消息。
        	//但其实可以不设置,因为交换机默认就是持久化的
            .durable(true)
            .build();
}

3 队列持久化

@Bean
public Queue demoQueue(){
    return QueueBuilder
            //使用durable("队列名称")方法  创建的就是持久化队列
            .durable("demo.queue")
            .build();
}

4 消息持久化

Message message = MessageBuilder
        .withBody("hello".getBytes())
        //设置为持久化消息
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
        .build();
rabbitTemplate.convertAndSend("demo.exchange", "demo", message, data);

4. 消费者确认机制

1 介绍

RabbitMQ采用的是阅后即焚模式,即只要消息被消费成功获取,MQ就会立刻删除掉这条消息。所以,我们必须保证,消息确实成功的被消费掉了。为此,RabbitMQ也提供了ack确认机制:

  • RabbitMQ将消息投递给消费者

    • 消费者成功处理消息

    • 消费者向RabbitMQ返回ack确认

  • RabbitMQ收到ack确认,删除消息

从上述过程中我们可以得到,消费者返回ack的时机是非常关键的:如果消费者仅仅是得到消息还未处理,就给RabbitMQ返回ack,然后消费者宕机了,就会导致消息丢失。

SpringAMQP允许消费者使用以下三种ack模式:

  • manual:手动ack。由开发人员在处理完业务后,手动调用API,向RabbitMQ返回ack确认

  • auto:自动ack【默认】。当消费者方法正常执行完毕后,由Spring自动给RabbitMQ返回ack确认;如果出现异常,就给RabbitMQ返回nack(未消费成功)

  • none:关闭ack。MQ假定所有消息都会被成功消费,因为RabbitMQ投递消息后会立即删除

我们一般使用默认的auto模式

2 none模式

修改配置文件

修改消费者一方的配置文件application.yaml,设置消费者确认模式为none。添加如下配置:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none #设置 消费者确认模式为none

修改消费者

修改消费者Listener代码,模拟处理消息出现异常的情况

package com.itheima.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class DemoListener {

    @RabbitListener(queues = "demo.queue")
    public void handleDemoQueueMsg(String msg){
        log.info("从{}队列接收到消息:{}", "simple.queue", msg);

        //模拟:处理消息中出现了异常
        int i = 1/0;

        log.info("消息处理完毕");
    }
}

测试效果

  1. 启动消费者服务

  2. 运行生产者单元测试类,发送消息

  3. 查看消费者的运行日志控制台

    4. 去RabbitMQ控制台(http://192.168.200.137:15672)查看队列里的消息,发现队列里没有消息。消息还没有被成功处理,就丢失了  

3 auto模式

修改配置文件

修改消费者一方的配置文件application.yaml,设置消费者确认模式为auto

 spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto #设置 消费者确认模式为auto

修改消费者

代码和刚刚‘none’模式的代码相同,并没有调整。仍然是:模拟处理消息过程中出错

package com.itheima.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class DemoListener {

    @RabbitListener(queues = "demo.queue")
    public void handleDemoQueueMsg(String msg){
        log.info("从{}队列接收到消息:{}", "simple.queue", msg);

        //模拟:处理消息中出现了异常
        int i = 1/0;

        log.info("消息处理完毕");
    }
}

测试效果

  1. 重启消费者服务

  2. 运行生产者的单元测试方法,发送消息

  3. 查看消费者的运行日志控制台,发现程序在不停的报错。这是因为

    RabbitMQ在投递消息之后,消费者收到消息后抛了异常,导致没有给RabbitMQ返回ack确认

    RabbitMQ尝试重新投递消息,消费者收到消息后又抛了异常……

5. 消费者auto模式的失败重试

1 介绍

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

 

我们可以利用Spring本身的retry机制,在消费者出现异常后,在消费者内部进行本地重试;而不是让消息重新入队列,然后让RabbitMQ重新投递。

2 消费者本地重试

只要修改消费者一方的配置文件,设置消费者本地重试,并配置重试参数

修改消费者一方的配置文件application.yaml,增加如下配置:

 spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true  #开始 消费者本地的失败重试
          initial-interval: 1000 #初始的失败等待时长,单位是ms,默认1000
          multiplier: 1 #与上次重试间隔时长的倍数(1表示每次重试的时间间隔相同)。默认1
          max-attempts: 3 #最多重试几次。默认3
          stateless: true #是否无状态。默认true。如果涉及事务,要改成false

重启消费者服务后,发现:

  1. 消费者重复获取了3次消息,在3次尝试中并没有抛出异常

  2. 在3次尝试都失败后,才抛出了RejectAndDontRequeueRecoverer异常

    3. 然后再去RabbitMQ控制台(http://192.168.200.137:15672),从队列里查看消息,发现消息已经被删除了

 

3 失败后的消息恢复策略

在刚刚的本地重试中,在达到最大次数后,消息会被丢弃,这是Spring内部机制决定的。

但是,其实在重试多次消费仍然失败后,SpringAMQP提供了MessageRecoverer接口,定义了不同的恢复策略可以用来进一步处理消息:

  • RejectAndDontRequeueRecoverer:重试次数耗尽后,直接reject,丢弃消息。是默认的处理策略

  • ImmediateRequeueMessageRecoverer:重试次数耗尽后,立即重新入队requeue

  • RepublishMessageRecoverer:重试次数耗尽后,将失败消息投递到指定的交换机

实际开发中,比较优雅的一个方案是RepublishMessageRecoverer,将失败消息重新投递到一个专门用于存储异常消息的队列中,等待后续人工处理。

使用步骤:

  1. 声明消息的恢复策略

  2. 声明交换机、队列、绑定关系

声明消息恢复策略

package com.itheima.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMsgRecovererConfig {

    /**
     * 消息消费失败后的恢复策略:使用RepublishMessageRecoverer策略
     */
    @Bean
    public MessageRecoverer republishMsgRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct.exchange", "error");
    }

    @Bean
    public Queue errorQueue(){
        return QueueBuilder.durable("error.queue").build();
    }

    @Bean
    public DirectExchange errorExchange(){
        return ExchangeBuilder.directExchange("error.direct.exchange").build();
    }

    @Bean
    public Binding errorQueueBinding(Queue errorQueue, DirectExchange errorExchange){
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }
}

测试效果

  1. 消费者收到消息,模拟报错。耗尽重试次数

  2. 打开RabbitMQ控制台(http://192.168.200.137:15672),查看错误队列里的消息

6. 小结

消息可靠性:从4个环节处理

  • 生产者:消息确实送达交换机

    使用:confirm回调机制

    效果:无论消息是否被送达交换机,生产者都可以得到一个结果

    步骤:

    1. 修改生产者的配置文件,开启confirm机制设置为correlated (异步确认)

    2. 发送消息时,要给每条消息附加一个CorrelationData对象

CorrelationData data = new CorrelationData();
data.setId("消息的id唯一标识");
data.getFuture().addCallback(
	result->{
        //如果发送消息没有异常,这里的代码会执行
        if(result.isAck()){
            //消息成功送达交换机了
        }else{
            //消息没有送达交换机:result.getReason()获取失败原因
        }
    },
    ex->{
        //如果发送消息出现异常,这里的代码会执行
    }
);

生产者:消息被路由到队列

使用:return回退机制

效果:如果消息被交换机成功路由到了队列,没有任何回退;如果消息没有被路由到队列,就会被退回,return回调函数可以接收到被退回的消息

步骤:

  1. 修改生产者的配置文件,开启return回退机制

spring:
  rabbitmq:
    publisher-returns: true #开启消息回退机制
    template:
      mandatory: true #消息回退机制

     2. 修改IoC容器里的唯一的RabbitTemplate对象,设置return回调

@Configuration
public class RabbitReturnConfiguration implements ApplicationContextAware{
    public void setApplicationContext(ApplicationContext ioc){
        RabbitTemplate rabbitTemplate = ioc.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.warn("消息没有被路由到队列,被退回了。message={}, replyCode={}, replyText={}, exchange={}, routingKey={}",
                    message,replyCode, replyText, exchange, routingKey);
        });
    }
}
  • 中间件:持久化存储消息、队列、交换机

    使用:SpringAMQP框架本身创建出来的队列、交换机、消息都是持久化的

    用法:不需要我们额外做什么事,就是持久化的

  • 消费者:确保消费者成功接收并成功处理消息;即使最终处理失败,也需要存储到某个地方等待人工干预

    使用:消费者ack确认机制,来保证MQ在我们成功处理消息之后再删除

    ack确认机制:

    • manual:手动确认,不常用

    • none:不确认。MQ只要把消息投递给消费者,就直接删除掉

    • auto:自动确认,默认。效果是:

      • 如果消费者方法处理消息成功,SpringAMQP会给MQ返回ack确认。MQ收到ack之后删除消息

      • 如果消费者方法处理消息出现异常,SpringAMQP会把消息重新入队requeue,重新投递

      在auto基础上,开启“消费者本地重试”。效果是:

      • 消息不会重新入队,而是由SpringAMQP再次调用我们的消费者方法。

      • 要么:在重试次数耗尽之前,消费成功,就结束

      • 要么:在重试次数耗尽之后,仍然消费失败,会走最终的消息回收策略,默认是直接丢弃消息

      在“消费者本地重试”基础上,再添加最终的“消息回收策略”

      • RejectAndDontRequeueRecoverer:默认策略。重试次数耗尽后,向MQ发送reject,直接丢弃消息

      • ImmediateRequeueMessageRecoverer:把消息再重新入队,重新投递

      • RepublishMessageRecoverer:把消息重新发送到一个错误消息队列里,等待人工干预

    步骤:

    1. 修改消费者的配置文件

      设置确认机制为auto

      开启消费者本地重试

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto #设置消费者确认机制为auto
        retry:
          enabled: true #开启消费者本地重试。当消费者方法抛出异常后,不会把消息requeue,而是由SpringAMQP再次调用消费者方法
          initial-interval: 1000 #失败重试的初始时间间隔,默认1000ms
          multiplier: 2 #失败重试的时间间隔,是上次时间间隔的多少倍。默认是1
          max-interval: 10000 #失败重试的最大时间间隔,单位ms.默认是10000ms
          max-attempts: 3 #失败重试的最大次数。一旦达到最大次数仍然失败,默认就会丢弃消息。默认3

    2. 在消费者一方配置最终的消息回收策略

@Configuration
public class RabbitErrorMsgConfig {

    @Bean
    public MessageRecoverer errorMessageRecoverer(RabbitTemplate rabbitTemplate){
        //如果一个消息,在消费者本地重试次数耗尽之后,最终的处理策略是:
        // 使用RabbitTemplate,把这条消息发送到 error.exchange交换机, 消息的路由是key是:error.key
        // 相当于rabbitTemplate.convertAndSend("error.exchange","error.key", 这条错误消息)
        return new RepublishMessageRecoverer(rabbitTemplate, "交换机名", "路由key");
    }
}

三、死信交换机和延迟消息

1. 介绍

1 什么是死信

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息,超时未消费

  • 要投递的队列消息满了,无法投递

默认情况下,死信会直接丢弃。但是如果配置了死信交换机和死信队列,死信将会被投递到死信队列里

2 死信交换机

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

如图,一个消息被消费者拒绝了,变成了死信;因为demo.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机;如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:

注意:

  • 死信交换机,其实是普通交换机。只是用于处理死信,所以称为死信交换机

  • 死信队列,其实也是普通队列。只是用于处理死信,所以称为死信队列

3 准备基础代码

准备一个新的工程,用于演示延迟消息

2. 消费失败成为死信

在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。

我们可以给demo.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。

1 消费者:设置消息确认机制

使用auto确认机制并开启本地重试

spring:
  rabbitmq:
    host: 192.168.150.131
    port: 5672
    virtual-host: /
    username: itcast
    password: 123321
    listener:
      simple:
        acknowledge-mode: auto #使用auto确认机制
        retry:
          enabled: true #开启消息本地重试。默认重试3次

使用RejectAndDontRequeueRecoverer策略

SpringAMQP默认使用的消息恢复策略RejectAndDontRequeueRecoverer,在本地重试次数耗尽后,发送reject给RabbitMQ。

所以,不需要设置消息恢复策略

2 生产者:配置死信交换机

修改生产者的配置类,在声明队列时绑定死信交换机

package com.itheima.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitBindingConfig {

    @Bean
    public TopicExchange demoTopicExchange(){
        return ExchangeBuilder
                .topicExchange("demo.exchange").durable(true).build();
    }

    @Bean
    public Queue demoQueue(){
        return QueueBuilder
                .durable("demo.queue")
                //给队列指定死信交换机,名称是dl.exchange
                .deadLetterExchange("dl.exchange")
	            //把消息投递给死信交换机时,消息的routingKey是dl
                .deadLetterRoutingKey("dl")
                .build();
    }

    @Bean
    public Binding demoQueueBinding(Queue demoQueue, TopicExchange demoTopicExchange){
        return BindingBuilder.bind(demoQueue).to(demoTopicExchange).with("demo");
    }

    /**
     * 声明一个死信交换机,使用DirectExchange交换机,名称为dl.exchange
     */
    @Bean
    public DirectExchange dlExchange(){
        return ExchangeBuilder.directExchange("dl.exchange").build();
    }
    /**
     * 声明一个死信队列,名称为dl.queue
     */
    @Bean
    public Queue dlQueue(){
        return QueueBuilder.durable("dl.queue").build();
    }
    /**
     * 把死信交换机与死信队列进行绑定,设置路由key为dl
     */
    @Bean
    public Binding dlQueueBinding(DirectExchange dlExchange, Queue dlQueue){
        return BindingBuilder.bind(dlQueue).to(dlExchange).with("dl");
    }
}

3 测试效果

  1. 先去RabbitMQ控制台页面(http://192.168.200.137:15672)中,demo.queue队列删除掉。因为之前声明的队列并没有绑定死信交换机,必须要删除掉,重新声明才行

  2. 运行生产者的单元测试方法,发送消息

  3. 启动消费者服务,开始从demo.queue中接收消息但出现异常;在耗尽重试次数后,因为恢复策略是默认的RejectAndDontRequeueRecoverer成为死信。消息被投递到死信交换机,然后路由到死信队列

  4. 在RabbitMQ控制台中查看死信队列dl.queue,可看到死信队列中有一条消息

3. 延迟消息-通过死信交换机实现【了解】

1 说明

如果一条消息超时未被消费,也会成为死信。而超时有两种方式:

  • 消息所在的队列设置了超时

  • 消息本身设置了超时

我们将按照如下设计,演示超时成为死信的效果:

2 队列TTL示例

注意:为了方便演示死信队列的效果,我们将创建一个新的project,准备新的代码环境。参考第一章节中准备的代码环境。

生产者

声明队列和交换机

  • 声明死信交换机与死信队列,并绑定

  • 声明普通交换机与普通队列,并绑定。注意,声明普通队列时要:

    设置队列的TTL

    给队列设置死信交换机与死信的RoutingKey

package com.itheima.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitBindingConfig {

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder.durable("ttl.queue")
                //设置队列的超时时间为10s
                .ttl(10*1000)
                //给队列设置死信交换机,名称为dl.ttl.exchange;设置投递死信时的RoutingKey为ttl
                .deadLetterExchange("dl.ttl.exchange").deadLetterRoutingKey("ttl")
                .build();
    }

    @Bean
    public DirectExchange ttlExchange(){
        return ExchangeBuilder.directExchange("ttl.exchange").build();
    }

    @Bean
    public Binding ttlBinding(Queue ttlQueue, DirectExchange ttlExchange){
        return BindingBuilder.bind(ttlQueue).to(ttlExchange).with("demo");
    }

    //--------------------死信交换机、死信队列、死信绑定关系------------------------------

    @Bean
    public DirectExchange dlTtlExchange(){
        return ExchangeBuilder.directExchange("dl.ttl.exchange").build();
    }

    @Bean
    public Queue dlTtlQueue(){
        return QueueBuilder.durable("dl.ttl.queue").build();
    }

    @Bean
    public Binding tlTtlBinding(Queue dlTtlQueue, DirectExchange dlTtlExchange){
        return BindingBuilder.bind(dlTtlQueue).to(dlTtlExchange).with("ttl");
    }
}

发送消息

注意:声明队列时已经给队列设置了TTL,所以发送消息时不需要给消息设置TTL

package com.itheima;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.time.LocalTime;
import java.util.UUID;


@Slf4j
@SpringBootTest
public class DemoMessageTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.convertAndSend("ttl.exchange", "demo", "demo dead letter,发送时间是:" + LocalTime.now());
    }
}

消费者

package com.itheima.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;


@Slf4j
@Component
public class DemoListener {

    /**
     * 监听死信队列
     */
    @RabbitListener(queuesToDeclare = @Queue("dl.ttl.queue"))
    public void handleDemoQueueMsg(String msg){
        log.info("现在时间是:{},从{}队列接收到消息:{}", LocalTime.now(), "dl.ttl.queue", msg);
    }
}

测试效果

  1. 运行生产者的单元测试代码,发送一条消息

  2. 启动消费者服务,等待接收消息。发现消费者在10s后收到了消息

3 消息TTL示例

生产者

声明队列和交换机

可以直接使用刚刚的“队列TTL示例”中的配置,与之相比,仅仅是声明队列时不再设置队列的TTL。代码如下:

package com.itheima.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitBindingConfig {

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder.durable("ttl.queue")
                //给队列设置死信交换机,名称为dl.ttl.exchange;设置投递死信时的RoutingKey为ttl
                .deadLetterExchange("dl.ttl.exchange").deadLetterRoutingKey("ttl")
                .build();
    }

    @Bean
    public DirectExchange ttlExchange(){
        return ExchangeBuilder.directExchange("ttl.exchange").build();
    }

    @Bean
    public Binding ttlBinding(Queue ttlQueue, DirectExchange ttlExchange){
        return BindingBuilder.bind(ttlQueue).to(ttlExchange).with("demo");
    }

    //--------------------死信交换机、死信队列、死信绑定关系------------------------------

    @Bean
    public DirectExchange dlTtlExchange(){
        return ExchangeBuilder.directExchange("dl.ttl.exchange").build();
    }

    @Bean
    public Queue dlTtlQueue(){
        return QueueBuilder.durable("dl.ttl.queue").build();
    }

    @Bean
    public Binding tlTtlBinding(Queue dlTtlQueue, DirectExchange dlTtlExchange){
        return BindingBuilder.bind(dlTtlQueue).to(dlTtlExchange).with("ttl");
    }
}

发送消息

发送消息时设置消息的TTL

package com.itheima;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.time.LocalTime;
import java.util.UUID;

@Slf4j
@SpringBootTest
public class DemoMessageTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        String msgStr = "消息TTL demo,发送时间是:" + LocalTime.now();
        
        Message message = MessageBuilder
                .withBody(msgStr.getBytes())
                //设置消息TTL为5000毫秒
                .setExpiration("5000")
                .build();

        //发送消息时:
        //  如果消息和队列都设置了TTL,则哪个TTL短,哪个生效
        //  如果消息和队列只设置了一个TTL,则直接以设置的为准
        rabbitTemplate.convertAndSend("ttl.exchange", "demo", message);
    }
}

 

消费者

直接使用刚刚“队列TTL示例”中的消费者代码即可。代码如下:

package com.itheima.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/**
 * @author liuyp
 * @date 2023/05/21
 */
@Slf4j
@Component
public class DemoListener {

    /**
     * 监听死信队列
     */
    @RabbitListener(queues = "dl.ttl.queue")
    public void handleDemoQueueMsg(String msg){
        log.info("现在时间是:{},从{}队列接收到消息:{}", LocalTime.now(), "dl.ttl.queue", msg);
    }
}

测试效果

  1. 运行生产者的单元测试代码,发送消息

  2. 启动消费者服务,开始监听消息。发现消费者在5s后收到了消息

4. 延迟消息-通过延迟消息插件实现【重点】

1 介绍

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。延迟队列的使用场景非常多,例如:

  • 用户下单,如果用户在15 分钟内未支付,则自动取消订单

  • 预约工作会议,20分钟后自动通知所有参会人员

因为延迟消息的需求非常多,所以RabbitMQ官方也推出了一个延迟队列插件,原生支持延迟消息功能。插件名称是:rabbitmq_delayed_message_exchange

官网插件列表地址:Community Plugins | RabbitMQ

 

2 安装

大家可以去对应的GitHub页面下载3.8.9版本的插件,这个版本的插件对应RabbitMQ3.8.5以上版本:Release v3.8.9 · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

也可以直接使用资料中提供好的插件。

安装步骤如下:

  1. 查看mq的数据卷目录

    因为我们的MQ是使用docker安装的,而创建mq容器时挂载了名称为mp-plugins的数据卷。

    我们要先查看一下数据卷的位置,执行命令:docker volumes inspect mq-plugins

    可知数据卷的目录在/var/lib/docker/volumes/mq-plugins/_data

     2. 使用finalshell或其它工具,把插件上传到虚拟机的/var/lib/docker/volumes/mq-plugins/_data目录中

     3. 安装延迟消息插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3 原理

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

  • 接收消息

  • 判断消息是否具备x-delay属性

  • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间

  • 返回routing not found结果给消息发送者

  • x-delay时间到期后,重新投递消息到指定队列

4 使用示例

插件的使用步骤也非常简单:

  • 声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true,然后声明队列与其绑定即可。

  • 发送消息时,指定一个消息头 x-delay,值是延迟的毫秒值

声明队列和交换机

@Bean
public DirectExchange delayExchange(){
    return ExchangeBuilder.directExchange("delay.direct.exchange")
            //设置为“延迟交换机”
            .delayed()
            .build();
}
@Bean
public Queue delayQueue(){
    return QueueBuilder.durable("delay.queue").build();
}
@Bean
public Binding delayBinding(Queue delayQueue, DirectExchange delayExchange){
    return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay");
}

也可以使用注解方式声明,示例代码:

@RabbitListener(bindings = @QueueBinding(
     value = @Queue("delay.queue"),
     exchange = @Exchange(value = "delay.direct.exchange", type = ExchangeTypes.DIRECT, delayed = "true"),
     key = "delay"
))
public void handleDelayQueueMsg(String msg){
 log.info("现在时间是:{},从{}队列接收到消息:{}", LocalTime.now(), "delay.queue", msg);
}

发送消息

发送消息时,必须指定x-delay头,设置延迟时间

@Test
public void test2(){
    String msgStr = "这是一条延迟消息,发送时间是:" + LocalTime.now();
    Message message = MessageBuilder
            .withBody(msgStr.getBytes())
            .setHeader("x-delay", 10000)
            .build();
    rabbitTemplate.convertAndSend("delay.direct.exchange", "delay", message);
}

监听消息

@RabbitListener(queues="delay.queue")
public void handleDelayQueueMsg(String msg){
    log.info("现在时间是:{},从{}队列接收到消息:{}", LocalTime.now(), "delay.queue", msg);
}

测试效果

  1. 运行生产者的单元测试方法,发送消息

  2. 启动消费者服务,开始监听消息。发现10s后收到了消息

5. 小结

 

延迟消息使用场景:

  • 订单超时后自动取消。15分钟未支付,就取消订单

    下单成功以后,向MQ里发送一条延迟消息:xx订单需要取消,延迟时间15分钟

    15分钟以后,消息就到达消费者:消费者先判断此订单的状态,如果还是未支付状态,就取消订单

  • 所有需要延迟一定时间再做一些事的场景,都可以使用延迟消息

延迟消息的实现方案:使用RabbitMQ

  • 使用死信交换机和死信队列:太麻烦

  • 使用延迟消息插件

    使用@Bean方式声明

//声明交换机时 调用delayed()
@Bean
public XxxExchange exchange(){
    return ExchangeBuilder.xxxExchange("").delayed().build();
}
//声明队列,声明绑定关系:略,和之前一样

//发消息,需要给消息设置x-delay消息头
Message msg = MessageBuilder.withBody(消息内容).setHeader("x-delay",延迟毫秒值).build();
rabbitTemplate.convertAndSend("交换机","路由key", msg);

使用@RabbitListener声明

//发消息:需要给消息设置x-delay消息头
Message msg = MessageBuilder.withBody(消息内容).setHeader("x-delay",延迟毫秒值).build();
rabbitTemplate.convertAndSend("交换机","路由key", msg);


//收消息
@RabbitListener(bindings=@QueueBinding(
	value=@Queue("队列名称"),
    exchange=@Exchange(value="交换机名",type=ExchangeType.类型, delayed="true"),
    key="路由key"
))
public void listener(String msg){
    
}

 

四、惰性队列

1. 消息堆积问题

1 什么是消息堆积

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。

RabbitMQ的队列溢出的默认处理方式是:丢弃队首的消息(最老的),被丢弃掉( 如果配置了死信交换机,那么将会成为死信)

2 如何解决消息堆积

解决消息堆积有两种思路:

  • 增加更多消费者,提高消费速度。也就是我们之前说的work queue模式

  • 扩大队列容积,提高堆积上限

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存

  • 消费者要消费消息时才会从磁盘中读取并加载到内存

  • 支持数百万条的消息存储

2. 惰性队列

要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。指定属性的方式有三种:

  • 命令行方式,把一个已存在的队列修改为惰性队列

  • 基于@Bean方式声明惰性队列

  • 基于注解方式声明惰性队列

1 命令行方式

需要进入mq容器,然后执行命令:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues          

说明:

  • rabbitmqctl:RabbitMQ的命令行管理工具

  • set_policy:设置策略。后边跟的Lazy是策略名称

  • ^lazy-queue$:是正则表达式,用于匹配队列名称。匹配上的队列都会被修改

  • {"queue-mode":"lazy"}:设置队列为lazy

  • --apply-to queues:命令的作用目标对象,是对所有队列做以上操作

2 @Bean方式

在声明队列时,调用一下lazy()方法即可

//---------------------Lazy Queue-------------------------
@Bean
public Queue lazyQueue(){
    return QueueBuilder.durable("lazy.queue")
            //设置为惰性队列
            .lazy()
            .build();
}

@Bean
public DirectExchange lazyExchange(){
    return ExchangeBuilder.directExchange("lazy.exchange").build();
}

@Bean
public Binding lazyBinding(Queue lazyQueue, DirectExchange lazyExchange){
    return BindingBuilder.bind(lazyQueue).to(lazyExchange).with("lazy");
}

 

3 注解@RabbitListener方式

在@RabbitListener注解中声明队列时,添加x-queue-mode参数

@RabbitListener(queuesToDeclare = @Queue(
        value = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void handleLazyQueueMsg(String msg) {
    log.info("从{}队列接收到消息:{}", "lazy.queue", msg);
}

3. 小结

解决消息堆积问题:

  • 增加消费者,提升消费能力

  • 提升队列的容积,可以使用惰性队列

惰性队列的用法:

  • 声明队列时候:QueueBuilder.durable("队列名称").lazy().build();

  • 生产者发消息:和之前一样

  • 消费者收消息:和之前一样

五、MQ集群【了解】

1. 集群分类

RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两种模式:

  • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力与堆积能力。

  • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

镜像集群虽然支持主从,但主从同步性能低,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:

  • 仲裁队列:用来代替镜像集群,底层采用Raft协议同步性能更高,丢失数据的风险更小

2. 普通集群

1 介绍

普通集群,或者叫标准集群(classic cluster),具备下列特征:

  • 把队列和交换机分散到不同的节点上

  • 在各个节点间共享 元数据,包括:交换机、队列元信息。不包含队列中的消息。

  • 当访问集群某节点时,如果队列不在该节点,该节点将会承担路由的作用,从数据所在节点中获取数据并返回

优点:

  • 多个集群共同提供队列服务,提高了消息吞吐量和并发能力

  • 提高了MQ的可用性,某个节点宕机,还有其它节点可提供服务,整个MQ不会彻底宕机

缺点:

  • 队列没有备份,所以一旦队列所在节点宕机,队列中的消息就会丢失

普通集群的架构如图所示:

2 部署

我们的计划部署3节点的mq集群:

集群中的节点标示默认都是:rabbit@[hostname],因此以上三个节点的名称分别为:

  • rabbit@mq1

  • rabbit@mq2

  • rabbit@mq3

获取cookie

集群模式中的每个RabbitMQ 节点使用 cookie 来确定它们是否被允许相互通信,集群每个节点必须有相同的Cookie。cookie 只是一串最多 255 个字符的字母数字字符。

我们先在之前运行中的mq容器中获取一个Cookie值,作为稍后我们要搭建的集群的Cookie。

  • 执行命令:docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie

 

可以看到Cookie值为:TCXMOWUEDEXDZSGZHUZG

删除旧容器

接下来,停止并删除当前的mq容器,我们重新搭建集群

执行命令:docker rm -f mq

准备集群配置

1. 准备三个文件夹

 mkdir ~/01classic
cd ~/01classic
mkdir mq1 mq2 mq3

2. 准备mq1的配置文件

  1. 进入mq1文件夹:cd ~/01classic/mq1

  2. 用vi编辑rabbitmq.conf文件:vi rabbitmq.conf

    然后按i进入编辑模式,在文件中添加下面的内容,然后保存并退出vi

loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3

     3. 再创建一个文件,记录Cookie:

#把cookie值保存到.erlang.cookie文件里。只执行一次就行,不要重复执行
echo "TCXMOWUEDEXDZSGZHUZG" > ~/01classic/mq1/.erlang.cookie

3. 拷贝配置文件

把mq1里的配置文件和cookie文件,拷贝到mq2和mq3文件夹里

cp ~/01classic/mq1/rabbitmq.conf ~/01classic/mq2
cp ~/01classic/mq1/rabbitmq.conf ~/01classic/mq3
cp ~/01classic/mq1/.erlang.cookie ~/01classic/mq2
cp ~/01classic/mq1/.erlang.cookie ~/01classic/mq3

#修改文件的权限
chmod 600 ~/01classic/mq1/.erlang.cookie
chmod 600 ~/01classic/mq2/.erlang.cookie
chmod 600 ~/01classic/mq3/.erlang.cookie

启动集群

#1. 创建虚拟网络
docker network create mq-net
#4. 创建mq1节点
docker run -d --net mq-net \
-v /root/01classic/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v /root//01classic/mq1/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq1 \
--hostname mq1 \
-p 5671:5672 \
-p 15671:15672 \
rabbitmq:3.8-management
#5. 创建mq2节点
docker run -d --net mq-net \
-v /root//01classic/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v /root//01classic/mq2/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq2 \
--hostname mq2 \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3.8-management
#6. 创建mq3节点
docker run -d --net mq-net \
-v /root//01classic/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v /root//01classic/mq3/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq3 \
--hostname mq3 \
-p 5673:5672 \
-p 15673:15672 \
rabbitmq:3.8-management

3 测试

元数据共享测试

  1. 打开mq1的控制台 http://192.168.200.137:15671,手动添加一个队列

2. 打开mq2和mq3的控制台,也能看到这个队列

 

数据共享测试

  1. 在mq1节点上,手动向simple.queue发送一条消息

     2. 在mq2和mq3上,可以查看到这条消息。其实不是数据共享,而是mq2和mq3帮我们从mq1上查询到消息,展示给我们看了  

可用性测试

  1. 关闭mq1容器(刚刚发送的消息,是在mq1上发送的)

    执行命令:docker stop mq1

  2. 再登录mq2或mq3的控制台,发现simple.queue不可用了

    说明:仅仅是把simple.queue的信息拷贝到了mq2和mq3,但是队列里的数据并没有拷贝过去

 

3. 镜像集群

1 介绍

在刚刚的案例中,一旦创建队列的主机宕机,队列就会不可用。不具备高可用能力。如果要解决这个问题,必须使用官方提供的镜像集群方案。

镜像集群:本质是主从模式,具备下面的特征:

  • 镜像队列是一主多从结构:创建队列的节点称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。

  • 主从队列之间会同步队列和队列里的消息。

  • 所有操作都是主节点完成,然后同步给所有镜像节点;镜像节点仅仅作为备份

好处:

  • 具备自动故障恢复能力,当主队列宕机后,镜像节点会替代成新的主

  • 增加了可用性,主从之间会同步消息,所以即使主队列宕机,消息还有备份

缺点:主从之间同步消息的性能低。

  • 主队列收到消息后,需要同步到所有镜像队列(从队列)

  • 如果主队列宕机重启后:

    • 重启后的队列是空的,需要把所有消息(包括宕机前的消息和宕机期间的新消息)都同步过来。

    • 而这个同步是阻塞的,它会让整个队列不可用。如果队列里的消息堆积过多,会阻塞较长的时间

镜像集群的架构如图所示:

 

2 语法

镜像集群的三种模式

镜像模式的配置有3种模式,用于配置 主队列要有几个镜像队列:

这里我们以rabbitmqctl命令作为案例来讲解配置语法。语法示例:

exactly模式

rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

说明:

  • rabbitmqctl set_policy:固定写法

  • ha-two:策略名称,自定义

  • "^two\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.开头的队列名称

  • '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略内容

    • "ha-mode":"exactly":策略模式,此处是exactly模式,指定副本数量

    • "ha-params":2:策略参数,这里是2,就是副本数量为2,1主1镜像

    • "ha-sync-mode":"automatic":同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销

all模式

rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'

说明:

  • ha-all:策略名称,自定义

  • "^all\.":匹配所有以all.开头的队列名

  • '{"ha-mode":"all"}':策略内容

    • "ha-mode":"all":策略模式,此处是all模式,即所有节点都会称为镜像节点

nodes模式

rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

说明:

  • rabbitmqctl set_policy:固定写法

  • ha-nodes:策略名称,自定义

  • "^nodes\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes.开头的队列名称

  • '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略内容

    • "ha-mode":"nodes":策略模式,此处是nodes模式

    • "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略参数,这里指定副本所在节点名称

创建集群

我们使用exactly模式的镜像,镜像数量设置为2.

执行以下命令:

docker exec -it mq1 rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3 测试

元数据共享测试

  1. 在mq1上创建一个队列two.queue

     2. 在mq2和mq3上可以看到队列two.queue

数据共享测试

  1. two.queue发送一条消息

     2. 在mq1、mq2、mq3任意一个节点上,都可以从two.queue队列中看到消息

其实查询消息,都是从mq1上查询得到的数据。因为two.queue在mq1节点上,mq1是主节点

可用性测试

  1. 关闭mq1:docker stop mq1

  2. 去mq2或mq3上查看,发现two.queue仍然健康,并且切换到了mq2节点上

4. 仲裁队列

1 介绍

仲裁队列:仲裁队列是RabbitMQ3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步

优点:

  • 所有消息都是持久化的

  • 仲裁队列基于Raft协议,比镜像队列更安全,性能更好

    当主队列收到一条消息,需要同步到副本队列,但是只要过半数的队列副本收到消息即可认为成功

    主队列宕机恢复后消息不丢失,不需要同步所有消息(只要同步宕机期间的新消息即可),且同步消息是非阻塞的

缺点:

  • 不支持惰性队列、队列和消息TTL、排它队列、非持久化消息等等

  • 更高的磁盘占用,不适合消息堆积过多的情况

2 添加仲裁队列

手动创建仲裁队列

  1. 创建队列

    Type:选择Quorum

    Name:队列名称,随便起

    Node:选择主节点

  

     2. 查看队列

下图中“+2”字样,表示队列有2个镜像队列

队列副本的数量由配置参数replication factor决定,参数值为 5 的仲裁队列将会有 1 个主副本和 4 个从副本;每个副本都在不通的 RabbitMQ 节点上。但是目前我们的集群只有3个节点,所以有1主2从;

代码声明仲裁队列  

@Bean
public Queue quorumQueue() {
    return QueueBuilder
        .durable("quorum.queue") // 持久化
        .quorum() // 仲裁队列
        .build();
}

5. RabbitTemplate连接MQ集群

只要使用addresses代替掉原来的hostport即可

spring:
  rabbitmq:
    addresses: 192.168.200.137:5671, 192.168.200.137:5672, 192.168.200.137:5673
    username: itcast
    password: 123321
    virtual-host: /

大总结

消息可靠性

  • 为了保证:生产者发送的消息,最终成功被消费者处理

  • 确保消息成功到达交换机

    解决:用confirm确认机制

    用法:

    1. 生产者的配置文件里,开启confirm确认机制,使用correlated异步确认方式

    2. 生产者发送消息时,给每条消息附加一个CorrelationData对象,其中设置了confirm回调

      无论消息是否成功到达交换机,confirm回调函数都会执行

      我们在回调函数里编写代码,可以判断消息是否成功到达交换机

  • 确保消息成功到达队列

    解决:用return回调机制(消息回退)

    用法:

    1. 生产者的配置文件里,开启return消息回退

    2. 给RabbitTemplate设置return回调函数:

      如果消息成功到达队列,回调函数是不会执行的

      如果消息没有到达队列,消息会被退回,回调函数会执行,函数里的代码可以处理退回的消息

  • 保证MQ中间件持久化存储

    解决:SpringAMQP声明的队列、交换机、消息都是持久化的。不需要我们做什么事情

  • 确保消费者成功处理消息

    解决:消费者确认机制ack。默认使用auto自动确认

    • 效果:如果消费失败,消息默认会重新入队requeue,重新投递。会造成MQ不断投递 消息不断入队,压力大

    • 解决:在auto基础上,开启“消费者本地重试”

      • 效果:

        如果消费失败,SpringAMQP会再次调用消费者方法处理消息。而不会把消息重新入队

        可以设置生效的次数。如果重试次数耗尽仍然消费失败,走消息回收策略,默认是RejectAndDontRequeueRecoverer丢弃消息

      • 解决:在“消费者本地重试”基础上,设置最终的消息回收策略

        RejectAndDontRequeueRecoverer:直接丢弃消息

        ImmediateRequeueMessageRecoverer:消息重新入队

        RepublishMessageRecoverer:把消息投递到指定的交换机和队列

延迟消息的实现

  • 使用场景:

    订单超时未支付,自动取消订单

  • 实现方案:使用RabbitMQ实现

    使用死信交换机实现延迟消息:太麻烦

    使用延迟消息插件实现延迟消息:更简单

    使用@Bean方式使用插件实现延迟消息

    @Bean
    public TopicExchange delayedExchange(){
        //声明一个延迟消息交换机
        return ExchangeBuilder.topicExchange("交换机名称").delayed().build();
    }
    @Bean
    public Queue delayedQueue(){
        return QueueBuilder.durable("队列名称").build();
    }
    @Bean
    public Binding delayedBinding(TopicExchange delayedExchange, Queue delayedQueue){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("路由key");
    }
    Message msg = MessageBuilder.withBody(消息内容).setHeader("x-delay", 延迟毫秒值).build();
    rabbitTemplate.convertAndSend("交换机名", "路由key", msg);
    @RabbitListener(queuesToDeclare=@Queue("队列名"))
    public void handleDelayQueueMsg(String msg){
    }

    使用@RabbitListener方式使用插件实现延迟消息

    Message msg = MessageBuilder.withBody(消息内容).setHeader("x-delay", 延迟毫秒值).build();
    rabbitTemplate.convertAndSend("交换机名", "路由key", msg);
    @RabbitListener(bindings = @QueueBinding(
         value = @Queue("队列名"),
         exchange = @Exchange(value = "交换机名", type = 交换机类型, delayed = "true"),
         key = "路由key"
    ))
    public void handleDelayQueueMsg(String msg){
     
    }

消息堆积的处理

  • 处理方案:

    增加消费者,提升消费能力

    提升队列的消息堆积能力,可以使用惰性队列

  • 具体用法 :

    @Bean
    public Queue delayedQueue(){
        return QueueBuilder.durable("队列名称").lazy().build();
    }

RabbitMQ的集群

  • 普通集群

    特点:

    • 把所有的队列分散到不同的节点上

    • 每个队列只有一份,没有备份副本

    • 所有服务器节点之间,会互相同步元数据(队列、交换机等等的信息)

    优点:

    • 相对单机RabbitMQ,可用性提升了:死一个节点,还有其它节点可用,整个MQ服务不会完全挂掉

    • 提升了MQ的并发能力和消息吞吐量:因为有多个节点共同提供消息收发服务

    缺点:

    • 一个队列只有一份,没有副本。一旦节点宕机,此节点上所有的队列都不可用了

  • 镜像队列

    特点:

    • 队列是主从集群模式,一主多从

      主队列提供消息收发服务;从队列只作为备份,不提供任何功能,在主队列宕机,从队列可以成为主

    • 主从之间会进行数据同步

    优点:

    • 可用性进一步提升了:一个节点宕机,还有镜像队列可以成为主队列,提供消息收发服务

    • 可以实现自动的故障转移

    缺点:

    • 主节点的数据同步到从节点:有一条新消息时,需要向所有从节点同步数据完成,才确认消息成功

    • 主节点宕机,主队列里的数据会丢失;重启之后,需要由新的主节点把所有消息全部同步过来。这个同步是阻塞式的

  • 仲裁队列

    特点:用于代替镜像队列

    • 队列是主从集群模式,一主多从

      主队列提供消息收发服务;从队列只作为备份,不提供任何功能,在主队列宕机,从队列可以成为主

    • 主从之间会进行数据同步

      一条消息到主节点,只需要同步给过半节点,这个条消息就可以确认已经存储成功了

    • 消息是持久化存储到磁盘的

    优点:

    • 可用性更高

    • 可以实现自动故障转移

    • 主从同步性能更高:

      一条消息到主节点,只需要同步给过半节点,这个条消息就可以确认已经存储成功了

      如果主节点宕机,消息不会丢失。当节点重启后,只需要从新的主节点里同步 宕机期间的新数据过来即可,不需要同步所有的数据过来

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/613434.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【前端部署】Ubuntu22.04 使用nginx部署vue前端项目教程

一.ubuntu安装nginx 1.更新本地软件包列表 sudo apt update2.安装nginx sudo apt install nginx3.验证nginx是否安装成功 sudo systemctl status nginx如果Nginx正在运行&#xff0c;则命令输出应该显示Active&#xff08;active (running)&#xff09;状态。 4.若nginx未运…

【MySQL】——课程平台的创建设计

&#x1f4bb;博主现有专栏&#xff1a; C51单片机&#xff08;STC89C516&#xff09;&#xff0c;c语言&#xff0c;c&#xff0c;离散数学&#xff0c;算法设计与分析&#xff0c;数据结构&#xff0c;Python&#xff0c;Java基础&#xff0c;MySQL&#xff0c;linux&#xf…

2024HW Linux应急响应基础学习

首先展示关于Linux的关键目录&#xff0c;这是应急响应查看的关键&#xff1a; 常用命令 top //查看进程资源的占用情况 ps -aux //查看进程 直接写ps aux也可以 netstat -antpl //查看网络连接 ls -alh /proc/pid //查看某个pid对应的可执行程序 pid记得修改 lsof /…

微信登录功能--网站应用

微信开发平台注册https://open.weixin.qq.com/ 账号中心-填写基本资料&#xff08;最好是公司注册&#xff09; 账号中心-开发者资质认证&#xff08;充钱&#xff0c;300&#xff09; 审核通过之后&#xff0c;管理中心-网站应用-创建网站应用&#xff08;AppSecret一定一定…

SMART700西门子触摸屏维修6AV6 648-0CC11-3AX0

西门子工控机触摸屏维修系列型号&#xff1a;PС477,PC677,TD200,TD400,KTP178,TP170A,TP170B,TP177A,TP177B,TP270,TP277,TP27,MP370,MP277,OP27,OP177B等。 触摸屏故障有&#xff1a;上电黑屏, 花屏,暗屏,触摸失灵,按键损坏,电源板,高压板故障,液晶,主板坏等,内容错乱、进不了…

nacos server安装部署傻瓜级教程

下载地址&#xff1a;GitHub - alibaba/nacos: an easy-to-use dynamic service discovery, configuration and service management platform for building cloud native applications.an easy-to-use dynamic service discovery, configuration and service management platfo…

C++ 搜索二叉树

目录 1.二叉搜索树概念 2. 实现二叉搜索树 2.1. 二叉搜索树的插入 2.2查找 2.3删除节点 3.二叉树的应用&#xff08;KV结构&#xff09; 1.二叉搜索树概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或者是具有以下性质的二叉树: 若它的左子树不为…

AutoCAD中密集的填充打散后消失的问题

有时候在AutoCAD中&#xff0c;图案填充的填充面积过大或填充太过密集时&#xff0c;将该填充打散&#xff0c;也就是执行Explode时&#xff0c;会发现填充图案消失了。 原因是打散后线条太大&#xff0c;系统就不显示了。可以通过设置&#xff1a;HPMAXLINES 值&#xff0c;来…

奇诡 matlab 小 bug matlab git需要记录的改动太多

似乎是我有一次添加了太多的路径之后的事情。但是不敢说一定是这个导致的&#xff1a; 症状&#xff1a;只要对文本进行任何编辑操作&#xff0c;工作区就会出现"Processing … Cancel"的提示&#xff0c;如果不管的话这个提示不会消失&#xff0c;同时matlab变得越来…

Minimal-Supervised Medical Image Segmentation via Vector Quantization Memory

文章目录 Minimal-Supervised Medical Image Segmentation via Vector Quantization Memory摘要方法实验结果 Minimal-Supervised Medical Image Segmentation via Vector Quantization Memory 摘要 辅助重构分支&#xff1a;该分支通过提供额外的监督并产生学习视觉表示所需…

2025COSP深圳户外展已定档招商工作正式启动!抢占先机,领跑华南市场

想开拓全国最具消费能力的华南市场&#xff1f; 想招到优质的实力经销商&#xff1f; 想快速的提高品牌知名度&#xff1f; 2025-COSP深圳国际户外展会将于2025年2月27-3月1日在深圳福田会展中心盛大开幕&#xff01; 回顾过去 2024-COSP深圳国际户外展我们取得了较好的成绩…

UEC++ FString做为参数取值时报错error:C4840

问题描述 用来取FString类型的变量时报错&#xff1a; 问题解决 点击错误位置&#xff0c;跳转到代码&#xff1a; void AMyDelegateActor::TwoParamDelegateFunc(int32 param1, FString param2) {UE_LOG(LogTemp, Warning, TEXT("Two Param1:%d Param2:%s"), param…

流量分析(一)

数据库类流量分析 MySQL流量 常规操作&#xff0c;查找flag ctfhub{} 注意要选择字符集 Redis流量 查找ctfhub结果没找到 尝试把其变成十六进制继续进行查找 看到了前半段flag 接着往下看 找到了后半段的flag MongoDB流量 还是一样查找ctfhub 字符串没找到 转成十六进制也没…

【算法】二叉树中的dfs

快乐的流畅&#xff1a;个人主页 个人专栏&#xff1a;《算法神殿》《数据结构世界》《进击的C》 远方有一堆篝火&#xff0c;在为久候之人燃烧&#xff01; 文章目录 引言一、计算布尔二叉树的值二、求根节点到叶节点数字之和三、二叉树剪枝四、验证搜索二叉树五、二叉搜索树中…

U盘文件剪切丢失怎么办?揭秘原因并给出恢复方法

在日常生活和工作中&#xff0c;U盘已成为我们不可或缺的数据存储和传输工具。但有时候&#xff0c;我们在对U盘中的文件进行剪切操作时&#xff0c;会遇到文件丢失的情况。这种突如其来的数据消失往往会让人感到惊慌和困惑。那么&#xff0c;为什么U盘剪切时文件会丢失呢&…

详解GaussDB(DWS)中的行执行引擎

1.前言 GaussDB&#xff08;DWS&#xff09;包含三大引擎&#xff0c;一是SQL执行引擎&#xff0c;用来解析用户输入的SQL语句&#xff0c;生成执行计划&#xff0c;供执行引擎来执行&#xff1b;二是执行引擎&#xff0c;其中包含了行执行引擎和列执行引擎&#xff0c;执行引擎…

DataLab-数据分析的Ai辅助工具

添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09;DataLab是一个由DataCamp提供的强大在线数据分析平台&#xff0c;它通过AI技术简化了数据处理流程&#xff0c;使得用户无需编程或数据分析的高级技能即可快速获取数据洞察。它支持多种数据源&#xff0c;包…

路由器、交换机和网卡

大家使用VMware安装镜像之后&#xff0c;是不是都会考虑虚拟机的镜像系统怎么连上网的&#xff0c;它的连接方式是什么&#xff0c;它ip是什么&#xff1f; 路由器、交换机和网卡 1.路由器 一般有几个功能&#xff0c;第一个是网关、第二个是扩展有线网络端口、第三个是WiFi功…

显影不干净如何解决?

知识星球&#xff08;星球名&#xff1a;芯片制造与封测社区&#xff0c;星球号&#xff1a;63559049&#xff09;里的学员问&#xff1a;光刻工序完成后&#xff0c;晶圆表面有部分图形容易出现显影不净是什么原因&#xff1f;有什么好的解决办法吗&#xff1f; 光刻工序流程 …

SQL常用函数

一、日期相关函数 1、CURDATE() / CURRENT_DATE 返回当前日期 2、CURRENT_TIME()/CURTIME() 返回当前时间 3、CURRENT_TIMESTAMP 返回当前日期时间 4、DATE()从日期或日期时间表达式中提取日期值 5、DATEDIFF(d1,d2)计算日期 d1->d2 之间相隔的天数 6、DATE_FORMAT按表达式…