【MQ】Spring3 中 RabbitMQ 的使用与常见场景

一、初识 MQ

传统的单体架构,分布式架构的同步调用里,无论是方法调用,还是 OpenFeign 难免会有以下问题:

  1. 扩展性差(高耦合,需要依赖对应的服务,同样的事件,不断有新需求,这个事件的业务代码会越来越臃肿,这些子业务都写在一起)
  2. 性能下降(等待响应,最终整个业务的响应时长就是每次远程调用的执行时长之和)
  3. 级联失败(如果是一个事务,一个服务失败就会导致全部回滚,若是分布式事务就更加麻烦了,但其实一些行为的失败不应该导致整体回滚)
  4. 服务宕机(如果服务调用者未考虑服务提供者的性能,导致提供者因为过度请求而宕机)

但如果不是很要求同步调用,其实也可以用异步调用,如果是单体架构,你可能很快能想到一个解决方案,就是阻塞队列实现消息通知:

在这里插入图片描述

但是在分布式架构下,可能就需要一个中间件级别的阻塞队列,这就是我们要学习的 Message Queue 消息队列,简称 MQ,而现在流行的 MQ 还不少,在实现其基本的消息通知功能外,还有一些不错的扩展

以 RabbitMQ 和 Kafka 为例:

RabbitMQKafka
公司/社区RabbitApache
开发语言ErlangScala & Java
协议支持AMQP,XMPP,SMTP,STOMP自定义协议
可用性
单机吞吐量一般非常高(Kafka 亮点)
消息延迟微秒级毫秒以内
消息可靠性一般

消息延迟指的是,消息到队列,并在队列中“就绪”的时间与预期时间的差距,其实就是数据在中间件中流动的耗时,预期时间可以是现在、几毫秒后、几秒后、几天后…

据统计,目前国内消息队列使用最多的还是 RabbitMQ,再加上其各方面都比较均衡,稳定性也好,因此我们课堂上选择 RabbitMQ 来学习。

二、RabbitMQ 安装

Docker 安装 RabbitMQ:

mkdir /root/mq
cd /root/mq

docker rm mq-server -f
docker rmi rabbitmq:3.8-management -f
docker volume rm mq-plugins -f

docker pull rabbitmq:3.8-management

# 插件数据卷最好还是直接挂载 volume,而不是挂载我们的目录
docker run \
--name mq-server \
-e RABBITMQ_DEFAULT_USER=xxx \
-e RABBITMQ_DEFAULT_PASS=xxx \
--hostname mq1 \
-v mq-plugins:/plugins \
-p 15672:15672 \
-p 5672:5672 \
-d rabbitmq:3.8-management

三、RabbitMQ 基本知识

(1)架构

15672:RabbitMQ 提供的管理控制台的端口

5672:RabbitMQ 的消息发送处理接口

用户名密码就是安装时,启动容器时指定的用户名密码

MQ 对应的就是这里的消息代理 Broker:

在这里插入图片描述

RabbitMQ 详细架构图:

在这里插入图片描述

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的 exchange、queue

现在你可能只认识生产者、消费者、队列,其他是什么呢?

其实你可以理解为 MQ 也是存储东西的,存储的就是消息,virtual host 就是数据库,queue 就是表,消息就是一行数据,而 MQ 有特殊的机制,消息先通过 exchange 再决定前往哪个 queue

管理控制台的使用就不多说了

(2)五大模式

这只是最常见的五种模式:

  1. 简单模式

在这里插入图片描述

  1. 工作模式

在这里插入图片描述

  1. 发布订阅模式

关联交换机的队列都能收到一份消息,广播

在这里插入图片描述

  1. 路由模式

关联交换机时,提供 routing key(可以是多个,队列之间可以重复),发布消息时提供一个 routing key,由此发送给指定的队列

在这里插入图片描述

值得注意的是,简单模式和工作模式,其实也是有交换机的,任何队列都会绑定一个默认交换机 "",类型是 direct,routing key 为队列的名称

  1. 主题模式

在这里插入图片描述

路由模式的基础上,队列关联交换机时 routing key 可以是带通配符的

routing key 的单词通过 . 分割, # 匹配 n 个单词(n ≥ 0),* 只匹配一个单词

例如 #.red:

  • 可以匹配的 routing key:p1.red、red、p2.p1.red

在发布消息时,要使用具体的 routing key,交换机发送给匹配的队列

(3)数据隔离

  1. 隔离 virtual host

在这里插入图片描述

  1. 隔离用户(赋予访问权限)

在这里插入图片描述

四、RabbitMQ 基本使用 Spring AMQP

