基于准静态自适应环型缓存器(QSARC)的taskBus万兆吞吐实现

文章目录

    • 概要
    • 整体架构流程
    • 技术名词解释
    • 技术细节
      • 1. 数据结构
      • 2. 自适应计算队列大小
      • 3. 生产者拼接缓存
      • 4. 高效地通知消费者
    • 小结
      • 1. 性能表现情况
      • 2. 主要改进
      • 3. 源码和发行版

概要

准静态自适应环形缓存器(Quasi-Static Adaptive Ring Cache)是taskBus用于数据吞吐的软件结构。

  • 准静态:缓存器的大小并不是静态分配,而是随着吞吐需求的提高,缓慢增长,并最终适应最高峰时的内存消耗。当缓存器已经达峰后,不再有堆内存分配。
  • 自适应:根据包大小的不同,缓存器在恒定的最大峰值内存容积下,根据统计获得包的大小数据,决定环状缓存队列元素的个数、每个元素的大小。
  • 环形: 队列收尾相接形成环状,生产者、消费者使用两个时钟前后相随,时钟本身采用atomic保护,无需额外的锁。

使用该缓存器,基于增强管道数据流转技术(EPDR)的业余软线无线电平台taskBus可在Linux 系统 i7 6700K 主频 4GHz下达到3GBps(24Gbps)的总交换能力。该交换能力被各个通道均分,共同支撑taskBus平台按照工程的连接关系,把各个生产者产出的数据包及时、完整、有序地输送给消费者。尽管采用了本技术,但与采用内核层面的函数进行管道直接交换的性能极限还差了20倍。

整体架构流程

taskBus的整体架构是一种多进程的合作机制,平台管理N个子进程,各个进程可以充当消费者和生产者的角色。平台收集各个子进程的stdout输出,并按照用户给定的消费关系,送入消费者的stdout.在这样的整体架构下,数据流转流程如下:

stdout
stdin
stdin
stdin
stdout
生产者1
平台 QSARC
消费者1
消费者2
消费者2
生产者2

在202409版本之前,taskBus使用的是消费者队列。由1个生产者产出的数据,会复制N份进入各个消费者的队列。这种情况下,平台既要为生产者部位保留一个用于包拼接的缓存,又要做N次memcpy。在消费者显著多于生产者时,吞吐能力下降的很厉害。

202409版本之后,设计成每份包只有唯一的1个副本,存储在生产者队列。消费者根据索引去拉取:

  • 平台为每个生产者维护1个环状缓存队列。
  • 平台按照消费关系,把每个包的队列位置索引播发给各个消费者。
  • 播发时,会维护一个待消费计数器,将值加1
  • 消费者消费包后,会将计数器的值减1
  • 如果生产者队列绕了一圈,发现下一个位置的计数器不为0,说明消费的速度赶不上生产速度,此时平台不再接受生产者的数据,等待。

整个流程如下图所示(动图):

Queue
注意的是,上图中队列的大小是4个包,即使包消费完毕,可用的容积也不会释放,这样,随着程序的运行,渐渐地动态内存分配会越来越少,速度会越来越快。当然,如果包长是固定的,则不存在此问题,可以提前全部分配。

值得注意的是,在时钟11时,因为下一个位置依旧有消费者在驻留,因此生产被暂停等待。这种架构的缺点是整体的吞吐能力存在“木桶效应”,即由消费最慢的消费者决定1秒内能够流转多少包。

技术名词解释

  • 包:用于一次功能操作的数据单位,可以理解为1段连续内存的数据。包由包头、数据段组成。包头含有长度指示,正常的数据包之间是紧密衔接的。
  • 时钟:用于控制生产、消费的整形变量,从0开始无限增长。每处理1个包,时钟会加1。
  • 当前位置:生产者、消费者操作队列的当前位置,数值= 时钟 % 队列大小。

技术细节

1. 数据结构

生产者为taskNode类型,拥有如下成员变量维护自己的生产队列:

