Spring Boot 集成 RabbitMQ

依赖与配置

在 pom.xml 中引入 RabbitMQ 相关依赖

<!-- AMQP 依赖, RabbitMq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>3.2.7</version>
</dependency>

在 application.yml 中添加配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

RabbitMQ使用

扇形交换机

扇形交换机,顾名思义,就是像一把扇子一样,一个交换器可以绑定多个队列,只要交换机接收到消息就会发送给所有和它绑定的队列。

假设扇形交换机 fanoutExchange 绑定了队列 fanoutQueue1 和 fanoutQueue2 ,那么我们往 fanoutExchange 发送一条消息,fanoutQueue1 和 fanoutQueue2 都会收到一条相同的消息,如果消息未被消费我们可以在 RabbitMQ 管理端看到这两个队列和队列内积压的一条相同的消息。

@Configuration
public class FanoutExchangeConfig {

    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";
    public static final String FANOUT_EXCHANGE = "fanout.exchange";

    //声明队列Q1
    @Bean("fanoutQ1")
    public Queue fanoutQ1() {
        return new Queue(FANOUT_QUEUE1);
    }

    //声明队列Q1
    @Bean("fanoutQ2")
    public Queue fanoutQ2() {
        return new Queue(FANOUT_QUEUE2);
    }

    //声明扇形交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    //队列Q1绑定扇形交换机
    @Bean("bindFanoutQ1")
    public Binding bindFanoutQ1() {
        return BindingBuilder.bind(fanoutQ1()).to(fanoutExchange());
    }

    //队列Q2绑定扇形交换机
    @Bean("bindFanoutQ2")
    public Binding bindFanoutQ2() {
        return BindingBuilder.bind(fanoutQ2()).to(fanoutExchange());
    }

}

编写 junit 测试, 发送消息

@Test
public void fanoutTest() {
    rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, "", "fantoutTest");
}

发送消息后由于没有消费者,可以在管理端看到积压在队列中的消息。

在这里插入图片描述

在这里插入图片描述

直连交换机

一个直连交换机可以有多个队列,但每个队列都有一个路由一一匹配,交换机根据路由将消息投递到对应队列中。当同一个队列有多个消费者时,消息不会被重复消费,直连交换机能够轮询公平的将消息分发给每个消费者。

假设直连交换机 directExchange 与队列 directQueue1 通过路由 directRoute1 绑定, 与directQueue2 通过路由 directRoute2 绑定。当生产者发送路由为 directRoute1 的消息给 directExchange 时,消息会被投递到 directQueue1 ,directQueue2 则接收不到消息。

@Configuration
public class DirectExchangeConfig {

    public static final String DIRECT_QUEUE1 = "direct.queue1";
    public static final String DIRECT_QUEUE2 = "direct.queue2";
    public static final String DIRECT_EXCHANGE = "direct.exchange";

    public static final String DIRECT_ROUTE_KEY1 = "direct.route.key1";

    //声明队列Q1
    @Bean("directQ1")
    public Queue directQ1() {
        return new Queue(DIRECT_QUEUE1);
    }

    //声明队列Q1
    @Bean("directQ2")
    public Queue directQ2() {
        return new Queue(DIRECT_QUEUE2);
    }

    //声明直连交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    //队列Q1绑定直连交换机
    @Bean("bindDirectQ1")
    public Binding bindDirectQ1() {
        return BindingBuilder.bind(directQ1()).to(directExchange()).with(DIRECT_ROUTE_KEY1);
    }

    //队列Q2绑定直连交换机
    @Bean("bindDirectQ2")
    public Binding bindDirectQ2() {
        return BindingBuilder.bind(directQ2()).to(directExchange()).with("");
    }

}

编写 junit 测试,投递消息给交换机。

