RabbitMQ从入门到入土

同步与异步

同步调用

优势:

  • 时效性强,等到结果后就返回

问题:

  • 扩展性差

  • 性能下降

  • 级联失败问题

异步调用

优势:

  • 耦合度低,扩展性强

  • 无需等待,性能好

  • 故障隔离,下游服务故障不影响上游

  • 缓存消息,削峰填谷

问题:

  • 不能立刻获得结果,时效性差

  • 不确定下游业务执行是否成功

  • 业务安全依赖于Broker的可靠性

Broker:(代理,常指一种中间件,用于协调和管理不同组件之间的通信、交互、服务调用)

异步调用基于消息推送的方式,一般包含3个角色。

  • 消息发送者

  • 消息代理:管理、暂存、转发消息

  • 消息接收者

MQ技术选型

MQ:消息队列,就是存放消息的队列,也就是异步调用中的Broker。

RabbitMQ、ActiveMQ、Rocket MQ、Kafka对比

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般
  • 追求可用性:Kafka、 RocketMQ 、RabbitMQ

  • 追求可靠性:RabbitMQ、RocketMQ

  • 追求吞吐能力:RocketMQ、Kafka

  • 追求消息低延迟:RabbitMQ、Kafka

个人认为:如果在项目中使用的是RabbitMQ,面试官问你为啥使用这个可以从以下入手:

  • 可用性高

  • 消息可靠性高

  • 虽然单机吞吐量一般,但是消息延迟低。为什么消息延迟低呢?

    • RabbitMQ之所以被认为能够提供较低的消息延迟,主要归因于以下几个因素:

      1. 高性能的底层实现:RabbitMQ是用Erlang语言编写的,Erlang专为高并发、分布式系统设计,提供了轻量级进程和共享无锁数据结构,这使得RabbitMQ在处理大量并发连接和消息时表现得非常高效。

      2. 零拷贝技术:RabbitMQ利用操作系统提供的零拷贝特性,减少数据在内核空间和用户空间之间的复制次数,从而加快消息传输速度,降低延迟。

      3. 多路复用的TCP连接:通过使用Channel(信道)这一概念,RabbitMQ可以在单个TCP连接上复用多个逻辑连接,减少了网络连接的开销,提升了通信效率。

      4. 可配置的消息优先级:RabbitMQ允许为消息设置优先级,这在某些场景下可以帮助紧急或高优先级的消息更快地被消费,减少它们的等待时间。

      5. 社区支持和成熟度:作为一个成熟的开源项目,RabbitMQ拥有活跃的开发者社区和丰富的文档资源,这意味着它经过了广泛的测试和优化,能够提供稳定的低延迟表现。

好,现在开始来介绍我们的RabbitMQ了

RabbitMQ

认识

整体架构和核心概念如下:

  • publisher:消息发布者

  • consumer:消息消费者

  • queue:队列,存储消息

  • exchange:交换机:转发消息

image-20240215142856543

Java客户端使用步骤

我们要知道的是RabbitMQ是根据AMQP协议来实现的:

AMQP:用于应用程序之间传递业务消息的开放标准

Spring AMQP:基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息

使用之前你要从官网进行下载RabbitMQ这个中间件,并启动它。

下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.13.3

