参考资料
https://blog.csdn.net/qq_43380180/article/details/111573642?spm=1001.2014.3001.5506
协程的概念
指的是在一个线程中,可以在某个地方挂起的特殊函数,并且可以重新在挂起处继续运行。协程不是进程,也不是线程。
进程 VS 线程 VS 协程
- 进程是操作系统资源分配的基本单位,偏向于内存,所以进程多了之后比较消耗内存
- 线程是操作系统资源调度的基本单位,偏向于CPU,它是依赖于进程运行,即一个进程包含多个线程,一个进程内可以有多个线程并发运行。
- 协程是在线程内的,一个线程可以包含多个协程,但协程都是串行运行的,不论CPU的核心数。单一个协程运行时,其他协程就会挂起,等待。
- 如看下图的关系图(网上找的)
- 上下文切换比较
进程 | 线程 | 协程 | |
---|---|---|---|
切换者 | 操作系统 | 操作系统 | 用户(编程者/应用程序) |
切换时机 | 根据操作系统自己的切换策略,用户不感知 | 根据操作系统自己的切换策略,用户不感知 | 用户(的程序)自己决定 |
切换内容 | 页全局目录 内核栈 硬核上下文 | 内核栈 硬核上下文 | 硬件上下文 |
切换内容的保存 | 保存于内核中 | 保存于内核中 | 保存于用户自己的变量(用户栈/堆) |
切换过程 | 用户态-内核态-用户态 | 用户态-内核态-用户态 | 用户态(没有内核态) |
阻塞/非阻塞
- 概念:
指的是调用者(程序) 在等待返回结果,或输入时 的状态。
- 阻塞时
在调用结果返回前,当前线程会被挂起,并在结果之后返回。
- 非阻塞时
如果不能立刻得到结果,则该调用者不会阻塞当前线程。因此对应非阻塞的情况,调用者需要定时轮询查看处理状态
并发/并行
- 并发:
在操作系统中,是指一个时间段中几个程序都处于运行中,且这几个程序都是在同一个处理器上运行,但任一个时刻点上只有一个程序在处理器上运行。
- 并行:
在操作系统中,在不同进程中同时执行,无论微观还是宏观,程序都是一起执行的。
- 区别:
并发是在同一个时间段内,两个或多个程序执行,有时间上的重叠(宏观上是同时,微观上仍是顺序执行)
同步/异步
- 同步:在发出一个同步调用时,在没有得到结果前,改掉用就不返回。
- 异步:在发出一个异步调用后,调用者不会立刻得到结果,该调用就返回了。
- 和阻塞的区别:
- 同步和阻塞的定义很想,但是两个不同概念,同步不一定阻塞;在结果没有返回前,阻塞是指线程被挂起,这段程序不再执行;而同步是在线程还在运行状态,CPU 还在执行这段程序。
- 异步和非阻塞定义也很想,也是两个不同概念,异步指的是在调用时不会立即得到结果,调用就会返回了。线程可能阻塞,也可能不阻塞。而非阻塞是指调用的时候,线程一定不会进入非阻塞状态。
协程的使用场景
在我们的程序执行过程中,IO 是我们最大的瓶颈。协程是适合处理IO阻塞的任务,即一个协程在遇到IO阻塞时,就会挂起,而去处理其他协程。等上一个IO阻塞释放了,就会重新再接着挂起处,继续往下执行。所以它适合做异步任务,比如像网络请求,文件的读写,数据库的读写等。我们常用的sleep操作也是属于阻塞的。
但如果是 I/O 密集型的,协程因为不能利用多核的能力,那么它就不能应付了,所以得使用多核的能力,比如是"多进程/多线程+协程"的方案来处理。
异步编程
事件循环+回调
- 所谓事件循环,并非是一个真正意义上的循环,理解为一种定义,可以理解为是主线程不断的从事件队列里面取值/函数的过程,因为这一过程是不断的去检测并执行某些代码,所以我们为了方便把这个过程叫
事件循环
。 - 事件本身没有循环,循环的只是主线程取时间的动作。软件模块之间总是存在着一定的接口,从调用方式上,可以把他们分为三类:同步调用,回调、异步调用。
- 同步调用是一种阻塞式调用,调用方要等待对方执行完毕才返回,它是一种单向调用。
- 回调是一种双向调用模式,调用方要等待对方执行完才返回,它是一种单向调用。
- 异步调用是一种类似消息或事件的机制,不过他的调用方向刚好相反,接口的服务在收到某种讯息或发生某种事件时,会主动通知客户方(即调用客户方的接口)。
- 回调和异步调用的关系非常紧密,通常我们使用回调来实现异步消息的注册,通过异步调用来实现消息的通知。
伪代码如下:
任务列表 = [任务1,任务2,任务3...]
while True:
可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行' 和 '已完成'的任务返回
for 就绪任务 in 可执行的任务列表:
执行已就绪的任务
for 已完成的任务 in 已完成的任务列表:
在任务列表中移除 已完成的任务
if 任务列表中的任务都已完成
则终止循环
python3.7之前的事件循环
单任务
import asyncio
async def test():
print(test,hello,world!)
pass
# 创建协程对象(任务)
co = test()
# 去生成或获取一个事件循环对象
loop = asyncio.get_envent_loop()
# 将任务放到任务列表
loop.run_until_complete(co)
python3.7之后的事件循环
- 协程函数:定义函数的时候格式:
async def 函数名
- 协程对象:协程函数(),得到的就是一个协程对象
import asyncio
async def test():
print(test,hello,world!)
pass
# 创建协程对象
co = test()
asyncio.run(co)
任务等待 await
await 可等待的对象(协程对象、Future、Task对象,IO 等待)
await 是当前的处理操作 需要得到上一个异步的处理结果时,就需要用 await,但不影响异步的处理。
例子1:
import asyncio
async def test2():
print("test2")
# io等待2秒,如果有其他任务,就会切换到其他任务
res = await asyncio.sleep(2)
print("结束",res)
# 执行协程对象
asyncio.run(test2())
例子2:
import asyncio
import datetime
async def delay_print(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {datetime.datetime.now()}")
await delay_print(1, 'hello')
# await say_after(1,'hello')执行完后,才继续向下执行
await delay_print(2, 'world')
print(f"finished at {datetime.datetime.now()}")
if __name__ == '__main__':
asyncio.run(main())
Task对象
指在事件循环中添加多个任务。
Tasks 用于并发调度协程,通过 asyncio.create_task(协程对象)
的方式创建Task 对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task(协程对象)
外,还可以用低层级的loop.create_task()
或ensure_future()
函数。不建议手动实例化Task 对象。
注意:
asyncio.create_task()
函数在Python3.7中被加入。在 Python3.7之前,可以改用低层级的asyncio.ensure_future()
函数
- 示例1:
import asyncio
async def t1():
"""
异步任务1
@return:
"""
print(1)
await asyncio.sleep(2)
print(2)
print('task1 end')
return 't1'
async def t2():
"""
异步任务2
@return:
"""
print(3)
await asyncio.sleep(2)
print(4)
print('task2 end')
return 't2'
async def main():
"""
两个协程任务的执行,有点像 JAVA 中的线程通信机制notify...wait
@return:
"""
print('main开始')
# 创建两个任务
task1 = asyncio.create_task(t1())
task2 = asyncio.create_task(t2())
# 当执行task1时,遇到IO阻塞,就会挂起,然后去执行task2,同理,task2遇到IO阻塞,也会挂起,然后去执行task1
# await是等待task1,task2 执行完毕获取结果
res1 = await task1
res2 = await task2
print(res1, res2)
print('main end')
if __name__ == '__main__':
asyncio.run(main())
执行效果图:
执行流程如下:
1. 先t1协程,打印出1,接着sleep2秒,IO阻塞,t1协程挂起,Cpu执行权交个t2协程;
2. 然后执行t2协程,打印3,接着sleep2秒,IO阻塞,t2协程挂起,Cpu执行权又交给t1协程;
3. 再执行t1协程,接着上一次挂起位置继续往下执行,打印2,打印task1 end
,返回t1
,
4. 最后执行t2协程,接着上一次挂起位置继续往下执行,打印4,打印task2 end
,返回t2
下面是时序图如下:
-
示例2
-
任务列表
在示例1中只是两个协程任务,如果写任务有N个,那么这样子就不方便了,所以我们可以将多个任务放进任务列表,就像最上面的那个事件循环中的伪代码一样,可以将上面的示例1进行改造
import asyncio
async def t1():
"""
异步任务1
@return:
"""
print(1)
await asyncio.sleep(2)
print(2)
print('task1 end')
return 't1'
async def t2():
"""
异步任务2
@return:
"""
print(3)
await asyncio.sleep(2)
print(4)
print('task2 end')
return 't2'
async def main():
"""
@todo 注意:这里在创键任务列表的时候,同时也创建了事件循环对象
@return:
"""
print('main start')
# 将任务放入任务列表中,通过查看 create_task源码可以看到,它可以接收协程对象,协程名字,上下文对象
task_list = [
asyncio.create_task(t1(), name='t1'),
asyncio.create_task(t2(), name='t2')
]
# 等待任务任务全部执行完毕,done表示已经执行完毕的任务,pending表示未执行完毕的任务
done, pending = await asyncio.wait(task_list)
print("已经完成:", done)
print("未完成:", pending)
print('main end')
if __name__ == '__main__':
# 启动事件循环,事件循环在协程main()中,已经创建了
asyncio.run(main())
运行结果图:
create_task方法的源码,看它的参数说明
asyncio.wait 方法,则返回的是一个元祖,有两个返回值,done是已经完成的任务,pending 表示还未完成的任务,这两个都是集合类型
asyncio 的Future对象
它其实是底层的异步任务基类,Task继承自它,Task对象内部的await结果await就是基于它来实现的。
示例1
import asyncio
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),这个任务什么都不干
fut = loop.create_future()
# 等待任务最终结果(Future对象),没有结果则会一直等待下去
await fut
if __name__ == '__main__':
asyncio.run(main())
示例2
import asyncio
async def set_after(fut):
await asyncio.sleep(2)
fut.set_result(11111)
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),如果没有绑定事件,则这个任务永远不知道什么时候结束
fut = loop.create_future()
# 创建一个任务(Task 对象),绑定了set_after函数,在函数内部 sleep2秒后,给fut设置结果
# 手动设置future结果, 那么future就可以结束了
await loop.create_task(set_after(fut))
# 等待 future 的最终结果,否则就一直等待
data = await fut
print(data)
if __name__ == '__main__':
asyncio.run(main())
concurrent.futures.Future 对象
使用线程池,进程池实现异步操作时来使用
示例1:
不使用异步函数的情况
import time
from concurrent.futures.thread import ThreadPoolExecutor
def func(value):
time.sleep(1)
print(value)
if __name__ == '__main__':
"""
todo 这里使用线程池+异步任务的方式,可以实现并发执行
这里的线程池采用4个线程来处理10个任务,通过执行效果来看,任务是4个任务同时执行的,执行完再执行,后面4个任务
"""
# 创建线程池,因为电脑是4核,启动4个线程
pool = ThreadPoolExecutor(max_workers=4)
# 创建进程池
# pool = ProcessPoolExecutor(max_workers=4)
# 循环10次,往线程池中提交10次
for i in range(10):
# 提交异步任务,返回的是线程池中的Future对象
fut = pool.submit(func, i)
print(fut)
执行效果:
示例2:
不使用异步函数的线程池
import time
import asyncio
import concurrent.futures
def func1():
"""
同步函数
"""
time.sleep(2)
return "hello"
async def main():
"""
异步函数
"""
# 获取当前执行的事件驱动,在事件循环中使用默认线程池
loop = asyncio.get_running_loop()
# 第一步:内部先调用ThreadPoolExecutor的submit方法,去线程池中申请一个线程去执行func1函数,并返回一个 concurrent.futures.Future对象
# 第二步:调用 asyncio.wrap_future 将concurrent.future.Future 对象包装成 asyncio.Future 对象。
# 因为concurrent.futures.Future对象不支持 await 语法,所以需要包装为asyncio.Future 对象才能使用。
# 下面,就是将同步方法加入事件的线程池
fut = loop.run_in_executor(None,func1)
result = await fut
print('在事件循环中使用默认线程池:',result)
# 运行在自定义的线程池
# with concurrent.futures.ThreadPoolExecutor() as pool:
# result = await loop.run_in_executor(pool,func1)
# print('自定义线程池:',result)
# 运行在自定义的进程池
# with concurrent.futures.ProcessPoolExecutor() as pool:
# result = await loop.run_in_executor(pool,func1)
# print('自定义进程池:',result)
异步上下文
异步上下文的作用和应用场景
uvloop
asyncio 的增强版 uvloop,性能和 golang 比肩
- 安装
pip install uvloop
实际在操作时,也很简单,只要将事件驱动器替换成uvloop 就行,其他的都和 asyncio一样
- 代码实现
import asyncio
import uvloop
# 这就是来修改事件驱动器改成 uvloop
async.set_event_loop_policy(uvloop.EventloopPolicy())
# 编写 async 的代码,与之前使用 asyncio的代码一致
# 内部的事件循环的自动化会变成 uvloop
asyncio.run(...)
- 额外:
asgi 的 uvicorn底层就是使用了 uvloop,像 fastapi, django3+,底层 web 服务都是基于 asgi 的。
后续
后续会将异步请求,异步mysql,异步redis补上