SpringBoot日常:封装rabbitmq starter组件

文章目录

    • 逻辑实现
      • RabbitExchangeEnum
      • RabbitConfig
      • RabbitModuleInfo
      • RabbitModuleInitializer
      • RabbitProperties
      • RabbitProducerManager
      • POM.xml
      • spring.factories
    • 功能测试
      • application.yml配置
      • 生产者:
      • 消费者:
      • 测试结果:
      • 总结

本章内容主要介绍编写一个rabbitmq starter,能够通过配置文件进行配置交换机、队列以及绑定关系等等。项目引用该组件后能够自动初始化交换机和队列,并进行简单通信。
如若有其他需求,可自行扩展,例如消息消费的确认等
参考文章:SpringBoot日常:自定义实现SpringBoot Starter

逻辑实现

下面直接进入主题,介绍整体用到的文件和逻辑内容

RabbitExchangeEnum

交换机枚举类,四种交换机类型,分别是直连交换机、主题交换机、扇出交换机和标题交换机

/**
 * @Author 码至终章
 * @Version 1.0
 */
public enum RabbitExchangeEnum {

    DIRECT,
    TOPIC,
    FANOUT,
    HEADERS;
}

RabbitConfig

初始化配置文件

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author 码至终章
 * @Version 1.0
 */
@Configuration
public class RabbitConfig {

    /**
     * 通过yaml配置,创建队列、交换机初始化器
     */
    @Bean
    @ConditionalOnMissingBean
    public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {
        return new RabbitModuleInitializer(amqpAdmin, rabbitProperties);
    }
}

RabbitModuleInfo

配置信息的映射的文件,用于接收配置文件中配置的交换机和队列属性

import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.Data;

import java.util.Map;

/**
 * 队列和交换机机绑定关系实体对象
 *
 * @Author 码至终章
 * @Version 1.0
 */
@Data
public class RabbitModuleInfo {

    /**
     * 路由Key
     */
    private String routingKey;
    /**
     * 队列信息
     */
    private Queue queue;
    /**
     * 交换机信息
     */
    private Exchange exchange;

    /**
     * 交换机信息类
     */
    @Data
    public static class Exchange {
        /**
         * 交换机类型
         * 默认直连交换机
         */
        private RabbitExchangeEnum type = RabbitExchangeEnum.DIRECT;
        /**
         * 交换机名称
         */
        private String name;
        /**
         * 是否持久化
         * 默认true持久化,重启消息不会丢失
         */
        private boolean durable = true;
        /**
         * 当所有队绑定列均不在使用时,是否自动删除交换机
         * 默认false,不自动删除
         */
        private boolean autoDelete = false;
        /**
         * 交换机其他参数
         */
        private Map<String, Object> arguments;
    }

    /**
     * 队列信息类
     */
    @Data
    public static class Queue {
        /**
         * 队列名称
         */
        private String name;
        /**
         * 是否持久化
         * 默认true持久化,重启消息不会丢失
         */
        private boolean durable = true;
        /**
         * 是否具有排他性
         * 默认false,可多个消费者消费同一个队列
         */
        private boolean exclusive = false;
        /**
         * 当消费者均断开连接,是否自动删除队列
         * 默认false,不自动删除,避免消费者断开队列丢弃消息
         */
        private boolean autoDelete = false;
        /**
         * 绑定死信队列的交换机名称
         */
        private String deadLetterExchange;
        /**
         * 绑定死信队列的路由key
         */
        private String deadLetterRoutingKey;


        private Map<String, Object> arguments;
    }

}

RabbitModuleInitializer

执行初始化逻辑详情文件,具体的逻辑为根据配置文件信息创建对应的交换机和队列,并设置其属性和绑定关系。

import cn.hutool.core.convert.Convert;
import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * @Author cys
 * @Date 2024/6/17 14:23
 * @Version 1.0
 */
@Slf4j
public class RabbitModuleInitializer implements SmartInitializingSingleton {

    AmqpAdmin amqpAdmin;

