【SpringBoot整合系列】SpringBoot整合RabbitMQ-基本使用

目录

  • SpringtBoot整合RabbitMQ
    • 1.依赖
    • 2.配置
    • RabbitMQ的7种模式
      • 1.简单模式(Hello World)
        • 应用场景
        • 代码示例
      • 2.工作队列模式(Work queues)
        • 应用场景
        • 代码示例
        • 手动 ack
          • 代码示例
      • 3.订阅模式(Publish/Subscribe)
        • 应用场景
        • 代码示例
      • 4.路由模式(Routing)
        • 应用场景
        • 代码示例
      • 5.主题模式(Topics)
        • 应用场景
        • 代码示例
      • 6.远程过程调用(RPC)
        • 应用场景
        • 代码示例
          • 消息生产者开发
          • 消息发送者开发
      • 7.发布者确认(Publisher Confirms)
        • 应用场景
    • RabbitMQ的四种交换机
      • 1.直连交换机(Direct exchange)
        • 代码示例
      • 2.扇形交换机(Fanout exchange)
        • 代码示例
      • 3.主题交换机(Topic exchange)
        • 代码示例
      • 4.首部交换机(Headers exchange)
        • 代码示例

SpringtBoot整合RabbitMQ

1.依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

2.配置

server:
  port: 9090
spring:
  rabbitmq:
    host: 192.168.29.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

RabbitMQ的7种模式

1.简单模式(Hello World)

在这里插入图片描述

  • 做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B
应用场景
  • 将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人
代码示例
  1. 配置类
    @Configuration
    public class HelloWorldConfig {
        public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue";
    
        @Bean
        public Queue queue1() {
            return new Queue(HELLO_WORLD_QUEUE_NAME);
        }
    }
    
  2. 监听并消费消息
    @Component
    public class HelloWorldConsumer {
        @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
        public void receive(String msg) {
            System.out.println("msg = " + msg);
        }
    }
    
  3. 生产消息并发送
    @SpringBootTest
    class MyMqBootApplicationTests {
        @Resource
        RabbitTemplate rabbitTemplate;
        @Test
        void helloworld() {
            rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello world!!!");
        }
    }
    

2.工作队列模式(Work queues)

在这里插入图片描述

  • 在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理
  • 一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。
应用场景
  • 一个订单的处理需要10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况
代码示例
  1. 监听并消费消息

    @Component
    public class HelloWorldConsumer {
        @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
        public void receive(String msg) {
            System.out.println("receive = " + msg);
        }
    	// concurrency 为 10,此时,receive2将会同时存在 10 个子线程去消费消息
        @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10")
        public void receive2(String msg) {
            System.out.println("receive2 = " + msg+"------->"+Thread.currentThread().getName());
        }
    }
    
  2. 生产消息并发送

        @Test
        void work() {
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello");
            }
        }
    
  3. 运行结果:运行结果每次不一定一样

    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-9
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-10
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-3
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-7
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-5
    receive = hello
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-4
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-6
    receive2 = hello------->org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-8
    
手动 ack

手动 ack可以自行决定是否消费 RabbitMQ 发来的消息

代码示例
  1. 配置文件:配置手动ack
server:
  port: 9090
spring:
  rabbitmq:
    host: 192.168.29.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 配置手动ack
  1. 消费代码:receive2 拒绝了所有消息,第一个消费者消费了所有消息
    @Component
    public class HelloWorldConsumer {
        @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)
        public void receive(Message message, Channel channel) throws IOException {
            System.out.println("receive="+message.getPayload());
            //手动确认
            channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);
        }
    
        @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency = "10")
        public void receive2(Message message, Channel channel) throws IOException {
            System.out.println("receive2 = " + message.getPayload() + "------->" + Thread.currentThread().getName());
            //手动拒绝
            channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true);
        }
    }
    
  2. 测试结果(生产代码不变)
  • 此时receive2 拒绝了所有消息,receive消费了所有消息(如果receive2没有拒绝,receive断然不会消费10次)

3.订阅模式(Publish/Subscribe)

在这里插入图片描述

  • 一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。
  • 一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。
  • 需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力
应用场景
  • 更新商品库存后需要通知多个缓存和多个数据库,这里的结构应该是:
    • 一个fanout类型交换机扇出两个个消息队列,分别为缓存消息队列、数据库消息队列
    • 一个缓存消息队列对应着多个缓存消费者
    • 一个数据库消息队列对应着多个数据库消费者