官方使用地址:RabbitMQ Tutorials | RabbitMQ

  1. 引入Spring-amqp的依赖

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.13.0</version> <!-- 或者使用最新版本 -->
    </dependency>

    2、配置RabbitMQ服务端信息

    spring:
      rabbitmq:
        host: 127.0.0.1 #ip
        port: 5672      #端口
        username: guest #账号
        password: guest #密码
        virtualHost:    #链接的虚拟主机
        addresses: 127.0.0.1:5672     #多个以逗号分隔,与host功能一样。
        requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s
        publisherConfirms: true  #发布确认机制是否启用
        #确认消息已发送到交换机(Exchange)
        #publisher-confirm-type参数有三个可选值:
        #SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。
        #CORRELATED:消息从生产者发送到交换机后触发回调方法。
        #NONE(默认):关闭发布确认模式。
        #publisher-confirm-type: correlated #发布确认机制是否启用 高版本Springboot使用替换掉publisher-confirms:true
        publisherReturns: true #发布返回是否启用
        connectionTimeout: #链接超时。单位ms。0表示无穷大不超时

    3、利用RabbitTemplate发送消息

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    ​
    public class Producer {
    ​
        private static final String QUEUE_NAME = "hello";
    ​
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost"); // 如果RabbitMQ不在本地,请修改为主机地址
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                String message = "Hello World!";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }

    4、利用@RabbitListener注解声明要监听的队列,监听消息

    import com.rabbitmq.client.*;
    ​
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    ​
    public class Consumer {
    ​
        private static final String QUEUE_NAME = "hello";
    ​
        public static void main(String[] argv) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost"); // 如果RabbitMQ不在本地,请修改为主机地址
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    ​
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    ​
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        }
    }

WorkQueue

  • 一个队列绑定多个消费者,加快消息处理速度。

  • 同一个消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

spring:
    rabbitmq:
        listener:
            simple:
                prefetch: 1 #每次只能获取一条消息,处理完才能获取下一条消息

Java声明队列和交换机

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建

  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建

  • Binding:用于声明队列和交换机绑 定关系,可以用工厂类BindingBuilder构建

在consumer中创建一个类,声明队列和交换机:

1、通过配置实现

package cn.itcast.mq.config;
​
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
​
    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
​
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
​
    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
​
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
​
2、通过注解实现

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1", durable="true"),
        exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) throws Exception{
    System.out.println("消费者1收到了 direct.queue1的消息:【" + msg + "】");
}

交换机

消息一般都是通过exchange来发送消息的,而不是直接发送到队列中的。

常用交换机的类型有以下三种:

  • Fanout(广播):将消息发送给所有跟该交换机绑定了的queue(就是每个人都能收到)

  • Direct(定向):消息根据规则路由到指定的queue

  • Topic(话题):根据类别来进行发送消息(类似Direct)

Fouout交换机

这里定义了一个生产者发送消息:

    @Test
    public void testSendFanout(){
        String exchangeName = "test.fanout";
        String msg = "hello, everyone!";
        rabbitTemplate.convertAndSend(exchangeName,null,msg);
    }

下面是两个消费者

package cn.itcast.mq.listener;
​
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.time.LocalTime;
​
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanout1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }
​
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanout2(String msg) throws InterruptedException {
        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }
​
}
​

这里是接收到的消息,每个都接收到了。

消费者2........接收到消息:【hello, everyone!】18:50:53.628336200
消费者1接收到消息:【hello, everyone!】18:50:53.628336200

定向交换机

Direct Exchange会将收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个BindingKey

  • 发布者发送消息时,指定消息的RoutingKey

  • Exchange将消息路由到BindingKey与消息RoutingKey一直的队列

添加配置类,将交换机和队列进行绑定:

@Configuration
@EnableRabbit
public class RabbitConfig {
​
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("test.direct");
    }
​
    @Bean
    Queue redQueue() {
        return new Queue("redQueue", false); // false 表示队列不是持久化的
    }
​
    @Bean
    Binding bindingRed(DirectExchange directExchange, Queue redQueue) {
        return BindingBuilder.bind(redQueue()).to(directExchange).with("red"); // 绑定键为"red"
    }
}

发送端:

  @Test
    public void testSendFanout(){
        String exchangeName = "test.direct";
        String msg = "hello, everyone!";
        rabbitTemplate.convertAndSend(exchangeName,"red",msg);
    }   //这个red是以及绑定了的BindingKey

消费端:

@Service
public class FanoutReceiver {
​
    @RabbitListener(queues = "redQueue")
    public void receiveMessage(String message) {
        System.out.println("Received from redQueue: " + message);
    }
}

话题交换机

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 . 分隔。例如:

China.news (代表中国新闻这个列表)

Queue与Exchange指定BindingKey时可以使用通配符:

  • #: 代指0个或多个单词

  • *:代指一个单词

配置类代码:

@Configuration
@EnableRabbit
public class RabbitConfig {
​
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("test.topic");
    }
​
    @Bean
    Queue chinaNewsQueue() {
        return new Queue("china.news.queue", false);
    }
​
    @Bean
    Queue chinaSportsQueue() {
        return new Queue("china.sports.queue", false);
    }
​
    @Bean
    Binding bindingChinaNews(TopicExchange topicExchange, Queue chinaNewsQueue) {
        return BindingBuilder.bind(chinaNewsQueue()).to(topicExchange).with("China.news"); 
    }
​
    @Bean
    Binding bindingChinaSports(TopicExchange topicExchange, Queue chinaSportsQueue) {
        return BindingBuilder.bind(chinaSportsQueue()).to(topicExchange).with("China.sports.*"); // 匹配以"China.sports."开头的所有Routing Key
    }
}

生产者代码:

@Test
public void testSendTopic() {
    String exchangeName = "test.topic";
    String routingKey = "China.news"; // 使用符合Topic规则的Routing Key
    String msg = "Breaking news from China!";
    rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}

消费者:

@Service
public class TopicReceiver {
​
    @RabbitListener(queues = "china.news.queue")
    public void receiveNews(String message) {
        System.out.println("Received news from 'China.news': " + message);
    }
​
    @RabbitListener(queues = "china.sports.queue")
    public void receiveSports(String message) {
        System.out.println("Received sports update from 'China.sports.*': " + message);
    }
}

消息转换器

Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大

  • 有安全漏洞

  • 可读性差

所以,我们就希望让消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

1、引入依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

2、配置Bean

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

可靠性

发送者的可靠性

生产者重连

有时候由于网络波动,出现连接MQ失败情况。

我们可以通过配置开启连接失败后的重连机制:

spring:
    rabbitmq:
        connection-timeout: 1s #设置MQ的连接超时时间
            template:
                retry:
                    enabled: true #开启超时重试机制
                        inital-interval: 1000ms #失败后的初始等待时间
                        multiplier: 1 #失败后下次等待时长的倍数,下次等待时长 = initial-interval * multipler
                        max-attempts: 3 #最大重试次数
                        

注意:

  • 当网络不稳定的时候,利用重试机制可以有效的提高消息发送成功率,但是SpringAMQP的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是呗阻塞的,会影响性能。

  • 如果对于业务性能有要求的时候,建议禁用重试机制。如果一定要用,请进行适当的配置。

生产者确认

生产者确认的作用:为了让生产者知道消息是否被消费成功。

RabbitMQ的Publisher Confirm 和Publisher Return两种确认机制:

  • ConfirmCallback:当RabbitMQ成功处理消息时调用

  • ReturnCallback:当消息无法路由到任何队列时调用,例如由于交换器找不到匹配的队列,此时会返回消息及原因。

开启确认机制后,以下两种情况会返回消息被接收的ACK:

  • 消息被投递到匹配的队列

  • 持久化消息写入磁盘

注意这一种情况:

当消息被投递到MQ后,但是路由失败(没有匹配规则的队列),一样会返回ACK