引入 RabbitMQ 相关的 SDK,可以通过创建连接 Connection、创建通道 Channel,用 Channel 进行操作,接受消息也差不多,不过多演示:

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("xx.xx.xx.xx");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("xxx");
        factory.setPassword("xxx");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

但比较麻烦,Spring AMQP 框架可以自动装配 RabbitMQ 的操作对象 RabbitTemplate,这样我们就可以更方便的操作 MQ,并充分发挥其特性

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

默认包含 RabbitMQ 的实现,如果你想对接其他 AMQP 协议的 MQ,得自己实现其抽象封装的接口

(1)发送消息

注意,下面是 Spring3 的写法,所以会有点不一样,可能看不懂,稍后解释!

消息发送器封装:

@Repository
@RequiredArgsConstructor
@Slf4j
public class RabbitMQSender {

    private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");

    private final RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setTaskExecutor(EXECUTOR);
    }

    private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {
        log.error("处理 ack 回执失败, {}", ex.getMessage());
        return null;
    };

    private MessagePostProcessor delayMessagePostProcessor(long delay) {
        return message -> {
            // 小于 0 也是立即执行
            // setDelay 才是给 RabbitMQ 看的,setReceivedDelay 是给 publish-returns 看的
            message.getMessageProperties().setDelay((int) Math.max(delay, 0));
            return message;
        };
    };

    private CorrelationData newCorrelationData() {
        return new CorrelationData(UUIDUtil.uuid32());
    }

    /**
     * @param exchange 交换机
     * @param routingKey routing key
     * @param msg 消息
     * @param delay 延迟时间(如果是延迟交换机,delay 才有效)
     * @param maxRetries 最大重试机会
     * @param <T> 消息的对象类型
     */
    private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){
        log.info("准备发送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",
                exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);
        CorrelationData correlationData = newCorrelationData();
        MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);
        correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {

            private int retryCount = 0; // 一次 send 从始至终都用的是一个 Consumer 对象,所以作用的都是同一个计数器

            @Override
            public void accept(CorrelationData.Confirm confirm) {
                Optional.ofNullable(confirm).ifPresent(c -> {
                    if(c.isAck()) {
                        log.info("ACK {} 消息成功到达,{}", correlationData.getId(), c.getReason());
                    } else {
                        log.warn("NACK {} 消息未能到达,{}", correlationData.getId(), c.getReason());
                        if(retryCount >= maxRetries) {
                            log.error("次数到达上限 {}", maxRetries);
                            return;
                        }
                        retryCount++;
                        log.warn("开始第 {} 次重试", retryCount);
                        CorrelationData cd = newCorrelationData();
                        cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);
                        rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);
                    }
                });
            }
        }, EXECUTOR);
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);
    }

    public void sendMessage(String exchange, String routingKey, Object msg) {
        send(exchange, routingKey, msg, 0, 0);
    }

    public void sendDelayMessage(String exchange, String routingKey, Object msg, long delay){
        send(exchange, routingKey, msg, delay, 0);
    }

    public void sendWithConfirm(String exchange, String routingKey, Object msg, int maxReties) {
        send(exchange, routingKey, msg, 0, maxReties);
    }

    public void sendDelayMessageWithConfirm(String exchange, String routingKey, Object msg, long delay, int maxReties) {
        send(exchange, routingKey, msg, delay, maxReties);
    }

}

(2)接受消息

监听器:

  • RabbitTemplate 是可以主动获取消息的,也可以不实时监听,但是一般情况都是监听,有消息就执行
  • 监听的是 queue,若 queue 不存在,就会根据注解创建一遍
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "xxx"),
    exchange = @Exchange(name = "xxx", delayed = "true"),
    key = {"xxx"}
))
public void xxx(X x) {

}

(3)声明交换机与队列

可以通过 @Bean 创建 Bean 对象的方式去声明,可以自行搜索,我更喜欢监听器注解的形式,而且 Bean 的方式,可能会因为配置不完全一样,导致其他配置类的交换机队列无法声明(现象如此,底层为啥我不知道)

(4)消息转换器

消息是一个字符串,但为了满足更多需求,需要将一个对象序列化成一个字符串,但默认的序列化实现貌似用的是 java 对象的序列化,这种方式可能得同一个程序的 java 类才能反序列化成功,所以我们应该选择分布式的序列化方式,比如 json

@Configuration
@RequiredArgsConstructor
@Slf4j
public class MessageConverterConfig {

