Python 协程:使用 asyncio
高效管理并发任务
随着现代应用程序对并发执行的需求越来越高,异步编程(Asynchronous Programming)成为了提高性能的重要工具。Python 通过 asyncio
提供了内置的支持,允许我们以非阻塞的方式处理 I/O 密集型任务。
本文将带你从基础到进阶,了解如何使用 Python 协程与 asyncio
库管理并发任务。我们将通过实际的代码示例,并对每个示例进行优化,增加任务耗时的打印,帮助你更好地理解异步编程。
1. 协程与异步编程简介
在 Python 中,协程是一个可以暂停执行并在以后恢复执行的函数。你可以通过 async def
来定义一个协程函数,而 await
用于暂停协程的执行,直到某个异步操作完成。
异步编程的核心优势是能够并发地执行 I/O 密集型任务,而不需要阻塞主线程。例如,进行大量的文件读写或网络请求时,异步编程能够帮助你高效地利用系统资源,提高程序的响应速度。
2. 协程基本用法
2.1 启动单个协程
首先,我们来看一个简单的异步任务。下面的代码展示了如何定义和执行一个基本的异步任务,使用 asyncio.sleep()
模拟一个耗时的 I/O 操作。
import asyncio
import time
async def main():
print("Start")
start_time = time.time()
await asyncio.sleep(1) # 模拟一个异步操作,例如 I/O 操作
print(f"End (took {time.time() - start_time:.2f} seconds)")
# 运行事件循环
asyncio.run(main())
输出:
Start
End (took 1.01 seconds)
在这个例子中,main
是一个简单的协程函数,模拟了一个耗时 1 秒的操作。通过 await asyncio.sleep(1)
暂停协程的执行,直到这个异步操作完成。
2.2 多个协程的串行执行
如果你有多个异步任务,并且它们依赖于前一个任务的完成,那么你可以串行地执行它们。下面是一个串行执行的例子:
import asyncio
import time
async def main():
print("Start")
start_time = time.time()
await asyncio.sleep(1) # 模拟第一个异步操作
await asyncio.sleep(1) # 模拟第二个异步操作
print(f"End (took {time.time() - start_time:.2f} seconds)")
# 运行事件循环
asyncio.run(main())
输出:
Start
End (took 2.02 seconds)
在这个例子中,我们增加了一个额外的 await asyncio.sleep(1)
,模拟了连续的异步操作。这两个操作是串行执行的,因此总共耗时 2 秒。
3. 创建并发任务
3.1 使用 asyncio.create_task()
创建并发任务
asyncio.create_task()
是一个非常重要的函数,它用于将协程包装成任务并安排执行。与 asyncio.gather()
不同,create_task()
是一种更灵活的方式来创建和调度异步任务。
下面是使用 asyncio.create_task()
创建并发任务的一个简单示例:
import asyncio
import time
async def task(name):
await asyncio.sleep(1)
print(f"{name} completed")
async def main():
print("Start")
start_time = time.time()
# 使用 asyncio.create_task 创建并发任务
task1 = asyncio.create_task(task("task1"))
task2 = asyncio.create_task(task("task2"))
# 等待所有任务完成
await task1
await task2
print(f"End (took {time.time() - start_time:.2f} seconds)")
# 运行事件循环
asyncio.run(main())
输出:
Start
task1 completed
task2 completed
End (took 1.01 seconds)
在这个例子中,task1
和 task2
是通过 asyncio.create_task()
创建的,它们会并发执行。
3.2 使用 asyncio.gather()
创建并发任务
如果你希望同时执行多个任务,使用 asyncio.gather()
可以让你并发地运行多个协程,直到它们全部完成。下面是一个例子:
import asyncio
import time
async def task(name):
await asyncio.sleep(1)
print(f"{name} completed")
async def main():
print("Start")
start_time = time.time()
# 并发执行两个任务
await asyncio.gather(task('task1'), task('task2'))
print(f"End (took {time.time() - start_time:.2f} seconds)")
# 运行事件循环
asyncio.run(main())
输出:
Start
task1 completed
task2 completed
End (took 1.01 seconds)
在这个例子中,task1
和 task2
会并发执行,因此总的执行时间为 1 秒,而不是 2 秒。
3.3 使用 asyncio.TaskGroup
创建并发任务
在 Python 3.11 中,asyncio
引入了 TaskGroup
,这是一个更高级的任务管理工具。通过 TaskGroup
,你可以动态创建和管理多个异步任务,同时确保所有任务在退出时都已完成或抛出异常。
下面是一个使用 TaskGroup
执行并发任务的例子:
import asyncio
import time
async def task(name):
await asyncio.sleep(1)
print(f"{name} completed")
return name
async def main():
print("Start")
start_time = time.time()
# 使用 TaskGroup 并发执行任务
async with asyncio.TaskGroup() as tg:
tg.create_task(task("task1"))
tg.create_task(task("task2"))
print(f"End (took {time.time() - start_time:.2f} seconds)")
# 运行事件循环
asyncio.run(main())
输出:
Start
task1 completed
task2 completed
End (took 1.01 seconds)
在这个例子中,我们使用 async with asyncio.TaskGroup()
创建了一个任务组,并通过 create_task
动态地将任务添加到任务组中。所有任务会并发执行,并且任务组会确保在所有任务完成后才会退出。
4. 任务异常处理
如果任务中的任何一个发生异常,TaskGroup
会自动取消所有其他未完成的任务,并抛出第一个异常。这是 TaskGroup
提供的一个非常实用的功能,它保证了任务的原子性。
import asyncio
import time
async def task(name):
await asyncio.sleep(1)
if name == "task2":
raise ValueError("An error occurred in task2")
print(f"{name} completed")
return name
async def main():
print("Start")
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(task("task1"))
tg.create_task(task("task2"))
except Exception as e:
print(f"Error: {e}")
print("End")
# 运行事件循环
asyncio.run(main())
输出:
Start
task1 completed
Error: unhandled errors in a TaskGroup (1 sub-exception)
End
5. 回调函数的使用
如果你希望在任务完成后执行某些操作(如日志记录或清理工作),你可以为任务设置回调函数。回调函数会在任务完成时被调用,传递任务的结果。
import asyncio
import time
async def task(name):
await asyncio.sleep(1)
print(f"{name} completed")
return name
def callback(task):
print(f"Callback: {task.result()}")
async def main():
print("Start")
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(task("task1"))
task2 = tg.create_task(task("task2"))
# 为任务添加回调函数
task1.add_done_callback(callback)
task2.add_done_callback(callback)
print("End")
# 运行事件循环
asyncio
.run(main())
输出:
Start
task1 completed
task2 completed
Callback: task1
Callback: task2
End
5. 超时与等待
5.1 asyncio.wait_for
使用 asyncio.wait_for()
可以为协程设置超时时间,超过这个时间后会抛出 TimeoutError
。
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
return f"{name} completed"
async def main():
try:
result = await asyncio.wait_for(task('task1', 3), timeout=2) # 设置 2 秒超时
print(result)
except asyncio.TimeoutError:
print("The task timed out!")
asyncio.run(main())
5.2 asyncio.wait
asyncio.wait
允许你同时等待多个任务的完成,并且可以设置超时时间。
asyncio.wait
是 Python 中 asyncio
库的一个函数,它用于等待多个异步任务的完成。与 asyncio.gather
不同,asyncio.wait
返回的是两个集合,分别是已完成(done)和未完成(pending)任务的集合,可以让你更加灵活地处理任务的结果。
asyncio.wait
函数的签名:
asyncio.wait(fs, timeout=None, return_when=ALL_COMPLETED)
参数:
- fs:一个可迭代对象,包含多个协程任务、
Future
或Task
对象。 - timeout(可选):一个浮动的时间值(以秒为单位),如果在此时间内任务未完成,则函数会返回,且未完成的任务仍然处于挂起状态。
- return_when(可选):确定什么时候返回。可取以下值:
asyncio.ALL_COMPLETED
(默认):等待所有任务完成。asyncio.FIRST_COMPLETED
:等待第一个任务完成。asyncio.FIRST_EXCEPTION
:等待第一个任务完成或抛出异常。
返回:
返回一个元组 (done, pending)
,其中:
done
是已完成的任务集合。pending
是未完成的任务集合。
示例代码
1. 等待所有任务完成
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
print(f"{name} completed")
async def main():
tasks = [
asyncio.create_task(task('task1', 3)),
asyncio.create_task(task('task2', 2)),
asyncio.create_task(task('task3', 1))
]
# 等待所有任务完成
done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
print("All tasks completed")
for t in done:
print(f"{t.result()}")
# 运行事件循环
asyncio.run(main())
输出:
task3 completed
task2 completed
task1 completed
All tasks completed
Task finished
Task finished
Task finished
在这个例子中,我们使用 asyncio.wait
等待所有任务完成。它返回了两个集合,done
和 pending
。由于我们没有设置超时,因此它会等到所有任务完成后返回。
2. 等待第一个任务完成
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
print(f"{name} completed")
async def main():
tasks = [
asyncio.create_task(task('task1', 3)),
asyncio.create_task(task('task2', 2)),
asyncio.create_task(task('task3', 1))
]
# 等待第一个任务完成
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print("At least one task is completed")
for t in done:
print(f"{t.result()}")
# 运行事件循环
asyncio.run(main())
输出:
task3 completed
At least one task is completed
Task finished
在这个例子中,asyncio.wait
通过设置 return_when=asyncio.FIRST_COMPLETED
,会在第一个任务完成时立即返回,不会等待所有任务完成。
3. 等待第一个抛出异常的任务
import asyncio
async def task(name, delay, should_fail=False):
await asyncio.sleep(delay)
if should_fail:
raise Exception(f"{name} failed")
print(f"{name} completed")
return f"{name} finished"
async def main():
tasks = [
asyncio.create_task(task('task1', 3)),
asyncio.create_task(task('task2', 2, should_fail=True)),
asyncio.create_task(task('task3', 1))
]
# 等待第一个任务完成或抛出异常
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
print("At least one task is completed or failed")
for t in done:
if t.exception():
print(f"Task failed with exception: {t.exception()}")
else:
print(f"Task result: {t.result()}")
# 运行事件循环
asyncio.run(main())
输出:
task3 completed
task2 failed with exception: task2 failed
At least one task is completed or failed
Task result: task3 finished
在这个例子中,asyncio.wait
设置了 return_when=asyncio.FIRST_EXCEPTION
,它会在第一个任务抛出异常时停止等待。返回的 done
集合会包含已完成的任务(包括抛出异常的任务)。我们可以通过 task.exception()
获取任务的异常信息。
4. 设置超时
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
return f"{name} completed"
async def main():
tasks = [
asyncio.create_task(task('task1', 3)),
asyncio.create_task(task('task2', 2)),
asyncio.create_task(task('task3', 1))
]
# 等待最多 2 秒,之后返回
done, pending = await asyncio.wait(tasks, timeout=2)
print("Waiting completed (with timeout)")
for t in done:
print(f"{t.result()}")
# 检查未完成的任务
for t in pending:
print(f"Task {t.get_name()} is still pending.")
# 运行事件循环
asyncio.run(main())
输出:
task3 completed
task2 completed
Waiting completed (with timeout)
Task task1 is still pending.
在这个例子中,asyncio.wait
使用了 timeout=2
来设定最多等待 2 秒。在 2 秒内,task1
没有完成,因此它被放入了 pending
集合中,而其他任务已经完成。
总结
asyncio.wait()
允许你并行等待多个任务,它会返回两个集合:done
(已完成的任务)和pending
(未完成的任务)。- 可以通过
return_when
控制何时返回,支持以下选项:asyncio.ALL_COMPLETED
:等待所有任务完成。asyncio.FIRST_COMPLETED
:等待第一个任务完成。asyncio.FIRST_EXCEPTION
:等待第一个任务抛出异常。
- 你可以设置
timeout
来限制等待的最大时间。 done
集合中的任务可以通过task.result()
获取任务的结果,或通过task.exception()
获取任务抛出的异常。
5.3 asyncio.timeout
(Python 3.11+)
Python 3.11 引入了 asyncio.timeout
,这为超时控制提供了更加简洁的语法。
import asyncio
async def long_running_task():
await asyncio.sleep(5) # 模拟一个耗时的异步任务
return "Task completed"
async def main():
try:
async with asyncio.timeout(3): # 设置 3 秒的超时
result = await long_running_task() # 如果超时,将抛出 TimeoutError
print(result)
except asyncio.TimeoutError:
print("Task timed out!")
asyncio.run(main())
6. 总结
本文展示了如何使用 Python 3.11 中的 asyncio
进行异步编程。我们从基础的单个任务执行开始,逐步介绍了如何并发执行多个任务、如何使用 TaskGroup
管理任务、以及如何通过 asyncio.create_task()
创建并发任务并调度执行。
asyncio.create_task()
是非常常用的方式,它能够直接将协程转换成任务,并将任务加入事件循环。它比 asyncio.gather()
更灵活,适用于需要更复杂任务调度和控制的场景。希望本文能够帮助你更好地理解 Python 协程和 asyncio
库的使用,提升你在处理并发任务时的效率。