引言
在当前的微服务架构下,使用消息队列(MQ)技术是实现服务解耦和削峰填谷的重要策略。为了保证系统的灵活性和可替换性,我们需要避免对单一开源技术的依赖。
市面上有多种消息队列技术,如 Kafka、RocketMQ、RabbitMQ 等。关键在于如何在微服务体系中实现这些MQ组件的无缝切换,以减少代码修改需求。
Spring Cloud Stream 通过其与主流消息中间件的灵活集成,实现了通过仅修改配置文件的方式来切换不同的MQ实现,从而提高了系统的适应性和可维护性。
什么是 Spring Cloud Stream
Spring Cloud Stream 是一个用于构建消息驱动的微服务应用程序的框架。
基于 Spring Boot 构建,用于创建独立的生产级 Spring 应用程序,并使用 Spring Integration 提供与消息代理的连接。它提供了来自多个供应商的中间件的固定配置,引入了持久发布-订阅语义、消费者组和分区的概念。
简单来说 Spring Cloud Stream 是对 Spring Integration 和 Spring Boot 的合并。
图一
主要概念:
1. application model(应用模型)
图二.Spring Cloud Stream 应用程序
由中间件提供的 Binder 来处理绑定。 应用程序通过绑定这个 Binder
与其建立联系,发送消息时应用程序通过 outputs
通道将消息传递给 Binder
,Binder
再把消息给消息中间件。接收消息时消息中间件将消息传递给 Binder
,Binder
再把消息通过 inputs
通道传递给应用程序。
比如 Kafka Binder 依赖如下图:
图三 spring cloud stream kafka依赖
2. The Binder Abstraction(Binder抽象)
Binder 抽象使 Spring Cloud Stream 应用程序能够灵活地连接中间件。
Spring Cloud Stream 为 Kafka 和 RabbitMQ 提供了 Binder 实现。 RocketMQ Binder 已由 Spring Cloud Alibaba 实现。
Binder 抽象也是该框架的扩展点之一,我们可以在 Spring Cloud Stream 之上实现自定义 Binder。
3. Programming Model(编程模型)
核心概念
-
Destination Binders(目标绑定器):负责提供与外部消息传递系统集成的组件。
-
Bindings(绑定):外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由目标绑定器创建)。
-
Message(消息):生产者和消费者用于与目标绑定器(以及通过外部消息系统与其他应用程序)通信的规范数据结构。
图四
环境搭建
本文环境:
-
Java:17
-
Spring Boot:3.0.2
-
Spring Cloud:2022.0.2
-
Spring Cloud Alibaba:2022.0.0.0
maven依赖配置
pom.xml
依赖如下:
消息驱动jar,用哪个mq引入哪个即可。
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
配置文件
application.yml
RocketMq 配置信息:
spring:
cloud:
stream:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876;127.0.0.1:9877
function:
# 组装和绑定
definition: myTopicC
binders:
default:
type: rocketmq
bindings:
## 生产者 新版本固定格式 函数名-{out/in}-{index}
demoChannel-out-0:
destination: boot-mq-topic
## 消费者 新版本固定格式 函数名字-{out/in}-{index}
demoChannel-in-0:
destination: boot-mq-topic
application.yml
Kafka 配置信息:
spring:
cloud:
stream:
stream:
kafka:
binder:
brokers: 127.0.0.1:9092
function:
# 组装和绑定
definition: myTopicC
binders:
default:
type: kafka
bindings:
## 生产者 新版本固定格式 函数名-{out/in}-{index}
demoChannel-out-0:
destination: boot-mq-topic
## 消费者 新版本固定格式 函数名字-{out/in}-{index}
demoChannel-in-0:
destination: boot-mq-topic
消息生产者
创建一个简单的消息生产者:
@RestController
@Slf4j
public class ProducerStream {
@Autowired
private StreamBridge streamBridge;
@GetMapping("/test-stream")
public String testStream() {
streamBridge.send("demoChannel-out-0",
MessageBuilder
.withPayload("消息体")
.build()
);
return "success";
}
}
消息消费者
创建一个消息消费者来接收消息:
@Slf4j
@Configuration
public class TestStreamConsumer {
@Bean
public Consumer<String> demoChannel() {
return message -> {
log.info("demoChannel接到消息:{}", message);
};
}
}
假如需要从 Kafka 替换成 RocketMq ,只需要修改pom文件和配置文件即可。
在之前的 Spring Cloud Stream 版本中是采用注解的方式来实现绑定,在新版本中是通过函数式编程模型来绑定名称。采用约定大于配置的思想,简化了应用程序配置。
具体可见官方文档:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names
Spring Cloud Stream 发送消息流程
图五 spring cloud stream消息流程图
消息模型
通过图三可以看到 Sping Cloud Stream
的依赖关系。
Sping Cloud Stream -> Spring Integration -> Spring Messaging
可以看出来 Sping Cloud Stream
是基于 Spring Integration
做了一层封装,是依赖于 Spring Integration
这个组件的,而 Spring Integration
则依赖于 Spring Messaging
组件来实现消息处理机制的基础设施。
Spring Integration
是对Spring Messaging
的扩展,设计目标是系统集成,因此内部提供了大量的集成化端点方便应用程序直接使用。
各个异构系统相互集成时,Spring Integration
通过通道之间的消息传递,让我们可以在消息的入口和出口使用通道适配器和消息网关这两种典型的端点对消息进行同构化处理。Spring Messaging
是Spring
框架中的一个底层模块,用于提供统一的消息编程模型。
消息 Message
接口定义:
public interface Message<T> {
//消息体
T getPayload();
//消息头
MessageHeaders getHeaders();
}
消息通道 MessageChannel
接口定义:
@FunctionalInterface
public interface MessageChannel {
long INDEFINITE_TIMEOUT = -1;
//发送消息,无限期阻塞
default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
//发送消息,阻塞直到到达指定超时时间
boolean send(Message<?> message, long timeout);
}
消息通道 MessageChannel 接收消息,调用send()方法将消息发送至该消息通道。
消息通道可简单理解为对队列的一种抽象。通道的名称对应队列的名称。
Spring message
把通道抽象成两种基本表现形式
-
支持轮询的 PollableChannel
-
实现发布-订阅模式的 SubscribableChannel
这两个通道都继承自具有消息发送功能的 MessageChannel
。
public interface SubscribableChannel extends MessageChannel {
//通过注册回调函数MessageHandler来实现事件响应
//注册消息处理器
boolean subscribe(MessageHandler handler);
//取消注册消息处理器
boolean unsubscribe(MessageHandler handler);
}
public interface PollableChannel extends MessageChannel {
//通过轮询操作主动获取消息
//从通道中接收消息
@Nullable
Message<?> receive();
//指定超时时间,从通道中接收消息
@Nullable
Message<?> receive(long timeout);
}
MessageHandler接口定义:
@FunctionalInterface
public interface MessageHandler {
//处理消息方法
void handleMessage(Message<?> message) throws MessagingException;
}
再回到图五流程图中,我们最终可以看到 Kafka
和 RocketMQ
通过继承 AbstractMessageHandler
抽象类( AbstractMessageHandler
抽象类是实现了 MessageHandler
接口)来实现不同中间件的消息发送操作。而这些都是封装在各自中间件对应的 Binder
代码中来实现。
结论
回到我们的主题,Spring Cloud Stream
如何屏蔽不同 MQ
带来的差异性?
-
统一的编程模型:发送和接收代码一致,开发者专注于业务逻辑即可。不用管底层消息中间件的实现。
-
Binder 抽象:封装与消息队列的交互逻辑,每种队列有自己的
Binder
实现。 -
自动配置和约定优于配置:采用约定大于配置的思想,极少的改动配置文件实现消息队列的切换,而代码不用变动。
-
高级特性的抽象:如分区、消息分组、持久性订阅等高级特性,
Spring Cloud Stream
提供了抽象层,由不同的消息队列去实现。
参考资料
-
官方文档:Spring Cloud Stream Reference Guide
-
《Spring核心技术和案例实战》