前面几个小节我们已经学习了stomp协议,了解了stomp客户端的用法,并且搭建了一个小的后台demo,前端页面通过html页面与后端服务建立WebSocket连接。发送消息给后端服务。后端服务将消息内容原样送回。通过这个demo我们学习了前端stomp客户端的用法,同时也学到了如何通过spring-boot来搭建一个websocket平台。
本节我们将继续深入,搭建一套可用于生产的websocket平台。
基本介绍
websocket连接推送服务包含两个服务,websocket-connector和websocket-gateway。
架构如上图
websocket-connector
- 作为和客户端建立websocket连接的服务,负责消息的接收和推送
websocket-gateway
- 作为后台服务,提供http接口给其他微服务,其他微服务可以通过http接口发送消息给指定的用户
使用说明
通过下面的步骤来进行调试
- 分别启动项目websocket-connector和websocket-gateway
- 访问下面接口,获取某个用户的token
下面示例是获取用户1001的token
curl -X POST -H "Accept:*/*" -H "Content-Type:application/json" -d "{\"userId\":\"1001\"}" "http://localhost:8081/api/ws/token/get"
-
打开websocket-connector调试页面http://localhost:8080/index.html
将上一个接口获取到的token作为参数,与服务器建立连接
-
通过页面的send按钮,发送一条消息给服务器,同时服务器会将此消息回复给前端页面。参考上图
-
通过websocket-gateway的接口,发送用户单播消息
curl -X POST -H "Accept:*/*" -H "Content-Type:application/json" -d "{\"appCode\":\"test2\",\"messageData\":{\"body\":\"single message\",\"headers\":{}},\"topic\":\"/user/topic/single/hello\",\"userIds\":[1001]}" "http://localhost:8081/api/ws/message/single/send"
前端页面可以收到该消息的推送
6.通过websocket-gateway的接口,发送广播消息
curl -X POST -H "Accept:*/*" -H "Content-Type:application/json" -d "{\"appCode\":\"test1\",\"messageData\":{\"body\":\"hello board message1\",\"headers\":{}},\"topic\":\"/topic/boardCast/hello\"}" "http://localhost:8081/api/ws/message/boardCast/send"
主要代码分析
前端代码
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>STOMP</title>
</head>
<body onload="disconnect()">
<h1 id="tip">消息发布订阅测试页</h1>
请输入token:<input type="text" id="token" placeholder=""> <br>
<button onclick="connect()" id="connect">connect</button>
<button onclick="disconnect()" id="disconnect">disconnect</button>
<p>输入消息: <span id="msg"></span></p>
<input type="text" id="content" placeholder=""> <br>
<button onclick="send()">send</button>
<ul id="ul">
回答消息<p id="answer"></p>
单播消息<p id="single"></p>
广播消息<p id="board"></p>
</ul>
<script src="sockjs.min.js"></script>
<script src="stomp.min.js"></script>
<script>
var stompClient = null;
var endpoint = "/ws";
//断开连接
function disconnect() {
if (stompClient != null) {
stompClient.disconnect();
}
setConnect(false);
console.log("disconnected");
}
//建立连接
function connect() {
var token = document.getElementById("token").value;
if (token === '' || token === undefined) {
alert("请输入token");
return;
}
//连接请求头里面,设置好我们提前获取好的token
var headers = {
token: token
};
var url = endpoint;
var socket = new SockJS(url);
stompClient = Stomp.over(socket);
//建立连接
stompClient.connect(headers, function (msg) {
setConnect(true);
console.log("connected:" + msg);
//订阅了三个topic
//订阅用户消息topic1
stompClient.subscribe("/user/topic/answer", function (response) {
createElement("answer", response.body);
});
//订阅用户消息topic2
stompClient.subscribe("/user/topic/single/hello", function (response) {
createElement("single", response.body);
});
//订阅广播消息topic
stompClient.subscribe("/topic/boardCast/hello", function (response) {
createElement("board", response.body);
});
});
}
//主动发送消息给服务器,对应的后端topic为/app/echo
function send() {
var value = document.getElementById("content").value;
var msg = {
msgType: 1,
content: value
};
stompClient.send("/app/echo", {}, JSON.stringify(msg));
}
function createElement(eleId, msg) {
var singleP = document.getElementById(eleId);
var p = document.createElement("p");
p.style.wordWrap = "break-word";
p.appendChild(document.createTextNode(msg));
singleP.appendChild(p);
}
function setConnect(connected) {
document.getElementById("connect").disabled = connected;
document.getElementById("disconnect").disabled = !connected;
}
</script>
</body>
</html>
websocket-connector端的代码
会话的鉴权及连接建立
@Slf4j
@Component
public class WebSocketInboundInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor == null) {
return message;
}
//建立连接
if (Objects.equals(accessor.getCommand(), StompCommand.CONNECT)) {
connect(message, accessor);
}
return message;
}
/**
* 建立会话
*
* @param message
* @param accessor
*/
private void connect(Message<?> message, StompHeaderAccessor accessor) {
String token = accessor.getFirstNativeHeader(WsConstants.TOKEN_HEADER);
if (StringUtils.isEmpty(token)) {
throw new MessageDeliveryException("token missing!");
}
String userId = TokenUtil.parseToken(token);
if (StringUtils.isEmpty(userId)) {
throw new MessageDeliveryException("userId missing!");
}
String simpleSessionId = (String) message.getHeaders().get(SimpMessageHeaderAccessor.SESSION_ID_HEADER);
UserSession userSession = new UserSession();
userSession.setSimpleSessionId(simpleSessionId);
userSession.setUserId(userId);
userSession.setCreateTime(LocalDateTime.now());
//关联用户的会话。通过msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg); 此方法,可以发送给用户消息
accessor.setUser(new UserSessionPrincipal(userSession));
}
}
如何接收客户端发送的消息
@Slf4j
@Controller
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class StompController {
private final SimpMessageSendingOperations msgOperations;
private final SimpUserRegistry simpUserRegistry;
/**
* 回音消息,将用户发来的消息内容加上 Echo 前缀后推送回客户端
*/
@MessageMapping("/echo")
public void echo(Principal principal, Msg msg) {
String username = principal.getName();
msg.setContent("Echo: " + msg.getContent());
msgOperations.convertAndSendToUser(username, "/topic/answer", msg);
int userCount = simpUserRegistry.getUserCount();
int sessionCount = simpUserRegistry.getUser(username).getSessions().size();
log.info("当前本系统总在线人数: {}, 当前用户: {}, 该用户的客户端连接数: {}", userCount, username, sessionCount);
}
}
消费rabbitMQ推送的单播和广播消息
@Component
@Slf4j
public class MessageReceiveConsumer {
private final Gson gson;
private final ReceivedMessageHandler receivedMessageHandler;
public MessageReceiveConsumer(Gson gson, ReceivedMessageHandler receivedMessageHandler) {
this.gson = gson;
this.receivedMessageHandler = receivedMessageHandler;
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(),
exchange = @Exchange(value = WsConstants.SINGLE_EXCHANGE, type = ExchangeTypes.FANOUT)))
public void handleSingleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
SingleMessage singleMessage = gson.fromJson(body, SingleMessage.class);
receivedMessageHandler.handleSingleMessage(singleMessage);
channel.basicAck(tag, false);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(),
exchange = @Exchange(value = WsConstants.BOARD_CAST_EXCHANGE, type = ExchangeTypes.FANOUT)))
public void handleBoardCastMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
BoardCastMessage boardCastMessage = gson.fromJson(body, BoardCastMessage.class);
receivedMessageHandler.handleBoardCastMessage(boardCastMessage);
channel.basicAck(tag, false);
}
}
建立了两个exchange分别来接收消息。这里 @QueueBinding(value = @Queue(),通过这种方式建立的队列,队列名是spring取的一个随机名称,如下图所示
调用客户端工具类,发送消息给客户端
@Slf4j
@Component
public class ReceivedMessageHandler {
private final WsMessageClient wsMessageClient;
public ReceivedMessageHandler(WsMessageClient wsMessageClient) {
this.wsMessageClient = wsMessageClient;
}
public void handleSingleMessage(SingleMessage singleMessage) {
wsMessageClient.sendToUsers(singleMessage.getUserIds(), singleMessage);
}
public void handleBoardCastMessage(BoardCastMessage boardCastMessage) {
wsMessageClient.boardCast(boardCastMessage);
}
}
websocket-gateway 申请token接口
通过该接口,生成用户的jwtToken,在客户端建立连接时需要此token,不然无法建立连接
public class TokenController {
@PostMapping("/get")
public String getToken(@RequestBody @Validated TokenRequest tokenRequest) {
return TokenUtil.generateToken(tokenRequest.getUserId());
}
}
websocket-gateway 发送消息的接口
websocket-gateway暴露发送消息的接口给业务服务
public class MessageController {
private final MessageSendService messageSendService;
public MessageController(MessageSendService messageSendService) {
this.messageSendService = messageSendService;
}
@PostMapping("/single/send")
public Boolean singleSend(@RequestBody SingleMessage singleMessage) {
return messageSendService.singleSend(singleMessage);
}
@PostMapping("/boardCast/send")
public Boolean boardCastSend(@RequestBody BoardCastMessage boardCastMessage) {
return messageSendService.boardCastSend(boardCastMessage);
}
}
通过该http接口,最终会调用rabbitmq,发送消息给websocket-connector服务
项目地址
更多项目代码直接看一下源码吧
https://gitee.com/syk1234/websocket-services