QVector<int> m_status_stdin;
QVector<QByteArray> m_array_stdin;
QVector<qsizetype> m_size_stdin;
QVector<QAtomicInteger<qsizetype> > m_cnt_stdin;

变量1 控制生产的状态。管道到来的数据,是一个无限有序无损流,类似TCP。但是,每次流可能会被切断,导致包头、数据块可能被切割到多次调用里。这个变量用于在各次调用中记忆上次的生产状态。

变量2 就是缓存的内存本身。这个列表的QByteArray元素个数是缓存的包个数,每个QByteArray会保持在历史最大包的高位。这样,只有到来的包大于当前体积时,才会重新分配内存。

变量3 存储缓存内当前的写入位置。如果包已经写完了,则等于包长。如果是空闲,则为0。

变量4 就是待消费计数器,每次生产完毕1个包,会根据消费者等待队列广播包的索引(时钟),并把计数设置为消费者个数。消费者消费完,会减一。

2. 自适应计算队列大小

这种队列存在1个重大的问题,就是很难控制内存的用量。由于各个QByteArray都只增加不释放,因此期望的用量是:

C = M ∗ N C = M * N C=MN

M是最大包大小,N是队列包个数。

由于taskBus设计时,建议包的大小小于64KB,在假设包的种类小于K时,可以预先统计前K个包的最大长度M,从而确定按照最大内存门限,如C=128MB,要设置的N。

N = C / M N = C/M N=C/M

当然,这种算法是极为简陋的。这是建立在taskBus的具体应用场景上的。对包变化幅度很大、无法简单统计的情况,要考虑一定的shink策略,如在每次消费指针归0时,裁剪队列的大小,并释放尾部的内存。

//Adjust buf size
			if (m_pos_stdin==8)
			{
				qsizetype sza = 0;
				for (int ia = 0;ia<8;++ia)
					if (sza < m_size_stdin[ia])
						sza = m_size_stdin[ia];
				if (sza < 32)
					sza = 32;
				m_bufsize_adjust = taskCell::default_ringcache * 1024 * 1024 / sza;
				if (m_bufsize_adjust > m_bufsize_stdin)
					m_bufsize_adjust = m_bufsize_stdin;
				if (m_bufsize_adjust < 8)
					m_bufsize_adjust = 8;
				emit_message(QString("Adjusted buffer ring size : %1 MB / %2 Bytes = %3 frames.").arg(taskCell::default_ringcache).arg(sza).arg(m_bufsize_adjust).toUtf8());
			}

3. 生产者拼接缓存

在当前生产位置上,平台为生产者拼接一个完整的包。这里用到了状态机。状态机拥有如下状态:

状态名取值意义
头部捕获中0正在捕获头部
数据缓存中1正在根据头部指示的状态,缓存数据
数据缓存完毕2包接收完整,触发消费。
数据缓存完毕3触发消费完毕,待回收。

在每个状态上,都会进行进程管道读操作,不断的从stdout获取数据。直到状态2,会触发消费,并在全部消费通知播发后,进入状态3。当下一次生产者访问状态3的队列成员时,如果没有消费者还在消费这个包,则会把包位置归零,状态归零。否则,会阻塞生产者,直到消费者消费完毕为止。

