非阻塞模式
# 当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程
# 但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法.
# 初始化Poo1时,可以指定一个最大进程数,当有新的请求提交到Poo1中时,如果池还没有满,
# 那么就会创建一个新的进程用来执行该请求,但如果池中的进程数已经达到指定的最大值,那么 该请求就会等待
# 直到池中有进程结束,才会创建新的进程来执行。
#阻塞式:添加一个执行一个,然后接着添加下一个
#非阻塞式,全部添加到队列,立刻返回,并不会等待他们执行完毕,但是回调函数是等待任务执行完毕后才调用
import os
import time
from multiprocessing import Pool
from random import random
def task(task_name):
print("开始做任务",task_name)
#记录开始时间
start = time.time()
#模拟做任务
time.sleep(random()*2)
end = time.time()
return "完成任务用时",(end-start),'进程id',os.getpid()
'''
回调函数,当我们任务做完了,我们想把这个事情通知一下,我们把通知给扔出去了,扔给了回调函数
'''
container = []
def callback_func(n):
container.append(n)
if __name__ == '__main__':
#Pool()需要传递参数,整型的参数
pool = Pool(5)
tasks = ['听音乐','吃饭','洗衣服','打游戏','散步','看孩子','做饭']
#模拟循环创建8个任务
for i in tasks:
#下面函数表示用的是池子的非阻塞模式,需要传递函数,然后函数需要参数,需要可迭代的参数
#循环一次池子添加一个任务
#要使用进程池,要挡住你的主进程,因为进程或者要依靠主进程
#callback需要传递的参数就是你的回调函数
#回调函数就是当你的任务完成时,将任务完成的结果返回给回调函数,也就是test1的返回值成了callback_func的参数
#之后可以在最后产生结果
pool.apply_async(task,args=(i,),callback=callback_func)
pool.close() #添加任务结束
pool.join()#相当于插队,不让你的主进程往下走
for c in container:
print(c)
结果
阻塞模式
#阻塞式
#进程的优点没有体现出来,不可以并行
#添加一个任务执行一个任务,如果一个任务不结束,另外一个让你无就进不来
#进程池:
#pool = Pool(max) 创建进程池对象
# pool.apply()阻塞
# pool.apply_async()非阻塞
# pool.close()
# pool.join()让主进程给子进程让路
import os
import time
from multiprocessing import Pool
from random import random
def task(task_name):
print("开始做任务",task_name)
#记录开始时间
start = time.time()
#模拟做任务
time.sleep(random()*2)
end = time.time()
print("完成任务用时", (end - start), '进程id', os.getpid())
# container = []
# def callback_func(n):
# container.append(n)
if __name__ == '__main__':
pool = Pool(5)
pool = Pool(5)
tasks = ['听音乐', '吃饭', '洗衣服', '打游戏', '散步', '看孩子', '做饭']
for i in tasks:
pool.apply(task, args=(i,))
pool.close()
pool.join()
结果
queue
# 当进程平行执行时,数据如何从一个地方传输到另外一个地方,这个时候就需要用到队列了
# 队列就相当于在两个进程中搭个梯子,然后传输数据,queue
from multiprocessing import Queue
# queue需要的参数,通过put往Queue放东西
#规定队列里面的东西不超过5个
#put参数有个timeout,这个参数就是当有个东西等待相应的时间若还是没有空出来,就会报异常
q=Queue(5)
q.put('a')
q.put('b')
q.put('c')
q.put('d')
q.put('e')
print(q.qsize())
if not q.full(): #full用来判断队列是否满了 q.empty()判断队列是不是空的
q.put('f',timeout=4) #如果你的队列满了,则你现在只能等,除非队列里面空出一个位置
else:
print("队列已满")
#取出队列的值,同样这个也有个timeout参数,这个就是当你取东西的时间超过了对应时间,就会报异常
#这个异常可以用try去掉
print(q.get())
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
print(q.get(timeout=2))
结果
进程间通信
借助queue
# 白大黑
# 开发时间:$[DATE] $[TIME]
from multiprocessing import Process,Queue
from time import sleep
def download(q):
#模拟图片
images=['a.jpg','b.jpg','c.jpg']
#模拟下载图片
for i in images:
print('正在下载',i)
sleep(0.5)
q.put(i)
#下载成功后需要把文件送到getfile中,然后getfile拿到这个文件,就把这个文件放到一个具体的位置
#但是现在是两个进程,两个进程是平行的。如何把下载的东西上传到getfile中就是一个问题
#这时queue就出现了
#两者通信当然要保持两者通道是一致的,queue当然使用的是一个
#所以需要两个函数都弄一个参数q,然后使用args参数传进去
def getfile(q):
while True:
try:
file = q.get(timeout=2)
print('{}文件保存成功'.format(file))
except:
print('保存完毕')
break
if __name__ == '__main__':
q=Queue(5)
#当download执行完成后,他就会把剩下的内存回收,这时getfile就不能用了,所以下载的东西可能不完全
#所以就需要p1这个任务等待一下
p1 = Process(target=download,args=(q,))
p2 = Process(target=getfile,args=(q,))
p1.start()
#放了join就相当于p1插队了
p1.join()
p2.start()
结果
进程对于全局变量的访问
import os
from multiprocessing import Process
from time import sleep
#当你的子进程调用全局变量的时候,相当于每个子进程会拥有一个自己的m变量,意思是两个子进程的变量m是分开的
#m是不可变类型,可变类型也是同理,这样就实现了多任务
m = 1
def test1(x):
global m
while True:
sleep(x)
m = m+1
print("这是任务1",'子',os.getpid(),'父',os.getppid(),m)
def test2(a):
global m
while True:
sleep(a)
m = m + 1
#os.getpid进程号os.getppid父进程号
print('这是任务2','子',os.getpid(),'父',os.getppid(),m)
if __name__ == '__main__':
#创建进程,这是两个子进程,当你运行程序的时候python解释器就已经默认给你分配了一个进程,然后这个程序里面有创建了两个子进程
#若是需要传参数,Process有别的属性args这个后面需要跟一个可迭代的,这个里面的就是你给函数传的参数
#p.run只启动任务不执行进程,p.start启动进程并执行任务.p.terminate终止进程
p = Process(target=test1,name='任务1',args=(1,))
p.start()
print(p.name)
p1 = Process(target=test2,name="任务2",args=(2,))
p1.start()
print(p1.name)
结果
m是每个进程单独的一个
不嫌弃的点点关注,点点赞 ଘ(੭ˊᵕˋ)੭* ੈ✩‧₊˚