    RabbitProperties rabbitProperties;

    public RabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {
        this.amqpAdmin = amqpAdmin;
        this.rabbitProperties = rabbitProperties;
    }

    @Override
    public void afterSingletonsInstantiated() {
        log.info("初始化rabbitmq交换机、队列----------------start");
        declareRabbitModule();
        log.info("初始化rabbitmq交换机、队列----------------end");
    }

    /**
     * RabbitMQ 根据配置动态创建和绑定队列、交换机
     */
    private void declareRabbitModule() {
        List<RabbitModuleInfo> rabbitModuleInfos = rabbitProperties.getModules();
        if (CollectionUtils.isEmpty(rabbitModuleInfos)) {
            return;
        }
        for (RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos) {
            configParamValidate(rabbitModuleInfo);
            // 队列
            Queue queue = convertQueue(rabbitModuleInfo.getQueue());
            // 交换机
            Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());
            // 绑定关系
            String routingKey = rabbitModuleInfo.getRoutingKey();
            String queueName = rabbitModuleInfo.getQueue().getName();
            String exchangeName = rabbitModuleInfo.getExchange().getName();
            Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
            // 创建队列
            if (!isExistQueue(queueName)) {
                amqpAdmin.declareQueue(queue);
            }
            // 创建交换机
            amqpAdmin.declareExchange(exchange);
            // 队列 绑定 交换机
            amqpAdmin.declareBinding(binding);
        }
    }

    /**
     * RabbitMQ动态配置参数校验
     *
     * @param rabbitModuleInfo 队列和交换机机绑定关系
     */
    public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {
        String routingKey = rabbitModuleInfo.getRoutingKey();
        Assert.isTrue(StringUtils.isNotBlank(routingKey), "RoutingKey 未配置");
        Assert.isTrue(rabbitModuleInfo.getExchange() != null, String.format("routingKey:%s未配置exchange", routingKey));
        Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getExchange().getName()), String.format("routingKey:%s未配置exchange的name属性", routingKey));
        Assert.isTrue(rabbitModuleInfo.getQueue() != null, String.format("routingKey:%s未配置queue", routingKey));
        Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getQueue().getName()), String.format("routingKey:%s未配置exchange的name属性", routingKey));

    }

    /**
     * 转换生成RabbitMQ队列
     *
     * @param queue 队列
     * @return Queue
     */
    public Queue convertQueue(RabbitModuleInfo.Queue queue) {
        Map<String, Object> arguments = queue.getArguments();

        // 转换ttl的类型为long
        if (arguments != null && arguments.containsKey("x-message-ttl")) {
            arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
        }
        // 是否需要绑定死信队列
        String deadLetterExchange = queue.getDeadLetterExchange();
        String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
        if (StringUtils.isNotBlank(deadLetterExchange) && StringUtils.isNotBlank(deadLetterRoutingKey)) {
            if (arguments == null) {
                arguments = new HashMap<>(4);
            }
            arguments.put("x-dead-letter-exchange", deadLetterExchange);
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);

        }
        return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
    }


    /**
     * 转换生成RabbitMQ交换机
     *
     * @param exchangeInfo 交换机信息
     * @return Exchange
     */
    public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) {

        AbstractExchange exchange = null;
        RabbitExchangeEnum exchangeType = exchangeInfo.getType();
        String exchangeName = exchangeInfo.getName();
        boolean isDurable = exchangeInfo.isDurable();
        boolean isAutoDelete = exchangeInfo.isAutoDelete();

        Map<String, Object> arguments = exchangeInfo.getArguments();

        switch (exchangeType) {
            case DIRECT:
                // 直连交换机
                exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            case TOPIC:
                // 主题交换机
                exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            case FANOUT:
                //扇形交换机
                exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            case HEADERS:
                // 头交换机
                exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
        }
        return exchange;
    }

    /**
     * 判断队列是否存在
     *
     * @param queueName 队列名
     * @return boolean
     */
    private boolean isExistQueue(String queueName) {
        if (StringUtils.isBlank(queueName)) {
            throw new RuntimeException("队列名称为空");
        }

        boolean flag = true;
        Properties queueProperties = amqpAdmin.getQueueProperties(queueName);
        if (queueProperties == null) {
            flag = false;
        }
        return flag;
    }


}

