boost asio异步服务器(4)处理粘包问题tlv

粘包的产生

当客户端发送多个数据包给服务器时,服务器底层的tcp接收缓冲区收到的数据为粘连在一起的。这种情况的产生通常是服务器端处理数据的速率不如客户端的发送速率的情况。比如:客户端1s内连续发送了两个hello world!,服务器过了2s才接收数据,那一次性读出两个hello world!

tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送,比如连续发送1字节的数据要累计到多个字节才发送。

粘包处理

处理粘包的方式主要采用应用层定义收发包格式的方式,这个过程俗称切包处理,常用的协议被称为tlv协议(消息id+消息长度+消息内容)。

tlv

TLV(Type-Length-Value)是一种通信协议,用于在通信中传输结构化数据。它将数据分为三个部分:类型(Type)、长度(Length)和值(Value),每个部分都以固定的格式进行编码和解码。

但是我下边的格式并不是标准的tlv格式,而是采用的lv模式,即只包含length和value。

完善消息节点

class MsgNode {
public:
    //这里的构造方法主要方便后续调用Send接口构造消息节点
	MsgNode(char* msg, short data_len) : total_len(data_len + HEAD_LENGTH), cur_len(0) {
		_data = new char[total_len + 1];
		memcpy(_data, &data_len, HEAD_LENGTH);
		memcpy(_data + HEAD_LENGTH, msg, data_len);
		_data[total_len] = '\0';
	}
    //这里的构造方法则是用于在进行切包过程中构造处理数据的节点
	MsgNode(short data_len) :total_len(data_len), cur_len(0) {
		_data = new char[total_len + 1];
	}
    //Clear方法是用于清理节点的数据,避免多次构造析构节点
	void Clear() {
		memset(_data, 0, total_len);
		cur_len = 0;
	}
	~MsgNode() {
		delete[] _data;
	}
private:
	friend class Session;
	//表示已经处理的数据长度
	int cur_len;
	//表示处理数据的总长度
	int total_len;
	//表示数据的首地址
	char* _data;
};

完善两个构造函数和添加Clear函数

1、第一个构造方法主要方便后续调用Send接口构造消息节点
2、第二个构造方法则是用于在进行切包过程中构造处理数据的节点
3、Clear方法是用于清理节点的数据,避免多次构造析构节点

session类完善

_recv_msg_node用于存放收到数据包中的数据

_b_head_parse表示头部是否解析完成

_recv_head_node用于存放接收到数据包中的头部信息

完善hand_read回调函数

