大纲
- Rabbitmq
- 开启STOMP支持
- 服务端
- 依赖
- 参数
- 参数映射类
- 配置类
- 逻辑处理类
- 测试
- 测试页面
- Controller
- 测试案例
在《Websocket在Java中的实践——STOMP通信的最小Demo》一文中,我们使用enableSimpleBroker启用一个内置的内存级消息代理。本文我们将使用Rabbitmq作为消息代理,这样我们的服务就可以变成分布式部署。
Rabbitmq
开启STOMP支持
在Rabbitmq所在的机器上执行下面的命令:
sudo -H -u rabbitmq bash -c "/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_stomp"
然后启动Rabbitmq
sudo service rabbitmq-server start
服务端
依赖
spring-boot-starter-websocket用于Websocket服务。
spring-boot-starter-amqp和spring-rabbit-stream都是用于Rabbitmq操作。
reactor-netty用于Broker。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-stream</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>1.1.20</version>
</dependency>
参数
src/main/resources/application.properties
需要注意的是,rabbitmq_stomp启动后会开启61613端口。
spring.rabbitmq.host=172.30.254.255
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=fangliang
spring.rabbitmq.stomp.port=61613
还有一点需要注意,很多文章上说使用guest用户登录。但是guest用户只能在Rabbitmq所在的机器上使用,如果跨机器使用会报下列错误。而且这和是否设置guest为全域无关。所以我们使用admin账户。
Received ERROR {message=[Bad CONNECT], content-type=[text/plain], version=[1.0,1.1,1.2], content-length=[26]} session=system text/plain payload=non-loopback access denied
spring.rabbitmq.stomp.port是一个自定义参数,它只是供Broker连接Rabbitmq使用。
spring.rabbitmq.port在当前本文例子中没有使用。
参数映射类
这个类主要是映射上述参数,方便后续使用。
src/main/java/com/nyctlc/stomprbmq/component/RabbitMQProperties.java
package com.nyctlc.stomprbmq.component;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQProperties {
@Value("${spring.rabbitmq.password}")
private String rabbitmqPassword;
public String getRabbitmqPassword() {
return rabbitmqPassword;
}
@Value("${spring.rabbitmq.username}")
private String rabbitmqUsername;
public String getRabbitmqUsername() {
return rabbitmqUsername;
}
@Value("${spring.rabbitmq.host}")
private String rabbitmqHost;
public String getRabbitmqHost() {
return rabbitmqHost;
}
@Value("${spring.rabbitmq.port}")
private String rabbitmqPort;
public String getRabbitmqPort() {
return rabbitmqPort;
}
@Value("${spring.rabbitmq.stomp.port}")
private String rabbitmqStompPort;
public String getRabbitmqStompPort() {
return rabbitmqStompPort;
}
}
配置类
/handshake是STOMP和Websocket建立握手的接口。
enableStompBrokerRelay(“/topic”)会订阅Rabbitmq默认的交换器amq.topic的绑定关系中定义的队列。(所以我们看到很多文章订阅的前缀使用的是“topic”,而不用其他字段,这是有渊源的)
setRelayPort方法传递的是Rabbitmq的STOMP端口,即61613。
setClientLogin、setClientPasscode、setSystemLogin和setSystemPasscode都要设置为admin及其密码,否则会报错。
src/main/java/com/nyctlc/stomprbmq/config/WebSocketConfig.java
package com.nyctlc.stomprbmq.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import com.nyctlc.stomprbmq.component.RabbitMQProperties;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
private RabbitMQProperties rabbitMQProperties;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/handshake");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/send");
registry.enableStompBrokerRelay("/topic")
.setRelayHost(rabbitMQProperties.getRabbitmqHost())
.setRelayPort(Integer.parseInt(rabbitMQProperties.getRabbitmqStompPort()))
.setClientLogin(rabbitMQProperties.getRabbitmqUsername())
.setClientPasscode(rabbitMQProperties.getRabbitmqPassword())
.setSystemLogin(rabbitMQProperties.getRabbitmqUsername())
.setSystemPasscode(rabbitMQProperties.getRabbitmqPassword());
}
}
逻辑处理类
这个类的handle方法会接受/send/msg-from-user端点发来的消息,然后转发给Rabbitmq的amp.topic交换器下msg-to-user路由键对应的队列。上述代码创建的Broker会持续监听这个队列,如果收到消息,则发送给客户端。
src/main/java/com/nyctlc/stomprbmq/controller/WebSocketController.java
package com.nyctlc.stomprbmq.controller;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
@Controller
public class WebSocketController {
@MessageMapping("/msg-from-user")
@SendTo("/topic/msg-to-user")
public String handle(String msg) {
System.out.println("Received message: " + msg);
return msg;
}
}
测试
测试页面
src/main/resources/static/index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>STOMP over WebSocket Example with StompJs.Client</title>
<script src="https://cdn.jsdelivr.net/npm/@stomp/stompjs"></script>
</head>
<body>
<h2>STOMP over WebSocket Example with StompJs.Client</h2>
<button id="connectButton">Connect</button>
<form id="messageForm">
<input type="text" id="messageInput" placeholder="Type a message..."/>
<button type="submit">Send</button>
</form>
<div id="messages"></div>
<script>
var client = null;
function connect() {
client = new StompJs.Client({
brokerURL: 'ws://localhost:8080/handshake', // WebSocket服务端点
connectHeaders: {},
debug: function (str) {
console.log(str);
},
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
});
client.onConnect = function(frame) {
console.log('Connected: ' + frame);
client.subscribe('/topic/msg-to-user', function(message) { // 订阅端点
showMessageOutput(JSON.parse(message.body).content);
});
};
client.onStompError = function(frame) {
console.error('Broker reported error: ' + frame.headers['message']);
console.error('Additional details: ' + frame.body);
};
client.activate();
}
function sendMessage(event) {
event.preventDefault(); // 阻止表单默认提交行为
var messageContent = document.getElementById('messageInput').value.trim();
if(messageContent && client && client.connected) {
var chatMessage = { content: messageContent };
client.publish({destination: "/send/msg-from-user", body: JSON.stringify(chatMessage)}); // 发送端点
document.getElementById('messageInput').value = '';
}
}
function showMessageOutput(message) {
var messagesDiv = document.getElementById('messages');
var messageElement = document.createElement('div');
messageElement.appendChild(document.createTextNode(message));
messagesDiv.appendChild(messageElement);
}
document.getElementById('messageForm').addEventListener('submit', sendMessage);
document.getElementById('connectButton').addEventListener('click', connect);
</script>
</body>
</html>
Controller
这个Controller主要是为了让上述HTML可以通过URL访问。
src/main/java/com/nyctlc/stomprbmq/controller/FileController.java
package com.nyctlc.stomprbmq.controller;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
@Controller
public class FileController {
@GetMapping("/")
public String index() {
return "index"; // 返回index.html
}
@RequestMapping(value = "/favicon.ico")
@ResponseStatus(value = HttpStatus.NO_CONTENT)
public void favicon() {
// No operation. Just to avoid 404 error for favicon.ico
}
}
测试案例
我们在管理后台直接给这个队列发送消息,前端页面也会收到。比如我们发送{“content”:“message from management”}