boost asio异步服务器(4)处理粘包

粘包的产生

当客户端发送多个数据包给服务器时,服务器底层的tcp接收缓冲区收到的数据为粘连在一起的。这种情况的产生通常是服务器端处理数据的速率不如客户端的发送速率的情况。比如:客户端1s内连续发送了两个hello world!,服务器过了2s才接收数据,那一次性读出两个hello world!

tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送,比如连续发送1字节的数据要累计到多个字节才发送。

粘包处理

处理粘包的方式主要采用应用层定义收发包格式的方式,这个过程俗称切包处理,常用的协议被称为tlv协议(消息id+消息长度+消息内容)。

tlv

TLV(Type-Length-Value)是一种通信协议,用于在通信中传输结构化数据。它将数据分为三个部分:类型(Type)、长度(Length)和值(Value),每个部分都以固定的格式进行编码和解码。

但是我下边的格式并不是标准的tlv格式,而是采用的lv模式,即只包含length和value。

完善消息节点

class MsgNode {
public:
    //这里的构造方法主要方便后续调用Send接口构造消息节点
	MsgNode(char* msg, short data_len) : total_len(data_len + HEAD_LENGTH), cur_len(0) {
		_data = new char[total_len + 1];
		memcpy(_data, &data_len, HEAD_LENGTH);
		memcpy(_data + HEAD_LENGTH, msg, data_len);
		_data[total_len] = '\0';
	}
    //这里的构造方法则是用于在进行切包过程中构造处理数据的节点
	MsgNode(short data_len) :total_len(data_len), cur_len(0) {
		_data = new char[total_len + 1];
	}
    //Clear方法是用于清理节点的数据,避免多次构造析构节点
	void Clear() {
		memset(_data, 0, total_len);
		cur_len = 0;
	}
	~MsgNode() {
		delete[] _data;
	}
private:
	friend class Session;
	//表示已经处理的数据长度
	int cur_len;
	//表示处理数据的总长度
	int total_len;
	//表示数据的首地址
	char* _data;
};

完善两个构造函数和添加Clear函数

1、第一个构造方法主要方便后续调用Send接口构造消息节点
2、第二个构造方法则是用于在进行切包过程中构造处理数据的节点
3、Clear方法是用于清理节点的数据,避免多次构造析构节点

session类完善

_recv_msg_node用于存放收到数据包中的数据

_b_head_parse表示头部是否解析完成

_recv_head_node用于存放接收到数据包中的头部信息

完善hand_read回调函数

