创建进程
os.fork()
该方法只能在linux和mac os中使用,因为其主要基于系统的fork来实现。window中没有这个方法。
通过os.fork()方法会创建一个子进程,子进程的程序集为该语句下方的所有语句。
import os
print("主进程的PID为:" , os.getpid())
w = 1
pid = os.fork() # 创建子进程
print('fork方法的返回值为: ', pid)
if pid == 0:
print(f'子进程PID: {os.getpid()}, 主进程PID: {os.getppid()}, 子进程中w: {w}')
else:
print(f'主进程PID: {os.getpid()}, 主进程PID: {os.getppid()}, 子进程中w: {w}')
multiprocessing(target为函数)
通过multiprocessing模块中的Process类创建一个进程实例对象,并通过其start方法启动该进程。
进程中的程序集为Process类的target参数,可以是一个函数也可以是一个方法。
需要注意的是windows系统中创建进程的过程需要放在if __name__== "__main__"
代码块中,因为其实现数据集的复制时,是通过import语句实现。
而在Linux和MacOS系统下则不需要,因为他们原生支持fork方法。
import multiprocessing
import os
import time
def task():
for i in range(3):
print("wating... 当前pid为 : ", os.getpid(), "父进程为: ", os.getppid())
time.sleep(1)
if __name__ == "__main__":
print('主进程pid:', os.getpid())
process = multiprocessing.Process(target=task)
process.start()
multiprocessing(taget为方法)
创建Process实例对象的target参数,不仅可以是函数名,还可以是类的方法名。
- 如果需要往target中传入参数,可以通过args和kwargs两个参数进行相应的传参
import multiprocessing
import time
class Tasks():
def task1(self):
time.sleep(1)
print('task1')
def task2(self):
time.sleep(1)
print('task2')
if __name__ == "__main__":
t = Tasks()
process1 = multiprocessing.Process(target=t.task1)
process1.start()
process2 = multiprocessing.Process(target=t.task2)
process2.start()
继承Process类
通过继承multiprocessing.Process
类,并重写其中的run方法。
- 必须要重写
run
方法,Process子类的实例对象的run方法,就是进程执行的程序
import time
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, i):
super().__init__()
self.name = str(i)
def run(self):
time.sleep(2)
print(f'子进程-{self.name}')
if __name__ == '__main__':
for i in range(5):
p = MyProcess(i)
p.start()
print('主进程')
进程阻塞
目前系统一般都是多核的,当处理多任务时,一般都以并发或并行的方式处理多任务。所以系统一般以异步的方式处理多进程。
在Process
的实例方法中,通过join
方法表示进程阻塞时,主将处于等待状态,并不会处理其他进程。
单进程阻塞
针对每个进程开启后立马启用join方法,这种方法效率低下。使得系统的处理方式编程同步阻塞
,使得主进程依次处理子进程。
import time
from multiprocessing import Process
def eat():
time.sleep(2)
print('eat')
def drink():
time.sleep(2)
print('drink')
if __name__ == '__main__':
process1 = Process(target=eat)
process1.start()
process1.join()
process2 = Process(target=drink)
process2.start()
process2.join()
print('主进程')
多进程阻塞
先利用start方法将多个进程同时创建并启动,然后在创建完成后统一阻塞进程。
- 统一创建进程,并让其统一运行
- 统一等待进程结束,避免每个进程都等一段时间
import time
from multiprocessing import Process
def eat():
time.sleep(2)
print('eat')
def drink():
time.sleep(2)
print('drink')
if __name__ == '__main__':
process1 = Process(target=eat)
process1.start()
process2 = Process(target=drink)
process2.start()
for p in [process1, process2]:
p.join()
print('主进程')
进程锁
当多进程编辑同一文件或数据时,往往会导致数据不一致问题,针对这种情况,需要在进程中对处理文件或数据的代码前后进行加锁和解锁操作。
如果没有锁,会导致数据的不一致
import time, json
from multiprocessing import Process
def read_ticket(user):
with open('ticket.txt') as f:
num = json.load(f)['ticket']
time.sleep(1)
print(f'User {user}: 当前剩余{num}张票')
return num
def order_ticket(user, num):
time.sleep(1)
num -= 1
with open('ticket.txt', 'w') as f:
json.dump({'ticket': num}, f)
print(f'User {user}: 购票成功')
def ticket(user):
num = read_ticket(user)
if num > 0:
order_ticket(user, num)
else:
print(f'User {user}: 购票失败')
if __name__ == '__main__':
queue = []
for i in range(5):
p = Process(target=ticket, args=(i,))
p.start()
queue.append(p)
for q in queue:
q.join()
print('运行结束')
加锁/解锁
在编辑数据的之前通过acquire
方法加锁,当数据编辑完成后,通过release
方法解锁。
- 在主进程中创建一个锁对象
- 然后在每个修改共同数据的进程中传入已经创建的锁对象
- 在修改数据的代码前后分别加锁和解锁
"""
@Time: 2024/6/28 20:18
@Author: 'Ethan'
@Email: ethanzhou4406@outlook.com
@File: 1. 同步阻塞.py
@Project: python
@Feature:
"""
import time, json
from multiprocessing import Process, Lock
def read_ticket(user):
with open('ticket.txt') as f:
num = json.load(f)['ticket']
time.sleep(0.1)
print(f'User {user}: 当前剩余{num}张票')
def order_ticket(user):
time.sleep(0.1)
with open('ticket.txt') as f:
num = json.load(f)['ticket']
if num > 0:
with open('ticket.txt', 'w') as f:
num -= 1
json.dump({'ticket': num}, f)
print(f'User {user}: 购票成功')
else:
print(f'User {user}: 购票失败')
def ticket(user,lock):
read_ticket(user)
lock.acquire()
order_ticket(user)
lock.release()
if __name__ == '__main__':
lock = Lock()
queue = []
for i in range(5):
p = Process(target=ticket, args=(i, lock))
p.start()
queue.append(p)
for q in queue:
q.join()
print('运行结束')
锁的上下文管理器
如果在代码加锁后,解锁前,代码出现了异常就会导致进程没有来得及解锁,而导致死锁现象。通过锁的上下文管理器语法,可以有效避免这种情况的发生。
import time, json
from multiprocessing import Process, Lock
def read_ticket(user):
with open('ticket.txt') as f:
num = json.load(f)['ticket']
time.sleep(0.1)
print(f'User {user}: 当前剩余{num}张票')
def order_ticket(user):
time.sleep(0.1)
with open('ticket.txt') as f:
num = json.load(f)['ticket']
if num > 0:
with open('ticket.txt', 'w') as f:
num -= 1
json.dump({'ticket': num}, f)
print(f'User {user}: 购票成功')
else:
print(f'User {user}: 购票失败')
def ticket(user,lock):
read_ticket(user)
with lock:
order_ticket(user)
if __name__ == '__main__':
lock = Lock()
queue = []
for i in range(5):
p = Process(target=ticket, args=(i, lock))
p.start()
queue.append(p)
for q in queue:
q.join()
print('运行结束')
进程间通信
进程之间可以进行通信,主要是通过各个进程之中传入一个公共的沟通工具,所有的进程都通过这个工具进行沟通。multiprocessing中提供了两种进程间沟通的工具Queue
和Pipe
Queue方式
Queue是基于文件传输的socket通信方式,并且它是带锁机制的。它的数据主要的特点是先进先出,后进后出。
当一个对象被放入一个队列中时,这个对象首先会被一个后台线程用pickle
序列化,并将序列化后的数据通过一个底层管道的管道传递给队列中。
主要使用如下方法:
- qsize(): 返回队列的大致的长度。返回的值由于多线程或多进程的上下文而变得不可靠
- empty(): 队列为空返回True,否则返回False。返回的值由于多线程或多进程的上下文而变得不可靠
- full(): 队列满了返回True,否则返回False。返回的值由于多线程或多进程的上下文而变得不可靠
- put(obj[, block[, timeout]]): 将obj放入队列。
- 如果block为True(默认值)而且timeout是None(默认值),将会阻塞当前进程,直到有空的缓冲槽。
- 如果timeout是正数,将会阻塞了最多timeout秒之后还是没有可用的缓冲槽时抛出
queue.Full
异常 - 反之block为False,仅当有可用缓冲槽时才放入对象,否则抛出
queue.Full
异常(这种情况下timeout参数会被忽略)
- get([block[, timeout]]): 从队列中取出并返回对象。如果可选参数block是True而且timeout是None,将会阻塞当前进程,直到队列中出现可用对象。如果timeout是正数,将会阻塞了最多timeout秒之后还是没有可用的对象时抛出
queue.Empty
异常。- 反之,block是False时,仅当有可用对象能够取出时返回,否则抛出
queue.Empty
异常(这种情况下timeout参数会被忽略)
- 反之,block是False时,仅当有可用对象能够取出时返回,否则抛出
import time, json
from multiprocessing import Process, Queue
def task(i, queue: Queue):
time.sleep(1)
queue.put(i)
print(f'task {i}, 入列')
if __name__ == '__main__':
queue = Queue()
process_queue = []
for i in range(5):
p = Process(target=task, args=(i, queue))
p.start()
process_queue.append(p)
for p in process_queue:
p.join()
for i in range(5):
print(f'主进程中消费队列内容{queue.get()}')
print('运行结束')
Pipe方式
Pipe方式是进程之间通信的另一种方式和Queue不同之处在于,它不带锁,且信息顺序无法得到保障。
主要的使用方法:
- send(obj): 将一个对象发送到链接的另一端,可以用
recv()
读取,发送的对象必须是可序列化的,多大的对象(接近32MiB)可能引发ValueError
异常 - recv(): 返回一个由另一端使用
send()
发送的对象,该方法会一直阻塞直到接收到对象。如果对端关闭了链接或者没有东西可接收,将抛出EOFError
异常
import time
from multiprocessing import Process, Pipe
from multiprocessing.connection import Connection
def task(pipe:Connection):
print('子进程往管道里加了内容')
time.sleep(1)
pipe.send("子进程往管道中加了点东西")
if __name__ == '__main__':
pipe1, pipe2 = Pipe()
p = Process(target=task, args=(pipe1,))
p.start()
p.join()
print('主进程中获取管道内的内容为:', pipe2.recv())
print('运行结束')