void taskNode::slot_readyReadStandardOutput()
{
	LOG_PROFILE("IO","Start Recieving packs.");
	qsizetype total_sz = m_process->size();
	int badHeader = 0;
	while (total_sz)
	{
		const qsizetype pos = m_pos_stdin % m_bufsize_adjust;
		QByteArray & curr_array = m_array_stdin[pos];
		qsizetype & readBufMax = m_size_stdin[pos];
		QAtomicInteger<qsizetype> & cnt = m_cnt_stdin[pos];
		int & stat = m_status_stdin[pos];
		//生产者被阻塞了,因为下一个缓存位置依旧有消费者在消费数据。
		if (cnt>0)
			break;
		//Old data
		if (stat==3)
		{
			readBufMax = 0;
			stat = 0;
		}
		auto * header =	reinterpret_cast<const TASKBUS::subject_package_header *>  (curr_array.data());
		//Header
		if (stat==0)
		{
			if (total_sz<sizeof(TASKBUS::subject_package_header))
				break;
			auto needRead = sizeof(TASKBUS::subject_package_header) - readBufMax ;
			auto red = m_process->read(curr_array.data()+readBufMax,needRead);
			readBufMax += red;
			if (readBufMax == sizeof(TASKBUS::subject_package_header))
			{
				if (header->prefix[0]==0x3C && header->prefix[1]==0x5A &&	header->prefix[2]==0x7E && header->prefix[3]==0x69)
				{
					stat = 1;
				}
				else
				{
					++badHeader;
					readBufMax = 0;
				}
			}
			Q_ASSERT(readBufMax <= sizeof(TASKBUS::subject_package_header));
		}
		//data
		if (stat==1)
		{
			const qsizetype packAllSize = sizeof(TASKBUS::subject_package_header)+header->data_length;
			if (curr_array.size()<packAllSize)
			{
				curr_array.resize(packAllSize);
				header = reinterpret_cast<const TASKBUS::subject_package_header *>  (curr_array.data());
			}
			auto needRead = packAllSize - readBufMax ;
			auto red = m_process->read(curr_array.data()+readBufMax,needRead);
			readBufMax += red;
			if (readBufMax==packAllSize)
			{
				stat = 2;
			}
			Q_ASSERT(readBufMax <= packAllSize);
		}

		//Send
		if (stat==2)
		{
			const qsizetype pack_size = sizeof(TASKBUS::subject_package_header)+header->data_length;
			extern QAtomicInteger<quint64>  g_totalrev;
			g_totalrev += readBufMax;
			++m_spackage_sent;
			m_sbytes_sent += sizeof(TASKBUS::subject_package_header)+header->data_length;
			if (header->subject_id == TB_SUBJECT_CMD)
			{
				//Command must endwith \0
				const char * pCmd = (const char *)header+sizeof(TASKBUS::subject_package_header);
				QString cmd = QString::fromUtf8(pCmd,header->data_length);
				QMap<QString, QVariant> map_z = taskCell::string_to_map(cmd);
				//remember uuid
				if (map_z.contains("source"))
				{
					if(m_uuid.size()==0 )
						m_uuid = map_z["source"].toString();
					if (map_z.contains("destin"))
						emit sig_new_command(map_z);
				}
			}
			else if (m_currPrj)
				m_currPrj->routing_new_package(this,pos);
			if (m_bDebug)
				log_package(true,(char *)header,pack_size);
			stat = 3;
			++m_pos_stdin;
			//Adjust buf size
			if (m_pos_stdin==8)
			{
				qsizetype sza = 0;
				for (int ia = 0;ia<8;++ia)
					if (sza < m_size_stdin[ia])
						sza = m_size_stdin[ia];
				if (sza < 32)
					sza = 32;
				m_bufsize_adjust = taskCell::default_ringcache * 1024 * 1024 / sza;
				if (m_bufsize_adjust > m_bufsize_stdin)
					m_bufsize_adjust = m_bufsize_stdin;
				if (m_bufsize_adjust < 8)
					m_bufsize_adjust = 8;
				emit_message(QString("Adjusted buffer ring size : %1 MB / %2 Bytes = %3 frames.").arg(taskCell::default_ringcache).arg(sza).arg(m_bufsize_adjust).toUtf8());
			}
		}
		total_sz = m_process->size();
	}

	if (badHeader)
		emit_message(QByteArray("Error header recieved. "
								"Header must be 0x3C, 0x5A, 0x7E,"
								" 0x69. Aborting."));
	LOG_PROFILE("IO","End Recieving packs.");
}

4. 高效地通知消费者

如果每个包很小,则QEvent触发的密度会很大,开销很大。我们设置一个消费者的索引队列,存储待消费的生产者队列、索引:

	QMutex m_mtx_queue;
	QList<taskNode *> m_write_queue;
	QList<qsizetype> m_write_pos;

而后,只在队列为0时,触发Event。

