背景
最近在开发实践中,接触到了需要多进程开发的场景。众所周知,进程和线程最大的区别就在于:进程是资源分配的最小单位,线程是cpu调度的最小单位。对于多进程开发来说,每一个进程都占据一块独立的虚拟内存空间,想要在多进程之间交互难道只能选择管道(PIPE)、套接字(sokect)、远程过程调用(RPC)这种麻烦的东西吗?
实际上在本地进行多进程开发的时候,只要本进程处于你的系统之下,你可以在自己的父进程中设定共享内存区域,使得子进程可以访问该共享内存变量。
共享内存
在Python中,进程间共享变量最常用的方法之一是通过共享内存。Python的multiprocessing
模块提供了对共享内存的支持,主要通过Value、Array、Manager
等类来实现。
使用multiprocessing.Value
共享单个值
multiprocessing.Value
用于在多个进程之间共享一个基本数据类型(如int, float, string等)的值。
import multiprocessing
import time
def writer(shared_value):
for i in range(5):
shared_value.value = i
print(f"Writer: {shared_value.value}")
time.sleep(1)
def reader(shared_value):
while True:
if shared_value.value != -1: # 使用-1作为结束信号
print(f"Reader: {shared_value.value}")
time.sleep(0.5)
else:
break
if __name__ == "__main__":
# 创建一个共享的int值
shared_value = multiprocessing.Value('i', 0) # 'i'表示int类型
# 创建进程
p1 = multiprocessing.Process(target=writer, args=(shared_value,))
p2 = multiprocessing.Process(target=reader, args=(shared_value,))
# 启动进程
p1.start()
p2.start()
# 等待writer进程完成
p1.join()
# 发送结束信号给reader
shared_value.value = -1
# 等待reader进程完成
p2.join()
需要注意的是multiprocessing.Value
的使用方式:
对于共享整数或者单个字符,初始化比较简单,参照下图映射关系即可。如:
i = multiprocessing.Value('i', 1)
c = multiprocessing.Value('c', '0')
注意,如果我们使用的code在上表不存在,则会抛出错误:
TypeError: this type has no size
如果共享的是字符串,则在上表是找不到映射关系的,就是没有code可用。所以我们需要使用原始的ctype类型,如:
from ctypes import c_char_p
ss =multiprocessing.
Value(c_char_p, 'ss')ctype类型可从下表查阅:
使用multiprocessing.Array
共享复杂数据结构
当需要共享更复杂的数据结构(如数组或列表)时,可以使用multiprocessing.Array
。
注意:Array
只能存储基本数据类型的数组。和上上图一致
import multiprocessing
import ctypes
def writer(shared_array):
for i in range(5):
shared_array[i] = i * i
print(f"Writer: {list(shared_array[:i+1])}")
time.sleep(1)
def reader(shared_array):
while True:
if shared_array[0] != -1: # 假设我们使用第一个元素作为结束信号
print(f"Reader: {list(shared_array)}")
time.sleep(0.5)
else:
break
if __name__ == "__main__":
import time
# 创建一个共享的整数数组
shared_array = multiprocessing.Array(ctypes.c_int, 5) # 创建一个长度为5的整数数组
# 创建进程
p1 = multiprocessing.Process(target=writer, args=(shared_array,))
p2 = multiprocessing.Process(target=reader, args=(shared_array,))
# 启动进程
p1.start()
p2.start()
# 等待writer进程完成
p1.join()
# 发送结束信号给reader(这里简单地将第一个元素设为-1)
shared_array[0] = -1
# 等待reader进程完成
p2.join()
使用multiprocessing.Manager
共享更复杂数据结构
Manager提供了一种方法创建数据,数据能够在不同进程之间共享,包括跨网络的运行在不同机器上的进程。manager对象控制有共享对象的服务进程。其他进程通过代理后也能操作共享对象。
import multiprocessing
def worker(d, key, value):
d[key] = value
print(f'Worker: Setting {key} to {value}')
if __name__ == '__main__':
with multiprocessing.Manager() as manager:
# 创建一个共享字典
d = manager.dict()
# 创建并启动进程
p1 = multiprocessing.Process(target=worker, args=(d, 'foo', 1))
p2 = multiprocessing.Process(target=worker, args=(d, 'bar', 2))
p1.start()
p2.start()
# 等待进程完成
p1.join()
p2.join()
# 打印最终的共享字典内容
print('Main: Dictionary contents:', d)
参考文献
Python Multiprocessing 多进程并行计算 - Jeremy Feng (fengchao.pro)
python多进程共享变量Value使用tips - 简书 (jianshu.com)
Python多进程并行操作-multiprocessing-Managers - 知乎 (zhihu.com)