@Test
public void directTest() {
    //消息被投递给 bindDirectQ2
    rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, "", "directTest");
    //消息被投递给 bindDirectQ1
    rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DIRECT_ROUTE_KEY1, "directTest-key1");
    //没有匹配的路由,bindDirectQ1 和 bindDirectQ2 都无法接收
    rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, "test", "directTest-test");
}

主题交换机

一个主题交换机可以有多个绑定队列,支持路由模糊匹配,可以使用星号(*)和井号(#)作为通配符进行匹配。其中,* 可以代替一个单词,# 可以代替任意个单词。

假设主题交换机 topicExchange 通过路由 topic.route.* 绑定队列 topicQueue1 , 通过路由 topic.route.# 绑定队列 topicQueue2。当生产者通过路由 topic.route.1 和 topic.route.1.1 投递消息给 topicExchange 时, topicQueue2 会接收到两条不同路由的消息, 而 topicQueue1 仅能接收到路由为 topic.route.1 的消息。

@Configuration
public class TopicExchangeConfig {

    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String TOPIC_EXCHANGE = "topic.exchange";

    public static final String TOPIC_ROUTE_KEY1 = "topic.route.*";
    public static final String TOPIC_ROUTE_KEY2 = "topic.route.#";

    //声明队列Q1
    @Bean("topicQ1")
    public Queue topicQ1() {
        return new Queue(TOPIC_QUEUE1);
    }

    //声明队列Q1
    @Bean("topicQ2")
    public Queue topicQ2() {
        return new Queue(TOPIC_QUEUE2);
    }

    //声明主题交换机
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    //队列Q1绑定主题交换机
    @Bean("bindTopicQ1")
    public Binding bindTopicQ1() {
        return BindingBuilder.bind(topicQ1()).to(topicExchange()).with(TOPIC_ROUTE_KEY1);
    }

    //队列Q2绑定主题交换机
    @Bean("bindTopicQ2")
    public Binding bindTopicQ2() {
        return BindingBuilder.bind(topicQ2()).to(topicExchange()).with(TOPIC_ROUTE_KEY2);
    }

}

编写 junit 测试,投递两条不同路由的消息给主题交换机

@Test
public void topicTest(){
    //消息被投递给 topicQ1 和 topicQ2
    rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "topic.route.1", "topicTest-*");
    //消息仅投递给 topicQ2
    rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "topic.route.1.1", "topicTest-#");
}

可以从管理端看到 topicQueue2 由于绑定的是通配符#,故此两条消息都有被投递到队列中,topicQueue1 由于绑定的是通配符* 只匹配到一条消息,故此只有一条消息被投递到队列中。

在这里插入图片描述

首部交换机

首部交换机通过设置消息的头部信息来进行绑定队列的分发,它不依赖于路由键的匹配规则来分发消息,而是根据发送的消息内容中的headers属性进行匹配。当消息投递到首部交换器时,RabbitMQ会获取到该消息的headers(一个键值对的形式),并且对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对。如果完全匹配,则消息会路由到该队列,否则不会路由到该队列。

@Configuration
public class HeadExchangeConfig {

    public static final String HEADER_QUEUE1 = "header.queue1";
    public static final String HEADER_QUEUE2 = "header.queue2";
    public static final String HEADER_QUEUE3 = "header.queue3";
    public static final String HEADER_EXCHANGE = "header.exchange";

    public static final String HEADER_KEY1 = "headerKey1";
    public static final String HEADER_KEY2 = "headerKey2";

    //声明queue
    @Bean("headerQueue1")
    public Queue headerQueue1() {
        return new Queue(HEADER_QUEUE1);
    }

    @Bean("headerQueue2")
    public Queue headerQueue2() {
        return new Queue(HEADER_QUEUE2);
    }

    @Bean("headerQueue3")
    public Queue headerQueue3() {
        return new Queue(HEADER_QUEUE3);
    }

    //声明首部交换机
    @Bean
    public HeadersExchange headerExchange() {
        return new HeadersExchange(HEADER_EXCHANGE);
    }