bool taskNode::enqueue_write(taskNode * node, qsizetype pos)
{
	m_mtx_queue.lock();
	int z = m_write_queue.size() + m_write_cmd.size();
	m_write_queue.push_back( node);
	m_write_pos.push_back(pos);
	m_mtx_queue.unlock();
	if (!z)
	{
		QCoreApplication::postEvent(this,new QEvent(m_nPackEvent));
	}
	return  true;
}

同时,在消费时,一次性获取队列,并清空。这样减少锁的碰撞。

void taskNode::flush_write()
{
	m_mtx_queue.lock();
	QList<taskNode *> write_queue = m_write_queue;
	QList<qsizetype> write_pos = m_write_pos;
	QByteArrayList write_cmd = m_write_cmd;
	m_write_queue.clear();
	m_write_pos.clear();
	m_write_cmd.clear();
	//qDebug()<<write_queue.size();
	m_mtx_queue.unlock();

	while (write_queue.size())
	{
		taskNode * node = write_queue.first();
		qsizetype pos = write_pos.first();

		write_queue.pop_front();
		write_pos.pop_front();

		QByteArray & arr = node->get_stdin_array(pos);
		m_process->write(arr.constData(),sz);
		--cnt;
	}

小结

尽管采用了环形队列,由于在QProcess上还是存在mem-alloc,这使得峰值状态下CPU占用还是很大的。整体速率距离PCI总线和DDR4的能力还相去甚远,即使和bash直接管道连接相比,也有不小的差距。不过,为了构造灵活的管道吞吐能力,允许数据被多对多流转和反馈回环,损失一些性能也差强人意。

1. 性能表现情况

通过上述操作,taskBus的吞吐能力得到了保证,在只使用用户态的内存操作情况下,缓存64MB时,可以获得10Gbps以上的性能。在Linux下,可达 24Gbps,即3GBps1

平台系统峰值吞吐单路流量平均来回延迟
i7-10700U1Linux x643354MBps1340MBps1ms
i7-6700KLinux x642844MBps1050MBps2.2ms
i7-10700U2win10 home x641561MBps400MBps22ms
i7-6700Kwin10 home x641345MBps340MBps40ms
RaspberryPi 4(8GB)Rasbain 64263MBps102MBps6ms

上表是运行双进程点对点双向PING的状态下达到的。多进程下,总速率会稍高。可以发现,同样的硬件配置下:

  • windows下taskBus的吞吐能力要比Linux少1倍
  • windows下taskBus的带宽利用率要低于Linux,总速率/单路速率Linux更优。
  • windows下的延迟更大。

这是非常奇怪的现象,讲起来windows应该更快才对。对于里面的细节原因,只有后面慢慢研究了。

2. 主要改进

与2024年8月版本相比,少了一层生产者–>消费者的memcpy,转而只是传递int类型的索引,在生产者:消费者个数=1:N的情况下,吞吐能力会得到较大提高。这种memcpy次数的降低,对于老旧CPU影响更大,即使在2进程互PING(1:1)的测试中,也能达到 20-30%的提速。

3. 源码和发行版

源码和发行版参考

GitCode.net

或者

GitCode.com


  1. i7-10700U是一个笔记本上的低功耗cpu,在最大睿频4GHz下的Manjaro Linux系统上,3GBps持续了5秒。由于温度上升,温度墙导致频率下降到1.8GHz,速度降低1倍。 ↩︎ ↩︎

  2. i7-10700U是一个笔记本上的低功耗cpu,通过在windows-10下去除温度墙,可以在97摄氏度的高温状态下,保持在4GHz,维持1.5GBps的流量。 ↩︎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/873582.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

STM32F407VET6开发板RT-Thread MSH 串口的适配

相关文章 STM32F407VET6开发板RT-Thread的移植适配 环境 STM32F407VET6 开发板&#xff08;魔女&#xff09;&#xff0c;http://www.stm32er.com/ Keil MDK5&#xff0c;版本 5.36 串口驱动 RT-Thread 通过适配 串口驱动&#xff0c;可以使用 MSH shell 当前手动搭建的 …

详细分析linux中的MySql跳过密码验证以及Bug(图文)

目录 1.问题所示2. 基本知识3. 解决方法3.1 跳过验证Bug3.2 设定初始密码 1.问题所示 发现密码验证错误&#xff0c;遗失密码 2. 基本知识 停止MySQL服务&#xff1a;sudo systemctl stop mysql 以跳过权限表模式启动MySQL&#xff1a;sudo mysqld_safe --skip-grant-tables …

论文解读 | ACL2024 Outstanding Paper:因果指导的主动学习方法:助力大语言模型自动识别并去除偏见...

点击蓝字 关注我们 AI TIME欢迎每一位AI爱好者的加入&#xff01; 点击阅读原文观看作者直播讲解回放&#xff01; 作者简介 孙洲浩&#xff0c;哈尔滨工业大学SCIR实验室博士生 概述 尽管大语言模型&#xff08;LLMs&#xff09;展现出了非常强大的能力&#xff0c;但它们仍然…

MATLAB-基于高斯过程回归GPR的数据回归预测

目录 目录 1 介绍 1. 1 高斯过程的基本概念 1.2 核函数&#xff08;协方差函数&#xff09; 1.3 GPR 的优点 1.4. GPR 的局限 2 运行结果 3 核心代码 1 介绍 高斯过程回归&#xff08;Gaussian Process Regression, GPR&#xff09;是一种强大的非参数贝叶斯方法&…

如何用GPU算力卡P100玩黑神话悟空?

精力有限&#xff0c;只记录关键信息&#xff0c;希望未来能够有助于其他人。 文章目录 综述背景评估游戏性能需求显卡需求CPU和内存系统需求主机需求显式需求 实操硬件安装安装操作系统Win11安装驱动修改注册表选择程序使用什么GPU 安装黑神话悟空其他 综述 用P100 PCIe Ge…

一台手机一个ip地址吗?手机ip地址泄露了怎么办

在数字化时代&#xff0c;‌手机作为我们日常生活中不可或缺的一部分&#xff0c;‌其网络安全性也日益受到关注。‌其中一个常见的疑问便是&#xff1a;‌“一台手机是否对应一个固定的IP地址&#xff1f;‌”实际上&#xff0c;‌情况并非如此简单。‌本文首先解答这一问题&a…

Linux_kernel移植rootfs10

一、动态更改内核 1、low level&#xff08;静态修改&#xff09; 【1】将led_drv.c拷贝到kernel/drivers/char/目录中 【2】修改当前目录下的Makefile文件 obj-y led_drv.o #将新添加的驱动文件加入到Makefile文件中 【3】退回kernel目录&#xff0c;执行make uImage …

C语言学习笔记 Day16(C10文件管理--下)

Day16 内容梳理&#xff1a; C语言学习笔记 Day14&#xff08;文件管理--上&#xff09;-CSDN博客 C语言学习笔记 Day15&#xff08;文件管理--中&#xff09;-CSDN博客 目录 Chapter 10 文件操作 10.5 文件状态 10.6 文件的随机读写 fseek()、rewind() &#xff08;1&…

【初阶数据结构】详解栈和队列(来自知识星空的一抹流光)

文章目录 前言1. 栈1.1 栈的概念及结构1.2 栈的实现1.2.1 "栈"实现的选择 1.3 栈的代码实现1.3.1 栈的结构体定义&#xff08;用的是顺序表&#xff09;1.3.2 栈的头文件设置1.3.3 栈的各功能的实现 2. 队列2.1 队列的概念及结构2.2 "队列"实现的选择2.3 队…

【即时通讯】轮询方式实现

技术栈 LayUI、jQuery实现前端效果。django4.2、django-ninja实现后端接口。 代码仓 - 后端 代码仓 - 前端 实现功能 首次访问页面并发送消息时需要设置昵称发送内容为空时要提示用户不能发送空消息前端定时获取消息&#xff0c;然后展示在页面上。 效果展示 首次发送需要…

深入理解数据库的 4NF:多值依赖与消除数据异常

在数据库设计中&#xff0c; "范式" 是一个常常被提到的重要概念。许多初学者在学习数据库设计时&#xff0c;经常听到第一范式&#xff08;1NF&#xff09;、第二范式&#xff08;2NF&#xff09;、第三范式&#xff08;3NF&#xff09;以及 BCNF&#xff08;Boyce-…

滑动窗口在算法中的应用

滑动窗口是一种经典的算法技巧&#xff0c;就像在处理一系列动态数据时&#xff0c;用一扇可以滑动的“窗口”来捕捉一段连续的子数组或子字符串。通过不断地移动窗口的起点或终点&#xff0c;我们能够以较低的时间复杂度来解决一系列问题。在这篇文章中&#xff0c;我们将通过…

维信小程序禁止截屏/录屏

一、维信小程序禁止截屏/录屏 //录屏截屏,禁用wx.setVisualEffectOnCapture({visualEffect:hidden});wx.setVisualEffectOnCapture(Object object) 测试安卓手机&#xff1a; 用户截屏&#xff0c;被禁用 用户录屏&#xff0c;录制的是空白内容/黑色内容的视频。 二、微信小…

C++ | Leetcode C++题解之第386题字典序排数

题目&#xff1a; 题解&#xff1a; class Solution { public:vector<int> lexicalOrder(int n) {vector<int> ret(n);int number 1;for (int i 0; i < n; i) {ret[i] number;if (number * 10 < n) {number * 10;} else {while (number % 10 9 || numbe…

EasyPlayer.js网页H5 Web js播放器能力合集

最近遇到一个需求&#xff0c;要求做一款播放器&#xff0c;发现能力上跟EasyPlayer.js基本一致&#xff0c;满足要求&#xff1a; 需求 功性能 分类 需求描述 功能 预览 分屏模式 单分屏&#xff08;单屏/全屏&#xff09; 多分屏&#xff08;2*2&#xff09; 多分屏…

【阿一网络安全】如何让你的密码更安全?(二) - 非对称加密

上次《【阿一网络安全】如何让你的密码更安全&#xff1f;(一) - 对称加密》提到加密算法的对称加密&#xff0c;我们这次来聊聊非对称加密。 和对称加密不同&#xff0c;非对称加密的加密密钥和解密密钥不同。 非对称加密 大概过程就是&#xff0c;发送方使用公钥对明文数据…

mac 安装redis

官网下载指定版本的redis https://redis.io/ 目前3.2.0 是最新最稳定的 版本 这里是历史版本下载 下载指定版本 安装 1.放到自定义目录下并解压 2.打开终端&#xff0c;执行命令 cd redis的安装目录下 make test -- 此命令的作用是将redis源代码编译成可执行文件&#xff0c…

SPI驱动学习五(如何编写SPI设备驱动程序)

目录 一、SPI驱动程序框架二、怎么编写SPI设备驱动程序1. 编写设备树2. 注册spi_driver3. 怎么发起SPI传输3.1 接口函数3.2 函数解析 三、示例1&#xff1a;编写SPI_DAC模块驱动程序1. 要做什么事情2. 硬件2.1 原理图2.2 连接 3. 编写设备树4. 编写驱动程序5. 编写app层操作程序…

C++语法知识点合集:11.模板

文章目录 一、非类型模板参数1.非类型模板参数的基本形式2.指针作为非类型模板参数3.引用作为非类型模板参数4.非类型模板参数的限制和陷阱&#xff1a;5.几个问题 二、模板的特化1.概念2.函数模板特化3.类模板特化(1)全特化(2)偏特化(3)类模板特化应用示例 三、模板分离编译1.…

微带结环行器仿真分析+HFSS工程文件

微带结环行器仿真分析HFSS工程文件 工程下载&#xff1a;微带结环行器仿真分析HFSS工程文件 我使用HFSS版本的是HFSS 2024 R2 参考书籍《微波铁氧体器件HFSS设计原理》和视频微带结环行器HFSS仿真 1、环形器简介 环行器是一个有单向传输特性的三端口器件&#xff0c;它表明…