RabbitProperties

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @Author 码至终章
 * @Version 1.0
 */
@Component
@ConfigurationProperties(prefix = "cys.rabbit")
@Data
public class RabbitProperties {

    private List<RabbitModuleInfo> modules;
}

RabbitProducerManager

发送消息的生产者方法

public class RabbitProducerManager {
    private static final Logger log = LoggerFactory.getLogger(RabbitProducerManager.class);
    private final RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String rabbitRouting, Object message) {
        this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);
        log.info("向路由:{}, 发送消息成功:{}", rabbitRouting, message);
    }

    public void sendMessage(String exchange, String rabbitRouting, Object message, CorrelationData correlationData) {
        this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);
        log.info("向路由:{}, 发送消息成功:{}, correlationData:{}", new Object[]{rabbitRouting, message, correlationData});
    }

    public RabbitProducerManager(final RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
}

POM.xml

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>2.7.18</version>
        </dependency>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.7.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
			<version>3.12.0</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.25</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
        </dependency>
    </dependencies>

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.mycomponent.starter.rabbitmq.config.RabbitConfig,\
cn.mycomponent.starter.rabbitmq.client.RabbitProducerManager,\
cn.mycomponent.starter.rabbitmq.config.RabbitProperties

功能测试

application.yml配置

spring:
  profiles:
    active: dev
  ## rabbitmq链接配置  
  rabbitmq:
    host: 192.168.199.199
    port: 5672
    username: test
    password: 123456789
    virtual-host: test

cys:
  rabbit:
    modules:
      - exchange:
          name: mytest
          #type为RabbitExchangeTypeEnum枚举中的值。不配置默认为Direct
          type: DIRECT
        queue:
          name: default.queue
          arguments:
            # 队列中所有消息的最大存活时间。单位毫秒。 1分钟
            x-message-ttl: 60000
        # routing-key可以为空
        routing-key: default.queue.key

生产者:

@TableName(value ="task",autoResultMap = true)
@Data
public class TaskEntity implements Serializable {
    /**
     * 主键
     */
    @TableId(type = IdType.AUTO)
    @TableField(value = "cust_id")
    private Long custId;
}

@RestController
@RequestMapping("/mqtest")
public class MqController {

    @Autowired
    RabbitProducerManager rabbitProducerManager;
    @Autowired
    MailService mailService;

    @GetMapping("/mqtest")
    public void test(){
        TaskEntity taskEntity = new TaskEntity();
        taskEntity.setCustId(211212L);
        rabbitProducerManager.sendMessage("mytest","default.queue.key", JSON.toJSONString(taskEntity));
    }
}

消费者:

@Component
public class MyListener {

    @RabbitListener(queues = "default.queue")
    public void handMessage(String message){

        TaskEntity taskEntity = JSON.parseObject(message, TaskEntity.class);
        System.out.println("接收到的消息"+taskEntity);

    }
}

测试结果:

请求接口/mqtest/mqtest
在这里插入图片描述

总结

到这为止,关于封装rabbitmq starter就结束了。当然,本文只是介绍了最基础的部分,后续大家可以在这基础上实现扩展,比如统一接受消息再通过事件监听、同一队列设置多个消费者线程等等,说到这里,如果只是丰富的小伙伴可能会想到spring-cloud-starter-stream-rabbit,大家也可以参考参考这个是如何实现的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/782669.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【电机控制】EG2134无刷电机驱动、控制一体板——开环、无感SMO验证

【电机控制】EG2134无刷电机驱动、控制一体板——开环、无感SMO验证 文章目录 前言一、硬件二、软件三、开环SVPWM四、SMO无感观测器闭环控制五、参考文献总结 前言 【电机控制】直流有刷电机、无刷电机汇总——持续更新 【电机控制】EG2134无感FOC驱控一体板-滑模观测器 使用…

