【RabbitMQ】RabbitMQ高级:如何保证消息可靠性

目录

  • 概述
  • 异常捕获机制
  • 事务机制
  • 持久化存储机制
  • 发送端确认机制
    • 概述
    • 开启发布确认的方法
    • 单个发布确认
    • 批量发布确认
    • 异步发布确认
  • 消费端确认机制
  • 消息限流
  • 消息幂等性处理

概述

前面学习了如何简单使用RabbitMQ,在实际使用RabbitMQ时,我们还需要考虑很多,比如消息的可靠性:即保证消息发到队列并且消费者正确消费。

那为什么要保证消息的可靠性呢?

比如用支付宝给商家支付,需要考虑转账的话,会不会把我的钱扣了,商家没有收到我的钱?

一般我们使用支付宝或微信转账支付的时候,都是扫码,支付,然后立刻得到结果,说你支付了多少钱,如果你绑定的是银行卡,可能这个时候你并没有收到支付的确认消息。往往是在一段时间之后,你会收到银行卡发来的短信,告诉你支付的信息。

支付平台如何保证这笔帐不出问题?

在这里插入图片描述

支付平台必须保证数据正确性,保证数据并发安全性,保证数据最终一致性。

支付平台通过如下几种方式保证数据一致性:

  1. 分布式锁

这个比较容易理解,就是在操作某条数据时先锁定,可以用redis或zookeeper等常用框架来实现。 比如我们在修改账单时,先锁定该账单,如果该账单有并发操作,后面的操作只能等待上一个操作的锁释放后再依次执行。

优点:能够保证数据强一致性。 缺点:高并发场景下可能有性能问题。

  1. 消息队列

消息队列是为了保证最终一致性,我们需要确保消息队列有ack机制 客户端收到消息并消费处理完成后,客户端发送ack消息给消息中间件 如果消息中间件超过指定时间还没收到ack消息,则定时去重发消息。

比如我们在用户充值完成后,会发送充值消息给账户系统,账户系统再去更改账户余额。

优点:异步、高并发 缺点:有一定延时、数据弱一致性,并且必须能够确保该业务操作肯定能够成功完成,不可能失败。

我们可以从以下几方面来保证消息的可靠性:

  1. 客户端代码中的异常捕获,包括生产者和消费者
  2. AMQP/RabbitMQ的事务机制
  3. 发送端确认机制
  4. 消息持久化机制
  5. Broker端的高可用集群
  6. 消费者确认机制
  7. 消费端限流
  8. 消息幂等性

异常捕获机制

先执行行业务操作,业务操作成功后执行行消息发送,消息发送过程通过try catch 方式捕获异常,在异常处理理的代码块中执行行回滚业务操作或者执行行重发操作等。这是一种最大努力确保的方式,并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。

在这里插入图片描述

另外,可以通过spring.rabbitmq.template.retry.enabled=true 配置开启发送端的重试

事务机制

没有捕获到异常并不能代表消息就一定投递成功了。

我们可以手动开启事务,如果一直到事务提交后都没有异常,确实就说明消息是投递成功了。但是,这种方式在性能方面的开销比较大,一般也不推荐使用。

在这里插入图片描述

持久化存储机制

持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机等)数据将会丢失。主要从以下几个方面来保障消息的持久性:

  1. Exchange的持久化。通过定义时设置durable 参数为ture来保证Exchange相关的元数据不不丢失。
  2. Queue的持久化。也是通过定义时设置durable 参数为ture来保证Queue相关的元数据不不丢失。
  3. 消息的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2即可实现消息的持久化,保证消息自身不丢失。

在这里插入图片描述

注意:将消息标记为持久化并不能完全保证不会丢失消息。尽管生产者告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候,还没有存储完毕,消息还在缓存中的一个间隔点,此时并没有真正的将消息写入磁盘,持久性保证并不强,如果需要更强有力的持久化策略,参考后面的发布确认模式。

发送端确认机制

概述

RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了。

在这里插入图片描述

RabbitMQ 回传给生产者的确认消息中的deliveryTag 字段包含了确认消息的序号。

另外,通过设置channel.basicAck方法中的multiple参数,表示是否批量应答。如果 multiple 为 true ,则代表批量应答 channel 上未应答的消息(可能会丢失数据)。如果 multiple 为 false ,则代表只会应答 channel 上正在处理完毕的消息(推荐使用)。

confirm 模式最大的好处在于异步,一旦发布一条消息,生产者应用程序就可以在等待信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