void Session::handle_read(const boost::system::error_code& ec, size_t bytes_transferred,
	std::shared_ptr<Session> self_shared) {
	if (ec) {
		std::cout << "read error, error code: " << ec.value() <<
			" read message: " << ec.message() << std::endl;
		Close();
		server_->ClearSession(uuid);
	}
	else {
		PrintRecvData(data_, bytes_transferred);
		std::chrono::milliseconds dura(2000);
		std::this_thread::sleep_for(dura);
		
		//已经移动的字节数
		int copy_len = 0;
		while (bytes_transferred) {
			//头部尚未解析完成
			if (!_b_head_parse) {
				//收到的数据不足头部大小,这种情况很少发生
				if (bytes_transferred + _recv_head_node->cur_len < HEAD_LENGTH) {
					memcpy(_recv_head_node->_data + _recv_head_node->cur_len, data_ + copy_len, bytes_transferred);
					_recv_head_node->cur_len += bytes_transferred;
					memset(data_, 0, MAX_LENGTH);
					sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
						std::bind(&Session::handle_read, this,
							std::placeholders::_1, std::placeholders::_2, self_shared));
					return;
				}

				//走到这里,说明收到的数据大于头部,可能是一个粘连的数据包,但是首先需要将头部节点两字节读完

				//处理头部剩余未复制的长度
				int head_remain = HEAD_LENGTH - _recv_head_node->cur_len;
				if (head_remain) {
					memcpy(_recv_head_node->_data + _recv_head_node->cur_len, data_ + copy_len, head_remain);
					//更新已处理的数据
					copy_len += head_remain;
					/*
					* 这里不能更新头部节点的cur_len。
					* 因为
					* 1、当一次进来cur_len等于0,处理之后的偏移量copy_len就为2
					* 2、当头部未读取完成,后续读取会修正为正确的偏移量(但是种情况很少发生)
					* 3、之后的读取头部信息都会发生覆盖
					*/
					//_recv_head_node->cur_len += head_remain;
					bytes_transferred -= head_remain;
				}

				//获取头部数据
				short data_len = 0;
				memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);
				std::cout << "data_len is " << data_len << std::endl;

				if (data_len > MAX_LENGTH) {
					std::cout << "invalid data length is " << data_len << std::endl;
					server_->ClearSession(uuid);
					return;
				}

				//头部节点处理完成,就可以开始处理数据域的数据节点
				_recv_msg_node = std::make_shared<MsgNode>(data_len);

				//消息长度小于头部规定长度,说明数据未收全,则先将消息放到接收节点中
				if (bytes_transferred < data_len) {
					memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, bytes_transferred);
					_recv_msg_node->cur_len += bytes_transferred;
					memset(data_, 0, MAX_LENGTH);
					sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
						std::bind(&Session::handle_read, this,
							std::placeholders::_1, std::placeholders::_2, self_shared));

					//表示头部处理完成,当下次进来的时候,就会直接跳过头部处理环节
					_b_head_parse = true;
					return;
				}

				//走到这里表示消息长度大于头部规定长度,这里可能是一个完整包,也可能是多个粘连的包
				memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, data_len);
				_recv_msg_node->cur_len += data_len;
				copy_len += data_len;
				bytes_transferred -= data_len;
				_recv_msg_node->_data[_recv_msg_node->total_len] = '\0';
				std::cout << "receive data is: " << _recv_msg_node->_data << std::endl;

				//调用send发送给客户端
				Send(_recv_msg_node->_data, _recv_msg_node->total_len);

				//继续轮询处理下个未处理的数据,重置数据包和头部解析的情况
				_b_head_parse = false;
				_recv_msg_node->Clear();
				//说明这不是一个多个粘连的数据包
				if (bytes_transferred <= 0) {
					memset(data_, 0, MAX_LENGTH);
					sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
						std::bind(&Session::handle_read, this,
							std::placeholders::_1, std::placeholders::_2, self_shared));
					return;
				}
				//走到这里说明这就是一个多个粘连的数据包
				continue;
			}

			//走到这里就说明头部是已经解析完成的,是处理数据未收全的情况
			int remain_msg = _recv_msg_node->total_len - _recv_msg_node->cur_len;
			//说明收到的数据仍然不足头部规定大小的情况
			if (bytes_transferred < remain_msg) {
				memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, bytes_transferred);
				_recv_msg_node->cur_len += bytes_transferred;
				memset(data_, 0, MAX_LENGTH);
				sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
					std::bind(&Session::handle_read, this,
						std::placeholders::_1, std::placeholders::_2, self_shared));
				return;
			}

			//走到这里说明收到的数据是大于等于头部规定大小的,接收到的数据可能是个完整的数据包,也可能多个粘连的数据包
			memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, remain_msg);
			_recv_msg_node->cur_len += remain_msg;
			bytes_transferred -= remain_msg;
			copy_len += remain_msg;
			_recv_msg_node->_data[_recv_msg_node->total_len] = '\0';
			std::cout << "receive data is: " << _recv_msg_node->_data << std::endl;

			//处理完当前数据包的分割后,调用send接口向客户端发送回去
			Send(_recv_msg_node->_data, _recv_msg_node->total_len);

			//继续轮询处理下个数据包,重置接收数据节点和头部解析情况
			_b_head_parse = false;
			_recv_msg_node->Clear();
			//说明数据包并不是粘连的
			if (bytes_transferred <= 0) {
				memset(data_, 0, MAX_LENGTH);
				sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
					std::bind(&Session::handle_read, this,
						std::placeholders::_1, std::placeholders::_2, self_shared));
				return;
			}
			//走到这里说明数据包是粘连的
			continue;	
		}
	}
}

