一、引言
Python异步开发已经非常流行了,一些主流的组件像MySQL、Redis、RabbitMQ等都提供了异步的客户端,再处理耗时的时候不会堵塞住主线程,不但可以提高并发能力,也能减少多线程带来的cpu上下文切换以及内存资源消耗。但在业务开发的时候一些第三方库没有异步的处理方式,例如OSS、CV、其他第三方提供的SDK以及自己封装的函数有耗时等,此时还是需要借助线程来加速,再异步中就不会堵塞主线程,因此封装一个异步装饰器可以更好的处理异步,让代码更简洁。
常用组件异步库
序号 | 组件名 | 异步库 | 说明 |
---|---|---|---|
1 | MySQL | aiomysql github.com/aio-libs/ai… | |
SQLAIchemy github.com/sqlalchemy/… | |||
tortoise-orm github.com/tortoise/to… | aiomysql 基于asyncio的MySQL驱动,用asyncio实现异步IO。这是最常用的Python异步MySQL驱动。 | ||
SQLAIchemy 和 Tortoise-ORM 都是支持异步的ORM | |||
2 | Redis | aioredis github.com/aio-libs-ab… | aioredis因为性能好、使用广泛,是Python最主流的Redis异步驱动之一。 之前同步库的redis.py 后面再4.2.0rc1+版本也支持aioredis |
3 | RabbitMQ | aio-pika github.com/mosquito/ai… | |
celery github.com/celery/cele… | 基于asyncio的RabbitMQ异步客户端,是最常用的Python异步RabbitMQ客户端之一。 celery是分布式任务队列框架,也支持常用消息中间件的异步封装 | ||
4 | Kafka | aio-kafka github.com/aio-libs/ai… | kafka常用的异步客户端 |
5 | http客户端 | httpx github.com/encode/http… | |
aiohttp github.com/aio-libs/ai… | httpx一个现代的异步HTTP客户端,支持asyncio和操作语法与 Requests 库的API一致。 aiohttp也是十分常用的异步客户端,性能也不错 | ||
6 | 文件处理 | aiofiles github.com/Tinche/aiof… | 基于asyncio的异步文件操作库,提供类文件对象接口 |
这里就简单列举一些常用的异步库,可以发现好多异步库都在 github 的 aio-libs 中慢慢孵化,质量都挺不错的。大家有什么好用的异步库欢迎评论区推荐。
github.com/aio-libs
二、功能分析
-
支持同步函数使用线程加速
-
异、同步函数需支持 await 语法等待返回结果
-
异、同步函数需支持后台任务,无需等待
同步函数使用线程加速
同步函数使用线程,这还是挺简单的使用,内置库的 threading.Thread 就可以实现
import time
import threading
def task1(name):
print(f"Hello {name}")
time.sleep(1)
print(f"Completed {name}")
t1 = threading.Thread(target=task1, args=("hui",))
t2 = threading.Thread(target=task1, args=("wang",))
t1.start()
t2.start()
t1.join()
t2.join()
>>> out
Hello hui
Hello wang
Completed hui
Completed wang
- start()方法用于启动线程执行函数。
- join()方法用于等待线程执行结束。
但这样直接开线程的方式比较暴力,也不太好管理,因此可以想到线程池,进行线程复用与管理。Python内置的 concurrent.futures 模块提供了线程池和进程池的实现与封装。
import time
from concurrent.futures import ThreadPoolExecutor
def task2(name):
print(f"Hello {name}")
time.sleep(1)
return f"Completed {name}"
with ThreadPoolExecutor(max_workers=2) as executor:
future1 = executor.submit(task2, "hui")
future2 = executor.submit(task2, "zack")
print("ret1", future1.result())
print("ret2", future2.result())
>>> out
Hello hui
Hello zack
ret1 Completed hui
ret2 Completed zack
异、同步函数需支持 await 语法
异、同步函数需支持 await 语法等待返回结果,异步函数本身就支持 await语法,这里主要是实现同步函数支持
await 语法,在python中可以await语法的对象有如下几大类:
- 协程对象(coroutine):定义了__await__方法的对象,异步框架中的协程函数都是此类型。
- 任务对象(Task):封装了协程的对象, 如 asyncio 中的 Task, trio中的Task。
- Future对象:表示异步操作结果的对象, 如 concurrent.futures.Future。
- 协程装饰器封装的对象:一些装饰器可以将普通函数或对象包装成可await的对象,如@asyncio.coroutine。
综上,凡是实现了__await__魔术方法的对象或者封装了协程/任务的对象,都可以被await,await会自动把对象交给事件循环运行,等待其完成。
常见的可await对象包括协程函数、任务对象、Future、被@coroutine装饰的函数等,这可以使异步代码更简洁。await对象可以暂停当前协程,等待异步操作完成后再继续执行。
import asyncio
async def coro_demo():
print("await coroutine demo")
async def task_demo():
print("await task demo")
async def coro():
print("in coro task")
# 创建 Task 对象
task = asyncio.create_task(coro())
await task
async def future_demo():
print("await future demo")
future = asyncio.Future()
await future
# 这个装饰器已经过时
@asyncio.coroutine
def coro_decorated_demo():
print("await decorated function demo")
async def main():
await coro_demo()
await task_demo()
await future_demo()
await coro_decorated_demo()
if __name__ == '__main__':
asyncio.run(main())
>>> out
DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
def coro_decorated_demo():
await coroutine demo
await task demo
in coro task
await future demo
这个 @asyncio.coroutine 协程装饰器已经过时了,都是使用 async、await 语法替代。
下面是实现 __await__ 方法的demo
import asyncio
class AsyncDownloader:
def __init__(self, url):
self.url = url
self.download_ret = None
def __await__(self):
print(f'Starting download of {self.url}')
loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, self.download)
yield from future.__await__()
return self
def download(self):
print(f'Downloading {self.url}...')
# 模拟下载过程
import time
time.sleep(2)
self.download_ret = f'{self.url} downloaded ok'
async def main():
print('Creating downloader...')
downloader = AsyncDownloader('https://www.ithui.top/file.zip')
print('Waiting for download...')
downloader_obj = await downloader
print(f'Download result: {downloader_obj.download_ret}')
if __name__ == '__main__':
asyncio.run(main())
>>> out
Creating downloader...
Waiting for download...
Starting download of https://www.ithui.top/file.zip
Downloading https://www.ithui.top/file.zip...
Download result: https://www.ithui.top/file.zip downloaded ok
用 yield from 来迭代 future对象(符合__await__逻辑),并在结束时return self
异、同步函数需支持后台任务
异步、后台任务的好处与场景
-
减少主程序的等待时间
异步函数可以通过后台任务的方式执行一些耗时操作,如IO操作、网络请求等,而主程序无需等待这些操作完成,可以继续执行其他任务,从而减少程序总体的等待时间。
-
提高程序响应性能
后台任务的异步执行,可以避免主程序被长时间阻塞,从而改善程序的整体响应性能。用户无需长时间等待才能得到响应。
-
解决IO密集型任务阻塞问题
对于网络、文件IO等密集型任务,使用同步执行可能会导致长时间阻塞,而异步后台任务可以很好地解决这个问题,避免资源浪费。
-
良好的用户体验
后台任务的异步处理,给用户的感觉是多个任务同时在执行,实际上CPU在切换处理,这相比线性等待任务完成,可以提供更好的用户体验。
-
适用于不需要实时结果的任务
邮件发送、数据批处理、文件处理等不需要用户即时等待结果的任务非常适合通过异步方式在后台完成。
在python中同异步函数实现后台任务
-
异步函数可以通过 asyncio.create_task 方法实现后台任务
-
同步函数可以通过线程、线程池来实现
import asyncio
import time
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
async def async_bg_task():
print('async bg task running')
await asyncio.sleep(3)
print('async bg task completed')
def sync_bg_task():
print('sync bg task running')
time.sleep(3)
print('sync bg task completed')
async def main():
print('Starting main program')
# 异步函数的后台任务
asyncio.create_task(async_bg_task())
# 同步函数的后台任务
# with ThreadPoolExecutor() as executor:
# executor.submit(sync_bg_task)
# Thread(target=sync_bg_task).start()
loop = asyncio.get_running_loop()
loop.run_in_executor(executor=ThreadPoolExecutor(), func=sync_bg_task)
print('Main program continues')
await asyncio.sleep(5)
if __name__ == '__main__':
asyncio.run(main())
>>> ThreadPoolExecutor out
Starting main program
sync bg task running
sync bg task completed
Main program continues
async bg task running
async bg task completed
>>> Thread out
Starting main program
sync bg task running
Main program continues
async bg task running
sync bg task completed
async bg task completed
>>> run_in_executor out
Starting main program
sync bg task running
Main program continues
async bg task running
async bg task completed
sync bg task completed
看输出结果可以发现在同步函数使用直接使用线程池 ThreadPoolExecutor 执行还是堵塞了主线程,然后 Thread 没有,通过 loop.run_in_executor 也不会阻塞。后面发现 是 with 语法导致的堵塞,with 的根本原因就是它会等待线程池内的所有线程任务完成并回收,所以主线程必须等同步函数结束后才能继续。一开始我还一以为是线程池使用了主线程的线程后面打印线程名称看了下不是,然后调试下就发现了with的问题。
import asyncio
import time
import threading
from concurrent.futures import ThreadPoolExecutor
async def async_bg_task():
print(f"async_bg_task In thread: {threading.current_thread().name}")
print('async bg task running')
await asyncio.sleep(3)
print('async bg task completed')
def sync_bg_task(num):
print(f"sync_bg_task{num} In thread: {threading.current_thread().name}")
print(f'sync bg task{num} running')
time.sleep(3)
print(f'sync bg task{num} completed')
async def main():
print('Starting main program')
# 异步函数的后台任务
asyncio.create_task(async_bg_task())
# 同步函数的后台任务
thread_pool = ThreadPoolExecutor()
# with thread_pool as pool:
# for i in range(5):
# pool.submit(sync_bg_task, i)
for i in range(5):
thread_pool.submit(sync_bg_task, i)
threading.Thread(target=sync_bg_task, args=["thread"]).start()
loop = asyncio.get_running_loop()
loop.run_in_executor(ThreadPoolExecutor(), sync_bg_task, "loop.run_in_executor")
print('Main program continues')
print(f"Main program In thread: {threading.current_thread().name}")
await asyncio.sleep(5)
if __name__ == '__main__':
asyncio.run(main())
三、具体封装实现
import asyncio
from concurrent.futures import ThreadPoolExecutor, Executor
def run_on_executor(executor: Executor = None, background: bool = False):
"""
异步装饰器
- 支持同步函数使用 executor 加速
- 异步函数和同步函数都可以使用 `await` 语法等待返回结果
- 异步函数和同步函数都支持后台任务,无需等待
Args:
executor: 函数执行器, 装饰同步函数的时候使用
background: 是否后台执行,默认False
Returns:
"""
def _run_on_executor(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
if background:
return asyncio.create_task(func(*args, **kwargs))
else:
return await func(*args, **kwargs)
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
task_func = functools.partial(func, *args, **kwargs) # 支持关键字参数
return loop.run_in_executor(executor, task_func)
# 异步函数判断
wrapper_func = async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return wrapper_func
return _run_on_executor
封装成了带参数的装饰器
-
executor: 函数执行器, 装饰同步函数的时候使用
- 可以传递指定的线程池,默认None 根据系统cpu核心数动态创建线程的数量
-
background: 用于标识是否后台执行,默认False
- 有点诟病同步函数的后台任务没有用到这个参数而是使用 await 语法控制,但在使用装饰器时候可以起到后台任务标识作用,别人一看有这个参数就知道是后台任务就不用细看函数业务逻辑
- 后续再看看怎么优化,大家有没有比较好建议
-
loop.run_in_executor(executor, task_func) 方法不支持关键字参数的传递,故而采用 task_func = functools.partial(func, *args, **kwargs) ,来构造一个不带参数的函数就可以方便使用了
测试demo
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from py_tools.decorators.base import run_on_executor
from loguru import logger
thread_executor = ThreadPoolExecutor(max_workers=3)
@run_on_executor(background=True)
async def async_func_bg_task():
logger.debug("async_func_bg_task start")
await asyncio.sleep(1)
logger.debug("async_func_bg_task running")
await asyncio.sleep(1)
logger.debug("async_func_bg_task end")
return "async_func_bg_task ret end"
@run_on_executor()
async def async_func():
logger.debug("async_func start")
await asyncio.sleep(1)
logger.debug("async_func running")
await asyncio.sleep(1)
return "async_func ret end"
@run_on_executor(background=True, executor=thread_executor)
def sync_func_bg_task():
logger.debug("sync_func_bg_task start")
time.sleep(1)
logger.debug("sync_func_bg_task running")
time.sleep(1)
logger.debug("sync_func_bg_task end")
return "sync_func_bg_task end"
@run_on_executor()
def sync_func():
logger.debug("sync_func start")
time.sleep(1)
logger.debug("sync_func running")
time.sleep(1)
return "sync_func ret end"
async def main():
ret = await async_func()
logger.debug(ret)
async_bg_task = await async_func_bg_task()
logger.debug(f"async bg task {async_bg_task}")
logger.debug("async_func_bg_task 等待后台执行中")
loop = asyncio.get_event_loop()
for i in range(3):
loop.create_task(async_func())
ret = await sync_func()
logger.debug(ret)
sync_bg_task = sync_func_bg_task()
logger.debug(f"sync bg task {sync_bg_task}")
logger.debug("sync_func_bg_task 等待后台执行")
await asyncio.sleep(10)
if __name__ == '__main__':
asyncio.run(main())
测试详细输出
async_func start
async_func running
async_func ret end
async bg task <Task pending name='Task-2' coro=<async_func_bg_task() running at ...
sync_func start
async_func_bg_task start
async_func start
async_func start
async_func start
sync_func running
async_func_bg_task running
async_func running
async_func running
async_func running
async_func_bg_task end
sync_func ret end
sync_func_bg_task start
sync bg task <Future pending cb=[_chain_future.<locals>._call_check_cancel() at ...
sync_func_bg_task 等待后台执行
sync_func_bg_task running
sync_func_bg_task end
如果你对Python感兴趣,想要学习python,这里给大家分享一份Python全套学习资料,都是我自己学习时整理的,希望可以帮到你,一起加油!
😝有需要的小伙伴,可以V扫描下方二维码免费领取🆓
1️⃣零基础入门
① 学习路线
对于从来没有接触过Python的同学,我们帮你准备了详细的学习成长路线图。可以说是最科学最系统的学习路线,你可以按照上面的知识点去找对应的学习资源,保证自己学得较为全面。
② 路线对应学习视频
还有很多适合0基础入门的学习视频,有了这些视频,轻轻松松上手Python~
③练习题
每节视频课后,都有对应的练习题哦,可以检验学习成果哈哈!
2️⃣国内外Python书籍、文档
① 文档和书籍资料
3️⃣Python工具包+项目源码合集
①Python工具包
学习Python常用的开发软件都在这里了!每个都有详细的安装教程,保证你可以安装成功哦!
②Python实战案例
光学理论是没用的,要学会跟着一起敲代码,动手实操,才能将自己的所学运用到实际当中去,这时候可以搞点实战案例来学习。100+实战案例源码等你来拿!
③Python小游戏源码
如果觉得上面的实战案例有点枯燥,可以试试自己用Python编写小游戏,让你的学习过程中增添一点趣味!
4️⃣Python面试题
我们学会了Python之后,有了技能就可以出去找工作啦!下面这些面试题是都来自阿里、腾讯、字节等一线互联网大厂,并且有阿里大佬给出了权威的解答,刷完这一套面试资料相信大家都能找到满意的工作。
上述所有资料 ⚡️ ,朋友们如果有需要的,可以扫描下方👇👇👇二维码免费领取🆓