开启发布确认的方法

默认情况下,发布确认是关闭的。如果要开启需要调用方法 confirmSelect ,所以当我们想要使用发布确认的时候,都需要在 channel 上调用该方法。

Channel channel = RabbitmqUtils.getChannel();
// 开启发布确认模式
channel.confirmSelect();

单个发布确认

这是一个简单的发布确认方式,它是一种 同步发布确认的方式,也就是发布一条消息之后只有它被确认,后续的消息才能继续发布。

这种发布确认的方式有一个最大的缺点:发布速度特别慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布。

示例:

import cn.hutool.core.map.MapUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;

/**
 * 生产者
 */
public class Producer {
    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "hello";

    /**
     * 发送消息的个数
     */
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        // 单个发布确认 耗时:341
        singleReleaseConfirmed();
    }

    /**
     * 单个发布确认
     *
     * @throws Exception
     */
    public static void singleReleaseConfirmed() throws Exception {
        Channel channel = RabbitmqUtils.getChannel();
        // 开启发布确认模式
        channel.confirmSelect();

        // 队列持久化
        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, MapUtil.newHashMap());

        long startTime = System.currentTimeMillis();

        // 批量发送消息,单个发布确认
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = String.valueOf(i);
            // MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));

            boolean b = channel.waitForConfirms();
            if (b) {
                System.out.println("消息发送成功");
            }
        }

        long endTime = System.currentTimeMillis();

        // 耗时:341
        System.out.println("耗时:" + (endTime - startTime));

        System.out.println("消息发送完毕");
    }
}

批量发布确认

和单个发布确认相比,批量发布确认是先发布一批消息,然后一起确认,可以极大的提高吞吐量,但是这种方式的缺点是:当发生故障的时候会导致发布出现问题时,不知道是哪个消息出现问题,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。

批量发布确认也是同步的,一样会阻塞后续消息的发布。

示例:

import cn.hutool.core.map.MapUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;

/**
 * 生产者
 */
public class Producer {
    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "hello";

    /**
     * 发送消息的个数
     */
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        // 批量发布确认 耗时:39
        batchReleaseConfirmed();
    }

    /**
     * 批量发布确认
     *
     * @throws Exception
     */
    public static void batchReleaseConfirmed() throws Exception {
        Channel channel = RabbitmqUtils.getChannel();
        // 开启发布确认模式
        channel.confirmSelect();

        // 队列持久化
        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, MapUtil.newHashMap());

        long startTime = System.currentTimeMillis();

        // 批量确认消息的大小
        int batchSize = 100;

        // 批量发送消息,批量发布确认
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = String.valueOf(i);
            // MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));

            if (i % batchSize == 0) {
                boolean b = channel.waitForConfirms();
                if (b) {
                    System.out.println("消息发送成功");
                }
            }

        }

        long endTime = System.currentTimeMillis();

        // 耗时:341
        System.out.println("耗时:" + (endTime - startTime));

        System.out.println("消息发送完毕");
    }

}

异步发布确认

异步发布确认虽然编程逻辑比上面两个要复杂,但是性价比是最高的(无论是可靠性还是效率)。它是利用回调函数来达到消息可靠性传递的。

在这里插入图片描述

示例:

import cn.hutool.core.map.MapUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * 生产者
 */
public class Producer {
    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "hello";

    /**
     * 发送消息的个数
     */
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        // 异步发布确认 耗时:33
        asynchronousReleaseConfirmed();
    }

    /**
     * 异步发布确认
     */
    public static void asynchronousReleaseConfirmed() throws Exception {
        Channel channel = RabbitmqUtils.getChannel();
        // 开启发布确认模式
        channel.confirmSelect();

        // 队列持久化
        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, MapUtil.newHashMap());

        /**
         * 线程安全有序的哈希表,适用于高并发的情况
         * ① 可以将序号和消息进行关联。
         * ② 可以批量删除条目。
         * ③ 支持高并发。
         */
        ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();

        // 准备消息的监听器,用来监听那些消息成功了,那些消息失败了。
        // 消息确认成功的回调函数
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            System.out.println("确认的消息的 ID = " + deliveryTag);
            // 删除掉已经确认的消息,剩下的就是未确认的消息
            if (multiple) { // 如果是批量
                // 删除已经确认的消息
                ConcurrentNavigableMap<Long, String> headMap = map.headMap(deliveryTag);
                headMap.clear();
            } else {
                // 只清除当前序列号的消息
                map.remove(deliveryTag);
            }
        };
        // 消息确认失败的回调函数
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            System.out.println("未确认的消息的 ID = " + deliveryTag);
            // 输出未确认的消息
            String msg = map.get(deliveryTag);
            System.out.println("未确认的消息 = " + msg);

        };
        // 异步
        channel.addConfirmListener(ackCallback, nackCallback);

        long startTime = System.currentTimeMillis();
        // 批量发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = String.valueOf(i);

            // MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));

            // 记录所有要发送的消息
            // channel.getNextPublishSeqNo()获取下一个消息的序列号
            map.put(channel.getNextPublishSeqNo(), msg);

        }

        long endTime = System.currentTimeMillis();

        // 耗时:341
        System.out.println("耗时:" + (endTime - startTime));

        System.out.println("消息发送完毕");
    }

}