代码示例

具体内容看本文单独的目录 RabbitMQ的四种交换机 , 我这里单独拿出来解释了

4.路由模式(Routing)

在这里插入图片描述

  • 有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息
  • 一个生产者,一个交换机,两个队列,两个消费者,生产者在创建 Exchange 后,根据 RoutingKey 去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey 即可
应用场景
  • 如在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息
代码示例

参考本文单独的目录 RabbitMQ的四种交换机-1

5.主题模式(Topics)

在这里插入图片描述

  • 根据主题(Topics)来接收消息,将路由key和某模式进行匹配,此时队列需要绑定在一个模式上,#匹配一个词或多个词,*只匹配一个词。
  • 一个生产者,一个交换机,两个队列,两个消费者,生产者创建 Topic 的 Exchange 并且绑定到队列中,这次绑定可以通过 * 和 # 关键字,对指定 RoutingKey 内容,编写时注意格式 xxx.xxx.xxx 去编写
应用场景

同上,iphone促销活动可以接收主题为iphone的消息,如iphone12、iphone13,iphone…等

代码示例

参考本文单独的目录 RabbitMQ的四种交换机-3

6.远程过程调用(RPC)

在这里插入图片描述
如果我们需要在远程计算机上运行功能并等待结果就可以使用RPC,具体流程可以看图。

  • 首先 Client 发送一条消息,和普通的消息相比,这条消息多了两个关键内容:一个是 correlation_id,这个表示这条消息的唯一 id,还有一个内容是 reply_to,这个表示消息回复队列的名字。
  • Server 从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到 reply_to 指定的回调队列中。
  • Client 从回调队列中读取消息,就可以知道消息的执行情况是什么样子了。

这种情况其实非常适合处理异步调用。

应用场景
  • 需要等待接口返回数据,如订单支付
代码示例
消息生产者开发
  1. 依赖

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
            </dependency>
      		<dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
       		</dependency>
    
  2. 配置

    server:
      port: 7777
    spring:
      rabbitmq:
        host: 192.168.29.200
        port: 5672
        username: admin
        password: admin
        virtual-host: /
        publisher-confirm-type: correlated # 配置消息确认方式,我们通过 correlated 来确认,只有开启了这个配置,将来的消息中才会带 correlation_id,只有通过 correlation_id 我们才能将发送的消息和返回值之间关联起来。
        publisher-returns: true #开启发送失败退回。
    
  3. 配置类

  4. /**
     * @author: zjl
     * @datetime: 2024/5/9
     * @desc: 
     * 这个配置类中我们分别配置了消息发送队列 msgQueue 和消息返回队列 replyQueue,
     * 然后将这两个队列和消息交换机进行绑定。常规操作。
     * 在 Spring Boot 中我们负责消息发送的工具是 RabbitTemplate,
     * 默认情况下,系统自动提供了该工具,但是这里我们需要对该工具重新进行定制,
     * 主要是添加消息发送的返回队列,最后我们还需要给返回队列设置一个监听器
     */
    @Configuration
    public class RabbitConfig {
    
        public static final String RPC_QUEUE1 = "queue_1";
        public static final String RPC_QUEUE2 = "queue_2";
        public static final String RPC_EXCHANGE = "rpc_exchange";
    
        /**
         * 设置消息发送RPC队列
         */
        @Bean
        public Queue msgQueue() {
            return new Queue(RPC_QUEUE1);
        }
    
        /**
         * 设置返回队列
         */
        @Bean
        public Queue replyQueue() {
            return new Queue(RPC_QUEUE2);
        }
    
        /**
         * 设置交换机
         */
        @Bean
        public TopicExchange exchange() {
            return new TopicExchange(RPC_EXCHANGE);
        }
    
        /**
         * 请求队列和交换器绑定
         */
        @Bean
        public Binding msgBinding() {
            return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);
        }
    
        /**
         * 返回队列和交换器绑定
         */
        @Bean
        public Binding replyBinding() {
            return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);
        }
        
        /**
         * 使用 RabbitTemplate发送和接收消息
         * 并设置回调队列地址
         */
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setReplyAddress(RPC_QUEUE2);
            template.setReplyTimeout(6000);
            return template;
        }
        
        /**
         * 给返回队列设置监听器
         */
        @Bean
        public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(RPC_QUEUE2);
            container.setMessageListener(rabbitTemplate(connectionFactory));
            return container;
        }
    }
    
  5. 消息发送

    /**
     * @author: zjl
     * @datetime: 2024/5/9
     * @desc:
     *     消息发送调用 sendAndReceive 方法,该方法自带返回值,返回值就是服务端返回的消息。
     *     服务端返回的消息中,头信息中包含了 spring_returned_message_correlation 字段,
     *     这个就是消息发送时候的 correlation_id,通过消息发送时候的 correlation_id
     *     以及返回消息头中的 spring_returned_message_correlation 字段值,
     *     我们就可以将返回的消息内容和发送的消息绑定到一起,
     *     确认出这个返回的内容就是针对这个发送的消息的。
     */
    @RestController
    @Slf4j
    public class RpcClientController {
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/send")
        public String send(String message) {
            // 创建消息对象
            Message newMessage = MessageBuilder.withBody(message.getBytes()).build();
    
            log.info("client send:{}", newMessage);
    
            //客户端发送消息
            Message result = rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE1, newMessage);
    
            String response = "";
            if (result != null) {
                // 获取已发送的消息的 correlationId
                String correlationId = newMessage.getMessageProperties().getCorrelationId();
                log.info("correlationId:{}", correlationId);
    
                // 获取响应头信息
                HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders();
    
                // 获取 server 返回的消息 id
                String msgId = (String) headers.get("spring_returned_message_correlation");
    
                if (msgId.equals(correlationId)) {
                    response = new String(result.getBody());
                    log.info("client receive:{}", response);
                }
            }
            return response;
        }
    }
    

