本节的快速导航目录如下喔!!!
一、创建进程的类Process
二、进程并发控制之Semaphore
三、进程同步之Lock
四、进程同步之Event
五、进程优先队列Queue
六、多进程之进程池Pool
七、多进程之数据交换Pipe
一、创建进程的类Process
multiprocessing模块提供了一个创建进程的类Process,创建进程有两种方法:
- 创建一个类的实例Process,并指定任务目标函数
-
自定义一个类并继承Process类,重写其init()方法和run方法
第一种方法:
import os
import time
from multiprocessing import Process
# 字进程要执行的代码:数据累加耗时函数
def task_process(delay):
num = 0
for i in range(delay * 10000000):
num += i
print(f"进程pid为{os.getpid()},执行完成")
if __name__ == '__main__':
print('父进程pid为 %s.' % os.getpid())
t0 = time.time()
# 单进程执行
task_process(3)
task_process(3)
t1 = time.time()
print(f"顺序执行耗时{t1 - t0}")
p0 = Process(target=task_process, args=(3,))
p1 = Process(target=task_process, args=(4,))
t2 = time.time()
# 双进程并行执行
p0.start();p1.start()
p0.join();p1.join()
t3 = time.time()
print(f"多进程并发执行耗时{t3 - t2}")
# 发现多进程执行相同的操作次数耗时更少
第二种方法(重写方法):
# -*- coding: UTF-8 -*-
import os
import time
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, delay):
super().__init__()
self.delay = delay
# 子进程要执行代码
def run(self):
num = 0
for i in range(self.delay * 10000000):
num += i
print(f"进程pid为{os.getpid()},执行完成")
if __name__ == '__main__':
print('父进程pid为 %s.' % os.getpid())
p0 = MyProcess(3)
p1 = MyProcess(3)
t0 = time.time()
p0.start()
p1.start()
p0.join()
p1.join()
t1 = time.time()
print(f"多进程并发执行耗时{t1 - t0}")
Process类的构造函数原型:
class multiprocessing.Process(group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)
'''参数说明如下:
Target:表示调用对象,一般为函数,也可以为类。
Args:表示调用对象的位置参数元组
Kwargs:为进程调用对象的字典
Name:为进程的别名
Group:参数不使用,可忽略
'''
类提供的常用方法如下:
- is_alive():返回进程是否是激活的
- join([timeout]):阻塞进程,直到进程执行完成或超时或进程被终止。
- run():代表进程执行的任务函数,可被重写
- start():激活进程
- terminate():终止进程
其属性如下:
- authkey:字节码,进程的准密钥
daemon
:父进程终止后自动终止,且不能产生新进程,必须在start()之前设置- exicode:退出码,进程在运行时为None,如果为-N,就表示被信号N结束
- name:获取进程名称
- pid:进程ID
例子:不设置daemon属性:
# -*- coding: UTF-8 -*-
import os
import time
from multiprocessing import Process
# 子进程要执行的代码
def task_process(delay):
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行开始")
print(f"sleep {delay}s")
time.sleep(delay)
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行结束")
if __name__ == '__main__':
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行开始")
p0 = Process(target=task_process,args=(3,))
p0.start()
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行结束")
'''
2024-05-08 20:04:09 父进程执行开始
2024-05-08 20:04:09 父进程执行结束
2024-05-08 20:04:10 子进程执行开始
sleep 3s
2024-05-08 20:04:13 子进程执行结束
'''
'''
没有使用p0.join()来阻塞进程,父进程并没有等待子进程运行完成就打印了退出信息,程序依旧会等待子进程运行完成
'''
例子:设置daemon
属性:
# 在上述代码主程序中添加
p0.deamon = True
'''
2024-05-08 20:07:11 父进程执行开始
2024-05-08 20:07:11 父进程执行结束
'''
# 程序并没有等待子进程结束而结束,主要主程序结束,程序即退出
二、进程并发控制之Semaphore
semaphore用来控制对共享资源的访问数量,可以控制同一时刻并发的进程数。
# 实例:多进程同步控制
import multiprocessing
import time
def worker(s,i):
s.acquire() print(time.strftime('%H:%M:%S'),multiprocessing.current_process().name+"获得锁运行");
time.sleep(i)
print(time.strftime('%H:%M:%S'), multiprocessing.current_process().name +"释放锁运行");
s.release()
if __name__ == '__main__':
s = multiprocessing.Semaphore(2) # 同一时刻只有两个进程在执行
for i in range(6):
p = multiprocessing.Process(target=worker(s,2))
p.start()
三、进程同步之Lock
多进程的目的是并发执行,提高资源利用率,从而提高效率,但有时候我们需要在某一时间只能有一个进程访问某个共享资源,这种情况就需要使用锁Lock
# 多个进程输出信息,不加锁:
import multiprocessing
import time
def task1():
n = 5
while n > 1:
print(f"{time.strftime('%H:%M:%S')}task1 输出信息")
time.sleep(1)
n -= 1
def task2():
n = 5
while n > 1:
print(f"{time.strftime('%H:%M:%S')}task2 输出信息")
time.sleep(1)
n -= 1
def task3():
n = 5
while n > 1:
print(f"{time.strftime('%H:%M:%S')}task3 输出信息")
time.sleep(1)
n -= 1
if __name__ == '__main__':
p1 = multiprocessing.Process(target=task1)
p2 = multiprocessing.Process(target=task2)
p3 = multiprocessing.Process(target=task3)
p1.start();p2.start();p3.start()
未使用锁,生成三个子进程,每个进程都打印自己的信息。下面使用锁:
import multiprocessing
import time
def task1(lock):
with lock: # 也可使用上下文关键字加锁
n = 5
while n > 1:
print(f"{time.strftime('%H:%M:%S')}task1 输出信息")
time.sleep(1)
n -= 1
def task2(lock):
lock.acquire()
n = 5
while n > 1:
print(f"{time.strftime('%H:%M:%S')}task2 输出信息")
time.sleep(1)
n -= 1
lock.release()
def task3(lock):
lock.acquire() # 调用前后加锁
n = 5
while n > 1:
print(f"{time.strftime('%H:%M:%S')}task3 输出信息")
time.sleep(1)
n -= 1
lock.release()
if __name__ == '__main__':
lock = multiprocessing.Lock() # 1、实例化一个锁
p1 = multiprocessing.Process(target=task1,args=(lock,))
p2 = multiprocessing.Process(target=task2,args=(lock,))
p3 = multiprocessing.Process(target=task3,args=(lock,))
p1.start()
p2.start()
p3.start()
可以发现,同一时刻只有一个进程在输出信息。
四、进程同步之Event
Event用来实现进程之间同步通信
import multiprocessing
import time
def with_for_event(e):
e.wait()
time.sleep(1)
# 唤醒后清除Event状态,为后续继续等待
e.clear()
print(f"{time.strftime('%H:%M:%S')}:进程A:我们是兄弟,我等你...")
e.wait()
print(f"{time.strftime('%H:%M:%S')}:进程A:好的,是兄弟一起走")
def wait_for_event_timeout(e, t):
e.wait()
time.sleep(1)
e.clear()
print(f"{time.strftime('%H:%M:%S')}:进程B:好吧,最多等你{t}秒")
e.wait(t)
print(f"{time.strftime('%H:%M:%S')}:进程B:我要继续往前走了")
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(target=with_for_event,args=(e,))
w2 = multiprocessing.Process(target=wait_for_event_timeout,args=(e,5))
w1.start()
w2.start()
# 主进程发话
print(f"{time.strftime('%H:%M:%S')}:主进程:谁等我下,我需要8秒时间")
# 唤醒等待的进程
e.set()
time.sleep(8)
print(f"{time.strftime('%H:%M:%S')}:好的,我赶上了")
# 再次唤醒等待的进程
e.set()
w1.join()
w2.join()
print(f"{time.strftime('%H:%M:%S')}:主进程:退出")
上述定义了两个函数,一个用于等待事件的发生;一个用于等待事件发生并设置超时时间。主进程调用事件的set()方法唤醒等待事件的进程,事件唤醒后调用claer()方法清除事件的状态并重新等待,以此达到进程同步的控制。
五、进程优先队列Queue
- 多进程安全:Queue是为多进程环境设计的,确保在多进程之间进行数据传递时的安全性。
- put方法:
- 用于向队列中插入数据。
- 有两个可选参数:blocked 和 timeout 。
- 默认情况下,
blocked
设置为True
。 - 如果
timeout
是一个正值,并且队列已满,put
方法会阻塞直到队列有空间或者超时。 - 超时会抛出
Queue.Full
异常。 - 如果
blocked
设置为False
且队列已满,会立即抛出Queue.Full
异常。
- get方法:
- 用于从队列中读取并删除一个元素。
- 同样有两个可选参数:
blocked
和timeout
。 - 默认情况下,
blocked
设置为True
。 - 如果
timeout
是一个正值,并且在指定时间内队列中没有元素可取,会抛出Queue.Empty
异常。 - 如果block设置为False则有两种情况:
- 如果队列为空,会立即抛出
Queue.Empty
异常。 - 如果队列中有值可用,会立即返回该值。
- 如果队列为空,会立即抛出
这些特性确保了在多进程环境中,队列操作的同步性和数据的一致性。
# 实现消费者-生产者模式
from multiprocessing import Process,Queue
import time
def ProducerA(q):
count = 1
while True:
q.put(f"冷饮 {count}")
print(f"{time.strftime('%H:%M:%S')} A 放入:[冷饮 {count}]")
count +=1
time.sleep(1)
def ConsumerB(q):
while True:
print(f"{time.strftime('%H:%M:%S')} B 放入:[取出 {q.get()}]")
time.sleep(5)
if __name__ == '__main__':
q = Queue(maxsize=5)
p = Process(target=ProducerA,args=(q,))
c = Process(target=ConsumerB,args=(q,))
c.start()
p.start()
c.join()
p.join()
上述代码定义了生产者函数和消费者函数,设置其队列的最大容量是5,生产者不停的生产冷饮,消费者就不停的取出冷饮消费,当队列满时,生产者等待,当队列空时,消费者等待。
六、多进程之进程池Pool
进程池(Pool)按需创建进程,池满则请求排队等待。
import multiprocessing
import time
def task(name):
print(f"{time.strftime('%H:%M:%S')}: {name} 开始执行")
time.sleep(3)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
for i in range(10):
# 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
pool.apply_async(func=task, args=(i,))
pool.close()
pool.join()
从结果来看,同一时刻只有三个进程在执行,使用Pool实现了对进程并发数的控制
七、多进程之数据交换Pipe
在Python的多进程编程中,multiprocessing.Pipe() 方法可以创建一个管道,用于进程间的数据传输。
默认情况下,管道是全双工的,即两个进程可以互相发送和接收数据。如果需要设置为半双工(单向传输),可以通过传递参数duplex=False 来实现。
管道对象提供了send() 方法用于发送消息,以及recv() 方法用于接收消息。如果recv()在没有消息的情况下被调用,它会阻塞;如果管道已关闭,recv()会抛出EOFError
异常。
import multiprocessing
import time
def task1(pipe):
for i in range(5):
str = f"task1-{i}"
print(f"{time.strftime('%H:%M:%S')}: task1 发送:{str}")
pipe.send(str)
time.sleep(2)
for i in range(5):
print(f"{time.strftime('%H:%M:%S')}: task1 接收:{ pipe.recv() }")
def task2(pipe):
for i in range(5):
print(f"{time.strftime('%H:%M:%S')}: task2 接收:{pipe.recv()}")
time.sleep(2)
for i in range(5):
str = f"task1-{i}"
print(f"{time.strftime('%H:%M:%S')}: task2 发送:{ str }")
if __name__ == '__main__':
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=task1,args=(pipe[0],))
p2 = multiprocessing.Process(target=task2,args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
首先程序定义了两个子进程函数:task1先发送5条消息,再接收消息;task2先接收消息,再发送消息。