消费端确认机制

前面我们讲了生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我们又没有任何重试,那结果跟消息丢失没什么分别。

Consumer ACK,就是用来确认消息被消费者成功消费。,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。

一般而言,我们有如下处理手段:

  1. 采用NONE模式,消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表,再通过后台定时任务扫描异常恢复表尝试做重试动作。则只要收到消息后就立即确认(消息出列,标记已消费),如果业务不自行处理则有丢失数据的风险。
  2. 采用AUTO(自动Ack)模式,不主动捕获异常,当消费过程中出现异常时会将消息放回Queue中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期。
  3. 采用MANUAL(手动Ack)模式,消费者自行控制流程并手动调用channel相关的方法basicAck方法返回Ack。

SpringBoot项目中支持如下的一些配置:

#最大重试次数 
spring.rabbitmq.listener.simple.retry.max-attempts=5 
#是否开启消费者重试(为false时关闭消费者重试,意思不是“不重试”,而是一直收到消息直到jack 确认或者一直到超时) 
spring.rabbitmq.listener.simple.retry.enabled=true 
#重试间隔时间(单位毫秒) 
spring.rabbitmq.listener.simple.retry.initial-interval=5000 
# 重试超过最大次数后是否拒绝
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#ack模式 
spring.rabbitmq.listener.simple.acknowledge-mode=manual

ack模式如下:

在这里插入图片描述

Springboot中使用如下:

  1. pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
  1. application.properties
spring.application.name=consumer_ack 
spring.rabbitmq.host=node1
spring.rabbitmq.virtual-host=/ 
spring.rabbitmq.username=root 
spring.rabbitmq.password=123456 
spring.rabbitmq.port=5672
#最大重试次数 
spring.rabbitmq.listener.simple.retry.max-attempts=5 
#是否开启消费者重试(为false时关闭消费者重试, # 意思不是“不重试”,而是一直收到消息直到jack确认或者一直到超时) 
spring.rabbitmq.listener.simple.retry.enabled=true 
#重试间隔时间(单位毫秒) 
spring.rabbitmq.listener.simple.retry.initial-interval=5000 
# 重试超过最大次数后是否拒绝 
spring.rabbitmq.listener.simple.default-requeue-rejected=false 
#ack模式 
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  1. 主入口类
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitmqDemo {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemo.class, args);
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Thread.sleep(5000);
            for (int i = 0; i < 10; i++) {
                MessageProperties props = new MessageProperties();
                props.setDeliveryTag(i);
                Message message = new Message(("消息:" + i).getBytes("utf-8"), props);
                // this.rabbitTemplate.convertAndSend("ex.biz", "biz", "消息:" + i);
                this.rabbitTemplate.convertAndSend("ex.biz", "biz", message);
            }
        };
    }
}
  1. RabbitConfig
package com.lagou.rabbitmq.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean
    public Queue queue() {
        return new Queue("q.biz", false, false, false, null);
    }

    @Bean
    public Exchange exchange() {
        return new DirectExchange("ex.biz", false, false, null);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("biz").noargs();
    }
}
  1. MessageListener
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

import java.io.IOException;
import java.util.Random;

// @Component
public class MessageListener {
    private Random random = new Random();

