从0开始搭建一个生产级SpringBoot2.0.X项目(十)SpringBoot 集成RabbitMQ

前言

最近有个想法想整理一个内容比较完整springboot项目初始化Demo。

SpringBoot集成RabbitMQ

 RabbitMQ中的一些角色:

  • publisher:生产者

  • consumer:消费者

  • exchange个:交换机,负责消息路由

  • queue:队列,存储消息

  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。 

junit用于测试。

一、pom引入依赖amqp

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <!--rabbitmq-->

二、application-dev.yaml 增加RabbitMQ相关配置

spring:
  #RabbitMQ服务器配置,地址账号密码,virtualhost等配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: murg
    password: 123456
    virtual-host: murg-host
    #队列中没有消息,阻塞等待时间
    template:
      receive-timeout: 2000
logging:
  level:
    org.springframework.security: debug


三、发布/订阅

RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型。此处只列举发布/订阅模式。

此模式下根据交换机类型又分为三种。

1.Fanout类型:   广播模式    把消息交给所有绑定到交换机的队列

2.Direct类型:    路由模式     把消息交给符合指定routing key 的队列

3Topic类型:     主题模式      把消息交给符合主题通配符的队列

3.1 Fanout类型

在广播模式下,消息发送流程是这样的:

1) 可以有多个队列

2) 每个队列都要绑定到Exchange(交换机)

3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定

4) 交换机把消息发送给绑定过的所有队列

5) 订阅队列的消费者都能拿到消

3.1.1 声明Fanout类型交换机和队列 将交换机和队列绑定在一起

创建配置类FanoutConfig 

声明一个Fanout类型交换机命名为murg.fanout

声明两个Queue队列分别为fanout.queue1和fanout.queue2

分别将两个队列和交换机绑定,后续用于消费消息。

package com.murg.bootdemo.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Fanout 广播模式下
 *声明交换机和队列
 */
@Configuration
public class FanoutConfig {
    /**
     * 声明交换机和队列 将交换机和队列绑定在一起
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("murg.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

3.1.2创建消息生产服务

创建消息生产服务MessageProducerService ,注入RabbitTemplate 用于发送消息。

 注意一点是rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列

新增测试广播模式下发送消息方法

package com.murg.bootdemo.rabbitmq;

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * Rabbitmq消息生产者
 */
@Service
@RequiredArgsConstructor
public class MessageProducerService {
    private final RabbitTemplate rabbitTemplate;

    /**
     * 测试广播模式下发送消息
     * @param msg
     */
    public void testFanoutExchange(String msg) {
        // 发送消息
        //murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了
        //rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列
        rabbitTemplate.convertAndSend("murg.fanout","",msg);
    }

   
}

3.13创建消息消费服务

创建消息生产服务MessageConsumerService 

@RabbitListener是 SpringAMQP AMQP提供的注解,用于简化 RabbitMQ 消息监听器的创建。通过在方法上添加 @RabbitListener 注解,可以将方法注册为消息监听器用于处理从 RabbitMQ 中接收到的消息。queues 参数定义队列名字 ,此处创建两个监听 在上述FanoutConfig 中已经将这两个队列和交换机murg.fanout绑定,所有可同时消费murg.fanout交换机的消息。

package com.murg.bootdemo.rabbitmq;

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * 消息消费者
 *
 */
@Service
@RequiredArgsConstructor
public class MessageConsumerService {

    /**
     * 下面是Fanout,广播模式的监听
     * 通过RabbitListener监听队列名字FanoutConfig 中定义的 fanout.queue1 和fanout.queue2
     * @param msg
     */
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
    }


  
}

3.14修改测试类进行测试

修改创建项目时生成的 BootdemoApplicationTests.class

增加以下注解

@ActiveProfiles("dev") 

指定运行环境为开发环境
@RunWith(SpringRunner.class)

指定测试类的运行器(Runner)。其主要作用是将Spring的测试支持集成到JUnit测试中,使得在运行JUnit测试时,Spring的上下文可以被正确地加载和配置。

@SpringBootTest(classes={BootdemoApplication.class})

指定启动类

package com.murg.bootdemo;