    @Bean
    public MessageConverter messageConverter(){
        // 1. 定义消息转换器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(JsonUtil.OBJECT_MAPPER);
        // 2. 配置自动创建消息 id,用于识别不同消息
        jackson2JsonMessageConverter.setCreateMessageIds(Boolean.TRUE);
        return jackson2JsonMessageConverter;
    }
}

这里的 JsonUtil.OBJECT_MAPPER,就是框架的或者自己实现的 ObjectMapper

(5)配置文件

spring:
  rabbitmq:
    host: ${xxx.mq.host} # rabbitMQ 的 ip 地址
    port: ${xxx.mq.port} # 端口
    username: ${xxx.mq.username}
    password: ${xxx.mq.password}
    virtual-host: ${xxx.mq.virtual-host}
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true # 若是 false 则直接丢弃了,并不会发送者回执
    listener:
      simple:
        prefetch: 1 # 预取为一个(消费完才能拿下一个)
        concurrency: 2 # 消费者最少 2 个线程
        max-concurrency: 10 # 消费者最多 10 个线程
        auto-startup: true # 为 false 监听者不会实时创建和监听,为 true 监听的过程中,若 queue 不存在,会再根据注解进行创建,创建后只监听 queue,declare = "false" 才是不自动声明
        default-requeue-rejected: false # 拒绝后不 requeue(成为死信,若没有绑定死信交换机,就真的丢了)
        acknowledge-mode: auto # 消费者执行成功 ack、异常 nack(manual 为手动、none 代表无论如何都是 ack)
        retry: # 这个属于 spring amqp 的 retry 机制
          enabled: false # 不开启失败重试
#          initial-interval: 1000
#          multiplier: 2
#          max-attempts: 3
#          stateless: true # true 代表没有状态,若有消费者包含事务,这里改为 false

五、常见问题

在这里插入图片描述

(1)RabbitMQ 如何保证消息可靠性

保证消息可靠性、不丢失。主要从三个层面考虑

如果报错可以先记录到日志中,再去修复数据(保底)

1、生产者确认机制

生产者确认机制,确保生产者的消息能到达队列

  1. publisher-confirm,针对的是消息从发送者到交换机的可靠性,成功则进行下一步,失败返回 NACK
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");

private final RabbitTemplate rabbitTemplate;

@PostConstruct
public void init() {
    rabbitTemplate.setTaskExecutor(EXECUTOR);
}

private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {
    log.error("处理 ack 回执失败, {}", ex.getMessage());
    return null;
};

private MessagePostProcessor delayMessagePostProcessor(long delay) {
    return message -> {
        // 小于 0 也是立即执行
        // setDelay 才是给 RabbitMQ 看的,setReceivedDelay 是给 publish-returns 看的
        message.getMessageProperties().setDelay((int) Math.max(delay, 0));
        return message;
    };
};

private CorrelationData newCorrelationData() {
    return new CorrelationData(UUIDUtil.uuid32());
}

/**
     * @param exchange 交换机
     * @param routingKey routing key
     * @param msg 消息
     * @param delay 延迟时间(如果是延迟交换机,delay 才有效)
     * @param maxRetries 最大重试机会
     * @param <T> 消息的对象类型
     */
private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){
    log.info("准备发送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}",
             exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries);
    CorrelationData correlationData = newCorrelationData();
    MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay);
    correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() {

        private int retryCount = 0; // 一次 send 从始至终都用的是一个 Consumer 对象,所以作用的都是同一个计数器

        @Override
        public void accept(CorrelationData.Confirm confirm) {
            Optional.ofNullable(confirm).ifPresent(c -> {
                if(c.isAck()) {
                    log.info("ACK {} 消息成功到达,{}", correlationData.getId(), c.getReason());
                } else {
                    log.warn("NACK {} 消息未能到达,{}", correlationData.getId(), c.getReason());
                    if(retryCount >= maxRetries) {
                        log.error("次数到达上限 {}", maxRetries);
                        return;
                    }
                    retryCount++;
                    log.warn("开始第 {} 次重试", retryCount);
                    CorrelationData cd = newCorrelationData();
                    cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR);
                    rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd);
                }
            });
        }
    }, EXECUTOR);
    rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData);
}

Spring3 的 RabbitMQ Confirm,需要配置为 correlated,发送消息时提供 CorrelationData,也就是与消息关联的数据,包括发送者确认时的回调方法

在这里插入图片描述

要想提供 Confirm 的回调办法,需要配置 correlationData.getFuture() 返回的 CompletableFuture 对象(新的 JUC 工具类,可以查一查如何使用)