实现生产者确认
  1. 配置实现:

    spring:
        rabbitmq:
            publisher-confirm-type: correlated
            publisher-returns: true
            
    #publisher-confirm-type有3中模式:
    #- none:关闭confirm机制
    #- simple:同步阻塞等待MQ的回执消息
    #- correlated:MQ异步回调方式返回回执消息

  2. import com.rabbitmq.client.*;
    ​
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicInteger;
    ​
    public class RabbitMQProducerConfirmAndReturn {
    ​
        private static final String QUEUE_NAME = "my_queue";
        private static final String EXCHANGE_NAME = "my_exchange";
        private static final String ROUTING_KEY = "my_routing_key";
    ​
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost"); // 根据实际情况设置RabbitMQ服务器地址
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    ​
            // 开启发布确认
            channel.confirmSelect();
    ​
            AtomicInteger outstandingConfirms = new AtomicInteger(0);
    ​
            // 添加ConfirmCallback
            channel.addConfirmListener((deliveryTag, multiple) -> {
                System.out.println("Confirmed delivery for message with tag: " + deliveryTag);
                outstandingConfirms.decrementAndGet();
            }, (deliveryTag, multiple) -> {
                System.out.println("Negative acknowledgement received for message with tag: " + deliveryTag);
                // 这里可以根据需要处理未确认消息的逻辑
            });
    ​
            // 添加ReturnCallback
            channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
                System.out.println("Returned message: " + new String(body));
                Map<String, Object> headers = properties.getHeaders();
                if (headers != null && headers.containsKey("messageId")) {
                    String messageId = (String) headers.get("messageId");
                    System.out.println("Message with ID " + messageId + " was not routed.");
                    // 这里可以添加消息未路由的重试逻辑
                }
            });
    ​
            try {
                String customMessageId = "msg-id-123";
                sendMessageWithId(channel, EXCHANGE_NAME, ROUTING_KEY, "Hello, World!".getBytes(), customMessageId);
            } finally {
                channel.close();
                connection.close();
            }
        }
    ​
        private static void sendMessageWithId(Channel channel, String exchange, String routingKey, byte[] messageBody, String messageId) throws IOException {
            Map<String, Object> headers = new HashMap<>();
            headers.put("messageId", messageId); // 自定义消息ID作为header
    ​
            BasicProperties props = new BasicProperties.Builder()
                    .deliveryMode(2) // 持久化消息
                    .headers(headers)
                    .build();
    ​
            channel.basicPublish(exchange, routingKey, props, messageBody);
            outstandingConfirms.incrementAndGet(); // 记录待确认的消息数
            System.out.println("Published message with ID " + messageId);
        }
    }

MQ的可靠性

数据持久化

默认情况,Rabbit会将收到的消息存在内存中。

RabbitMQ实现数据持久化(存放在磁盘中)有3个方面:

  • 交换机持久化

    • 声明的时候将 durable属性配置为true

  • 队列持久化

    • 声明的时候将 durable属性配置为true

  • 消息持久化

    • 设置消息属性中的deliveryMode2来标记消息为持久化

注意事项:

  • 持久化消息并不保证零丢失,因为它们在内存中排队等待写入磁盘时仍有可能因系统崩溃而丢失。

  • 持久化会增加消息发布的延迟,因为消息必须等待被写入磁盘。

  • 队列和交换器的持久化并不会自动持久化其中的消息,消息的持久化需要单独设置。

  • 如果之前声明的队列或交换器是非持久化的,需要先删除原有队列或交换器,然后重新声明为持久化版本,否则会遇到错误。

  • 为了确保消息不丢失,除了持久化之外,还需要考虑消费者确认(Ack)机制,以及可能的死信队列和重试策略。

Lazy Queue

惰性队列特征:

  1. 接收到消息直接存入磁盘,非内存(内存中只保留最近的消息,默认2048条)

  2. 消费者要消费消息才会从磁盘读取消息并加载到内存中

  3. 支持数百万条的消息存储

在3.12版本后,所有的队列都是Lazy Queue模式,无法更改。

实现方式
  1. Java代码声明中实现

    @Bean
    public Queue lazyQueue() {
        return QueueBuilder
                .durable("lazy.queue")  //持久化
                .lazy()                 //开启lazy模式
                .build();
    }

  2. 消费端实现:

    @RabbitListener(queuesToDeclare = @Queue(
                name = "lazy.queue",
                arguments = @Argument(name = "x-queue-mode", value = "lazy")))//lazy开启了消息持久化
    public void listenLazyQueue(String msg){
        log.info("接收到lazy.queue的消息:{}",msg);
    }

消费者的确认机制

  • 为了确定消费者是否成功处理消息而提出的消费者确认机制。

  • 当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:

    • ack:成功处理消息,RabbitMQ从队列中删除该消息

    • nack:消息处理失败,RabbitMQ需要再次投递消息

    • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息(消费者无法处理接收到的消息时;消费者在处理消息的过程中发生了异常)

实现:

SpringAMQP已经实现了消息确认功能,运行通过配置文件选择ACK处理方式,有3种方式:

  • none:不处理。即消息投递后,直接返回ack,不安全,不建议使用

  • manual :手动模式。需要自己在业务种调用api,发送ack或者reject,存在业务入侵,但是更灵活

  • auto:自动模式。SpringAMQP利用AOP对消息处理逻辑进行环绕增强,当业务正常执行,返回ack,出现异常

    • 业务异常,nack

    • 消息处理或校验异常,reject

spring:
    rabbitmq:
        listener:
            simple:
                prefetch: 1
                    acknowledge-mode: manual

建议一般采用manual实现。

失败重试机制

  • 问题:消费者出现异常后,消息会不断的重新入队,发送给消费者,这样无限循环,导致mq消息处理飙升。

  • 解决方案:利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列

spring:
    rabbitmq:
        listener:
            simple:
                prefetch: 1
                    acknowledge-mode: auto
                retry:
                    enable: true
                    initial-interval: 1000ms #初始失败等待时长
                    multiplier: 1
                    max-attempts: 3 #最大重试次数
                    st
                    
                    atuless: true #true无状态,false有状态。如果业务中包含事务,这里改成false

如果重试次数耗尽的时候,如果消息依然失败,还有兜底的策略,可以实现MessageRecoverer接口实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认这种方式)

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽,将失败消息发送给指定交换机(专门用来处理失败消息的,死信交换机)

RepublishMessageRecoverer的示例:

  1. 定义接收失败消息的交换机、队列,并绑定关系。此处实现略。

  2. 定义RepublishMessageRecoverer

    @ConditionalOnProperty(prefix = "spring.rabbitmq.retry",name = "enable", havingValue = "true")
    //可以在配置类上加入这个代码,只有上述条件满足的时候,配置才实现
    ​
    ​
    //这里还有配置交换机、队列、以及进行绑定
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, " error.direct", "error");
    }

业务幂等性

幂等:一个数学概念。f(x) = f( f(x) )。意思就是:同一个业务,执行一次或多次,对业务的影响是一致的。(比如说删除和查询)

唯一消息id

给每一消息都设置一个唯一id,利用唯一id区分是否重复消息。

  1. 每一条消息都生成一个唯一id,与消息一起投递给消费者

  2. 消费者接收到消息后处理自己的业务,业务完成将消息id保存到数据库

  3. 如果下次收到相同消息,去数据库中判断是否存储,存在则重复消息放弃处理

@Bean
public MessageConverter messageConverter(){
    //1、定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    //2、配置启动自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否重复消息
    jjmc.setCreateMessageIds(true);
}

这个消息id是保存在header中的

@RabbitListener(queues = "yourQueueName")
public void listen(Message message) {
    MessageProperties messageProperties = message.getMessageProperties();
    String messageId = messageProperties.getMessageId(); // 获取消息ID
    if (messageId != null) {
        System.out.println("Received message with ID: " + messageId);
        // 进行幂等性检查或其他基于ID的处理逻辑
    }
    // 其他消息处理逻辑...
}

业务判断

结合实际业务逻辑做判断。

比如说一个订单业务,我们要防止订单状态修改后不再继续被修改,就可以对订单业务进行判断:

  • 如果订单是未支付状态,才变成已支付状态

  • 如果订单是已支付状态,状态不变

延迟消息

生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定消息后才收到消息。

死信交换机

死信:当队列中的消息满足下列情况之一后,就成为了死信。(dead letter)

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费。

  • 要投递的队列消息堆积满了,最早的消息可能成为死信。

在队列的dead-letter-exchange属性中指定一个A交换机,则该队列中的死信会投递到A交换机中,该A交换机就是死信交换机Dead Letter Exchange,简称DLX)。

image-20240611130025919

插件实现

官方提供了一个插件实现延迟消息,网址如下:Community Plugins | RabbitMQ

下载rabbitmq_delayed_message_exchage

1、声明延迟交换机

1)方式一

 @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct")
                .delayed()  //设置delay的属性为true
                .durable(true)
                .build();
    }

