SSE
SSE(Server-Sent Events,服务器发送事件)是一种基于HTTP协议的通信技术,它允许服务器持续地将数据推送给客户端,而无需客户端发起请求。这种通信方式通常用于实时性要求较高的场景,如实时更新、通知、或者数据流式传输。
SSE与传统的Ajax轮询或长轮询相比,具有更低的延迟、更高的效率,并且更易于实现。它建立在HTTP协议之上,利用HTTP/1.1的持久连接,允许服务器在连接建立后持续地向客户端发送数据,客户端通过监听一个HTTP连接来接收这些数据。
在Web开发中,服务器通常会使用特殊的HTTP响应头(如"Content-Type: text/event-stream")来指示客户端这是一个SSE流,并且按照一定的格式发送事件数据给客户端。客户端则可以使用JavaScript中的EventSource对象来接收并处理这些事件,从而实现实时的数据更新。
SseEmitter
SseEmitter是Spring框架中的一个类,专门用于Java。SSE代表服务器发送事件,是一种使服务器能够通过HTTP向Web客户端推送数据更新的技术。SseEmitter是在Spring应用程序中实现SSE服务器支持的便捷方式。
使用SseEmitter,您可以在Spring应用程序中创建一个端点,客户端可以连接到该端点,服务器可以通过此连接向客户端推送事件。这对于实时更新非常有用,例如显示实时通知、进度更新或流式传输数据。
实现:
- OpenAI支持Stream流格式接收
接口连续的数据读取
官网示例
{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0125", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}
{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0125", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}
...
{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0125", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}
demo
private static final String API_KEY = "********************";
private static final Pattern contentPattern = Pattern.compile("\"content\":\"(.*?)\"}");
private static final String MODEL_ENGINE = "gpt-3.5-turbo";
public static void test() throws InterruptedException, IOException {
//params 的入参封装 这里省略 参考上面图片 或去官网 需要stream形式请求
HttpRequest httpRequest = HttpRequest.post("https://api.openai.com/v1/chat/completions")
.header("Content-Type", "application/json")
.header("Authorization", "Bearer " + API_KEY)
.body(JSONUtil.toJsonStr(params));
//TODO 代理 到shadowsocks
httpRequest.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1", 7890)));
HttpResponse execute = httpRequest.execute();
InputStream inputStream = execute.bodyStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
String line;
while ((line = bufferedReader.readLine()) != null) {
if (StringUtils.hasLength(line)) {
System.out.println(line);
Matcher matcher = contentPattern.matcher(line);
if (matcher.find()) {
String content = matcher.group(1);
System.out.println(content);
}
}
}
}
SSE发送
demo
ChatController
@Autowired
private ChatService chatService;
@GetMapping("/test")
public SseEmitter test(String question) {
SseEmitter sseEmitter = new SseEmitter();
chatService.question(question, sseEmitter);
return sseEmitter;
}
ChatService
private static final String API_KEY = "********************";
private static final Pattern contentPattern = Pattern.compile("\"content\":\"(.*?)\"}");
@Async
public void question(String question, SseEmitter sseEmitter) {
try {
// 构建请求参数
String params = "{\"model\":\"gpt-3.5-turbo\",\"messages\":[{\"role\":\"user\",\"content\":\"" + question + "\"}],\"stream\":true}";
// 发起 HTTP 请求
HttpRequest httpRequest = HttpRequest.post("https://api.openai.com/v1/chat/completions")
.header("Content-Type", "application/json")
.header("Authorization", "Bearer " + API_KEY)
.body(JSONUtil.toJsonStr(params))
.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1", 7890)));
// 执行 HTTP 请求
HttpResponse execute = httpRequest.execute();
// 处理响应流
try (InputStream inputStream = execute.bodyStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
String line;
while ((line = bufferedReader.readLine()) != null) {
if (StringUtils.hasLength(line)) {
// 输出响应内容
System.out.println(line);
// 提取内容
Matcher matcher = contentPattern.matcher(line);
if (matcher.find()) {
String content = matcher.group(1);
System.out.println(content);
// 发送 SSE 事件 (模拟延迟)
Thread.sleep(1000);
sseEmitter.send(SseEmitter.event().name("answer").data("{" + content + "}"));
}
}
}
}
} catch (IOException | InterruptedException e) {
// 异常处理
throw new RuntimeException(e);
} finally {
// 完成 SSE 连接
sseEmitter.complete();
}
}