void Session::handle_read(const boost::system::error_code& ec, size_t bytes_transferred,
	std::shared_ptr<Session> self_shared) {
	if (ec) {
		std::cout << "read error, error code: " << ec.value() <<
			" read message: " << ec.message() << std::endl;
		Close();
		server_->ClearSession(uuid);
	}
	else {
		PrintRecvData(data_, bytes_transferred);
		std::chrono::milliseconds dura(2000);
		std::this_thread::sleep_for(dura);
		
		//已经移动的字节数
		int copy_len = 0;
		while (bytes_transferred) {
			//头部尚未解析完成
			if (!_b_head_parse) {
				//收到的数据不足头部大小,这种情况很少发生
				if (bytes_transferred + _recv_head_node->cur_len < HEAD_LENGTH) {
					memcpy(_recv_head_node->_data + _recv_head_node->cur_len, data_ + copy_len, bytes_transferred);
					_recv_head_node->cur_len += bytes_transferred;
					memset(data_, 0, MAX_LENGTH);
					sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
						std::bind(&Session::handle_read, this,
							std::placeholders::_1, std::placeholders::_2, self_shared));
					return;
				}

				//走到这里,说明收到的数据大于头部,可能是一个粘连的数据包,但是首先需要将头部节点两字节读完

				//处理头部剩余未复制的长度
				int head_remain = HEAD_LENGTH - _recv_head_node->cur_len;
				if (head_remain) {
					memcpy(_recv_head_node->_data + _recv_head_node->cur_len, data_ + copy_len, head_remain);
					//更新已处理的数据
					copy_len += head_remain;
					/*
					* 这里不能更新头部节点的cur_len。
					* 因为
					* 1、当一次进来cur_len等于0,处理之后的偏移量copy_len就为2
					* 2、当头部未读取完成,后续读取会修正为正确的偏移量(但是种情况很少发生)
					* 3、之后的读取头部信息都会发生覆盖
					*/
					//_recv_head_node->cur_len += head_remain;
					bytes_transferred -= head_remain;
				}

				//获取头部数据
				short data_len = 0;
				memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);
				std::cout << "data_len is " << data_len << std::endl;

				if (data_len > MAX_LENGTH) {
					std::cout << "invalid data length is " << data_len << std::endl;
					server_->ClearSession(uuid);
					return;
				}

				//头部节点处理完成,就可以开始处理数据域的数据节点
				_recv_msg_node = std::make_shared<MsgNode>(data_len);

				//消息长度小于头部规定长度,说明数据未收全,则先将消息放到接收节点中
				if (bytes_transferred < data_len) {
					memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, bytes_transferred);
					_recv_msg_node->cur_len += bytes_transferred;
					memset(data_, 0, MAX_LENGTH);
					sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
						std::bind(&Session::handle_read, this,
							std::placeholders::_1, std::placeholders::_2, self_shared));

					//表示头部处理完成,当下次进来的时候,就会直接跳过头部处理环节
					_b_head_parse = true;
					return;
				}

				//走到这里表示消息长度大于头部规定长度,这里可能是一个完整包,也可能是多个粘连的包
				memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, data_len);
				_recv_msg_node->cur_len += data_len;
				copy_len += data_len;
				bytes_transferred -= data_len;
				_recv_msg_node->_data[_recv_msg_node->total_len] = '\0';
				std::cout << "receive data is: " << _recv_msg_node->_data << std::endl;

				//调用send发送给客户端
				Send(_recv_msg_node->_data, _recv_msg_node->total_len);

				//继续轮询处理下个未处理的数据,重置数据包和头部解析的情况
				_b_head_parse = false;
				_recv_msg_node->Clear();
				//说明这不是一个多个粘连的数据包
				if (bytes_transferred <= 0) {
					memset(data_, 0, MAX_LENGTH);
					sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
						std::bind(&Session::handle_read, this,
							std::placeholders::_1, std::placeholders::_2, self_shared));
					return;
				}
				//走到这里说明这就是一个多个粘连的数据包
				continue;
			}

			//走到这里就说明头部是已经解析完成的,是处理数据未收全的情况
			int remain_msg = _recv_msg_node->total_len - _recv_msg_node->cur_len;
			//说明收到的数据仍然不足头部规定大小的情况
			if (bytes_transferred < remain_msg) {
				memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, bytes_transferred);
				_recv_msg_node->cur_len += bytes_transferred;
				memset(data_, 0, MAX_LENGTH);
				sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
					std::bind(&Session::handle_read, this,
						std::placeholders::_1, std::placeholders::_2, self_shared));
				return;
			}

			//走到这里说明收到的数据是大于等于头部规定大小的,接收到的数据可能是个完整的数据包,也可能多个粘连的数据包
			memcpy(_recv_msg_node->_data + _recv_msg_node->cur_len, data_ + copy_len, remain_msg);
			_recv_msg_node->cur_len += remain_msg;
			bytes_transferred -= remain_msg;
			copy_len += remain_msg;
			_recv_msg_node->_data[_recv_msg_node->total_len] = '\0';
			std::cout << "receive data is: " << _recv_msg_node->_data << std::endl;

			//处理完当前数据包的分割后,调用send接口向客户端发送回去
			Send(_recv_msg_node->_data, _recv_msg_node->total_len);

			//继续轮询处理下个数据包,重置接收数据节点和头部解析情况
			_b_head_parse = false;
			_recv_msg_node->Clear();
			//说明数据包并不是粘连的
			if (bytes_transferred <= 0) {
				memset(data_, 0, MAX_LENGTH);
				sock_.async_read_some(boost::asio::buffer(data_, MAX_LENGTH),
					std::bind(&Session::handle_read, this,
						std::placeholders::_1, std::placeholders::_2, self_shared));
				return;
			}
			//走到这里说明数据包是粘连的
			continue;	
		}
	}
}