    //声明Binding,绑定Header(消息头部参数)中 HEADER_KEY1 = a的队列。header的队列匹配可以用mathces和exisits
    @Bean
    public Binding bindHeaderQueue1() {
        return BindingBuilder.bind(headerQueue1()).to(headerExchange()).where(HEADER_KEY1).matches("a");
    }

    //绑定Header中 HEADER_KEY2 =1的队列。
    @Bean
    public Binding bindHeaderBusTyp1() {
        return BindingBuilder.bind(headerQueue2()).to(headerExchange()).where(HEADER_KEY2).matches("b");
    }

    //绑定Header中 HEADER_KEY1 = a 或者 HEADER_KEY2 = b 的队列。
    @Bean
    public Binding bindHeaderTxBusTyp1() {
        Map<String, Object> condMap = new HashMap<>();
        condMap.put(HEADER_KEY1, "a");
        condMap.put(HEADER_KEY2, "b");
        return BindingBuilder.bind(headerQueue3()).to(headerExchange()).whereAny(condMap).match();
    }

}

编写 junit 测试, 往首部交换机投递信息

@Test
public void headerTest(){
    MessageProperties properties = new MessageProperties();
    properties.setHeader(HeadExchangeConfig.HEADER_KEY1, "a");
    
    //消息被投递到 headerQueue1 和 headerQueue3
    rabbitTemplate.convertAndSend(HeadExchangeConfig.HEADER_EXCHANGE, "", new Message("headerTest-a".getBytes(), properties));
    properties.setHeader(HeadExchangeConfig.HEADER_KEY1, "");
    properties.setHeader(HeadExchangeConfig.HEADER_KEY2, "b");
    
    //消息被投递到 headerQueue2 和 headerQueue3
    rabbitTemplate.convertAndSend(HeadExchangeConfig.HEADER_EXCHANGE, "", new Message("headerTest-b".getBytes(), properties));

}

备份交换机

通过设置交换机的alternate-exchange的参数设置备份交换机,当消息路由无法在当前交换机匹配到合适的队列投递时,将消息转到备份交换机,分发到其绑定的备份队列中。

@Configuration
public class BackupExchangeConfig {

    public static final String BACKUP_QUEUE = "backup.queue";
    public static final String BACKUP_EXCHANGE = "backup.exchange";
    public static final String BACKUP_ROUTE_KEY = "backup.key";

    public static final String NON_BACKUP_QUEUE = "nonbackup.queue";
    public static final String NON_BACKUP_EXCHANGE = "nonbackup.exchange";
    public static final String NON_BACKUP_ROUTE_KEY = "nonbackup.key";

    @Bean("backupQueue")
    public Queue backupQueue(){
        return new Queue(BACKUP_QUEUE, true, false, false);
    }

    @Bean("nonBackupQueue")
    public Queue nonBackupQueue(){
        return new Queue(NON_BACKUP_QUEUE, true, false, false);
    }

    @Bean("nonBackupExchange")
    public DirectExchange nonBackupExchange(){
        Map<String, Object> args = new HashMap<>(2);
        args.put("alternate-exchange", BACKUP_EXCHANGE);
        return new DirectExchange(NON_BACKUP_EXCHANGE,true,false, args);
    }

    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE,true,false);
    }

    @Bean("bindNonBackupQueue")
    public Binding bindNonBackupQueue(){
        return BindingBuilder.bind(nonBackupQueue()).to(nonBackupExchange()).with(NON_BACKUP_ROUTE_KEY);
    }

    @Bean("bindBackupQueue")
    public Binding bindBackupQueue(){
        return BindingBuilder.bind(backupQueue()).to(backupExchange());
    }

}

编写测试用例, 投递消息给备份交换机

