若我们服务端一次性最大处理的字节数是1M,而客户端发来了2M的数据,此时服务端的数据就要被切割成两次传输解码。Http协议中有分块传输,而在Websocket也可以分块处理超大的消息体。在jsr356标准中使用javax.websocket.MessageHandler.Partial
可以分块处理这种数据。
interface Partial<T> extends MessageHandler {
/**
* Called when part of a message is available to be processed.
*
* @param messagePart The message part
* @param last <code>true</code> if this is the last part of
* this message, else <code>false</code>
*/
void onMessage(T messagePart, boolean last);
}
Partial
接口中的参数last
就是表示是否是最后一块分块数据。
我们即可以给WsContainer全局设置消息体的缓冲池大小,也可以给每个session单独设置消息体的缓冲池大小。发送消息体一旦超过了此大小,数据就会在服务端被分块传输解码。
//全局设置消息体的缓冲池大小
@Bean
public WebSocketContainerFactoryBean webSocketContainer(){
WebSocketContainerFactoryBean factoryBean = new WebSocketContainerFactoryBean();
factoryBean.setMaxTextMessageBufferSize(20);
// factoryBean.setMaxSessionIdleTimeout(10*1000);
// factoryBean.setMaxBinaryMessageBufferSize(1000);
return factoryBean;
}
//session级别消息体的缓冲池大小
@OnOpen
public void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {
session.setMaxTextMessageBufferSize(20);
//....
}
使用JSR356注解实现消息体分块传输
@ServerEndpoint(value = "/ws/{token}")
@Component
public class WebsocketHandler2 {
private final static Logger log = LoggerFactory.getLogger(WebsocketHandler2.class);
private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();
@OnOpen
public void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {
session.setMaxTextMessageBufferSize(20);
}
@OnMessage
public void onMessage(String partialMsg, Session session, boolean isLast) throws IOException {
StringBuilder stringJoiner = dataCache.get(session.getId());
if (isLast) {
log.info("receive client(id={}) partial last msg=>{}", session.getId(), partialMsg);
if (stringJoiner == null) {
String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), partialMsg);
session.getBasicRemote().sendText(msg);
} else {
dataCache.remove(session.getId());
stringJoiner.append(partialMsg);
String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner);
session.getBasicRemote().sendText(msg);
}
} else {
log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), partialMsg);
if (stringJoiner == null) {
stringJoiner = new StringBuilder(partialMsg);
dataCache.put(session.getId(), stringJoiner);
} else {
stringJoiner.append(partialMsg);
}
}
}
使用spring的WebSocketHandler实现消息体分块传输
@Component
public class WebsocketHandler1 extends TextWebSocketHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
session.setTextMessageSizeLimit(20);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
StringBuilder stringJoiner = dataCache.get(session.getId());
if (message.isLast()) {
log.info("receive client(id={}) partial last msg=>{}", session.getId(), message.getPayload());
if (stringJoiner == null) {
TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), message.getPayload()));
session.sendMessage(msg);
} else {
dataCache.remove(session.getId());
stringJoiner.append(message.getPayload());
TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner));
session.sendMessage(msg);
}
} else {
log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), message.getPayload());
if (stringJoiner == null) {
stringJoiner = new StringBuilder(message.getPayload());
dataCache.put(session.getId(), stringJoiner);
} else {
stringJoiner.append(message.getPayload());
}
}
}
@Override
public boolean supportsPartialMessages() {
return true;
}
注意上边的WebsocketHandler1
实现的抽象方法supportsPartialMessages
其返回值必须是true,否则处理大消息体时会报错。这是因为StandardWebSocketHandlerAdapter
会根据supportsPartialMessages
方法返回值将我们的WebSocketHandler适配成MessageHandler.Partial
或MessageHandler.Whole
,而supportsPartialMessages
返回值是false
就会适配成MessageHandler.Whole
,MessageHandler.Whole
是无法处理分块消息体的。
//StandardWebSocketHandlerAdapter
@Override
public void onOpen(final javax.websocket.Session session, EndpointConfig config) {
this.wsSession.initializeNativeSession(session);
// The following inner classes need to remain since lambdas would not retain their
// declared generic types (which need to be seen by the underlying WebSocket engine)
if (this.handler.supportsPartialMessages()) {
session.addMessageHandler(new MessageHandler.Partial<String>() {
@Override
public void onMessage(String message, boolean isLast) {
handleTextMessage(session, message, isLast);
}
});
session.addMessageHandler(new MessageHandler.Partial<ByteBuffer>() {
@Override
public void onMessage(ByteBuffer message, boolean isLast) {
handleBinaryMessage(session, message, isLast);
}
});
}
else {
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
handleTextMessage(session, message, true);
}
});
session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
@Override
public void onMessage(ByteBuffer message) {
handleBinaryMessage(session, message, true);
}
});
}
//......
}
websocket底层是怎么处理分块数据的?我们在方法org.apache.tomcat.websocket.WsFrameBase#processDataText
可以看到其具体的处理逻辑。
首先将接收到的二进制数据尝试解码成文本数据,若发现接收缓冲区messageBufferText容量不足则查到分块处理器,若存在分块处理器泽调用sendMessageTex(false),先处理部分数据,若不存在则直接抛出异常。
//org.apache.tomcat.websocket.server.WsFrameBase
private boolean processDataText() throws IOException {
// Copy the available data to the buffer
TransformationResult tr = transformation.getMoreData(opCode, fin, rsv, messageBufferBinary);
while (!TransformationResult.END_OF_FRAME.equals(tr)) {
//...
}
messageBufferBinary.flip();
boolean last = false;
// Frame is fully received
// Convert bytes to UTF-8
while (true) {
CoderResult cr = utf8DecoderMessage.decode(messageBufferBinary, messageBufferText,
last);
if (cr.isError()) {
throw new WsIOException(new CloseReason(
CloseCodes.NOT_CONSISTENT,
sm.getString("wsFrame.invalidUtf8")));
} else if (cr.isOverflow()) {
// Ran out of space in text buffer - flush it
//尝试解码时发现接收缓冲区messageBufferText容量不足
//调用sendMessageTex(false),先处理部分数据。
if (usePartial()) { //查找分块处理器
messageBufferText.flip();
sendMessageText(false);
messageBufferText.clear();
} else { //没有分块处理器,就会抛出异常
throw new WsIOException(new CloseReason(
CloseCodes.TOO_BIG,
sm.getString("wsFrame.textMessageTooBig")));
}
} else if (cr.isUnderflow() && !last) {
// End of frame and possible message as well.
if (continuationExpected) {
// If partial messages are supported, send what we have
// managed to decode
if (usePartial()) {
messageBufferText.flip();
sendMessageText(false);
messageBufferText.clear();
}
messageBufferBinary.compact();
newFrame();
// Process next frame
return true;
} else {
// Make sure coder has flushed all output
last = true;
}
} else {
// End of message
messageBufferText.flip();
//处理最后一块消息
sendMessageText(true);
newMessage();
return true;
}
}
}
//确定是否支持分块处理
private boolean usePartial() {
if (Util.isControl(opCode)) {
return false;
} else if (textMessage) {
return textMsgHandler instanceof MessageHandler.Partial;
} else {
// Must be binary
return binaryMsgHandler instanceof MessageHandler.Partial;
}
}