这就是整个消息生产者的开发,其实最最核心的就是 sendAndReceive 方法的调用。调用虽然简单,但是准备工作还是要做足够。例如如果我们没有在 application.properties 中配置 correlated,发送的消息中就没有 correlation_id,这样就无法将返回的消息内容和发送的消息内容关联起来

消息发送者开发
  1. 依赖

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
       		 </dependency>
    
  2. 配置

    server:
      port: 8888
    spring:
      rabbitmq:
        host: 192.168.29.200
        port: 5672
        username: admin
        password: admin
        virtual-host: /
        publisher-confirm-type: correlated # 配置消息确认方式,我们通过 correlated 来确认,只有开启了这个配置,将来的消息中才会带 correlation_id,只有通过 correlation_id 我们才能将发送的消息和返回值之间关联起来。
        publisher-returns: true #开启发送失败退回。
    
  3. 配置类

    @Configuration
    public class RabbitConfig {
    
        public static final String RPC_QUEUE1 = "queue_1";
        public static final String RPC_QUEUE2 = "queue_2";
        public static final String RPC_EXCHANGE = "rpc_exchange";
    
        /**
         * 配置消息发送队列
         */
        @Bean
        Queue msgQueue() {
            return new Queue(RPC_QUEUE1);
        }
    
        /**
         * 设置返回队列
         */
        @Bean
        Queue replyQueue() {
            return new Queue(RPC_QUEUE2);
        }
    
        /**
         * 设置交换机
         */
        @Bean
        TopicExchange exchange() {
            return new TopicExchange(RPC_EXCHANGE);
        }
    
        /**
         * 请求队列和交换器绑定
         */
        @Bean
        Binding msgBinding() {
            return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);
        }
    
        /**
         * 返回队列和交换器绑定
         */
        @Bean
        Binding replyBinding() {
            return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);
        }
    }
    
  4. 消息消费

    @RestController
    @Slf4j
    public class RpcConsumerReceiver {
        /** 服务端首先收到消息并打印出来。
        * 服务端提取出原消息中的 correlation_id。
        * 服务端调用 sendAndReceive 方法,将消息发送给 RPC_QUEUE2 队列,同时带上 correlation_id 参数。
        */
        @Resource
        private RabbitTemplate rabbitTemplate;
        @RabbitListener(queues = RabbitConfig.RPC_QUEUE1)
        public void process(Message msg) {
            log.info("server receive : {}",msg.toString());
            Message response = MessageBuilder.withBody(("i'm receive:"+new String(msg.getBody())).getBytes()).build();
            CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId());
            rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE2, response, correlationData);
        }
    }
    

7.发布者确认(Publisher Confirms)

  • 与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。
  • 在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理
