SpringBoot整合RabbitMQ的快速使用教程

     

目录

一、引入依赖

二、配置rabbitmq的连接信息等

1、生产者配置

2、消费者配置 

三、设置消息转换器

四、生产者代码示例

 1、配置交换机和队列信息

2、生产消息代码

五、消费者代码示例

1、消费层代码

2、业务层代码 


        在分布式系统中,消息队列是一种重要的通信方式,它能够有效地将消息从一个应用程序传递到另一个应用程序。RabbitMQ是一款流行的开源消息队列系统,简单易用且功能强大。本文将介绍如何使用SpringBoot快速整合RabbitMQ,实现消息的发送和接收。

 

交换机: 主要负责接收生产者发送的消息,并根据特定的规则将这些消息路由到一个或多个队列中。交换机的类型有:

  •  Fanout Exchange(扇出交换机)

        Fanout交换机会将接收到的所有消息广播到它知道的所有队列中。这种类型的交换机不考虑路由键,只是简单地将消息复制到所有绑定的队列中。适用于不需要选择性地发送消息给特定队列的情况,例如,广播系统通知或有多个服务需要消费同一份数据的场景。

  • Direct Exchange(直连交换机)

       Direct交换机根据消息的路由键将消息发送到与之匹配的队列中。只有当路由键与绑定关键字完全匹配时,消息才会被路由到相应的队列。适合于精确控制消息投递的场景,如特定的服务或功能模块只关心特定类型的消息。

  • Topic Exchange(主题交换机)

       Topic交换机允许更复杂的匹配规则,通过模式匹配的方式将消息路由到一个或多个队列。路由键和绑定键都使用点分隔的字符串,可以包含特殊字符如“#”和“*”来实现模糊匹配。"*"用于匹配一个单词,而“#”则用于匹配零个或多个单词。适合于需要按内容分类消息的系统,如日志处理系统,可以根据日志等级或来源将日志消息分发到不同的队列。

  • Headers Exchange(头交换机)

        Headers交换机使用消息头的一组键值对来决定消息应该被路由到哪个队列。这种交换机允许更细粒度的路由控制,但配置和使用较为复杂。适合需要基于消息多个属性来动态决定路由的场景,例如某些高级的路由策略或复杂的事件处理系统。

队列:主要用于存储消息,实现先进先出(FIFO)的特性。

一、引入依赖

这里引入了两个依赖。一个是rabbitmq的依赖,另一个是配置json转换器所需要的依赖。生产者和消费者服务都需要引入这两个依赖。

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-xml</artifactId>
 </dependency>

二、配置rabbitmq的连接信息等

1、生产者配置

  rabbitmq:
    host: 170.40.20.16
    port: 5672
    username: zhuoye
    password: zy521
    virtual-host: /

2、消费者配置 

   rabbitmq:
    host: 170.40.20.16
  port: 5672
    username: zhuoye
    password: zy521
    virtual-host: /
    listener:
      simple:
        prefetch: 1 #每次只能处理一个,处理完成才能获取下一个消息

三、设置消息转换器

        默认情况下Spring采用的序列化方式是JDK序列化,而JDK的序列化存在可读性性差、占用内存大、存在安全漏洞等问题。所以,这里我们一般使用Jackson的序列化代替JDk的序列化。

在生产者和消费者的启动类上加上如下代码:  

@SpringBootApplication
@EnableRabbit //开启rabbitmq的使用
public class ConsumerApp {
    public static void main( String[] args ) {
        SpringApplication.run(ConsumerApp.class, args);
    }

    //使用的是Jackson库中的Jackson2JsonMessageConverter类,代替使用jdk自带的序列化
    @Bean
    public MessageConverter jacksonMessageConvertor(){
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        jackson2JsonMessageConverter.setCreateMessageIds(true);//开启消息id的自动生成功能
        return jackson2JsonMessageConverter;
    }
}

四、生产者代码示例

 1、配置交换机和队列信息
@Configuration
public class RabbitMqConfig {

    private static String EXCHANGE_NAME="amq.topic";
    private static String QUEUE_NAME="alarm.data.topic.queue";
    private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";

    /**
     * 声明交换机
     */