这里hand_read函数的完善逻辑代码比较长,其中的注释给的比较详细,需要各位仔细读。但是逻辑可能头一两次读可能还是会有些蒙,多读几遍可能就会好得多。

这里还是得必要得说一下,我们都知道异步读写函数得回调函数中的参数bytes_transferred表示已经读取到的字节数,但是我们在这里还是需要对这些已经读到的数据进行处理。其中定义copy_len表示已经处理的字节数,bytes_transferred则表示为还未处理的数据(尽管已经被读取到了,但是还是尚未被处理,需要好好理解下)。

这里在session类中还定义了两个宏,MAX_LENGTH表示数据包的最大长度,就是1024*2字节。HEAD_LENGTH表示头部长度,就是2字节。

这里我也画了一个逻辑图供大家梳理这里的代码逻辑,希望能对大家理解有帮助。

粘包现象的测试

在session类中写一个打印函数,在每次触发读事件回调的时候调用下这个函数。这里打印的是tcp缓冲区的数据,boost asio从tcp已经是已经做了将tcp缓冲区的数据拿出来的,所以这里打印即可。

为了制造粘包现象,我们可以让服务器端隔2s处理一次读写,而客户端则不停的发送和读取就能制造出粘包现象了。下边是提供的客户端的代码。

#include <iostream>
#include <boost/asio.hpp>
#include <thread>
using namespace std;
using namespace boost::asio::ip;
const int MAX_LENGTH = 1024 * 2;
const int HEAD_LENGTH = 2;
int main()
{
	//测试粘包现象客户端
	try {
		//创建上下文服务
		boost::asio::io_context   ioc;
		//构造endpoint
		tcp::endpoint  remote_ep(address::from_string("127.0.0.1"), 1234);
		tcp::socket  sock(ioc);
		boost::system::error_code   error = boost::asio::error::host_not_found;
		sock.connect(remote_ep, error);
		if (error) {
			cout << "connect failed, code is " << error.value() << " error msg is " << error.message();
			return 0;
		}

		thread send_thread([&sock] {
			for (;;) {
				this_thread::sleep_for(std::chrono::milliseconds(2));
				const char* request = "hello world!";
				size_t request_length = strlen(request);
				char send_data[MAX_LENGTH] = { 0 };
				memcpy(send_data, &request_length, 2);
				memcpy(send_data + 2, request, request_length);
				boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 2));
			}
			});

		thread recv_thread([&sock] {
			for (;;) {
				this_thread::sleep_for(std::chrono::milliseconds(2));
				cout << "begin to receive..." << endl;
				char reply_head[HEAD_LENGTH];
				size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_LENGTH));
				short msglen = 0;
				memcpy(&msglen, reply_head, HEAD_LENGTH);
				char msg[MAX_LENGTH] = { 0 };
				size_t  msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));

				std::cout << "Reply is: ";
				std::cout.write(msg, msglen) << endl;
				std::cout << "Reply len is " << msglen;
				std::cout << "\n";
			}
			});

		send_thread.join();
		recv_thread.join();
	}
	catch (std::exception& e) {
		std::cerr << "Exception: " << e.what() << endl;
	}
	return 0;
}

现象如下图,测试环境Windows visual studio 

完整服务端代码:codes-C++: C++学习 - Gitee.com

这里的echo服务器实现了粘包的处理,但是在不同的平台下仍存在收发数据异常的问题,其根本原因就是平台大小端的差异。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/757329.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

昇思25天学习打卡营第11天|SSD目标检测

1. 学习内容复盘 模型简介 SSD&#xff0c;全称Single Shot MultiBox Detector&#xff0c;是Wei Liu在ECCV 2016上提出的一种目标检测算法。使用Nvidia Titan X在VOC 2007测试集上&#xff0c;SSD对于输入尺寸300x300的网络&#xff0c;达到74.3%mAP(mean Average Precision)…