应用场景
  • 对于消息可靠性要求较高,比如钱包扣款

RabbitMQ的四种交换机

1.直连交换机(Direct exchange)

  • 具有路由功能的交换机,绑定到此交换机的时候需要指定一个routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列
  • DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上
  • 例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。
代码示例
  1. 配置类
    @Configuration
    public class RabbitDirectConfig {
        //首先提供一个消息队列Queue,然后创建一个DirectExchange对象,三个参数分别是名字,重启后是否依然有效以及长期未用时是否删除。
        //创建一个Binding对象将Exchange和Queue绑定在一起。
        //DirectExchange和Binding两个Bean的配置可以省略掉,即如果使用DirectExchange,可以只配置一个Queue的实例即可。
        public final static String DIRECTNAME = "mq-direct";
        @Bean
        public Queue queue() {
            return new Queue("hello-queue");
        }
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange(DIRECTNAME, true, false);
        }
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue())
                    .to(directExchange()).with("direct");
        }
    }
    
  2. 消费者
    通过 @RabbitListener 注解指定一个方法是一个消息消费方法,方法参数就是所接收到的消息
    @Component
    public class DirectReceiver {
        @RabbitListener(queues = "hello-queue")
        public void handler1(String msg) {
            System.out.println("DirectReceiver:" + msg);
        }
    }
    
  3. 生产者发送消息
    @RestController
    public class SendController {
        @Resource
        private RabbitTemplate rabbitTemplate;
        @RequestMapping("/send")
        public String send(){
            rabbitTemplate.convertAndSend("hello-queue", "hello direct!");
            return "success";
        }
    }
    

2.扇形交换机(Fanout exchange)

  • 广播消息到所有队列,没有任何处理,速度最快
  • FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用
代码示例
  1. 配置类

    @Configuration
    public class RabbitFanoutConfig {
        //在这里首先创建 FanoutExchange,参数含义与创建 DirectExchange 参数含义一致,
        // 然后创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上
        public final static String FANOUTNAME = "mq-fanout";
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(FANOUTNAME, true, false);
        }
        @Bean
        public Queue queueOne() {
            return new Queue("queue-one");
        }
        @Bean
        public Queue queueTwo() {
            return new Queue("queue-two");
        }
        @Bean
        public Binding bindingOne() {
            return BindingBuilder.bind(queueOne()).to(fanoutExchange());
        }
        @Bean
        public Binding bindingTwo() {
            return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
        }
    }
    
  2. 消费者

    @Component
    public class FanoutReceiver {
        @RabbitListener(queues = "queue-one")
        public void receiver1(String message) {
            System.out.println("FanoutReceiver:receiver1:" + message);
        }
        @RabbitListener(queues = "queue-two")
        public void receiver2(String message) {
            System.out.println("FanoutReceiver:receiver2:" + message);
        }
    }
    
  3. 生产者发送消息

    @RestController
    public class SendController {
        @Resource
        private RabbitTemplate rabbitTemplate;
        @RequestMapping("/send")
        public String send(){
            //注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null
            rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null, "hello fanout!");
            return "success";
        }
    }
    

3.主题交换机(Topic exchange)

  • 在直连交换机基础上增加模式匹配,也就是对routing_key进行模式匹配,*代表一个单词,#代表多个单词
  • TopicExchange 是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,
  • 当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上
