1. 概述
1.1 什么是 ResponseBodyEmitter
ResponseBodyEmitter
是 Spring MVC 提供的一个接口,用于支持异步返回响应数据流。它允许在控制器方法中逐步发送数据给客户端,而无需一次性生成完整的响应。
1.2 使用场景
- 实时数据推送(如股票行情、聊天消息等)。
- 大量数据分批传输。
- 服务器发送事件(SSE, Server-Sent Events)。
1.3 优势与局限性
优势:
- 支持异步数据流处理。
- 能够实时更新客户端数据。
- 简化了复杂数据流的管理。
局限性:
- 高并发场景下需要额外优化。
- 客户端断开连接时需手动处理资源释放。
2. 环境准备
2.1 添加依赖
确保项目中包含以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
2.2 配置 Spring Boot 项目
创建一个标准的 Spring Boot 项目,并配置好基础环境。
3. 基本使用方法
3.1 创建控制器
定义一个控制器类,用于处理 HTTP 请求。
3.2 返回 ResponseBodyEmitter
对象
通过返回 ResponseBodyEmitter
对象实现异步数据流。
3.3 发送数据给客户端
使用 emitter.send()
方法向客户端发送数据。
示例代码:
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/stream")
public class StreamController {
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@GetMapping("/events")
public ResponseBodyEmitter handleEvents() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// 使用线程池管理异步任务
executorService.execute(() -> {
try {
for (int i = 0; i < 5; i++) {
// 模拟延迟
TimeUnit.SECONDS.sleep(1);
// 发送数据给客户端
emitter.send("Event " + i + "\n");
}
// 完成发送
emitter.complete();
} catch (IOException | InterruptedException e) {
// 发生错误时处理
emitter.completeWithError(e);
}
});
return emitter;
}
}
说明:
- 使用
ExecutorService
管理异步任务,避免直接创建线程。 TimeUnit.SECONDS.sleep(1)
模拟每秒发送一次数据。emitter.send("Event " + i + "\n")
发送数据给客户端。emitter.complete()
完成数据发送。emitter.completeWithError(e)
处理异常。
4. 实现服务器发送事件(SSE)
4.1 SSE 简介
SSE 是一种基于 HTTP 的协议,允许服务器向客户端推送实时更新的数据。
4.2 使用 ResponseBodyEmitter
实现 SSE
通过设置响应头 Content-Type: text/event-stream
,可以实现 SSE。
示例代码:
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/sse")
public class SseController {
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handleSse() {
SseEmitter emitter = new SseEmitter();
// 使用线程池管理异步任务
executorService.execute(() -> {
try {
for (int i = 0; i < 5; i++) {
// 模拟延迟
TimeUnit.SECONDS.sleep(1);
// 发送数据给客户端
emitter.send(SseEmitter.event().name("message").data("Event " + i));
}
// 完成发送
emitter.complete();
} catch (IOException | InterruptedException e) {
// 发生错误时处理
emitter.completeWithError(e);
}
});
return emitter;
}
}
说明:
- 使用
SseEmitter
实现 SSE。 MediaType.TEXT_EVENT_STREAM_VALUE
设置响应头为text/event-stream
。emitter.send(SseEmitter.event().name("message").data("Event " + i))
发送带有名称的数据。emitter.complete()
完成数据发送。emitter.completeWithError(e)
处理异常。
4.3 客户端代码示例
HTML 示例:
<!DOCTYPE html>
<html>
<head>
<title>SSE Example</title>
</head>
<body>
<div id="events"></div>
<script>
const eventSource = new EventSource('/sse/stream');
eventSource.onmessage = function(event) {
document.getElementById('events').innerHTML += event.data + '<br>';
};
eventSource.onerror = function(err) {
console.error("EventSource failed:", err);
};
</script>
</body>
</html>
说明:
- 使用
EventSource
连接到 SSE 流。 eventSource.onmessage
处理接收到的数据。eventSource.onerror
处理错误。
5. 异步数据推送的最佳实践
5.1 数据流管理
- 使用线程池管理异步任务,避免资源耗尽。
- 设置合理的超时时间,防止连接长时间占用。
示例代码:
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import