Websocket如何分块处理数据量超大的消息体

在这里插入图片描述

若我们服务端一次性最大处理的字节数是1M,而客户端发来了2M的数据,此时服务端的数据就要被切割成两次传输解码。Http协议中有分块传输,而在Websocket也可以分块处理超大的消息体。在jsr356标准中使用javax.websocket.MessageHandler.Partial可以分块处理这种数据。

 interface Partial<T> extends MessageHandler {

        /**
         * Called when part of a message is available to be processed.
         *
         * @param messagePart   The message part
         * @param last          <code>true</code> if this is the last part of
         *                      this message, else <code>false</code>
         */
        void onMessage(T messagePart, boolean last);
    }

Partial接口中的参数last就是表示是否是最后一块分块数据。

我们即可以给WsContainer全局设置消息体的缓冲池大小,也可以给每个session单独设置消息体的缓冲池大小。发送消息体一旦超过了此大小,数据就会在服务端被分块传输解码。

 //全局设置消息体的缓冲池大小
    @Bean  
    public WebSocketContainerFactoryBean webSocketContainer(){
        WebSocketContainerFactoryBean factoryBean = new WebSocketContainerFactoryBean();
        factoryBean.setMaxTextMessageBufferSize(20);
//        factoryBean.setMaxSessionIdleTimeout(10*1000);
//        factoryBean.setMaxBinaryMessageBufferSize(1000);
        return factoryBean;
    }

	//session级别消息体的缓冲池大小
	 @OnOpen
    public void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {
        session.setMaxTextMessageBufferSize(20);
        //....
  }      

使用JSR356注解实现消息体分块传输


@ServerEndpoint(value = "/ws/{token}")
@Component
public class WebsocketHandler2 {
    private final static Logger log = LoggerFactory.getLogger(WebsocketHandler2.class);
    private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();
    @OnOpen
    public void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {
        session.setMaxTextMessageBufferSize(20);
    }

    @OnMessage
    public void onMessage(String partialMsg, Session session, boolean isLast) throws IOException {
        StringBuilder stringJoiner = dataCache.get(session.getId());
        if (isLast) {
            log.info("receive client(id={}) partial last msg=>{}", session.getId(), partialMsg);
            if (stringJoiner == null) {
                String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), partialMsg);
                session.getBasicRemote().sendText(msg);
            } else {
                dataCache.remove(session.getId());
                stringJoiner.append(partialMsg);
                String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner);
                session.getBasicRemote().sendText(msg);
            }
        } else {
            log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), partialMsg);
            if (stringJoiner == null) {
                stringJoiner = new StringBuilder(partialMsg);
                dataCache.put(session.getId(), stringJoiner);
            } else {
                stringJoiner.append(partialMsg);
            }
        }

    }

使用spring的WebSocketHandler实现消息体分块传输

@Component
public class WebsocketHandler1 extends TextWebSocketHandler {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();


    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
   		 session.setTextMessageSizeLimit(20);
    }


    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        StringBuilder stringJoiner = dataCache.get(session.getId());
        if (message.isLast()) {
            log.info("receive client(id={}) partial last msg=>{}", session.getId(), message.getPayload());
            if (stringJoiner == null) {
                TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), message.getPayload()));
                session.sendMessage(msg);
            } else {
                dataCache.remove(session.getId());
                stringJoiner.append(message.getPayload());
                TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner));
                session.sendMessage(msg);
            }
        } else {
            log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), message.getPayload());
            if (stringJoiner == null) {
                stringJoiner = new StringBuilder(message.getPayload());
                dataCache.put(session.getId(), stringJoiner);
            } else {
                stringJoiner.append(message.getPayload());
            }
        }

    }
    @Override
    public boolean supportsPartialMessages() {
        return true;
    }