这里hand_read函数的完善逻辑代码比较长,其中的注释给的比较详细,需要各位仔细读。但是逻辑可能头一两次读可能还是会有些蒙,多读几遍可能就会好得多。

这里还是得必要得说一下,我们都知道异步读写函数得回调函数中的参数bytes_transferred表示已经读取到的字节数,但是我们在这里还是需要对这些已经读到的数据进行处理。其中定义copy_len表示已经处理的字节数,bytes_transferred则表示为还未处理的数据(尽管已经被读取到了,但是还是尚未被处理,需要好好理解下)。

这里在session类中还定义了两个宏,MAX_LENGTH表示数据包的最大长度,就是1024*2字节。HEAD_LENGTH表示头部长度,就是2字节。

这里我也画了一个逻辑图供大家梳理这里的代码逻辑,希望能对大家理解有帮助。

粘包现象的测试

在session类中写一个打印函数,在每次触发读事件回调的时候调用下这个函数。这里打印的是tcp缓冲区的数据,boost asio从tcp已经是已经做了将tcp缓冲区的数据拿出来的,所以这里打印即可。

为了制造粘包现象,我们可以让服务器端隔2s处理一次读写,而客户端则不停的发送和读取就能制造出粘包现象了。下边是提供的客户端的代码。

#include <iostream>
#include <boost/asio.hpp>
#include <thread>
using namespace std;
using namespace boost::asio::ip;
const int MAX_LENGTH = 1024 * 2;
const int HEAD_LENGTH = 2;
int main()
{
	//测试粘包现象客户端
	try {
		//创建上下文服务
		boost::asio::io_context   ioc;
		//构造endpoint
		tcp::endpoint  remote_ep(address::from_string("127.0.0.1"), 1234);
		tcp::socket  sock(ioc);
		boost::system::error_code   error = boost::asio::error::host_not_found;
		sock.connect(remote_ep, error);
		if (error) {
			cout << "connect failed, code is " << error.value() << " error msg is " << error.message();
			return 0;
		}

		thread send_thread([&sock] {
			for (;;) {
				this_thread::sleep_for(std::chrono::milliseconds(2));
				const char* request = "hello world!";
				size_t request_length = strlen(request);
				char send_data[MAX_LENGTH] = { 0 };
				memcpy(send_data, &request_length, 2);
				memcpy(send_data + 2, request, request_length);
				boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 2));
			}
			});

		thread recv_thread([&sock] {
			for (;;) {
				this_thread::sleep_for(std::chrono::milliseconds(2));
				cout << "begin to receive..." << endl;
				char reply_head[HEAD_LENGTH];
				size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_LENGTH));
				short msglen = 0;
				memcpy(&msglen, reply_head, HEAD_LENGTH);
				char msg[MAX_LENGTH] = { 0 };
				size_t  msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));

				std::cout << "Reply is: ";
				std::cout.write(msg, msglen) << endl;
				std::cout << "Reply len is " << msglen;
				std::cout << "\n";
			}
			});

		send_thread.join();
		recv_thread.join();
	}
	catch (std::exception& e) {
		std::cerr << "Exception: " << e.what() << endl;
	}
	return 0;
}

现象如下图,测试环境Windows visual studio 

完整服务端代码:codes-C++: C++学习 - Gitee.com

这里的echo服务器实现了粘包的处理,但是在不同的平台下仍存在收发数据异常的问题,其根本原因就是平台大小端的差异。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/744533.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

BP神经网络