代码示例
  1. 配置类

    @Configuration
    public class RabbitTopicConfig {
        /**
         * 首先创建 TopicExchange,参数和前面的一致。
         * 然后创建三个 Queue,第一个 Queue 用来存储和 “xiaomi” 有关的消息,
         * 第二个 Queue 用来存储和 “huawei” 有关的消息,
         * 第三个 Queue 用来存储和 “phone” 有关的消息。
         * 
         * 将三个 Queue 分别绑定到 TopicExchange 上,
         * 第一个 Binding 中的 “xiaomi.#” 表示消息的 routingkey 凡是以 “xiaomi” 开头的,都将被路由到名称为 “xiaomi” 的 Queue 上,
         * 第二个 Binding 中的 “huawei.#” 表示消息的 routingkey 凡是以 “huawei” 开头的,都将被路由到名称为 “huawei” 的 Queue 上,
         * 第三个 Binding 中的 “#.phone.#” 则表示消息的 routingkey 中凡是包含 “phone” 的,都将被路由到名称为 “phone” 的 Queue 上。
         */
        public final static String TOPICNAME = "mq-topic";
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPICNAME, true, false);
        }
        @Bean
        public Queue xiaomi() {
            return new Queue("xiaomi");
        }
        @Bean
        public Queue huawei() {
            return new Queue("huawei");
        }
        @Bean
        public Queue phone() {
            return new Queue("phone");
        }
        @Bean
        public Binding xiaomiBinding() {
            return BindingBuilder.bind(xiaomi()).to(topicExchange())
                    .with("xiaomi.#");
        }
        @Bean
        public Binding huaweiBinding() {
            return BindingBuilder.bind(huawei()).to(topicExchange())
                    .with("huawei.#");
        }
        @Bean
        public Binding phoneBinding() {
            return BindingBuilder.bind(phone()).to(topicExchange())
                    .with("#.phone.#");
        }
    }
    
  2. 消费者

    @Component
    public class TopicReceiver {
        @RabbitListener(queues = "phone")
        public void receiver1(String message) {
            System.out.println("PhoneReceiver:" + message);
        }
        @RabbitListener(queues = "xiaomi")
        public void receiver2(String message) {
            System.out.println("XiaoMiReceiver:"+message);
        }
        @RabbitListener(queues = "huawei")
        public void receiver3(String message) {
            System.out.println("HuaWeiReceiver:"+message);
        }
    }
    
  3. 生产者发送消息

@RestController
public class SendController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/send")
    public String send(){
    	//根据 RabbitTopicConfig 中的配置,
    	//第一条消息将被路由到名称为 “xiaomi” 的 Queue 上,
    	//第二条消息将被路由到名为 “huawei” 的 Queue 上,
    	//第三条消息将被路由到名为 “xiaomi” 以及名为 “phone” 的 Queue 上,
    	//第四条消息将被路由到名为 “huawei” 以及名为 “phone” 的 Queue 上,
    	//最后一条消息则将被路由到名为 “phone” 的 Queue 上
        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
                "xiaomi.news","小米新闻..");
        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
                "huawei.news","华为新闻..");
        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
                "xiaomi.phone","小米手机..");
        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
                "huawei.phone","华为手机..");
        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,
                "phone.news","手机新闻..");
        return "success";
    }
}

4.首部交换机(Headers exchange)

  • 忽略routing_key,使用Headers信息(一个Hash的数据结构)进行匹配,优势在于可以有更多更灵活的匹配规则
  • HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关
代码示例
  1. 配置类

    @Configuration
    public class RabbitHeaderConfig {
        /**
         * 这里的配置大部分和前面介绍的一样,差别主要体现的 Binding 的配置上,
         * 第一个 bindingName 方法中,
         * whereAny 表示消息的 Header 中只要有一个 Header 匹配上 map 中的 key/value,
         * 就把该消息路由到名为 “name-queue” 的 Queue 上,
         * 这里也可以使用 whereAll 方法,
         * 表示消息的所有 Header 都要匹配。
         * whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。
         * bindingAge 中的配置则表示只要消息的 Header 中包含 age,不管 age 的值是多少,
         * 都将消息路由到名为 “age-queue” 的 Queue 上
         */
        public final static String HEADERNAME = "mq-header";
        @Bean
        public HeadersExchange headersExchange() {
            return new HeadersExchange(HEADERNAME, true, false);
        }
        @Bean
        public Queue queueName() {
            return new Queue("name-queue");
        }
        @Bean
        public Queue queueAge() {
            return new Queue("age-queue");
        }
        @Bean
        public Binding bindingName() {
            Map<String, Object> map = new HashMap<>();
            map.put("name", "mq");
            return BindingBuilder.bind(queueName())
                    .to(headersExchange()).whereAny(map).match();
        }
        @Bean
        public Binding bindingAge() {
            return BindingBuilder.bind(queueAge())
                    .to(headersExchange()).where("age").exists();
        }
    }
    
  2. 消费者

    @Component
    public class HeaderReceiver {
    	//注意这里的参数用 byte 数组接收
        @RabbitListener(queues = "name-queue")
        public void receiver1(byte[] msg) {
            System.out.println("HeaderReceiver:name:" + new String(msg, 0, msg.length));
        }
        @RabbitListener(queues = "age-queue")
        public void receiver2(byte[] msg) {
            System.out.println("HeaderReceiver:age:" + new String(msg, 0, msg.length));
        }
    }
    
  3. 生产者发送消息