@Test
public void backupTest() {
    //路由正确匹配,消息投递到非备份队列中
    rabbitTemplate.convertAndSend(BackupExchangeConfig.NON_BACKUP_EXCHANGE, BackupExchangeConfig.NON_BACKUP_ROUTE_KEY, "nonBackupTest");
    //路由无法匹配,消息投递到备份队列中
    rabbitTemplate.convertAndSend(BackupExchangeConfig.NON_BACKUP_EXCHANGE, BackupExchangeConfig.NON_BACKUP_ROUTE_KEY + "123", "backupTest");
}

可以看到备份队列和非备份队列中各有一条消息。

在这里插入图片描述

死信交换机

死信交换机其实可以理解成一个拥有特殊意义的直连交换机,正常队列通过设置队列中的x-dead-letter-exchangex-dead-letter-routing-key 参数来设置绑定死信交换机,当消费者拒绝消费、消息积压队列达到最大长度或者消息过期时,消息从正常队列转到死信队列。

死信在转移到死信队列时,它的路由也会保存下来。但是如果配置了x-dead-letter-routing-key参数的话,路由就会被替换为配置的这个值。另外,死信在转移到死信队列的过程中,是没有经过消息发送者确认的,所以并不能保证消息的安全性。

消息被作为死信转移到死信队列后,会在Header当中增加一些消息。比如时间、原因(rejected,expired,maxlen)、队列等。另外header中还会加上第一次成为死信的三个属性(x-first-death-reason x-first-death-queuex-first-death-exchange),并且这三个属性在以后的传递过程中都不会更改。

死信队列也可以向其它队列一样被消费者正常订阅消费。

@Configuration
public class DeadLetterExchangeConfig {

    public static final String DEAD_QUEUE = "dead.queue";
    public static final String DEAD_EXCHANGE = "dead.exchange";
    public static final String DEAD_ROUTE_KEY = "dead.key";

    public static final String NON_DEAD_QUEUE = "nondead.queue";
    public static final String NON_DEAD_EXCHANGE = "nondead.exchange";
    public static final String NON_DEAD_ROUTE_KEY = "nondead.key";

    @Bean("deadQueue")
    public Queue deadQueue(){
        return new Queue(DEAD_QUEUE, true, false, false);
    }

    @Bean("nonDeadQueue")
    public Queue nonDeadQueue(){
        Map<String, Object> args = new HashMap<>(2);
        args.put("x-message-ttl",10000);
        args.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        args.put("x-dead-letter-routing-key",DEAD_ROUTE_KEY);
        return new Queue(NON_DEAD_QUEUE, true, false, false, args);
    }

    @Bean("deadExchange")
    public DirectExchange deadExchange(){
        return new DirectExchange(DEAD_EXCHANGE,false,false);
    }

    @Bean("nonDeadExchange")
    public DirectExchange nonDeadExchange(){
        return new DirectExchange(NON_DEAD_EXCHANGE,true,false);
    }

    @Bean("bindDeadQueue")
    public Binding bindDeadQueue(){
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(DEAD_ROUTE_KEY);
    }

    @Bean("bindNonDeadQueue")
    public Binding bindNonDeadQueue(){
        return BindingBuilder.bind(nonDeadQueue()).to(nonDeadExchange()).with(NON_DEAD_ROUTE_KEY);
    }

}

编写 junit 测试, 投递一条10秒过期的消息,刚投递时消息存在于正常队列,10秒过期后转到死信队列。 投递消息和队列同时设置过期时间时,以时间更短的为准。

@Test
public void deadTest() {
    rabbitTemplate.convertAndSend(DeadLetterExchangeConfig.NON_DEAD_EXCHANGE, DeadLetterExchangeConfig.NON_DEAD_ROUTE_KEY, "deadTest 1");
}

延时队列

通过TTL+死信队列实现延时队列,与上述死信交换机使用大同小异,核心就是创建队列的时候设置如下三个参数:

  • x-message-ttl (必要) :当前队列消息多久未消费进入死信队列
  • x-dead-letter-exchange (必要):消息过期后进入的死信队列交换机
  • x-dead-letter-routing-key (非必要):消息的路由, 未设置时保留原队列的路由

