C# 基于共享内存实现跨进程队列

C# 进程通信系列

第一章 共享内存
第二章 共享队列(本章)


文章目录

  • C# 进程通信系列
  • 前言
  • 一、实现原理
    • 1、用到的主要对象
    • 2、创建共享内存
    • 3、头部信息
    • 4、入队
    • 5、出队
    • 6、释放资源
  • 二、完整代码
  • 三、使用示例
    • 1、传输byte[]数据
    • 2、传输字符串
    • 3、传输对象
  • 总结


前言

进程通信一般情况下比较少用,但是也有一些使用场景,有些做视频传输的似乎会用多进程来实现,还有在子进程中调用特定的库来避免内存泄漏,笔者最近也遇到了需要使用多进程的场景。多进程的使用最主要的就是进程间的通信,本文参考了go语言的ipc库,实现了一个基于共享内存的跨进程队列。


一、实现原理

1、用到的主要对象

//共享内存管理对象
MemoryMappedFile _mmf;
//跨进程的互斥变量
Mutex _mtx;
//入队信号量
Semaphore _semaEq;
//出队信号量
Semaphore _semaDq;

2、创建共享内存

创建共享内存需要使用MemoryMappedFile.CreateFromFile实现跨平台。CreateNew只能创建无法打开第二个,OpenExisting只支持windows。

string name="共享内存标识名称";
_shmPath="共享内存文件路径"+name;
//通过文件路径创建共享内存
_mmf = MemoryMappedFile.CreateFromFile(File.Open(_shmPath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite), null, (_QueuetHeaderSize + (elementBodyMaxSize + _ElementHeaderSize) * capacity), MemoryMappedFileAccess.ReadWrite, HandleInheritability.Inheritable, false);
//创建互斥变量
_mtx = new Mutex(false, Name + ".mx");
//创建入队信号量,capacity为队列元素个数容量
_semaEq = new Semaphore(0, (int)capacity, name+ ".eq");
//创建出队信号量,capacity为队列元素个数容量
_semaDq = new Semaphore((int)capacity, (int)capacity, name + ".dq");

获取读写对象

_mmva = _mmf.CreateViewAccessor();

值类型数组方式写入

T[] obj;
_mmva.WriteArray<T>(position , obj, 0, obj.Length);

值类型数组方式读取

T[] arr=new T[n];
_mmva.ReadArray(position, arr, 0, arr.ArrayLength);

3、头部信息

