Python多进程间通讯(包含共享内存方式)

文章目录

  • 1 通过非共享内存配合队列方式
  • 2 通过共享内存配合队列方式


注:本博文测试环境为Linux系统。


1 通过非共享内存配合队列方式

下面是一个常见的生产者与消费者的模式示例,这里分别启动了两个子进程,一个为生产者(producer)一个为消费者(consumer)。生产者负责生产Numpy的NDArray数据(这里为了体现进程间传递数据会耗时故创建的NDArray的shape比较大),然后将数据放入队列Queue。消费者监控队列Queue一旦有数据就取出并简单打印下shape信息和填充的Value信息。

import time
import multiprocessing as mp
from multiprocessing import Process, Queue

import numpy as np


def producer_task(queue: Queue):
    for i in range(10):
        data = np.full(shape=(1, 3, 2048, 2048), fill_value=i, dtype=np.float32)
        queue.put(data)
        time.sleep(0.1)

    # send exit signal
    queue.put(None)
    print("producer exit.")


def consumer_task(queue: Queue):
    while True:
        data = queue.get()
        if data is None:
            break

        print(f"get data shape:{data.shape}, fill value:{data[0][0][0][0]}")

    print("consumer exit.")


def main():
    queue = Queue()
    producer = Process(target=producer_task, args=(queue,), name="producer")
    consumer = Process(target=consumer_task, args=(queue,), name="consumer")

    producer.start()
    consumer.start()

    producer.join()
    consumer.join()


if __name__ == '__main__':
    mp.set_start_method("spawn")
    main()

执行以上代码终端输出以下内容:

get data shape:(1, 3, 2048, 2048), fill value:0.0
get data shape:(1, 3, 2048, 2048), fill value:1.0
get data shape:(1, 3, 2048, 2048), fill value:2.0
get data shape:(1, 3, 2048, 2048), fill value:3.0
get data shape:(1, 3, 2048, 2048), fill value:4.0
get data shape:(1, 3, 2048, 2048), fill value:5.0
get data shape:(1, 3, 2048, 2048), fill value:6.0
get data shape:(1, 3, 2048, 2048), fill value:7.0
get data shape:(1, 3, 2048, 2048), fill value:8.0
get data shape:(1, 3, 2048, 2048), fill value:9.0
producer exit.
consumer exit.

为了进一步看清进程之间传递数据的过程,这里使用viztracer工具进一步分析(直接通过pip install viztraver即可安装)。使用指令如下,其中main.py就是上面的代码内容。跑完后会在当前目录下生成一个result.json文件。

viztracer main.py

通过如下指令可视化result.json文件:

vizviewer result.json

在终端输入上述指令后,终端会提示你打开网页并进入http://localhost:9001,如果使用的是VSCODE IDE在右下角也会提示你打开浏览器。
在这里插入图片描述

在这里插入图片描述
可以看到生产者进程在将数据放入队列后会先进行ForkingPickler.dump即数据序列化的过程,大概耗时12ms。然后开始posix.write即开始将数据从一个进程传递到另一个进程,大概耗时34ms。最后在消费者进程进行_pickle.loads即数据的反序列化,大概耗时6ms。从生产者进程将数据放入队列到消费者进程拿到数据总耗时约53ms。从这个示例中可以看到,当在进程间传递的数据量很大时会很耗时。


2 通过共享内存配合队列方式

下面示例代码将传递的数据改为了共享内存的方式,这样可以大幅减小进程间数据传递的成本。这里主要是使用multiprocessing库中的shared_memory.SharedMemory对象。创建新的共享内存时需要将create参数设置为True(如果是复用已有的共享内存时设置为False),然后指定具体的size大小,该参数为数据的字节大小,比如要申请一块存放数据类型为float32shape(1, 3, 2048, 2048)的空间所需字节数为1 * 3 * 2048 * 2048 * 4float32为4个字节)。根据Python官方文档介绍,当一个进程不在使用该共享内存时应关闭指向共享内存的文件描述符/句柄,具体做法是调用共享内存对象的close方法。当某块共享内存不在需要时,需在最后一个使用到的进程中调用unlink方法显示释放掉(如果不调用该方法,共享内存会一直存在,如果后续再不断申请新的共享内存则会出现共享内存泄露的问题,或者当程序未正常退出时该共享内存块会成为僵尸共享内存?)。例如在当前示例中,生产者进程创建了共享内存并放入队列里后可调用close方法关闭当前进程指向共享内存的文件描述符/句柄,在消费者进程中拿到数据并消费完后除了调用close方法外还会调用unlink方法删除该共享内存。有关共享内存的详细介绍看查看Python官方文档:
https://docs.python.org/zh-cn/3/library/multiprocessing.shared_memory.html#multiprocessing.shared_memory.SharedMemory