BP神经网络 BP神经网络是一种多层前馈神经网络&#xff0c;它通过反向传播算法进行训练&#xff0c;旨在最小化损失函数&#xff0c;从而对输入数据进行精确的分类或回归预测。 背景 BP (Back Propagation) 神经网络是1986年由 Rumelhart 和 McClelland 为首的科学家提出的概…

SAP ABAP 之容器

文章目录 前言一、案例介绍/笔者需求二、自定义容器 a.实例化对象 b.自定义容器效果演示 c.Copy Code 三、自适应容器 a.常用 必须 参数理解 b.METRIC 度量单位 c.RATIO 百分比尺寸 d.STYLE 容器…

商业银行流动性创造指标数据集(2005-2022)

数据简介&#xff1a;中文数据库商业银行流动性创造指标参考邓伟等老师&#xff08;2022&#xff09;的做法&#xff0c;常备借贷便利与中期借贷便利数据来源于中国人民银行发布的《中国货币政策执行报告》。银行层面的微观指标主要来源于BankScope数据库和CSMAR数据库&#xf…

Spring Cloud Netflix:构建强大微服务生态系统的利器

Spring Cloud Netflix是一组集成框架&#xff0c;它将Netflix的多个开源组件整合到Spring Boot应用程序中&#xff0c;使得构建云原生应用程序变得更加简单。这些组件包括用于服务发现和注册的Eureka&#xff0c;断路器模式的实现Hystrix&#xff0c;用于API网关的Zuul&#xf…

springboot家乡特色推荐系统 LW +PPT+源码+讲解

3系统需求分析 3.1系统功能 通过前面的功能分析可以将家乡特色推荐系统的功能分为管理员和用户两个部分&#xff0c;系统的主要功能包括首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;文章分类管理&#xff0c;文章分享管理&#xff0c;系统管理等内容。任何用户…

【c语言】二级指针

1&#xff0c;定义 本质还是从指针的角度去理解&#xff0c;只不过存的指针的值 2&#xff0c;使用方法

第三方软件连接虚拟机

第三方软件连接虚拟机 1 查看本机VM&#xff08;VMware&#xff09;虚拟机网段2 开启虚拟机系统&#xff0c;修改网卡配置3 重新打开网络并测试连通性4 打开VM虚拟机网络开关5 通过第三方软件建立连接6 可能遇到的问题 1 查看本机VM&#xff08;VMware&#xff09;虚拟机网段 子…

38.控制功能实现

上一个内容&#xff1a;37.添加简易的调试功能 以 37.添加简易的调试功能 它的代码为基础进行修改 效果图&#xff1a; 下图红框位置的功能实现 Dlls项目中添加一个Dialog Dialog如下 然后给它添加一个类&#xff0c;MFC添加的类可能会报错添加 #include "afxdialogex.h…

煤矿智能巡检机器人:推动煤矿行业变革的关键力量

目前我国煤炭资源总量达到了2078.85亿吨&#xff0c;已探明储量为1432亿吨&#xff0c;煤矿能源现阶段还是我国重要的基础能源。而煤矿生产作业存在巨大危险&#xff0c;主要包括高温、高压、燃爆和有毒气体等环境因素&#xff0c;同时机械设备运转过程中潜藏着重大风险。这些危…

【Python/Pytorch - 网络模型】-- 高阶SVD算法

文章目录 文章目录 00 写在前面01 基于Python版本的高阶SVD算代码02 HOSVD 的步骤 00 写在前面 高阶奇异值分解&#xff08;Higher-Order SVD&#xff0c;HOSVD&#xff09;是一种将传统的奇异值分解&#xff08;SVD&#xff09;扩展到高阶张量的方法。它能够将一个高阶张量分…

【摄像头标定】使用kalibr进行双目摄像头标定(ros1、ros2)

使用kalibr进行双目摄像头标定 前言标定板标定①板端准备和录制②上位机准备和标定 前言 本文不是纯用ros1进行标定&#xff0c;需要ros1和ros2通信。给使用ros2进行开发&#xff0c;但又想用kalibr标定双目摄像头的小伙伴一个教程。本文双目摄像头的数据发布使用ros2&#xf…