    @Bean
    public TopicExchange exchange(){
         // durable:是否持久化,默认是false
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此交换机,该交换机会自动删除。
        return new TopicExchange(EXCHANGE_NAME,true,false);
    }

    /**
     * 声明告警队列
     * @return
     */
    @Bean("alarmQueue")
    public Queue alarmQueue(){
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        return new Queue(QUEUE_NAME,true,false,false);
    }

    /**
     * 声明确认告警队列
     * @return
     */
    @Bean("confirmAlarmQueue")
    public Queue confirmAlarmQueue(){
        return new Queue(CONFIRM_ALARM_QUEUE_NAME,true,false,false);
    }

    /**
     * 声明告警队列绑定关系
     * @param queue
     * @param topicExchange
     * @return
     */
    @Bean
    public Binding alarmBinding(@Qualifier("alarmQueue") Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("server.event.#");
    }

    /**
     * 声明确认告警队列绑定关系
     * @param queue
     * @param topicExchange
     * @return
     */
    @Bean
    public Binding confirmAlarmBinding(@Qualifier("confirmAlarmQueue") Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("server.event_confirm.#");
    }
2、生产消息代码
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static String EXCHANGE_NAME="amq.topic";
    private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";

    @Test
    void producerAlarmMsg() {
        String msg = "发送一条告警消息";
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "server.event.#",msg);
        System.out.println("msg = " + msg);
    }

    @Test
    void producerConfirmAlarmMsg() {
        String msg = "发送一条确认告警消息";
        rabbitTemplate.convertAndSend(CONFIRM_ALARM_QUEUE_NAME, "server.event_confirm.#",msg);
        System.out.println("msg = " + msg);
    }

五、消费者代码示例

1、消费层代码
@Component
public class AlarmConsumer {

        @Autowired
        private IAlarmService alarmService;


        @RabbitListener(queues ="alarm.data.topic.queue",concurrency = "5")
        public void getAlarmInfo(String data){
            alarmService.dealAlarmData(data);
        }

        @RabbitListener(queues ="alarm.confirm.data.topic.queue",concurrency = "5")
        public void getConfirmAlarmInfo(String data){
            alarmService.dealConfirmAlarmData(data);
        }
}
2、业务层代码 
@Service
public class IAlarmServiceImpl implements IAlarmService {

    @Override
    public void dealAlarmData(String data) {

        EquipAlarmResp equipAlarmResp= JSON.parseObject(result,EquipAlarmResp.class);
        List<String> alarmIdsOld = dceEquipAlarmMapper.queryAllAlarmIds();
        DceEquipAlarmDto dceEquipAlarmDto = CopyBeanUtils.copyProperties(equipAlarmResp, DceEquipAlarmDto.class);
        dceEquipAlarmDto.setCreateTime(new Date());
        dceEquipAlarmDto.setAlarmTime(dceEquipAlarmDto.getAlarmTime()/1000);
        //查询出需要新增或者更新的数据
        Boolean flag=alarmIdsOld.stream().filter(a->a.equals(dceEquipAlarmDto.getAlarmId())).findFirst().isPresent();
        //开启事务,保证新增、更新、删除的原子性
        TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
        List<DceEquipAlarmDto> list=new ArrayList<>();
        list.add(dceEquipAlarmDto);
        try {
            //新增
            if (!flag) {
                dceEquipAlarmMapper.insertBatch(list);
            }
            //更新
            if (flag) {
                dceEquipAlarmMapper.updateBatch(list);
            }
            //提交事务
            transactionManager.commit(transaction);
        } catch (Exception e) {
            //回滚
            transactionManager.rollback(transaction);
            log.error("DynamicEnvironmentServiceImpl.getAlarmInfoByRabbitMq 新华报业动环设备告警信息更新失败!", e);
        }
    }

    @Override
    public void dealConfirmAlarmData(String data) {

        EquipConfirmAlarmResp alarmResp = JSON.parseObject(data,EquipConfirmAlarmResp.class);
        Integer confirmTime = Integer.parseInt(String.valueOf(System.currentTimeMillis() / 1000));
        alarmResp.setConfirmTime(confirmTime);
        dceEquipAlarmMapper.updateConfirmAlarmBatch(alarmResp,alarmResp.getAlarmIds());

    }

}

