【C++boost::asio网络编程】有关异步读写api的笔记

异步读写api

  • 异步写操作
    • async_write_some
    • async_send
  • 异步读操作
    • async_read_some
    • async_receive

  定义一个Session类,主要是为了服务端专门为客户端服务创建的管理类

class Session {
public:
    Session(std::shared_ptr<asio::ip::tcp::socket> socket);
    void Connect(const asio::ip::tcp::endpoint& ep);
private:
    std::shared_ptr<asio::ip::tcp::socket> _socket;
};

异步写操作

  在介绍异步写之前,需要先封装一个Node结构,用来管理发送的数据

class MsgNode
{
	friend class Session;
public:
	MsgNode(const char* msg, int total_len)
		:_total_len(total_len)
		,_cur_len(0)
	{
		_msg = new char[total_len];
		memcpy(_msg, msg, total_len);
	}
	MsgNode(int total_len)
		:_total_len(total_len)
		,_cur_len(0)
	{
		_msg = new char[_total_len];
	}
	~MsgNode()
	{
		delete[] _msg;
	}
private:
	char* _msg;
	int _total_len;
	int _cur_len;
};

  其中,_msg表示要发送的数据,_cur_len表示已经发送的长度,而_total_len表示数据的总长度

async_write_some

在这里插入图片描述
  通过源码可以看出,async_write_some需要两个参数。第一个参数是buffer结构的数据,用来放需要发送的数据;第二个参数是一个回调函数,这个回调函数又有两个参数,一个是用来存放错误码的对象,另一个是无符号整数(这个无符号整数代表的就是当前具体发送数据的大小)
  当调用完async_write_some之后(即一次异步写操作结束之后),系统会调用这个回调函数。

void Session::WriteCallBackErr(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<MsgNode> node)
{
	if (node->_cur_len + bytes_transferred <= node->_total_len)
	{
		node->_cur_len += bytes_transferred;
		this->_socket->async_write_some(boost::asio::buffer(node->_msg + node->_cur_len, node->_total_len - node->_cur_len),
			std::bind(&Session::WriteCallBackErr, this, std::placeholders::_1, std::placeholders::_2, _send_node));
	}
}

void Session::WriteToSocketErr(const std::string& buf)
{
	_send_node = std::make_shared<MsgNode>(buf.c_str(), buf.size());
	_socket->async_write_some(boost::asio::buffer(buf.c_str(), buf.size()),
		std::bind(&Session::WriteCallBackErr, this, std::placeholders::_1, std::placeholders::_2, _send_node));
}

  在以上代码中,先在WriteToSocketErr函数中创建一个消息结点,然后调用async_write_some将数据发送出去。当一次写操作结束之后。系统会将错误码和已写入数据的长度作为参数给回调函数。

if (node->_cur_len + bytes_transferred <= node->_total_len)

  在回调函数中判断是否已经将数据全部发送出去了,如果没有,则更新_cur_len,然后继续执行异步发送操作
  但是,以上代码逻辑中存在一个漏洞。在异步执行的逻辑中,代码调用的顺序是不确定的。
  举个例子,当需要连续两次发送hello world

//连续两次调用
WriteToSocketErr("HelloWorld");
WriteToSocketErr("HelloWorld");

  可能会发生第一次进行写入的时候只写入了Hello,这时按照逻辑需要执行回调函数,当在回调函数中发现数据并没有发送完全,于是再次调用async_write_some想继续写入World,但此时第二次调用WriteToSocketErr("HelloWorld");中,已经提前一步调用了async_write_some并将数据全部写完,然后才轮到第一次发送时的回调函数将剩下的World继续发完。这最终导致的结果时对方收到的数据为HelloHelloWorldWorld.
  为了确保发送顺序的问题,可以在Session类中定义一个队列用来管理需要发送的结点和i一个布尔类型变量用来表示当前是否有数据正在被发送(初始化为false)

class Session{
public:
	Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket)
	:_socket(socket)
	,_send_pending(false)
	{}
    void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
    void WriteToSocket(const std::string &buf);
private:
    std::queue<std::shared_ptr<MsgNode>> _send_queue;
    std::shared_ptr<asio::ip::tcp::socket> _socket;
    bool _send_pending;
};

  此时再对写操作进行改进

