文章目录
- 逻辑实现
- RabbitExchangeEnum
- RabbitConfig
- RabbitModuleInfo
- RabbitModuleInitializer
- RabbitProperties
- RabbitProducerManager
- POM.xml
- spring.factories
- 功能测试
- application.yml配置
- 生产者:
- 消费者:
- 测试结果:
- 总结
本章内容主要介绍编写一个rabbitmq starter,能够通过配置文件进行配置交换机、队列以及绑定关系等等。项目引用该组件后能够自动初始化交换机和队列,并进行简单通信。
如若有其他需求,可自行扩展,例如消息消费的确认等
参考文章:SpringBoot日常:自定义实现SpringBoot Starter
逻辑实现
下面直接进入主题,介绍整体用到的文件和逻辑内容
RabbitExchangeEnum
交换机枚举类,四种交换机类型,分别是直连交换机、主题交换机、扇出交换机和标题交换机
/**
* @Author 码至终章
* @Version 1.0
*/
public enum RabbitExchangeEnum {
DIRECT,
TOPIC,
FANOUT,
HEADERS;
}
RabbitConfig
初始化配置文件
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author 码至终章
* @Version 1.0
*/
@Configuration
public class RabbitConfig {
/**
* 通过yaml配置,创建队列、交换机初始化器
*/
@Bean
@ConditionalOnMissingBean
public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {
return new RabbitModuleInitializer(amqpAdmin, rabbitProperties);
}
}
RabbitModuleInfo
配置信息的映射的文件,用于接收配置文件中配置的交换机和队列属性
import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.Data;
import java.util.Map;
/**
* 队列和交换机机绑定关系实体对象
*
* @Author 码至终章
* @Version 1.0
*/
@Data
public class RabbitModuleInfo {
/**
* 路由Key
*/
private String routingKey;
/**
* 队列信息
*/
private Queue queue;
/**
* 交换机信息
*/
private Exchange exchange;
/**
* 交换机信息类
*/
@Data
public static class Exchange {
/**
* 交换机类型
* 默认直连交换机
*/
private RabbitExchangeEnum type = RabbitExchangeEnum.DIRECT;
/**
* 交换机名称
*/
private String name;
/**
* 是否持久化
* 默认true持久化,重启消息不会丢失
*/
private boolean durable = true;
/**
* 当所有队绑定列均不在使用时,是否自动删除交换机
* 默认false,不自动删除
*/
private boolean autoDelete = false;
/**
* 交换机其他参数
*/
private Map<String, Object> arguments;
}
/**
* 队列信息类
*/
@Data
public static class Queue {
/**
* 队列名称
*/
private String name;
/**
* 是否持久化
* 默认true持久化,重启消息不会丢失
*/
private boolean durable = true;
/**
* 是否具有排他性
* 默认false,可多个消费者消费同一个队列
*/
private boolean exclusive = false;
/**
* 当消费者均断开连接,是否自动删除队列
* 默认false,不自动删除,避免消费者断开队列丢弃消息
*/
private boolean autoDelete = false;
/**
* 绑定死信队列的交换机名称
*/
private String deadLetterExchange;
/**
* 绑定死信队列的路由key
*/
private String deadLetterRoutingKey;
private Map<String, Object> arguments;
}
}
RabbitModuleInitializer
执行初始化逻辑详情文件,具体的逻辑为根据配置文件信息创建对应的交换机和队列,并设置其属性和绑定关系。
import cn.hutool.core.convert.Convert;
import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* @Author cys
* @Date 2024/6/17 14:23
* @Version 1.0
*/
@Slf4j
public class RabbitModuleInitializer implements SmartInitializingSingleton {
AmqpAdmin amqpAdmin;
RabbitProperties rabbitProperties;
public RabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {
this.amqpAdmin = amqpAdmin;
this.rabbitProperties = rabbitProperties;
}
@Override
public void afterSingletonsInstantiated() {
log.info("初始化rabbitmq交换机、队列----------------start");
declareRabbitModule();
log.info("初始化rabbitmq交换机、队列----------------end");
}
/**
* RabbitMQ 根据配置动态创建和绑定队列、交换机
*/
private void declareRabbitModule() {
List<RabbitModuleInfo> rabbitModuleInfos = rabbitProperties.getModules();
if (CollectionUtils.isEmpty(rabbitModuleInfos)) {
return;
}
for (RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos) {
configParamValidate(rabbitModuleInfo);
// 队列
Queue queue = convertQueue(rabbitModuleInfo.getQueue());
// 交换机
Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());
// 绑定关系
String routingKey = rabbitModuleInfo.getRoutingKey();
String queueName = rabbitModuleInfo.getQueue().getName();
String exchangeName = rabbitModuleInfo.getExchange().getName();
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
// 创建队列
if (!isExistQueue(queueName)) {
amqpAdmin.declareQueue(queue);
}
// 创建交换机
amqpAdmin.declareExchange(exchange);
// 队列 绑定 交换机
amqpAdmin.declareBinding(binding);
}
}
/**
* RabbitMQ动态配置参数校验
*
* @param rabbitModuleInfo 队列和交换机机绑定关系
*/
public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {
String routingKey = rabbitModuleInfo.getRoutingKey();
Assert.isTrue(StringUtils.isNotBlank(routingKey), "RoutingKey 未配置");
Assert.isTrue(rabbitModuleInfo.getExchange() != null, String.format("routingKey:%s未配置exchange", routingKey));
Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getExchange().getName()), String.format("routingKey:%s未配置exchange的name属性", routingKey));
Assert.isTrue(rabbitModuleInfo.getQueue() != null, String.format("routingKey:%s未配置queue", routingKey));
Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getQueue().getName()), String.format("routingKey:%s未配置exchange的name属性", routingKey));
}
/**
* 转换生成RabbitMQ队列
*
* @param queue 队列
* @return Queue
*/
public Queue convertQueue(RabbitModuleInfo.Queue queue) {
Map<String, Object> arguments = queue.getArguments();
// 转换ttl的类型为long
if (arguments != null && arguments.containsKey("x-message-ttl")) {
arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
}
// 是否需要绑定死信队列
String deadLetterExchange = queue.getDeadLetterExchange();
String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
if (StringUtils.isNotBlank(deadLetterExchange) && StringUtils.isNotBlank(deadLetterRoutingKey)) {
if (arguments == null) {
arguments = new HashMap<>(4);
}
arguments.put("x-dead-letter-exchange", deadLetterExchange);
arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
}
return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
}
/**
* 转换生成RabbitMQ交换机
*
* @param exchangeInfo 交换机信息
* @return Exchange
*/
public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) {
AbstractExchange exchange = null;
RabbitExchangeEnum exchangeType = exchangeInfo.getType();
String exchangeName = exchangeInfo.getName();
boolean isDurable = exchangeInfo.isDurable();
boolean isAutoDelete = exchangeInfo.isAutoDelete();
Map<String, Object> arguments = exchangeInfo.getArguments();
switch (exchangeType) {
case DIRECT:
// 直连交换机
exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case TOPIC:
// 主题交换机
exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case FANOUT:
//扇形交换机
exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case HEADERS:
// 头交换机
exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
}
return exchange;
}
/**
* 判断队列是否存在
*
* @param queueName 队列名
* @return boolean
*/
private boolean isExistQueue(String queueName) {
if (StringUtils.isBlank(queueName)) {
throw new RuntimeException("队列名称为空");
}
boolean flag = true;
Properties queueProperties = amqpAdmin.getQueueProperties(queueName);
if (queueProperties == null) {
flag = false;
}
return flag;
}
}
RabbitProperties
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @Author 码至终章
* @Version 1.0
*/
@Component
@ConfigurationProperties(prefix = "cys.rabbit")
@Data
public class RabbitProperties {
private List<RabbitModuleInfo> modules;
}
RabbitProducerManager
发送消息的生产者方法
public class RabbitProducerManager {
private static final Logger log = LoggerFactory.getLogger(RabbitProducerManager.class);
private final RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String rabbitRouting, Object message) {
this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);
log.info("向路由:{}, 发送消息成功:{}", rabbitRouting, message);
}
public void sendMessage(String exchange, String rabbitRouting, Object message, CorrelationData correlationData) {
this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);
log.info("向路由:{}, 发送消息成功:{}, correlationData:{}", new Object[]{rabbitRouting, message, correlationData});
}
public RabbitProducerManager(final RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
}
POM.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.7.18</version>
</dependency>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.18</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.25</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
</dependencies>
spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.mycomponent.starter.rabbitmq.config.RabbitConfig,\
cn.mycomponent.starter.rabbitmq.client.RabbitProducerManager,\
cn.mycomponent.starter.rabbitmq.config.RabbitProperties
功能测试
application.yml配置
spring:
profiles:
active: dev
## rabbitmq链接配置
rabbitmq:
host: 192.168.199.199
port: 5672
username: test
password: 123456789
virtual-host: test
cys:
rabbit:
modules:
- exchange:
name: mytest
#type为RabbitExchangeTypeEnum枚举中的值。不配置默认为Direct
type: DIRECT
queue:
name: default.queue
arguments:
# 队列中所有消息的最大存活时间。单位毫秒。 1分钟
x-message-ttl: 60000
# routing-key可以为空
routing-key: default.queue.key
生产者:
@TableName(value ="task",autoResultMap = true)
@Data
public class TaskEntity implements Serializable {
/**
* 主键
*/
@TableId(type = IdType.AUTO)
@TableField(value = "cust_id")
private Long custId;
}
@RestController
@RequestMapping("/mqtest")
public class MqController {
@Autowired
RabbitProducerManager rabbitProducerManager;
@Autowired
MailService mailService;
@GetMapping("/mqtest")
public void test(){
TaskEntity taskEntity = new TaskEntity();
taskEntity.setCustId(211212L);
rabbitProducerManager.sendMessage("mytest","default.queue.key", JSON.toJSONString(taskEntity));
}
}
消费者:
@Component
public class MyListener {
@RabbitListener(queues = "default.queue")
public void handMessage(String message){
TaskEntity taskEntity = JSON.parseObject(message, TaskEntity.class);
System.out.println("接收到的消息"+taskEntity);
}
}
测试结果:
请求接口/mqtest/mqtest
总结
到这为止,关于封装rabbitmq starter就结束了。当然,本文只是介绍了最基础的部分,后续大家可以在这基础上实现扩展,比如统一接受消息再通过事件监听、同一队列设置多个消费者线程等等,说到这里,如果只是丰富的小伙伴可能会想到spring-cloud-starter-stream-rabbit,大家也可以参考参考这个是如何实现的。