配置后,在未来根据回调函数进行处理(当然也可以直接设置在 RabbitTemplate 对象的 ConfirmCallBack)

还可以自己实现消息的发送者重试:

在这里插入图片描述

  1. publisher-returns,针对的是消息从交换机到队列的可靠性,成功则返回 ACK,失败触发 returns 的回调方法
@Component
@RequiredArgsConstructor
@Slf4j
public class PublisherReturnsCallBack implements RabbitTemplate.ReturnsCallback {

    // 不存在 routing key 对应的队列,那在我看来转发到零个是合理的现象,但在这里也认为是路由失败(MQ 认为消息一定至少要进入一个队列,之后才能被处理,这就是可靠性)(反正就是回执了,你爱咋处理是你自己的事情)
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        // 可能一些版本的 mq 会因为是延时交换机,导致发送者回执,只要没有 NACK 这种情况其实并不是不可靠(其实我也不知道有没有版本会忽略)
        // 但是其实不忽略也不错,毕竟者本来就是特殊情况,一般交换机是不存储的,但是这个临时存储消息
        // 但这样也就代表了,延时后消息路由失败是没法再次处理的(因为我们交给延时交换机后就不管了,可靠性有 mq 自己保持)
        MessageProperties messageProperties = returnedMessage.getMessage().getMessageProperties();
        // 这里的 message 并不是原本的 message,是额外的封装,x-delay 在 publish-returns 里面封装到 receiveDelay 里了
        Integer delay = messageProperties.getReceivedDelay();
        // 如果不是延时交换机,却设置了 delay 大于 0,是不会延时的,所以是其他原因导致的(以防万一把消息记录到日志里)
        if(Objects.nonNull(delay) && delay.compareTo(0) > 0) {
            log.info("交换机 {}, 路由键 {} 消息 {} 延迟 {} s", returnedMessage.getExchange(), returnedMessage.getRoutingKey(), messageProperties, TimeUnit.MILLISECONDS.toSeconds(delay));
            return;
        }
        log.warn("publisher-returns 发送者回执(应答码{},应答内容{})(消息 {} 成功到达交换机 {},但路由失败,路由键为 {})",
                returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getMessage(),
                returnedMessage.getExchange(), returnedMessage.getRoutingKey());
    }
}


RabbitMQSender:

private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread");

private final RabbitTemplate rabbitTemplate;

private final PublisherReturnsCallBack publisherReturnsCallBack;

@PostConstruct
public void init() {
    rabbitTemplate.setTaskExecutor(EXECUTOR);
    // 设置统一的 publisher-returns(confirm 也可以设置统一的,但最好还是在发送时设置在 future 里)
    // rabbitTemplate 的 publisher-returns 同一时间只能存在一个
    // 因为 publisher confirm 后,其实 exchange 有没有转发成功,publisher 没必要每次发送都关注这个 exchange 的内部职责,更多的是“系统与 MQ 去约定”
    rabbitTemplate.setReturnsCallback(publisherReturnsCallBack);
}

同理你也可以按照自己的想法进行重试…

在测试练习阶段里,这个过程是异步回调的,如果是单元测试,发送完消息进程就结束了,可能就没回调,程序就结束了,自然就看不到回调时的日志

如果既没有 ACK 也没有 NACK,也没有发布者回执,那就相当于这个消息销声匿迹了,没有任何的回应,那么就会抛出异常,我们可以处理这个异常,比如打印日志、重发之类的…

private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> {
    log.error("处理 ack 回执失败, {}", ex.getMessage());
    return null;
};

2、持久化

消息队列的数据持久化,确保消息未消费前在队列中不会丢失,其中的交换机、队列、和消息都要做持久化

默认都是持久化的

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3、消费者确认

队列的消息出队列,并不会立即删除,而是等待消费者返回 ACK 或者 NACK

消费者要什么时候发送 ACK 呢?

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

如果出现这种场景,就是不可靠的,所以应该是消息处理后,再发送 ACK

Spring AMQP 有三种消费者确认模式:

  1. manual,手段 ack,自己用 rabbitTemplate 去发送 ACK/NACK(这个比较麻烦,不用 RabbitListener 接受消息才必须用这个)
  2. auto,配合 RabbitListener 注解,代码若出现异常,NACK,成功则 ACK
  3. none,获得消息后直接 ACK,无论是否执行成功

出现 NACK 后要如何处理(此过程还在我们的服务器):

  1. 拒绝(默认)
  2. 重新入队列
  3. 返回 ACK,消费者重新发布消息指定的交换机