fork 是一个创建新进程的系统调用

在计算机科学中&#xff0c;fork 是一个创建新进程的系统调用。具体来说&#xff0c;fork 调用会创建一个与当前进程几乎完全相同的副本&#xff0c;包括父进程的内存布局、环境变量、打开的文件描述符等。这个新的进程被称为子进程&#xff0c;而原始进程被称为父进程。 以下…

Spring+Vue项目部署

目录 一、需要的资源 二、步骤 1.首先要拥有一个服务器 2.项目准备 vue&#xff1a; 打包: 3.服务器装环境 文件上传 设置application.yml覆盖 添加启动和停止脚本 ​编辑 安装jdk1.8 安装nginx 安装mysql 报错&#xff1a;「ERR」1273-Unknown collation: utf8m…

springboot网上商城系统的设计与实现-计算机毕业设计源码08789

摘 要 随着互联网趋势的到来&#xff0c;各行各业都在考虑利用互联网将自己推广出去&#xff0c;最好方式就是建立自己的互联网系统&#xff0c;并对其进行维护和管理。在现实运用中&#xff0c;应用软件的工作规则和开发步骤&#xff0c;采用Java技术建设网上商城系统。 本设…

客户有哪些封装案例,一句克服使用让PCBA工厂泪流满面

作者 | 高速先生成员--王辉东 天空下着雨&#xff0c;萧萧从窗前经过&#xff0c;看窗里。 翠萍那娇艳欲滴的脸上挂着两串泪滴。 萧萧一进去&#xff0c;问啥情况。 翠萍往电脑屏幕一指。 当萧萧看向屏幕一瞬间。 那些曾经以为早已遗忘的伤痛&#xff0c;会在某些时刻如潮…

Gradle学习-2 Groovy

1、Groovy基础语法 1.1、基本数据类型 Groovy支持数据类型&#xff1a;byte, short, int, long, float, double, char &#xff08;1&#xff09;创建一个Android Studio项目 &#xff08;2&#xff09;在根目录新建一个 leon.gradle&#xff0c;输入以下内容 leon.gradle…

突破Web3红海,DePIN如何构建创新生态系统?

撰文&#xff1a;TinTinLand 本文来源香港Web3媒体Techub News专栏作者TinTinLand 2023 年 DePIN 赛道的火热成为 Web3 行业的重点关注方向&#xff0c;当前如何以可扩展、去中心化、安全方式推动 DePIN 赛道赋能下的 AI 版图建设&#xff0c;寻找更多 Web3 行业创新机遇成为…

【已解决】Python报错:NameError: name ‘Image‘ is not defined

&#x1f60e; 作者介绍&#xff1a;我是程序员行者孙&#xff0c;一个热爱分享技术的制能工人。计算机本硕&#xff0c;人工制能研究生。公众号&#xff1a;AI Sun&#xff0c;视频号&#xff1a;AI-行者Sun &#x1f388; 本文专栏&#xff1a;本文收录于《AI实战中的各种bug…

QT拖放事件之七:子类化QMimeData,实现对多个自定义类型进行数据

1、前提说明 /*自定义的MIME类型数据存储在QMimeData对象中, 存在两种方法:1. setData(...)可以把自定义类型的数据以QByteArray的形式直接存储在QMimeData中,但是使用此方法一次只能对一个MIME类型进行处理(可参考 QT拖放事件六:自定义MIME类型的存储及读取demo ) 一文。…

udp Socket组播 服务器

什么是组播 组播也可以称之为多播这也是 UDP 的特性之一。组播是主机间一对多的通讯模式&#xff0c;是一种允许一个或多个组播源发送同一报文到多个接收者的技术。组播源将一份报文发送到特定的组播地址&#xff0c;组播地址不同于单播地址&#xff0c;它并不属于特定某个主机…