Springboot 接入 WebSocket 实战
前言:
WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
简单理解:
1,常见开发过程中我们知道 Http协议,客户端请求一次,服务器响应一次,推送数据,不能主动的推送数据,每次请求都要做一个连接,非常消耗性能。
2,websocket 建立一次链接,可以主动向客户端推送数据。
需求说明:
1,项目需要做一个知识助手,远程调用三方接口,那边是websocket 实时推送数据,类似gpt
2,后端需要连接三方服务,调用接口返回数据给前端,做渲染
功能实现
1,依赖
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
2,代码接口实现
controller:
@GetMapping("/callxxxxModel")
public String callxxxxModel(@RequestParam("paramOne") String paramOne) {
return webSocketxxxClientService.callxxxxModel(paramOne);
}
service:
/**
*
* @param paramOne
* 参数1
*/
String callxxxxModel(String paramOne);
serviceImpl:
@Slf4j
@Service("webSocketxxxClientService")
public class WebSocketxxxClientServiceImpl implements WebSocketxxxClientService{
@Override
public String callxxxxModel(String paramOne, Integer executeType) {
String uri = "ws://10.xx.xx.13:123/sss/xxx/aa/xxx_v2";
String xappid = "lsjdfljsdxxx09980dsfsd";
String xappkey = "xxsfdsf12123123";
long timestamp = System.currentTimeMillis();
String seqid = "";
log.info("timestamp=" + timestamp);
log.info("seqid=" + seqid);
WebSocketClientRemote client = new WebSocketClientRemote(uri, xappid, xappkey);
// Prepare message
Map<String, Object> message = new HashMap<>();
message.put("uid", xappid);
message.put("timestamp", timestamp);
message.put("seqid", seqid);
message.put("stream", "true");
// 会话识别码,切换话题可能需要更换
message.put("session_id", seqid);
message.put("prov", "xxsdfsdf23424332");
message.put("param1", paramOne);
message.put("param2", "xvsdfds23423423xxxxx");
String jsonMessage = JSON.toJSONString(message);
client.sendMessage(jsonMessage);
String responseMessage = client.getResponseMessage();
client.close();
return responseMessage;
}
package com.xx.xx.xx.web;
import com.xx.xx.common.result.ResultCodeEnum;
import com.xx.xx.xxx.exception.CloudException;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import okio.ByteString;
import javax.websocket.ClientEndpoint;
import java.util.concurrent.CountDownLatch;
/**
* @author nobuyboday
*/
@Slf4j
@ClientEndpoint
public class WebSocketClientRemote {
private final OkHttpClient client;
private final WebSocket webSocket;
// public final CountDownLatch latch = new CountDownLatch(50);
// 记录websocket 返回的信息
public String responseMessage = "";
public WebSocketClientRemote(String uri, String xappid, String xappkey) {
client = new OkHttpClient();
Request request = new Request.Builder().url(uri)
// 添加自定义头
.addHeader("X-App-ID", "xxxfjslfjslj1231321xxxx3")
// 添加自定义头
.addHeader("X-App-Key", "0923jhdjflsdjflsdjljxxxxxxflsn").build();
webSocket = client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
log.info("已连接到服务器................");
}
@Override
public void onMessage(WebSocket webSocket, String text) {
// log.info("收到消息: " + text);
responseMessage += text;
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
log.info("收到字节消息: " + bytes.hex());
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
webSocket.close(1000, null);
log.info("连接关闭: " + reason);
// latch.countDown();
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
log.info("连接失败: " + t.getMessage());
}
});
}
public void sendMessage(String message) {
webSocket.send(message);
}
public void close() {
webSocket.close(1000, "关闭连接");
}
public void await() throws InterruptedException {
// latch.await();
}
public String getResponseMessage() {
// 对方数据是以 <#END> 代表推送数据完毕,这个\\u003c#END\\u003e是<#END>编码问题 不用管
boolean isReturn = responseMessage.endsWith("<#END>") || responseMessage.endsWith("\\u003c#END\\u003e") || responseMessage.contains("#END")
|| responseMessage.contains("\\u003c#END\\u003e");
if (isReturn) {
// log.info("最终的responseMessage:================>>>:{}", responseMessage);
return responseMessage.substring(0, responseMessage.length() - "<#END>".length());
} else {
// 循环等待
try {
Thread.sleep(3000);
} catch (Exception e) {
throw new CloudException(ResultCodeEnum.CALL_GROUP_DCOOS_TYCLOUD_KNOWLEDGE_ASSISTANT_FAIL.getCode(),
ResultCodeEnum.CALL_GROUP_DCOOS_TYCLOUD_KNOWLEDGE_ASSISTANT_FAIL.getMessage() + ":" + e);
}
return getResponseMessage();
}
}
}
遇到的问题:
1,websocket 没有返回值,需要加一个接口获取返回值
2,对方返回的数据 <#END> 结尾,在接收判断是否完毕后,字符集问题 不识别 <#END>
3,推送数据要保证全部推送完毕,要有个循环调用
喜欢我的文章记得点个在看,或者点赞,持续更新中ing…