import time
import multiprocessing as mp
from multiprocessing import Process, Queue, shared_memory

import numpy as np


def producer_task(queue: Queue):
    for i in range(10):
        shm = shared_memory.SharedMemory(
            name=f"data_{i}",
            create=True,
            size=1 * 3 * 2048 * 2048 * 4
        )
        np_data = np.ndarray(shape=(1, 3, 2048, 2048), dtype=np.float32, buffer=shm.buf)
        np_data.fill(i)

        queue.put(shm.name)
        shm.close()
        time.sleep(0.1)

    # send exit signal
    queue.put(None)
    print("producer exit.")


def consumer_task(queue: Queue):
    while True:
        shm_name = queue.get()
        if shm_name is None:
            break

        shm = shared_memory.SharedMemory(name=shm_name, create=False)
        np_data = np.ndarray(shape=(1, 3, 2048, 2048), dtype=np.float32, buffer=shm.buf)
        print(f"get data shape:{np_data.shape}, fill value:{np_data[0][0][0][0]}")
        shm.close()
        shm.unlink()

    print("consumer exit.")


def main():
    queue = Queue()
    producer = Process(target=producer_task, args=(queue,), name="producer")
    consumer = Process(target=consumer_task, args=(queue,), name="consumer")

    producer.start()
    consumer.start()

    producer.join()
    consumer.join()


if __name__ == '__main__':
    mp.set_start_method("spawn")
    main()

同样我们使用viztracer来看看进程间的通讯情况:
在这里插入图片描述

数据从生产者进程传递到消费者进程耗时为245us相比之前不使用共享内存方法的53ms,速度比值为53000/245≈216X,提升还是非常明显的。但是这有个很奇怪的现象我无法理解,就是在生产者进程中调用close方法用了1.8ms,而在消费者进程里调用close方法只用了15us,unlink用了8us,如果有知道的大神希望能帮忙解释下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/915306.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

YOLOv11实战宠物狗分类

本文采用YOLOv11作为核心算法框架,结合PyQt5构建用户界面,使用Python3进行开发。YOLOv11以其高效的特征提取能力,在多个图像分类任务中展现出卓越性能。本研究针对5种宠物狗数据集进行训练和优化,该数据集包含丰富的宠物狗图像样本…

游戏引擎学习第八天

视频参考: https://www.bilibili.com/video/BV1ouUPYAErK/ 理解下面的代码 关于虚函数 代码分解 结构体 foo 的定义: struct foo {int32 X;int64 Y;virtual void Bar(int c); };foo 结构体有两个成员变量:X(int32 类型)和 Y&…

我要学kali-linux之shell脚本编程1

声明! 学习视频来自B站up主 **泷羽sec** 有兴趣的师傅可以关注一下,如涉及侵权马上删除文章,笔记只是方便各位师傅的学习和探讨,文章所提到的网站以及内容,只做学习交流,其他均与本人以及泷羽sec团队无关&a…

尽量通俗易懂地概述.Net U nity跨语言/跨平台相关知识

本文参考来自唐老狮,Unity3D高级编程:主程手记,ai等途径 仅作学习笔记交流分享 目录 1. .Net是什么? 2. .Net框架的核心要点? 跨语言和跨平台 .Net x Unity跨平台发展史 Net Framework 2002 Unity跨平台之 Mono 2004 Unity跨平台之 IL2CPP 2015 二者区别 .NET Core …

大陆 ARS513 / ARS510 标准雷达(解析二)

1。GW_ACU (0x40) • GW_ACU_LongAccel Longitudinal acceleration of ego vehicle. • GW_ACU_LongAccel_ValidFlag Valid flag of signal “GW_ACU_LongAccel”. • GW_ACU_LateralAccel Lateral acceleration of ego vehicle. Signal quality requirements for “GW_ACU_La…

【游戏引擎之路】登神长阶(十四)——OpenGL教程:士别三日,当刮目相看

【游戏引擎之路】登神长阶(十四)——OpenGL教程:士别三日,当刮目相看 2024年 5月20日-6月4日:攻克2D物理引擎。 2024年 6月4日-6月13日:攻克《3D数学基础》。 2024年 6月13日-6月20日:攻克《3D…

【C++动态规划】2304. 网格中的最小路径代价|1658