注意上边的WebsocketHandler1 实现的抽象方法supportsPartialMessages其返回值必须是true,否则处理大消息体时会报错。这是因为StandardWebSocketHandlerAdapter会根据supportsPartialMessages方法返回值将我们的WebSocketHandler适配成MessageHandler.PartialMessageHandler.Whole,而supportsPartialMessages返回值是false就会适配成MessageHandler.WholeMessageHandler.Whole是无法处理分块消息体的。

	//StandardWebSocketHandlerAdapter
	@Override
	public void onOpen(final javax.websocket.Session session, EndpointConfig config) {
		this.wsSession.initializeNativeSession(session);

		// The following inner classes need to remain since lambdas would not retain their
		// declared generic types (which need to be seen by the underlying WebSocket engine)

		if (this.handler.supportsPartialMessages()) {
			session.addMessageHandler(new MessageHandler.Partial<String>() {
				@Override
				public void onMessage(String message, boolean isLast) {
					handleTextMessage(session, message, isLast);
				}
			});
			session.addMessageHandler(new MessageHandler.Partial<ByteBuffer>() {
				@Override
				public void onMessage(ByteBuffer message, boolean isLast) {
					handleBinaryMessage(session, message, isLast);
				}
			});
		}
		else {
			session.addMessageHandler(new MessageHandler.Whole<String>() {
				@Override
				public void onMessage(String message) {
					handleTextMessage(session, message, true);
				}
			});
			session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
				@Override
				public void onMessage(ByteBuffer message) {
					handleBinaryMessage(session, message, true);
				}
			});
		}
		//......
	}

websocket底层是怎么处理分块数据的?我们在方法org.apache.tomcat.websocket.WsFrameBase#processDataText可以看到其具体的处理逻辑。
首先将接收到的二进制数据尝试解码成文本数据,若发现接收缓冲区messageBufferText容量不足则查到分块处理器,若存在分块处理器泽调用sendMessageTex(false),先处理部分数据,若不存在则直接抛出异常。

