Spring Cloud Stream是Spring Cloud体系内的一个框架,用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务,其目的是简化消息业务在Spring Cloud应用中的开发。
Spring Cloud Stream的架构图如下所示,应用程序通过Spring Cloud Stream注入的输入通道inputs和输出通道outputs与消息中间件Middleware通信,消息通道通过特定的中间件绑定器Binder实现连接到外部代理。
Spring Cloud Stream的实现基于发布/订阅机制,核心由四部分构成:Spring Framework中的Spring Messaging和Spring Integrataion,以及Spring Cloud Stream中的Binders和Bindings。
Spring Messaging:Spring Framework中的统一消息编程模型,其核心对象如下:
- Message: 消息对象,包含消息头Header和消息体Payload。
- MessageChannel:消息通道接口,用于接收消息,提供send方法将消息发送致消息通道。
- MessageHandler:消息处理器接口,用于处理消息逻辑。
Spring Integration:Spring Framework中用于支持企业集成的一种扩展机制,作用是提供一个简单的模型来构建企业集成解决方案,对Spring Messaging进行了扩展。
- MessageDispatcher: 消息分发接口,用于分发消息和添加删除消息处理器。
- MessageRouter:消息路由接口,定义默认的输出消息通道。
- Filter:消息的过滤注解,用于配置消息过滤表达式。
- Aggregator:消息的聚合注解,用于将多条消息聚合成一条。
- Splitter:消息的分割,用于将一条消息拆分成多条。
Binders:目标绑定器,负责与外部消息中间件系统集成的组件。
- doBindProducer:绑定消息中间件客户端发送消息模块。
- doBindConsumer:绑定消息中间件客户端接收消息模块。
Bindings:外部消息中间件系统与应用程序提供的消息生产者和消费者之间的桥梁。
Spring Cloud Stream官方提供了Kafka Binder和RabbitMQ Binder,用于集成Kafka和RabbitMQ,Spring Cloud Alibaba中加入了RocketMQ Binder,用于将RocketMQ集成到Spring Cloud Stream。
Spring Cloud Alibaba RocketMQ架构图
Spring Cloud Alibaba RocketMQ的架构图如下所示:
- MessageChannel(output):消息通道,用于发送消息,Spring Cloud Stream的标准接口。
- MessageChannel(input):消息通道,用于订阅消息,Spring Cloud Stream的标准接口。
- Binder bindProducer:目标绑定器,将发送通道发过来的消息发送到RocketMQ消息服务器,由Spring Cloud Alibaba团队按照Spring Cloud Stream的标准协议实现。
- Binder bindConsumer:目标绑定器,将接收到RocketMQ消息服务器的消息推送给订阅通道,由Spring Cloud Alibaba团队按照Spring Cloud Stream的标准协议实现。
Spring Cloud Stream消息发送流程
Spring Cloud Stream消息发送流程如下图所示,包括发送、订阅、分发、委派、消息处理等,具体实现如下:
在业务代码中调用MessageChannel接口的Send()方法,例如source.output().send(message)。
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
@FunctionalInterface
public interface MessageChannel {
long INDEFINITE_TIMEOUT = -1L;
default boolean send(Message<?> message) {
return this.send(message, -1L);
}
boolean send(Message<?> var1, long var2);
}
AbstractMessageChannel是消息通道的基本实现类,提供发送消息和接收消息的公用方法。
@IntegrationManagedResource
public abstract class AbstractMessageChannel extends IntegrationObjectSupport implements MessageChannel, TrackableComponent, InterceptableChannel, MessageChannelMetrics, ConfigurableMetricsAware<AbstractMessageChannelMetrics>, IntegrationPattern {
public boolean send(Message<?> messageArg, long timeout) {
// 省略部分代码
sent = this.doSend(message, timeout);
// 省略部分代码
return sent;
}
protected abstract boolean doSend(Message<?> var1, long var2);
}
消息发送到AbstractSubscribableChannel类实现的doSend()方法。
protected boolean doSend(Message<?> message, long timeout) {
try {
return this.getRequiredDispatcher().dispatch(message);
} catch (MessageDispatchingException var6) {
String description = var6.getMessage() + " for channel '" + this.getFullChannelName() + "'.";
throw new MessageDeliveryException(message, description, var6);
}
}
通过消息分发类MessageDispatcher把消息分发给MessageHandler。
private MessageDispatcher getRequiredDispatcher() {
MessageDispatcher dispatcher = this.getDispatcher();
Assert.state(dispatcher != null, "'dispatcher' must not be null");
return dispatcher;
}
protected abstract MessageDispatcher getDispatcher();
从AbstractSubscribableChannel的实现类DirectChannel得到MessageDispatcher的实现类UnicastingDispatcher。
public class DirectChannel extends AbstractSubscribableChannel {
protected UnicastingDispatcher getDispatcher() {
return this.dispatcher;
}
}
调用dispatch()方法把消息分发给各个MessageHandler。
public class UnicastingDispatcher extends AbstractDispatcher {
public final boolean dispatch(Message<?> message) {
if (this.executor != null) {
Runnable task = this.createMessageHandlingTask(message);
this.executor.execute(task);
return true;
} else {
return this.doDispatch(message);
}
}
private boolean doDispatch(Message<?> message) {
if (this.tryOptimizedDispatch(message)) {
return true;
} else {
boolean success = false;
Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
if (!handlerIterator.hasNext()) {
throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
} else {
ArrayList exceptions = null;
while(!success && handlerIterator.hasNext()) {
MessageHandler handler = (MessageHandler)handlerIterator.next();
try {
handler.handleMessage(message);
success = true;
} catch (Exception var9) {
RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message, () -> {
return "Dispatcher failed to deliver Message";
}, var9);
if (exceptions == null) {
exceptions = new ArrayList();
}
exceptions.add(runtimeException);
boolean isLast = !handlerIterator.hasNext();
if (!isLast && this.failover) {
this.logExceptionBeforeFailOver(var9, handler, message);
}
this.handleExceptions(exceptions, message, isLast);
}
}
return success;
}
}
}
}
遍历所有MessageHandler,调用handlerMessage()处理消息。
private Iterator<MessageHandler> getHandlerIterator(Message<?> message) {
Set<MessageHandler> handlers = this.getHandlers();
return this.loadBalancingStrategy != null ? this.loadBalancingStrategy.getHandlerIterator(message, handlers) : handlers.iterator();
}
查看MessageHandler是从哪里来的,也就是handlers列表中的MessageHandler是如何添加的。
public abstract class AbstractSubscribableChannel extends AbstractMessageChannel implements SubscribableChannel, SubscribableChannelManagement {
public boolean subscribe(MessageHandler handler) {
MessageDispatcher dispatcher = this.getRequiredDispatcher();
boolean added = dispatcher.addHandler(handler);
this.adjustCounterIfNecessary(dispatcher, added ? 1 : 0);
return added;
}
}
AbstractMessageChannelBinder在初始化Binding时,会创建并初始化SendingHandler,调用subscribe()添加到handlers列表。
public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties, P extends ProducerProperties, PP extends ProvisioningProvider<C, P>> extends AbstractBinder<MessageChannel, C, P> implements PollableConsumerBinder<MessageHandler, C>, ApplicationEventPublisherAware {
public final Binding<MessageChannel> doBindProducer(final String destination, MessageChannel outputChannel, final P producerProperties) throws BinderException {
// 创建Producer的messageHandler
final MessageHandler producerMessageHandler;
final ProducerDestination producerDestination;
try {
// 省略部分代码
producerMessageHandler = this.createProducerMessageHandler(producerDestination, producerProperties, outputChannel, errorChannel);
// 省略部分代码
// 创建SendingHandler并调用subscribe
((SubscribableChannel)outputChannel).subscribe(new AbstractMessageChannelBinder.SendingHandler(producerMessageHandler, HeaderMode.embeddedHeaders.equals(producerProperties.getHeaderMode()), this.headersToEmbed, this.useNativeEncoding(producerProperties)));
// 省略部分代码
}
}
Producer的MessageHandler是由消息中间件Binder来完成的,Spring Cloud Stream提供了创建MessageHandler的规范。
AbstractMessageChannelBinder的初始化由AbstractBindingLifecycle在Spring 容器加载所有Bean。