void Session::WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
	if (ec.value() != 0)
	{
		std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
		return;
	}
	std::shared_ptr<MsgNode>&node = _send_queue.front();
	node->_cur_len += bytes_transferred;
	if (node->_cur_len + bytes_transferred < node->_total_len)//还没有发送完
	{
		_socket->async_write_some(boost::asio::buffer(node->_msg + node->_cur_len, node->_total_len - bytes_transferred),
			std::bind(&WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));
		return;
	}
	_send_queue.pop();
	if (_send_queue.empty())
	{
		_send_pending = false;
	}
	else
	{
		std::shared_ptr<MsgNode>& node = _send_queue.front();
		_socket->async_write_some(boost::asio::buffer(node->_msg, node->_total_len),
			std::bind(&Session::WriteCallBack, std::placeholders::_1, std::placeholders::_2));
	}
}

void Session::WriteToSocket(const std::string& buf)
{
	_send_queue.push(std::make_shared<MsgNode>(buf.c_str(), buf.size()));
	if (_send_pending)//当前有消息正在发
	{
		return;
	}
	_socket->async_write_some(boost::asio::buffer(buf.c_str(), buf.size()),
		std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));
	_send_pending = true;
}

  在WriteToSocket函数中,先不着急将数据立马发送出去,而是将数据节点放入到发送队列中,然后判断当前是否有数据正在发送,如果有就返回避免冲突;没有就直接调用async_write_some,在回调函数中,永远都是取出队首的结点进行发送,如果判断队首的元素数据已经发送完了就pop掉,并且检查队列中是否还有需要发送的元素:如果有,继续执行发送逻辑;如果没有就将_send_pending置为false表示当前已经没有数据正在发送了。

async_send

  async_send的作用是直接将所有数据全部发送完,代码逻辑也比async_write_some要简单一些

void Session::WriteAllToSocket(const std::string& buf)
{
	_send_queue.push(std::make_shared<MsgNode>(buf.c_str(), buf.size()));
	if (_send_pending)
	{
		return;
	}
	_socket->async_send(boost::asio::buffer(buf.c_str(), buf.size()),
		std::bind(&Session::WriteAllCallBck, this, std::placeholders::_1, std::placeholders::_2));
	_send_pending = true;
}

void Session::WriteAllCallBck(const boost::system::error_code& ec, std::size_t bytes_tranferred)
{
	if (ec.value() != 0)
	{
		std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
		return;
	}
	_send_queue.pop();
	if (_send_queue.empty())
	{
		_send_pending = false;
	}
	else
	{
		std::shared_ptr<MsgNode>& node = _send_queue.front();
		_socket->async_send(boost::asio::buffer(node->_msg, node->_total_len),
			std::bind(&Session::WriteAllCallBck, this, std::placeholders::_1, std::placeholders::_2));
	}
}

注意
  async_sendasync_write_some不要放在一起使用,因为async_send底层还是多次调用的async_write_some。如果一起使用,还是会引发数据冲突的问题

异步读操作

  为了准备读操作,需要在Session类中添加数据结点_recv_node和一个布尔变量_recv_pending

class Session
{
public:
	Session(std::shared_ptr<boost::asio::ip::tcp::socket> socket)
		:_socket(socket)
		,_send_pending(false)
		,_recv_pending(false)
	{}
	void Connect(boost::asio::ip::tcp::endpoint& ep);
	void WriteCallBackErr(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<MsgNode>);
	void WriteToSocketErr(const std::string& buf);

	void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
	void WriteToSocket(const std::string& buf);

	void WriteAllToSocket(const std::string& buf);
	void WriteAllCallBck(const boost::system::error_code& ec, std::size_t bytes_tranferred);

	void ReadFromSocket();
	void ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);

	void ReadAllFromSocket();
	void ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);

private:
	std::shared_ptr<boost::asio::ip::tcp::socket> _socket;
	std::shared_ptr<MsgNode> _send_node;
	std::queue<std::shared_ptr<MsgNode>> _send_queue;
	std::shared_ptr<MsgNode> _recv_node;
	bool _recv_pending;
	bool _send_pending;
};

  由于接收的数据在TCP缓冲区里面已经是排好序了的,所以并不需要队列来维护顺序

async_read_some

其实异步读和异步写的逻辑类似,这里就不多介绍了

void Session::ReadFromSocket()
{
	if (_recv_pending)
	{
		return;
	}
	_recv_node = std::make_shared<MsgNode>(RECVSIZE);
	_socket->async_read_some(boost::asio::buffer(_recv_node->_msg, _recv_node->_total_len),
		std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));
	_recv_pending = true;
}
void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
	if (ec.value() != 0)
	{
		std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
		return;
	}
	if (_recv_node->_cur_len + bytes_transferred < _recv_node->_total_len)
	{
		_recv_node->_cur_len += bytes_transferred;
		_socket->async_read_some(boost::asio::buffer(_recv_node->_msg + _recv_node->_cur_len, _recv_node->_total_len - _recv_node->_cur_len),
			std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));
		return;
	}
	_recv_pending = false;
}