采用循环队列方式实现,判断队空队满通过count、capacity的方式(参考了C#的Queue源码),避免占用多一个空间。

struct QueueHeader
{
    //元素大小
    public nint ElementSize;
    //队列容量
    public nint Capacity;
    //当前元素个数
    public nint Count;
    //队列头
    public nint Front;
    //队列尾
    public nint Rear;
}

队列头信息需要存储在共享内存中。

QueueHeader Header
{
    get
    {
        QueueHeader header;
        _mmva.Read(0, out header);
        return header;
    }
    set
    {
        _mmva.Write(0, ref value);
    }
}

4、入队

示例如下

bool Enqueuee<T>(T[] obj) where T : struct
{    
    //共享内存中读取header
    var header = Header;
    //队列满返回
    if (header.Count == header.Capacity) return false;
    //计算写入的位置,头部长度+队尾*元素大小
    nint position = _QueuetHeaderSize + header.Rear * header.ElementSize;
    //写入共享内存
    _mmva.WriteArray<T>(position, obj, 0, obj.Length);
    //更新队尾
    header.Rear = (header.Rear + 1) % header.Capacity;
    //更新长度
    header.Count++;
    //更新头部信息到共享内存
    Header=header;
    return true;
}

同步

 //等待出队信号量(如果队列满则会等待)
 if (!_semaDq.WaitOne(timeout)) return false;
 //进入互斥锁
 if (!_mtx.WaitOne(timeout)) return false;
 try
 {   
     //入队
     Enqueue(obj);
 }
 finally
 {
     //通知入队信号量
     _semaEq.Release();
     //释放互斥锁
     _mtx.ReleaseMutex();
 }
 return true;

5、出队

object Dequeue()
{   
    //共享内存中读取header
    var header = Header;
    //队列空则返回
    if (header.Count == 0) return null;
    //计算读取的位置,头部长度+队头*元素大小
    long position = _QueuetHeaderSize + header.Front * header.ElementSize;
    //创建数据用于装载数据
    Array arr = Array.CreateInstance(readType, msg.Header.ArrayLength);
    //将泛型转type调用。
    var readArray = _ReadArrayGeneric.MakeGenericMethod(readType);
    //读取共享内存的数据
    readArray.Invoke(_mmva, [position , arr, 0, arr.Length]);
    //更新队头
    header.Front = (header.Front + 1) % header.Capacity;
    //更新长度
    header.Count--;
    //更新头部信息到共享内存
    Header = header;
    return msg;
}

同步

 //等待入队信号量(如果队列空则会等待)
if (!_semaEq.WaitOne(timeout)) return null;
 //进入互斥锁
if (!_mtx.WaitOne(timeout!)) return null;
try
{ 
   //出队
    return  Dequeue();
}
finally
{
     //通知入队信号量
    _semaDq.Release();
     //释放互斥锁
    _mtx.ReleaseMutex();
}

6、释放资源

/// <summary>
/// 销毁队列,只会销毁当前实例,如果多个队列打开同个名称,其他队列不受影响
/// </summary>
public void Dispose()
{
    _mmf.Dispose();
    _mmva.Dispose();
    _mtx.Dispose();
    _semaEq.Dispose();
    _semaDq.Dispose();
}

二、完整代码

类的定义

/// <summary>
/// 共享队列
/// 基于共享内存实现
/// </summary>
class SharedQueue : IDisposable
{
    /// <summary>
    /// 名称
    /// </summary>
    public string Name { get; private set; }
    /// <summary>
    /// 元素最大大小
    /// </summary>
    public long ElementMaxSize { get; private set; }
    /// <summary>
    /// 队列容量
    /// </summary>
    public long Capacity { get; private set; }
    /// <summary>
    /// 表示是否新创建,是则是创建,否则是打开已存在的。
    /// </summary>
    public bool IsNewCreate { get; private set; }
    /// <summary>
    /// 构造方法
    /// </summary>
    /// <param name="name">唯一名称,系统级别,不同进程创建相同名称的本对象,就是同一个队列,可以进行数据传输。</param>
    /// <param name="capacity">队列容量,元素个数总量</param>
    /// <param name="elementBodyMaxSize">队列元素最大大小,此大小需要考虑传输数据Type.FullName长度</param>
    public SharedQueue(string name, nint capacity = 1, nint elementBodyMaxSize = 3145728);
    /// <summary>
    /// 发送数据
    /// </summary>
    /// <param name="obj">发送的对象,支持值类型(元类型、结构体)、值类型数组、可json序列化的任意对象(实体类、数组、List、字典等等),无法序列化会产生异常。
    /// 会根据类型自动判断传输方式,值类型以及值类型数组会直接内存拷贝,引用类型会进行序列化。
    /// 此方法队列满了会阻塞,直到发送成功才返回。
    /// </param>
    /// <param name="isForceSerialize">是否强制序列化,结构体不含引用的情况下会直接复制数据性能较高,但是如果结构体成员变量有引用类型则会引发异常,此时可以强制序列化。</param>
    public void Send(object obj, bool isForceSerialize = false);
    /// <summary>
    /// 接收数据
    /// 此方法队列空会阻塞,直到有数据才返回。
    /// </summary>
    /// <returns>接收的数据,与send的数据类型对应。可以通过type或is判断,或者提前知道类型直接转换</returns>
    public object Receive();
    /// <summary>
    /// 发送数据超时
    /// </summary>
    /// <param name="obj">发送的对象,支持值类型(元类型、结构体)、值类型数组、可json序列化的任意对象(实体类、数组、List、字典等等),无法序列化会产生异常。
    /// 会根据类型自动判断传输方式,值类型以及值类型数组会直接内存拷贝,引用类型会进行序列化。</param>
    /// <param name="timeout">超时时长</param>
    /// <param name="isForceSerialize">是否强制序列化,结构体不含引用的情况下会直接复制数据性能较高,但是如果结构体成员变量有引用类型则会引发异常,此时可以强制序列化。</param>
    /// <returns>true发送成功,false超时</returns>
    public bool SendTimeout(object obj, TimeSpan timeout, bool isForceSerialize = false);
    /// <summary>
    /// 接收超时
    /// </summary>
    /// <param name="timeout">超时时长</param>
    /// <returns>接收的数据,与send的数据类型对应。可以通过type或is判断,或者提前知道类型直接转换。
    /// 超时返回null。
    /// </returns>
    public object? ReceiveTimeout(TimeSpan timeout);
    /// <summary>
    /// 销毁队列,只会销毁当前实例,如果多个队列打开同个名称,其他队列不受影响
    /// </summary>
    public void Dispose();
}

项目
vs2022 .net 8.0 控制台项目
https://download.csdn.net/download/u013113678/89544650


三、使用示例

1、传输byte[]数据

进程a

SharedQueue shq= new SharedQueue("shq1", 10);
byte[] a = new byte[5] { 1, 2, 3, 4, 5 };
//发送数据
shq.send(a);

进程b

SharedQueue shq= new SharedQueue("shq1", 10);
//接收数据
var a=shq.Receive() as byte[];
Console.Write("receive: ");
foreach (var i in a)
{
    Console.Write(i);
}

在这里插入图片描述

2、传输字符串

进程a

SharedQueue shq= new SharedQueue("shq1", 10,64);
shq.send("12345");

进程b

SharedQueue shq= new SharedQueue("shq1", 10,64);
var a=shq.Receive() as string;
Console.WriteLine("receive: " + a);

在这里插入图片描述

3、传输对象

class A
{
    public string Name;
    public int Number;
}

进程a

SharedQueue shq= new SharedQueue("shq1", 10,64);
sq.Send(new A() { Name = "Tommy", Number = 102185784 });

进程b

SharedQueue shq= new SharedQueue("shq1", 10,64);
var a=shq.Receive() as A;
Console.WriteLine("receive: " + a.Name + " " + a.Number);

在这里插入图片描述


总结

以上就是今天要讲的内容,实现这样的一个对象,虽然代码量不多,但还是有一点难度的,很多细节需要处理,比如泛型转type以统一接口,信号量实现队列和条件变量是有差异的,用CreateFromFile才能实现跨平台。总的来说,有了这样的一个队列,跨线程通信就变的比较方便且高效了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/801644.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

持续集成03--Jenkins的安装与配置

前言 在持续集成/持续部署&#xff08;CI/CD&#xff09;的实践中&#xff0c;Jenkins作为一个开源的自动化服务器&#xff0c;扮演着至关重要的角色。本篇“持续集成03--Jenkins的安装配置”将带您走进Jenkins的世界&#xff0c;深入了解如何在Linux环境中安装并配置Jenkins。…

window下安装go环境

一、go官网下载安装包 官网地址如下&#xff1a;https://golang.google.cn/dl/ 选择对应系统的安装包&#xff0c;这里是window系统&#xff0c;可以选择zip包&#xff0c;下载完解压就可以使用 二、配置环境变量 这里的截图配置以win11为例 我的文件解压目录是 D:\Software…

web自动化测试selenium的基本使用

目录 初始化浏览器并打开网页 定位网页元素 定位的方法 模拟键盘操作 模拟鼠标操作 xpath方法 xpath结点 路径表达式 轴 selenium是一个很流行的自动化测试的库&#xff0c;主要用于模拟浏览器的运行&#xff0c;是web应用测试的工具。 在使用selenium时&#xff0c;…

C++基础篇(2)

目录 前言 1.缺省参数 2.函数重载 2.1函数重载的基本规则 ​编辑2.2注意事项 2.3 重载解析&#xff08;Overload Resolution&#xff09;--补充内容 3.引用 3.1引用的概念和定义 3.2引用的特性 3.3引用的使用 3.4const引用 4.指针和引用的关系 结束语 前言 上节小编…

16_Shell好用工具:sed

16_Shell好用工具&#xff1a;sed 零、语法解析 sed [选项参数] [模式匹配/sed命令] 文件 命令说明aadd&#xff0c;新增iinsert&#xff0c;新增cchange&#xff0c;修改ssubstitute&#xff0c;替换ddelete&#xff0c;删除pprint, 打印 通常与 -n 连用 一、增&#xff08;…

【JavaScript】聊一聊js中的浅拷贝与深拷贝与手写实现

前言 什么是深拷贝与浅拷贝&#xff1f;深拷贝与浅拷贝是js中处理对象或数据复制操作的两种方式。‌在聊深浅拷贝之前咱得了解一下js中的两种数据类型&#xff1a; 基本数据类型&#xff08;6种&#xff09;String、Number、Object、Boolean、null、undefined、symbol&#xff…

数据结构——线性表(C语言实现)

写在前面&#xff1a; 在前面C语言的结构体学习中&#xff0c;我提及了链表的操作&#xff0c; 学习数据结构我认为还是需要对C语言的数组、函数、指针、结构体有一定的了解&#xff0c;不然对于结构体的代码可能很难理解&#xff0c;特别是一些书籍上面用的还是伪代码&#xf…

Day07-员工管理-上传下载

1.员工管理-导出excel 导出员工接口返回的是二进制axios配置responseType为blob接收二进制流文件为Blob格式按装file-saver包&#xff0c;实现下载Blob文件npm install add file-saver导出员工excel的接口 (src/api/employee.js) export function exportEmployee(){return req…

普通人还有必要学习 Python 之类的编程语言吗?

在开始前分享一些编程的资料需要的同学评论888即可拿走 是我根据网友给的问题精心整理的对于编程的重要性&#xff0c;这里就不详谈了。 未来&#xff0c;我们和机器的交流会越来越多&#xff0c;编程可以简单看作是和机器对话并分发给机器任务。机器不仅越来越强大&#xff0…

芯课堂 | Synwit_UI_Creator(ugui)平台之PC端界面设计篇

​今天小编给大家介绍的是华芯微特面向小尺寸TFT-LCD屏驱市场量身打造的Synwit_UI_Creator&#xff08;ugui&#xff09;自研开发套件。 UI_Creator&#xff08;ugui&#xff09;开发套件分为上位机和下位机&#xff0c;以下如无特指&#xff0c;上位机即为PC端设计器/仿真器&…

【香橙派AiPro】基于VGG16的火灾检测模型预测

目录 引言开发板介绍开发板使用准备工作工具文档 拨码开关镜像烧录连接开发板下载MobaXterm网线-SSH连接开发板设置WIFI连接WIFI-SSH连接开发板确定开发板IP方法 Vnc可视化WindowsiPad 开发工具安装 散热风扇基于VGG16的火灾检测模型预测数据集准备目录结构代码操作 安装宝塔最…

RISC-V在线反汇编工具

RISC-V在线反汇编工具&#xff1a; https://luplab.gitlab.io/rvcodecjs/#q34179073&abifalse&isaAUTO 不过&#xff0c;似乎&#xff0c;只支持RV32I、RV64I、RV128I指令集&#xff1a;

2024大模型十大趋势

2024大模型十大趋势 关键要点一、机器外脑时代的智慧探索二、机器外脑、创意生成和情感陪伴三、大模型驱动的新未来&#xff1a;AI带来创意转化与机遇四、人物-行为-场景一体化&#xff1a;未来人工智能的新范式五、未来数字内容生产的基础设施六、共创、共建、共享智能美好未来…

【入门篇】2.3 STM32启动模式(一)

一,Boot引脚分步 二,启动电路 三,启动模式 STM32F4 根据 BOOT 引脚的电平选择启动模式,这两个 BOOT 引脚根据外部施加的电平来决定芯片的启动地址。 下表中 BOOT0 和 BOOT1 是 STM32 芯片上面的两个引脚,用于控制 STM32

哪个牌子充电宝好用?实测倍思、西圣、安克充电宝,哪个值得入手

目前充电宝已经成为我们日常出行的重要依靠。然而&#xff0c;共享充电宝不仅价格昂贵&#xff0c;而且还存在诸多安全隐患&#xff0c;让我们用起来总是不太放心。为了帮大家找到既好用又实惠且安全的充电宝&#xff0c;我们对倍思、西圣、安克这三个热门品牌的充电宝进行了深…

Ubuntu/Linux 安装ITKSnap

文章目录 1. 安装ITKSnap1.1 下载后安装 2.进入opt文件夹改名3. 更改启动项4. 创建硬链接5. 添加桌面启动方式6. 即可使用 1. 安装ITKSnap http://www.itksnap.org/pmwiki/pmwiki.php?nMain.HomePage 1.1 下载后安装 找到下载的文件夹&#xff0c;文件夹内打开terminal。复…

提升代码质量:利用策略模式优化Spring Boot应用的设计

&#x1f4e3;前言 在Spring Boot中使用策略模式&#xff08;Strategy Pattern&#xff09;是一种常见的设计模式实践&#xff0c;它允许在运行时选择算法的行为。策略模式定义了一系列的算法&#xff0c;并将每个算法封装起来&#xff0c;使它们可以互换。这样做的好处是使算法…

MyBatis源码中的设计模式1

1. 建造者模式的应用 建造者模式属于创建类模式&#xff0c;通过一步一步地创建一个复杂的对象&#xff0c;能够将部件与其组装过程分开。用户只需指定复杂对象的类型&#xff0c;就可以得到该对象&#xff0c;而不需要了解其内部的具体构造细节。《Effective Java》中也提到&…

CH552G使用IAP下载

常见下载中的方式ISP&#xff0c;IAP&#xff0c;ICP 参考&#xff0c;CH552G中文手册&#xff0c;参考1 ISP&#xff1a;In System Programing&#xff0c;在系统编程。是常见的&#xff0c;使用软件&#xff0c;先将某个引脚&#xff08;例如boot&#xff09;连接到合适的电…

领航Linux UDP:构建高效网络新纪元

欢迎来到 破晓的历程的 博客 ⛺️不负时光&#xff0c;不负己✈️ 文章目录 引言Udp和Tcp的异同相同点不同点总结 1.1、socket1.2、bind1.3、recvfrom1.4、sendto2.1、代码2.1、说明3.1、代码3.2、说明 引言 在前几篇博客中&#xff0c;我们学习了Linux网络编程中的一些概念。…