C++11中新特性介绍-之(二)

11.自动类型推导 (1) auto类型自动推导 auto自动推导变量的类型 auto并不代表某个实际的类型&#xff0c;只是一个类型声明的占位符 auto并不是万能的在任意场景下都能推导&#xff0c;使用auto声明的变量必须进行初始化&#xff0c;以让编译器推导出它的实际类型&#xff0c;…

苏东坡传-读书笔记十

不管怎么说&#xff0c;能使读者快乐的确是苏东坡作品的一个特点。苏东坡最快乐就是写作之时。一天&#xff0c;苏东坡对朋友说&#xff1a;“我一生之至乐在执笔为文之时&#xff0c;心中错综复杂之情思&#xff0c;我笔皆可畅达之。我自谓人生之乐&#xff0c;未有过于此者也…

红黑树模拟实现

概念 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是Red或Black。通过对任何一条从根到叶子的路径上各个结点着色方式的限制&#xff0c;红黑树确保没有一条路径会比其他路径长出俩倍&#xff0c;因而是接近平衡…

昇思25天学习打卡营第20天|RNN实现情感分类

数据准备 使用IMDB影评数据集&#xff0c;包含Positive和Negative两类。 数据下载 import os import shutil import requests import tempfile from tqdm import tqdm from typing import IO from pathlib import Path# 指定保存路径为 home_path/.mindspore_examples cache…

蚓链实践告诉你“企业确保达成数字化营销效果的方法”

在如今这个数字化盛行的时代&#xff0c;企业想在激烈的市场竞争里崭露头角&#xff0c;确保数字营销效果那可是至关重要&#xff01;今天就来给大家聊聊实现这一目标的基本条件&#xff0c;来自蚓链数字化营销系统的广大用户体验总结。 一、精准的目标定位 企业一定要清楚地知…

第一作者讲述《生态系统架构:人工智能时代从业者的新思维》背后的故事:Episode One

当前&#xff0c;人工智能技术正不断渗透到各行各业&#xff0c;对企业和组织的系统和流程带来深刻的影响。生态系统架构可以帮助企业进行更好的规划和管理人工智能系统&#xff0c;使人工智能技术能够更好地为企业所用&#xff0c;从而实现企业的数字化转型和更好的商业表现。…

信号量——Linux并发之魂

欢迎来到 破晓的历程的 博客 引言 今天&#xff0c;我们继续学习Linux线程本分&#xff0c;在Linux条件变量中&#xff0c;我们对条件变量的做了详细的说明&#xff0c;今天我们要利用条件变量来引出我们的另一个话题——信号量内容的学习。 1.复习条件变量 在上一期博客中&…

HTML5实现我的音乐网站源码

文章目录 作者&#xff1a;[xcLeigh](https://blog.csdn.net/weixin_43151418) 1.设计来源1.1 界面效果1.2 轮播图界面1.3 音乐播放界面1.4 视频播放界面 2.效果和源码2.1 动态效果2.2 源代码 源码下载万套模板&#xff0c;程序开发&#xff0c;在线开发&#xff0c;在线沟通 作…

DAY22-力扣刷题

1.被围绕的区域 方法一&#xff1a;深度优先搜索 class Solution {int n, m;public void solve(char[][] board) {n board.length;if (n 0) {return;}m board[0].length;for (int i 0; i < n; i) {dfs(board, i, 0);dfs(board, i, m - 1);}for (int i 1; i < m - 1…

项目方案:社会视频资源整合接入汇聚系统解决方案(九)-视频监控汇聚应用案例

目录 一、概述 1.1 应用背景 1.2 总体目标 1.3 设计原则 1.4 设计依据 1.5 术语解释 二、需求分析 2.1 政策分析 2.2 业务分析 2.3 系统需求 三、系统总体设计 3.1设计思路 3.2总体架构 3.3联网技术要求 四、视频整合及汇聚接入 4.1设计概述 4.2社会视频资源分…