//org.apache.tomcat.websocket.server.WsFrameBase
	private boolean processDataText() throws IOException {
        // Copy the available data to the buffer
        TransformationResult tr = transformation.getMoreData(opCode, fin, rsv, messageBufferBinary);
        while (!TransformationResult.END_OF_FRAME.equals(tr)) {
             //...
        }
        messageBufferBinary.flip();
        boolean last = false;
        // Frame is fully received
        // Convert bytes to UTF-8
        while (true) {
            CoderResult cr = utf8DecoderMessage.decode(messageBufferBinary, messageBufferText,
                    last);
            if (cr.isError()) {
                throw new WsIOException(new CloseReason(
                        CloseCodes.NOT_CONSISTENT,
                        sm.getString("wsFrame.invalidUtf8")));
            } else if (cr.isOverflow()) {
                // Ran out of space in text buffer - flush it
                //尝试解码时发现接收缓冲区messageBufferText容量不足
                //调用sendMessageTex(false),先处理部分数据。
                if (usePartial()) {   //查找分块处理器
                    messageBufferText.flip();
                    sendMessageText(false);
                    messageBufferText.clear();
                } else { //没有分块处理器,就会抛出异常
                    throw new WsIOException(new CloseReason(
                            CloseCodes.TOO_BIG,
                            sm.getString("wsFrame.textMessageTooBig")));
                }
            } else if (cr.isUnderflow() && !last) {
                // End of frame and possible message as well.

                if (continuationExpected) {
                    // If partial messages are supported, send what we have
                    // managed to decode
                    if (usePartial()) {
                        messageBufferText.flip();
                        sendMessageText(false);
                        messageBufferText.clear();
                    }
                    messageBufferBinary.compact();
                    newFrame();
                    // Process next frame
                    return true;
                } else {
                    // Make sure coder has flushed all output
                    last = true;
                }
            } else {
                // End of message
                messageBufferText.flip();
                //处理最后一块消息
                sendMessageText(true);

                newMessage();
                return true;
            }
        }
	}      
	//确定是否支持分块处理
	private boolean usePartial() {
        if (Util.isControl(opCode)) {
            return false;
        } else if (textMessage) {
            return textMsgHandler instanceof MessageHandler.Partial;
        } else {
            // Must be binary
            return binaryMsgHandler instanceof MessageHandler.Partial;
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/918813.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

卡尔曼滤波学习资料汇总

卡尔曼滤波学习资料汇总 其实&#xff0c;当初的目的&#xff0c;是为了写 MPU6050 的代码的&#xff0c;然后不知不觉学了那么多&#xff0c;也是因为好奇、感兴趣吧 有些还没看完&#xff0c;之后笔记也会同步更新的 学习原始材料 【卡尔曼滤波器】1_递归算法_Recursive P…

Javaweb-day13事务管理AOP

spring的事务管理&spring框架的第二大核心AOP面向切面编程 spring框架的第一大核心是前面讲的IOC控制反转 事务管理 事务回顾 概念&#xff1a;事务是一组操作的集合&#xff0c;它是一个不可分割的工作单位&#xff0c;这些操作要么同时成功&#xff0c;要么同时失败。…

JavaWeb后端开发知识储备1

目录 1.DTO/VO/PO 2.MVC架构/微服务架构 3.JWT令牌流程 4.ThreadLocal 5.接口路径/路径参数 6.自定义注解 1.DTO/VO/PO 1.1 DTO DTO 即 Data Transfer Object—— 数据传输对象&#xff0c;是用于传输数据的对象&#xff0c;通常在服务层与表现层之间传递数据&#xff…

BLE 蓝牙客户端和服务器连接

蓝牙通信在设计小型智能设备时非常普遍&#xff0c;之前一直没有使用过&#xff0c;最近使用ardunio ESP32 做了一些实验&#xff0c;做了一个收听播客的智能旋钮&#xff08;Smart Knob&#xff09;&#xff0c;它带有一个旋转编码器和两个按键。 本文介绍BLE 服务器Server和W…

海康威视和大华视频设备对接方案

目录 一、海康威视 【老版本】 【新版本】 二、大华 一、海康威视 【老版本】 URL规定&#xff1a; rtsp://username:password[ipaddress]/[videotype]/ch[number]/[streamtype] 注&#xff1a;VLC可以支持解析URL里的用户名密码&#xff0c;实际发给设备的RTSP请求不支…

STM32设计智能翻译手势识别加算法系统

目录 前言 一、本设计主要实现哪些很“开门”功能&#xff1f; 二、电路设计原理图 电路图采用Altium Designer进行设计&#xff1a; 三、实物设计图 四、程序源代码设计 五、获取资料内容 前言 在全球化的浪潮下&#xff0c;语言的多样性也为人们的交流带来了不小的挑战…

基本定时器---内/外部时钟中断

一、定时器的概念 定时器&#xff08;TIM&#xff09;&#xff0c;可以对输入的时钟信号进行计数&#xff0c;并在计数值达到设定值的时候触发中断。 STM32的定时器系统有一个最为重要的结构是时基单元&#xff0c;它由一个16位计数器&#xff0c;预分频器&#xff0c;和自动重…

Qt文件目录操作

文件目录操作相关类 Qt 为文件和目录操作提供了一些类&#xff0c;利用这些类可以方便地实现一些操作。Qt 提供的与文件和目录操作相关的类包括以下几个&#xff1a; QCoreApplication&#xff1a;用于提取应用程序路径&#xff0c;程序名等文件信息&#xff1b;QFile&#x…

.NET 通过模块和驱动收集本地EDR的工具

01阅读须知 此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失&#xf…

css中的box-sizing,记录

border-box&#xff1a;最终高度为height&#xff0c;默认包含padding border等属性 content-box&#xff1a;box-sizing默认值&#xff0c;最终大小为heightpaddingborder 等

【AI绘画】Alpha-VLLM 的 Lumina-Next:新一代图像生成器

简介 Lumina-Next-T2I 是在 Lumina-T2I 成功基础上发展起来的尖端图像生成模型。它采用了带有 2B 参数模型的 Next-DiT 和 Gemma-2B 文本编码器&#xff0c;推理速度更快&#xff0c;生成样式更丰富&#xff0c;并增强了多语言支持。 模型架构 Lumina-Next-T2I 的生成模型建…

Redis学习 ——缓存

文章目录 一、Redis缓存的介绍二、Redis缓存问题2.1 缓存穿透2.2 缓存击穿2.3 缓存雪崩2.4 双写一致性2.5 缓存持久化RDBAOF 三、缓存数据管理3.1 数据过期策略3.2 数据淘汰策略 一、Redis缓存的介绍 我们在日常的代码编写中比较少使用到Redis&#xff0c;但是如果涉及到了比较…

【阅读记录-章节2】Build a Large Language Model (From Scratch)

目录 2.Working with text data2.1 Understanding word embeddings2.2 Tokenizing text通过一个简单的实验来理解文本的词元化概念关键概念 2.3 Converting tokens into token IDs实现分词器类&#xff08;Tokenizer Class&#xff09;应用分词器测试文本的编码与解码通过分词器…

etcd部署(基于v3.5.15)

etcd部署 单节点部署下载etcd&#xff0c;解压etcd二进制包&#xff0c;并进入解压后目录创建数据目录移动可执行文件到/usr/local/bin/目录测试版本配置systemd管理启动etcd&#xff0c;设置开机启动验证 集群部署(3节点)环境准备准备3台服务器配置3台服务器hosts配置3台服务器…

HTML5实现趣味飞船捡金币小游戏(附源码)

文章目录 1.设计来源1.1 主界面1.2 游戏中界面1.2 飞船边界框效果 2.效果和源码2.1 动态效果2.2 源代码 源码下载 作者&#xff1a;xcLeigh 文章地址&#xff1a;https://blog.csdn.net/weixin_43151418/article/details/143799554 HTML5实现趣味飞船捡金币小游戏(附源码)&…

ASP.NET Core Webapi 返回数据的三种方式

ASP.NET Core为Web API控制器方法返回类型提供了如下几个选择&#xff1a; Specific type IActionResult ActionResult<T> 1. 返回指定类型&#xff08;Specific type&#xff09; 最简单的API会返回原生的或者复杂的数据类型&#xff08;比如&#xff0c;string 或者…

汽车资讯新动力:Spring Boot技术驱动

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常…

Go语言跨平台桌面应用开发新纪元:LCL、CEF与Webview全解析

开篇寄语 在Go语言的广阔生态中&#xff0c;桌面应用开发一直是一个备受关注的领域。今天&#xff0c;我将为大家介绍三款基于Go语言的跨平台桌面应用开发框架——LCL、CEF与Webview&#xff0c;它们分别拥有独特的魅力和广泛的应用场景。通过这三款框架&#xff0c;你将能够轻…

如何确保爬取的数据准确性和完整性?

在数据驱动的业务环境中&#xff0c;爬虫程序的准确性和完整性至关重要。本文将探讨如何使用Java编写爬虫程序&#xff0c;并确保其在爬取数据时的准确性和完整性。 1. 精确的HTML解析 确保数据准确性的第一步是精确地解析HTML。Jsoup是Java中常用的HTML解析库&#xff0c;它提…

【linux】如何扩展磁盘容量(VMware虚拟机)-转载

如何扩展磁盘容量(VMware虚拟机) 一、前置准备工作 扩展虚拟机磁盘前&#xff0c;需要先把虚拟机关机才能进行扩展磁盘操作 1.选择虚拟机设置&#xff0c;如下图所示 2.输入你想扩展的磁盘容量&#xff0c;以本次实操为例&#xff0c;我这里输入的30G&#xff08;具体按照实…