    /**
     * NONE模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据 的风险
     * AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中
     * MANUAL模式,需要显式的调用当前channel的basicAck方法
     *
     * @param channel
     * @param deliveryTag
     * @param message
     */
    // @RabbitListener(queues = "q.biz", ackMode = "NONE")
    // @RabbitListener(queues = "q.biz", ackMode = "AUTO")
    @RabbitListener(queues = "q.biz", ackMode = "MANUAL")
    public void handleMessageTopic(Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, @Payload String message) {
        System.out.println("RabbitListener消费消息,消息内容:" + message);
        try {
            if (random.nextInt(10) % 3 != 0) {
                // 手动nack,告诉broker消费者处理失败,最后一个参数表示是否需要将消息重新入列
                // channel.basicNack(deliveryTag, false, true)
                // 手动拒绝消息。第二个参数表示是否重新入列
                channel.basicReject(deliveryTag, true);
            } else {
                // 手动ack,deliveryTag表示消息的唯一标志,multiple表示是 否是批量确认
                channel.basicAck(deliveryTag, false);
                System.err.println("已确认消息:" + message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  1. BizController
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

@RestController
public class BizController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    private Random random = new Random();

    @RequestMapping("/biz")
    public String getBizMessage() {
        String message = rabbitTemplate.execute(new ChannelCallback<String>() {
            @Override
            public String doInRabbit(Channel channel) throws Exception {
                final GetResponse getResponse = channel.basicGet("q.biz", false);
                if (getResponse == null) return "你已消费完所有的消息";
                String message = new String(getResponse.getBody(), "utf-8");
                if (random.nextInt(10) % 3 == 0) {
                    channel.basicAck(getResponse.getEnvelope().getDeliveryTag(), false);
                    return "已确认的消息:" + message;
                } else {
                    // 拒收一条消息
                    channel.basicReject(getResponse.getEnvelope().getDeliveryTag(), true);
                    // 可以拒收多条消息
                    // channel.basicNack(getResponse.getEnvelope().getDeliveryTag(), false, true); 
                    return "拒绝的消息:" + message;
                }
            }
        });
        return message;
    }
}

消息限流

当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩 溃,而分布式系统的故障往往会发生上下游传递,连锁反应那就会很悲剧…

下面我将从多个角度介绍QoS与限流,防止上面的悲剧发生。

  1. 限制生产者发送

RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项指标恢复正常。

全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接客户端的套接字读取数据。连接心跳监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以继续或被阻止,这意味着它们已发布,现在已暂停。兼容的客户端被阻止时将收到通知。

在/etc/rabbitmq/rabbitmq.conf中配置磁盘可用空间大小:

在这里插入图片描述

  1. 基于连接流控

RabbitMQ 还默认提供了一种基于credit flow流控机制,面向每一个连接进行流控。当单个队列达到最大流速时,或者多个队列达到总流速时,都会触发流控。触发单个链接的流控可能是因为connection、channel、queue的某一个过程处于flow状态,这些状态都可以从监控平台看到。

总的状态:

在这里插入图片描述

connection、channel、queue的状态:

在这里插入图片描述

  1. 消费端限流

RabbitMQ中有一种QoS保证机制,可以限制Channel上接收到的未被Ack的消息数量,如果超过这个数量限制RabbitMQ将不会再往消费端推送消息。可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)。

比较值得注意的是QoS机制仅对于消费端推模式有效,对拉模式无效。而且不支持NONE Ack模式。执行channel.basicConsume 方法之前通过 channel.basicQoS 方法可以设置该数量。

消息的发送是异步的,消息的确认也是异步的。在消费者消费慢的时候,可以设置Qos的prefetchCount,它表示broker在向消费者发送消息的时候,一旦发送了prefetchCount个消息而没有一个消息确认的时候,就停止发送。消费者确认一个,broker就发送一个,确认两个就发送两个。换句话说,消费者确认多少,broker就发送多少,消费者等待处理的个数永远限制在prefetchCount个。

如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了multiple为true,消费者在确认的时候,比如说id是8的消息确认了,则在8之前的所有消息都确认了。

总结:

生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快且超过了下游的消费速度时就容易出现消息积压/堆积,所以,从上游来讲我们应该在生产端应用程序中也可以加入限流、应急开关等控制手段,避免超过Broker端的极限承载能力或者压垮下游消费者。

再看看下游,我们期望下游消费端能尽快消费完消息,而且还要防止瞬时大量消息压垮消费端(推模式),我们期望消费端处理速度是最快、最稳定而且还相对均匀(比较理想化)。

提升下游应用的吞吐量和缩短消费过程的耗时,优化主要以下几种方式:

  1. 优化应用程序的性能,缩短响应时间(需要时间)
  2. 增加消费者节点实例(成本增加,而且底层数据库操作这些也可能是瓶颈)
  3. 调整并发消费的线程数(线程数并非越大越好,需要大量压测调优至合理值)

示例:

@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        // SimpleRabbitListenerContainerFactory发现消息中有content_type有text 就会默认将其转换为String类型的,没有content_type都按byte[]类型 
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);
        // 设置并发线程数 
        factory.setConcurrentConsumers(10);
        // 设置最大并发线程数 
        factory.setMaxConcurrentConsumers(20);
        return factory;
}

消息幂等性处理

在我们追求高性能就无法保证消息的顺序,而追求可靠性那么就可能产生重复消息,从而导致重复消费。

RabbitMQ层面有实现“去重机制”来保证“恰好一次”吗?答案是并没有。而且这个在目前主流的消息中间件都没有实现。

事实证明,很多业务场景下是可以容忍重复消息的。例如:操作日志收集,而对一些金融类的业务则要求比较严苛。

一般解决重复消息的办法是,在消费端让我们消费消息的操作具备幂等性。幂等性问题并不是消息系统独有,而是(分布式)系统中普遍存在的问题。例如:RPC框架调用超后会重试,HTTP请求会重复发起(用户手抖多点了几下按钮)

一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。对于幂等的方法,不用担心重复执行会对系统造成任何改变

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/313266.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Vue.js设计与实现阅读-2

Vue.js设计与实现阅读-2 1、前言2、框架设计的核心要素2、1 提升用户体验2、2 控制代码体积2、3 Tree-Shaking2、4 特性开关2、5 错误处理 1、前言 上一篇我们了解到了 命令式和声明式的区别&#xff0c;前者关注过程&#xff0c;后者关注结果了解了虚拟dom存在的意义&#x…

数据库SELECT语句

文章目录 一、检索数据二、排序检索三、过滤数据四、数据过滤4.1 组合WHERE子句1. AND操作符2. OR操作符3. 计算次序 4.2 IN操作符4.3 NOT操作符 五、用通配符过滤LIKE操作符1. 百分号&#xff08;%&#xff09;通配符2. 下划线&#xff08;_&#xff09;通配符 使用通配符的技…

盈利之道:下单前的必问之问

投资者在过去的交易经历中&#xff0c;通常都会面临所谓的“交易低谷”。交易低谷是指在交易过程中难以实现盈利或可能导致进一步亏损的阶段。这种面临损失或没有盈利的时期可能发生在任何人身上&#xff0c;无论是由于市场变化、投资者策略调整还是其他原因。为了应对这种情况…

CSS基础方法——引入方式、属性、基础选择器

CSS 主要用于设置 HTML 页面中的文本样式&#xff08;字体、大小、颜色、对齐方式……&#xff09;、图片样式&#xff08;宽高、边框样式、边距……&#xff09;以及版面的布局和外观显示样式。 1、CSS引入方式 行内样式 写在标签中&#xff0c;通常不使用&#xff0c;只做…

优惠券兑换码生成需求——事务同步回调问题分析

前段时间收到一个优惠券兑换码的需求&#xff1a;管理后台针对一个优惠券发起批量生成兑换码&#xff0c;这些兑换码可以导出分发到各个合作渠道&#xff08;比如&#xff1a;抖音、京东等&#xff09;&#xff0c;用户通过这些渠道获取到兑换码之后&#xff0c;再登录到我司研…

提升测试效率,轻松并行运行测试——探秘Pytest插件pytest-xdist

在软件开发中&#xff0c;测试是确保代码质量的重要一环。然而&#xff0c;随着项目规模的增大&#xff0c;测试用例的数量也随之增多&#xff0c;测试的执行时间可能成为一个瓶颈。为了解决这个问题&#xff0c;Pytest提供了丰富的插件生态系统&#xff0c;其中 pytest-xdist …

黑群晖6.x 7.x ABB Active Backup for Business 套件激活方法

注意事项&#xff1a; 要先下载安装好Active Backup for Business套件再操作。SN码在【控制面板】 - 【信息中心】 -【产品序列号】。建议复制到记事本内修改内容。群晖的https是默认的5001端口&#xff0c;如果你的https端口号换过请自行修改&#xff1a;5001 为当前的端口号…

spacedesk 变成黑白的分析

测试发现只要调整时间到2024 就会出现黑白而且是建立连接是才检测的&#xff0c;那么应该存在于R3部分的可能性大 IDA分析找到2024

[论文阅读]4DRadarSLAM: A 4D Imaging Radar SLAM System for Large-scale Environments

目录 1.摘要和引言&#xff1a; 2. 系统框架&#xff1a; 2.1 前端&#xff1a; 2.2 回环检测&#xff1a; 2.3 后端&#xff1a; 3.实验和分析&#xff1a; 4.结论 1.摘要和引言&#xff1a; 这篇论文介绍了一种名为“4DRadarSLAM”的新型4D成像雷达SLAM系统&#xff0…

RT-DETR优化:UNetv2多层次特征融合模块结合DualConv、GSConv

🚀🚀🚀本文改进:多层次特征融合(SDI)结合DualConv、GSConv模块等实现二次创新 🚀🚀🚀SDI 亲测在多个数据集能够实现涨点,同样适用于小目标检测 🚀🚀🚀RT-DETR改进创新专栏:http://t.csdnimg.cn/vuQTz 学姐带你学习YOLOv8,从入门到创新,轻轻松松搞定…

Vue、uniApp、微信小程序、Html5等实现数缓存

此文章带你实现前端缓存&#xff0c;利用时间戳封装一个类似于Redis可以添加过期时间的缓存工具 不仅可以实现对缓存数据设置过期时间&#xff0c;还可以自定义是否需要对缓存数据进行加密处理 工具介绍说明 对缓存数据进行非对称加密处理 对必要数据进行缓存&#xff0c;并…

太平洋产险海南分公司:春季爱车保养,就看这几点!

一年之计在于春&#xff0c;春天不仅是万物复苏的好时节&#xff0c;也是一年中非常适合汽车养护的季节。 刚刚过去的春节&#xff0c;汽车的使用频率大大增加&#xff0c;很多车主都准备对爱车进行一次全面保养。加上立春过后&#xff0c;天气渐暖&#xff0c;许多车主也计划开…

答题小程序源码系统:自带流量主广告位+视频激励广告 带完整的代码安装包以及搭建教程

随着互联网的迅速发展&#xff0c;各种应用程序层出不穷&#xff0c;而答题类小程序由于其独特的互动性和吸引力&#xff0c;成为了当前最热门的应用之一。答题小程序源码系统是一款基于微信小程序开发的源代码系统&#xff0c;它具有丰富的功能和灵活的定制性&#xff0c;可以…

搭建算法日志自检小系统

&#x1f952; 前言 目前演示的是一个工具&#xff0c;但如此&#xff0c;未来完成有潜力可以演变为一整套系统。 &#x1f451;现场人员自检失败表计点位教程V2.0 NOTE: 如果没有“logfiles-meter-tool“目录的请联系我们进行提供&#xff01; &#x1f447; 进入<dist>…

使用AutoDL云计算平台训练并测试Pytorch版本NeRF代码

文章目录 前言一、数据集及代码获取二、租用并设置服务器三、Pycharm远程开发四、训练并测试代码 前言 因为第一次在云服务器上跑代码&#xff0c;所以在这里记录一下。 一、数据集及代码获取 nerf-pytorch项目是 NeRF 的忠实 PyTorch 实现&#xff0c;它在运行速度提高 1.3 倍…

docker 利用特权模式逃逸并拿下主机

docker 利用特权模式逃逸并拿下主机 在溯源反制过程中&#xff0c;会经常遇到一些有趣的玩法&#xff0c;这里给大家分享一种docker在特权模式下逃逸&#xff0c;并拿下主机权限的玩法。 前言 在一次溯源反制过程中&#xff0c;发现了一个主机&#xff0c;经过资产收集之后&…

SSL证书与HTTPS的关系

SSL证书是一种数字证书&#xff0c;由权威的证书颁发机构颁发。它包含了一个公钥和有关证书所有者的一些信息&#xff0c;如名称、组织、邮箱等。SSL证书的主要作用是实现数据加密和身份验证&#xff0c;确保数据在传输过程中的安全性和完整性。 HTTPS是一种基于HTTP协议的安全…

Web开发:SQLsugar的安装和使用

一、安装 第一步&#xff0c;在你的项目中找到解决方案&#xff0c;右键-管理解决方案的Nuget 第二步&#xff0c;下载对应的包&#xff0c;注意你的框架是哪个就下载哪个的包&#xff0c;一个项目安装一次包即可 点击应用和确定 安装好后会显示sqlsugar的包 二、使用&#xf…

UOS Python+Qt5实现声卡回路测试

1.回路治具设计&#xff1a; 2.Ui界面&#xff1a; 3.源代码&#xff1a; # -*- coding: utf-8 -*-# Form implementation generated from reading ui file SoundTestWinFrm.ui # # Created by: PyQt5 UI code generator 5.15.2 # # WARNING: Any manual changes made to this…