WebSocket协议定义了两种类型的消息(文本和二进制),但其内容未作定义。该协议定义了一种机制,供客户端和服务器协商在WebSocket之上使用的子协议(即更高级别的消息传递协议),以定义各自可以发送何种消息、格式是什么、每个消息的内容等等。子协议的使用是可选的,但无论如何,客户端和服务器都需要就定义消息内容的一些协议达成一致。
一、概览
STOMP(Simple Text Oriented Messaging Protocol)最初是为脚本语言(如Ruby、Python和Perl)创建的,用于连接到企业 message broker。它被设计用来解决常用信息传递模式的一个最小子集。STOMP可以通过任何可靠的双向流媒体网络协议使用,如TCP和WebSocket。尽管STOMP是一个面向文本的协议,但消息的 payload 可以是文本或二进制。
STOMP是一个基于框架的协议,其框架是以HTTP为模型的。下面列出了STOMP框架的结构:
COMMAND
header1:value1
header2:value2
Body^@
客户端可以使用 SEND
或 SUBSCRIBE
命令来发送或订阅消息,以及描述消息内容和谁应该收到它的 destination
header。这就实现了一个简单的发布-订阅机制,你可以用它来通过 broker 向其他连接的客户端发送消息,或者向服务器发送消息以请求执行某些工作。
当你使用Spring的STOMP支持时,Spring WebSocket 应用程序充当客户的STOMP broker。消息被路由到 @Controller
消息处理方法或简单的内存中 broker,该 broker 跟踪订阅并将消息广播给订阅用户。你也可以将Spring配置为与专门的STOMP broker(如RabbitMQ、ActiveMQ等)合作,进行消息的实际广播。在这种情况下,Spring维护与 broker 的TCP连接,向其转发消息,并将消息从它那里传递给连接的WebSocket客户端。因此,Spring web 应用可以依靠统一的基于HTTP的 security、通用验证和熟悉的编程模型来处理消息。
下面的例子显示了一个客户端订阅接收股票报价,服务器可能会定期发布这些报价(例如,通过一个预定任务,通过 SimpMessagingTemplate
向 broker 发送消息):
SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*
^@
下面的例子显示了一个客户端发送了一个交易请求,服务器可以通过 @MessageMapping
方法来处理:
SEND
destination:/queue/trade
content-type:application/json
content-length:44
{"action":"BUY","ticker":"MMM","shares",44}^@
执行后,服务器可以向客户广播交易确认信息和细节。
在STOMP规范中,destination 的含义是故意不透明的。它可以是任何字符串,而且完全由STOMP服务器来定义它们所支持的 destination 的语义和语法。然而,destination 是非常常见的,它是类似路径的字符串,其中 /topic/..
意味着发布-订阅(一对多),/queue/
意味着点对点(一对一)的消息交换。
STOMP服务器可以使用 MESSAGE
命令向所有订阅者广播信息。下面的例子显示了一个服务器向一个订阅的客户发送一个股票报价:
MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM
{"ticker":"MMM","price":129.45}^@
一个服务器不能发送未经请求的消息。所有来自服务器的消息必须是对特定客户订阅的回应,而且服务器消息的 subscription
header 必须与客户端 subscription
的 id
header 相匹配。
前面的概述是为了提供对STOMP协议最基本的理解。我们建议阅读协议的全部 规范。
二、 好处
使用STOMP作为子协议可以让Spring框架和Spring Security提供更丰富的编程模型,而不是使用原始WebSockets。关于HTTP与原始TCP的对比,以及它如何让Spring MVC和其他Web框架提供丰富的功能,也可以提出同样的观点。以下是一个好处清单:
-
不需要发明一个自定义的消息传输协议和消息格式。
-
STOMP客户端,包括Spring框架中的一个 Java客户端,都是可用的。
-
你可以(选择性地)使用消息代理(如RabbitMQ、ActiveMQ和其他)来管理订阅和广播消息。
-
应用逻辑可以组织在任何数量的
@Controller
实例中,消息可以根据STOMP destination header 被路由到它们,而不是用一个给定连接的单一WebSocketHandler
来处理原始WebSocket消息。 -
你可以使用 Spring Security 来保护基于 STOMP destination 和消息类型的消息。
三、 启用 STOMP
spring-messaging
和 spring-websocket
模块中提供了基于WebSocket的STOMP的支持。一旦你有了这些依赖,你就可以通过 SockJS Fallback 在WebSocket上暴露一个STOMP端点,如下面的例子所示:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setApplicationDestinationPrefixes("/app");
config.enableSimpleBroker("/topic", "/queue");
}
}
/portfolio 是WebSocket(或SockJS)客户端需要连接以进行WebSocket握手的端点的HTTP URL。 |
destination header 以 /app 开头的STOMP消息被路由到 @Controller 类中的 @MessageMapping 方法。 |
使用内置的消息 broker 进行订阅和广播,将 destination header 以 /topic 或 /queue 开头的消息路由到 broker。 |
下面的例子显示了前述例子的XML等效配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app">
<websocket:stomp-endpoint path="/portfolio">
<websocket:sockjs/>
</websocket:stomp-endpoint>
<websocket:simple-broker prefix="/topic, /queue"/>
</websocket:message-broker>
</beans>
对于内置的简单 broker,/topic 和 /queue 前缀没有任何特殊含义。它们只是一个惯例,用来区分发布/订阅和点对点的消息传递(即许多订阅者和一个消费者)。当你使用外部 broker 时,请检查该 broker 的STOMP页面,以了解它支持什么样的STOMP destination 和前缀。 |
要从浏览器连接,对于SockJS,你可以使用 sockjs-client。对于STOMP,许多应用程序都使用了 jmesnil/stomp-websocket 库(也称为stomp.js),该库功能完整,已在生产中使用多年,但已不再维护。目前, JSteunou/webstomp-client 是该库的最积极的维护和发展的继承者。下面的示例代码是基于它的:
var socket = new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient = webstomp.over(socket);
stompClient.connect({}, function(frame) {
}
另外,如果你通过WebSocket(没有SockJS)连接,你可以使用以下代码:
var socket = new WebSocket("/spring-websocket-portfolio/portfolio");
var stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
}
请注意,前面的例子中的 stompClient
不需要指定 login
和 passcode
header。即使它这样做了,它们在服务器端也会被忽略(或者说,被覆盖)。关于认证的更多信息,请参见 连接到 Broker 和 认证。
更多示例代码见:
-
使用WebSocket构建交互式wen应用程序 — 入门指南。
-
股票证券交易 — 示例应用。
四、 WebSocket 服务器
要配置底层的WebSocket服务器,Server 配置 中的信息适用。然而,对于Jetty,你需要通过 StompEndpointRegistry
设置 HandshakeHandler
和 WebSocketPolicy
:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").setHandshakeHandler(handshakeHandler());
}
@Bean
public DefaultHandshakeHandler handshakeHandler() {
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
policy.setInputBufferSize(8192);
policy.setIdleTimeout(600000);
return new DefaultHandshakeHandler(
new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
}
}
五、 消息流
一旦STOMP端点被暴露,Spring应用程序就成为连接客户端的STOMP broker。本节描述了服务器端的消息流。
spring-messaging
模块包含了对消息传递应用的基础支持,这些支持源于 Spring Integration,后来被提取并整合到Spring框架中,以便在许多 Spring projects 和应用场景中更广泛地使用。下面的列表简要地描述了一些可用的消息传递(messaging)抽象:
-
Message: 一个消息的简单表示,包括 header 和 payload。
-
MessageHandler: 处理消息。
-
MessageChannel: 发送消息,使生产者和消费者之间实现松散耦合。
-
SubscribableChannel: 带有
MessageHandler
订阅者的MessageChannel
。 -
ExecutorSubscribableChannel:
SubscribableChannel
,使用一个Executor
来传递消息。
Java 配置(即 @EnableWebSocketMessageBroker
)和 XML 命名空间配置(即 <websocket:message-broker>
)都使用前面的组件来组装一个消息工作流。下图显示了启用简单内置消息 broker 时使用的组件:
前面的图显示了三个信息通道(message channel):
-
clientInboundChannel
: 用于传递从WebSocket客户端收到的消息。 -
clientOutboundChannel
: 用于向WebSocket客户端发送服务器信息。 -
brokerChannel
: 用于从服务器端的应用程序代码中向消息 broker 发送消息。
下图显示了当外部 broker(如 RabbitMQ)被配置为管理订阅和广播消息时使用的组件:
前面两张图的主要区别是使用 “broker relay (中继)”,通过TCP将消息向上传递到外部STOMP broker,并从 broker 向下传递消息到订阅的客户。
当从WebSocket连接中接收到消息时,它们会被解码为STOMP帧,变成Spring Message
表示,并被发送到 clientInboundChannel
进行进一步处理。例如,destination header 以 /app
开头的 STOMP 消息可以被路由到注解 controller 中的 @MessageMapping
方法,而 /topic
和 /queue
消息可以直接被路由到消息 broker。
一个处理来自客户端的STOMP消息的注解的 @Controller
可以通过 brokerChannel
向消息代理发送消息,而代理则通过 clientOutboundChannel
将消息广播给匹配的订阅者。同样的 controller 也可以在响应HTTP请求时做同样的事情,所以客户端可以执行一个HTTP POST,然后一个 @PostMapping
方法可以发送一个消息给消息 broker,以广播给订阅的客户端。
我们可以通过一个简单的例子来追踪这个流程。考虑下面这个例子,它设置了一个服务器:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic");
}
}
@Controller
public class GreetingController {
@MessageMapping("/greeting")
public String handle(String greeting) {
return "[" + getTimestamp() + ": " + greeting;
}
}
前面的例子支持以下流程:
-
客户端连接到
http://localhost:8080/portfolio
,一旦建立WebSocket连接,STOMP帧就开始在上面流动。 -
客户端发送一个
SUBSCRIBE
帧,destination header 为/topic/greeting
。一旦收到并解码,该消息被发送到clientInboundChannel
,然后被路由到消息 broker,该 broker 存储客户端的订阅。 -
客户端发送一个
SEND
帧到/app/greeting
。/app
前缀有助于将其路由到有注解的 controller 。在/app
前缀被剥离后,剩余的/greeting
部分的 destination 被映射到GreetingController
的@MessageMapping
方法。 -
从
GreetingController
返回的值被转化为SpringMessage
,其 payload 基于返回值和默认的 destination header/topic/greeting
(从输入 destination 派生,用/topic
代替/app
)。产生的消息被发送到brokerChannel
并由消息 broker 处理。 -
消息 broker 找到所有匹配的订阅者,并通过客户端
OutboundChannel
向每个订阅者发送一个MESSAGE
帧,从那里消息被编码为STOMP帧并在WebSocket连接上发送。
下一节将提供关于注解方法的更多细节,包括支持的参数和返回值的种类。
六、 注解式Controller
应用程序可以使用注解的 @Controller
类来处理来自客户端的消息。这样的类可以声明 @MessageMapping
、@SubscribeMapping
和 @ExceptionHandler
方法,如以下主题所述:
-
@MessageMapping
-
@SubscribeMapping
-
@MessageExceptionHandler
@MessageMapping
你可以使用 @MessageMapping
来注解那些根据 destination 路由消息的方法。它在方法层面和类型层面都被支持。在类型层面上,@MessageMapping
被用来表达 controller 中所有方法的共享映射。
默认情况下,映射值是Ant风格的路径模式(例如 /thing*
,/thing/**
),包括对模板变量的支持(例如,/thing/{id}
)。这些值可以通过 @DestinationVariable
方法参数进行引用。应用程序也可以切换到点状分隔的目标约定来进行映射,正如在 使用点作为分隔符 中解释的那样。
支持的方法参数
下表描述了方法的参数:
方法参数 | 说明 |
---|---|
| 为了获得完整的信息。 |
| 用于访问 |
| 用于通过类型化的访问器方法访问 header。 |
| 用于访问消息的 payload,由配置的 这个注解的存在不是必须的,因为默认情况下,如果没有其他参数被匹配,它就会被假定。 你可以用 |
| 用于访问一个特定的 header 值—如果有必要,同时使用 |
| 用于访问消息中的所有 header。这个参数必须是可以分配给 |
| 用于访问从消息 destination 提取的模板变量。必要时,值被转换为声明的方法参数类型。 |
| 反映在WebSocket HTTP握手时登录的用户。 |
返回值
默认情况下,@MessageMapping
方法的返回值通过匹配的 MessageConverter
被序列化为一个 payload,并作为一个 Message
发送到 brokerChannel
,从那里被广播给订阅者。出站消息的 destination 与入站消息的 destination 相同,但前缀为 /topic
。
你可以使用 @SendTo
和 @SendToUser
注解来定制输出消息的 destination。@SendTo
是用来定制目标 destination 或指定多个 destination 的。 @SendToUser
用来指导输出消息只给与输入消息相关的用户。参见 User Destination。
你可以在同一个方法上同时使用 @SendTo
和 @SendToUser
,而且在类的层面上也支持这两种注解,在这种情况下,它们作为类中方法的默认值。然而,请记住,任何方法级的 @SendTo
或 @SendToUser
注解都会覆盖类级的任何此类注解。
消息可以被异步处理,@MessageMapping
方法可以返回 ListenableFuture
、 CompletableFuture
或 CompletionStage
。
请注意,@SendTo
和 @SendToUser
只是一种便利,相当于使用 SimpMessagingTemplate
来发送消息。如果有必要,对于更高级的场景, @MessageMapping
方法可以直接使用 SimpMessagingTemplate
。这可以代替返回值,也可以在返回值之外进行。参见 发送消息。
@SubscribeMapping
@SubscribeMapping
与 @MessageMapping
类似,但只将映射范围缩小到订阅信息。它支持与 @MessageMapping
相同的 方法参数。然而对于返回值,默认情况下,消息被直接发送到客户端(通过 clientOutboundChannel
,对订阅的响应),而不是发送到 broker(通过 brokerChannel
,作为广播给匹配的订阅)。添加 @SendTo
或 @SendToUser
会重写这一行为,并代替发送至 broker。
这在什么时候是有用的?假设 broker 被映射到 /topic
和 /queue
,而应用 controller 被映射到 /app
。在这种设置下,broker 存储了所有对 /topic
和 /queue
的订阅,这些订阅是为了重复广播,而应用程序不需要参与。客户端也可以订阅一些 /app
的 destination,controller 可以在不涉及 broker 的情况下返回一个值,而不需要再次存储或使用该订阅(实际上是一个一次性的请求-回复 exchange)。这方面的一个用例是在启动时用初始数据填充一个用户界面。
这在什么时候没有用?不要试图将 broker 和 controller 映射到同一个目标前缀,除非你想让两者独立处理消息,包括订阅,因为某些原因。入站消息是平行处理的。不能保证一个 broker 或一个 controller 先处理一个给定的消息。如果目标是在订阅被存储并准备好进行广播时得到通知,如果服务器支持的话,客户端应该要求得到一个 receipt (简单的 broker 不支持)。例如,使用Java STOMP client,你可以做以下事情来添加一个 receipt:
@Autowired
private TaskScheduler messageBrokerTaskScheduler;
// During initialization..
stompClient.setTaskScheduler(this.messageBrokerTaskScheduler);
// When subscribing..
StompHeaders headers = new StompHeaders();
headers.setDestination("/topic/...");
headers.setReceipt("r1");
FrameHandler handler = ...;
stompSession.subscribe(headers, handler).addReceiptTask(receiptHeaders -> {
// Subscription ready...
});
一个服务器端的选择是在 brokerChannel
上 注册 一个 ExecutorChannelInterceptor
,并实现 afterMessageHandled
方法,该方法在消息(包括订阅)被处理后被调用。
@MessageExceptionHandler
一个应用程序可以使用 @MessageExceptionHandler
方法来处理来自 @MessageMapping
方法的异常。你可以在注解本身中声明异常,如果你想获得对异常实例的访问,也可以通过方法参数来声明。下面的例子通过一个方法参数声明了一个异常:
@Controller
public class MyController {
// ...
@MessageExceptionHandler
public ApplicationError handleException(MyException exception) {
// ...
return appError;
}
}
@MessageExceptionHandler
方法支持灵活的方法签名,支持与 @MessageMapping 方法相同的方法参数类型和返回值。
通常情况下,@MessageExceptionHandler
方法适用于它们所声明的 @Controller
类(或类层次结构)。如果你想让这些方法在全局范围内应用(跨控制器),你可以在一个标有 @ControllerAdvice
的类中声明它们。这与Spring MVC中的类似 支持相当。
七、 发送消息
如果你想从应用程序的任何部分向连接的客户端发送消息,怎么办?任何应用程序组件都可以向 brokerChannel
发送消息。最简单的方法是注入一个 SimpMessagingTemplate
并使用它来发送消息。通常情况下,你会按类型注入它,如下例所示:
@Controller
public class GreetingController {
private SimpMessagingTemplate template;
@Autowired
public GreetingController(SimpMessagingTemplate template) {
this.template = template;
}
@RequestMapping(path="/greetings", method=POST)
public void greet(String greeting) {
String text = "[" + getTimestamp() + "]:" + greeting;
this.template.convertAndSend("/topic/greetings", text);
}
}
然而,你也可以通过它的名字(brokerMessagingTemplate
)来限定它,如果存在另一个相同类型的bean。
八、Simple Broker
内置的简单 message broker 处理来自客户端的订阅请求,将其存储在内存中,并将消息广播给有匹配目的地的连接客户端。broker 支持类似路径的 destination,包括对 Ant 风格的 destination 模式的订阅。
应用程序也可以使用点分割的(而不是斜线分割的)destination。参见 使用点作为分隔符。 |
如果配置了一个任务调度器(task scheduler),simple broker 就支持 STOMP 心跳。要配置一个调度器,你可以声明你自己的 TaskScheduler
Bean,并通过 MessageBrokerRegistry
来设置它。或者,你可以使用在内置 WebSocket 配置中自动声明的那个,但是,你将’需要 @Lazy
以避免内置 WebSocket 配置和你的 WebSocketMessageBrokerConfigurer
之间的循环。例如:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private TaskScheduler messageBrokerTaskScheduler;
@Autowired
public void setMessageBrokerTaskScheduler(@Lazy TaskScheduler taskScheduler) {
this.messageBrokerTaskScheduler = taskScheduler;
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/queue/", "/topic/")
.setHeartbeatValue(new long[] {10000, 20000})
.setTaskScheduler(this.messageBrokerTaskScheduler);
// ...
}
}
九、 外部 Broker
简单的 broker 很适合入门,但它只支持STOMP命令的一个子集(它不支持acks、receipts和其他一些功能),依赖于一个简单的消息发送循环,并且不适合集群化。作为一种选择,你可以升级你的应用程序,以使用全功能的消息代理。
请参阅你所选择的消息代理(如 RabbitMQ、 ActiveMQ 等)的 STOMP 文档,安装该代理,并在启用 STOMP 支持的情况下运行它。然后你可以在Spring配置中启用STOMP broker 中继(而不是简单的 broker)。
下面的配置示例启用了一个全功能的 broker:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue");
registry.setApplicationDestinationPrefixes("/app");
}
}
下面的例子显示了前述例子的XML配置等效:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app">
<websocket:stomp-endpoint path="/portfolio" />
<websocket:sockjs/>
</websocket:stomp-endpoint>
<websocket:stomp-broker-relay prefix="/topic,/queue" />
</websocket:message-broker>
</beans>
前面配置中的STOMP broker 中继器是一个Spring MessageHandler,它通过将消息转发给外部 message broker 来处理消息。为此,它建立了与 broker 的TCP连接,将所有消息转发给 broker ,然后将从 broker 那里收到的所有消息通过WebSocket会话转发给客户。从本质上讲,它充当了一个 "中转站",在两个方向上转发消息。
将 io.projectreactor.netty:reactor-netty 和 io.netty:netty-all 依赖项添加到你的项目中,用于 TCP 连接管理。 |
此外,应用组件(如HTTP请求处理方法、业务服务等)也可以向 broker 中继发送消息,如 发送消息 中所述,以向订阅的WebSocket客户端广播消息。
实际上,broker 中继实现了稳健和可扩展的消息广播。
十、连接到 Broker
STOMP broker 中转站保持着一个与 broker 的单一 "系统" TCP连接。这个连接只用于从服务器端应用程序发出的消息,不用于接收消息。你可以为这个连接配置STOMP凭证(即STOMP框架 login
和 passcode
header)。这在XML命名空间和Java配置中都被暴露为 systemLogin
和 systemPasscode
属性,默认值为 guest
和 guest
。
STOMP broker 中继器还为每个连接的 WebSocket 客户端创建一个单独的TCP连接。你可以配置 STOMP 凭证,这些凭证用于代表客户创建的所有TCP连接。这在XML命名空间和Java配置中都被暴露为 clientLogin
和 clientPasscode
属性,默认值为 guest
和 guest
。
STOMP broker 中继总是在它代表客户转发给 broker 的每个 CONNECT 帧上设置 login 和 passcode header。因此,WebSocket客户端不需要设置这些 header 信息。它们会被忽略。正如 认证 部分所解释的,WebSocket客户端应依靠HTTP认证来保护WebSocket端点并建立客户端身份。 |
STOMP broker 中继也通过 "系统" TCP连接向 message broker 发送和接收心跳。你可以配置发送和接收心跳的时间间隔(默认为10秒)。如果与 broker 的连接丢失,broker 中继会继续尝试重新连接,每5秒一次,直到成功。
任何Spring Bean都可以实现 ApplicationListener<BrokerAvailabilityEvent>
来接收与 broker 的 "系统" 连接丢失和重新建立时的通知。例如,当没有活跃的 "系统" 连接时,一个广播股票报价的股票报价服务可以停止尝试发送消息。
默认情况下,STOMP broker 中继器总是连接到同一主机和端口,并在失去连接时根据需要重新连接。如果你希望提供多个地址,在每次尝试连接时,你可以配置一个地址 supplier,而不是一个固定的主机和端口。下面的例子显示了如何做到这一点:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
// ...
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
registry.setApplicationDestinationPrefixes("/app");
}
private ReactorNettyTcpClient<byte[]> createTcpClient() {
return new ReactorNettyTcpClient<>(
client -> client.addressSupplier(() -> ... ),
new StompReactorNettyCodec());
}
}
你也可以用 virtualHost
属性来配置STOMP broker 中继。这个属性的值被设置为每个 CONNECT
帧的 host
header,可能很有用(例如,在云环境中,建立TCP连接的实际主机与提供基于云的STOMP服务的主机不同)。
十一、 使用点作为分隔符
当消息被路由到 @MessageMapping
方法时,它们会与 AntPathMatcher
匹配。默认情况下,pattern 被期望使用斜线(/
)作为分隔符。这在web应用中是一个很好的惯例,与HTTP URLs类似。然而,如果你更习惯于信息传递的惯例,你可以切换到使用点(.
)作为分隔符。
下面的例子显示了如何在Java配置中这样做:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
// ...
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setPathMatcher(new AntPathMatcher("."));
registry.enableStompBrokerRelay("/queue", "/topic");
registry.setApplicationDestinationPrefixes("/app");
}
}
下面的例子显示了前述例子的XML等效配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
<websocket:stomp-endpoint path="/stomp"/>
<websocket:stomp-broker-relay prefix="/topic,/queue" />
</websocket:message-broker>
<bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
<constructor-arg index="0" value="."/>
</bean>
</beans>
之后,controller 可以在 @MessageMapping
方法中使用点(.
)作为分隔符,如下图所示:
@Controller
@MessageMapping("red")
public class RedController {
@MessageMapping("blue.{green}")
public void handleGreen(@DestinationVariable String green) {
// ...
}
}
客户端现在可以向 /app/red.blue.green123
发送消息。
在前面的例子中,我们没有改变 “broker relay” 上的前缀,因为这些完全取决于外部 message broker。请看你使用的 broker 的STOMP文档页面,看看它对 destination header 支持什么约定。
另一方面,“simple broker” 确实依赖于配置的 PathMatcher
,所以,如果你切换分隔符,这种变化也适用于 broker 和 broker 将消息中的 destination 与订阅中的 pattern 相匹配的方式。
十二、认证
每个基于WebSocket的STOMP 消息会话都从一个HTTP请求开始。这可以是一个升级到WebSocket的请求(即WebSocket握手),或者在SockJS回退的情况下,是一系列的SockJS HTTP传输请求。
许多Web应用程序已经有了认证和授权,以确保HTTP请求的安全。通常情况下,用户通过Spring Security使用某种机制进行认证,如登录页面、HTTP基本认证或其他方式。认证用户的 security context 被保存在HTTP会话中,并与同一基于cookie的会话中的后续请求相关。
因此,对于WebSocket握手或SockJS的HTTP传输请求,通常已经有一个可通过 HttpServletRequest#getUserPrincipal()
访问的认证用户。Spring会自动将该用户与为其创建的WebSocket或SockJS会话联系起来,随后通过 user header 通过该会话传输的所有STOMP消息联系起来。
简而言之,一个典型的web应用程序除了已经做的安全工作外,不需要做任何其他事情。用户在HTTP请求层面通过 security context 进行认证,该 security context 通过基于cookie的HTTP会话(然后与为该用户创建的WebSocket或SockJS会话相关联)进行维护,并导致在流经应用程序的每个 Message
上印上一个 user header。
STOMP协议在 CONNECT
帧上确实有 login
和 passcode
header。这些头信息最初是为通过TCP的STOMP而设计的,并且是需要的。然而,对于通过WebSocket的STOMP,默认情况下,Spring忽略了STOMP协议级别的 authentication header,并假定用户已经在HTTP传输级别进行了认证。我们期望WebSocket或SockJS会话包含认证的用户。
十三、 Token 认证
Spring Security OAuth 提供了对基于令牌的安全支持,包括 JSON Web Token(JWT)。你可以在Web应用程序中使用它作为认证机制,包括上一节所述的基于WebSocket的STOMP交互(也就是通过基于cookie的会话来维护身份)。
同时,基于cookie的会话并不总是最合适的(例如,在不维护服务器端会话的应用中,或者在移动应用中,通常使用 header 信息进行认证)。
WebSocket协议,RFC 6455 "没有规定服务器在WebSocket握手过程中对客户端进行认证的任何特定方式"。然而,在实践中,浏览器客户端只能使用标准 authentication header(即 basic HTTP authentication)或cookies,不能(例如)提供自定义 header。同样地,SockJS的JavaScript客户端也没有提供与SockJS传输请求一起发送HTTP header 的方法。见 sockjs-client issue 196。相反,它确实允许发送查询参数,你可以用它来发送 token,但这也有自己的缺点(例如,token 可能会无意中与服务器日志中的URL一起被记录)。
前面的限制是针对基于浏览器的客户端,不适用于基于Spring Java 的 STOMP client,该客户端确实支持通过WebSocket和SockJS请求发送 header 信息。 |
因此,希望避免使用cookies的应用程序在HTTP协议层面上可能没有任何好的替代认证。与其使用cookies,他们可能更喜欢在STOMP消息协议层面用 header 进行认证。这样做需要两个简单的步骤:
-
在连接时使用STOMP客户端来传递 authentication header 信息。
-
用一个
ChannelInterceptor
来处理 authentication header。
下一个例子使用服务器端配置来注册一个自定义认证拦截器。请注意,拦截器只需要在 CONNECT Message
上进行认证和设置 user header。Spring 会记录并保存认证的用户,并将其与同一会话中的后续STOMP消息相关联。下面的例子展示了如何注册一个自定义认证拦截器:
@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
Authentication user = ... ; // access authentication header(s)
accessor.setUser(user);
}
return message;
}
});
}
}
此外,请注意,当你使用Spring Security的消息授权时,目前你需要确保认证 ChannelInterceptor
配置的顺序在Spring Security的前面。要做到这一点,最好是在自己的 WebSocketMessageBrokerConfigurer
实现中声明自定义拦截器,并标明 @Order(Ordered.HIGHEST_PRECEDENCE + 99)
。
十四、 授权
Spring Security提供了 WebSocket 子协议授权,它使用 ChannelInterceptor
来根据消息中的 user header 授权。此外,Spring Session还提供了 WebSocket integration,确保用户的HTTP会话不会过期,而WebSocket会话仍在活动。
十五、 User Destination
应用程序可以发送针对特定用户的消息,Spring的STOMP支持为此 destination 识别以 /user/
为前缀的 destination。例如,一个客户端可能会订阅 /user/queue/position-updates
destination。UserDestinationMessageHandler
处理这个 destination,并将其转换为用户会话的唯一 destination(如 /queue/position-updates-user123
)。这为订阅一个通用命名的 destination 提供了便利,同时,确保不会与订阅同一 destination 的其他用户发生冲突,以便每个用户都能收到独特的股票位置更新。
当使用用户 destination 时,重要的是要配置 broker 和应用程序的 destination 前缀,如 启用 STOMP, 所示,否则 broker 将处理 "/user" 前缀的消息,而这些消息只应该由 UserDestinationMessageHandler 处理。 |
在发送方面,消息可以被发送到一个 destination,如 /user/{username}/queue/position-updates
,这又被 UserDestinationMessageHandler
翻译成一个或多个destination,每个与用户相关的会话都有一个。这让应用程序中的任何组件都可以发送针对特定用户的消息,而不一定要知道他们的名字和通用destination。这也是通过一个注解和一个消息模板来支持的。
一个消息处理方法可以通过 @SendToUser
注解将消息发送给与被处理的消息相关的用户(在类级上也支持共享一个共同的 destination),如下例所示:
@Controller
public class PortfolioController {
@MessageMapping("/trade")
@SendToUser("/queue/position-updates")
public TradeResult executeTrade(Trade trade, Principal principal) {
// ...
return tradeResult;
}
}
如果用户有一个以上的会话,默认情况下,所有订阅给定 destination 的会话都是目标。然而,有时可能需要只针对发送被处理消息的会话。你可以通过将 broadcast
属性设置为 false
来做到这一点,正如下面的例子所示:
@Controller
public class MyController {
@MessageMapping("/action")
public void handleAction() throws Exception{
// raise MyBusinessException here
}
@MessageExceptionHandler
@SendToUser(destinations="/queue/errors", broadcast=false)
public ApplicationError handleException(MyBusinessException exception) {
// ...
return appError;
}
}
虽然用户 destination 通常意味着有一个经过认证的用户,但这并不是严格的要求。不与认证用户相关联的WebSocket会话可以订阅用户 destination。在这种情况下, @SendToUser 注解的行为与 broadcast=false 时完全相同(即只针对发送被处理消息的会话)。 |
你可以从任何应用组件向用户 destination 发送消息,例如,通过注入由Java配置或XML命名空间创建的 SimpMessagingTemplate
。(如果需要用 @Qualifier
来限定,bean的名字是 brokerMessagingTemplate
)。下面的例子显示了如何做到这一点:
@Service
public class TradeServiceImpl implements TradeService {
private final SimpMessagingTemplate messagingTemplate;
@Autowired
public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// ...
public void afterTradeExecuted(Trade trade) {
this.messagingTemplate.convertAndSendToUser(
trade.getUserName(), "/queue/position-updates", trade.getResult());
}
}
当你将用户 destination 与外部消息 broker 一起使用时,你应该查看关于如何管理非活动队列的 broker 文档,以便在用户会话结束时,所有独特的用户队列都被删除。例如,当你使用诸如 /exchange/amq.direct/position-updates 等 destination 时,RabbitMQ 会创建自动删除队列。因此,在这种情况下,客户端可以订阅到 /user/exchange/amq.direct/position-updates 。同样地,ActiveMQ也有清除非活动 destination 的 配置选项。 |
在一个多应用服务器的情况下,一个用户 destination 可能会因为用户连接到一个不同的服务器而保持未解析。在这种情况下,你可以配置一个 destination 来广播未解析的消息,以便其他服务器有机会尝试。这可以通过Java配置中 MessageBrokerRegistry
的 userDestinationBroadcast
属性和XML中 message-broker
元素的 user-destination-broadcast
属性完成。
十六、 消息的顺序
来自 broker 的消息被发布到客户端 OutboundChannel
,从那里被写入WebSocket会话。由于该通道由 ThreadPoolExecutor
支持,消息在不同的线程中被处理,而客户端收到的结果序列可能与发布的确切顺序不一致。
如果这是一个问题,请启用 setPreservePublishOrder
标志,如下例所示:
@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
@Override
protected void configureMessageBroker(MessageBrokerRegistry registry) {
// ...
registry.setPreservePublishOrder(true);
}
}
下面的例子显示了前述例子的XML等效配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker preserve-publish-order="true">
<!-- ... -->
</websocket:message-broker>
</beans>
当该标志被设置时,同一客户端会话中的消息会被一个一个地发布到 clientOutboundChannel
上,这样就保证了发布的顺序。请注意,这将产生一个小的性能开销,所以你应该只在需要时才启用它。
十七、事件
几个 ApplicationContext
事件(event)被发布,并可以通过实现Spring的 ApplicationListener
接口来接收:
-
BrokerAvailabilityEvent
: 指示 broker 何时可用或不可用。虽然 “simple” broker 在启动时立即可用,并在应用程序运行时保持可用,但STOMP “broker relay” 可能会失去与全功能 broker 的连接(例如,如果 broker 被重新启动)。broker 中继有重新连接的逻辑,当它回来时重新建立与 broker 的 "系统" 连接。因此,只要状态从连接变为断开,反之亦然,就会发布这个事件。使用SimpMessagingTemplate
的组件应该订阅这个事件,并避免在 broker 不可用时发送消息。在任何情况下,他们应该准备好在发送消息时处理MessageDeliveryException
。 -
SessionConnectEvent
: 当收到一个新的 STOMP CONNECT 时发布,表示一个新的客户端会话的开始。该事件包含代表连接的消息,包括会话ID、用户信息(如果有的话),以及客户端发送的任何自定义头信息。这对于跟踪客户端会话是很有用的。订阅此事件的组件可以用SimpMessageHeaderAccessor
或StompMessageHeaderAccessor
来包装所包含的消息。 -
SessionConnectedEvent
: 在SessionConnectEvent
发生后不久,当 broker 发送一个 STOMP CONNECTED 帧作为对 CONNECT 的响应时发布。在这一点上,STOMP会话可被视为完全建立。 -
SessionSubscribeEvent
: 当收到一个新的 STOMP SUBSCRIBE 时发布。 -
SessionUnsubscribeEvent
: 当收到一个新的 STOMP UNSUBSCRIBE 时发布。 -
SessionDisconnectEvent
: 在STOMP会话结束时发布。DISCONNECT 可能是由客户端发送的,也可能是在WebSocket会话关闭时自动生成的。在某些情况下,该事件会在每个会话中发布一次以上。对于多个断开事件,组件应该是幂等的。
当你使用一个全功能的 broker 时,如果 broker 暂时不可用,STOMP 的 “broker relay” 会自动重新连接 "系统" 的连接。然而,客户端连接不会自动重新连接。假设心跳被启用,客户端通常会在10秒内注意到 broker 没有回应。客户端需要实现他们自己的重新连接逻辑。 |
十入、 拦截
事件 为STOMP连接的生命周期提供通知,但不是为每个客户端消息提供通知。应用程序也可以注册一个 ChannelInterceptor
来拦截任何消息和处理链中的任何部分。下面的例子显示了如何拦截来自客户端的入站消息:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new MyChannelInterceptor());
}
}
自定义 ChannelInterceptor
可以使用 StompHeaderAccessor
或 SimpMessageHeaderAccessor
来访问消息的信息,如下图所示:
public class MyChannelInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getStompCommand();
// ...
return message;
}
}
应用程序也可以实现 ExecutorChannelInterceptor
,它是 ChannelInterceptor
的一个子接口,在处理消息的线程中具有回调功能。 ChannelInterceptor
对于发送到 channel 的每个消息都会被调用一次,而 ExecutorChannelInterceptor
则在订阅通道消息的每个 MessageHandler
的线程中提供钩子(hook)。
注意,与前面描述的 SessionDisconnectEvent
一样,DISCONNECT 消息可以来自客户端,也可以在WebSocket会话关闭时自动生成。在某些情况下,拦截器可以为每个会话拦截该消息一次以上。对于多个断开连接事件,组件应该是幂等的。
十九、 STOMP 客户端
Spring提供了一个基于 WebSocket 的 STOMP 客户端和一个基于 TCP 的 STOMP 客户端。
首先,你可以创建并配置 WebSocketStompClient
,如下例所示:
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats
在前面的例子中,你可以用 SockJsClient
取代 StandardWebSocketClient
,因为它也是 WebSocketClient
的一个实现。SockJsClient
可以使用WebSocket或基于HTTP的传输作为后退。更多详情,请参见 SockJsClient。
接下来,你可以建立一个连接并为 STOMP 会话提供一个 handler,如下例所示:
String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);
当会话可以使用时,handler 会被通知,如下例所示:
public class MyStompSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
// ...
}
}
一旦会话建立,任何 payload 都可以被发送,并通过配置的 MessageConverter
进行序列化,如下例所示:
session.send("/topic/something", "payload");
你也可以对 destination 进行订阅。subscribe
方法需要一个 handler 来处理订阅上的消息,并返回一个 Subscription
handle,你可以用它来取消订阅。对于每个收到的消息,handler 可以指定 payload 应该被反序列化的目标 Object
类型,如下面的例子所示:
session.subscribe("/topic/something", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
// ...
}
});
为了启用STOMP心跳,你可以用一个 TaskScheduler
来配置 WebSocketStompClient
,并可选择自定义心跳间隔(10 秒写不活动,会导致发送心跳,10 秒读不活动,会关闭连接)。
WebSocketStompClient
只在不活动的情况下发送心跳,即没有其他消息被发送时。当使用外部 broker时,这可能是一个挑战,因为具有非 broker destination 的消息代表活动,但实际上并没有转发给 broker。在这种情况下,你可以在初始化 外部 Broker 时配置一个 TaskScheduler
,以确保在只有非 broker destination 的消息被发送时,心跳也被转发到 broker。
当你使用 WebSocketStompClient 进行性能测试以模拟来自同一台机器的数千个客户端时,请考虑关闭心跳,因为每个连接都会安排自己的心跳任务,而这对于在同一台机器上运行的大量客户端来说并不优化。 |
STOMP协议还支持 receipt,客户端必须添加一个 receipt
头,服务器在处理完发送或订阅后会用一个 RECEIPT 帧来回应。为了支持这一点,StompSession
提供了 setAutoReceipt(boolean)
功能,使每一个后续的发送或订阅事件都会添加一个 receipt
头。另外,你也可以手动添加一个 receipt
头到 StompHeaders
中。发送和订阅都会返回一个 Receiptable
的实例,你可以用它来注册接收成功和失败的回调。对于这个功能,你必须用一个 TaskScheduler
和 receipt
过期前的时间量(默认为15秒)来配置客户端。
请注意,StompSessionHandler
本身就是一个 StompFrameHandler
,这让它除了处理信息处理中的异常的 handleException
回调和处理包括 ConnectionLostException
在内的传输级错误的 handleTransportError
之外,还可以处理ERROR帧。
二十、WebSocket Scope
每个WebSocket会话都有一个 attributes map。该 map 作为 header 附在入站的客户端消息上,可以从 controller 方法中访问,如下例所示:
@Controller
public class MyController {
@MessageMapping("/action")
public void handle(SimpMessageHeaderAccessor headerAccessor) {
Map<String, Object> attrs = headerAccessor.getSessionAttributes();
// ...
}
}
你可以在 websocket
scope 内声明一个Spring管理的Bean。你可以将 WebSocket scope 的 Bean 注入 controller 和在 clientInboundChannel
上注册的任何通道拦截器中。这些通常是 singleton,生命周期比任何单独的 WebSocket 会话长。因此,你需要为 WebSocket scope 的 Bean 使用 scope proxy 模式,如下例所示:
@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {
@PostConstruct
public void init() {
// Invoked after dependencies injected
}
// ...
@PreDestroy
public void destroy() {
// Invoked when the WebSocket session ends
}
}
@Controller
public class MyController {
private final MyBean myBean;
@Autowired
public MyController(MyBean myBean) {
this.myBean = myBean;
}
@MessageMapping("/action")
public void handle() {
// this.myBean from the current WebSocket session
}
}
与任何自定义 scope 一样,Spring在第一次从 controller 访问 MyBean 时初始化一个新的 MyBean
实例,并将该实例存储在WebSocket会话属性中。随后会返回相同的实例,直到会话结束。如前面的例子所示,WebSocket scope 的Bean有所有Spring生命周期方法被调用。
二十一、 性能
谈到性能,没有银弹。许多因素都会影响它,包括消息的大小和数量,应用程序方法是否执行需要阻塞的工作,以及外部因素(如网络速度和其他问题)。本节的目的是提供一个可用配置选项的概述,以及关于如何推理扩展的一些想法。
在一个消息传递的应用程序中,消息是通过通道传递的,用于由线程池支持的异步执行。配置这样一个应用程序需要对通道和消息流有很好的了解。因此,我们建议回顾一下 消息流。
最明显的地方是配置支持 clientInboundChannel
和 clientOutboundChannel
的线程池。默认情况下,两者都被配置为可用处理器数量的两倍。
如果注解方法中的消息处理主要是由CPU约束的,那么 clientInboundChannel
的线程数量应该保持与处理器的数量接近。如果它们所做的工作更多的是IO-bound,需要在数据库或其他外部系统上阻塞或等待,那么线程池的大小可能需要增加。
一个常见的混淆点是,配置核心池大小(例如,10)和最大池大小(例如,20)的结果是线程池有10到20个线程。事实上,如果容量保持在 请参阅 |
在 clientOutboundChannel
方面,它是向WebSocket客户端发送消息的全部内容。如果客户处于告速网络上,线程数应保持接近可用处理器的数量。如果他们速度慢或带宽低,他们需要更长的时间来消费消息,给线程池带来负担。因此,增加线程池的大小成为必要。
虽然 clientInboundChannel
的工作量是可以预测的—毕竟它是基于应用程序所做的事情—但如何配置 "clientOutboundChannel" 则比较困难,因为它是基于应用程序无法控制的因素。出于这个原因,有两个额外的属性与消息的发送有关:sendTimeLimit
和 sendBufferSizeLimit
。你可以使用这些方法来配置在向客户端发送消息时允许发送多长时间以及可以缓冲多少数据。
一般的想法是,在任何时候,只有一个线程可以用来向客户端发送。同时,所有额外的消息都会被缓冲,你可以使用这些属性来决定允许发送一个消息需要多长时间,以及在此期间有多少数据可以被缓冲。关于其他重要的细节,请参见javadoc和XML schema 的文档。
下面的例子显示了一个可能的配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
}
// ...
}
下面的例子显示了前述例子的XML等效配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker>
<websocket:transport send-timeout="15000" send-buffer-size="524288" />
<!-- ... -->
</websocket:message-broker>
</beans>
你也可以使用前面显示的WebSocket传输配置来配置传入STOMP消息的最大允许大小。理论上,WebSocket消息的大小几乎没有限制。在实践中,WebSocket服务器会施加限制—例如,Tomcat上的8K和Jetty上的64K。出于这个原因,STOMP客户端(如JavaScript webstomp-client 和其他客户端)将较大的STOMP消息以16K的边界分割,并作为多个WebSocket消息发送,这需要服务器进行缓冲和重新组合。
Spring的基于 WebSocket 的 STOMP 支持做到了这一点,因此应用程序可以配置STOMP消息的最大尺寸,而不考虑WebSocket服务器特定的消息尺寸。请记住,如果有必要,WebSocket消息的大小会被自动调整,以确保它们至少可以承载16K的WebSocket消息。
下面的例子显示了一种可能的配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setMessageSizeLimit(128 * 1024);
}
// ...
}
下面的例子显示了前述例子的XML等效配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker>
<websocket:transport message-size="131072" />
<!-- ... -->
</websocket:message-broker>
</beans>
关于扩展的一个重要观点涉及到使用多个应用程序实例。目前,你不能用 simple broker 做到这一点。但是,当你使用全功能 broker(如 RabbitMQ)时,每个应用程序实例都会连接到 broker,并且从一个应用程序实例广播的消息可以通过 broker 广播给通过任何其他应用程序实例连接的 WebSocket 客户端。
二十二、 监控
当你使用 @EnableWebSocketMessageBroker
或 <websocket:message-broker>
时,关键的基础设施组件会自动收集统计信息和计数器,从而提供对应用程序内部状态的重要洞察力。该配置还声明了一个 WebSocketMessageBrokerStats
类型的Bean,它在一个地方收集所有可用的信息,并默认每30分钟在 INFO
级别记录一次。这个Bean可以通过Spring的 MBeanExporter
导出到JMX,以便在运行时查看(例如,通过JDK的 jconsole
)。下面的列表总结了可用的信息:
Client WebSocket Sessions
Current
表示当前有多少个客户端会话,该计数按WebSocket与HTTP流和轮询SockJS会话进一步细分。
Total
表示已经建立的总会话数量。
Abnormally Closed
Connect Failures
已建立的会话,但在60秒内没有收到任何信息后被关闭。这通常是代理或网络问题的一个迹象。
Send Limit Exceeded
会话在超过配置的发送超时或发送缓冲区限制后关闭,这可能发生在慢速客户端上(见前一节)。
Transport Errors
在发生传输错误后关闭的会话,例如未能读取或写入WebSocket连接或HTTP请求或响应。
STOMP Frames
处理的CONNECT、CONNECTED和DISCONNECT帧的总数,表示有多少客户端在STOMP层连接。请注意,当会话被异常关闭或客户端关闭而没有发送DISCONNECT帧时,DISCONNECT计数可能较低。
STOMP Broker Relay
TCP Connections
表示有多少代表客户WebSocket会话的TCP连接被建立到代理。这应该等于客户WebSocket会话的数量+1个额外的共享 "系统" 连接,用于从应用程序内发送消息。
STOMP Frames
代表客户端转发到 broker 处或从 broker 处接收的 CONNECT、CONNECTED 和 DISCONNECT 帧的总数。请注意,无论客户WebSocket会话是如何关闭的,DISCONNECT 帧都会被发送给 broker。因此,较低的 DISCONNECT 帧计数表明 broker 正在主动关闭连接(可能是因为心跳没有及时到达,无效的输入帧,或其他问题)。
Client Inbound Channel
来自支持 clientInboundChannel
的线程池的统计数据,提供了对传入消息处理的健康状况的洞察力。任务在这里排队是一个迹象,表明应用程序可能太慢,无法处理消息。如果有I/O绑定的任务(例如,缓慢的数据库查询,对第三方REST API的HTTP请求,等等),考虑增加线程池大小。
Client Outbound Channel
来自支持 clientOutboundChannel
的线程池的统计数据,提供了对向客户广播消息的健康状况的洞察力。任务在这里排队是一个迹象,表明客户对消息的消费太慢了。解决这个问题的方法之一是增加线程池的大小,以适应预期的并发慢速客户端的数量。另一个办法是减少发送超时和发送缓冲区大小的限制(见上一节)。
SockJS Task Scheduler
来自SockJS任务调度器的线程池的统计数据,用于发送心跳。注意,当心跳在STOMP级别上协商时,SockJS的心跳被禁用。
二十三、 测试
当你使用Spring的基于 WebSocket 的 STOMP 支持时,有两种主要方法来测试应用程序。第一种是编写服务器端测试,以验证 controller 的功能和它们注解的消息处理方法。第二种是编写完整的端到端测试,包括运行一个客户端和一个服务器。
这两种方法并不相互排斥。相反,每种方法在整个测试策略中都有其位置。服务器端的测试更有针对性,更容易编写和维护。另一方面,端到端的集成测试更完整,测试的内容更多,但它们也更需要编写和维护。
服务器端测试的最简单形式是编写 controller 单元测试。然而,这还不够有用,因为 controller 所做的很多事情都取决于它的注解。纯粹的单元测试根本无法测试这些。
理想情况下,被测试的 controller 应该在运行时被调用,就像使用Spring MVC测试框架测试处理HTTP请求的 controller 的方法一样—也就是说,不运行Servlet容器,而是依靠Spring框架来调用注解的 controller。与Spring MVC测试一样,你在这里有两种可能的选择,要么使用 "基于 context",要么使用 "独立 "设置:
-
在Spring TestContext框架的帮助下加载实际的Spring配置,注入
clientInboundChannel
作为测试字段,并使用它来发送 controller 方法所要处理的消息。 -
手动设置调用 controller 所需的最小Spring框架基础设施(即
SimpAnnotationMethodMessageHandler
),并将 controller 的消息直接传递给它。
这两种设置情况在 股票投资组合 样本应用程序的测试中都有展示。
第二种方法是创建端到端的集成测试。为此,你需要在嵌入式模式下运行WebSocket服务器,并作为WebSocket客户端连接到它,发送包含STOMP框架的WebSocket消息。 股票投资组合示例应用程序的测试 也展示了这种方法,它使用Tomcat作为嵌入式WebSocket服务器和一个简单的STOMP客户端进行测试。