2)方式二

  
  @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(value = "delay.direct",delayed = "true"), //delayed = true 开启延迟交换机
            key = "delay"
    ))
    public void listenToDelayMessage(String msg){
        log.info("接收到delay.queue的延迟消息:{}",msg);
    }

2、测试:

@Test
    void testSendDelayMessage(){
        rabbitTemplate.convertAndSend("delay.direct", "delay", "hello", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置消息延迟时间
                message.getMessageProperties().setDelay(1000);
                return message;
            }
        });
        log.info("消息发送成功!");
    }

消息堆积

为什么会出现消息堆积这种问题呢?

RabbitMQ 消息堆积问题通常发生在生产者发送消息的速度远大于消费者处理消息的速度时,这可能导致队列中消息的累积,直至达到存储上限,进而影响系统性能甚至导致消息丢失。

解决方案

要生成解决方案的前提,我们首先要先确定对应的消息堆积产生场景,对症下药。

  • 消费者处理消息速度太慢

    • 增加消费者数量

    • 优化消费者性能,优化代码,增加资源

    • 消息预取限制(prefetch在配置文件中的配置),以避免一次处理过多消息导致处理缓慢

  • 队列容量太小

    • 增加队列容量

  • 网络故障,导致消息可能丢失,导致消息在队列中堆积

    • 监控 + 告警,确保网络故障发送时能快速发现并解决问题

    • 持久化 + 高可用:确保消息和队列持久化以避免消息丢失,并使用镜像队列提高可用性

  • 消费者故障

    • 使用死信队列:将无法处理的添加到死信队列中,避免阻塞主队列

    • 容错机制:消费者自动重复和错误处理逻辑

  • 队列配置不当

    • 优化队列配置:检测并优化消息确认模式,队列长度限制和其他相关配置

  • 消息太大了,处理时间较长

    • 消息分片:将大型消息分割成小的消息片段,加速处理

  • 业务逻辑复杂或耗时

    • 优化业务逻辑:简化消费者中的业务逻辑,减少处理每个消息所需的时间

  • 消息产生速度快于消费者速度

    • 限流

    • 负载均衡:消费者间公平分配,避免个别消费者加载

  • 其他配置优化

    • 设置消息优先级,确保优先级高的先处理

    • 配置文件描述符的限制,内存使用限制

补充

消息队列的路由模型

消息队列的路由模型指的是消息从生产者到消费者的传递路径和方式。常见的消息队列路由模型包括:

  • 点对点模型(Point-to-Point):消息被发送到一个队列中,只有一个消费者可以接收并处理该消息。

  • 发布-订阅模型(Publish-Subscribe):消息被发送到一个主题(或交换机)中,多个消费者可以订阅该主题,并且每个消费者都可以收到消息并独立处理。

  • 路由模型(Routing):消息根据特定的路由规则被发送到不同的队列中,消费者根据队列接收并处理消息。

消息队列的应用经验(使用场景)

  • 异步通信:在系统内部或者不同系统之间进行异步通信,提高系统的响应速度和吞吐量。

  • 任务调度和削峰填谷:通过消息队列进行任务调度,将请求分散到不同的时间段或者不同的处理节点,避免系统在高峰时期负载过重。

  • 分布式事务:在分布式系统中使用消息队列进行事件的发布和订阅,保证系统的一致性和可靠性。

  • 日志收集和数据分析:通过消息队列将日志数据发送到消息队列中进行集中收集和处理,方便进行数据分析和监控。

消息消费顺序性问题

很多时候,我们的MQ使用并不需要保证顺序消费,比如订单超时等。

但有些时候,业务中可能会存在多个消息需要顺序处理的情况,比如在库存更新场景中,减少库存和增加库存的通知必须按照接收顺序处理,以防止库存数量错误或超卖现象。

那这个时候我们该怎么实现我们消息消费顺序性呢?

目前的方案是:

  • 单一消费者:一个队列绑定一个消费者,采用单活模式实现顺序消费

  • 分区策略:将不同的消息进行分区(Direct),然后不同的区绑定不同的消息队列,可以加大并发

  • 排序:顺序消息ID+手动排序