@Configuration
@RequiredArgsConstructor
@Slf4j
public class MessageRecovererConfig {

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RejectAndDontRequeueRecoverer(); // nack、直接 reject 和不 requeue,成为死信(默认)
//        return new ImmediateRequeueMessageRecoverer(); // nack、requeue
//        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); // ack、发送给指定的交换机,confirm 机制需要设置到 rabbitTemplate 里
    }

}

Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 消费者执行成功 ack、异常 nack(manual 为手动、none 代表无论如何都是 ack)
        retry: # 这个属于 spring amqp 的 retry 机制
          enabled: false # 不开启失败重试
          initial-interval: 1000 # 第一次重试时间间隔
          multiplier: 3 # 每次重试间隔的倍数
          max-attempts: 4 # 最大接受次数
          stateless: true # true 代表没有状态,若有消费者包含事务,这里改为 false

解释:第一次失败,一秒后重试、第二次失败,三秒后重试,第三次失败,九秒后重试,第四次失败就没机会了(SpringAMQP会抛出异常AmqpRejectAndDontRequeueException)

失败之后根据对应的处理策略进行处理

(2)死信交换机

消息过期、消息执行失败并且不重试也不重新入队列,堆积过多等情况,消息会成为死信,若队列绑定死信交换机,则转发给死信交换机,若没有则直接丢弃

队列1 -> 死信交换机 -> 队列2,这个过程是消息队列内部保证的可靠性,消息也没有包含原发送者的信息,甚至连接已经断开了,所以没有 publisher-confirm 也没有 publisher-returns

在这里插入图片描述

这个机制和 republish 有点像,但是有本质的区别,republish 是消费者重发,而这里是队列将死信转发给死信交换机

死信的情况:

  1. nack && requeue == false
  2. 超时未消费
  3. 队列满了,由于队列的特性,队列头会先成为死信

(3)延迟功能如何实现

刚才提到死信的诞生可能是超时未消费,那么其实这个点也可以简单的实现一个延迟队列:

队列为一个不被监听的专门用来延迟消息发送的缓冲带,其死信交换机才是目标交换机

message.getMessageProperties().setExpiration("1000");

设置的是过期时间,其本意并不是延迟,是可以实现延迟~

在这里插入图片描述

另外,队列本身也能设置 ttl 过期时间,但并不是队列的过期时间(显然不合理,截止后无论啥都丢了,冤不冤啊,至少我想不到这种场景),而是队列中的消息存活的最大时间,消息的过期时间和这个取一个最小值才是真实的过期时间

值得注意的是,虽然能实现延时消息的功能,但是

  1. 实现复杂
  2. 延迟可能不准确,因为队列的特性,如果队列头未出队列,哪怕其后者出现死信,也只能乖乖等前面的先出去之后才能前往死信交换机(例如消息的 ttl 分别为 9s、3s、1s,最终三个消息会被同时转发,因为“最长寿的”排在了前面)

这种方式的顺序优先级大于时间优先级

而 RabbitMQ 也提供了一个插件,叫 DelayExchange 延时交换机,专门用来实现延时功能

Scheduling Messages with RabbitMQ | RabbitMQ

  • 请自行上网下载

延时交换机的声明:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

延时消息的发送:

private MessagePostProcessor delayMessagePostProcessor(long delay) {
    return message -> {
        // 小于 0 也是立即执行
        message.getMessageProperties().setDelay((int) Math.max(delay, 0));
        return message;
    };
};

这里设置的是 Delay,不是过期时间,哪怕超过了时间也不叫做死信

期间一直存在延时交换机的硬存里,延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。

(4)消息堆积如何解决

死信的成因还可能是堆叠过多

我在实际的开发中,没遇到过这种情况,不过,如果发生了堆积的问题,解决方案也所有很多的

  1. 提高消费者的消费能力 ,可以使用多线程消费任务
  2. 增加更多消费者,提高消费速度,使用工作队列模式, 设置多个消费者消费消费同一个队列中的消息
  3. 扩大队列容积,提高堆积上限

但是,RabbitMQ 队列占的是内存,间接性的落盘,提高上限最终的结果很有可能就是反复落库,特别不稳定,且并没有解决消息堆积过多的问题

我们可以使用 RabbitMQ 惰性队列,惰性队列的好处主要是

  1. 接收到消息后直接存入磁盘而非内存,虽然慢,但没有间歇性的 page-out,性能比较稳定
  2. 消费者要消费消息时才会从磁盘中读取并加载到内存,正常消费后就删除了
  3. 基于磁盘存储,消息上限高,支持数百万条的消息存储

声明方式:

而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues :策略的作用对象,是所有的队列
  • x-queue-mode 参数的值为 lazy