amis源码 更新组件数据域的几种方法

更新组件数据域的几种方法&#xff1a; 默认都是合并数据&#xff0c;非覆盖(指定replace为true的才是覆盖)&#xff1a; const comp amisScoped.getComponentById(id);//或者getComponentByName(name) 1.comp.setData(values, replace); //更新多个值values&#xff0c; r…

开启网络监控新纪元:免费可视化工具助力网络信息链路拓扑监控大屏

在数字化浪潮汹涌的今天&#xff0c;网络已成为我们生活、工作的不可或缺的一部分。然而&#xff0c;你是否曾经想过&#xff0c;在这个庞大的网络世界中&#xff0c;是谁在默默守护着每一条信息的传输&#xff0c;确保我们的数据安全、稳定地抵达目的地&#xff1f; 网络信息链…

昇思MindSpore学习总结五——网络构建

1、网络构建 神经网络模型是由神经网络层和Tensor操作构成的&#xff0c;mindspore.nn提供了常见神经网络层的实现&#xff0c;在MindSpore中&#xff0c;Cell类是构建所有网络的基类&#xff0c;也是网络的基本单元。一个神经网络模型表示为一个Cell&#xff0c;它由不同的子C…

【机器学习300问】136、C4.5虽然改善了ID3决策树算法的部分缺点,但还是有不足,请问还有更好的算法吗?CART算法构建决策树

一、C4.5算法仍存在的不足 &#xff08;1&#xff09;计算效率不高 C4.5使用的信息增益率计算涉及熵的对数计算&#xff0c;特别是当属性值数量大时&#xff0c;计算成本较高。 &#xff08;2&#xff09;处理连续数值属性不够高效 ID3算法只能处理离散属性&#xff0c;需要预…

STM32CUBEMX配置USB虚拟串口

STM32CUBEMX配置USB虚拟串口 cubemx上默认配置即可。 外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传 配置完后生成工程&#xff0c;主要就是要知道串口的收发接口就行了。 发送&#xff1a;CDC_Transmit_FS()&#xff0c;同时记得包含头文件#include “…

转运机器人:智能物流的得力助手

在物流行业&#xff0c;转运机器人已经成为提高转运效率、降低成本的重要工具。而富唯智能转运机器人凭借其出色的性能和智能化的设计&#xff0c;成为了众多企业的得力助手。 富唯智能转运机器人采用了先进的AMR控制系统&#xff0c;可以一体化控制移动机器人并实现与产线设备…

电脑提示vcomp140.dll丢失的几种有效的解决方法,轻松搞定dll问题

在电脑使用过程中&#xff0c;我们可能会遇到一些错误提示&#xff0c;其中之一就是找不到vcomp140.dll。那么&#xff0c;究竟什么是vcomp140.dll呢&#xff1f;为什么会出现找不到vcomp140.dll的情况呢&#xff1f;本文将从vcomp140.dll的定义、常见原因、对电脑的影响以及解…

go语言DAY7 字典Map 指针 结构体 函数

Go中Map底层原理剖析_go map底层实现-CSDN博客 目录 Map 键值对key,value 注意&#xff1a; map唯一确定的key值通过哈希运算得出哈希值 一、 map的声明及初始化&#xff1a; 二、 map的增删改查操作&#xff1a; 三、 map的赋值操作与切片对比&#xff1a; 四、 通用所有…

最佳学习率和Batch Size缩放中的激增现象

前言 《Surge Phenomenon in Optimal Learning Rate and Batch Size Scaling》原文地址GitHub项目地址Some-Paper-CN。本项目是译者在学习长时间序列预测、CV、NLP和机器学习过程中精读的一些论文&#xff0c;并对其进行了中文翻译。还有部分最佳示例教程。如果有帮助到大家&a…

C语言学习记录(十二)——指针与数组及字符串

