python之并发编程
- 线程的创建方式
- 线程的创建方式(方法包装)
- 线程的创建方式(类包装)
- join()【让主线程等待子线程结束】
- 守护线程【主线程结束,子线程就结束】
- 锁
- 多线程操作同一个对象(未使用线程同步)
- 多线程操作同一个对象(增加互斥锁,使用线程同步)
- 死锁案例
- 信号量使用
- 事件(Event)
- 生产者消费者模型
- 进程
- 方法模式创建进程
- 类模式创建进程
- Queue实现进程间通信
- Pipe实现进程间通信
- Manager管理器
- 进程池(Pool)
- 进程池使用案例
- 使用with管理进程池
- 协程是什么
- 不使用协程执行多个任务
- 使用yield协程,实现任务切换
- asyncio实现协程(重点)
线程的创建方式
Python的标准库提供了两个模块: _thread 和 threading , _thread 是低级模块, threading 是高级模块,对 _thread 进行了封装。绝大多数情况下,我们只需要使用 threading 这个高级模块。
线程的创建可以通过分为两种方式:
- 方法包装
- 类包装
线程的执行统一通过 start() 方法
线程的创建方式(方法包装)
def func1(name):
print(f"线程{name},start") #format
for i in range(3):
print(f"线程{name},{i}")
sleep(3)
print(f"线程{name},end")
if __name__ == '__main__':
print("主线程,start")
#创建线程
t1 = Thread(target=func1,args=("t1",))
t2 = Thread(target=func1,args=("t2",))
#启动线程
t1.start()
t2.start()
print("主线程,end")
运行结果可能会出现换行问题,是因为多个线程抢夺控制台
输出的IO流。
线程的创建方式(类包装)
class MyThread(Thread):
def __init__(self,name):
Thread.__init__(self)
self.name = name
def run(self):
print(f"线程{self.name},start") # format
for i in range(3):
print(f"线程{self.name},{i}")
sleep(3)
print(f"线程{self.name},end")
if __name__ == '__main__':
print("主线程,start")
#创建线程
t1 = MyThread("t1")
t2 = MyThread("t2")
#启动线程
t1.start()
t2.start()
print("主线程,end")
join()【让主线程等待子线程结束】
之前的代码,主线程不会等待子线程结束。
如果需要等待子线程结束后,再结束主线程,可使用join()方法。
def func1(name):
for i in range(3):
print(f"thread:{name} :{i}")
sleep(1)
if __name__ == '__main__':
print("主线程,start")
#创建线程
t1 = Thread(target=func1,args=("t1",))
t2 = Thread(target=func1,args=("t2",))
#启动线程
t1.start()
t2.start()
#主线程会等待t1,t2结束后,再往下执行
t1.join()
t2.join()
print("主线程,end")
守护线程【主线程结束,子线程就结束】
class MyThread(Thread):
def __init__(self,name):
Thread.__init__(self)
self.name =name
def run(self):
for i in range(3):
print(f"thread:{self.name} :{i}")
sleep(1)
if __name__ == '__main__':
print("主线程,start")
#创建线程(类的方式)
t1 = MyThread('t1')
#t1设置为守护线程
t1.daemon = True
# t1.setDaemon(True)
#启动线程
t1.start()
print("主线程,end")
锁
多线程操作同一个对象(未使用线程同步)
class Account:
def __init__(self,money,name):
self.money = money
self.name = name
#模拟提款的操作
class Drawing(Thread):
def __init__(self,drawingNum,account):
Thread.__init__(self)
self.drawingNum = drawingNum
self.account = account
self.expenseTotal = 0
def run(self):
if self.account.money<self.drawingNum:
return
sleep(1) #判断完可以取钱,则阻塞。就是为了测试发生冲突问题
self.account.money -=self.drawingNum
self.expenseTotal += self.drawingNum
print(f"账户:{self.account.name},余额是:{self.account.money}")
print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")
if __name__ == '__main__':
a1 = Account(100,"gaoqi")
draw1 = Drawing(80,a1) #定义一个取钱的线程
draw2 = Drawing(80,a1) #定义一个取钱的线程
draw1.start()
draw2.start()
多线程操作同一个对象(增加互斥锁,使用线程同步)
class Account:
def __init__(self,money,name):
self.money = money
self.name = name
#模拟提款的操作
class Drawing(Thread):
def __init__(self,drawingNum,account):
Thread.__init__(self)
self.drawingNum = drawingNum
self.account = account
self.expenseTotal = 0
def run(self):
lock1.acquire()
if self.account.money<self.drawingNum:
print("账户余额不足!")
return
sleep(1) #判断完可以取钱,则阻塞。就是为了测试发生冲突问题
self.account.money -=self.drawingNum
self.expenseTotal += self.drawingNum
lock1.release()
print(f"账户:{self.account.name},余额是:{self.account.money}")
print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")
if __name__ == '__main__':
a1 = Account(1000,"gaoqi")
lock1 = Lock()
draw1 = Drawing(80,a1) #定义一个取钱的线程
draw2 = Drawing(80,a1) #定义一个取钱的线程
draw1.start()
draw2.start()
死锁案例
def fun1():
lock1.acquire()
print('fun1拿到菜刀')
sleep(2)
lock2.acquire()
print('fun1拿到锅')
lock2.release()
print('fun1释放锅')
lock1.release()
print('fun1释放菜刀')
def fun2():
lock2.acquire()
print('fun2拿到锅')
lock1.acquire()
print('fun2拿到菜刀')
lock1.release()
print('fun2释放菜刀')
lock2.release()
print('fun2释放锅')
if __name__ == '__main__':
lock1 = Lock()
lock2 = Lock()
t1 = Thread(target=fun1)
t2 = Thread(target=fun2)
t1.start()
t2.start()
信号量使用
def home(name,se):
se.acquire()
print(f"{name}进入房间")
sleep(3)
print(f"****{name}走出房间")
se.release()
if __name__ == '__main__':
se = Semaphore(5) #信号量对象
for i in range(7):
t = Thread(target=home,args=(f"tom{i}",se))
t.start()
事件(Event)
事件Event主要用于唤醒正在阻塞等待状态的线程;
Event 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,event 对象中的信号标志被设置假。如果有线程等待一个 event 对象,而这个 event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个 event 对象的信号标志设置为真,它将唤醒所有等待个 event 对象的线程。如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执行
def chihuoguo(name):
#等待事件,进入等待阻塞状态
print(f'{name}已经启动')
print(f'小伙伴{name}已经进入就餐状态!')
time.sleep(1)
event.wait()
# 收到事件后进入运行状态
print(f'{name}收到通知了.' )
print(f'小伙伴{name}开始吃咯!')
if __name__ == '__main__':
event = threading.Event()
# 创建新线程
thread1 = threading.Thread(target=chihuoguo, args=("tom", ))
thread2 = threading.Thread(target=chihuoguo, args=("cherry", ))
# 开启线程
thread1.start()
thread2.start()
time.sleep(10)
# 发送事件通知
print('---->>>主线程通知小伙伴开吃咯!')
event.set()
生产者消费者模型
从一个线程向另一个线程发送数据最安全的方式可能就是使用queue 库中的队列了。创建一个被多个线程共享的 Queue 对象,这些线程通过使用 put() 和 get() 操作来向队列中添加或者删除元素。Queue 对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。
def producer():
num = 1
while True:
if queue.qsize()<5:
print(f"生产{num}号,大馒头")
queue.put(f"大馒头:{num}号")
num +=1
else:
print("馒头框满了,等待来人消费啊!")
sleep(1)
def consumer():
while True:
print(f"获取馒头:{queue.get()}")
sleep(1)
if __name__ == '__main__':
queue = Queue()
t1 = Thread(target=producer)
t2 = Thread(target=consumer)
t1.start()
t2.start()
进程
方法模式创建进程
def fun1(name):
print(f"当前进程ID:{os.getpid()}")
print(f"父进程ID:{os.getppid()}")
print(f"Process:{name},start")
sleep(3)
print(f"Process:{name},end")
#windows上多进程实现的bug。如果不加main的限制,就会无限制的创建子进程,从而报错。
if __name__ == '__main__':
print("当前进程ID:",os.getpid())
#创建进程
p1 = Process(target=fun1,args=("p1",))
p2 = Process(target=fun1, args=("p2",))
#启动进程
p1.start()
p2.start()
类模式创建进程
class MyProcess(Process):
def __init__(self,name):
Process.__init__(self)
self.name = name
def run(self):
print(f"Process:{self.name},start")
sleep(3)
print(f"Process:{self.name},end")
if __name__ == '__main__':
#创建进程
p1 = MyProcess("p1")
p2 = MyProcess("p2")
p1.start()
p2.start()
Queue实现进程间通信
前面讲解了使用 Queue 模块中的 Queue 类实现线程间通信,但要实现进程间通信,需要使用 multiprocessing 模块中的 Queue 类。
简单的理解 Queue 实现进程间通信的方式,就是使用了操作系统给开辟的一个队列空间,各个进程可以把数据放到该队列中,当然也可以从队列中把自己需要的信息取走。
#coding=utf-8
from multiprocessing import Process, Queue
from time import sleep
class MyProcess(Process):
def __init__(self,name,mq):
Process.__init__(self)
self.name = name
self.mq = mq
def run(self):
print(f"Process:{self.name},start")
print(f"get Data:{mq.get()}")
sleep(2)
self.mq.put(f"new_data:{self.name}")
print(f"Process:{self.name},end")
if __name__ == '__main__':
mq = Queue()
mq.put("1")
mq.put("2")
mq.put("3")
#进程列表
p_list = []
for i in range(3):
p = MyProcess(f"p{i}",mq)
p_list.append(p)
for p in p_list:
p.start()
for p in p_list:
p.join()
print(mq.get())
print(mq.get())
print(mq.get())
Pipe实现进程间通信
Pipe 直译过来的意思是“管”或“管道”,和实际生活中的管(管道)是非常类似的。
Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),
那么这个参数是全双工模式,也就是说conn1和conn2均可收发。若duplex为False,conn1只负责接收消息,conn2只负责
发送消息。send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,
conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。
#coding=utf-8
import multiprocessing
from time import sleep
def func1(conn1):
sub_info = "Hello!"
print(f"进程1--{multiprocessing.current_process().pid}发送数据:{sub_info}")
sleep(1)
conn1.send(sub_info)
print(f"来自进程2:{conn1.recv()}")
sleep(1)
def func2(conn2):
sub_info = "你好!"
print(f"进程2--{multiprocessing.current_process().pid}发送数据:{sub_info}")
sleep(1)
conn2.send(sub_info)
print(f"来自进程1:{conn2.recv()}")
sleep(1)
if __name__ == '__main__':
#创建管道
conn1,conn2 = multiprocessing.Pipe()
# 创建子进程
process1 = multiprocessing.Process(target=func1,args=(conn1,))
process2 = multiprocessing.Process(target=func2,args=(conn2,))
# 启动子进程
process1.start()
process2.start()
Manager管理器
def func(name,m_list,m_dict):
m_dict['name'] = '123'
m_list.append('你好')
if __name__ == "__main__":
with Manager() as mgr:
m_list = mgr.list()
m_dict = mgr.dict()
m_list.append('Hello!!')
#两个进程不能直接互相使用对象,需要互相传递
p1 = Process(target=func,args=('p1',m_list,m_dict))
p1.start()
p1.join() #等p1进程结束,主进程继续执行
print(f"主进程:{m_list}")
print(f"主进程:{m_dict}")
进程池(Pool)
Python提供了更好的管理多个进程的方式,就是使用进程池
进程池可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求;反之,如果池中的进程数已经达到规定最大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行。
进程池使用案例
def func1(name):
print(f"当前进程的ID:{os.getpid()},{name}")
sleep(2)
return name
def func2(args):
print(args)
if __name__ == "__main__":
pool = Pool(5)
pool.apply_async(func = func1,args=('sxt1',),callback=func2)
pool.apply_async(func = func1,args=('sxt2',),callback=func2)
pool.apply_async(func = func1,args=('sxt3',),callback=func2)
pool.apply_async(func = func1,args=('sxt4',))
pool.apply_async(func = func1,args=('sxt5',))
pool.apply_async(func = func1,args=('sxt6',))
pool.apply_async(func = func1,args=('sxt7',))
pool.apply_async(func = func1,args=('sxt8',))
pool.close()
pool.join()
使用with管理进程池
def func1(name):
print(f"当前进程的ID:{os.getpid()},{name}")
sleep(2)
return name
if __name__ == "__main__":
with Pool(5) as pool:
args = pool.map(func1,('sxt1,','sxt2,','sxt3,','sxt4,','sxt5,','sxt6,','sxt7,','sxt8,'))
for a in args:
print(a)
协程是什么
协程,Coroutines,也叫作纤程(Fiber)
协程,全称是“协同程序”,用来实现任务协作。是一种在线程中,比线程更加轻量级的存在,由程序员自己写程序来管理。
当出现IO阻塞时,CPU一直等待IO返回,处于空转状态。这时候用协程,可以执行其他任务。当IO返回结果后,再回来处理数据。充分利用了IO等待的时间,提高了效率。一个故事说明进程、线程、协程的关系
乔布斯想开工厂生产手机,费劲力气,制作一条生产线,这个生产线上有很多的器件以及材料。一条生产线就是一个进程。
只有生产线是不够的,所以找五个工人来进行生产,这个工人能够利用这些材料最终一步步的将手机做出来,这五个工人就
是五个线程。
为了提高生产率,想到3种办法:
1 一条生产线上多招些工人,一起来做手机,这样效率是成倍増长,即单进程多线程方式
2 多条生产线,每个生产线上多个工人,即多进程多线程
乔布斯深入一线发现工人不是那么忙,有很多等待时间。于是规定:如果某个员工在等待生
产线某个零件生产时 ,不要闲着,干点其他工作。也就是说:如果一个线程等待某些条件,
可以充分利用这个时间去做其它事情,这就是:协程方式。
不使用协程执行多个任务
def func1():
for i in range(3):
print(f'北京:第{i}次打印啦')
time.sleep(1)
return "func1执行完毕"
def func2():
for k in range(3):
print(f'上海:第{k}次打印了' )
time.sleep(1)
return "func2执行完毕"
def main():
func1()
func2()
if __name__ == '__main__':
start_time = time.time()
main()
end_time = time.time()
print(f"耗时{end_time-start_time}") #不使用协程,耗时6秒
使用yield协程,实现任务切换
def func1():
for i in range(3):
print(f'北京:第{i}次打印啦')
yield # 只要方法包含了yield,就变成一个生成器
time.sleep(1)
def func2():
g = func1() #func1是一个生成器,func1()就不会直接调用,需要通过next()
print(type(g))
for k in range(3):
print(f'上海:第{k}次打印了' )
next(g) #继续执行func1的代码
time.sleep(1)
if __name__ == '__main__':
#有了yield,我们实现了两个任务的切换+保存状态
start_time = time.time()
func2()
end_time = time.time()
print(f"耗时{end_time-start_time}") #耗时5.0秒,效率差别不大
asyncio实现协程(重点)
- 常的函数执行时是不会中断的,所以你要写一个能够中断的函数,就需要加 async.async 用来声明一个函数为异步函数,异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件(假设挂起条件是 sleep(5) )消失后,也就是5秒到了再回来执行
- await 用来用来声明程序挂起,比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。
- asyncio 是python3.5之后的协程模块,是python实现并发重要的包,这个包使用事件循环驱动实现并发。
async def func1(): #async表示方法是异步的
for i in range(3):
print(f'北京:第{i}次打印啦')
await asyncio.sleep(1)
return "func1执行完毕"
async def func2():
for k in range(3):
print(f'上海:第{k}次打印了' )
await asyncio.sleep(1)
return "func2执行完毕"
async def main():
res = await asyncio.gather(func1(), func2())
#await异步执行func1方法
#返回值为函数的返回值列表
print(res)
if __name__ == '__main__':
start_time = time.time()
asyncio.run(main())
end_time = time.time()
print(f"耗时{end_time-start_time}") #耗时3秒,效率极大提高