@RabbitListener(bindings = @QueueBinding(
    exchange = @Exchange(name = "xxx"),
    value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-mode", value = "lazy")),
    key = "xxx"
))

交换机、队列扩展属性叫参数,消息的拓展属性叫头部,扩展属性一般都以 x- 开头(extra)

消息堆积问题的解决方案?

  • 队列上绑定多个消费者,提高消费速度
  • 使用惰性队列,可以再mq中保存更多消息

惰性队列的优点有哪些?

  • 基于磁盘存储,消息上限高
  • 没有间歇性的 page-out,性能比较稳定

惰性队列的缺点有哪些?

  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO

(5)高可用如何保证

RabbitMQ 在服务大规模项目时,一般情况下不会像数据库那样存储的瓶颈,用惰性队列已经是很顶天了的,其特性和用途不会有太极端的存储压力

更多的是在并发情况下,处理消息的能力有瓶颈,可能出现节点宕机的情况,而避免单节点宕机,数据丢失、无法提供服务等问题需要解决,也就是需要保证高可用性

Erlang 是一种面向并发的语言,天然支持集群模式,RabbitMQ 的集群有两种模式:

  1. 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力
  2. 镜像集群:是一种主从集群,在普通集群的基础上,添加了主从备份的功能,提高集群的数据可用性

镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险(虽然重启能解决,但那不是强一致,而是最终一致),因此 RabbitMQ 3.8 以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用 Raft 协议确保主从的数据一致性

1、普通集群

各个节点之间,实时同步 MQ 元数据(一些静态的共享的数据):

  1. 交换机的信息
  2. 队列的信息

但不包括队列中的消息(动态的数据不同步)

监听队列的时候,如果监听的节点不存在该队列(只是知道元数据),当前节点会访问队列所在的节点,该节点返回数据到当前节点并返回给监听者

队列所在节点宕机,队列中的消息就会“丢失”(是在重启之前,这个消息就消失无法被处理的意思)

在这里插入图片描述

如何部署,上网搜搜就行

2、镜像集群

各个节点之间,实时同步 MQ 元数据(一些静态的共享的数据):

  1. 交换机的信息
  2. 队列的信息

本质是主从模式,创建队列的节点为主节点,其他节点为镜像节点,队列中的消息会从主节点备份到镜像节点中

注意

  • 像 Redis 那样的主从集群,同步都是全部同步来着
  • 但 RabbitMQ 集群的主从模式比较特别,他的粒度是队列,而不是全部

也就是说,一个队列的主节点,可能是另一个队列的镜像节点,所以分析某个场景的时候,要确认是哪个队列,单独进行观察分析讨论

  • 不同队列之间只有交互,不会相互影响数据同步

针对某一个队列,所有写操作都在主节点完成,然后同步给镜像节点,读操作任何一个都 ok

主节点宕机,镜像节成为新的主节点

在这里插入图片描述

镜像集群有三种模式:

  1. exactly 准确模式,指定副本数 count = 主节点数 1 + 镜像节点数,集群会尽可能的维护这个数值,如果镜像节点出现故障,就在另一个节点上创建镜像,比较建议这种模式,可以设置为 N/2 + 1
  2. all 全部模式,count = N,主节点外全部都是镜像节点
  3. nodes 模式,指定镜像节点名称列表,随机一个作为主节点,如果列表里的节点都不存在或不可用,则创建队列时的节点作为主节点,之后访问集群,列表中的节点若存在才会创建镜像节点

没有镜像节点其实就相当于普通模式了

如何配置上网搜搜就行,比较麻烦,需要设置策略,以及匹配的队列(不同队列分开来讨论,可以设置不同的策略)

3、仲裁队列

RabbitMQ 3.8 以后,推出了新的功能仲裁队列来

  1. 代替镜像集群,都是主从模式,支持主从数据同步,默认是 exactly count = 5
  2. 约定大于配置,使用非常简单没有复杂的配置,队列的类型选择 Quorum 即可
  3. 底层采用 Raft 协议确保主从的数据强一致性

Spring Boot 配置:

在这里插入图片描述

仲裁队列声明:

@RabbitListener(bindings = @QueueBinding(
    exchange = @Exchange(name = "xxx"),
    value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-type", value = "quorum")),
    key = "xxx"
))

队列不声明默认就是普通集群,这里声明的仲裁队列也只是针对一个队列

(6)消息重复消费问题

在保证MQ消息不重复的情况下,MQ 的一条消息被消费者消费了多次