package cn.smbms.controller;

import cn.smbms.config.RabbitFanoutConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author: zjl
 * @datetime: 2024/5/9
 * @desc: 
 */
@RestController
public class SendController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/send")
    public String send(){
		//这里创建两条消息,两条消息具有不同的 header,不同 header 的消息将被发到不同的 Queue 中去
        Message nameMsg = MessageBuilder
                .withBody("hello header! name-queue".getBytes())
                .setHeader("name", "sang").build();
        Message ageMsg = MessageBuilder
                .withBody("hello header! age-queue".getBytes())
                .setHeader("age", "99").build();
        rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);
        rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);
        return "success";
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/609965.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

超详细的胎教级Stable Diffusion使用教程(一)

这套课程分为五节课&#xff0c;会系统性的介绍sd的全部功能和实操案例&#xff0c;让你打下坚实牢靠的基础 一、为什么要学Stable Diffusion&#xff0c;它究竟有多强大&#xff1f; 二、三分钟教你装好Stable Diffusion 三、小白快速上手Stable Diffusion 四、Stable dif…

本安防爆手机在电力行业中的应用

在电力行业这一充满挑战与风险的领域中&#xff0c;安全始终是最为首要的考量。电力巡检、维修等作业往往涉及易燃、易爆环境&#xff0c;这就要求工作人员配备能够在极端条件下保障通讯和作业安全的专业设备。防爆手机应运而生&#xff0c;以其独特的设计和卓越的性能&#xf…

05、Kafka 操作命令

05、Kafka 操作命令 1、主题命令 &#xff08;1&#xff09;创建主题 kafka-topics.sh --create --bootstrap-server 192.168.135.132:9092,192.168.135.133:9092,192.168.135.134:9092 --topic test1 --partitions 4 --replication-factor 3–bootstrap-server&#xff1a;…

Istio 流量管理(请求路由、流量转移、请求重试、流量镜像、故障注入、熔断等)介绍及使用

一、Istio 流量管理 Istio是一个开源的服务网格&#xff0c;它为分布式微服务架构提供了网络层的抽象。它使得服务之间的通信变得更为可靠、安全&#xff0c;并且提供了细粒度的流量管理、监控和策略实施功能。Istio通过在服务之间插入一个透明的代理&#xff08;Envoy&#x…

防静电段码屏驱动VK2C23适用于水电气表以及工控仪表类产品

VK2C23是一个点阵式存储映射的LCD驱动器&#xff0c;可支持最大224点&#xff08;56SEGx4COM&#xff09;或者最大416点&#xff08;52SEGx8COM&#xff09;的LCD屏。单片机可通过I2C接口配置显示参数和读写显示数据&#xff0c;也可通过指令进入省电模式。其高抗干扰&#xff…

Django实验(远程访问+图片显示)

众所周知&#xff0c;Python除了不能生孩子什么都会。Python也是可以做web服务的。 Python做web有一个重点优势是&#xff1a;做一个快速的AI Demo。 第一步&#xff1a;安装一个版本5.0以上django 第二步&#xff1a;构建咱们的Django工程&#xff0c;我取名为BBQ django-adm…

static静态成员变量和静态方法

当有new创建一个对象的,里面属性和方法,通过构造函数,能定义多个不同的对象,在我们做面向对象开发的时候,给一个场景,人在一个班级的时候,你的老师可能是固定的。 当我们用构造方法去构造的时候&#xff0c;每次都去传递一个固定的实参去定义个老师。 这样好会显得代码非常的…

Linux网络编程:TCP并发服务器实现

目录 1、前言 2、多进程代码实现 2.1 创建新的进程 2.2 客户端接收响应函数 2.3 僵尸进程处理 2.4 完整代码 2.5 代码测试 3、多线程代码实现 3.1 创建新的线程 3.2 线程函数定义 3.3 完整代码 3.4 代码测试 4、总结 1、前言 前面实现了基本的TCP编程&#xf…

理解机器学习中的类别不平衡问题

大家好&#xff0c;实际世界的数据集通常是杂乱的,当不同类别之间的样本分布不均匀时&#xff0c;就会出现类别不平衡。或者说&#xff0c;某些类别的样本比其他类别多得多。例如&#xff0c;考虑一个信用卡违约数据集&#xff0c;信用卡违约是一个相对较少发生的事件&#xff…

