上篇博客 SSE(Server Sent Event)实战(2)- Spring MVC 实现,我们用 Spring MVC 实现了简单的消息推送,并且留下了两个问题,这篇博客,我们用 Spring Web Flux 实现,并且看看这两个问题怎么解决。
一、服务端实现
/*
* XingPan.com
* Copyright (C) 2021-2024 All Rights Reserved.
*/
package com.sse.demo2.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.time.Duration;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author liuyuan
* @version SseController.java, v 0.1 2024-07-15 14:24
*/
@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {
private static final HttpClient HTTP_CLIENT = HttpClient.create().responseTimeout(Duration.ofSeconds(5));
private final Map<String, FluxSink<String>> USER_CONNECTIONS = new ConcurrentHashMap<>();
/**
* 用来存储用户和本机地址,实际生成请用 redis
*/
private final Map<String, String> USER_CLIENT = new ConcurrentHashMap<>();
/**
* 创建连接
*/
@GetMapping(value = "/create-connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> createConnect(@RequestParam("userId") String userId) {
// 获取本机地址
String hostAddress = this.getHostAddress();
Flux<String> businessData = Flux.create(sink -> {
USER_CONNECTIONS.put(userId, sink);
USER_CLIENT.put(userId, hostAddress);
log.info("创建了用户[{}]的SSE连接", userId);
sink.onDispose(() -> {
USER_CONNECTIONS.remove(userId);
USER_CLIENT.remove(userId);
log.info("移除用户[{}]的SSE连接", userId);
});
});
// 创建心跳
Flux<String> heartbeat = Flux.interval(Duration.ofMinutes(1)).map(tick -> "data: heartbeat\n\n");
return Flux.merge(businessData, heartbeat);
}
/**
* 发送消息 gateway
*/
@GetMapping("/send-message-gateway")
public Mono<RpcResult<Boolean>> sendMessageGateway(@RequestParam("userId") String userId, @RequestParam("message") String message) {
String userHostAddress = USER_CLIENT.get(userId);
if (userHostAddress == null) {
log.info("用户[{}]的SSE连接不存在,无法发送消息", userId);
return Mono.just(RpcResult.error("10001", "SSE连接不存在,无法发送消息"));
}
// 获取本机地址和用户连接地址比较,如果相同,直接使用localhost发消息
String hostAddress = this.getHostAddress();
userHostAddress = userHostAddress.equals(hostAddress) ? "localhost" : userHostAddress;
String baseUrl = "http://" + userHostAddress + ":8080";
log.info("发送消息 > baseUrl = {}", baseUrl);
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(HTTP_CLIENT))
.baseUrl(baseUrl)
.build();
RpcResult<Boolean> errorResult = RpcResult.error("10002", "消息发送失败");
return webClient.get()
.uri("/sse/send-message?userId={userId}&message={message}", userId, message)
.exchangeToMono(clientResponse -> {
if (clientResponse.statusCode().is2xxSuccessful()) {
log.info("消息发送成功 > 用户 = {},消息内容 = {}", userId, message);
return Mono.just(RpcResult.success(true));
} else {
log.error("消息发送失败 > 状态码 = {},用户 = {},消息内容 = {}", clientResponse.statusCode().value(), userId, message);
return Mono.just(errorResult);
}
}).onErrorResume(error -> {
log.error("消息发送失败 > 用户 = {}, 消息内容 = {}, e = ", userId, message, error);
return Mono.just(errorResult);
});
}
/**
* 发送消息
*/
@GetMapping("/send-message")
public Mono<Void> sendMessage(@RequestParam("userId") String userId, @RequestParam("message") String message) {
FluxSink<String> sink = USER_CONNECTIONS.get(userId);
if (sink != null) {
try {
sink.next(message);
log.info("给用户[{}]发送消息成功: {}", userId, message);
} catch (Exception e) {
log.error("向用户[{}]发送消息失败,sink可能已关闭或无效", userId, e);
USER_CONNECTIONS.remove(userId);
USER_CLIENT.remove(userId);
}
} else {
log.info("用户[{}]的SSE连接不存在或已关闭,无法发送消息", userId);
}
return Mono.empty();
}
private String getHostAddress() {
String hostAddress = "localhost";
try {
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
while (networkInterfaces.hasMoreElements()) {
NetworkInterface networkInterface = networkInterfaces.nextElement();
Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
InetAddress inetAddress = inetAddresses.nextElement();
if (!inetAddress.isLoopbackAddress() && !inetAddress.getHostAddress().contains(":") && inetAddress.getHostAddress().startsWith("10.")) {
hostAddress = inetAddress.getHostAddress();
}
}
}
} catch (SocketException e) {
log.error("获取主机地址失败", e);
}
log.info("获取主机地址 > hostAddress = {}", hostAddress);
return hostAddress;
}
}
- 如果我们服务设置了最大连接时间,比如 3 分钟,而服务端又长时间没有消息推送给客户端,导致长连接被关闭该怎么办?
在创建连接时
/create-connect
,增加心跳,只要心跳频率小于超时时间,基本就可以解决这个问题,但是前端要注意隐藏心跳内容。
- 实际生产环境,我们肯定是多个实例部署,那么怎么保证创建连接和发送消息是在同一个实例完成?如果不是一个实例,就意味着用户没有建立连接,消息肯定发送失败。
a. 将用户id 和用户请求的实例 ip 绑定,我这里用的是Map(USER_CLIENT)存储,生产请换成分布式缓存;
b. 服务端发送消息使用/send-message-gateway
接口,这个接口只做消息分发,不真实发送消息。从USER_CLIENT中获取用户所在的实例,然后将请求分发到具体实例;
c./send-message-gateway
将请求打到/send-message
,然后给用户推送消息;
二、客户端实现
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSE Demo</title>
<script> document.addEventListener('DOMContentLoaded', function () {
var userId = "1";
// 创建一个新的EventSource对象
var source = new EventSource('http://localhost:8080/sse/create-connect?userId=' + userId);
// 当连接打开时触发
source.onopen = function (event) {
console.log('SSE连接已打开');
};
// 当从服务器接收到消息时触发
source.onmessage = function (event) {
// event.data 包含服务器发送的文本数据
console.log('接收到消息:', event.data);
// 在页面上显示消息
var messagesDiv = document.getElementById('messages');
if (messagesDiv) {
messagesDiv.innerHTML += '<p>' + event.data + '</p>'; // 直接使用event.data
} else {
console.error('未找到消息容器元素');
}
};
// 当发生错误时触发
source.onerror = function (event) {
console.error('SSE连接错误:', event);
};
});
</script>
</head>
<body>
<div id="messages">
<!-- 这里将显示接收到的消息 -->
</div>
</body>
</html>
三、启动项目
- 运行 Spring 项目
- 浏览器打开 index.html文件
- 调用发送消息接口
curl http://localhost:8080/sse/send-message-gateway?userId=1&message=test0001