TTL 消息可以通过以下方式创建

方式一:在队列中设置 x-message-ttl 参数

@Bean("nonDeadQueue")
public Queue nonDeadQueue(){
    Map<String, Object> args = new HashMap<>(2);
    args.put("x-message-ttl",10000);
    args.put("x-dead-letter-exchange",DEAD_EXCHANGE);
    args.put("x-dead-letter-routing-key",DEAD_ROUTE_KEY);
    return new Queue(NON_DEAD_QUEUE, true, false, false, args);
}

方式二: 在投递消息时设置消息的过期时间

MessageProperties properties = new MessageProperties();
properties.setExpiration("10000");
rabbitTemplate.convertAndSend(DeadLetterExchangeConfig.NON_DEAD_EXCHANGE, DeadLetterExchangeConfig.NON_DEAD_ROUTE_KEY, new Message("ttl test".getBytes(), properties));

两种方式同时都有设置时,时间短的设置生效。

动态创建队列、交换机及绑定关系

Spring Boot 封装了一些类用于对 RabbitMQ 的管理

  • AmqpAdmin
    用于管理队列、交换机及绑定关系 。

  • RabbitTemplate
    对消息操作的一些封装。

@Autowired
private AmqpAdmin amqpAdmin;

public void createComponents(){

	String queueName = "amqp.queue";
	String exchangeName = "amqp.exchange";
	
	//声明(创建)队列
	Queue queue = new Queue(queueName, false, false, false, null);
	amqpAdmin.declareQueue(queue);
	
	//声明交换机
	FanoutExchange fanoutExchange = new FanoutExchange(exchangeName, false, false, null);
	amqpAdmin.declareExchange(fanoutExchange);
	
	//声明绑定
	amqpAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, "", null));
}

消息监听

启动类添加@EnableRabbit注解启用RabbitMQ ,通过注解 @RabbitListener@RabbitHandler 进行消息的消费

@Component
@RabbitListener(queues = "fanout.queue1")
public class FanoutQueueRecever {

    @RabbitHandler
    public void handle(String msg){
        System.out.println("收到来自 fanout.queue1 的消息 :" + msg);
    }
}

消息确认

保证发送方消息不丢失

开启生产端确认, 消息发送成功后回调,获得预期结果后才认为消息发送成功。

  • 交换机收到消息进行回调,ConfirmCallback

    spring.rabbitmq.publisher-confirm-type: correlated (高版本Spring使用)
    spring.rabbitmq.publisher-confirms: true(低版本Spring使用)

  • 消息正确抵达队列进行回调,ReturnsCallback

    spring.rabbitmq.publisher-returns: true
    spring.rabbitmq.template.mandatory: true, 只要抵达队列,以异步形式优先发送回调 ReturnCallback

保证消费者消息不丢失

开启消费端确认(保证每个消息都被正确消费,此时才可以删除这个消息)

  • 手动ack消息
    spring.rabbitmq.listener.simple.acknowledge-mode: manual

    • AcknowledgeMode.NONE:自动确认
      自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
      如果消息已经被处理,但后续出现异常导致事务回滚,也同样造成了实际意义的消息丢失。

    • AcknowledgeMode.AUTO:根据情况确认

    • AcknowledgeMode.MANUAL:手动确认
      如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者。
      如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限,不足以处理后续投递的消息。

  • ACK的几种方法

    • channel.basicNack(deliveryTag, multiple, requeue); 拒绝消费。
      deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。

      multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。

      requeue: 是否重新入队

    • channel.basicAck(deliveryTag, multiple); 确认消费,参数解释同上。

    • channel.basicReject(deliveryTag, requeue); 拒绝消费,不支持批量操作,用法与basicNack()类似。

代码实现