import com.murg.bootdemo.rabbitmq.MessageProducerService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Profile;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("dev")
@RunWith(SpringRunner.class)
@SpringBootTest(classes={BootdemoApplication.class})// 指定启动类
public class BootdemoApplicationTests {
    @Autowired
    MessageProducerService messageProducerService;
    @Test
    public void contextLoads() {
        System.out.printf("aaaaaaaaaaaaa");
    }
    //测试
    @Test
    public void testFanoutExchange(){
        String msg = "遍身罗绮者,不是养蚕人";
        for (int i=0;i<10;i++){
            messageProducerService.testFanoutExchange(msg);
        }
    }

}

3.15 增加一个测试方法调用消息生产服务,发送 Fanout类型的消息。运行测试控制台输出结果

两个队列都可消费。 

3.2 Direct类型

3.2.1MessageProducerService生产服务增加方法testDirectExchange

在Direct模型下:
 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
 Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

/**
     * 测试Direct模式下发送消息
     * 在Direct模型下:
     *
     * 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
     *
     * 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
     *
     * Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
     * murg.Direct
     * @param msg
     */
    public void testDirectExchange(String msg,String rountingkey) {
        // 发送消息
        //murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了
        //rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列
        rabbitTemplate.convertAndSend("murg.direct",rountingkey,msg);
    }

3.2.2MessageConsumerService消费服务增加方法testDirectExchange

Direct模式的消费监听修改队列和交换机的方式改为注解方式。
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:@Exchange(name = "murg.direct",type = ExchangeTypes.DIRECT)声明交换机名字及类型
通过key的值声明接收不同路由的消息

direct.queue1 消费 rountingkey为“蚕妇”的消息