文章目录 前言一、指针和数组二、指针和二维数组**行指针(数组指针)** 三、 字符指针和字符串四、指针数组 前言 一个学习嵌入式的小白~ 有问题评论区或私信指出~ 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、指针和数组 在C语言中 &#xff0…

网盘挂载系统-知识资源系统-私域内容展示系统

系统介绍&#xff1a; 存储&#xff1a;一共支持约30款云盘存储&#xff0c;其中包括主流的&#xff08;百度网盘、阿里云盘、夸克云盘、迅雷云盘、蓝奏云、天翼云盘&#xff09;&#xff0c;部分展示 以及特别的&#xff08;一刻相册、对象存储、又拍云存储、SFTP、MEGA 网盘…

锁机制 -- 概述篇

锁机制 1、概述 ​  加锁是为了解决并发场景下&#xff0c;多个线程对同一资源同时进行操作&#xff0c;而导致同一线程多次操作出现结果不唯一的情况&#xff08;一次操作包含多条指令&#xff09;。结果不唯一发生的原因在于指令的错乱&#xff0c;前提条件是多线程环境及…

原子变量原理剖析

一、原子操作 原子操作保证指令以原子的方式执行&#xff0c;执行过程不被打断。先看一个实例&#xff0c;如下所示&#xff0c;如果thread_func_a和thread_func_b同时运行&#xff0c;执行完成后&#xff0c;i的值是多少&#xff1f; // test.c static int i 0;void thread…

013、MongoDB常用操作命令与高级特性深度解析

目录 MongoDB常用操作命令与高级特性深度解析 1. 数据库操作的深入探讨 1.1 数据库管理 1.1.1 数据库统计信息 1.1.2 数据库修复 1.1.3 数据库用户管理 1.2 数据库事务 2. 集合操作的高级特性 2.1 固定集合(Capped Collections) 2.2 集合验证(Schema Validation) 2.…

自组装mid360便捷化bag包采集设备

一、问题一&#xff1a;电脑太重&#xff0c;换nuc 采集mid360数据的过程中&#xff0c;发现了头疼的问题&#xff0c;得一手拿着电脑&#xff0c;一手拿着mid360来采集&#xff0c;实在是累胳膊。因此&#xff0c;网购了一个intel nuc, 具体型号是12wshi5000华尔街峡谷nuc12i…

Python私教张大鹏 PyWebIO通过事件回调实现表格的编辑和删除功能

从上面可以看出&#xff0c;PyWebIO把交互分成了输入和输出两部分&#xff1a;输入函数为阻塞式调用&#xff0c;会在用户浏览器上显示一个表单&#xff0c;在用户提交表单之前输入函数将不会返回&#xff1b;输出函数将内容实时输出至浏览器。这种交互方式和控制台程序是一致的…

在Ubuntu 18.04.6 LTS 交叉编译生成Windows 11下的gdb 8.1.1

1. 安装mingw sudo apt-get install mingw-w64 2. 下载 gdb 8.1.1源码 https://ftp.gnu.org/gnu/gdb/gdb-8.1.1.tar.gz 解压命令 tar -xf gdb-8.1.1.tar.gz 进入目录,创建build目录: hq@hq:~/gdb-8.1.1/build$ 执行配置 ../confi

视频云计算的未来发展趋势:智能化、个性化与云端协同助力智慧城市安防监控

随着信息技术的飞速发展&#xff0c;云计算作为一种全新的服务模式&#xff0c;正在改变我们处理数据和信息的方式。而视频云计算技术&#xff0c;作为云计算领域的一个重要分支&#xff0c;以其独特的优势&#xff0c;正在逐步渗透到我们生活的各个领域。 一、视频云计算技术…

[leetcode hot 150]第一百二十二题,买卖股票的最佳时机Ⅱ

题目&#xff1a; 给你一个整数数组 prices &#xff0c;其中 prices[i] 表示某支股票第 i 天的价格。 在每一天&#xff0c;你可以决定是否购买和/或出售股票。你在任何时候 最多 只能持有 一股 股票。你也可以先购买&#xff0c;然后在 同一天 出售。 返回 你能获得的 最大…