本文涉及知识点 C动态规划 LeetCode2304. 网格中的最小路径代价 给你一个下标从 0 开始的整数矩阵 grid ,矩阵大小为 m x n ,由从 0 到 m * n - 1 的不同整数组成。你可以在此矩阵中,从一个单元格移动到 下一行 的任何其他单元格。如果你位…

数据中台解决方案

文件是关于数据中台解决方案的详细介绍,内容涵盖了数据中台的定义、建设方案、实施步骤、以及在数字化转型中的作用。以下是对文件内容的分析和总结: 1. 数字化转型背景 国家政策支持:提到了《中华人民共和国国民经济和社会发展第十四个五年…

JS 实现WebSocket通讯和什么是WebSocket

WebSocket 介绍: WebSocket 是一种网络传输协议,可在单个 TCP 连接上进行全双工通信。它允许服务器主动向客户端推送信息,客户端也能实时接收服务器的响应。 客户端 这里实现了将input内的内容发送给客户端,并将接收到的服务器的…

K8S单节点部署及集群部署

1.Minikube搭建单节点K8S 前置条件:安装docker,注意版本兼容问题 # 配置docker源 wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo# 安装docker环境依赖 yum install -y yum-utils device-m…

算法闭关修炼百题计划(六)

塔塔开(滑稽 1.删除排序链表中的重复元素2.删除排序链表中的重复元素II3.字典序的第k小数字4.下一个排列5.排序链表6.随机链表的复制7.数据流的中位数 1.删除排序链表中的重复元素 使每个元素就出现一次 class Solution { public:ListNode* deleteDuplicates(ListNode* head)…

PH热榜 | 2024-11-13

DevNow 是一个精简的开源技术博客项目模版,支持 Vercel 一键部署,支持评论、搜索等功能,欢迎大家体验。 在线预览 1. Agree.com 标语:人人免费电子签名! 介绍:Agree,这款由人工智能驱动的平台…

PTE-中间件安全

DOCKER环境,一般是80 8080 8081端口 1 apache位置扩展名解析漏洞 cd vulhub-master/httpd/apache_parsing_vulnerability/ docker-compose up -d 修改一句话的后缀 直接上传 蚁剑 2 CVE-2017-15715 docker-compose stop cd .. cd CVE-2017-15715/ dock…

【Linux】Github 仓库克隆速度慢/无法克隆的一种解决方法,利用 Gitee 克隆 Github 仓库

Github 经常由于 DNS 域名污染以及其他因素克隆不顺利。 一种办法是修改 hosts sudo gedit /etc/hosts加上一行 XXX.XXX.XXX.XXX github.comXXX 位置的 IP 可以通过网站查询 IP/服务器github.com的信息-站长工具 这种方法比较适合本身可以克隆,但是速度很慢的…

Elasticsearch 8.16:适用于生产的混合对话搜索和创新的向量数据量化,其性能优于乘积量化 (PQ)

作者:来自 Elastic Ranjana Devaji, Dana Juratoni Elasticsearch 8.16 引入了 BBQ(Better Binary Quantization - 更好的二进制量化)—— 一种压缩向量化数据的创新方法,其性能优于传统方法,例如乘积量化 (Product Qu…

MySQL数据库常用命令大全(完整版——表格形式)

✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 ✨特色专栏&#xff1a…

微型导轨在自动化生产线中起什么作用?

在现代制造业的飞速跃进中,自动化生产线的蓬勃发展引领了一场效率与质量的双重革命。微型导轨作为传动领域的重要零部件,可用于工业自动化生产线上的零件运输、加工设备定位等,实现自动化生产和减少人力成本。那么,微型导轨在自动…

Flutter 小技巧之 Shader 实现酷炫的粒子动画

在之前的《不一样的思路实现炫酷 3D 翻页折叠动画》我们其实介绍过:如何使用 Shader 去实现一个 3D 的翻页效果,具体就是使用 Flutter 在 3.7 开始提供 Fragment Shader API ,因为每个像素都会过 Fragment Shader ,所以我们可以通…

c++实现B树(下)

书接上回小吉讲的是B树的搭建和新增方法的实现(blog传送门🚪:B树实现上)(如果有小可爱对B树还不是很了解的话,可以先看完上一篇blog,再来看小吉的这篇blog)。那这一篇主要讲的是B树中…

【Oracle篇】掌握SQL Tuning Advisor优化工具:从工具使用到SQL优化的全方位指南(第六篇,总共七篇)

💫《博主介绍》:✨又是一天没白过,我是奈斯,DBA一名✨ 💫《擅长领域》:✌️擅长Oracle、MySQL、SQLserver、阿里云AnalyticDB for MySQL(分布式数据仓库)、Linux,也在扩展大数据方向的知识面✌️…