direct.queue2 消费 rountingkey为“自京赴奉先县咏怀五百字”的消息

    /**
     *
     * 下面是rountingKey 路由key 模式的消费监听
     * 基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
     *
     * 在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
     *
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),//定义队列名字 direct.queue1
            exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型
            key = {"蚕妇"} //指定消费rountingkey
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),//定义队列名字 direct.queue2
            exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型
            key = {"自京赴奉先县咏怀五百字"}//指定消费rountingkey
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }

3.2.3测试类增加测试方法

    @Test
    public void testDirectExchange() throws InterruptedException {
        String msg1 = "遍身罗绮者,不是养蚕人";
        String key1 ="蚕妇";
        String msg2 = "朱门酒肉臭,路有冻死骨";
        String key2 ="自京赴奉先县咏怀五百字";

        for (int i=0;i<10;i++){
            if (i % 2 == 0){
                messageProducerService.testDirectExchange(msg2,key2);
                Thread.sleep(1000);
            }else {
                messageProducerService.testDirectExchange(msg1,key1);
                Thread.sleep(1000);

            }

        }
    }

调用消息生产服务,发送 Direct类型的消息。运行测试控制台输出结果

3.3 Topic类型

3.3.1MessageProducerService生产服务增加方法testTopicExchange

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
demo.#:能够匹配demo.spu.insert 或者 demo.spu    #.demo写法也可以
demo.*:只能匹配demo.spu

  public void testTopicExchange(String msg, String key) {
        rabbitTemplate.convertAndSend("murg.topic",key,msg);
    }

3.3.2MessageConsumerService消费服务增加方法testDirectExchange

@Exchange(name = "murg.topic",type = ExchangeTypes.TOPIC)声明交换机名字及类型
通过key的值声明接收不同路由的消息

topic.queue1 消费 rountingkey为“罗隐.*”的消息

topic.queue2 消费 rountingkey为“#.贫女”的消息

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),//定义队列名字 topic.queue1
            exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型
            key = {"罗隐.*"} //指定消费rountingkey
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),//定义队列名字 topic.queue2
            exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型
            key = {"#.贫女"}//指定消费rountingkey
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }

3.3.3测试类增加测试方法 

    @Test
    public void testTopicExchange() throws InterruptedException {
        String msg = "采得百花成蜜后,为谁辛苦为谁甜";
        String key ="罗隐.蜂";
        String msg2 = "苦恨年年压金线,为他人作嫁衣裳";
        String key2 ="秦韬玉.贫女";
        for (int i=0;i<10;i++){
            if (i % 2 == 0){
                messageProducerService.testTopicExchange(msg,key);
                Thread.sleep(1000);
            }else {
                messageProducerService.testTopicExchange(msg2,key2);
                Thread.sleep(1000);

            }
        }
    }

调用消息生产服务,发送 Topic类型的消息。运行测试控制台输出结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/912293.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

github高分项目 WGCLOUD - 运维实时管理工具

GitHub - tianshiyeben/wgcloud: Linux运维监控工具&#xff0c;支持系统硬件信息&#xff0c;内存&#xff0c;CPU&#xff0c;温度&#xff0c;磁盘空间及IO&#xff0c;硬盘smart&#xff0c;GPU&#xff0c;防火墙&#xff0c;网络流量速率等监控&#xff0c;服务接口监测&…

CDN到底是什么?

文章目录 CDN到底是什么&#xff1f;一、引言二、CDN的基本概念1、CDN的定义2、CDN的作用3、代码示例&#xff1a;配置CNAME记录 三、CDN的工作原理1、请求流程2、代码示例&#xff1a;DNS解析过程3、完整的CDN工作流程 四、总结 CDN到底是什么&#xff1f; 一、引言 在互联网…

DeFi 4.0峥嵘初现:主权金融时代的来临

近年来&#xff0c;Web3领域的创新似乎遇到了瓶颈&#xff0c;DeFi&#xff08;去中心化金融&#xff09;从热潮的巅峰逐渐进入了一个沉寂期。我们再也没有见到像DeFi Summer那样的行业兴奋&#xff0c;资本市场的动荡和Meme币的出现&#xff0c;似乎让人们忘记了曾经的区块链技…

Linux:调试器 gdb/cgdb 的使用

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、调试前的预备二. 使用&#xff08;gdb的常用命令&#xff09;三. 推荐安装cgdb总结 前言 本文主要讲解如何在Linux环境下面来对代码进行调试 一、调试前的…

知识中台赋能法律咨询服务:八大核心优势

法律咨询服务领域&#xff0c;知识中台以其独特的功能和优势&#xff0c;为行业发展注入了新的活力。以下是知识中台在法律咨询服务中展现的八大核心优势&#xff1a; 一、法律知识资源的全面整合 知识中台致力于收集、整理和整合各类法律知识资源&#xff0c;包括法律法规、…

03集合基础

目录 1.集合 Collection Map 常用集合 List 接口及其实现 Set 接口及其实现 Map 接口及其实现 Queue 接口及其实现 Deque 接口及其实现 Stack类 并发集合类 工具类 2.ArrayList 3.LinkedList 单向链表的实现 1. 节点类&#xff08;Node&#xff09; 2. 链表类&a…

VBA高级应用30例应用3在Excel中的ListObject对象:插入行和列

《VBA高级应用30例》&#xff08;版权10178985&#xff09;&#xff0c;是我推出的第十套教程&#xff0c;教程是专门针对高级学员在学习VBA过程中提高路途上的案例展开&#xff0c;这套教程案例与理论结合&#xff0c;紧贴“实战”&#xff0c;并做“战术总结”&#xff0c;以…

万字长文详解JavaScript基础语法--前端--前端样式--JavaWeb

&#x1f64b;大家好&#xff01;我是毛毛张! &#x1f308;个人首页&#xff1a; 神马都会亿点点的毛毛张 今天毛毛张带来的前端教程的第三期&#xff1a;JavaScript 文章目录 4.JavaScript4.1 JS简介4.1.1 JS起源4.1.2 JS 组成部分4.1.3 JS的引入方式 4.2 JS的数据类型和运…

工业主板在汽车制造中的应用

工业主板在汽车制造中的应用非常广泛&#xff0c;主要得益于其高稳定性、高集成性、以及强大的计算和处理能力。以下是对工业主板在汽车制造中应用的详细分析&#xff1a; 一、应用场景 自动驾驶车辆&#xff1a; 工业主板作为自动驾驶车辆的核心计算平台&#xff0c;负责处…

post sim下如何将timing信息反标到仿真工具

文章目录 0 前言1 调用格式2 option介绍2.1 sdf_file (**必须**)2.2 module_instance (**可选**)2.3 config_file (**可选**)2.4 log_file (**可选**)2.5 mtm_spec (**可选**)2.6 scale_factors (**可选**)2.7 scale_type (**可选**) 0 前言 跑post sim时需要带入timing信息&a…

软件设计师-上午题-15 计算机网络(5分)

计算机网络题号一般为66-70题&#xff0c;分值一般为5分。 目录 1 网络设备 1.1 真题 2 协议簇 2.1 真题 3 TCP和UDP 3.1 真题 4 SMTP和POP3 4.1 真题 5 ARP 5.1 真题 6 DHCP 6.1 真题 7 URL 7.1 真题 8 浏览器 8.1 真题 9 IP地址和子网掩码 9.1 真题 10 I…

VLAN 高级技术实验

目录 一、实验背景 二、实验任务 三、实验步骤 四、实验总结 一、实验背景 假如你是公司的网络管理员&#xff0c;为了节省内网的IP地址空间&#xff0c;你决定在内网部署VLAN聚合&#xff0c;同时为了限制不同业务之间的访问&#xff0c;决定同时部署MUX VLAN。 二、实验…

一文快速预览经典深度学习模型(一)——CNN、RNN、LSTM、Transformer、ViT

Hi&#xff0c;大家好&#xff0c;我是半亩花海。本文主要简要并通俗地介绍了几种经典的深度学习模型&#xff0c;如CNN、RNN、LSTM、Transformer、ViT&#xff08;Vision Transformer&#xff09;等&#xff0c;便于大家初探深度学习的相关知识&#xff0c;并更好地理解深度学…

这是一个bug求助帖子--安装kali 遇坑

第一个报错 介质&#xff1a;kali-linux-2024.1-live-amd64 环境&#xff1a;Dell笔记本 i510代cpu 现象及操作 安装完以后 然后我换了个国内的源进行了以下操作 apt-get update&#xff1a;更新源列表 apt-get upgrade&#xff1a;更新所有可以更新的软件包 然后进行清理。…

qt QClipboard详解

1、概述 QClipboard是Qt框架中的一个类&#xff0c;它提供了对窗口系统剪贴板的访问能力。剪贴板是一个临时存储区域&#xff0c;通常用于在应用程序之间传递文本、图像和其他数据。QClipboard通过统一的接口来操作剪贴板内容&#xff0c;使得开发者能够方便地实现剪切、复制和…

PyTorch核心概念:从梯度、计算图到连续性的全面解析(三)

文章目录 Contiguous vs Non-Contiguous TensorTensor and ViewStrides非连续数据结构&#xff1a;Transpose( )在 PyTorch 中检查Contiguous and Non-Contiguous将不连续张量&#xff08;或视图&#xff09;转换为连续张量view() 和 reshape() 之间的区别总结 参考文献 Contig…

DeBiFormer实战:使用DeBiFormer实现图像分类任务(二)

文章目录 训练部分导入项目使用的库设置随机因子设置全局参数图像预处理与增强读取数据设置Loss设置模型设置优化器和学习率调整策略设置混合精度&#xff0c;DP多卡&#xff0c;EMA定义训练和验证函数训练函数验证函数调用训练和验证方法 运行以及结果查看测试完整的代码 在上…

iOS SmartCodable 替换 HandyJSON 适配记录

前言 HandyJSON群里说建议不要再使用HandyJSON&#xff0c;我最终选择了SmartCodable 来替换&#xff0c;原因如下&#xff1a; 首先按照 SmartCodable 官方教程替换 大概要替换的内容如图&#xff1a; 详细的替换教程请前往&#xff1a;使用SmartCodable 平替 HandyJSON …

Sqoop学习

目录 一、Soop简介 二、Sqoop的安装 1. 上传压缩包到/opt/install目录下 2.解压 3.修改文件名 4.拷贝sqoop-1.4.7.bin__hadoop-2.6.0目录下的sqoop-1.4.7.jar包到/opt/soft/sqoop147目录下 5.拷贝sqoop-1.4.7.bin__hadoop-2.6.0/lib目录下该jar包到sqoop/lib目录下 6.复…

WiFi一直获取不到IP地址是怎么回事?

在当今这个信息化时代&#xff0c;WiFi已成为我们日常生活中不可或缺的一部分。无论是家庭、办公室还是公共场所&#xff0c;WiFi都为我们提供了便捷的无线互联网接入。然而&#xff0c;有时我们可能会遇到WiFi连接后无法获取IP地址的问题&#xff0c;这不仅影响了我们的网络使…