这里是一个单活模式的例子:

/**
     * 创建一个 单活模式的队列
     * @param name
     * @return queue
     */
    private Queue creatQueue(String name) {
        HashMap<String, Object> args = new HashMap<>();
        // x-single-active-consumer 单活模式 队列
        // 表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个,
        // 除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。
        args.put("x-single-active-consumer", true);
        return new Queue(name, true, false, false, args);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/700820.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】12.C语言内存函数

文章目录 1.memcpy使用和模拟实现2.memmove使用和模拟实现3.memset函数的使用4.memcmp函数的使用 memcpy&#xff1a;内存拷贝 memmove&#xff1a;内存移动 memset&#xff1a;内存设置 memcmp&#xff1a;内存比较 1.memcpy使用和模拟实现 memcpy&#xff1a;内存拷贝 void…

Mysql查询分析工具Explain的使用

一、前言 作为一名合格的开发人员&#xff0c;与数据库打交道是必不可少的&#xff0c;尤其是在业务规模和数据体量大规模增长的条件下&#xff0c;应用系统大部分请求读写比例在10:1左右&#xff0c;而且插入操作和一般的更新操作很少出现性能问题&#xff0c;遇到最多的&…

产品人生(12):从“产品生命周期管理”看如何做“职业规划”

产品生命周期管理是产品人常接触的一个概念&#xff0c;它是一种全面管理产品从概念构想、设计开发、生产制造、市场推广、销售使用&#xff0c;直至最终退役的全生命周期过程的方法论和一系列业务流程。下面我们来简单介绍下产品生命周期管理&#xff1a; 概念阶段&#xff1a…

MybatisPlus代码生成器使用案例

针对数据库中的实体类表&#xff0c;自动生成相关的pojo类&#xff0c;mapper&#xff0c;service等 1. Get-Started 基于mybatisplus&#xff0c;idea下载mybatisplus插件 sql文件 /*!40101 SET OLD_CHARACTER_SET_CLIENTCHARACTER_SET_CLIENT */; /*!40101 SET NAMES utf8 …

面试官:MySQL也可以实现分布式锁吗?

首先说结论&#xff0c;可以做&#xff0c;但不推荐做。 我们并不推荐使用数据库实现分布式锁。 如果非要这么做&#xff0c;实现大概有两种。 1、锁住Java的方法&#xff0c;借助insert实现 如何用数据库实现分布式锁呢&#xff0c;简单来说就是创建一张锁表&#xff0c;比…

PB案例学习笔记-19制作一个图片按钮

写在前面 这是PB案例学习笔记系列文章的第19篇&#xff0c;该系列文章适合具有一定PB基础的读者。 通过一个个由浅入深的编程实战案例学习&#xff0c;提高编程技巧&#xff0c;以保证小伙伴们能应付公司的各种开发需求。 文章中设计到的源码&#xff0c;小凡都上传到了gite…

MAC认证

简介 MAC认证是一种基于接口和MAC地址对用户的网络访问权限进行控制的认证方法&#xff0c;它不需要用户安装任何客户端软件。设备在启动了MAC认证的接口上首次检测到用户的MAC地址以后&#xff0c;即启动对该用户的认证操作。认证过程中&#xff0c;不需要用户手动输入用户名…

Leetcode3174. 清除数字

Every day a Leetcode 题目来源&#xff1a;3174. 清除数字 解法1&#xff1a;栈 用栈模拟&#xff0c;遇到数字就弹出栈顶&#xff0c;遇到字母就插入栈。 最后留在栈里的就是答案。 代码&#xff1a; /** lc appleetcode.cn id3174 langcpp** [3174] 清除数字*/// lc c…

如何做好期货投资?

期货&#xff0c;这个词对于很多人来说可能还是个陌生的词汇&#xff0c;但是&#xff0c;随着经济的发展和人们对金融投资的需求增加&#xff0c;期货投资也变得越来越受到关注。那么&#xff0c;如何才能做好期货投资呢&#xff1f; 首先&#xff0c;了解期货的基本知识是非…

现货黄金交易多少克一手?国内外情况大不同

如果大家想参与国际市场上的现货黄金交易&#xff0c;就应该从它交易细则的入手&#xff0c;先彻底认识这个品种&#xff0c;因为它是来自欧美市场的投资方式&#xff0c;所以无论是从合约的计的单位&#xff0c;计价的货币&#xff0c;交易的具体时间&#xff0c;以及买卖过程…

word空白页删除不了怎么办?

上方菜单栏点击“视图”&#xff0c;下方点击“大纲视图”。找到文档分页符的位置。将光标放在要删除的分节符前&#xff0c;按下键盘上的“Delet”键删除分页符。

Filament 【表单操作】修改密码

场景描述&#xff1a; 新增管理员信息时需要填写密码&#xff0c;修改管理员信息时密码可以为空&#xff08;不修改密码&#xff09;&#xff0c;此时表单中密码输入有冲突&#xff0c;需要对表单中密码字段进项条件性的判断&#xff0c;使字段在 create 操作时为必需填写&…

服务器部署spring项目jar包使用bat文件,省略每次输入java -jar了

echo off set pathC:\Program Files\Java\jre1.8.0_191\bin START "YiXiangZhengHe-8516" "%path%/java" -Xdebug -jar -Dspring.profiles.activeprod -Dserver.port8516 YiXiangZhengHe-0.0.1-SNAPSHOT.jar 将set path后面改成jre的bin文件夹 START 后…

在微信小程序中安装和使用vant框架

目录 1、初始化项目2、安装vant相关依赖3、修改 app.json4、修改 project.config.json5、构建npm6、使用示例 本文将详细介绍如何在微信小程序中安装并使用vant框架&#xff5e; 开发工具&#xff1a;微信开发者工具 1、初始化项目 从终端进入小程序项目目录&#xff0c;执行…

一个数据查询导出工具

数据查询导出工具 安装说明 安装完成后在桌面会创建“数据查询导出工具”的查询工具。 程序初始化 配置数据库连接 首次运行&#xff0c;请先配置数据库连接 点击“数据库连接”后&#xff0c;会出现下面的窗体&#xff0c;要求输入维护工程师密码。&#xff08;维护工程师密码…

VScode如何调试

调试 1.打断点 1.点击调试按钮 3.点击下拉选择环境node&#xff0c;点击绿三角选择输入调试的命令&#xff08;具体命令查看package.json中scripts中的哪一个命令和运行的文件&#xff09;&#xff0c;点击右边的设置&#xff08;可以直接跳下面第八步&#xff01;&#xff…

实体类status属性使用枚举类型的步骤

1. 问题引出 当实体类的状态属性为Integer类型时&#xff0c;容易写错 2. 初步修改 把状态属性强制为某个类型&#xff0c;并且自定义一些可供选择的常量。 public class LessonStatus {public static final LessonStatus NOT_LEARNED new LessonStatus(0,"未学习"…

SQL 数据库学习 Part 1

数据和信息 信息 信息是客观存在的&#xff0c;是关于现实世界事物的存在方式或运动状态 数据 数据是用来记录信息的可识别的符号&#xff0c;是信息的具体表现形式 数据和信息的联系 数据是信息的符号表示或载体信息则是数据的内涵&#xff0c;是对数据的语义解释 数据…

牛客热题:设计LRU缓存结构

&#x1f4df;作者主页&#xff1a;慢热的陕西人 &#x1f334;专栏链接&#xff1a;力扣刷题日记 &#x1f4e3;欢迎各位大佬&#x1f44d;点赞&#x1f525;关注&#x1f693;收藏&#xff0c;&#x1f349;留言 文章目录 牛客热题&#xff1a;设计LRU缓存结构题目链接方法一…

如何实现办公终端安全

在网络安全日益严峻的当下&#xff0c;可信白名单作为一种高效的终端安全防护手段&#xff0c;正在逐渐受到业界的广泛关注和应用。本文将简要探讨可信白名单如何实现终端安全的原理、方法及其在实际应用中的优势与挑战。 首先&#xff0c;我们需要了解可信白名单的基本原理。可…