消费者消费消息成功后,在给MQ发送消息确认的时候出现了网络异常或者是服务宕机,MQ 迟迟没有接收到 ACK 也没有 NACK,此时 MQ 不会将发送的消息删除,按兵不动,消费者重新监听或者有其他消费者的时候,交由它消费,而这条消息如果在之前就消费过了的话,则会导致重复消费

解决方案:

  1. 消息消费的业务本身具有幂等性,再次处理相同消息时不会产生副作用,一些时候可能需要用到分布式锁去维护幂等性
    • 比如一个订单的状态设置为结束,那重复消费的结果一致
  2. 记录消息的唯一标识,如果消费过了的,则不再消费
    • 消费成功将 id 缓存起来,消费时查询缓存里是否有这条消息
    • 设置允许的缓存时间时,你不必想得太极端,一般很快就有消费者继续监听拿到消息,哪怕真有那个情况,这里带来的损失大概率可以忽略不记了,一切要结合实际情况!

有时候两种方案没有严格的界定

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/968979.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LabVIEW与USB设备开发

开发一台USB设备并使用LabVIEW进行上位机开发&#xff0c;涉及底层驱动的编写、USB通信协议的实现以及LabVIEW与设备的接口设计。本文将详细介绍如何开发USB设备驱动、实现LabVIEW与USB设备的通信以及优化数据传输&#xff0c;帮助用户顺利完成项目开发。下面是一个详细的说明&…

kali连接xshell

1.先保证宿主机&#xff1a;以太网适配器 VMware Network Adapter VMnet8 和kali&#xff08;net 模式&#xff09;在同一个网段 windows VMnet8开启 查看是否是自动获取ip ipv4 和ipv6一样的 查看 windows VMnet8的IPv4的地址 查看 kali 的IP地址 window ping的结果&#xf…

557. 反转字符串中的单词 III 简单

557. 反转字符串中的单词 IIIhttps://leetcode.cn/problems/reverse-words-in-a-string-iii/ 给定一个字符串 s &#xff0c;你需要反转字符串中每个单词的字符顺序&#xff0c;同时仍保留空格和单词的初始顺序。 示例 1&#xff1a; 输入&#xff1a;s "Lets take LeetC…

多语言订货系统的语言适配与本地化开发策略

在全球化浪潮的席卷下&#xff0c;商业世界的边界日益模糊&#xff0c;企业纷纷踏上国际化征程&#xff0c;与世界各地的客户展开紧密合作。在这一背景下&#xff0c;多语言订货系统成为企业开拓全球市场的关键基础设施&#xff0c;其语言适配能力与本地化开发策略&#xff0c;…

OpenWRT中常说的LuCI是什么——LuCI介绍(一)

我相信每个玩openwrt的小伙伴都或多或少看到过luci这个东西&#xff0c;但luci到底是什么东西&#xff0c;可能还不够清楚&#xff0c;今天就趁机来介绍下&#xff0c;openwrt中的luci&#xff0c;到底是个什么东西。 什么是LuCI&#xff1f; 首先&#xff0c;LuCI是OpenWRT中…

第39周:猫狗识别 2(Tensorflow实战第九周)

目录 前言 一、前期工作 1.1 设置GPU 1.2 导入数据 输出 二、数据预处理 2.1 加载数据 2.2 再次检查数据 2.3 配置数据集 2.4 可视化数据 三、构建VGG-16网络 3.1 VGG-16网络介绍 3.2 搭建VGG-16模型 四、编译 五、训练模型 5.1 上次程序的主要Bug 5.2 修改版…

vue3 描边加载动画

效果&#xff1a; 组件代码&#xff1a; <template><divclass"loading-wrap"ref"loadingWrap":style"[{ borderRadius: styles.borderRadius || 4px },{ borderColor: styles.borderColor || #409eff },{ border: loading ? 1px solid #40…

20240911 光迅科技 笔试

文章目录 1、选择题1.11.21.31.41.51.61.71.81.91.101.111.121.131.141.152、编程题2.1岗位:嵌入式软件工程师 题型:15 道选择题,1 道编程题 注意:本文章暂无解析,谨慎分辨答案对错 1、选择题 1.1 若某图有 100 个顶点、90 条边,则该图一定是 (C) 有向图连通图非连…

C++软件开发常见面试题(二)

struct和class的区别 指针和引用的区别&#xff1f;c为什么提供了引用这个东西&#xff1f; 说const 指针和指针 const的区别&#xff1f;例如const A*是什么意思&#xff1f;了解const 函数吗&#xff1f;具体是不修改哪些数据成员呢&#xff1f; 多态。追问&#xff1a;动态…