注:以上代码为对接告警信息和对接告警确认消息的示例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/656096.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【老王最佳实践-6】Spring 如何给静态变量注入值

有些时候&#xff0c;我们可能需要给静态变量注入 spring bean&#xff0c;尝试过使用 Autowired 给静态变量做注入的同学应该都能发现注入是失败的。 Autowired 给静态变量注入bean 失败的原因 spring 底层已经限制了&#xff0c;不能给静态属性注入值&#xff1a; 如果我…

【AI算法岗面试八股面经【超全整理】——机器学习】

AI算法岗面试八股面经【超全整理】 概率论信息论机器学习深度学习CVNLP 目录 1、回归损失函数2、分类损失函数3、误差&#xff08;Error&#xff09;、偏差&#xff08;Bias&#xff09;、方差&#xff08;Variance&#xff09;4、PCA&#xff08;Principle Component Analysi…

数据库语法树优化

目录 一、σ、π、⋈ 1.选择σ 2.投影π 3.连接⋈ 二、 构建语法树 ① 解读sql语句 ② 写出关系代数表达式 ③ 画出语法树 三、优化语法树 四、练习 语法树优化方法 一、σ、π、⋈ 1.选择σ 选择就是在关系R中选择满足给定条件的诸元组。 通过条件SdeptIS选择出系别…

5,串口编程---实现简单的用串口发送接收数据

单片机通过串口向PC机发送数据 PC机通过串口接收单片机发过来的数据 1.UART和USART的区别&#xff1a; USART支持同步通信方式,可以通过外部时钟信号进行同步传输,而UART仅支持异步通信方式 本开发板STM32F103ZET6有5个串口&#xff0c;用串口1作调试串口&#xff0c;因为串…

【算法实战】每日一题:设计一个算法,用最少数量的矩形覆盖一系列宽度为d、高度为w的矩形,且使用矩形不能超出边界

题目 设计一个算法&#xff0c;用最少数量的矩形覆盖一系列宽度为d、高度为w的矩形建筑物侧墙&#xff0c;且矩形不能超出边界。 核心思路 考虑这种结构 前面递增后面一个与前面的某个高度一致&#xff0c;这时候考虑最下面的覆盖&#xff08;即都是从最下面向上覆盖&#…

进程互斥经典问题(读写者问题、理发店问题)

目录 读写者问题 问题描述 问题分析 进程互斥问题三部曲 读者写者算法实现 一、找进程——确定进程关系 二、找主营业务 三、找同步约束 a.互斥 b.资源 c.配额 理发店问题 问题描述 问题分析 进程互斥问题三部曲 理发店问题算法实现 一、找进程——确定进程…

特朗普竞选带火PoliFi,以Bitget为例

以特朗普系列Meme币为代表的政治金融(PoliFi)概念币市场正在掀起热潮&#xff0c;前美国总统特朗普(Donald Trump)在本月稍早公开力挺加密货币&#xff0c;接着又在周二宣布接受比特币、以太币、SOL、USDC、DOGE…等政治献金&#xff0c;让相关通证高涨。 据CoinGecko数据&…

鲜花门店小程序开发流程:详细教程,让你轻松掌握

想要开发一款专属于自己鲜花门店的小程序吗&#xff1f;不知道从何开始&#xff1f;别担心&#xff0c;本文将为你提供详细的开发流程&#xff0c;帮助你轻松掌握。 1. 注册登录乔拓云网并进入操作后台 首先&#xff0c;你需要注册并登录乔拓云网&#xff0c;然后进入操作后台…

简单随机数据算法

文章目录 一&#xff0c;需求概述二&#xff0c;实现代码三、测试代码四、测试结果五、源码传送六、效果演示 一&#xff0c;需求概述 系统启动时&#xff0c;读取一组图片数据&#xff0c;通过接口返回给前台&#xff0c;要求&#xff1a; 图片随机相邻图片不重复 二&#…

AcWing 2568:树链剖分 ← 线段树+DFS

【题目来源】https://www.acwing.com/problem/content/2570/【题目描述】 给定一棵树&#xff0c;树中包含 n 个节点&#xff08;编号 1∼n&#xff09;&#xff0c;其中第 i 个节点的权值为 ai。 初始时&#xff0c;1 号节点为树的根节点。 现在要对该树进行 m 次操作&#xf…

力扣225. 用队列实现栈

Problem: 225. 用队列实现栈 文章目录 题目描述&#xff1a;思路Code 题目描述&#xff1a; 思路 1.对一个queue模拟栈的操作&#xff0c;同时用一个int类型的变量topElem记录每次每次队列队尾的元素&#xff08;也即是模拟stack中的stack的栈顶元素&#xff09;&#xff1b; 2…

Linux之单机项目部署

1、虚拟机&#xff08;VMware&#xff09;创建Linux系统 1.1、创建虚拟机 1.2、配置虚拟机IOS映射文件 1.3、虚拟机内部相关配置 等待加载即可&#xff0c;加载完后会弹出图形化界面&#xff0c;如图&#xff1a; 注意&#xff1a;一般我们做为管理员使用ROOT账号来操作&#x…

Tomcat端口配置

Tomcat是开源免费的服务器&#xff0c;其默认的端口为8080&#xff0c;本文讲述一下如何配置端口。 最后在浏览器中输入localhost:8888即可打开Tomcat界面

怎样打造一份个性化画册呢?我来教你

在这个数字化的时代&#xff0c;传统的照片已经不能满足我们对个性化回忆的需求。个性化画册&#xff0c;不仅能够承载我们的记忆&#xff0c;还能展现自我风格。今天&#xff0c;就让我来教你如何打造一份属于自己的个性化画册。 1.要制作电子杂志,首先需要选择一款适合自己的…

IT人的拖延——一放松就停不下来,耽误事?

拖延的表现 在我们的日常工作中&#xff0c;经常会面对这样一种情况&#xff1a;因为要做的Sprint ticket比较复杂或者长时间的集中注意力后&#xff0c;本来打算休息放松一下&#xff0c;刷刷剧&#xff0c;玩玩下游戏&#xff0c;但却一个不小心&#xff0c;没控制住时间&am…

【全开源】JAVA同城搬家系统源码小程序APP源码

JAVA同城搬家系统源码 特色功能&#xff1a; 强大的数据处理能力&#xff1a;JAVA提供了丰富的数据结构和算法&#xff0c;以及强大的并发处理能力&#xff0c;使得系统能够快速地处理大量的货物信息、司机信息、订单信息等&#xff0c;满足大规模物流的需求。智能路径规划&a…

常见5大开发进度盲点问题及解决方案

在软件开发项目中&#xff0c;识别并解决常见的进度管理盲点问题&#xff0c;对于确保项目按时、按预算、高质量完成至关重要。它直接关系到项目能否顺利进行&#xff0c;忽视任何一个问题&#xff0c;都可能导致项目延期、成本超支、质量下降&#xff0c;甚至项目失败。 因此&…

@ConfigurationProperties结合Nacos配置动态刷新之底层原理分析

Hello&#xff0c;我是大都督周瑜&#xff0c;本文给大家分析一下ConfigurationProperties结合Nacos配置动态刷新的底层原理&#xff0c;记得点赞、关注、分享哦&#xff01; 公众号&#xff1a;IT周瑜 应用背景 假如在Nacos中有Data ID为common.yml的配置项&#xff1a; m…

数智赋能内涝治理,四信城市排水防涝解决方案保障城市安全运行

由强降雨、台风造成城市低洼处出现大量积水、内涝的情况时有发生&#xff0c;给人们出行带来了极大不便和安全隐患&#xff0c;甚至危及群众生命财产安全。 为降低内涝造成的损失&#xff0c;一方面我们要大力加强城市排水基础设施的建设&#xff1b;另一方面要全面掌握城市内涝…

目标检测基础初步学习

目标检测&#xff08;Object Detection&#xff09; 目标检测任务说明 在动手学习深度学习中对目标检测任务有如下的描述。 图像分类任务中&#xff0c;我们假设图像中只有一个主要物体对象&#xff0c;我们只关注如何识别其类别。 然而&#xff0c;很多时候图像里有多个我们…