Java入门基础学习笔记2——JDK的选择下载安装

搭建Java的开发环境&#xff1a; Java的产品叫JDK&#xff08;Java Development Kit&#xff1a; Java开发者工具包&#xff09;&#xff0c;必须安装JDK才能使用Java。 JDK的发展史&#xff1a; LTS&#xff1a;Long-term Support&#xff1a;长期支持版。指的Java会对这些版…

Sass语法介绍-变量介绍

02 【Sass语法介绍-变量】 sass有两种语法格式Sass(早期的缩进格式&#xff1a;Indented Sass)和SCSS(Sassy CSS) 目前最常用的是SCSS&#xff0c;任何css文件将后缀改为scss&#xff0c;都可以直接使用Sassy CSS语法编写。 所有有效的 CSS 也同样都是有效的 SCSS。 Sass语…

javaMail快速部署——发邮件喽~

目录 功能阐述 前序步骤 &#xff08;1&#xff09;到QQ邮箱中获取到授权码 代码实现 坑 今天在写一个修改密码的功能的时候要用到邮箱的发送&#xff0c;然后因为这个项目比较老旧了&#xff0c;采用的是javaWeb和jsp的配置&#xff0c;对于我只使用过springBoot整合的ja…

京东手势验证码-YOLO姿态识别+Bézier curve轨迹拟合

这次给老铁们带来的是京东手势验证码的识别。 目标网站&#xff1a;https://plogin.m.jd.com/mreg/index 验证码如下图: 当第一眼看到这个验证码的时候&#xff0c;就头大了&#xff0c;这玩意咋识别&#xff1f;&#xff1f;&#xff1f; 静下心来细想后的一个方案&#xf…

JavaWeb中的Session和Cookie

前言 什么是会话跟踪技术 Cookie 1.什么是cookie 2.Cookie的应用 2.1 保持用户登录状态 2.2 记录用户名 3. Cookie的设置和获取 3.1 、通过HttpServletResponse.addCookie的方式设置Cookie 3.2、浏览器中查看cookie的内容 3.3、服务端获取客户端携带的cookie&#xf…

240+ Stylized Arctic Textures - Snow, Ice More

240+风格化的雪、冰、雪岩和其他雪纹理的集合,用于北极风格化/幻想/rpg风格的游戏环境。 在这个系列中,你会在风格化/幻想/rpg风格的游戏中找到大量适合北极和其他雪地环境的纹理——雪、冰、雪地岩石、雪地草、雪地砾石、雪地等等! 每个纹理都是可平铺/无缝的,并与各种不同…

C++语法|进程虚拟地址空间和函数调用栈

本文来自施磊老师的课程&#xff0c;老师讲的非常不错&#xff0c;我的笔记也是囫囵吞枣全部记下&#xff0c;但是我在这里推荐一本书&#xff0c;真的真的建议初学C或者想要进阶C的同学们看看&#xff1a;《CPU眼里的C/C》 文章目录 进程的虚拟地址空间和布局进程虚拟地址空间…

布隆过滤器和黑名单,解决Redis缓存穿透

目录 1.什么是布隆过滤器&#xff1f; 2.布隆过滤器的原理 3.空间计算 4.布隆过滤器的视线场景&#xff1a; 5.在Spring Boot中集成Redisson实现布隆过滤器 6、Redisson实现布隆过滤器 6.1导入依赖 6.2使用 布隆过滤器&#xff08;Bloom Filter&#xff09;是1970年由布…

邮件大附件系统如何进行安全、高效的大附件发送?

邮件大附件系统是一套解决传统电子邮件系统&#xff0c;在发送大文件时遇到限制的解决方案。由于传统电子邮件系统通常对附件大小有限制&#xff0c;这使得发送大文件变得困难。邮件大附件系统通过各种技术手段&#xff0c;允许用户发送超过传统限制的大文件&#xff0c;通常在…

修改latex中block中公式与block标题间隔过大的问题

修改block中公式与block间隔过大的问题 如图的block中公式出现了空白:代码见下方 \begin{proof}[证明]\begin{align*}&Z\alpha \beta _XX\beta _YY\varepsilon \rightarrow XZ\alpha X\beta _XX^2\beta _YXY\varepsilon X&\\&E\left( Z \right) \alpha \beta _XE\…

【Java】Java中栈溢出的常见情况及解决方法

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…