[生信云问题分析] 为什么医院/单位/校园网络,无法通过ssh协议访问服务器

使用生信云,生信分析更省心轻松&#xff1b;欢迎访问生信圆桌 www.tebteb.cc了解 背景 许多科研人员在日常工作中需要使用单位的网络&#xff0c;但有时会遇到一个奇怪的现象&#xff1a;虽然网页可以正常打开&#xff0c;却无法通过SSH协议访问科研服务器。SSH&#xff08;Se…

java项目之基于推荐算法的图书购物网站源码(ssm+mybatis+mysql)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的基于推荐算法的图书购物网站项目。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 基于推荐算法的…

鸿蒙HarmonyOS NEXT开发:优化复杂UI页面的性能——自定义组件冻结(freezeWhenInactive属性)

文章目录 一、自定义组件冻结1、freezeWhenInactive 二、当前支持的场景1、页面路由2、TabContent3、Navigation4、组件复用 三、限制条件 一、自定义组件冻结 自定义组件冻结功能专为优化复杂UI页面的性能而设计&#xff0c;尤其适用于包含多个页面栈、长列表或宫格布局的场景…

java练习(19)

ps:练习来自力扣 给你一个整数数组 nums &#xff0c;其中元素已经按 升序 排列&#xff0c;请你将其转换为一棵 平衡 二叉搜索树。 // 定义二叉树节点类 class TreeNode {int val;TreeNode left;TreeNode right;TreeNode() {}TreeNode(int val) { this.val val; }TreeNode…

如何在华为harmonyOS上调试软件

1、设置-》关于手机-》HarmonyOS 版本连按多下&#xff0c;输入锁屏密码。显示开发者模式已打开。 2、设置-》搜索“开发人员选项”-》开启“开发人员选项”选项。 3、在 开发者选项 中找到 “USB 调试” 并开启。 4、开启 “仅充电时允许 ADB 调试”。 5、设置中开启 &quo…

Leetcode 算法题 14. 最长公共前缀

起因&#xff0c; 目的: 计划: 近期先做10个简单的题目&#xff0c;找找感觉&#xff0c; 然后开始做中等的。 题目来源&#xff1a; 14. 最长公共前缀 参考题解&#xff0c; 第二个写法&#xff0c;纵向扫描 代码 1 def solu(strs):# 方法二&#xff1a;纵向扫描# strs…

称呼计算器:智能科技,简化您的计算生活

一款手机应用程序&#xff0c;安卓设备上使用。这款计算器应用以其简洁的界面、实用的功能和良好的用户体验而受到用户的喜爱。 计算器的主要特点包括&#xff1a; 基本计算功能&#xff1a;支持加、减、乘、除等基本运算。 科学计算器模式&#xff1a;提供更高级的数学运算功…

SkyWalking 10.1.0 实战:从零构建全链路监控,解锁微服务性能优化新境界

文章目录 前言一、集成SkyWalking二、SkyWalking使用三、SkyWalking性能剖析四、SkyWalking 告警推送4.1 配置告警规则4.2 配置告警通知地址4.3 下发告警信息4.4 测试告警4.5 慢SQL查询 总结 前言 在传统监控系统中&#xff0c;我们通过进程监控和日志分析来发现系统问题&…

Docker+Jenkins自动化部署SpringBoot项目【详解git,jdk,maven,ssh配置等各种配置,附有示例+代码】

文章目录 DockerJenkins部署SpringBoot项目一.准备工作1.1安装jdk111.2安装Maven 二.Docker安装Jenkins2.1安装Docker2.2 安装Jenkins2.3进入jenkins 三.Jenkins设置3.1安装jenkins插件3.2全局工具配置全局配置jdk全局配置maven全局配置git 3.3 系统配置安装 Publish Over SSH …

有哪些免费的SEO软件优化工具

随着2025年互联网的不断发展&#xff0c;越来越多的企业意识到在数字营销中&#xff0c;网站的曝光度和排名至关重要。无论是想要提高品牌知名度&#xff0c;还是想要通过在线销售增加收益&#xff0c;SEO&#xff08;搜索引擎优化&#xff09;都是一项不可忽视的关键策略。而要…

deepseek本地部署,断网仍可用!

写在前面&#xff1a;本机搭建的回答速度特别慢&#xff0c;&#xff0c;&#xff0c;&#xff0c;能联网且追求快速解决问题的不建议使用&#xff01;&#xff01;&#xff01;&#xff01; 1、访问 Ollama 官网&#xff1a;https://ollama.com/ 2、选择Windows下载 …