主要依赖
<!-- Spring Boot parent project: the POM inherits from spring-boot-starter-parent 2.2.2.RELEASE. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
</parent>
<!-- Web starter used by the server example (Spring MVC, including SseEmitter). -->
<!-- No <version> is declared here; presumably it is managed by the parent above. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- OkHttp HTTP client used by the client example (OkHttpClient, Request, RequestBody). -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.2.0</version>
</dependency>
<!-- OkHttp Server-Sent Events module (okhttp3.sse.*); its version is kept equal to okhttp's. -->
<!-- NOTE(review): the client example also imports cn.hutool.json.JSONUtil; a Hutool -->
<!-- dependency is not listed in this fragment and presumably has to be added separately. -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-sse</artifactId>
<version>4.2.0</version>
</dependency>
服务
package cn;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
/**
 * Demo controller that answers a JSON POST with a Server-Sent Events stream.
 */
@RestController
public class SseController {

    /** Number of events pushed to the client before the stream is completed. */
    private static final int EVENT_COUNT = 5;

    /**
     * Accepts a JSON body and streams {@value #EVENT_COUNT} events back to the caller.
     *
     * <p>The events are produced on a separate thread so that the emitter can be returned
     * immediately. Each event carries the current time in milliseconds. The events are sent
     * back-to-back; there is no delay between them.
     *
     * @param data the deserialized JSON request body; it is only printed to stdout
     * @return the emitter through which the events are written
     */
    @PostMapping(value = "/sse", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter handleSse(@RequestBody Map<String, Object> data) {
        System.out.println(data);
        SseEmitter emitter = new SseEmitter();
        new Thread(() -> {
            // A single try around the whole loop: the first failed send ends the stream.
            // Previously the catch was inside the loop, so after completeWithError() the
            // loop kept sending on a finished emitter and finally called complete() too.
            try {
                for (int i = 0; i < EVENT_COUNT; i++) {
                    System.out.println("开始响应 " + i);
                    emitter.send(System.currentTimeMillis());
                }
                emitter.complete();
            } catch (IOException e) {
                // Typically the client went away; finish the emitter exactly once.
                emitter.completeWithError(e);
            }
        }).start();
        return emitter;
    }
}
请求
package cn.demo;
import cn.hutool.json.JSONUtil;
import okhttp3.*;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
import java.util.Arrays;
import java.util.HashMap;
/**
 * Minimal OkHttp client that POSTs a JSON body to the local {@code /sse} endpoint and prints
 * every Server-Sent Event it receives.
 */
public class Test_1 {

    /**
     * Builds the JSON request, opens the event source and returns immediately; events are
     * delivered to the listener asynchronously by OkHttp.
     *
     * @param args unused
     * @throws Exception declared for convenience; kept for signature compatibility
     */
    public static void main(String[] args) throws Exception {
        OkHttpClient client = new OkHttpClient.Builder().build();
        EventSource.Factory factory = EventSources.createFactory(client);

        // Request body
        HashMap<String, Object> map = new HashMap<>();
        map.put("prompt", "哈喽,你好");
        map.put("history", Arrays.asList());
        map.put("temperature", 0.9);
        map.put("top_p", 0.7);
        map.put("max_new_tokens", 4096);
        String json = JSONUtil.toJsonStr(map);
        // OkHttp 4 deprecates create(MediaType, String); the content-first overload replaces it.
        RequestBody body = RequestBody.create(json, MediaType.parse("application/json; charset=utf-8"));

        // Request object
        Request request = new Request.Builder()
                .url("http://localhost:8080/sse")
                .post(body)
                .build();

        // Custom listener. The base-class callbacks are empty, so super is not invoked.
        EventSourceListener eventSourceListener = new EventSourceListener() {
            @Override
            public void onOpen(EventSource eventSource, Response response) {
                System.out.println("开启");
            }

            @Override
            public void onEvent(EventSource eventSource, String id, String type, String data) {
                // Received message payload
                System.out.println("数据data " + data);
            }

            @Override
            public void onClosed(EventSource eventSource) {
                System.out.println("关闭");
            }

            @Override
            public void onFailure(EventSource eventSource, Throwable t, Response response) {
                System.out.println("失败");
                // Report why the stream failed instead of discarding the details.
                // Both arguments are null-checked, since either one may be absent.
                if (t != null) {
                    System.err.println("cause: " + t);
                }
                if (response != null) {
                    System.err.println("HTTP status: " + response.code());
                }
            }
        };

        // Create the event source; this starts the request.
        EventSource eventSource = factory.newEventSource(request, eventSourceListener);
        // Optional: uncomment to release OkHttp's background threads right away.
        //client.dispatcher().executorService().shutdown();
    }
}
说明
请求连接没有立即关闭
当请求完服务后,okhttp本身并不会直接关闭,它有后台挂起的线程。
这里的关闭不是必须的;如果你的应用需要立即关闭连接、释放资源,可以取消 main 方法最后一行被注释掉的代码(client.dispatcher().executorService().shutdown();)的注释。
EventSource eventSource = factory.newEventSource(request, eventSourceListener);
这行代码我在 Windows 10 上循环执行了 50000 次,并没有看到明显的资源占用,似乎真的没有必要手动关闭。
参考官网 https://square.github.io/okhttp/5.x/okhttp/okhttp3/-ok-http-client/index.html