5.opencv深浅拷贝

图像处理的复制操作 深浅拷贝 图像复制分成两种&#xff0c;第一种假复制&#xff0c;从原图片选择一部分图片拿出来观察&#xff0c;此时新生成的图片和原图实际上是同一张图片&#xff0c;即浅拷贝 将图片的一部分复制下来&#xff0c;放到新的内存中&#xff0c;即两张完全…

AI视频教程下载-使用ChatGPT成为全栈JavaScript开发者

学习使用Express JS和React JS进行全栈JavaScript开发 ChatGPT Express JS MongoDB React JS Tailwind 解锁全栈网页开发的世界&#xff0c;我们为初学者和中级学习者设计了全面的课程。在这段沉浸式的旅程中&#xff0c;你将深入前端和后端开发的基本概念&#xff0c;为自…

【DataSophon】DataSophon1.2.1 ranger usersync整合

目录 一、简介 二、实现步骤 2.1 ranger-usersync包下载编译 2.2 构建压缩包 2.3 编辑元数据文件 2.4 修改源码 三、重新安装 一、简介 如下是DDP1.2.1默认有的rangerAdmin&#xff0c; 我们需要将rangerusersync整合进来 ,实现将Linux机器上的用户和组信息同步到Ranger…

【Linux】线程(轻量级进程)

目录 一、线程概念 二、线程特性 2.1 进程更加轻量化 2.2 线程的优点 2.3 线程的缺点 2.4 线程的异常 2.5 线程用途 三、进程和线程 四、线程控制 4.1 包含线程的编译链接 4.2 创建线程 4.3 获得线程自身的ID 4.4 线程终止 4.5 线程等待 4.6 线程分离 4.6 线程…

Java数据结构9-排序

1. 排序的概念及引用 1.1 排序的概念 排序&#xff1a;所谓排序&#xff0c;就是使一串记录&#xff0c;按照其中的某个或某些关键字的大小&#xff0c;递增或递减的排列起来的操作。 稳定性&#xff1a;假定在待排序的记录序列中&#xff0c;存在多个具有相同的关键字的记录…

【Java】垃圾回收学习笔记(一):Root Search 根可达算法+垃圾回收的起点

文章目录 1. 引用计数法优点缺点 2. 可达性分析 Root Search2.1 那些对象是GC Roots2.2 引用的分类2.3 回收方法区 3. 实现细节3.1 GC的起点&#xff1a;节点枚举OopMap&#xff1a;帮助高效的根节点枚举 3.2 何时开始GC&#xff1a;安全点与安全区域如何选取安全点如何让程序进…

在mac下 Vue2和Vue3并存 全局Vue2环境创建Vue3新项目(Vue cli2和Vue cli4)

全局安装vue2 npm install vue-cli -g自行在任意位置创建一个文件夹vue3&#xff0c;局部安装vue3,注意不要带-g npm install vue/cli安装完成后&#xff0c;进入目录&#xff0c;修改vue为vue3 找到vue3/node-moudles/.bin/vue&#xff0c;把vue改成vue3。 对环境变量进行配置…

web安全基础名词概念

本节内容根据小迪安全讲解制作 第一天 域名&#xff1a; 1.1什么是域名&#xff1f; 网域名称(英语&#xff1a;Domain Name&#xff0c;简称&#xff1a;Domain)&#xff0c;简称域名、网域&#xff0c;是由一串用点分隔的字符组成的互联网上某一台计算机或计算机组的名称&a…

java核心-泛型

目录 概述什么是泛型分类泛型类泛型接口泛型方法 泛型通配符分类 泛型类型擦除分类无限制类型擦除有限制类型擦除 问题需求第一种第二种 概述 了解泛型有利于学习 jdk 、中间件的源码&#xff0c;提升代码抽象能力&#xff0c;封装通用性更强的组件。 什么是泛型 在定义类、接…