async_receive

void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
	if (ec.value() != 0)
	{
		std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
		return;
	}
	if (_recv_node->_cur_len + bytes_transferred < _recv_node->_total_len)
	{
		_recv_node->_cur_len += bytes_transferred;
		_socket->async_read_some(boost::asio::buffer(_recv_node->_msg + _recv_node->_cur_len, _recv_node->_total_len - _recv_node->_cur_len),
			std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));
		return;
	}
	_recv_pending = false;
}

void Session::ReadAllFromSocket()
{
	if (_recv_pending)
	{
		return;
	}
	_recv_node = std::make_shared<MsgNode>(RECVSIZE);
	_socket->async_receive(boost::asio::buffer(_recv_node->_msg, _recv_node->_total_len),
		std::bind(&Session::ReadAllCallBack, this, std::placeholders::_1, std::placeholders::_2));
	_recv_pending = true;
}
void Session::ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred)
{
	if (ec.value() != 0)
	{
		std::cout << "Error! Code is " << ec.value() << ".Message is " << ec.message() << std::endl;
		return;
	}
	_recv_pending = false;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/927902.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Flutter如何适配RTL

阿拉伯语和希伯来语等是使用的从右到左书写的文字系统。世界上估计有4.22亿人以阿拉伯语做为母语。使用从右至左的人口可以说是更多了。所以对于出海项目来说&#xff0c;是不能忽视的一部分。 RTL可以说是本地化适配中比较麻烦的一项&#xff0c;并没有多语言适配来的简单。RT…

【Django-xadmin】

时间长不用,会忘的系列 1、Django-xadmin后台字段显示处理 主要是修改每个模块下adminx.py文件 代码解释&#xff1a;第1行控制表单字段显示第2行控制列表字段显示第3行控制搜索条件第4行控制过滤条件第5行支持单个或多个字段信息修改第6行列表分页&#xff0c;每页显示多少行…

Pytest --capture 参数详解:如何控制测试执行过程中的输出行为

--capture 选项用于控制测试用例执行过程中标准输出&#xff08;stdout&#xff09;和标准错误输出&#xff08;stderr&#xff09;的捕获行为。 --capture 的选项值&#xff1a; fd&#xff08;默认&#xff09; 捕获文件描述符级别的输出&#xff08;stdout 和 stderr&#x…

整合SSM框架:构建Java Web应用

目录 简介 项目结构 配置文件详解 db.properties mybatis-config.xml spring-mybatis.xml springmvc.xml web.xml pom.xml 整合步骤 为什么这样整合&#xff1f; 简介 SSM框架整合指的是Spring、Spring MVC和MyBatis三个开源框架的整合。这种整合方式在Java Web开发…

Solidity开发智能合约

05-Solidity开发智能合约 0 Solidity和智能合约 Solidity开发可运行的智能合约步骤&#xff1a; 源代码通过编译成字节码&#xff08;Bytecode&#xff09;&#xff0c;同时会产生二进制接口规范&#xff08;ABI&#xff09; 通过交易将字节码部署到以太坊网络&#xff0c;部署…

Java基础之控制语句:开启编程逻辑之门

一、Java控制语句概述 Java 中的控制语句主要分为选择结构、循环结构和跳转语句三大类&#xff0c;它们在程序中起着至关重要的作用&#xff0c;能够决定程序的执行流程。 选择结构用于根据不同的条件执行不同的代码路径&#xff0c;主要包括 if 语句和 switch 语句。if 语句有…

Vue教程|搭建vue项目|Vue-CLI2.x 模板脚手架

一、项目构建环境准备 在构建Vue项目之前&#xff0c;需要搭建Node环境以及Vue-CLI脚手架&#xff0c;由于本篇文章为上一篇文章的补充&#xff0c;也是为了给大家分享更为完整的搭建vue项目方式&#xff0c;所以环境准备部分采用Vue教程&#xff5c;搭建vue项目&#xff5c;V…

shell脚本30个案例(五)

前言&#xff1a; 通过一个多月的shell学习&#xff0c;总共写出30个案例&#xff0c;分批次进行发布&#xff0c;这次总共发布了5个案例&#xff0c;希望能够对大家的学习和使用有所帮助&#xff0c;更多案例会在下期进行发布。 案例二十一、系统内核优化 1.问题&#xff1…

分布式集群下如何做到唯一序列号

优质博文&#xff1a;IT-BLOG-CN 分布式架构下&#xff0c;生成唯一序列号是设计系统常常会遇到的一个问题。例如&#xff0c;数据库使用分库分表的时候&#xff0c;当分成若干个sharding表后&#xff0c;如何能够快速拿到一个唯一序列号&#xff0c;是经常遇到的问题。实现思…

ChatGPT/AI辅助网络安全运营之-数据解压缩

在网络安全的世界中&#xff0c;经常会遇到各种压缩的数据&#xff0c;比如zip压缩&#xff0c;比如bzip2压缩&#xff0c;gzip压缩&#xff0c;xz压缩&#xff0c;7z压缩等。网络安全运营中需要对这些不同的压缩数据进行解压缩&#xff0c;解读其本意&#xff0c;本文将探索一…

C++小问题

怎么分辨const修饰的是谁 是限定谁不能被改变的&#xff1f; 在C中&#xff0c;const关键字的用途和位置非常关键&#xff0c;它决定了谁不能被修改。const可以修饰变量、指针、引用等不同的对象&#xff0c;并且具体的作用取决于const的修饰位置。理解const的规则能够帮助我们…

Docker中配置Mysql主从备份

Mysql配置主从备份 一、Docker中实现跨服务器主从备份二、配置步骤1.配置主库2.配置从库3.遇到问题3.其它使用到的命令 一、Docker中实现跨服务器主从备份 在 Docker 中配置 MySQL 主从备份主要通过 MySQL 主从复制实现 二、配置步骤 1.配置主库 # 进入mysql主库容器 docke…

下载maven 3.6.3并校验文件做md5或SHA512校验

一、下载Apache Maven 3.6.3 Apache Maven 3.6.3 官方下载链接&#xff1a; 二进制压缩包&#xff08;推荐&#xff09;: ZIP格式: https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.zipTAR.GZ格式: https://archive.apache.org/dist/…

C++趣味编程:基于树莓派Pico的模拟沙漏-倾斜开关与LED的互动实现

沙漏,作为一种古老的计时工具,利用重力让沙子通过狭小通道,形成了计时效果。在现代,我们可以通过电子元件模拟沙漏的工作原理。本项目利用树莓派Pico、倾斜开关和LED,实现了一个电子沙漏。以下是项目的详细技术解析与C++代码实现。 一、项目概述 1. 项目目标 通过倾斜开关…

pycharm链接neo4j(导入文件)

1.新建csv文件 2.写入文件 3.运行代码 import csv from py2neo import Graph, Node, Relationship# 连接到Neo4j数据库&#xff0c;使用Bolt协议 graph Graph("bolt://localhost:7687", auth("neo4j", "password"))# 读取CSV文件 with open(…

【C++】多线程

目录 一 概念 1 多线程 2 多进程与多线程 3 多线程理解 二 创建线程 1 thread 2 join() 和 detach() 3 this_thread 三 std::mutex 1 lock 和 unlock 2 lock_guard 3 unique_lock 四 condition_variable 五 std::atomic 一 概念 1 多线程 在C11之前&#xff0…

Kafka 图形化工具 Eagle安装

Kafka 图形化工具 Eagle 3.0.1版本安装 1、安装JDK jdk安装 2、安装kafka 如未安装kafka&#xff0c;需要先安装完kafka 3、下载kafka-eagle 官网下载地址 wget https://github.com/smartloli/kafka-eagle-bin/archive/v3.0.1.tar.gz #移动到安装目录 mv v3.0.1.tar.gz…

vue实现echarts饼图自动轮播

echarts官网&#xff1a;Examples - Apache ECharts echartsFn.ts 把echarts函数封装成一个文件 import * as echarts from "echarts";const seriesData [{"value": 12,"name": "过流报警"},{"value": 102,"name&qu…

CSS动画案例6

目录 一、介绍二、静态页面的布局1.HTML部分2.CSS 三、动画交互四、结束语 一、介绍 本节内容我们来仿照一个网站&#xff08;戳我进网站&#xff09;的进入页面时的doing动画部分&#xff0c;首先是大标题从最小变为最大&#xff0c;然后是副标题从下向上走&#xff0c;最后是…

Proteus8.17下载安装教程

Proteus是一款嵌入式系统仿真开发软件&#xff0c;实现了从原理图设计、单片机编程、系统仿真到PCB设计&#xff0c;真正实现了从概念到产品的完整设计&#xff0c;其处理器模型支持8051、HC11、PIC10/12/16/18/24/30/DsPIC33、AVR、ARM、8086和MSP430等&#xff0c;能够帮助用…