参考链接
https://www.51cto.com/article/798001.html
了解一下SseEmitter(一)-CSDN博客
依赖
有默认的 springboot-web 依赖即可
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
SSEmitter API
方法 | 说明 |
| 回调方法,连接完成(正常关闭)时触发(超时,异常,complete() 之后触发) |
| 回调方法,当连接超时时触发 |
| 指定当发生错误时执行的回调方法。这个错误可能是由于网络连接问题等原因。 |
| 在发生错误时关闭连接,并以错误的形式告知客户端 |
| 表示数据发送完成后关闭连接 |
| 创建一个事件对象,设置事件名称和数据 使用示例 |
onCompletion() 回调函数触发条件说明:他只会在连接关闭(正常关闭)后调用,可以用于执行一些清理工作。
触发条件:
- 调用 complete()
- 调用 completeWithError(e)
- 超时断开连接(例如:new SseEmitter(1000L),那么在 1 秒后就会调用 onCompletion() 的回调函数);回调调用顺序:onTimeout() -> onCompletion()
服务端
controller
/***
* @author feiXun
* @create 2025/1/13 14:09
**/
@RestController
@RequestMapping("/sse")
@CrossOrigin(origins = "*")
public class SseController {
@Autowired
private SseService sseService;
@GetMapping("/connect")
@ApiOperation(value = "SSE 客户端连接")
public SseEmitter connect() throws IOException {
return sseService.connect();
}
@GetMapping("/batchSend")
@ApiOperation(value = "SSE 群发消息", notes = "SSE 群发消息, 目前用于测试,后期可以删除")
public void batchSend(@RequestParam("message") Object message){
sseService.batchSend(message);
}
}
Service 接口
/***
* @author feiXun
* @create 2025/1/13 14:52
**/
public interface SseService {
/**
* 连接
*/
SseEmitter connect() throws IOException;
/**
* 批量发送消息
*/
void batchSend(Object message);
/**
* 发送心跳包
*/
void sendHeartBeat();
}
Service 实现类
/***
* @author feiXun
* @create 2025/1/13 14:52
**/
@Slf4j
@Service
public class SseServiceImpl implements SseService {
/**
* 用于存储 sseEmitterList
*/
private final List<SseEmitter> sseEmitterList = new CopyOnWriteArrayList<>();
/**
* 消息队列
* 用于将错误的信息,没有 SSE 连接时发送的信息 保存起来
* 在重新连接的时候推送给 sse 客户端
*/
private final Queue<SseEmitter.SseEventBuilder> messages = new ConcurrentLinkedQueue<>();
@Override
public SseEmitter connect() throws IOException {
// 0 表示无限长连接;其他:毫秒数,表示连接时长,比如 1000L,就是 1秒后断开连接
SseEmitter emitter = new SseEmitter(0L);
// sse 连接完成,准备释放
emitter.onCompletion(completionCallBack(emitter));
// 指定当发生错误时执行的回调方法。这个错误可能是由于网络连接问题等原因。
emitter.onError(errorCallBack(emitter));
// 添加 list,用于发送给多个 sse 客户端
sseEmitterList.add(emitter);
// 这里是将 发生错误时,没有 SSE 客户端时 的消息在发一遍
while (!messages.isEmpty()){
SseEmitter.SseEventBuilder poll = messages.poll();
try {
// 发送信息
emitter.send(poll);
} catch (IOException e) {
// 将信息加入队列,下次连接的时候推送给客户端
messages.add(poll);
// 关闭连接并抛出异常给客户端
emitter.completeWithError(e);
break;
}
}
// 发送一条 心跳包信息
emitter.send(MrSseVo.buildHeartBeat());
return emitter;
}
/**
* 群发消息
*/
@Override
public void batchSend(Object message) {
// 将消息加入消息队列,当有 SSE 连接时,从队列中取出信息推送给 sse 客户端
if (sseEmitterList.isEmpty()){
SseEmitter.SseEventBuilder data = builderMessage(message);
messages.add(data);
return;
}
sseEmitterList.forEach(sseEmitter -> {
send(sseEmitter, message, false);
});
}
@Override
public void sendHeartBeat() {
if (!sseEmitterList.isEmpty()){
// 批量发送心跳包
sseEmitterList.forEach(sseEmitter -> {
send(sseEmitter, "ping", true);
});
}
}
/**
* 发送消息
* @param isPing 是否是心跳包
*/
private void send(SseEmitter sseEmitter, Object message, boolean isPing){
SseEmitter.SseEventBuilder data = builderMessage(message);
try {
// 发送信息
sseEmitter.send(message);
} catch (IOException e) {
if (!isPing){
// 将信息加入队列,下次连接的时候推送给客户端
messages.add(data);
}
// 关闭连接并抛出异常给客户端
sseEmitter.completeWithError(e);
}
}
/**
* SSE 连接完成 回调(连接已关闭,正准备释放)
* 触发条件(前提: 客户端的连接没断开)
* 1. 调用 complete()
* 2. 调用 completeWithError(e)
* 3. 超时断开连接(例如:new SseEmitter(1000L),那么在 1 秒后就会调用 onCompletion() 的回调函数);回调调用顺序:onTimeout() -> onCompletion()
*/
private Runnable completionCallBack(SseEmitter emitter){
return ()->{
log.info("连接已关闭,准备释放!");
// 将 sse 连接 移除
sseEmitterList.remove(emitter);
};
}
/**
* SSE 异常回调
* 指定当发生错误时执行的回调方法。这个错误可能是由于网络连接问题等原因。
*/
private Consumer<Throwable> errorCallBack(SseEmitter sseEmitter){
return throwable -> {
log.error("SSE 异常 {}", throwable.getMessage());
// 关闭连接并抛出异常给客户端
sseEmitter.completeWithError(throwable);
};
}
/**
* 构造消息
*/
private SseEmitter.SseEventBuilder builderMessage(Object message){
return SseEmitter.event().
id(UUID.randomUUID().toString())
.data(message, MediaType.APPLICATION_JSON);
}
}
客户端
http://127.0.0.1:8000/sse/connect:这个接口用于跟 sse 服务端建立连接
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SSE Client</title>
</head>
<body>
SSE Client
<div id="events"></div>
<script>
const eventSource = new EventSource('http://127.0.0.1:8000/sse/connect');
eventSource.onmessage = function(event) {
const eventsDiv = document.getElementById('events');
eventsDiv.innerHTML += `<p>${event.data}</p>`;
};
eventSource.onerror = function(err) {
console.error("EventSource failed:", err);
};
</script>
</body>
</html>
测试
- 打开客户端 或者 浏览器访问 连接 接口(/sse/connect)
- 调用发送接口 (/sse/send)
- 查看网页等有没有出现相应的信息
例如: