Python asyncIO 面试题及参考答案 草

如何正确定义一个协程函数?直接调用协程会引发什么问题?

在 Python 中,正确定义一个协程函数需要使用async def关键字。协程函数是一种特殊的函数,它在执行过程中可以暂停和恢复,从而允许程序在等待 I/O 操作时执行其他任务。以下是定义协程函数的示例:

import asyncio

async def my_coroutine():
    print("协程开始")
    await asyncio.sleep(1)
    print("协程结束")

在上述代码中,my_coroutine是一个协程函数,使用async def关键字定义。函数内部使用了await关键字,它用于暂停协程的执行,直到等待的异步操作完成。

直接调用协程函数并不会执行协程内部的代码,而是返回一个协程对象。如果直接调用协程对象而不将其交给事件循环处理,不会有任何实际的执行效果,并且会引发RuntimeWarning警告。例如:

coro = my_coroutine()
# 这里不会执行协程内部代码,会警告

要正确执行协程,需要将协程对象交给事件循环处理。可以使用asyncio.run()函数来运行协程,它会自动创建和管理事件循环。示例如下:

asyncio.run(my_coroutine())

或者手动管理事件循环:

loop = asyncio.get_event_loop()
loop.run_until_complete(my_coroutine())
loop.close()

如果不将协程对象交给事件循环处理,协程内部的代码不会执行,因为协程的执行需要事件循环的调度。事件循环负责监控协程的状态,当协程遇到await语句暂停时,事件循环可以切换到其他协程继续执行,从而实现异步编程的效果。

使用 async def 定义的协程与普通函数执行流程有何本质区别?

使用async def定义的协程与普通函数在执行流程上有本质区别。普通函数是顺序执行的,一旦开始执行,会一直运行到函数结束,期间不会暂停。例如:

def normal_function():
    print("普通函数开始")
    for i in range(3):
        print(i)
    print("普通函数结束")

normal_function()

在上述代码中,normal_function是一个普通函数,当调用它时,会按照代码的顺序依次执行,直到函数结束。

而协程函数使用async def定义,它可以在执行过程中暂停和恢复。协程函数内部可以使用await关键字来暂停执行,等待某个异步操作完成后再继续执行。例如:

import asyncio

async def coroutine_function():
    print("协程函数开始")
    await asyncio.sleep(1)
    print("协程函数结束")

async def main():
    await coroutine_function()

asyncio.run(main())

在上述代码中,coroutine_function是一个协程函数。当调用coroutine_function时,它会先打印 “协程函数开始”,然后遇到await asyncio.sleep(1)语句,此时协程会暂停执行,将控制权交还给事件循环。事件循环可以在这段时间内执行其他协程。当asyncio.sleep(1)完成后,协程会恢复执行,打印 “协程函数结束”。

普通函数的调用是同步的,调用者必须等待函数执行完毕才能继续执行后续代码。而协程函数的调用是异步的,调用者可以在协程暂停时执行其他任务,从而提高程序的并发性能。此外,普通函数只能返回一个值,而协程函数可以通过await与其他协程进行交互,实现更复杂的异步操作。

解释 asyncio.run () 的作用及与手动管理事件循环的差异

asyncio.run()是 Python 3.7 及以上版本引入的一个高级函数,用于运行顶级的协程函数。它的主要作用是简化异步编程的代码,自动创建和管理事件循环。以下是asyncio.run()的基本用法:

import asyncio

async def main():
    print("异步任务开始")
    await asyncio.sleep(1)
    print("异步任务结束")

asyncio.run(main())

在上述代码中,asyncio.run(main())会自动创建一个事件循环,将main协程函数提交给事件循环执行,并且在协程执行完毕后关闭事件循环。

手动管理事件循环需要使用asyncio.get_event_loop()来获取事件循环对象,然后使用run_until_complete()方法来运行协程,最后手动关闭事件循环。示例如下:

import asyncio

async def main():
    print("异步任务开始")
    await asyncio.sleep(1)
    print("异步任务结束")

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

asyncio.run()与手动管理事件循环的差异主要体现在以下几个方面:

  • 代码简洁性asyncio.run()的代码更加简洁,只需要一行代码就可以完成事件循环的创建、协程的执行和事件循环的关闭。而手动管理事件循环需要更多的代码来处理事件循环的创建、运行和关闭。
  • 兼容性asyncio.run()会确保事件循环是全新的,并且在运行结束后会关闭事件循环,避免了事件循环的复用问题。手动管理事件循环时,如果不小心复用了已经关闭的事件循环,会导致错误。
  • 异常处理asyncio.run()会自动处理异常,当协程中出现未处理的异常时,会打印错误信息并关闭事件循环。手动管理事件循环时,需要手动处理异常,否则事件循环可能会进入错误状态。

为什么协程中必须使用 await 而非 yield 挂起操作?

在 Python 的协程中,使用await而不是yield来挂起操作,主要是因为awaityield在语义和功能上有本质的区别。

yield是 Python 生成器的关键字,它用于生成器函数中,将函数转换为一个生成器。生成器是一种可迭代对象,它可以在迭代过程中暂停和恢复执行。例如:

def generator_function():
    for i in range(3):
        yield i

gen = generator_function()
for num in gen:
    print(num)

在上述代码中,generator_function是一个生成器函数,使用yield关键字将函数转换为生成器。当调用generator_function()时,返回一个生成器对象,通过迭代生成器对象可以依次获取生成的值。

await是 Python 协程的关键字,它用于协程函数中,用于暂停协程的执行,等待一个可等待对象(如另一个协程、Future对象等)完成。例如:

import asyncio

async def coroutine_function():
    print("协程开始")
    await asyncio.sleep(1)
    print("协程结束")

async def main():
    await coroutine_function()

asyncio.run(main())

在上述代码中,coroutine_function是一个协程函数,使用await asyncio.sleep(1)暂停协程的执行,等待asyncio.sleep(1)完成后再继续执行。

使用await而不是yield的主要原因如下:

  • 语义清晰await明确表示协程需要等待一个异步操作完成,而yield的语义更侧重于生成值。使用await可以使代码的意图更加清晰,提高代码的可读性。
  • 异步兼容性await专门用于异步编程,它与事件循环和异步库(如asyncio)紧密配合,可以实现高效的异步操作。而yield主要用于同步的迭代操作,不适合处理异步任务。
  • 错误处理await会自动处理异步操作的结果和异常,当异步操作出现异常时,await会将异常抛出,方便进行错误处理。而yield需要手动处理异常,增加了代码的复杂性。

写出通过 async for 实现异步迭代器的代码模板

async for是 Python 中用于异步迭代的语法,它允许在协程中异步地迭代一个异步可迭代对象。以下是通过async for实现异步迭代器的代码模板:

import asyncio

# 定义一个异步迭代器类
class AsyncIterator:
    def __init__(self, limit):
        self.index = 0
        self.limit = limit

    # 实现异步迭代器协议的 __aiter__ 方法
    def __aiter__(self):
        return self

    # 实现异步迭代器协议的 __anext__ 方法
    async def __anext__(self):
        if self.index < self.limit:
            await asyncio.sleep(0.1)  # 模拟异步操作
            result = self.index
            self.index += 1
            return result
        else:
            raise StopAsyncIteration

# 定义一个异步函数,使用 async for 进行异步迭代
async def main():
    async_iterator = AsyncIterator(5)
    async for item in async_iterator:
        print(item)

# 运行异步函数
asyncio.run(main())

在上述代码中,首先定义了一个异步迭代器类AsyncIterator,它实现了异步迭代器协议的__aiter____anext__方法。__aiter__方法返回异步迭代器对象本身,__anext__方法是一个异步方法,用于返回下一个异步迭代的值。当迭代结束时,抛出StopAsyncIteration异常。

然后定义了一个异步函数main,在函数内部创建了一个AsyncIterator对象,并使用async for语句进行异步迭代。async for会自动调用异步迭代器的__anext__方法,直到抛出StopAsyncIteration异常为止。

最后,使用asyncio.run()函数运行main协程函数,启动异步迭代过程。通过这种方式,可以在协程中异步地迭代一个异步可迭代对象,提高程序的并发性能。

如何捕获协程中未正确使用 await 导致的 RuntimeWarning?

在 Python 中,当协程中未正确使用 await 时,会触发 RuntimeWarning。这通常发生在直接调用协程函数却未使用 await 或事件循环来运行协程的情况下。为了捕获这类警告,可以使用 warnings 模块。该模块提供了灵活的机制来控制警告的显示和处理。

下面是一个示例代码,展示了如何捕获因未正确使用 await 导致的 RuntimeWarning

import warnings
import asyncio

async def my_coroutine():
    await asyncio.sleep(1)
    return 42

def catch_runtime_warning():
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always", RuntimeWarning)
        # 直接调用协程函数,不使用 await
        coro = my_coroutine()
        for warning in w:
            if issubclass(warning.category, RuntimeWarning):
                print(f"捕获到 RuntimeWarning: {warning.message}")

catch_runtime_warning()

在上述代码中,warnings.catch_warnings 上下文管理器用于临时控制警告的行为。warnings.simplefilter("always", RuntimeWarning) 表示总是捕获 RuntimeWarning。通过遍历 w 列表,可以检查是否有 RuntimeWarning 被触发,并打印相应的警告信息。

另外,在更复杂的异步应用中,可以在事件循环启动时进行全局的警告捕获。例如:

import warnings
import asyncio

async def my_coroutine():
    await asyncio.sleep(1)
    return 42

async def main():
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always", RuntimeWarning)
        coro = my_coroutine()
        for warning in w:
            if issubclass(warning.category, RuntimeWarning):
                print(f"捕获到 RuntimeWarning: {warning.message}")

asyncio.run(main())

这种方式可以确保在整个异步程序执行过程中,对未正确使用 await 导致的 RuntimeWarning 进行捕获和处理,有助于及时发现和修复代码中的异步编程错误。

async with 在异步上下文管理中的应用场景(如数据库连接池)

async with 是 Python 中用于异步上下文管理的语法,它在处理异步资源时非常有用,特别是在需要自动管理资源的获取和释放的场景中。数据库连接池就是一个典型的应用场景。

在数据库操作中,频繁地创建和销毁数据库连接会带来较大的开销。使用数据库连接池可以复用已经创建的连接,提高性能。async with 可以与异步数据库连接池结合使用,确保在使用完数据库连接后自动将其返回到连接池中。

以下是一个使用 async with 和异步数据库连接池(以 asyncpg 为例)的示例:

import asyncio
import asyncpg

async def main():
    # 创建数据库连接池
    pool = await asyncpg.create_pool(
        user='your_user',
        password='your_password',
        database='your_database',
        host='your_host',
        port='your_port'
    )

    async with pool.acquire() as connection:
        # 执行数据库查询
        result = await connection.fetchrow('SELECT 1')
        print(result)

    # 关闭连接池
    await pool.close()

asyncio.run(main())

在上述代码中,async with pool.acquire() as connection 用于从连接池中获取一个数据库连接。当代码块执行完毕后,async with 会自动将连接返回到连接池中。这样可以避免手动管理连接的获取和释放,减少出错的可能性。

除了数据库连接池,async with 还可以应用于其他需要异步资源管理的场景,如文件操作、网络连接等。例如,在异步文件操作中,可以使用 async with 来确保文件在使用完毕后自动关闭:

import asyncio

async def read_file():
    async with open('example.txt', 'r') as file:
        content = await file.read()
        print(content)

asyncio.run(read_file())

通过使用 async with,可以使异步代码更加简洁、安全,提高代码的可维护性。

协程中同步调用阻塞 IO(如 time.sleep)会引发什么问题?如何解决?

在协程中同步调用阻塞 IO 操作(如 time.sleep)会引发严重的性能问题,因为阻塞 IO 会暂停整个线程的执行,使得其他协程无法在这段时间内运行,从而破坏了异步编程的并发优势。

异步编程的核心是利用事件循环在等待 IO 操作时切换到其他协程继续执行,以提高程序的并发性能。而阻塞 IO 操作会使线程进入等待状态,事件循环无法进行协程的切换,导致所有协程都被阻塞,程序的执行效率大幅降低。

以下是一个示例代码,展示了在协程中使用 time.sleep 带来的问题:

import asyncio
import time

async def blocking_coroutine():
    print("阻塞协程开始")
    time.sleep(2)  # 阻塞操作
    print("阻塞协程结束")

async def non_blocking_coroutine():
    print("非阻塞协程开始")
    await asyncio.sleep(0.1)
    print("非阻塞协程结束")

async def main():
    task1 = asyncio.create_task(blocking_coroutine())
    task2 = asyncio.create_task(non_blocking_coroutine())
    await task1
    await task2

asyncio.run(main())

在上述代码中,blocking_coroutine 中使用了 time.sleep(2),这会阻塞整个线程 2 秒钟。在这 2 秒钟内,non_blocking_coroutine 无法执行,即使它只需要 0.1 秒。

为了解决这个问题,应该使用异步版本的 IO 操作。在 Python 的 asyncio 库中,提供了许多异步函数,如 asyncio.sleep 用于替代 time.sleep。对于其他阻塞 IO 操作,如文件读写、网络请求等,可以使用异步库来替代同步库。例如,使用 aiohttp 替代 requests 进行异步网络请求。

以下是修改后的代码,使用 asyncio.sleep 替代 time.sleep

import asyncio

async def non_blocking_coroutine():
    print("非阻塞协程开始")
    await asyncio.sleep(2)
    print("非阻塞协程结束")

async def another_non_blocking_coroutine():
    print("另一个非阻塞协程开始")
    await asyncio.sleep(0.1)
    print("另一个非阻塞协程结束")

async def main():
    task1 = asyncio.create_task(non_blocking_coroutine())
    task2 = asyncio.create_task(another_non_blocking_coroutine())
    await task1
    await task2

asyncio.run(main())

在修改后的代码中,所有协程都使用了异步的 asyncio.sleep,事件循环可以在等待时切换到其他协程执行,从而提高了程序的并发性能。

解释 asyncio.iscoroutinefunction () 与 inspect.iscoroutine () 的区别

asyncio.iscoroutinefunction() 和 inspect.iscoroutine() 是 Python 中用于检查对象是否为协程相关对象的两个函数,但它们的检查对象有所不同。

asyncio.iscoroutinefunction() 用于检查一个对象是否为协程函数。协程函数是使用 async def 定义的函数,它本身并不会立即执行,而是返回一个协程对象。以下是一个示例代码:

import asyncio

async def my_coroutine():
    await asyncio.sleep(1)
    return 42

print(asyncio.iscoroutinefunction(my_coroutine))  # 输出: True

在上述代码中,my_coroutine 是一个协程函数,使用 asyncio.iscoroutinefunction() 检查它会返回 True

而 inspect.iscoroutine() 用于检查一个对象是否为协程对象。协程对象是调用协程函数返回的对象,它需要通过事件循环来执行。以下是一个示例代码:

import asyncio
import inspect

async def my_coroutine():
    await asyncio.sleep(1)
    return 42

coro = my_coroutine()
print(inspect.iscoroutine(coro))  # 输出: True

在上述代码中,my_coroutine() 返回一个协程对象 coro,使用 inspect.iscoroutine() 检查它会返回 True

总结来说,asyncio.iscoroutinefunction() 检查的是函数本身是否为协程函数,而 inspect.iscoroutine() 检查的是对象是否为协程对象。这两个函数在异步编程中非常有用,可以帮助开发者在运行时判断对象的类型,从而进行相应的处理。例如,在编写异步框架时,可以使用这两个函数来确保传入的函数或对象是合法的协程函数或协程对象。

实现一个异步生成器,每秒生成递增数字直至终止条件

在 Python 中,可以使用异步生成器来每秒生成递增数字,直到满足终止条件。异步生成器结合了异步编程和生成器的特性,允许在异步环境中逐个生成值。

以下是一个实现异步生成器的示例代码,该异步生成器每秒生成一个递增的数字,直到达到指定的终止条件:

import asyncio

async def async_number_generator(limit):
    num = 0
    while num < limit:
        yield num
        num += 1
        await asyncio.sleep(1)

async def main():
    limit = 5
    async for number in async_number_generator(limit):
        print(number)

asyncio.run(main())

在上述代码中,首先定义了一个异步生成器函数 async_number_generator。在函数内部,使用 while 循环生成递增的数字,每次生成一个数字后,使用 await asyncio.sleep(1) 暂停 1 秒钟。yield 关键字用于将生成的数字返回给调用者。

然后定义了一个异步函数 main,在函数内部使用 async for 语句来迭代异步生成器。async for 会自动处理异步生成器的迭代过程,每次迭代时等待异步生成器生成下一个值。

最后,使用 asyncio.run() 函数运行 main 协程函数,启动异步生成器的迭代过程。当生成的数字达到终止条件(即 num 等于 limit)时,异步生成器停止生成值,迭代结束。

通过这种方式,可以在异步环境中每秒生成一个递增的数字,直到满足终止条件,实现了异步的递增数字生成功能。

asyncio.create_task () 与 loop.create_task () 的适用场景差异

asyncio.create_task()loop.create_task()都用于创建异步任务,但它们的适用场景略有不同。

  • asyncio.create_task():是 Python 3.7 及以上版本中推荐的创建任务的方式。它会使用当前的事件循环来创建任务,代码更加简洁和直观,适用于大多数常规的异步编程场景。比如在一个简单的异步爬虫程序中,需要并发地请求多个网页,就可以使用asyncio.create_task()来创建多个任务,每个任务负责请求一个网页。示例如下:

import asyncio

async def fetch_url(url):
    await asyncio.sleep(1)
    return f"Content from {url}"

async def main():
    urls = ["https://example.com", "https://example.org"]
    tasks = [asyncio.create_task(fetch_url(url)) for url in urls]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

  • loop.create_task():在一些需要更精细地控制事件循环的场景中更有用,比如在已经获取到特定的事件循环对象loop的情况下,或者在自定义的事件循环管理逻辑中。在编写一些复杂的异步应用框架时,可能需要在不同的地方使用不同的事件循环,这时就可以通过loop.create_task()在特定的循环中创建任务。示例如下:

import asyncio

async def task_coro():
    await asyncio.sleep(1)
    return "Task completed"

loop = asyncio.get_event_loop()
task = loop.create_task(task_coro())
loop.run_until_complete(task)
print(task.result())

如何获取当前事件循环中所有运行中的任务

asyncio中,可以通过以下方式获取当前事件循环中所有运行中的任务:

  • 使用asyncio.all_tasks()函数,它会返回当前事件循环中所有尚未完成的任务集合。示例代码如下:

import asyncio

async def task1():
    await asyncio.sleep(2)
    print("Task 1 completed")

async def task2():
    await asyncio.sleep(1)
    print("Task 2 completed")

async def main():
    task_list = [asyncio.create_task(task1()), asyncio.create_task(task2())]
    await asyncio.sleep(0.5)  
    tasks = asyncio.all_tasks()
    print(f"Number of running tasks: {len(tasks)}")
    for task in tasks:
        print(task)

asyncio.run(main())

在上述代码中,首先创建了两个异步任务task1task2,然后在main函数中等待一段时间后,通过asyncio.all_tasks()获取所有运行中的任务,并打印任务数量和每个任务的信息。

  • 在 Python 3.10 及以上版本中,还可以使用asyncio.current_task()结合task.get_name()等方法来获取更详细的任务信息,对任务进行更细致的管理和监控。

使用 gather () 时如何设置 return_exceptions=True 避免单个任务异常导致整体失败

asyncio中,asyncio.gather()用于并发地运行多个协程,并收集它们的结果。当设置return_exceptions=True时,它会将任务中的异常作为结果返回,而不是直接抛出,从而避免单个任务异常导致整体失败。示例代码如下:

import asyncio

async def coro1():
    return 1

async def coro2():
    raise ValueError("Error in coro2")

async def coro3():
    return 3

async def main():
    results = await asyncio.gather(
        coro1(),
        coro2(),
        coro3(),
        return_exceptions=True
    )
    print(results)

asyncio.run(main())

在上述代码中,coro2会抛出一个ValueError异常。由于设置了return_exceptions=Trueasyncio.gather()不会因为coro2的异常而中断,而是将异常作为结果列表中的一个元素返回,最终打印出的结果列表中,对应coro2的位置会是ValueError异常对象,而其他正常执行的任务的结果会正常返回。

asyncio.wait () 的 return_when 参数(如 FIRST_COMPLETED)在任务调度中的应用

asyncio.wait()asyncio库中的一个函数,用于等待一组FutureTask对象完成。它的return_when参数决定了函数在什么情况下返回,有以下几个取值及应用场景:

  • asyncio.FIRST_COMPLETED:当传入的任务列表中第一个任务完成时,asyncio.wait()就会返回。这在需要尽快处理最先完成的任务结果的场景中很有用。比如在多个异步任务同时请求不同的数据源获取数据,希望一旦有一个数据源返回数据就立即进行处理,而不必等待其他任务都完成,可以使用FIRST_COMPLETED。示例代码如下:

import asyncio

async def task1():
    await asyncio.sleep(2)
    return "Task 1 result"

async def task2():
    await asyncio.sleep(1)
    return "Task 2 result"

async def main():
    tasks = [asyncio.create_task(task1()), asyncio.create_task(task2())]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        print(task.result())

asyncio.run(main())

在上述代码中,task2会先完成,当task2完成后,asyncio.wait()就会返回,然后打印出task2的结果。

  • asyncio.FIRST_EXCEPTION:当传入的任务列表中第一个任务抛出异常时,asyncio.wait()就会返回。这适用于希望尽快发现任务中的异常情况并进行处理的场景。
  • asyncio.ALL_COMPLETED:等待所有任务都完成后,asyncio.wait()才会返回。这是最常用的情况,适用于需要所有任务的结果都处理完后再进行下一步操作的场景。

对比 asyncio.gather ()、asyncio.wait () 和 asyncio.as_completed () 的异同

asyncio.gather()asyncio.wait()asyncio.as_completed()都是asyncio中用于处理多个异步任务的函数,它们的异同如下:

  • 相同点:都用于处理多个异步任务,能够实现并发执行多个协程,并在一定程度上对任务的结果进行收集和处理。
  • 不同点
    • asyncio.gather():它接受多个协程或任务作为参数,按照传入的顺序返回结果。它更注重任务结果的顺序性,方便一次性获取所有任务的结果并进行统一处理。如果不设置return_exceptions=True,只要有一个任务抛出异常,整个gather操作就会中断并抛出异常。
    • asyncio.wait():接受一个任务列表作为参数,通过return_when参数可以灵活地控制返回时机,如FIRST_COMPLETEDFIRST_EXCEPTIONALL_COMPLETED等。返回值是两个集合,分别表示已完成的任务和未完成的任务,对任务的状态管理更细致。
    • asyncio.as_completed():接受一个可迭代的任务列表作为参数,它会在任务完成时逐个返回结果,返回的顺序是任务完成的顺序,而不是任务添加的顺序。适用于需要在任务完成后立即处理结果,而不关心任务执行顺序的场景。示例代码如下:

import asyncio

async def task1():
    await asyncio.sleep(2)
    return "Task 1 result"

async def task2():
    await asyncio.sleep(1)
    return "Task 2 result"

async def main():
    tasks = [asyncio.create_task(task1()), asyncio.create_task(task2())]

    # 使用asyncio.gather()
    results_gather = await asyncio.gather(*tasks)
    print("Results from gather:", results_gather)

    # 使用asyncio.wait()
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    results_wait = [task.result() for task in done]
    print("Results from wait:", results_wait)

    # 使用asyncio.as_completed()
    results_as_completed = []
    for task in asyncio.as_completed(tasks):
        result = await task
        results_as_completed.append(result)
    print("Results from as_completed:", results_as_completed)

asyncio.run(main())

如何通过 loop.run_in_executor () 将同步代码异步化?

asyncio 库中的 loop.run_in_executor() 方法可以将同步代码放在线程池或进程池中执行,从而实现异步化。这是因为 asyncio 的事件循环是单线程的,当遇到阻塞的同步代码时会影响整个事件循环的运行,而 run_in_executor() 方法可以利用其他线程或进程来执行这些同步代码,避免阻塞事件循环。

loop.run_in_executor() 方法接受两个主要参数:一个是执行器对象(Executor),可以是 ThreadPoolExecutor 或 ProcessPoolExecutor;另一个是要执行的同步函数以及函数的参数。

下面是一个使用 loop.run_in_executor() 将同步代码异步化的示例,其中使用 time.sleep() 模拟一个阻塞的同步操作:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# 模拟一个阻塞的同步函数
def blocking_function():
    print("开始执行阻塞函数")
    time.sleep(2)
    print("阻塞函数执行结束")
    return "阻塞函数的结果"

async def main():
    loop = asyncio.get_running_loop()
    # 创建一个线程池执行器
    with ThreadPoolExecutor() as executor:
        # 使用 run_in_executor 将同步函数提交到线程池执行
        result = await loop.run_in_executor(
            executor, blocking_function
        )
        print(f"获取到的结果: {result}")

asyncio.run(main())

在上述代码中:

  1. 定义了一个阻塞的同步函数 blocking_function,其中使用 time.sleep(2) 模拟阻塞操作。
  2. 在 main 协程函数中,首先获取当前的事件循环 loop
  3. 创建一个 ThreadPoolExecutor 线程池执行器。
  4. 使用 loop.run_in_executor() 方法将 blocking_function 提交到线程池执行,并使用 await 等待其执行结果。由于是在另一个线程中执行,所以不会阻塞事件循环,其他协程可以继续执行。

如果需要使用进程池来执行同步代码,只需将 ThreadPoolExecutor 替换为 ProcessPoolExecutor 即可,但要注意进程间的数据共享和通信问题。

解释 Task 对象生命周期(Pending/Running/Done)及状态监控方法

在 asyncio 中,Task 对象代表一个异步任务,它有三个主要的生命周期状态:PendingRunning 和 Done

  1. Pending(等待状态):当使用 asyncio.create_task() 或 loop.create_task() 创建一个任务后,任务就处于 Pending 状态。此时任务已经被创建,但尚未开始执行。例如:

import asyncio

async def my_coroutine():
    await asyncio.sleep(1)
    return "任务完成"

task = asyncio.create_task(my_coroutine())
print(task.status)  # 输出: Pending

  1. Running(运行状态):当事件循环调度到该任务并开始执行协程函数时,任务进入 Running 状态。在任务执行过程中,它处于 Running 状态。但 asyncio 并没有直接提供一个属性来明确表示任务处于 Running 状态,不过可以通过任务的执行过程来判断。

  2. Done(完成状态):当协程函数执行完毕(正常返回或抛出异常),任务就进入 Done 状态。可以通过 task.done() 方法来检查任务是否已完成。例如:

import asyncio

async def my_coroutine():
    await asyncio.sleep(1)
    return "任务完成"

async def main():
    task = asyncio.create_task(my_coroutine())
    await asyncio.sleep(2)
    print(task.done())  # 输出: True

asyncio.run(main())

状态监控方法

  • task.done():用于判断任务是否已完成,返回 True 或 False
  • task.result():在任务完成后调用,用于获取任务的返回结果。如果任务是因为异常而结束,task.result() 会重新抛出该异常。
  • task.exception():在任务完成后调用,用于获取任务执行过程中抛出的异常。如果任务正常完成,task.exception() 返回 None

通过这些方法,可以有效地监控 Task 对象的生命周期状态,并对任务的执行结果或异常进行处理。

为什么推荐使用 asyncio.run () 而非手动管理事件循环?

asyncio.run() 是 Python 3.7 及以上版本引入的一个高级函数,用于运行顶级的协程函数。推荐使用 asyncio.run() 而非手动管理事件循环,主要有以下几个原因:

  1. 简化代码asyncio.run() 自动创建和管理事件循环,只需要传入一个顶级协程函数即可。相比之下,手动管理事件循环需要编写更多的代码来创建事件循环对象、运行协程以及关闭事件循环。例如:

# 使用 asyncio.run()
import asyncio

async def main():
    print("异步任务开始")
    await asyncio.sleep(1)
    print("异步任务结束")

asyncio.run(main())

# 手动管理事件循环
import asyncio

async def main():
    print("异步任务开始")
    await asyncio.sleep(1)
    print("异步任务结束")

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

可以看到,使用 asyncio.run() 的代码更加简洁明了。

  1. 避免循环复用问题asyncio.run() 会确保每次运行时创建一个新的事件循环,并且在运行结束后关闭该循环,避免了事件循环的复用问题。手动管理事件循环时,如果不小心复用了已经关闭的事件循环,会导致错误。

  2. 异常处理asyncio.run() 会自动处理顶级协程函数中的异常,当协程中出现未处理的异常时,会打印错误信息并关闭事件循环。而手动管理事件循环时,需要手动处理异常,否则事件循环可能会进入错误状态。例如:

import asyncio

async def main():
    raise ValueError("发生异常")

try:
    asyncio.run(main())
except ValueError as e:
    print(f"捕获到异常: {e}")

asyncio.run() 会自动捕获并处理异常,使得代码更加健壮。

  1. 兼容性和一致性asyncio.run() 提供了一种统一的方式来运行异步代码,使得代码在不同的 Python 版本和环境中具有更好的兼容性和一致性。

综上所述,asyncio.run() 提供了更简单、更安全、更可靠的方式来运行异步代码,因此推荐使用它而非手动管理事件循环。

实现任务超时控制的两种方式(wait_for () 与 asyncio.Timeout 上下文)

在 asyncio 中,实现任务超时控制有两种常见的方式:asyncio.wait_for() 函数和 asyncio.Timeout 上下文管理器。

方式一:使用 asyncio.wait_for ()
asyncio.wait_for() 函数接受两个参数:一个是要等待的协程或任务,另一个是超时时间(以秒为单位)。如果在指定的时间内任务没有完成,asyncio.wait_for() 会抛出 asyncio.TimeoutError 异常。

示例代码如下:

import asyncio

async def my_coroutine():
    await asyncio.sleep(2)
    return "任务完成"

async def main():
    try:
        result = await asyncio.wait_for(my_coroutine(), timeout=1)
        print(f"获取到的结果: {result}")
    except asyncio.TimeoutError:
        print("任务超时")

asyncio.run(main())

在上述代码中,my_coroutine 协程函数会睡眠 2 秒,但 asyncio.wait_for() 设置的超时时间为 1 秒,所以会抛出 TimeoutError 异常。

方式二:使用 asyncio.Timeout 上下文管理器
asyncio.Timeout 上下文管理器可以在一个代码块中设置超时时间。如果在超时时间内代码块中的操作没有完成,会抛出 asyncio.TimeoutError 异常。

示例代码如下:

import asyncio

async def my_coroutine():
    await asyncio.sleep(2)
    return "任务完成"

async def main():
    async with asyncio.timeout(1):
        try:
            result = await my_coroutine()
            print(f"获取到的结果: {result}")
        except asyncio.TimeoutError:
            print("任务超时")

asyncio.run(main())

在上述代码中,同样 my_coroutine 协程函数会睡眠 2 秒,而 asyncio.timeout(1) 设置的超时时间为 1 秒,所以会抛出 TimeoutError 异常。

两种方式的主要区别在于使用场景和代码结构。asyncio.wait_for() 更适合直接对单个任务设置超时,而 asyncio.Timeout 上下文管理器更适合在一个代码块中对多个操作进行统一的超时控制,并且可以更好地组织代码结构。

如何为任务添加回调函数并在完成后获取结果?

在 asyncio 中,可以为 Task 对象添加回调函数,以便在任务完成后执行一些额外的操作,并获取任务的结果。

为任务添加回调函数的方法是使用 task.add_done_callback() 方法,该方法接受一个回调函数作为参数,回调函数会在任务完成时被调用。

示例代码如下:

import asyncio

async def my_coroutine():
    await asyncio.sleep(1)
    return "任务完成"

def callback(task):
    try:
        result = task.result()
        print(f"任务结果: {result}")
    except Exception as e:
        print(f"任务执行出错: {e}")

async def main():
    task = asyncio.create_task(my_coroutine())
    task.add_done_callback(callback)
    await task

asyncio.run(main())

在上述代码中:

  1. 定义了一个协程函数 my_coroutine,它会睡眠 1 秒后返回一个字符串。
  2. 定义了一个回调函数 callback,该函数接受一个 Task 对象作为参数。在回调函数中,使用 task.result() 获取任务的结果,并进行相应的处理。如果任务执行过程中抛出异常,task.result() 会重新抛出该异常,回调函数可以捕获并处理这个异常。
  3. 在 main 协程函数中,创建一个任务 task,并使用 task.add_done_callback(callback) 为任务添加回调函数。然后使用 await task 等待任务完成。

当任务完成时,callback 函数会被自动调用,从而可以在任务完成后获取结果并进行进一步的处理。这样可以实现任务完成后的一些自定义逻辑,如记录日志、更新状态等。

使用 loop.call_soon () 与 loop.call_later () 实现延时调度的场景差异

loop.call_soon()loop.call_later()都是asyncio事件循环loop提供的用于延时调度的方法,但它们在使用场景上存在一些差异。

loop.call_soon()方法会尽快将指定的回调函数添加到事件循环的任务队列中,使其在当前事件循环迭代中尽可能早地执行。也就是说,它的延时几乎为零,只是将任务安排到事件循环的下一次执行机会。例如:

import asyncio

def callback():
    print("回调函数被调用")

loop = asyncio.get_event_loop()
loop.call_soon(callback)
loop.run_until_complete(asyncio.sleep(0))
loop.close()

在上述代码中,loop.call_soon(callback)callback函数尽快添加到事件循环的任务队列中,asyncio.sleep(0)只是让事件循环有机会执行队列中的任务,最终callback函数会被调用并输出相应信息。loop.call_soon()适用于那些需要在当前事件循环周期内立即执行的回调任务,比如在某个操作完成后,需要立即触发一些后续处理逻辑,且不希望有额外的延迟。

loop.call_later()方法则会在指定的延迟时间(以秒为单位)后,将回调函数添加到事件循环的任务队列中执行。它可以实现真正意义上的延时调度。示例如下:

import asyncio

def callback():
    print("回调函数被调用")

loop = asyncio.get_event_loop()
loop.call_later(2, callback)
loop.run_until_complete(asyncio.sleep(3))
loop.close()

在这个例子中,loop.call_later(2, callback)表示在 2 秒后将callback函数添加到事件循环的任务队列中。asyncio.sleep(3)让事件循环持续运行 3 秒,确保callback函数有机会被执行。loop.call_later()适用于需要在一段时间延迟后执行某个任务的场景,比如定时任务、在某个操作完成后等待一段时间再执行后续操作等。

总体而言,loop.call_soon()用于几乎无延迟的立即执行任务调度,而loop.call_later()用于有明确延迟时间的任务调度,开发者可以根据具体的业务需求选择合适的方法。

事件循环的 run_forever () 与 run_until_complete () 适用场景对比

asyncio事件循环的run_forever()run_until_complete()方法在功能和适用场景上有明显的区别。

run_forever()方法会让事件循环无限期地运行下去,不断地处理事件队列中的任务,直到手动调用loop.stop()来停止事件循环。它适用于需要持续运行并不断处理各种异步事件的场景,比如服务器应用。以一个简单的异步 TCP 服务器为例:

import asyncio

async def handle_connection(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print(f"收到来自 {addr} 的消息: {message}")

    writer.write(b"你好,已收到你的消息")
    await writer.drain()

    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_connection, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f"服务器已启动,监听 {addr}")

    async with server:
        await server.serve_forever()

loop = asyncio.get_event_loop()
loop.run_forever()
loop.close()

在上述代码中,server.serve_forever()配合loop.run_forever()使服务器持续运行,不断接受和处理客户端的连接请求。

run_until_complete()方法接受一个FutureTask对象作为参数,会运行事件循环直到传入的FutureTask完成。一旦任务完成,事件循环就会停止。它适用于执行单个或一组明确的异步任务,当这些任务执行完毕后,程序继续后续的逻辑。例如:

import asyncio

async def task():
    await asyncio.sleep(2)
    print("任务完成")

loop = asyncio.get_event_loop()
loop.run_until_complete(task())
loop.close()

这里loop.run_until_complete(task())会运行事件循环,直到task协程完成,然后关闭事件循环。

综上所述,run_forever()适用于需要持续运行的服务或应用,而run_until_complete()适用于执行特定的、可预期完成的异步任务的场景。

为什么在协程中操作全局变量可能导致竞态条件?给出解决方案

在协程中操作全局变量可能导致竞态条件,这是因为asyncio中的协程是在单线程的事件循环中执行的,虽然不存在传统意义上多线程并发执行的情况,但协程之间可以通过await进行切换。当多个协程同时访问和修改全局变量时,就可能出现竞态条件。

例如,假设有两个协程都要对一个全局计数器进行加 1 操作:

import asyncio

count = 0

async def increment():
    global count
    value = count
    await asyncio.sleep(0)
    count = value + 1

async def main():
    tasks = [increment() for _ in range(100)]
    await asyncio.gather(*tasks)
    print(count)

asyncio.run(main())

在上述代码中,increment协程先获取全局变量count的值,然后通过await asyncio.sleep(0)让事件循环有机会切换到其他协程。如果在切换期间其他协程也获取了相同的count值并进行修改,那么最终的结果可能并不是预期的 100(因为存在重复读取旧值并覆盖的情况)。

为了解决这个问题,可以使用锁(Lock)来保证在同一时间只有一个协程可以访问和修改全局变量。asyncio.Lock是一个异步锁,示例代码如下:

import asyncio

count = 0
lock = asyncio.Lock()

async def increment():
    global count
    async with lock:
        value = count
        await asyncio.sleep(0)
        count = value + 1

async def main():
    tasks = [increment() for _ in range(100)]
    await asyncio.gather(*tasks)
    print(count)

asyncio.run(main())

在这个改进的代码中,async with lock确保了在进入关键代码段(对全局变量count的操作)时,其他协程无法同时进入,从而避免了竞态条件,保证了count的正确递增。

实现基于 asyncio.Lock 的线程安全计数器(需处理协程挂起时的上下文切换)

要实现基于asyncio.Lock的线程安全计数器,并处理协程挂起时的上下文切换,可以按照以下步骤进行:

import asyncio

class ThreadSafeCounter:
    def __init__(self):
        self.count = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:
            self.count += 1
            await asyncio.sleep(0)  # 模拟可能的协程挂起

    async def decrement(self):
        async with self.lock:
            self.count -= 1
            await asyncio.sleep(0)  # 模拟可能的协程挂起

    async def get_count(self):
        async with self.lock:
            return self.count

async def main():
    counter = ThreadSafeCounter()
    tasks = [
        counter.increment() for _ in range(50)
    ] + [
        counter.decrement() for _ in range(30)
    ]

    await asyncio.gather(*tasks)
    result = await counter.get_count()
    print(f"计数器的最终值: {result}")

asyncio.run(main())

在上述代码中:

  1. 定义了一个ThreadSafeCounter类,其中包含一个全局变量count用于存储计数器的值,以及一个asyncio.Lock对象lock用于实现线程安全。
  2. increment方法用于增加计数器的值,在进入方法时获取锁,确保在修改count时不会有其他协程同时访问。await asyncio.sleep(0)模拟了协程可能的挂起操作,但由于持有锁,不会影响计数器的正确性。
  3. decrement方法用于减少计数器的值,原理与increment方法相同。
  4. get_count方法用于获取计数器的当前值,同样需要获取锁以保证数据的一致性。
  5. main协程中,创建了多个任务,包括增加和减少计数器的操作,最后获取并打印计数器的最终值。

通过使用asyncio.Lock,即使在协程挂起和上下文切换的情况下,也能保证计数器的操作是线程安全的。

解释 asyncio.Semaphore 在限流场景中的应用(如 API 并发请求限制)

asyncio.Semaphoreasyncio库中用于控制并发数量的工具,在限流场景中有着广泛的应用,比如限制对 API 的并发请求数量。

Semaphore(信号量)有一个内部计数器,初始值在创建时指定。当一个协程想要执行受限制的操作时,它需要先获取信号量(通过await semaphore.acquire()),如果信号量的计数器大于 0,则获取成功,计数器减 1;如果计数器为 0,则协程会被阻塞,直到有其他协程释放信号量(通过semaphore.release()),计数器加 1 后才能继续执行。

以限制对 API 的并发请求数量为例,假设我们要对某个 API 进行请求,并且限制同时最多只能有 3 个请求并发进行:

import asyncio

async def api_request(semaphore, url):
    async with semaphore:
        print(f"开始请求 {url}")
        await asyncio.sleep(2)  # 模拟API请求的耗时操作
        print(f"完成请求 {url}")

async def main():
    urls = [
        f"https://api.example.com/{i}" for i in range(5)
    ]
    semaphore = asyncio.Semaphore(3)
    tasks = [api_request(semaphore, url) for url in urls]
    await asyncio.gather(*tasks)

asyncio.run(main())

在上述代码中:

  1. api_request协程函数表示对 API 的请求操作,在进入函数时,通过async with semaphore获取信号量。如果信号量的计数器大于 0,则可以继续执行请求操作,这里使用await asyncio.sleep(2)模拟请求的耗时操作;如果计数器为 0,则协程会被阻塞,直到有其他协程完成请求并释放信号量。
  2. main协程中,定义了要请求的 URL 列表urls,创建了一个初始值为 3 的信号量semaphore,表示最多允许 3 个并发请求。然后创建了多个任务,每个任务都调用api_request函数进行 API 请求。
  3. 通过asyncio.gather(*tasks)等待所有任务完成,由于信号量的限制,同时最多只有 3 个请求会并发执行,从而实现了对 API 请求的限流。

通过使用asyncio.Semaphore,可以有效地控制并发操作的数量,避免因过多的并发请求导致目标系统(如 API 服务器)负载过高或出现其他问题。

如何通过 asyncio.Event 实现多任务同步启动?

asyncio.Event 是 asyncio 库中用于在协程之间进行同步的工具,它可以实现多任务的同步启动。asyncio.Event 对象有一个内部标志,初始状态为 False,可以通过 set() 方法将其设置为 True,通过 clear() 方法将其重置为 False,协程可以使用 wait() 方法等待该标志变为 True

以下是一个使用 asyncio.Event 实现多任务同步启动的示例:

import asyncio

async def task(event, task_id):
    print(f"任务 {task_id} 准备就绪,等待启动信号")
    await event.wait()
    print(f"任务 {task_id} 启动")
    await asyncio.sleep(1)
    print(f"任务 {task_id} 完成")

async def main():
    event = asyncio.Event()
    tasks = [task(event, i) for i in range(3)]
    # 模拟一些准备工作
    await asyncio.sleep(2)
    print("所有准备工作完成,发送启动信号")
    event.set()
    await asyncio.gather(*tasks)

asyncio.run(main())

在上述代码中,首先定义了一个 task 协程函数,它接受一个 asyncio.Event 对象和一个任务 ID 作为参数。在 task 函数中,协程会先打印准备就绪的信息,然后调用 event.wait() 方法等待事件的标志变为 True。当事件标志变为 True 时,协程会继续执行后续的任务。

在 main 协程函数中,创建了一个 asyncio.Event 对象,并创建了多个 task 协程任务。然后模拟了一些准备工作,等待 2 秒后,调用 event.set() 方法将事件的标志设置为 True,此时所有等待该事件的协程都会继续执行。最后使用 asyncio.gather() 方法等待所有任务完成。

通过这种方式,asyncio.Event 可以确保多个任务在收到启动信号后同时开始执行,实现了多任务的同步启动。

使用 asyncio.Condition 实现生产者 - 消费者模型的异步版本

asyncio.Condition 是 asyncio 库中用于在协程之间进行同步的高级工具,它可以结合 asyncio.Lock 和 asyncio.Event 的功能,用于实现生产者 - 消费者模型的异步版本。

以下是一个使用 asyncio.Condition 实现生产者 - 消费者模型的示例:

import asyncio

async def producer(condition, queue):
    for i in range(5):
        async with condition:
            while len(queue) >= 3:
                print("队列已满,生产者等待")
                await condition.wait()
            queue.append(i)
            print(f"生产者生产了 {i}")
            condition.notify()
        await asyncio.sleep(1)

async def consumer(condition, queue):
    while True:
        async with condition:
            while not queue:
                print("队列为空,消费者等待")
                await condition.wait()
            item = queue.pop(0)
            print(f"消费者消费了 {item}")
            condition.notify()
        await asyncio.sleep(0.5)

async def main():
    queue = []
    condition = asyncio.Condition()
    producer_task = asyncio.create_task(producer(condition, queue))
    consumer_task = asyncio.create_task(consumer(condition, queue))
    await asyncio.gather(producer_task, consumer_task)

asyncio.run(main())

在上述代码中,首先定义了 producer 协程函数和 consumer 协程函数。producer 函数负责向队列中添加元素,当队列已满时,它会调用 condition.wait() 方法等待,直到有消费者消费了元素并通知它。consumer 函数负责从队列中取出元素,当队列为空时,它会调用 condition.wait() 方法等待,直到有生产者生产了元素并通知它。

在 main 协程函数中,创建了一个空队列和一个 asyncio.Condition 对象,然后分别创建了生产者任务和消费者任务,并使用 asyncio.gather() 方法等待它们执行。

通过使用 asyncio.Condition,可以在协程之间实现高效的同步,确保生产者和消费者之间的协作,避免了数据竞争和不一致的问题。

死锁场景复现:两个协程互相等待对方释放锁

死锁是指两个或多个协程相互等待对方释放资源,从而导致所有协程都无法继续执行的情况。在 asyncio 中,可以通过两个协程互相等待对方释放锁来复现死锁场景。

以下是一个死锁场景的示例:

import asyncio

lock1 = asyncio.Lock()
lock2 = asyncio.Lock()

async def coroutine1():
    print("协程 1 尝试获取锁 1")
    await lock1.acquire()
    print("协程 1 已获取锁 1,尝试获取锁 2")
    await asyncio.sleep(1)
    await lock2.acquire()
    print("协程 1 已获取锁 2")
    lock2.release()
    lock1.release()

async def coroutine2():
    print("协程 2 尝试获取锁 2")
    await lock2.acquire()
    print("协程 2 已获取锁 2,尝试获取锁 1")
    await asyncio.sleep(1)
    await lock1.acquire()
    print("协程 2 已获取锁 1")
    lock1.release()
    lock2.release()

async def main():
    task1 = asyncio.create_task(coroutine1())
    task2 = asyncio.create_task(coroutine2())
    await asyncio.gather(task1, task2)

asyncio.run(main())

在上述代码中,定义了两个锁 lock1 和 lock2,以及两个协程 coroutine1 和 coroutine2coroutine1 先获取 lock1,然后尝试获取 lock2coroutine2 先获取 lock2,然后尝试获取 lock1。由于两个协程都在等待对方释放锁,因此会陷入死锁状态。

当运行这个程序时,会发现程序会一直阻塞,无法继续执行,这就是死锁的表现。

如何避免因未释放锁导致的协程永久阻塞?

在 asyncio 中,未释放锁可能会导致协程永久阻塞,为了避免这种情况,可以采取以下几种方法:

使用 async with 语句async with 语句会自动管理锁的获取和释放,确保在代码块执行完毕后,锁会被正确释放。

import asyncio

lock = asyncio.Lock()

async def task():
    async with lock:
        print("获取到锁,执行任务")
        await asyncio.sleep(1)
        print("任务完成,释放锁")

async def main():
    task1 = asyncio.create_task(task())
    task2 = asyncio.create_task(task())
    await asyncio.gather(task1, task2)

asyncio.run(main())

在上述代码中,使用 async with lock 语句获取锁,当代码块执行完毕后,锁会自动释放,即使代码块中出现异常,锁也会被正确释放。

使用 try...finally 语句:如果不使用 async with 语句,也可以使用 try...finally 语句来确保锁的释放。

import asyncio

lock = asyncio.Lock()

async def task():
    await lock.acquire()
    try:
        print("获取到锁,执行任务")
        await asyncio.sleep(1)
        print("任务完成,释放锁")
    finally:
        lock.release()

async def main():
    task1 = asyncio.create_task(task())
    task2 = asyncio.create_task(task())
    await asyncio.gather(task1, task2)

asyncio.run(main())

在上述代码中,在 try 块中获取锁并执行任务,无论任务是否成功完成,都会在 finally 块中释放锁。

设置超时时间:可以使用 asyncio.wait_for() 函数为获取锁的操作设置超时时间,避免协程长时间等待锁。

import asyncio

lock = asyncio.Lock()

async def task():
    try:
        await asyncio.wait_for(lock.acquire(), timeout=2)
        print("获取到锁,执行任务")
        await asyncio.sleep(1)
        print("任务完成,释放锁")
        lock.release()
    except asyncio.TimeoutError:
        print("获取锁超时")

async def main():
    task1 = asyncio.create_task(task())
    task2 = asyncio.create_task(task())
    await asyncio.gather(task1, task2)

asyncio.run(main())

在上述代码中,使用 asyncio.wait_for() 函数为 lock.acquire() 操作设置了 2 秒的超时时间,如果在 2 秒内无法获取到锁,会抛出 asyncio.TimeoutError 异常,避免协程永久阻塞。

对比 asyncio.Queue 与标准库 queue.Queue 的线程安全性差异

asyncio.Queue 和标准库 queue.Queue 都用于实现队列数据结构,但它们在线程安全性方面存在差异。

queue.Queuequeue.Queue 是标准库中用于多线程编程的队列实现,它是线程安全的。这意味着在多线程环境中,多个线程可以同时对队列进行入队和出队操作,而不会出现数据竞争和不一致的问题。queue.Queue 使用锁机制来保证线程安全,当一个线程对队列进行操作时,会获取锁,其他线程需要等待锁释放后才能进行操作。

以下是一个使用 queue.Queue 的示例:

import threading
import queue
import time

q = queue.Queue()

def producer():
    for i in range(5):
        q.put(i)
        print(f"生产者生产了 {i}")
        time.sleep(1)

def consumer():
    while True:
        item = q.get()
        print(f"消费者消费了 {item}")
        q.task_done()

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
q.join()

在上述代码中,producer 线程负责向队列中添加元素,consumer 线程负责从队列中取出元素,由于 queue.Queue 是线程安全的,多个线程可以安全地对队列进行操作。

asyncio.Queueasyncio.Queue 是 asyncio 库中用于异步编程的队列实现,它是协程安全的。在异步编程中,协程是在单线程的事件循环中执行的,不存在多线程并发的问题,但协程之间可以通过 await 进行切换。asyncio.Queue 使用异步锁和条件变量来保证协程安全,当一个协程对队列进行操作时,其他协程需要等待。

以下是一个使用 asyncio.Queue 的示例:

import asyncio

async def producer(queue):
    for i in range(5):
        await queue.put(i)
        print(f"生产者生产了 {i}")
        await asyncio.sleep(1)

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"消费者消费了 {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))
    await producer_task
    await queue.join()
    consumer_task.cancel()

asyncio.run(main())

在上述代码中,producer 协程负责向队列中添加元素,consumer 协程负责从队列中取出元素,由于 asyncio.Queue 是协程安全的,多个协程可以安全地对队列进行操作。

综上所述,queue.Queue 适用于多线程编程,保证线程安全;asyncio.Queue 适用于异步编程,保证协程安全。

实现优先级任务队列控制高优先级任务插队执行

在 asyncio 中,若要实现优先级任务队列,让高优先级任务插队执行,可借助 heapq 模块。heapq 是 Python 标准库中的堆队列算法实现,它能维护一个最小堆,方便我们根据任务的优先级来管理任务。

首先,需要定义一个任务类,包含任务的优先级和具体的协程。接着,使用 asyncio.Queue 结合 heapq 来实现优先级队列。以下是示例代码:

import asyncio
import heapq

class PriorityTask:
    def __init__(self, priority, coro):
        self.priority = priority
        self.coro = coro

    def __lt__(self, other):
        return self.priority < other.priority

class PriorityQueue:
    def __init__(self):
        self.queue = []
        self.lock = asyncio.Lock()

    async def put(self, task):
        async with self.lock:
            heapq.heappush(self.queue, task)

    async def get(self):
        async with self.lock:
            return heapq.heappop(self.queue)

    def empty(self):
        return len(self.queue) == 0

async def task_func(task_id):
    print(f"Task {task_id} started")
    await asyncio.sleep(1)
    print(f"Task {task_id} finished")

async def main():
    priority_queue = PriorityQueue()
    tasks = [
        PriorityTask(3, task_func(1)),
        PriorityTask(1, task_func(2)),
        PriorityTask(2, task_func(3))
    ]

    for task in tasks:
        await priority_queue.put(task)

    while not priority_queue.empty():
        next_task = await priority_queue.get()
        await next_task.coro

asyncio.run(main())

在上述代码中,PriorityTask 类用于封装任务的优先级和协程,__lt__ 方法用于比较任务的优先级。PriorityQueue 类实现了优先级队列的基本操作,包括 put 方法用于添加任务,get 方法用于获取优先级最高的任务。在 main 协程中,创建了多个不同优先级的任务并添加到队列中,然后依次取出并执行任务,确保高优先级任务先执行。

使用 asyncio.Barrier 实现多阶段并行任务同步

asyncio.Barrier 是 asyncio 库中用于多任务同步的工具,它可以让多个协程在某个点上等待,直到所有协程都到达该点后再继续执行。利用 asyncio.Barrier 可以实现多阶段并行任务的同步。

以下是一个使用 asyncio.Barrier 实现多阶段并行任务同步的示例:

import asyncio

async def stage_one(barrier, task_id):
    print(f"Task {task_id} started stage one")
    await asyncio.sleep(1)
    print(f"Task {task_id} finished stage one, waiting at barrier")
    await barrier.wait()
    print(f"Task {task_id} passed the barrier and starting stage two")

async def main():
    num_tasks = 3
    barrier = asyncio.Barrier(num_tasks)
    tasks = [asyncio.create_task(stage_one(barrier, i)) for i in range(num_tasks)]
    await asyncio.gather(*tasks)

asyncio.run(main())

在上述代码中,stage_one 协程表示一个任务的第一阶段,在该阶段结束后,协程会调用 barrier.wait() 方法等待其他任务。main 协程中创建了 asyncio.Barrier 对象,并指定了需要同步的任务数量。然后创建了多个任务并使用 asyncio.gather 并发执行这些任务。当所有任务都调用了 barrier.wait() 后,它们会同时继续执行后续的代码,实现了多阶段并行任务的同步。

使用 aiohttp 实现异步 HTTP 客户端并发请求 10 个 URL

aiohttp 是一个用于实现异步 HTTP 客户端和服务器的 Python 库。使用 aiohttp 可以方便地实现异步 HTTP 客户端并发请求多个 URL。

以下是一个使用 aiohttp 并发请求 10 个 URL 的示例:

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        f"https://httpbin.org/get?num={i}" for i in range(10)
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result[:100])

asyncio.run(main())

在上述代码中,fetch 协程用于发送 HTTP 请求并获取响应内容。main 协程中创建了 aiohttp.ClientSession 对象,该对象用于管理 HTTP 连接。然后创建了多个 fetch 任务,每个任务对应一个 URL。最后使用 asyncio.gather 并发执行这些任务,并获取所有任务的结果。通过这种方式,可以高效地并发请求多个 URL,避免了串行请求带来的性能问题。

通过 asyncio.open_connection () 实现 TCP 客户端消息收发

asyncio.open_connection() 是 asyncio 库中用于创建 TCP 连接的函数,利用它可以实现 TCP 客户端的消息收发。

以下是一个使用 asyncio.open_connection() 实现 TCP 客户端消息收发的示例:

import asyncio

async def tcp_client():
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888
    )

    message = 'Hello, server!'
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()}')

    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_client())

在上述代码中,asyncio.open_connection() 函数用于建立与服务器的 TCP 连接,它返回一个 StreamReader 和一个 StreamWriter 对象。StreamWriter 对象用于向服务器发送消息,StreamReader 对象用于接收服务器的响应。在代码中,先向服务器发送了一条消息,然后等待服务器的响应并打印出来。最后关闭连接。

解释 StreamReader 与 StreamWriter 在异步 Socket 编程中的作用

在异步 Socket 编程中,StreamReader 和 StreamWriter 是 asyncio 库提供的两个重要对象,它们分别用于处理数据的读取和写入。

StreamReader 主要用于从套接字中异步读取数据。它提供了一系列方法,如 read()readline()readexactly() 等。read() 方法用于读取指定数量的字节,readline() 方法用于读取一行数据,readexactly() 方法用于读取指定长度的数据。StreamReader 是异步的,当调用这些方法时,如果没有数据可读,协程会暂停执行,直到有数据到达或发生错误,这样可以避免阻塞事件循环,提高程序的并发性能。

StreamWriter 主要用于向套接字中异步写入数据。它提供了 write() 方法用于写入数据,drain() 方法用于刷新缓冲区。write() 方法将数据写入缓冲区,而 drain() 方法会等待缓冲区中的数据被实际发送出去。同样,这些操作都是异步的,当调用 drain() 方法时,如果缓冲区已满或网络拥塞,协程会暂停执行,直到数据可以发送出去。

例如,在前面的 TCP 客户端示例中,StreamReader 对象用于接收服务器的响应,StreamWriter 对象用于向服务器发送消息。通过使用 StreamReader 和 StreamWriter,可以方便地实现异步的 Socket 通信,提高程序的性能和响应能力。

如何用 asyncio 实现 UDP 服务器广播功能?

在 asyncio 中实现 UDP 服务器广播功能,需要借助 asyncio.DatagramProtocol 协议类。UDP 广播是指将消息发送到网络中的所有设备。

首先,要创建一个自定义的 DatagramProtocol 类,该类需实现 connection_made 和 datagram_received 方法。connection_made 方法在建立连接时被调用,datagram_received 方法在接收到数据报时被调用。

以下是实现 UDP 服务器广播功能的示例代码:

import asyncio

class UDPServerProtocol(asyncio.DatagramProtocol):
    def __init__(self):
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('UDP 服务器已启动')

    def datagram_received(self, data, addr):
        message = data.decode()
        print(f'收到来自 {addr} 的消息: {message}')
        # 广播消息
        broadcast_address = ('<broadcast>', 9999)
        self.transport.sendto(data, broadcast_address)

async def main():
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: UDPServerProtocol(),
        local_addr=('0.0.0.0', 9999),
        allow_broadcast=True
    )

    try:
        await asyncio.sleep(3600)
    finally:
        transport.close()

asyncio.run(main())

在上述代码中,UDPServerProtocol 类继承自 asyncio.DatagramProtocol,在 datagram_received 方法中,当接收到数据报后,将其广播到指定的广播地址。main 函数中,使用 loop.create_datagram_endpoint 方法创建 UDP 端点,并设置允许广播。通过 allow_broadcast=True 开启广播功能。最后,程序会运行 3600 秒,期间可以不断接收和广播消息。

异步文件读写的最佳实践(结合线程池与 loop.run_in_executor ())

在 asyncio 中,文件读写操作是阻塞的,为了实现异步文件读写,可以结合线程池和 loop.run_in_executor() 方法。

loop.run_in_executor() 方法可以将阻塞的文件读写操作放到线程池中执行,从而避免阻塞事件循环。以下是异步文件读写的示例代码:

import asyncio
import concurrent.futures

async def read_file_async(file_path):
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        def read_file():
            with open(file_path, 'r') as f:
                return f.read()
        return await loop.run_in_executor(executor, read_file)

async def write_file_async(file_path, content):
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        def write_file():
            with open(file_path, 'w') as f:
                f.write(content)
        return await loop.run_in_executor(executor, write_file)

async def main():
    file_path = 'test.txt'
    content = 'Hello, asyncio!'
    await write_file_async(file_path, content)
    result = await read_file_async(file_path)
    print(result)

asyncio.run(main())

在上述代码中,read_file_async 函数用于异步读取文件,write_file_async 函数用于异步写入文件。在这两个函数中,都使用了 concurrent.futures.ThreadPoolExecutor 创建线程池,并通过 loop.run_in_executor() 方法将阻塞的文件读写操作提交到线程池中执行。在 main 函数中,先调用 write_file_async 写入文件内容,再调用 read_file_async 读取文件内容并打印。

使用 aiomysql 实现数据库连接池的异步查询。

aiomysql 是一个用于异步操作 MySQL 数据库的 Python 库,借助它可以实现数据库连接池的异步查询。

首先,要安装 aiomysql 库,然后创建数据库连接池,使用连接池进行异步查询。以下是示例代码:

import asyncio
import aiomysql

async def query_data(pool):
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT * FROM your_table")
            result = await cur.fetchall()
            for row in result:
                print(row)

async def main():
    pool = await aiomysql.create_pool(
        host='localhost',
        port=3306,
        user='your_user',
        password='your_password',
        db='your_database',
        autocommit=True
    )

    try:
        await query_data(pool)
    finally:
        pool.close()
        await pool.wait_closed()

asyncio.run(main())

在上述代码中,query_data 函数用于执行异步查询操作。在 main 函数中,使用 aiomysql.create_pool 方法创建数据库连接池,设置数据库的连接信息。然后调用 query_data 函数进行查询,最后关闭连接池。通过使用连接池,可以复用数据库连接,提高查询效率。

WebSocket 服务器开发:处理连接保持与心跳检测。

在 WebSocket 服务器开发中,连接保持和心跳检测是非常重要的功能。连接保持可以确保客户端和服务器之间的连接不会因为长时间无数据传输而断开,心跳检测可以及时发现断开的连接并进行处理。

以下是使用 websockets 库实现 WebSocket 服务器的连接保持与心跳检测的示例代码:

import asyncio
import websockets

async def heartbeat(websocket, path):
    try:
        while True:
            try:
                # 发送心跳消息
                await websocket.send('ping')
                # 等待客户端响应
                pong_waiter = await websocket.ping()
                await asyncio.wait_for(pong_waiter, timeout=5)
            except asyncio.TimeoutError:
                print('心跳检测超时,关闭连接')
                break
            await asyncio.sleep(10)
    except websockets.exceptions.ConnectionClosedOK:
        print('连接正常关闭')

start_server = websockets.serve(heartbeat, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

在上述代码中,heartbeat 函数用于实现心跳检测。在函数中,服务器每隔 10 秒向客户端发送一个 ping 消息,并等待客户端的 pong 响应。如果在 5 秒内没有收到响应,则认为连接超时,关闭连接。websockets.serve 方法用于启动 WebSocket 服务器,监听本地的 8765 端口。

实现异步 DNS 解析器(结合 aiodns 库)。

aiodns 是一个用于异步 DNS 解析的 Python 库,结合 asyncio 可以实现异步 DNS 解析器。

首先,要安装 aiodns 库,然后使用它进行异步 DNS 解析。以下是示例代码:

import asyncio
import aiodns

async def resolve_dns(resolver, domain):
    try:
        result = await resolver.query(domain, 'A')
        for record in result:
            print(f'{domain} 的 IP 地址是: {record.host}')
    except aiodns.error.DNSError as e:
        print(f'解析 {domain} 时出错: {e}')

async def main():
    resolver = aiodns.DNSResolver()
    domains = ['example.com', 'google.com']
    tasks = [resolve_dns(resolver, domain) for domain in domains]
    await asyncio.gather(*tasks)

asyncio.run(main())

在上述代码中,resolve_dns 函数用于异步解析指定域名的 IP 地址。在 main 函数中,创建了 aiodns.DNSResolver 对象,然后创建了多个解析任务,使用 asyncio.gather 方法并发执行这些任务。通过这种方式,可以高效地实现异步 DNS 解析。

分享

以下是对 5 道 asyncio 面试题的回答:

如何统一捕获协程链中未处理的异常

asyncio中,可以使用try-except块来捕获协程链中的异常。通过在最外层的协程中使用try-except,可以捕获整个协程链中未处理的异常。

import asyncio

async def inner_coroutine():
    # 这里模拟一个会抛出异常的操作
    raise ValueError("这是一个测试异常")

async def outer_coroutine():
    await inner_coroutine()

async def main():
    try:
        await outer_coroutine()
    except ValueError as e:
        print(f"捕获到异常: {e}")

asyncio.run(main())

还可以使用asyncio.Taskadd_done_callback方法来捕获异常。当任务完成时,如果任务中抛出了未处理的异常,可以在回调函数中获取到异常信息。

import asyncio

async def coroutine_with_exception():
    raise ValueError("这是一个测试异常")

async def main():
    task = asyncio.create_task(coroutine_with_exception())

    def handle_exception(task):
        if task.exception() is not None:
            print(f"捕获到异常: {task.exception()}")

    task.add_done_callback(handle_exception)
    await task

asyncio.run(main())

使用 asyncio.get_event_loop ().set_exception_handler () 定制异常日志

asyncio.get_event_loop().set_exception_handler()方法可以用来设置一个自定义的异常处理函数,用于处理事件循环中未捕获的异常。以下是一个示例:

import asyncio

def exception_handler(loop, context):
    # 这里可以根据需要进行更详细的日志记录
    print(f"异常发生: {context['exception']}")
    print(f"任务: {context['task']}")

async def coroutine_with_exception():
    raise ValueError("这是一个测试异常")

async def main():
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(exception_handler)

    task = asyncio.create_task(coroutine_with_exception())
    await task

asyncio.run(main())

在上述代码中,定义了一个exception_handler函数,它接受loopcontext两个参数。context是一个字典,包含了有关异常的信息,如异常对象、触发异常的任务等。通过设置这个异常处理函数,当事件循环中出现未捕获的异常时,就会调用这个函数来进行定制的日志记录或其他处理操作。

调试技巧:通过 task.print_stack () 追踪协程挂起点

task.print_stack()asyncio中用于调试的一个方法,它可以打印出协程当前的堆栈信息,帮助开发者追踪协程的执行路径和挂起点。以下是一个简单的示例:

import asyncio

async def my_coroutine():
    print("协程开始")
    await asyncio.sleep(1)
    print("协程结束")

async def main():
    task = asyncio.create_task(my_coroutine())
    await asyncio.sleep(0.5)
    task.print_stack()
    await task

asyncio.run(main())

在代码中,创建了一个协程my_coroutine,它会暂停 1 秒。在main函数中,创建了一个任务并在 0.5 秒后调用task.print_stack()。此时,协程应该还在await asyncio.sleep(1)这一行暂停,task.print_stack()会打印出当前协程的堆栈信息,显示出协程暂停的位置。

如何处理因任务取消引发的 CancelledError

当使用asyncio时,任务可能会被取消,这会引发CancelledError异常。可以在协程中使用try-except块来捕获CancelledError异常,以进行适当的处理。

import asyncio

async def my_coroutine():
    try:
        # 这里模拟一个长时间运行的操作
        await asyncio.sleep(5)
        print("任务完成")
    except asyncio.CancelledError:
        print("任务被取消")

async def main():
    task = asyncio.create_task(my_coroutine())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())

在代码中,my_coroutine协程会暂停 5 秒,在main函数中,创建任务后等待 2 秒然后取消任务。当任务被取消时,my_coroutine中的try-except块会捕获CancelledError异常,并打印出 "任务被取消"。

使用 contextvars 实现异步上下文中的请求 ID 透传

contextvars是 Python 的一个标准库,用于在异步上下文中传递数据。以下是一个使用contextvars实现请求 ID 透传的示例:

import asyncio
import contextvars

# 创建一个上下文变量
request_id_var = contextvars.ContextVar('request_id')

async def handle_request(request_id):
    # 设置当前上下文的请求ID
    request_id_var.set(request_id)
    print(f"处理请求 {request_id_var.get()}")
    await asyncio.sleep(1)
    print(f"请求 {request_id_var.get()} 处理完成")

async def main():
    # 模拟多个请求
    tasks = [asyncio.create_task(handle_request(i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

在上述代码中,首先创建了一个contextvar对象request_id_var,用于存储请求 ID。在handle_request协程中,通过request_id_var.set(request_id)将请求 ID 设置到当前上下文中,然后可以通过request_id_var.get()获取当前上下文的请求 ID。这样,在不同的协程中都可以获取到相同的请求 ID,实现了请求 ID 在异步上下文中的透传。

分享

以下是对三道 asyncio 面试题的回答:

如何通过 asyncio.Semaphore 限制并发连接数避免资源耗尽?

asyncio.Semaphore是一个用于控制并发访问资源的同步原语,通过它可以方便地限制并发连接数,从而避免资源耗尽。以下是具体的原理和实现方式:

  • 原理asyncio.Semaphore内部维护了一个计数器,用于表示可用资源的数量。当协程想要访问受限制的资源时,需要先获取信号量。如果计数器大于 0,则获取成功,计数器减 1;如果计数器为 0,则协程会被阻塞,直到其他协程释放信号量,使计数器大于 0。
  • 代码示例

import asyncio

# 模拟网络连接任务
async def connect_to_server(semaphore, url):
    async with semaphore:
        print(f'开始连接 {url}')
        # 这里模拟连接耗时
        await asyncio.sleep(2)
        print(f'完成连接 {url}')

async def main():
    # 创建信号量,限制并发连接数为3
    semaphore = asyncio.Semaphore(3)
    urls = [f'http://example.com/{i}' for i in range(5)]
    tasks = [asyncio.create_task(connect_to_server(semaphore, url)) for url in urls]
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(main())

在上述代码中,创建了一个信号量semaphore,并将其并发连接数限制为 3。然后,为每个 URL 创建一个连接任务,这些任务在获取信号量后才会执行连接操作。由于信号量的限制,同一时间最多只有 3 个任务可以进行连接操作,从而避免了资源耗尽。

使用 cProfile 分析异步程序的性能瓶颈

cProfile是 Python 标准库中的一个性能分析工具,可以用来分析程序中各个函数的执行时间、调用次数等信息,从而帮助找到性能瓶颈。在分析异步程序时,cProfile同样可以发挥作用,以下是使用步骤:

  • 导入必要的模块:首先需要导入cProfile和要分析的异步函数所在的模块。
  • 定义分析函数:创建一个函数,在函数内部调用要分析的异步函数,并使用asyncio.run()来运行异步函数。
  • 使用 cProfile.run () 进行分析:在主程序中,使用cProfile.run()来运行分析函数,它会执行异步函数,并输出性能分析结果。
  • 分析结果cProfile的输出结果包含了各个函数的执行时间、调用次数等信息。重点关注cumtime列,它表示函数及其调用的所有子函数的累计执行时间。通常,累计执行时间较长的函数可能是性能瓶颈所在。

以下是一个简单的示例:

import asyncio
import cProfile

async def async_task():
    await asyncio.sleep(1)

async def main():
    tasks = [asyncio.create_task(async_task()) for _ in range(5)]
    await asyncio.gather(*tasks)

def profile_main():
    asyncio.run(main())

if __name__ == '__main__':
    cProfile.run('profile_main()')

在实际应用中,可以根据cProfile的输出结果,进一步优化异步程序中耗时较长的函数或操作,例如优化数据库查询、减少网络请求次数等。

解释 asyncio 与 gevent 在实现协程时的底层差异

asyncio 和 gevent 都是 Python 中用于实现异步编程的库,它们在实现协程时存在一些底层差异:

  • 事件循环
    • asyncio:是 Python 标准库的一部分,其事件循环基于操作系统的 I/O 多路复用机制,如selectpollepoll等。它提供了一个较为底层和灵活的事件驱动框架,开发者可以直接操作事件循环,注册和管理各种 I/O 事件和回调函数。
    • gevent:使用了greenlet库来实现轻量级的协程,其事件循环是基于libevlibuv等 C 库,提供了高效的 I/O 多路复用和事件驱动机制。gevent 的事件循环对开发者来说相对更透明,开发者主要通过 gevent 提供的高层 API 来操作协程和事件循环。
  • 协程切换
    • asyncio:协程的切换是通过await关键字来显式地暂停和恢复协程的执行。当协程遇到await表达式时,它会将控制权交回给事件循环,事件循环会在适当的时候调度其他协程执行,当await的条件满足时,再恢复该协程的执行。
    • gevent:协程的切换是通过greenletswitch方法来实现的,在 gevent 中,协程函数内部通过gevent.sleep()等方式来主动让出执行权,触发协程的切换。gevent 会自动检测 I/O 操作等可能导致阻塞的情况,并在阻塞发生时自动切换到其他协程。
  • 对标准库的修改
    • asyncio:尽量保持与 Python 标准库的兼容性,对标准库的修改和替换较少。在使用 asyncio 时,通常需要使用asyncawait关键字来定义和使用协程,并且需要使用 asyncio 提供的异步版本的 I/O 操作函数和工具。
    • gevent:会对 Python 标准库中的一些 I/O 相关的模块进行猴子补丁(monkey patching),将标准的阻塞 I/O 操作替换为非阻塞的版本,使得在使用标准库的 I/O 函数时,能够自动实现异步操作,无需显式地修改代码中的 I/O 调用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/983924.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Linux学习篇】--开发工具第一期

目录 1. Linux编辑器的使用--vim使用 1.1 vim的基本概念 1.2 vim基本操作 1.3 vim正常模式&#xff08;指令模式&#xff09;命令集 1.4 vim末行模式命令集 1.5 vim配置 2. Linux编译器-gcc/g使用 2.1 背景知识 2.2 gcc如何完成 2.3 gcc选择项 1. Linux编…

Elastic:AI 会开始取代网络安全工作吗?

作者&#xff1a;来自 Elastic Joe DeFever 不会&#xff0c;但它正在从根本上改变这些工作。 生成式 AI (GenAI) 正迅速成为日常安全工作流程中的一个重要组成部分。那么&#xff0c;它是合作伙伴还是竞争对手&#xff1f; GenAI 技术在安全堆栈几乎每个方面的广泛应用&#…

Windows 11 IoT 企业版 LTSC 2025 特制适度 22635.5025

文件: Windows 11 IoT 企业版 LTSC 2025 特制适度 22635.5025 install.esd 大小: 2.57G&#xff08;2768694310 字节&#xff09; 修改时间: 2025年3月9日, 星期日, 11 : 40 : 15 MD5: BFCB23BC2F78CA9243FFA68D5DDDDFC1 SHA1: C4D8BBF8B8D8E0E8E49DE5E9CC8D7F77385A745A CRC32…

Lab18_ SQL injection with filter bypass via XML encoding

文章目录 前言&#xff1a;进入实验室构造 payload 前言&#xff1a; 实验室标题为&#xff1a; 通关 XML 编码绕过过滤器的 SQL 注入 简介&#xff1a; 此实验室的库存检查功能中存在 SQL 注入漏洞。查询结果在应用程序的响应中返回&#xff0c;因此您可以使用 UNION 攻击…

kali虚拟机登录页面发癫 大写锁定输入不了密码

不知道怎么了 总是发癫 重启切换太麻烦了 还有时候不成功 kali其实可以开启虚拟键盘 如下 就解决的 发癫kali 发癫 发癫

【汇编语言】单片机程序执行过程

一、任务需求 指示灯LED4闪烁&#xff0c;亮0.5秒&#xff0c;灭0.5秒&#xff0c;无限循环 二、针对硬件的编程 1、确定原理图2、确定硬件的物理关系 三、设计步骤 1.用自己的语言描述工作流程 1.1指示灯LED4亮1.2延时0.5秒1.3指示灯LED4灭1.4延时0.5秒1.5跳转到1.1步 …

【Linux篇】调试器-gdb/cgdb使用

&#x1f4cc; 个人主页&#xff1a; 孙同学_ &#x1f527; 文章专栏&#xff1a;Liunx &#x1f4a1; 关注我&#xff0c;分享经验&#xff0c;助你少走弯路&#xff01; 文章目录 1. 前言2.关于gdb2.1 快速认识gdb2.2 安装cgdb2.3 gdb命令2.4 调试 & 断点 3.常见技巧3.…

ThinkPhp 5 安装阿里云内容安全(绿化)

composer require alibabacloud/green-20220302 首先要把php5(不支持php7)的执行文件设置到PATH环境变量 此外还要先执行composer update php5.5和php5.6的区别 5.5认为 <? 开头的也是php文件&#xff0c;包括 <?php 5.6认为 <? 开头的不是php文件&#xff0c;只…

Level DB --- 写流程计算

写流程架构Level DB --- 写流程架构-CSDN博客已经介绍&#xff0c;写流程计算包括写日志&#xff0c;和将kv插入到memtable中。 写日志和写memtable 用户端插入的kv数据&#xff0c;既要写日志同时也要写memtable。写日志是指kv记录要同步到日志文件中&#xff1b;写memtable…

JavaWeb-servlet6中过滤器和监听器

Servlet 过滤器 Servlet 过滤器&#xff08;Filter&#xff09;与 Servlet 十分相似&#xff0c;但 Filter 具有拦截客户端请求的功能&#xff0c; Filter 可以改变请求中的内容&#xff0c;以便满足实际开发中的需要。对于程序开发人员而言&#xff0c; Filter 实质上就是 We…

SQL PLUS与Oracle数据库的交互

一、SQL Plus与数据库的交互 可以 使用2种基本类型的命令与数据库进行交互&#xff1a; 服务器执行的命令&#xff1a;SQLQ命令&#xff08;以&#xff1b;结束&#xff09;和PL/SQL程序块&#xff08;以/结束&#xff09; 本地命令&#xff1a;SQL Plus命令 二、设置SQL Pl…

Git系列之git tag和ReleaseMilestone

以下是关于 Git Tag、Release 和 Milestone 的深度融合内容&#xff0c;并补充了关于 Git Tag 的所有命令、详细解释和指令实例&#xff0c;条理清晰&#xff0c;结合实际使用场景和案例。 1. Git Tag 1.1 定义 • Tag 是 Git 中用于标记特定提交&#xff08;commit&#xf…

WinForm模态与非模态窗体

1、模态窗体 1&#xff09;定义&#xff1a; 模态窗体是指当窗体显示时&#xff0c;用户必须先关闭该窗体&#xff0c;才能继续与应用程序的其他部分进行交互。 2&#xff09;特点&#xff1a; 窗体以模态方式显示时&#xff0c;会阻塞主窗体的操作。用户必须处理完模态窗体上…

关闭Windows安全中心,解析与实操指南

在这个数字化时代&#xff0c;Windows操作系统作为我们日常工作和娱乐的基石&#xff0c;其内置的Windows安全中心&#xff08;Windows Defender Security Center&#xff09;在保护系统安全方面扮演着重要角色。然而&#xff0c;对于某些高级用户或特定需求场景&#xff0c;关…

【 <一> 炼丹初探:JavaWeb 的起源与基础】之 JSP 标签库:自定义标签的开发与应用

<前文回顾> 点击此处查看 合集 https://blog.csdn.net/foyodesigner/category_12907601.html?fromshareblogcolumn&sharetypeblogcolumn&sharerId12907601&sharereferPC&sharesourceFoyoDesigner&sharefromfrom_link <今日更新> 一、JSP 标签…

IDEA与Maven使用-学习记录(持续补充...)

1. 下载与安装 以ideaIU-2021.3.1为例&#xff0c;安装步骤&#xff1a; 以管理员身份启动ideaIU-2021.3.1修改安装路径为&#xff1a;D:\Program Files\JetBrains\IntelliJ IDEA 2021.3.1勾选【创建桌面快捷方式】&#xff08;可选&#xff09;、【打开文件夹作为项目】&…

JS中的闭包(closures)一种强大但易混淆的概念

JavaScript 中的闭包&#xff08;closures&#xff09;被认为是一种既强大又易混淆的概念。闭包允许函数访问其外部作用域的变量&#xff0c;即使外部函数已执行完毕&#xff0c;这在状态维护和回调函数中非常有用。但其复杂性可能导致开发者的误解&#xff0c;尤其在变量捕获和…

根据指定 Excel 模板将 Excel 明细数据生成新的 Excel 文档

在日常工作中&#xff0c;我们常常需要生成 Excel 文档&#xff0c;例如将 Excel 数据 自动转换为独立的 Excel 文件。在这种需求下&#xff0c;如何高效地将 Excel 表格 中的每一条数据生成一个对应的 Excel 文档&#xff0c;并且让文档中的内容根据 Excel 数据动态变化&#…

前端杂的学习笔记

什么是nginx Nginx (engine x) 是一个高性能的HTTP和反向代理web服务器 Nginx是一款轻量级的Web 服务器/反向代理服务器&#xff0c;处理高并发能力是十分强大的&#xff0c;并且支持热部署&#xff0c;启动简单&#xff0c;可以做到7*24不间断运行 正代和反代 学习nginx&a…

C#控制台应用程序学习——3.8

一、语言概述 1、平台相关性 C# 主要运行在.NET 平台上。.NET 提供了一个庞大的类库&#xff0c;C# 程序可以方便地调用这些类库来实现各种功能&#xff0c;如文件操作、数据库访问、网络通信等。 2、语法风格 C# 的语法与 C、C 和 Java 有一定的相似性。例如&#xff0c;它使用…