如何正确定义一个协程函数?直接调用协程会引发什么问题?
在 Python 中,正确定义一个协程函数需要使用async def
关键字。协程函数是一种特殊的函数,它在执行过程中可以暂停和恢复,从而允许程序在等待 I/O 操作时执行其他任务。以下是定义协程函数的示例:
import asyncio
async def my_coroutine():
print("协程开始")
await asyncio.sleep(1)
print("协程结束")
在上述代码中,my_coroutine
是一个协程函数,使用async def
关键字定义。函数内部使用了await
关键字,它用于暂停协程的执行,直到等待的异步操作完成。
直接调用协程函数并不会执行协程内部的代码,而是返回一个协程对象。如果直接调用协程对象而不将其交给事件循环处理,不会有任何实际的执行效果,并且会引发RuntimeWarning
警告。例如:
coro = my_coroutine()
# 这里不会执行协程内部代码,会警告
要正确执行协程,需要将协程对象交给事件循环处理。可以使用asyncio.run()
函数来运行协程,它会自动创建和管理事件循环。示例如下:
asyncio.run(my_coroutine())
或者手动管理事件循环:
loop = asyncio.get_event_loop()
loop.run_until_complete(my_coroutine())
loop.close()
如果不将协程对象交给事件循环处理,协程内部的代码不会执行,因为协程的执行需要事件循环的调度。事件循环负责监控协程的状态,当协程遇到await
语句暂停时,事件循环可以切换到其他协程继续执行,从而实现异步编程的效果。
使用 async def 定义的协程与普通函数执行流程有何本质区别?
使用async def
定义的协程与普通函数在执行流程上有本质区别。普通函数是顺序执行的,一旦开始执行,会一直运行到函数结束,期间不会暂停。例如:
def normal_function():
print("普通函数开始")
for i in range(3):
print(i)
print("普通函数结束")
normal_function()
在上述代码中,normal_function
是一个普通函数,当调用它时,会按照代码的顺序依次执行,直到函数结束。
而协程函数使用async def
定义,它可以在执行过程中暂停和恢复。协程函数内部可以使用await
关键字来暂停执行,等待某个异步操作完成后再继续执行。例如:
import asyncio
async def coroutine_function():
print("协程函数开始")
await asyncio.sleep(1)
print("协程函数结束")
async def main():
await coroutine_function()
asyncio.run(main())
在上述代码中,coroutine_function
是一个协程函数。当调用coroutine_function
时,它会先打印 “协程函数开始”,然后遇到await asyncio.sleep(1)
语句,此时协程会暂停执行,将控制权交还给事件循环。事件循环可以在这段时间内执行其他协程。当asyncio.sleep(1)
完成后,协程会恢复执行,打印 “协程函数结束”。
普通函数的调用是同步的,调用者必须等待函数执行完毕才能继续执行后续代码。而协程函数的调用是异步的,调用者可以在协程暂停时执行其他任务,从而提高程序的并发性能。此外,普通函数只能返回一个值,而协程函数可以通过await
与其他协程进行交互,实现更复杂的异步操作。
解释 asyncio.run () 的作用及与手动管理事件循环的差异
asyncio.run()
是 Python 3.7 及以上版本引入的一个高级函数,用于运行顶级的协程函数。它的主要作用是简化异步编程的代码,自动创建和管理事件循环。以下是asyncio.run()
的基本用法:
import asyncio
async def main():
print("异步任务开始")
await asyncio.sleep(1)
print("异步任务结束")
asyncio.run(main())
在上述代码中,asyncio.run(main())
会自动创建一个事件循环,将main
协程函数提交给事件循环执行,并且在协程执行完毕后关闭事件循环。
手动管理事件循环需要使用asyncio.get_event_loop()
来获取事件循环对象,然后使用run_until_complete()
方法来运行协程,最后手动关闭事件循环。示例如下:
import asyncio
async def main():
print("异步任务开始")
await asyncio.sleep(1)
print("异步任务结束")
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
asyncio.run()
与手动管理事件循环的差异主要体现在以下几个方面:
- 代码简洁性:
asyncio.run()
的代码更加简洁,只需要一行代码就可以完成事件循环的创建、协程的执行和事件循环的关闭。而手动管理事件循环需要更多的代码来处理事件循环的创建、运行和关闭。 - 兼容性:
asyncio.run()
会确保事件循环是全新的,并且在运行结束后会关闭事件循环,避免了事件循环的复用问题。手动管理事件循环时,如果不小心复用了已经关闭的事件循环,会导致错误。 - 异常处理:
asyncio.run()
会自动处理异常,当协程中出现未处理的异常时,会打印错误信息并关闭事件循环。手动管理事件循环时,需要手动处理异常,否则事件循环可能会进入错误状态。
为什么协程中必须使用 await 而非 yield 挂起操作?
在 Python 的协程中,使用await
而不是yield
来挂起操作,主要是因为await
和yield
在语义和功能上有本质的区别。
yield
是 Python 生成器的关键字,它用于生成器函数中,将函数转换为一个生成器。生成器是一种可迭代对象,它可以在迭代过程中暂停和恢复执行。例如:
def generator_function():
for i in range(3):
yield i
gen = generator_function()
for num in gen:
print(num)
在上述代码中,generator_function
是一个生成器函数,使用yield
关键字将函数转换为生成器。当调用generator_function()
时,返回一个生成器对象,通过迭代生成器对象可以依次获取生成的值。
而await
是 Python 协程的关键字,它用于协程函数中,用于暂停协程的执行,等待一个可等待对象(如另一个协程、Future
对象等)完成。例如:
import asyncio
async def coroutine_function():
print("协程开始")
await asyncio.sleep(1)
print("协程结束")
async def main():
await coroutine_function()
asyncio.run(main())
在上述代码中,coroutine_function
是一个协程函数,使用await asyncio.sleep(1)
暂停协程的执行,等待asyncio.sleep(1)
完成后再继续执行。
使用await
而不是yield
的主要原因如下:
- 语义清晰:
await
明确表示协程需要等待一个异步操作完成,而yield
的语义更侧重于生成值。使用await
可以使代码的意图更加清晰,提高代码的可读性。 - 异步兼容性:
await
专门用于异步编程,它与事件循环和异步库(如asyncio
)紧密配合,可以实现高效的异步操作。而yield
主要用于同步的迭代操作,不适合处理异步任务。 - 错误处理:
await
会自动处理异步操作的结果和异常,当异步操作出现异常时,await
会将异常抛出,方便进行错误处理。而yield
需要手动处理异常,增加了代码的复杂性。
写出通过 async for 实现异步迭代器的代码模板
async for
是 Python 中用于异步迭代的语法,它允许在协程中异步地迭代一个异步可迭代对象。以下是通过async for
实现异步迭代器的代码模板:
import asyncio
# 定义一个异步迭代器类
class AsyncIterator:
def __init__(self, limit):
self.index = 0
self.limit = limit
# 实现异步迭代器协议的 __aiter__ 方法
def __aiter__(self):
return self
# 实现异步迭代器协议的 __anext__ 方法
async def __anext__(self):
if self.index < self.limit:
await asyncio.sleep(0.1) # 模拟异步操作
result = self.index
self.index += 1
return result
else:
raise StopAsyncIteration
# 定义一个异步函数,使用 async for 进行异步迭代
async def main():
async_iterator = AsyncIterator(5)
async for item in async_iterator:
print(item)
# 运行异步函数
asyncio.run(main())
在上述代码中,首先定义了一个异步迭代器类AsyncIterator
,它实现了异步迭代器协议的__aiter__
和__anext__
方法。__aiter__
方法返回异步迭代器对象本身,__anext__
方法是一个异步方法,用于返回下一个异步迭代的值。当迭代结束时,抛出StopAsyncIteration
异常。
然后定义了一个异步函数main
,在函数内部创建了一个AsyncIterator
对象,并使用async for
语句进行异步迭代。async for
会自动调用异步迭代器的__anext__
方法,直到抛出StopAsyncIteration
异常为止。
最后,使用asyncio.run()
函数运行main
协程函数,启动异步迭代过程。通过这种方式,可以在协程中异步地迭代一个异步可迭代对象,提高程序的并发性能。
如何捕获协程中未正确使用 await 导致的 RuntimeWarning?
在 Python 中,当协程中未正确使用 await
时,会触发 RuntimeWarning
。这通常发生在直接调用协程函数却未使用 await
或事件循环来运行协程的情况下。为了捕获这类警告,可以使用 warnings
模块。该模块提供了灵活的机制来控制警告的显示和处理。
下面是一个示例代码,展示了如何捕获因未正确使用 await
导致的 RuntimeWarning
:
import warnings
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return 42
def catch_runtime_warning():
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always", RuntimeWarning)
# 直接调用协程函数,不使用 await
coro = my_coroutine()
for warning in w:
if issubclass(warning.category, RuntimeWarning):
print(f"捕获到 RuntimeWarning: {warning.message}")
catch_runtime_warning()
在上述代码中,warnings.catch_warnings
上下文管理器用于临时控制警告的行为。warnings.simplefilter("always", RuntimeWarning)
表示总是捕获 RuntimeWarning
。通过遍历 w
列表,可以检查是否有 RuntimeWarning
被触发,并打印相应的警告信息。
另外,在更复杂的异步应用中,可以在事件循环启动时进行全局的警告捕获。例如:
import warnings
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return 42
async def main():
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always", RuntimeWarning)
coro = my_coroutine()
for warning in w:
if issubclass(warning.category, RuntimeWarning):
print(f"捕获到 RuntimeWarning: {warning.message}")
asyncio.run(main())
这种方式可以确保在整个异步程序执行过程中,对未正确使用 await
导致的 RuntimeWarning
进行捕获和处理,有助于及时发现和修复代码中的异步编程错误。
async with 在异步上下文管理中的应用场景(如数据库连接池)
async with
是 Python 中用于异步上下文管理的语法,它在处理异步资源时非常有用,特别是在需要自动管理资源的获取和释放的场景中。数据库连接池就是一个典型的应用场景。
在数据库操作中,频繁地创建和销毁数据库连接会带来较大的开销。使用数据库连接池可以复用已经创建的连接,提高性能。async with
可以与异步数据库连接池结合使用,确保在使用完数据库连接后自动将其返回到连接池中。
以下是一个使用 async with
和异步数据库连接池(以 asyncpg
为例)的示例:
import asyncio
import asyncpg
async def main():
# 创建数据库连接池
pool = await asyncpg.create_pool(
user='your_user',
password='your_password',
database='your_database',
host='your_host',
port='your_port'
)
async with pool.acquire() as connection:
# 执行数据库查询
result = await connection.fetchrow('SELECT 1')
print(result)
# 关闭连接池
await pool.close()
asyncio.run(main())
在上述代码中,async with pool.acquire() as connection
用于从连接池中获取一个数据库连接。当代码块执行完毕后,async with
会自动将连接返回到连接池中。这样可以避免手动管理连接的获取和释放,减少出错的可能性。
除了数据库连接池,async with
还可以应用于其他需要异步资源管理的场景,如文件操作、网络连接等。例如,在异步文件操作中,可以使用 async with
来确保文件在使用完毕后自动关闭:
import asyncio
async def read_file():
async with open('example.txt', 'r') as file:
content = await file.read()
print(content)
asyncio.run(read_file())
通过使用 async with
,可以使异步代码更加简洁、安全,提高代码的可维护性。
协程中同步调用阻塞 IO(如 time.sleep)会引发什么问题?如何解决?
在协程中同步调用阻塞 IO 操作(如 time.sleep
)会引发严重的性能问题,因为阻塞 IO 会暂停整个线程的执行,使得其他协程无法在这段时间内运行,从而破坏了异步编程的并发优势。
异步编程的核心是利用事件循环在等待 IO 操作时切换到其他协程继续执行,以提高程序的并发性能。而阻塞 IO 操作会使线程进入等待状态,事件循环无法进行协程的切换,导致所有协程都被阻塞,程序的执行效率大幅降低。
以下是一个示例代码,展示了在协程中使用 time.sleep
带来的问题:
import asyncio
import time
async def blocking_coroutine():
print("阻塞协程开始")
time.sleep(2) # 阻塞操作
print("阻塞协程结束")
async def non_blocking_coroutine():
print("非阻塞协程开始")
await asyncio.sleep(0.1)
print("非阻塞协程结束")
async def main():
task1 = asyncio.create_task(blocking_coroutine())
task2 = asyncio.create_task(non_blocking_coroutine())
await task1
await task2
asyncio.run(main())
在上述代码中,blocking_coroutine
中使用了 time.sleep(2)
,这会阻塞整个线程 2 秒钟。在这 2 秒钟内,non_blocking_coroutine
无法执行,即使它只需要 0.1 秒。
为了解决这个问题,应该使用异步版本的 IO 操作。在 Python 的 asyncio
库中,提供了许多异步函数,如 asyncio.sleep
用于替代 time.sleep
。对于其他阻塞 IO 操作,如文件读写、网络请求等,可以使用异步库来替代同步库。例如,使用 aiohttp
替代 requests
进行异步网络请求。
以下是修改后的代码,使用 asyncio.sleep
替代 time.sleep
:
import asyncio
async def non_blocking_coroutine():
print("非阻塞协程开始")
await asyncio.sleep(2)
print("非阻塞协程结束")
async def another_non_blocking_coroutine():
print("另一个非阻塞协程开始")
await asyncio.sleep(0.1)
print("另一个非阻塞协程结束")
async def main():
task1 = asyncio.create_task(non_blocking_coroutine())
task2 = asyncio.create_task(another_non_blocking_coroutine())
await task1
await task2
asyncio.run(main())
在修改后的代码中,所有协程都使用了异步的 asyncio.sleep
,事件循环可以在等待时切换到其他协程执行,从而提高了程序的并发性能。
解释 asyncio.iscoroutinefunction () 与 inspect.iscoroutine () 的区别
asyncio.iscoroutinefunction()
和 inspect.iscoroutine()
是 Python 中用于检查对象是否为协程相关对象的两个函数,但它们的检查对象有所不同。
asyncio.iscoroutinefunction()
用于检查一个对象是否为协程函数。协程函数是使用 async def
定义的函数,它本身并不会立即执行,而是返回一个协程对象。以下是一个示例代码:
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return 42
print(asyncio.iscoroutinefunction(my_coroutine)) # 输出: True
在上述代码中,my_coroutine
是一个协程函数,使用 asyncio.iscoroutinefunction()
检查它会返回 True
。
而 inspect.iscoroutine()
用于检查一个对象是否为协程对象。协程对象是调用协程函数返回的对象,它需要通过事件循环来执行。以下是一个示例代码:
import asyncio
import inspect
async def my_coroutine():
await asyncio.sleep(1)
return 42
coro = my_coroutine()
print(inspect.iscoroutine(coro)) # 输出: True
在上述代码中,my_coroutine()
返回一个协程对象 coro
,使用 inspect.iscoroutine()
检查它会返回 True
。
总结来说,asyncio.iscoroutinefunction()
检查的是函数本身是否为协程函数,而 inspect.iscoroutine()
检查的是对象是否为协程对象。这两个函数在异步编程中非常有用,可以帮助开发者在运行时判断对象的类型,从而进行相应的处理。例如,在编写异步框架时,可以使用这两个函数来确保传入的函数或对象是合法的协程函数或协程对象。
实现一个异步生成器,每秒生成递增数字直至终止条件
在 Python 中,可以使用异步生成器来每秒生成递增数字,直到满足终止条件。异步生成器结合了异步编程和生成器的特性,允许在异步环境中逐个生成值。
以下是一个实现异步生成器的示例代码,该异步生成器每秒生成一个递增的数字,直到达到指定的终止条件:
import asyncio
async def async_number_generator(limit):
num = 0
while num < limit:
yield num
num += 1
await asyncio.sleep(1)
async def main():
limit = 5
async for number in async_number_generator(limit):
print(number)
asyncio.run(main())
在上述代码中,首先定义了一个异步生成器函数 async_number_generator
。在函数内部,使用 while
循环生成递增的数字,每次生成一个数字后,使用 await asyncio.sleep(1)
暂停 1 秒钟。yield
关键字用于将生成的数字返回给调用者。
然后定义了一个异步函数 main
,在函数内部使用 async for
语句来迭代异步生成器。async for
会自动处理异步生成器的迭代过程,每次迭代时等待异步生成器生成下一个值。
最后,使用 asyncio.run()
函数运行 main
协程函数,启动异步生成器的迭代过程。当生成的数字达到终止条件(即 num
等于 limit
)时,异步生成器停止生成值,迭代结束。
通过这种方式,可以在异步环境中每秒生成一个递增的数字,直到满足终止条件,实现了异步的递增数字生成功能。
asyncio.create_task () 与 loop.create_task () 的适用场景差异
asyncio.create_task()
和loop.create_task()
都用于创建异步任务,但它们的适用场景略有不同。
asyncio.create_task()
:是 Python 3.7 及以上版本中推荐的创建任务的方式。它会使用当前的事件循环来创建任务,代码更加简洁和直观,适用于大多数常规的异步编程场景。比如在一个简单的异步爬虫程序中,需要并发地请求多个网页,就可以使用asyncio.create_task()
来创建多个任务,每个任务负责请求一个网页。示例如下:
import asyncio
async def fetch_url(url):
await asyncio.sleep(1)
return f"Content from {url}"
async def main():
urls = ["https://example.com", "https://example.org"]
tasks = [asyncio.create_task(fetch_url(url)) for url in urls]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
loop.create_task()
:在一些需要更精细地控制事件循环的场景中更有用,比如在已经获取到特定的事件循环对象loop
的情况下,或者在自定义的事件循环管理逻辑中。在编写一些复杂的异步应用框架时,可能需要在不同的地方使用不同的事件循环,这时就可以通过loop.create_task()
在特定的循环中创建任务。示例如下:
import asyncio
async def task_coro():
await asyncio.sleep(1)
return "Task completed"
loop = asyncio.get_event_loop()
task = loop.create_task(task_coro())
loop.run_until_complete(task)
print(task.result())
如何获取当前事件循环中所有运行中的任务
在asyncio
中,可以通过以下方式获取当前事件循环中所有运行中的任务:
- 使用
asyncio.all_tasks()
函数,它会返回当前事件循环中所有尚未完成的任务集合。示例代码如下:
import asyncio
async def task1():
await asyncio.sleep(2)
print("Task 1 completed")
async def task2():
await asyncio.sleep(1)
print("Task 2 completed")
async def main():
task_list = [asyncio.create_task(task1()), asyncio.create_task(task2())]
await asyncio.sleep(0.5)
tasks = asyncio.all_tasks()
print(f"Number of running tasks: {len(tasks)}")
for task in tasks:
print(task)
asyncio.run(main())
在上述代码中,首先创建了两个异步任务task1
和task2
,然后在main
函数中等待一段时间后,通过asyncio.all_tasks()
获取所有运行中的任务,并打印任务数量和每个任务的信息。
- 在 Python 3.10 及以上版本中,还可以使用
asyncio.current_task()
结合task.get_name()
等方法来获取更详细的任务信息,对任务进行更细致的管理和监控。
使用 gather () 时如何设置 return_exceptions=True 避免单个任务异常导致整体失败
在asyncio
中,asyncio.gather()
用于并发地运行多个协程,并收集它们的结果。当设置return_exceptions=True
时,它会将任务中的异常作为结果返回,而不是直接抛出,从而避免单个任务异常导致整体失败。示例代码如下:
import asyncio
async def coro1():
return 1
async def coro2():
raise ValueError("Error in coro2")
async def coro3():
return 3
async def main():
results = await asyncio.gather(
coro1(),
coro2(),
coro3(),
return_exceptions=True
)
print(results)
asyncio.run(main())
在上述代码中,coro2
会抛出一个ValueError
异常。由于设置了return_exceptions=True
,asyncio.gather()
不会因为coro2
的异常而中断,而是将异常作为结果列表中的一个元素返回,最终打印出的结果列表中,对应coro2
的位置会是ValueError
异常对象,而其他正常执行的任务的结果会正常返回。
asyncio.wait () 的 return_when 参数(如 FIRST_COMPLETED)在任务调度中的应用
asyncio.wait()
是asyncio
库中的一个函数,用于等待一组Future
或Task
对象完成。它的return_when
参数决定了函数在什么情况下返回,有以下几个取值及应用场景:
asyncio.FIRST_COMPLETED
:当传入的任务列表中第一个任务完成时,asyncio.wait()
就会返回。这在需要尽快处理最先完成的任务结果的场景中很有用。比如在多个异步任务同时请求不同的数据源获取数据,希望一旦有一个数据源返回数据就立即进行处理,而不必等待其他任务都完成,可以使用FIRST_COMPLETED
。示例代码如下:
import asyncio
async def task1():
await asyncio.sleep(2)
return "Task 1 result"
async def task2():
await asyncio.sleep(1)
return "Task 2 result"
async def main():
tasks = [asyncio.create_task(task1()), asyncio.create_task(task2())]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
print(task.result())
asyncio.run(main())
在上述代码中,task2
会先完成,当task2
完成后,asyncio.wait()
就会返回,然后打印出task2
的结果。
asyncio.FIRST_EXCEPTION
:当传入的任务列表中第一个任务抛出异常时,asyncio.wait()
就会返回。这适用于希望尽快发现任务中的异常情况并进行处理的场景。asyncio.ALL_COMPLETED
:等待所有任务都完成后,asyncio.wait()
才会返回。这是最常用的情况,适用于需要所有任务的结果都处理完后再进行下一步操作的场景。
对比 asyncio.gather ()、asyncio.wait () 和 asyncio.as_completed () 的异同
asyncio.gather()
、asyncio.wait()
和asyncio.as_completed()
都是asyncio
中用于处理多个异步任务的函数,它们的异同如下:
- 相同点:都用于处理多个异步任务,能够实现并发执行多个协程,并在一定程度上对任务的结果进行收集和处理。
- 不同点
asyncio.gather()
:它接受多个协程或任务作为参数,按照传入的顺序返回结果。它更注重任务结果的顺序性,方便一次性获取所有任务的结果并进行统一处理。如果不设置return_exceptions=True
,只要有一个任务抛出异常,整个gather
操作就会中断并抛出异常。asyncio.wait()
:接受一个任务列表作为参数,通过return_when
参数可以灵活地控制返回时机,如FIRST_COMPLETED
、FIRST_EXCEPTION
、ALL_COMPLETED
等。返回值是两个集合,分别表示已完成的任务和未完成的任务,对任务的状态管理更细致。asyncio.as_completed()
:接受一个可迭代的任务列表作为参数,它会在任务完成时逐个返回结果,返回的顺序是任务完成的顺序,而不是任务添加的顺序。适用于需要在任务完成后立即处理结果,而不关心任务执行顺序的场景。示例代码如下:
import asyncio
async def task1():
await asyncio.sleep(2)
return "Task 1 result"
async def task2():
await asyncio.sleep(1)
return "Task 2 result"
async def main():
tasks = [asyncio.create_task(task1()), asyncio.create_task(task2())]
# 使用asyncio.gather()
results_gather = await asyncio.gather(*tasks)
print("Results from gather:", results_gather)
# 使用asyncio.wait()
done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
results_wait = [task.result() for task in done]
print("Results from wait:", results_wait)
# 使用asyncio.as_completed()
results_as_completed = []
for task in asyncio.as_completed(tasks):
result = await task
results_as_completed.append(result)
print("Results from as_completed:", results_as_completed)
asyncio.run(main())
如何通过 loop.run_in_executor () 将同步代码异步化?
asyncio
库中的 loop.run_in_executor()
方法可以将同步代码放在线程池或进程池中执行,从而实现异步化。这是因为 asyncio
的事件循环是单线程的,当遇到阻塞的同步代码时会影响整个事件循环的运行,而 run_in_executor()
方法可以利用其他线程或进程来执行这些同步代码,避免阻塞事件循环。
loop.run_in_executor()
方法接受两个主要参数:一个是执行器对象(Executor
),可以是 ThreadPoolExecutor
或 ProcessPoolExecutor
;另一个是要执行的同步函数以及函数的参数。
下面是一个使用 loop.run_in_executor()
将同步代码异步化的示例,其中使用 time.sleep()
模拟一个阻塞的同步操作:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
# 模拟一个阻塞的同步函数
def blocking_function():
print("开始执行阻塞函数")
time.sleep(2)
print("阻塞函数执行结束")
return "阻塞函数的结果"
async def main():
loop = asyncio.get_running_loop()
# 创建一个线程池执行器
with ThreadPoolExecutor() as executor:
# 使用 run_in_executor 将同步函数提交到线程池执行
result = await loop.run_in_executor(
executor, blocking_function
)
print(f"获取到的结果: {result}")
asyncio.run(main())
在上述代码中:
- 定义了一个阻塞的同步函数
blocking_function
,其中使用time.sleep(2)
模拟阻塞操作。 - 在
main
协程函数中,首先获取当前的事件循环loop
。 - 创建一个
ThreadPoolExecutor
线程池执行器。 - 使用
loop.run_in_executor()
方法将blocking_function
提交到线程池执行,并使用await
等待其执行结果。由于是在另一个线程中执行,所以不会阻塞事件循环,其他协程可以继续执行。
如果需要使用进程池来执行同步代码,只需将 ThreadPoolExecutor
替换为 ProcessPoolExecutor
即可,但要注意进程间的数据共享和通信问题。
解释 Task 对象生命周期(Pending/Running/Done)及状态监控方法
在 asyncio
中,Task
对象代表一个异步任务,它有三个主要的生命周期状态:Pending
、Running
和 Done
。
- Pending(等待状态):当使用
asyncio.create_task()
或loop.create_task()
创建一个任务后,任务就处于Pending
状态。此时任务已经被创建,但尚未开始执行。例如:
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return "任务完成"
task = asyncio.create_task(my_coroutine())
print(task.status) # 输出: Pending
-
Running(运行状态):当事件循环调度到该任务并开始执行协程函数时,任务进入
Running
状态。在任务执行过程中,它处于Running
状态。但asyncio
并没有直接提供一个属性来明确表示任务处于Running
状态,不过可以通过任务的执行过程来判断。 -
Done(完成状态):当协程函数执行完毕(正常返回或抛出异常),任务就进入
Done
状态。可以通过task.done()
方法来检查任务是否已完成。例如:
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return "任务完成"
async def main():
task = asyncio.create_task(my_coroutine())
await asyncio.sleep(2)
print(task.done()) # 输出: True
asyncio.run(main())
状态监控方法:
task.done()
:用于判断任务是否已完成,返回True
或False
。task.result()
:在任务完成后调用,用于获取任务的返回结果。如果任务是因为异常而结束,task.result()
会重新抛出该异常。task.exception()
:在任务完成后调用,用于获取任务执行过程中抛出的异常。如果任务正常完成,task.exception()
返回None
。
通过这些方法,可以有效地监控 Task
对象的生命周期状态,并对任务的执行结果或异常进行处理。
为什么推荐使用 asyncio.run () 而非手动管理事件循环?
asyncio.run()
是 Python 3.7 及以上版本引入的一个高级函数,用于运行顶级的协程函数。推荐使用 asyncio.run()
而非手动管理事件循环,主要有以下几个原因:
- 简化代码:
asyncio.run()
自动创建和管理事件循环,只需要传入一个顶级协程函数即可。相比之下,手动管理事件循环需要编写更多的代码来创建事件循环对象、运行协程以及关闭事件循环。例如:
# 使用 asyncio.run()
import asyncio
async def main():
print("异步任务开始")
await asyncio.sleep(1)
print("异步任务结束")
asyncio.run(main())
# 手动管理事件循环
import asyncio
async def main():
print("异步任务开始")
await asyncio.sleep(1)
print("异步任务结束")
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
可以看到,使用 asyncio.run()
的代码更加简洁明了。
-
避免循环复用问题:
asyncio.run()
会确保每次运行时创建一个新的事件循环,并且在运行结束后关闭该循环,避免了事件循环的复用问题。手动管理事件循环时,如果不小心复用了已经关闭的事件循环,会导致错误。 -
异常处理:
asyncio.run()
会自动处理顶级协程函数中的异常,当协程中出现未处理的异常时,会打印错误信息并关闭事件循环。而手动管理事件循环时,需要手动处理异常,否则事件循环可能会进入错误状态。例如:
import asyncio
async def main():
raise ValueError("发生异常")
try:
asyncio.run(main())
except ValueError as e:
print(f"捕获到异常: {e}")
asyncio.run()
会自动捕获并处理异常,使得代码更加健壮。
- 兼容性和一致性:
asyncio.run()
提供了一种统一的方式来运行异步代码,使得代码在不同的 Python 版本和环境中具有更好的兼容性和一致性。
综上所述,asyncio.run()
提供了更简单、更安全、更可靠的方式来运行异步代码,因此推荐使用它而非手动管理事件循环。
实现任务超时控制的两种方式(wait_for () 与 asyncio.Timeout 上下文)
在 asyncio
中,实现任务超时控制有两种常见的方式:asyncio.wait_for()
函数和 asyncio.Timeout
上下文管理器。
方式一:使用 asyncio.wait_for ()
asyncio.wait_for()
函数接受两个参数:一个是要等待的协程或任务,另一个是超时时间(以秒为单位)。如果在指定的时间内任务没有完成,asyncio.wait_for()
会抛出 asyncio.TimeoutError
异常。
示例代码如下:
import asyncio
async def my_coroutine():
await asyncio.sleep(2)
return "任务完成"
async def main():
try:
result = await asyncio.wait_for(my_coroutine(), timeout=1)
print(f"获取到的结果: {result}")
except asyncio.TimeoutError:
print("任务超时")
asyncio.run(main())
在上述代码中,my_coroutine
协程函数会睡眠 2 秒,但 asyncio.wait_for()
设置的超时时间为 1 秒,所以会抛出 TimeoutError
异常。
方式二:使用 asyncio.Timeout 上下文管理器
asyncio.Timeout
上下文管理器可以在一个代码块中设置超时时间。如果在超时时间内代码块中的操作没有完成,会抛出 asyncio.TimeoutError
异常。
示例代码如下:
import asyncio
async def my_coroutine():
await asyncio.sleep(2)
return "任务完成"
async def main():
async with asyncio.timeout(1):
try:
result = await my_coroutine()
print(f"获取到的结果: {result}")
except asyncio.TimeoutError:
print("任务超时")
asyncio.run(main())
在上述代码中,同样 my_coroutine
协程函数会睡眠 2 秒,而 asyncio.timeout(1)
设置的超时时间为 1 秒,所以会抛出 TimeoutError
异常。
两种方式的主要区别在于使用场景和代码结构。asyncio.wait_for()
更适合直接对单个任务设置超时,而 asyncio.Timeout
上下文管理器更适合在一个代码块中对多个操作进行统一的超时控制,并且可以更好地组织代码结构。
如何为任务添加回调函数并在完成后获取结果?
在 asyncio
中,可以为 Task
对象添加回调函数,以便在任务完成后执行一些额外的操作,并获取任务的结果。
为任务添加回调函数的方法是使用 task.add_done_callback()
方法,该方法接受一个回调函数作为参数,回调函数会在任务完成时被调用。
示例代码如下:
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return "任务完成"
def callback(task):
try:
result = task.result()
print(f"任务结果: {result}")
except Exception as e:
print(f"任务执行出错: {e}")
async def main():
task = asyncio.create_task(my_coroutine())
task.add_done_callback(callback)
await task
asyncio.run(main())
在上述代码中:
- 定义了一个协程函数
my_coroutine
,它会睡眠 1 秒后返回一个字符串。 - 定义了一个回调函数
callback
,该函数接受一个Task
对象作为参数。在回调函数中,使用task.result()
获取任务的结果,并进行相应的处理。如果任务执行过程中抛出异常,task.result()
会重新抛出该异常,回调函数可以捕获并处理这个异常。 - 在
main
协程函数中,创建一个任务task
,并使用task.add_done_callback(callback)
为任务添加回调函数。然后使用await task
等待任务完成。
当任务完成时,callback
函数会被自动调用,从而可以在任务完成后获取结果并进行进一步的处理。这样可以实现任务完成后的一些自定义逻辑,如记录日志、更新状态等。
使用 loop.call_soon () 与 loop.call_later () 实现延时调度的场景差异
loop.call_soon()
和loop.call_later()
都是asyncio
事件循环loop
提供的用于延时调度的方法,但它们在使用场景上存在一些差异。
loop.call_soon()
方法会尽快将指定的回调函数添加到事件循环的任务队列中,使其在当前事件循环迭代中尽可能早地执行。也就是说,它的延时几乎为零,只是将任务安排到事件循环的下一次执行机会。例如:
import asyncio
def callback():
print("回调函数被调用")
loop = asyncio.get_event_loop()
loop.call_soon(callback)
loop.run_until_complete(asyncio.sleep(0))
loop.close()
在上述代码中,loop.call_soon(callback)
将callback
函数尽快添加到事件循环的任务队列中,asyncio.sleep(0)
只是让事件循环有机会执行队列中的任务,最终callback
函数会被调用并输出相应信息。loop.call_soon()
适用于那些需要在当前事件循环周期内立即执行的回调任务,比如在某个操作完成后,需要立即触发一些后续处理逻辑,且不希望有额外的延迟。
loop.call_later()
方法则会在指定的延迟时间(以秒为单位)后,将回调函数添加到事件循环的任务队列中执行。它可以实现真正意义上的延时调度。示例如下:
import asyncio
def callback():
print("回调函数被调用")
loop = asyncio.get_event_loop()
loop.call_later(2, callback)
loop.run_until_complete(asyncio.sleep(3))
loop.close()
在这个例子中,loop.call_later(2, callback)
表示在 2 秒后将callback
函数添加到事件循环的任务队列中。asyncio.sleep(3)
让事件循环持续运行 3 秒,确保callback
函数有机会被执行。loop.call_later()
适用于需要在一段时间延迟后执行某个任务的场景,比如定时任务、在某个操作完成后等待一段时间再执行后续操作等。
总体而言,loop.call_soon()
用于几乎无延迟的立即执行任务调度,而loop.call_later()
用于有明确延迟时间的任务调度,开发者可以根据具体的业务需求选择合适的方法。
事件循环的 run_forever () 与 run_until_complete () 适用场景对比
asyncio
事件循环的run_forever()
和run_until_complete()
方法在功能和适用场景上有明显的区别。
run_forever()
方法会让事件循环无限期地运行下去,不断地处理事件队列中的任务,直到手动调用loop.stop()
来停止事件循环。它适用于需要持续运行并不断处理各种异步事件的场景,比如服务器应用。以一个简单的异步 TCP 服务器为例:
import asyncio
async def handle_connection(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"收到来自 {addr} 的消息: {message}")
writer.write(b"你好,已收到你的消息")
await writer.drain()
writer.close()
async def main():
server = await asyncio.start_server(
handle_connection, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f"服务器已启动,监听 {addr}")
async with server:
await server.serve_forever()
loop = asyncio.get_event_loop()
loop.run_forever()
loop.close()
在上述代码中,server.serve_forever()
配合loop.run_forever()
使服务器持续运行,不断接受和处理客户端的连接请求。
run_until_complete()
方法接受一个Future
或Task
对象作为参数,会运行事件循环直到传入的Future
或Task
完成。一旦任务完成,事件循环就会停止。它适用于执行单个或一组明确的异步任务,当这些任务执行完毕后,程序继续后续的逻辑。例如:
import asyncio
async def task():
await asyncio.sleep(2)
print("任务完成")
loop = asyncio.get_event_loop()
loop.run_until_complete(task())
loop.close()
这里loop.run_until_complete(task())
会运行事件循环,直到task
协程完成,然后关闭事件循环。
综上所述,run_forever()
适用于需要持续运行的服务或应用,而run_until_complete()
适用于执行特定的、可预期完成的异步任务的场景。
为什么在协程中操作全局变量可能导致竞态条件?给出解决方案
在协程中操作全局变量可能导致竞态条件,这是因为asyncio
中的协程是在单线程的事件循环中执行的,虽然不存在传统意义上多线程并发执行的情况,但协程之间可以通过await
进行切换。当多个协程同时访问和修改全局变量时,就可能出现竞态条件。
例如,假设有两个协程都要对一个全局计数器进行加 1 操作:
import asyncio
count = 0
async def increment():
global count
value = count
await asyncio.sleep(0)
count = value + 1
async def main():
tasks = [increment() for _ in range(100)]
await asyncio.gather(*tasks)
print(count)
asyncio.run(main())
在上述代码中,increment
协程先获取全局变量count
的值,然后通过await asyncio.sleep(0)
让事件循环有机会切换到其他协程。如果在切换期间其他协程也获取了相同的count
值并进行修改,那么最终的结果可能并不是预期的 100(因为存在重复读取旧值并覆盖的情况)。
为了解决这个问题,可以使用锁(Lock
)来保证在同一时间只有一个协程可以访问和修改全局变量。asyncio.Lock
是一个异步锁,示例代码如下:
import asyncio
count = 0
lock = asyncio.Lock()
async def increment():
global count
async with lock:
value = count
await asyncio.sleep(0)
count = value + 1
async def main():
tasks = [increment() for _ in range(100)]
await asyncio.gather(*tasks)
print(count)
asyncio.run(main())
在这个改进的代码中,async with lock
确保了在进入关键代码段(对全局变量count
的操作)时,其他协程无法同时进入,从而避免了竞态条件,保证了count
的正确递增。
实现基于 asyncio.Lock 的线程安全计数器(需处理协程挂起时的上下文切换)
要实现基于asyncio.Lock
的线程安全计数器,并处理协程挂起时的上下文切换,可以按照以下步骤进行:
import asyncio
class ThreadSafeCounter:
def __init__(self):
self.count = 0
self.lock = asyncio.Lock()
async def increment(self):
async with self.lock:
self.count += 1
await asyncio.sleep(0) # 模拟可能的协程挂起
async def decrement(self):
async with self.lock:
self.count -= 1
await asyncio.sleep(0) # 模拟可能的协程挂起
async def get_count(self):
async with self.lock:
return self.count
async def main():
counter = ThreadSafeCounter()
tasks = [
counter.increment() for _ in range(50)
] + [
counter.decrement() for _ in range(30)
]
await asyncio.gather(*tasks)
result = await counter.get_count()
print(f"计数器的最终值: {result}")
asyncio.run(main())
在上述代码中:
- 定义了一个
ThreadSafeCounter
类,其中包含一个全局变量count
用于存储计数器的值,以及一个asyncio.Lock
对象lock
用于实现线程安全。 increment
方法用于增加计数器的值,在进入方法时获取锁,确保在修改count
时不会有其他协程同时访问。await asyncio.sleep(0)
模拟了协程可能的挂起操作,但由于持有锁,不会影响计数器的正确性。decrement
方法用于减少计数器的值,原理与increment
方法相同。get_count
方法用于获取计数器的当前值,同样需要获取锁以保证数据的一致性。- 在
main
协程中,创建了多个任务,包括增加和减少计数器的操作,最后获取并打印计数器的最终值。
通过使用asyncio.Lock
,即使在协程挂起和上下文切换的情况下,也能保证计数器的操作是线程安全的。
解释 asyncio.Semaphore 在限流场景中的应用(如 API 并发请求限制)
asyncio.Semaphore
是asyncio
库中用于控制并发数量的工具,在限流场景中有着广泛的应用,比如限制对 API 的并发请求数量。
Semaphore
(信号量)有一个内部计数器,初始值在创建时指定。当一个协程想要执行受限制的操作时,它需要先获取信号量(通过await semaphore.acquire()
),如果信号量的计数器大于 0,则获取成功,计数器减 1;如果计数器为 0,则协程会被阻塞,直到有其他协程释放信号量(通过semaphore.release()
),计数器加 1 后才能继续执行。
以限制对 API 的并发请求数量为例,假设我们要对某个 API 进行请求,并且限制同时最多只能有 3 个请求并发进行:
import asyncio
async def api_request(semaphore, url):
async with semaphore:
print(f"开始请求 {url}")
await asyncio.sleep(2) # 模拟API请求的耗时操作
print(f"完成请求 {url}")
async def main():
urls = [
f"https://api.example.com/{i}" for i in range(5)
]
semaphore = asyncio.Semaphore(3)
tasks = [api_request(semaphore, url) for url in urls]
await asyncio.gather(*tasks)
asyncio.run(main())
在上述代码中:
api_request
协程函数表示对 API 的请求操作,在进入函数时,通过async with semaphore
获取信号量。如果信号量的计数器大于 0,则可以继续执行请求操作,这里使用await asyncio.sleep(2)
模拟请求的耗时操作;如果计数器为 0,则协程会被阻塞,直到有其他协程完成请求并释放信号量。- 在
main
协程中,定义了要请求的 URL 列表urls
,创建了一个初始值为 3 的信号量semaphore
,表示最多允许 3 个并发请求。然后创建了多个任务,每个任务都调用api_request
函数进行 API 请求。 - 通过
asyncio.gather(*tasks)
等待所有任务完成,由于信号量的限制,同时最多只有 3 个请求会并发执行,从而实现了对 API 请求的限流。
通过使用asyncio.Semaphore
,可以有效地控制并发操作的数量,避免因过多的并发请求导致目标系统(如 API 服务器)负载过高或出现其他问题。
如何通过 asyncio.Event 实现多任务同步启动?
asyncio.Event
是 asyncio
库中用于在协程之间进行同步的工具,它可以实现多任务的同步启动。asyncio.Event
对象有一个内部标志,初始状态为 False
,可以通过 set()
方法将其设置为 True
,通过 clear()
方法将其重置为 False
,协程可以使用 wait()
方法等待该标志变为 True
。
以下是一个使用 asyncio.Event
实现多任务同步启动的示例:
import asyncio
async def task(event, task_id):
print(f"任务 {task_id} 准备就绪,等待启动信号")
await event.wait()
print(f"任务 {task_id} 启动")
await asyncio.sleep(1)
print(f"任务 {task_id} 完成")
async def main():
event = asyncio.Event()
tasks = [task(event, i) for i in range(3)]
# 模拟一些准备工作
await asyncio.sleep(2)
print("所有准备工作完成,发送启动信号")
event.set()
await asyncio.gather(*tasks)
asyncio.run(main())
在上述代码中,首先定义了一个 task
协程函数,它接受一个 asyncio.Event
对象和一个任务 ID 作为参数。在 task
函数中,协程会先打印准备就绪的信息,然后调用 event.wait()
方法等待事件的标志变为 True
。当事件标志变为 True
时,协程会继续执行后续的任务。
在 main
协程函数中,创建了一个 asyncio.Event
对象,并创建了多个 task
协程任务。然后模拟了一些准备工作,等待 2 秒后,调用 event.set()
方法将事件的标志设置为 True
,此时所有等待该事件的协程都会继续执行。最后使用 asyncio.gather()
方法等待所有任务完成。
通过这种方式,asyncio.Event
可以确保多个任务在收到启动信号后同时开始执行,实现了多任务的同步启动。
使用 asyncio.Condition 实现生产者 - 消费者模型的异步版本
asyncio.Condition
是 asyncio
库中用于在协程之间进行同步的高级工具,它可以结合 asyncio.Lock
和 asyncio.Event
的功能,用于实现生产者 - 消费者模型的异步版本。
以下是一个使用 asyncio.Condition
实现生产者 - 消费者模型的示例:
import asyncio
async def producer(condition, queue):
for i in range(5):
async with condition:
while len(queue) >= 3:
print("队列已满,生产者等待")
await condition.wait()
queue.append(i)
print(f"生产者生产了 {i}")
condition.notify()
await asyncio.sleep(1)
async def consumer(condition, queue):
while True:
async with condition:
while not queue:
print("队列为空,消费者等待")
await condition.wait()
item = queue.pop(0)
print(f"消费者消费了 {item}")
condition.notify()
await asyncio.sleep(0.5)
async def main():
queue = []
condition = asyncio.Condition()
producer_task = asyncio.create_task(producer(condition, queue))
consumer_task = asyncio.create_task(consumer(condition, queue))
await asyncio.gather(producer_task, consumer_task)
asyncio.run(main())
在上述代码中,首先定义了 producer
协程函数和 consumer
协程函数。producer
函数负责向队列中添加元素,当队列已满时,它会调用 condition.wait()
方法等待,直到有消费者消费了元素并通知它。consumer
函数负责从队列中取出元素,当队列为空时,它会调用 condition.wait()
方法等待,直到有生产者生产了元素并通知它。
在 main
协程函数中,创建了一个空队列和一个 asyncio.Condition
对象,然后分别创建了生产者任务和消费者任务,并使用 asyncio.gather()
方法等待它们执行。
通过使用 asyncio.Condition
,可以在协程之间实现高效的同步,确保生产者和消费者之间的协作,避免了数据竞争和不一致的问题。
死锁场景复现:两个协程互相等待对方释放锁
死锁是指两个或多个协程相互等待对方释放资源,从而导致所有协程都无法继续执行的情况。在 asyncio
中,可以通过两个协程互相等待对方释放锁来复现死锁场景。
以下是一个死锁场景的示例:
import asyncio
lock1 = asyncio.Lock()
lock2 = asyncio.Lock()
async def coroutine1():
print("协程 1 尝试获取锁 1")
await lock1.acquire()
print("协程 1 已获取锁 1,尝试获取锁 2")
await asyncio.sleep(1)
await lock2.acquire()
print("协程 1 已获取锁 2")
lock2.release()
lock1.release()
async def coroutine2():
print("协程 2 尝试获取锁 2")
await lock2.acquire()
print("协程 2 已获取锁 2,尝试获取锁 1")
await asyncio.sleep(1)
await lock1.acquire()
print("协程 2 已获取锁 1")
lock1.release()
lock2.release()
async def main():
task1 = asyncio.create_task(coroutine1())
task2 = asyncio.create_task(coroutine2())
await asyncio.gather(task1, task2)
asyncio.run(main())
在上述代码中,定义了两个锁 lock1
和 lock2
,以及两个协程 coroutine1
和 coroutine2
。coroutine1
先获取 lock1
,然后尝试获取 lock2
;coroutine2
先获取 lock2
,然后尝试获取 lock1
。由于两个协程都在等待对方释放锁,因此会陷入死锁状态。
当运行这个程序时,会发现程序会一直阻塞,无法继续执行,这就是死锁的表现。
如何避免因未释放锁导致的协程永久阻塞?
在 asyncio
中,未释放锁可能会导致协程永久阻塞,为了避免这种情况,可以采取以下几种方法:
使用 async with
语句:async with
语句会自动管理锁的获取和释放,确保在代码块执行完毕后,锁会被正确释放。
import asyncio
lock = asyncio.Lock()
async def task():
async with lock:
print("获取到锁,执行任务")
await asyncio.sleep(1)
print("任务完成,释放锁")
async def main():
task1 = asyncio.create_task(task())
task2 = asyncio.create_task(task())
await asyncio.gather(task1, task2)
asyncio.run(main())
在上述代码中,使用 async with lock
语句获取锁,当代码块执行完毕后,锁会自动释放,即使代码块中出现异常,锁也会被正确释放。
使用 try...finally
语句:如果不使用 async with
语句,也可以使用 try...finally
语句来确保锁的释放。
import asyncio
lock = asyncio.Lock()
async def task():
await lock.acquire()
try:
print("获取到锁,执行任务")
await asyncio.sleep(1)
print("任务完成,释放锁")
finally:
lock.release()
async def main():
task1 = asyncio.create_task(task())
task2 = asyncio.create_task(task())
await asyncio.gather(task1, task2)
asyncio.run(main())
在上述代码中,在 try
块中获取锁并执行任务,无论任务是否成功完成,都会在 finally
块中释放锁。
设置超时时间:可以使用 asyncio.wait_for()
函数为获取锁的操作设置超时时间,避免协程长时间等待锁。
import asyncio
lock = asyncio.Lock()
async def task():
try:
await asyncio.wait_for(lock.acquire(), timeout=2)
print("获取到锁,执行任务")
await asyncio.sleep(1)
print("任务完成,释放锁")
lock.release()
except asyncio.TimeoutError:
print("获取锁超时")
async def main():
task1 = asyncio.create_task(task())
task2 = asyncio.create_task(task())
await asyncio.gather(task1, task2)
asyncio.run(main())
在上述代码中,使用 asyncio.wait_for()
函数为 lock.acquire()
操作设置了 2 秒的超时时间,如果在 2 秒内无法获取到锁,会抛出 asyncio.TimeoutError
异常,避免协程永久阻塞。
对比 asyncio.Queue 与标准库 queue.Queue 的线程安全性差异
asyncio.Queue
和标准库 queue.Queue
都用于实现队列数据结构,但它们在线程安全性方面存在差异。
queue.Queue
:queue.Queue
是标准库中用于多线程编程的队列实现,它是线程安全的。这意味着在多线程环境中,多个线程可以同时对队列进行入队和出队操作,而不会出现数据竞争和不一致的问题。queue.Queue
使用锁机制来保证线程安全,当一个线程对队列进行操作时,会获取锁,其他线程需要等待锁释放后才能进行操作。
以下是一个使用 queue.Queue
的示例:
import threading
import queue
import time
q = queue.Queue()
def producer():
for i in range(5):
q.put(i)
print(f"生产者生产了 {i}")
time.sleep(1)
def consumer():
while True:
item = q.get()
print(f"消费者消费了 {item}")
q.task_done()
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
q.join()
在上述代码中,producer
线程负责向队列中添加元素,consumer
线程负责从队列中取出元素,由于 queue.Queue
是线程安全的,多个线程可以安全地对队列进行操作。
asyncio.Queue
:asyncio.Queue
是 asyncio
库中用于异步编程的队列实现,它是协程安全的。在异步编程中,协程是在单线程的事件循环中执行的,不存在多线程并发的问题,但协程之间可以通过 await
进行切换。asyncio.Queue
使用异步锁和条件变量来保证协程安全,当一个协程对队列进行操作时,其他协程需要等待。
以下是一个使用 asyncio.Queue
的示例:
import asyncio
async def producer(queue):
for i in range(5):
await queue.put(i)
print(f"生产者生产了 {i}")
await asyncio.sleep(1)
async def consumer(queue):
while True:
item = await queue.get()
print(f"消费者消费了 {item}")
queue.task_done()
async def main():
queue = asyncio.Queue()
producer_task = asyncio.create_task(producer(queue))
consumer_task = asyncio.create_task(consumer(queue))
await producer_task
await queue.join()
consumer_task.cancel()
asyncio.run(main())
在上述代码中,producer
协程负责向队列中添加元素,consumer
协程负责从队列中取出元素,由于 asyncio.Queue
是协程安全的,多个协程可以安全地对队列进行操作。
综上所述,queue.Queue
适用于多线程编程,保证线程安全;asyncio.Queue
适用于异步编程,保证协程安全。
实现优先级任务队列控制高优先级任务插队执行
在 asyncio
中,若要实现优先级任务队列,让高优先级任务插队执行,可借助 heapq
模块。heapq
是 Python 标准库中的堆队列算法实现,它能维护一个最小堆,方便我们根据任务的优先级来管理任务。
首先,需要定义一个任务类,包含任务的优先级和具体的协程。接着,使用 asyncio.Queue
结合 heapq
来实现优先级队列。以下是示例代码:
import asyncio
import heapq
class PriorityTask:
def __init__(self, priority, coro):
self.priority = priority
self.coro = coro
def __lt__(self, other):
return self.priority < other.priority
class PriorityQueue:
def __init__(self):
self.queue = []
self.lock = asyncio.Lock()
async def put(self, task):
async with self.lock:
heapq.heappush(self.queue, task)
async def get(self):
async with self.lock:
return heapq.heappop(self.queue)
def empty(self):
return len(self.queue) == 0
async def task_func(task_id):
print(f"Task {task_id} started")
await asyncio.sleep(1)
print(f"Task {task_id} finished")
async def main():
priority_queue = PriorityQueue()
tasks = [
PriorityTask(3, task_func(1)),
PriorityTask(1, task_func(2)),
PriorityTask(2, task_func(3))
]
for task in tasks:
await priority_queue.put(task)
while not priority_queue.empty():
next_task = await priority_queue.get()
await next_task.coro
asyncio.run(main())
在上述代码中,PriorityTask
类用于封装任务的优先级和协程,__lt__
方法用于比较任务的优先级。PriorityQueue
类实现了优先级队列的基本操作,包括 put
方法用于添加任务,get
方法用于获取优先级最高的任务。在 main
协程中,创建了多个不同优先级的任务并添加到队列中,然后依次取出并执行任务,确保高优先级任务先执行。
使用 asyncio.Barrier 实现多阶段并行任务同步
asyncio.Barrier
是 asyncio
库中用于多任务同步的工具,它可以让多个协程在某个点上等待,直到所有协程都到达该点后再继续执行。利用 asyncio.Barrier
可以实现多阶段并行任务的同步。
以下是一个使用 asyncio.Barrier
实现多阶段并行任务同步的示例:
import asyncio
async def stage_one(barrier, task_id):
print(f"Task {task_id} started stage one")
await asyncio.sleep(1)
print(f"Task {task_id} finished stage one, waiting at barrier")
await barrier.wait()
print(f"Task {task_id} passed the barrier and starting stage two")
async def main():
num_tasks = 3
barrier = asyncio.Barrier(num_tasks)
tasks = [asyncio.create_task(stage_one(barrier, i)) for i in range(num_tasks)]
await asyncio.gather(*tasks)
asyncio.run(main())
在上述代码中,stage_one
协程表示一个任务的第一阶段,在该阶段结束后,协程会调用 barrier.wait()
方法等待其他任务。main
协程中创建了 asyncio.Barrier
对象,并指定了需要同步的任务数量。然后创建了多个任务并使用 asyncio.gather
并发执行这些任务。当所有任务都调用了 barrier.wait()
后,它们会同时继续执行后续的代码,实现了多阶段并行任务的同步。
使用 aiohttp 实现异步 HTTP 客户端并发请求 10 个 URL
aiohttp
是一个用于实现异步 HTTP 客户端和服务器的 Python 库。使用 aiohttp
可以方便地实现异步 HTTP 客户端并发请求多个 URL。
以下是一个使用 aiohttp
并发请求 10 个 URL 的示例:
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
f"https://httpbin.org/get?num={i}" for i in range(10)
]
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(result[:100])
asyncio.run(main())
在上述代码中,fetch
协程用于发送 HTTP 请求并获取响应内容。main
协程中创建了 aiohttp.ClientSession
对象,该对象用于管理 HTTP 连接。然后创建了多个 fetch
任务,每个任务对应一个 URL。最后使用 asyncio.gather
并发执行这些任务,并获取所有任务的结果。通过这种方式,可以高效地并发请求多个 URL,避免了串行请求带来的性能问题。
通过 asyncio.open_connection () 实现 TCP 客户端消息收发
asyncio.open_connection()
是 asyncio
库中用于创建 TCP 连接的函数,利用它可以实现 TCP 客户端的消息收发。
以下是一个使用 asyncio.open_connection()
实现 TCP 客户端消息收发的示例:
import asyncio
async def tcp_client():
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888
)
message = 'Hello, server!'
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()}')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_client())
在上述代码中,asyncio.open_connection()
函数用于建立与服务器的 TCP 连接,它返回一个 StreamReader
和一个 StreamWriter
对象。StreamWriter
对象用于向服务器发送消息,StreamReader
对象用于接收服务器的响应。在代码中,先向服务器发送了一条消息,然后等待服务器的响应并打印出来。最后关闭连接。
解释 StreamReader 与 StreamWriter 在异步 Socket 编程中的作用
在异步 Socket 编程中,StreamReader
和 StreamWriter
是 asyncio
库提供的两个重要对象,它们分别用于处理数据的读取和写入。
StreamReader
主要用于从套接字中异步读取数据。它提供了一系列方法,如 read()
、readline()
、readexactly()
等。read()
方法用于读取指定数量的字节,readline()
方法用于读取一行数据,readexactly()
方法用于读取指定长度的数据。StreamReader
是异步的,当调用这些方法时,如果没有数据可读,协程会暂停执行,直到有数据到达或发生错误,这样可以避免阻塞事件循环,提高程序的并发性能。
StreamWriter
主要用于向套接字中异步写入数据。它提供了 write()
方法用于写入数据,drain()
方法用于刷新缓冲区。write()
方法将数据写入缓冲区,而 drain()
方法会等待缓冲区中的数据被实际发送出去。同样,这些操作都是异步的,当调用 drain()
方法时,如果缓冲区已满或网络拥塞,协程会暂停执行,直到数据可以发送出去。
例如,在前面的 TCP 客户端示例中,StreamReader
对象用于接收服务器的响应,StreamWriter
对象用于向服务器发送消息。通过使用 StreamReader
和 StreamWriter
,可以方便地实现异步的 Socket 通信,提高程序的性能和响应能力。
如何用 asyncio 实现 UDP 服务器广播功能?
在 asyncio
中实现 UDP 服务器广播功能,需要借助 asyncio.DatagramProtocol
协议类。UDP 广播是指将消息发送到网络中的所有设备。
首先,要创建一个自定义的 DatagramProtocol
类,该类需实现 connection_made
和 datagram_received
方法。connection_made
方法在建立连接时被调用,datagram_received
方法在接收到数据报时被调用。
以下是实现 UDP 服务器广播功能的示例代码:
import asyncio
class UDPServerProtocol(asyncio.DatagramProtocol):
def __init__(self):
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('UDP 服务器已启动')
def datagram_received(self, data, addr):
message = data.decode()
print(f'收到来自 {addr} 的消息: {message}')
# 广播消息
broadcast_address = ('<broadcast>', 9999)
self.transport.sendto(data, broadcast_address)
async def main():
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
lambda: UDPServerProtocol(),
local_addr=('0.0.0.0', 9999),
allow_broadcast=True
)
try:
await asyncio.sleep(3600)
finally:
transport.close()
asyncio.run(main())
在上述代码中,UDPServerProtocol
类继承自 asyncio.DatagramProtocol
,在 datagram_received
方法中,当接收到数据报后,将其广播到指定的广播地址。main
函数中,使用 loop.create_datagram_endpoint
方法创建 UDP 端点,并设置允许广播。通过 allow_broadcast=True
开启广播功能。最后,程序会运行 3600 秒,期间可以不断接收和广播消息。
异步文件读写的最佳实践(结合线程池与 loop.run_in_executor ())
在 asyncio
中,文件读写操作是阻塞的,为了实现异步文件读写,可以结合线程池和 loop.run_in_executor()
方法。
loop.run_in_executor()
方法可以将阻塞的文件读写操作放到线程池中执行,从而避免阻塞事件循环。以下是异步文件读写的示例代码:
import asyncio
import concurrent.futures
async def read_file_async(file_path):
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
def read_file():
with open(file_path, 'r') as f:
return f.read()
return await loop.run_in_executor(executor, read_file)
async def write_file_async(file_path, content):
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
def write_file():
with open(file_path, 'w') as f:
f.write(content)
return await loop.run_in_executor(executor, write_file)
async def main():
file_path = 'test.txt'
content = 'Hello, asyncio!'
await write_file_async(file_path, content)
result = await read_file_async(file_path)
print(result)
asyncio.run(main())
在上述代码中,read_file_async
函数用于异步读取文件,write_file_async
函数用于异步写入文件。在这两个函数中,都使用了 concurrent.futures.ThreadPoolExecutor
创建线程池,并通过 loop.run_in_executor()
方法将阻塞的文件读写操作提交到线程池中执行。在 main
函数中,先调用 write_file_async
写入文件内容,再调用 read_file_async
读取文件内容并打印。
使用 aiomysql 实现数据库连接池的异步查询。
aiomysql
是一个用于异步操作 MySQL 数据库的 Python 库,借助它可以实现数据库连接池的异步查询。
首先,要安装 aiomysql
库,然后创建数据库连接池,使用连接池进行异步查询。以下是示例代码:
import asyncio
import aiomysql
async def query_data(pool):
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT * FROM your_table")
result = await cur.fetchall()
for row in result:
print(row)
async def main():
pool = await aiomysql.create_pool(
host='localhost',
port=3306,
user='your_user',
password='your_password',
db='your_database',
autocommit=True
)
try:
await query_data(pool)
finally:
pool.close()
await pool.wait_closed()
asyncio.run(main())
在上述代码中,query_data
函数用于执行异步查询操作。在 main
函数中,使用 aiomysql.create_pool
方法创建数据库连接池,设置数据库的连接信息。然后调用 query_data
函数进行查询,最后关闭连接池。通过使用连接池,可以复用数据库连接,提高查询效率。
WebSocket 服务器开发:处理连接保持与心跳检测。
在 WebSocket 服务器开发中,连接保持和心跳检测是非常重要的功能。连接保持可以确保客户端和服务器之间的连接不会因为长时间无数据传输而断开,心跳检测可以及时发现断开的连接并进行处理。
以下是使用 websockets
库实现 WebSocket 服务器的连接保持与心跳检测的示例代码:
import asyncio
import websockets
async def heartbeat(websocket, path):
try:
while True:
try:
# 发送心跳消息
await websocket.send('ping')
# 等待客户端响应
pong_waiter = await websocket.ping()
await asyncio.wait_for(pong_waiter, timeout=5)
except asyncio.TimeoutError:
print('心跳检测超时,关闭连接')
break
await asyncio.sleep(10)
except websockets.exceptions.ConnectionClosedOK:
print('连接正常关闭')
start_server = websockets.serve(heartbeat, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
在上述代码中,heartbeat
函数用于实现心跳检测。在函数中,服务器每隔 10 秒向客户端发送一个 ping
消息,并等待客户端的 pong
响应。如果在 5 秒内没有收到响应,则认为连接超时,关闭连接。websockets.serve
方法用于启动 WebSocket 服务器,监听本地的 8765 端口。
实现异步 DNS 解析器(结合 aiodns 库)。
aiodns
是一个用于异步 DNS 解析的 Python 库,结合 asyncio
可以实现异步 DNS 解析器。
首先,要安装 aiodns
库,然后使用它进行异步 DNS 解析。以下是示例代码:
import asyncio
import aiodns
async def resolve_dns(resolver, domain):
try:
result = await resolver.query(domain, 'A')
for record in result:
print(f'{domain} 的 IP 地址是: {record.host}')
except aiodns.error.DNSError as e:
print(f'解析 {domain} 时出错: {e}')
async def main():
resolver = aiodns.DNSResolver()
domains = ['example.com', 'google.com']
tasks = [resolve_dns(resolver, domain) for domain in domains]
await asyncio.gather(*tasks)
asyncio.run(main())
在上述代码中,resolve_dns
函数用于异步解析指定域名的 IP 地址。在 main
函数中,创建了 aiodns.DNSResolver
对象,然后创建了多个解析任务,使用 asyncio.gather
方法并发执行这些任务。通过这种方式,可以高效地实现异步 DNS 解析。
分享
以下是对 5 道 asyncio 面试题的回答:
如何统一捕获协程链中未处理的异常
在asyncio
中,可以使用try-except
块来捕获协程链中的异常。通过在最外层的协程中使用try-except
,可以捕获整个协程链中未处理的异常。
import asyncio
async def inner_coroutine():
# 这里模拟一个会抛出异常的操作
raise ValueError("这是一个测试异常")
async def outer_coroutine():
await inner_coroutine()
async def main():
try:
await outer_coroutine()
except ValueError as e:
print(f"捕获到异常: {e}")
asyncio.run(main())
还可以使用asyncio.Task
的add_done_callback
方法来捕获异常。当任务完成时,如果任务中抛出了未处理的异常,可以在回调函数中获取到异常信息。
import asyncio
async def coroutine_with_exception():
raise ValueError("这是一个测试异常")
async def main():
task = asyncio.create_task(coroutine_with_exception())
def handle_exception(task):
if task.exception() is not None:
print(f"捕获到异常: {task.exception()}")
task.add_done_callback(handle_exception)
await task
asyncio.run(main())
使用 asyncio.get_event_loop ().set_exception_handler () 定制异常日志
asyncio.get_event_loop().set_exception_handler()
方法可以用来设置一个自定义的异常处理函数,用于处理事件循环中未捕获的异常。以下是一个示例:
import asyncio
def exception_handler(loop, context):
# 这里可以根据需要进行更详细的日志记录
print(f"异常发生: {context['exception']}")
print(f"任务: {context['task']}")
async def coroutine_with_exception():
raise ValueError("这是一个测试异常")
async def main():
loop = asyncio.get_event_loop()
loop.set_exception_handler(exception_handler)
task = asyncio.create_task(coroutine_with_exception())
await task
asyncio.run(main())
在上述代码中,定义了一个exception_handler
函数,它接受loop
和context
两个参数。context
是一个字典,包含了有关异常的信息,如异常对象、触发异常的任务等。通过设置这个异常处理函数,当事件循环中出现未捕获的异常时,就会调用这个函数来进行定制的日志记录或其他处理操作。
调试技巧:通过 task.print_stack () 追踪协程挂起点
task.print_stack()
是asyncio
中用于调试的一个方法,它可以打印出协程当前的堆栈信息,帮助开发者追踪协程的执行路径和挂起点。以下是一个简单的示例:
import asyncio
async def my_coroutine():
print("协程开始")
await asyncio.sleep(1)
print("协程结束")
async def main():
task = asyncio.create_task(my_coroutine())
await asyncio.sleep(0.5)
task.print_stack()
await task
asyncio.run(main())
在代码中,创建了一个协程my_coroutine
,它会暂停 1 秒。在main
函数中,创建了一个任务并在 0.5 秒后调用task.print_stack()
。此时,协程应该还在await asyncio.sleep(1)
这一行暂停,task.print_stack()
会打印出当前协程的堆栈信息,显示出协程暂停的位置。
如何处理因任务取消引发的 CancelledError
当使用asyncio
时,任务可能会被取消,这会引发CancelledError
异常。可以在协程中使用try-except
块来捕获CancelledError
异常,以进行适当的处理。
import asyncio
async def my_coroutine():
try:
# 这里模拟一个长时间运行的操作
await asyncio.sleep(5)
print("任务完成")
except asyncio.CancelledError:
print("任务被取消")
async def main():
task = asyncio.create_task(my_coroutine())
await asyncio.sleep(2)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(main())
在代码中,my_coroutine
协程会暂停 5 秒,在main
函数中,创建任务后等待 2 秒然后取消任务。当任务被取消时,my_coroutine
中的try-except
块会捕获CancelledError
异常,并打印出 "任务被取消"。
使用 contextvars 实现异步上下文中的请求 ID 透传
contextvars
是 Python 的一个标准库,用于在异步上下文中传递数据。以下是一个使用contextvars
实现请求 ID 透传的示例:
import asyncio
import contextvars
# 创建一个上下文变量
request_id_var = contextvars.ContextVar('request_id')
async def handle_request(request_id):
# 设置当前上下文的请求ID
request_id_var.set(request_id)
print(f"处理请求 {request_id_var.get()}")
await asyncio.sleep(1)
print(f"请求 {request_id_var.get()} 处理完成")
async def main():
# 模拟多个请求
tasks = [asyncio.create_task(handle_request(i)) for i in range(5)]
await asyncio.gather(*tasks)
asyncio.run(main())
在上述代码中,首先创建了一个contextvar
对象request_id_var
,用于存储请求 ID。在handle_request
协程中,通过request_id_var.set(request_id)
将请求 ID 设置到当前上下文中,然后可以通过request_id_var.get()
获取当前上下文的请求 ID。这样,在不同的协程中都可以获取到相同的请求 ID,实现了请求 ID 在异步上下文中的透传。
分享
以下是对三道 asyncio 面试题的回答:
如何通过 asyncio.Semaphore 限制并发连接数避免资源耗尽?
asyncio.Semaphore
是一个用于控制并发访问资源的同步原语,通过它可以方便地限制并发连接数,从而避免资源耗尽。以下是具体的原理和实现方式:
- 原理:
asyncio.Semaphore
内部维护了一个计数器,用于表示可用资源的数量。当协程想要访问受限制的资源时,需要先获取信号量。如果计数器大于 0,则获取成功,计数器减 1;如果计数器为 0,则协程会被阻塞,直到其他协程释放信号量,使计数器大于 0。 - 代码示例
import asyncio
# 模拟网络连接任务
async def connect_to_server(semaphore, url):
async with semaphore:
print(f'开始连接 {url}')
# 这里模拟连接耗时
await asyncio.sleep(2)
print(f'完成连接 {url}')
async def main():
# 创建信号量,限制并发连接数为3
semaphore = asyncio.Semaphore(3)
urls = [f'http://example.com/{i}' for i in range(5)]
tasks = [asyncio.create_task(connect_to_server(semaphore, url)) for url in urls]
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
在上述代码中,创建了一个信号量semaphore
,并将其并发连接数限制为 3。然后,为每个 URL 创建一个连接任务,这些任务在获取信号量后才会执行连接操作。由于信号量的限制,同一时间最多只有 3 个任务可以进行连接操作,从而避免了资源耗尽。
使用 cProfile 分析异步程序的性能瓶颈
cProfile
是 Python 标准库中的一个性能分析工具,可以用来分析程序中各个函数的执行时间、调用次数等信息,从而帮助找到性能瓶颈。在分析异步程序时,cProfile
同样可以发挥作用,以下是使用步骤:
- 导入必要的模块:首先需要导入
cProfile
和要分析的异步函数所在的模块。 - 定义分析函数:创建一个函数,在函数内部调用要分析的异步函数,并使用
asyncio.run()
来运行异步函数。 - 使用 cProfile.run () 进行分析:在主程序中,使用
cProfile.run()
来运行分析函数,它会执行异步函数,并输出性能分析结果。 - 分析结果:
cProfile
的输出结果包含了各个函数的执行时间、调用次数等信息。重点关注cumtime
列,它表示函数及其调用的所有子函数的累计执行时间。通常,累计执行时间较长的函数可能是性能瓶颈所在。
以下是一个简单的示例:
import asyncio
import cProfile
async def async_task():
await asyncio.sleep(1)
async def main():
tasks = [asyncio.create_task(async_task()) for _ in range(5)]
await asyncio.gather(*tasks)
def profile_main():
asyncio.run(main())
if __name__ == '__main__':
cProfile.run('profile_main()')
在实际应用中,可以根据cProfile
的输出结果,进一步优化异步程序中耗时较长的函数或操作,例如优化数据库查询、减少网络请求次数等。
解释 asyncio 与 gevent 在实现协程时的底层差异
asyncio 和 gevent 都是 Python 中用于实现异步编程的库,它们在实现协程时存在一些底层差异:
- 事件循环
- asyncio:是 Python 标准库的一部分,其事件循环基于操作系统的 I/O 多路复用机制,如
select
、poll
、epoll
等。它提供了一个较为底层和灵活的事件驱动框架,开发者可以直接操作事件循环,注册和管理各种 I/O 事件和回调函数。 - gevent:使用了
greenlet
库来实现轻量级的协程,其事件循环是基于libev
或libuv
等 C 库,提供了高效的 I/O 多路复用和事件驱动机制。gevent 的事件循环对开发者来说相对更透明,开发者主要通过 gevent 提供的高层 API 来操作协程和事件循环。
- asyncio:是 Python 标准库的一部分,其事件循环基于操作系统的 I/O 多路复用机制,如
- 协程切换
- asyncio:协程的切换是通过
await
关键字来显式地暂停和恢复协程的执行。当协程遇到await
表达式时,它会将控制权交回给事件循环,事件循环会在适当的时候调度其他协程执行,当await
的条件满足时,再恢复该协程的执行。 - gevent:协程的切换是通过
greenlet
的switch
方法来实现的,在 gevent 中,协程函数内部通过gevent.sleep()
等方式来主动让出执行权,触发协程的切换。gevent 会自动检测 I/O 操作等可能导致阻塞的情况,并在阻塞发生时自动切换到其他协程。
- asyncio:协程的切换是通过
- 对标准库的修改
- asyncio:尽量保持与 Python 标准库的兼容性,对标准库的修改和替换较少。在使用 asyncio 时,通常需要使用
async
和await
关键字来定义和使用协程,并且需要使用 asyncio 提供的异步版本的 I/O 操作函数和工具。 - gevent:会对 Python 标准库中的一些 I/O 相关的模块进行猴子补丁(monkey patching),将标准的阻塞 I/O 操作替换为非阻塞的版本,使得在使用标准库的 I/O 函数时,能够自动实现异步操作,无需显式地修改代码中的 I/O 调用。
- asyncio:尽量保持与 Python 标准库的兼容性,对标准库的修改和替换较少。在使用 asyncio 时,通常需要使用