yaml 文件配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 开启发送端确认
    #publisher-confirms: true
    publisher-confirm-type: correlated
    #开启发送端消息抵达确认
    publisher-returns: true
    #只要抵达队列。以异步发送优先回调returnconfirm
    template:
      mandatory: true
    # 手动ack消息
    listener:
      simple:
        acknowledge-mode: manual

配置RabbitMQ, 设置发送者消息确认逻辑

@Configuration
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 发送消息触发confirmCallback回调, 无论是否到达队列,只要有到达交换机都会触发这个回调
     * @param correlationData:当前消息的唯一关联数据(如果发送消息时未指定此值,则回调时返回null)
     * @param ack:消息是否成功收到(ack=true,消息抵达Broker)
     * @param cause:失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("发送消息触发confirmCallback回调");
        System.out.println(String.format("correlationData:%s\nack:%s\ncause:%s", correlationData, ack, cause));
    }

    /**
     * 消息未到达队列触发returnCallback回调,只要消息没有投递给指定的队列,就触发这个失败回调
     * @param returnedMessage 返回的消息,包含
     *                        message:投递失败的消息详细信息
     *                        replyCode:回复的状态码
     *                        replyText:回复的文本内容
     *                        exchange:接收消息的交换机
     *                        routingKey:接收消息的路由键
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("消息未到达队列触发returnCallback回调");
        System.out.println(returnedMessage.toString());
    }
}

实现消费者确认

@Component
@RabbitListener(queues = "fanout.queue1")
public class FanoutQueueRecever {

    @RabbitHandler
    public void handle(String msg, Channel channel, @Headers Map<String, Object> headers){
        System.out.println("收到来自 fanout.queue1 的消息 :" + msg);
        System.out.println("tag:" + headers.get(AmqpHeaders.DELIVERY_TAG));
        try {
            if("ack".equalsIgnoreCase(msg)){
                channel.basicAck((long)headers.get(AmqpHeaders.DELIVERY_TAG), false);
            }
            if("nack".equalsIgnoreCase(msg)){
                channel.basicNack((long)headers.get(AmqpHeaders.DELIVERY_TAG), false, true);
            }
            if("reject".equalsIgnoreCase(msg)){
                channel.basicReject((long)headers.get(AmqpHeaders.DELIVERY_TAG), false);
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * @Payload 注解的对象需要实现序列化
     * @Headers 获取所有头部信息
     * @Header  获取单个头部信息
     */
    @RabbitHandler(isDefault = true)
    public void handleMap(@Payload MyMessage message, Channel channel, @Headers Map<String, Object> headers){
        System.out.println("收到来自 fanout.queue1 的消息 :" + message.toString());
        System.out.println("tag:" + headers.get(AmqpHeaders.DELIVERY_TAG));
        try {
            channel.basicAck((long)headers.get(AmqpHeaders.DELIVERY_TAG), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

编写 junit 测试, 发送测试消息

@Data
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode
public class MyMessage implements Serializable {

    private String id;

    private String name;

}
@Test
public void confirmTest(){
    //未知交换机, 触发 confirmCallback 回调
    //rabbitTemplate.convertAndSend("unknow", "unknow", "confirmTest");
    //未知路由, 消息到达交换机但是无法到达队列, 触发 confirmCallback 回调和 returnCallback 回调
    //rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "topic", "topicTest-*");

    //正常到达队列,触发confirmCallback回调
    rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, "", "fantoutTest", new CorrelationData("1"));
    rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, "", new MyMessage("1", "张三"), new CorrelationData("2"));

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/800227.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

详解注意力机制上篇【RNN,Seq2Seq(Encoder-Decoder,编码器-解码器)等基础知识】

NLP-大语言模型学习系列目录 一、注意力机制基础——RNN,Seq2Seq等基础知识 二、注意力机制【Self-Attention,自注意力模型】 &#x1f525; 在自然语言处理&#xff08;NLP&#xff09;领域&#xff0c;理解和生成自然语言的能力对于构建智能系统至关重要。从文本分类、机器翻…

昇思25天学习打卡营第22天|基于MindSpore的红酒分类实验

基于MindSpore的红酒分类实验 K近邻算法实现红酒聚类 1、实验目的 了解KNN的基本概念&#xff1b;了解如何使用MindSpore进行KNN实验。 2、K近邻算法原理介绍 K近邻算法&#xff08;K-Nearest-Neighbor, KNN&#xff09;是一种用于分类和回归的非参数统计方法&#xff0c;…

WSL-Ubuntu20.04环境使用YOLOv8 TensorRT推理加速

在阅读本章内容之前&#xff0c;需要把部署环境以及训练环境都安装好。 1.TensorRTX下载 这里使用Wang-xinyu大佬维护的TensorRTX库来对YOLOv8进行推理加速的演示&#xff0c;顺便也验证一下前面环境配置的成果。 github地址&#xff1a;GitHub - wang-xinyu/tensorrtx&#x…

SourceTree rebase(变基)的使用

参考资料 【Sourcetree】コミットを一つにまとめる【Sourcetree】リベースする 目录 前提0.1 merge与rebase0.2 merge合并分支0.3 rebase合并分支0.4 &#x1f4a5;超级注意事项&#x1f4a5; 一. 代码已提交&#xff0c;未推送&#xff0c;交互式变基1.1 通过SourceTree操作1…

Richtek立锜科技可用于智能门铃的电源管理解决方案

新型的智能门铃不仅能满足呼叫、提醒的需要&#xff0c;还能在线监控、远程操作、闯入通知、记录过程&#xff0c;系统构成相对复杂&#xff0c;与传统门铃相比有了很大的改变。 从电源管理的角度来观察&#xff0c;满足这样需求的系统构成也相对复杂&#xff1a; 处于外置状态…

ElementUIV12相关使用方法

今日内容 零、 复习昨日 零、 复习昨日 一、Element UI Element&#xff0c;一套为开发者、设计师和产品经理准备的基于 Vue 2.0 的桌面端组件库 官网&#xff1a; https://element.eleme.cn/#/zh-CN Element Plus,基于 Vue 3&#xff0c;面向设计师和开发者的组件库 官网: htt…

多样化数据可视化方法的全面示例:基于Python的多样化数据可视化

文章目录 前言代码效果展示 前言 本文演示了使用Python进行温度数据的多样化可视化方法。通过导入、处理和分析气象数据&#xff0c;我们生成了多种图表&#xff0c;包括直方图、核密度估计图、箱型图、小提琴图、条形图、山脊图、经验累积分布函数图和折线图。这些图表帮助我…

从产品手册用户心理学分析到程序可用性与易用性的重要区别

注&#xff1a;机翻&#xff0c;未校对。 Designing for People Who Have Better Things To Do With Their Lives 为那些生活中有更重要事情要做的人设计 When you design user interfaces, it’s a good idea to keep two principles in mind: 在设计用户界面时&#xff0c;…

微软Office PLUS办公插件下载安装指南

微软OfficePLUS插件下载安装指南 简介&#xff1a; OfficePLUS微软官方出品的Office插件 &#xff0c;OfficePLUS拥有30万高质量模板素材&#xff0c;能帮助Word、Excel、Powerpoint、PDF等多种办公软件提升效率&#xff0c;具有智能化、模板质量高、运行快、稳定性强等优点。…

探索Facebook:数字社交的魔力源泉

在当今信息爆炸和全球互联的时代&#xff0c;社交媒体平台成为了人们生活中不可或缺的一部分。而在这些平台中&#xff0c;Facebook无疑是最具影响力和创新性的代表之一。自2004年成立以来&#xff0c;Facebook不仅改变了人们的沟通方式&#xff0c;更通过不断的技术创新和用户…

Pycharm与Gitlab交互

环境准备 1、下载配置好本地Git 2、配置Pycharm上的Git 3、gitlab账号 Gitlab配置 Gitlab配置中文 账号》设置》偏好设置》简体中文 创建项目 命令行操作 打开项目会展示以下步骤 在pycharm克隆gitlab的项目 通过菜单栏 1、在PyCharm的顶部菜单栏中&#xff0c;选择“V…

探索智能合约在金融科技中的前沿应用与挑战

随着区块链技术的发展和普及&#xff0c;智能合约作为其核心应用之一&#xff0c;在金融科技&#xff08;FinTech&#xff09;领域中展现出了巨大的潜力和挑战。本文将深入探讨智能合约的基本概念、前沿应用案例&#xff0c;以及面临的技术挑战和发展趋势&#xff0c;旨在帮助读…

R语言进行集成学习算法:随机森林

# 10.4 集成学习及随机森林 # 导入car数据集 car <- read.table("data/car.data",sep ",") # 对变量重命名 colnames(car) <- c("buy","main","doors","capacity","lug_boot","safety"…

昇思25天学习打卡营第11天|RNN实现情感分类

概述 情感分类是自然语言处理中的经典任务&#xff0c;是典型的分类问题。本节使用MindSpore实现一个基于RNN网络的情感分类模型&#xff0c;实现如下的效果&#xff1a; 输入: This film is terrible 正确标签: Negative 预测标签: Negative输入: This film is great 正确标…

SpringBoot之健康监控(Actuator)

1&#xff0c;基本介绍 Spring Actuator 是 Spring Boot 提供的一个扩展模块&#xff0c;用于监控和管理应用程序的生产环境。它通过 HTTP 端点暴露了大量的监控和管理功能&#xff0c;使得开发者可以在运行时查看应用程序的运行状况、配置信息、性能指标等。 主要功能&#…

【Qt】探索Qt框架:开发经典贪吃蛇游戏的全过程与实践

文章目录 引言项目链接&#xff1a;1. Qt框架的使用简介2. 贪吃蛇游戏设计2.1 游戏规则和玩法介绍2.2 游戏界面设计概述 3. 核心代码解析3.1 主界面&#xff08;GameHall&#xff09;3.1.1 布局和功能介绍3.1.2 代码实现分析 3.2 游戏选择界面&#xff08;GameSelect&#xff0…

WPF+MvvmLight 项目入门完整教程(一)

WPF+MvvmLight入门完整教程一 创建项目MvvmLight框架安装完善整个项目的目录结构创建自定义的字体资源下载更新和使用字体资源创建项目 打开VS2022,点击创建新项目,选择**WPF应用(.NET Framework)** 创建一个名称为 CommonProject_DeskTop 的项目,如下图所示:MvvmLight框架…

redis原理之底层数据结构(二)-压缩列表

1.绪论 压缩列表是redis最底层的结构之一&#xff0c;比如redis中的hash&#xff0c;list在某些场景下使用的都是压缩列表。接下来就让我们看看压缩列表结构究竟是怎样的。 2.ziplist 2.1 ziplist的组成 在低版本中压缩列表是由ziplist实现的&#xff0c;我们来看看他的结构…

uniapp 微信小程序根据后端返回的文件链接打开并保存到手机文件夹中【支持doc、docx、txt、xlsx等类型的文件】

项目场景&#xff1a; 我们在使用uniapp官方提供的uni.downloadFile以及uni.saveFile时&#xff0c;会发现这个文件下载的默认保存位置和我们预想的不太一样&#xff0c;容易找不到&#xff0c;而且没有提示&#xff0c;那么我们就需要把文件打开自己保存并且有提示保存到哪个…

fastadmin导入vue

前台 require-frontend.js或frontend-init.js 后台 require-backend.js或backend-init.js 后台 方法一 require-backend.js 在 paths 中加入’vue’:‘…/libs/vue/vue.min’, 在shim 中加入 paths: {............vue:../libs/vue/vue.min, } shim: {............vue: {ex…