Python 协程:使用 `asyncio` 高效管理并发任务

Python 协程:使用 asyncio 高效管理并发任务

随着现代应用程序对并发执行的需求越来越高,异步编程(Asynchronous Programming)成为了提高性能的重要工具。Python 通过 asyncio 提供了内置的支持,允许我们以非阻塞的方式处理 I/O 密集型任务。

本文将带你从基础到进阶,了解如何使用 Python 协程与 asyncio 库管理并发任务。我们将通过实际的代码示例,并对每个示例进行优化,增加任务耗时的打印,帮助你更好地理解异步编程。


1. 协程与异步编程简介

在 Python 中,协程是一个可以暂停执行并在以后恢复执行的函数。你可以通过 async def 来定义一个协程函数,而 await 用于暂停协程的执行,直到某个异步操作完成。

异步编程的核心优势是能够并发地执行 I/O 密集型任务,而不需要阻塞主线程。例如,进行大量的文件读写或网络请求时,异步编程能够帮助你高效地利用系统资源,提高程序的响应速度。


2. 协程基本用法

2.1 启动单个协程

首先,我们来看一个简单的异步任务。下面的代码展示了如何定义和执行一个基本的异步任务,使用 asyncio.sleep() 模拟一个耗时的 I/O 操作。

import asyncio
import time

async def main():
    print("Start")
    start_time = time.time()
    await asyncio.sleep(1)  # 模拟一个异步操作,例如 I/O 操作
    print(f"End (took {time.time() - start_time:.2f} seconds)")

# 运行事件循环
asyncio.run(main())

输出:

Start
End (took 1.01 seconds)

在这个例子中,main 是一个简单的协程函数,模拟了一个耗时 1 秒的操作。通过 await asyncio.sleep(1) 暂停协程的执行,直到这个异步操作完成。

2.2 多个协程的串行执行

如果你有多个异步任务,并且它们依赖于前一个任务的完成,那么你可以串行地执行它们。下面是一个串行执行的例子:

import asyncio
import time

async def main():
    print("Start")
    start_time = time.time()
    await asyncio.sleep(1)  # 模拟第一个异步操作
    await asyncio.sleep(1)  # 模拟第二个异步操作
    print(f"End (took {time.time() - start_time:.2f} seconds)")

# 运行事件循环
asyncio.run(main())

输出:

Start
End (took 2.02 seconds)

在这个例子中,我们增加了一个额外的 await asyncio.sleep(1),模拟了连续的异步操作。这两个操作是串行执行的,因此总共耗时 2 秒。


3. 创建并发任务

3.1 使用 asyncio.create_task() 创建并发任务

asyncio.create_task() 是一个非常重要的函数,它用于将协程包装成任务并安排执行。与 asyncio.gather() 不同,create_task() 是一种更灵活的方式来创建和调度异步任务。

下面是使用 asyncio.create_task() 创建并发任务的一个简单示例:

import asyncio
import time

async def task(name):
    await asyncio.sleep(1)
    print(f"{name} completed")

async def main():
    print("Start")
    start_time = time.time()
    
    # 使用 asyncio.create_task 创建并发任务
    task1 = asyncio.create_task(task("task1"))
    task2 = asyncio.create_task(task("task2"))
    
    # 等待所有任务完成
    await task1
    await task2
    
    print(f"End (took {time.time() - start_time:.2f} seconds)")

# 运行事件循环
asyncio.run(main())

输出:

Start
task1 completed
task2 completed
End (took 1.01 seconds)

在这个例子中,task1task2 是通过 asyncio.create_task() 创建的,它们会并发执行。


3.2 使用 asyncio.gather() 创建并发任务

如果你希望同时执行多个任务,使用 asyncio.gather() 可以让你并发地运行多个协程,直到它们全部完成。下面是一个例子:

import asyncio
import time

async def task(name):
    await asyncio.sleep(1)
    print(f"{name} completed")

async def main():
    print("Start")
    start_time = time.time()
    # 并发执行两个任务
    await asyncio.gather(task('task1'), task('task2'))
    print(f"End (took {time.time() - start_time:.2f} seconds)")

# 运行事件循环
asyncio.run(main())

输出:

Start
task1 completed
task2 completed
End (took 1.01 seconds)

在这个例子中,task1task2 会并发执行,因此总的执行时间为 1 秒,而不是 2 秒。


3.3 使用 asyncio.TaskGroup 创建并发任务

在 Python 3.11 中,asyncio 引入了 TaskGroup,这是一个更高级的任务管理工具。通过 TaskGroup,你可以动态创建和管理多个异步任务,同时确保所有任务在退出时都已完成或抛出异常。

下面是一个使用 TaskGroup 执行并发任务的例子:

import asyncio
import time

async def task(name):
    await asyncio.sleep(1)
    print(f"{name} completed")
    return name

async def main():
    print("Start")
    start_time = time.time()
    
    # 使用 TaskGroup 并发执行任务
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task("task1"))
        tg.create_task(task("task2"))
    
    print(f"End (took {time.time() - start_time:.2f} seconds)")

# 运行事件循环
asyncio.run(main())

输出:

Start
task1 completed
task2 completed
End (took 1.01 seconds)

在这个例子中,我们使用 async with asyncio.TaskGroup() 创建了一个任务组,并通过 create_task 动态地将任务添加到任务组中。所有任务会并发执行,并且任务组会确保在所有任务完成后才会退出。

4. 任务异常处理

如果任务中的任何一个发生异常,TaskGroup 会自动取消所有其他未完成的任务,并抛出第一个异常。这是 TaskGroup 提供的一个非常实用的功能,它保证了任务的原子性。

import asyncio
import time

async def task(name):
    await asyncio.sleep(1)
    if name == "task2":
        raise ValueError("An error occurred in task2")
    print(f"{name} completed")
    return name

async def main():
    print("Start")
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task("task1"))
            tg.create_task(task("task2"))
    except Exception as e:
        print(f"Error: {e}")
    print("End")

# 运行事件循环
asyncio.run(main())

输出:

Start
task1 completed
Error: unhandled errors in a TaskGroup (1 sub-exception)
End

5. 回调函数的使用

如果你希望在任务完成后执行某些操作(如日志记录或清理工作),你可以为任务设置回调函数。回调函数会在任务完成时被调用,传递任务的结果。

import asyncio
import time

async def task(name):
    await asyncio.sleep(1)
    print(f"{name} completed")
    return name

def callback(task):
    print(f"Callback: {task.result()}")

async def main():
    print("Start")
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(task("task1"))
        task2 = tg.create_task(task("task2"))
        
        # 为任务添加回调函数
        task1.add_done_callback(callback)
        task2.add_done_callback(callback)
    
    print("End")

# 运行事件循环
asyncio

.run(main())

输出:

Start
task1 completed
task2 completed
Callback: task1
Callback: task2
End

5. 超时与等待

5.1 asyncio.wait_for

使用 asyncio.wait_for() 可以为协程设置超时时间,超过这个时间后会抛出 TimeoutError

import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} completed"

async def main():
    try:
        result = await asyncio.wait_for(task('task1', 3), timeout=2)  # 设置 2 秒超时
        print(result)
    except asyncio.TimeoutError:
        print("The task timed out!")

asyncio.run(main())

5.2 asyncio.wait

asyncio.wait 允许你同时等待多个任务的完成,并且可以设置超时时间。

asyncio.wait 是 Python 中 asyncio 库的一个函数,它用于等待多个异步任务的完成。与 asyncio.gather 不同,asyncio.wait 返回的是两个集合,分别是已完成(done)和未完成(pending)任务的集合,可以让你更加灵活地处理任务的结果。

asyncio.wait 函数的签名:

asyncio.wait(fs, timeout=None, return_when=ALL_COMPLETED)
参数:
  • fs:一个可迭代对象,包含多个协程任务、FutureTask 对象。
  • timeout(可选):一个浮动的时间值(以秒为单位),如果在此时间内任务未完成,则函数会返回,且未完成的任务仍然处于挂起状态。
  • return_when(可选):确定什么时候返回。可取以下值:
    • asyncio.ALL_COMPLETED(默认):等待所有任务完成。
    • asyncio.FIRST_COMPLETED:等待第一个任务完成。
    • asyncio.FIRST_EXCEPTION:等待第一个任务完成或抛出异常。
返回:

返回一个元组 (done, pending),其中:

  • done 是已完成的任务集合。
  • pending 是未完成的任务集合。

示例代码

1. 等待所有任务完成
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} completed")

async def main():
    tasks = [
        asyncio.create_task(task('task1', 3)),
        asyncio.create_task(task('task2', 2)),
        asyncio.create_task(task('task3', 1))
    ]
    
    # 等待所有任务完成
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    
    print("All tasks completed")
    for t in done:
        print(f"{t.result()}")

# 运行事件循环
asyncio.run(main())

输出:

task3 completed
task2 completed
task1 completed
All tasks completed
Task finished
Task finished
Task finished

在这个例子中,我们使用 asyncio.wait 等待所有任务完成。它返回了两个集合,donepending。由于我们没有设置超时,因此它会等到所有任务完成后返回。

2. 等待第一个任务完成
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} completed")

async def main():
    tasks = [
        asyncio.create_task(task('task1', 3)),
        asyncio.create_task(task('task2', 2)),
        asyncio.create_task(task('task3', 1))
    ]
    
    # 等待第一个任务完成
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    
    print("At least one task is completed")
    for t in done:
        print(f"{t.result()}")

# 运行事件循环
asyncio.run(main())

输出:

task3 completed
At least one task is completed
Task finished

在这个例子中,asyncio.wait 通过设置 return_when=asyncio.FIRST_COMPLETED,会在第一个任务完成时立即返回,不会等待所有任务完成。

3. 等待第一个抛出异常的任务
import asyncio

async def task(name, delay, should_fail=False):
    await asyncio.sleep(delay)
    if should_fail:
        raise Exception(f"{name} failed")
    print(f"{name} completed")
    return f"{name} finished"

async def main():
    tasks = [
        asyncio.create_task(task('task1', 3)),
        asyncio.create_task(task('task2', 2, should_fail=True)),
        asyncio.create_task(task('task3', 1))
    ]
    
    # 等待第一个任务完成或抛出异常
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    
    print("At least one task is completed or failed")
    for t in done:
        if t.exception():
            print(f"Task failed with exception: {t.exception()}")
        else:
            print(f"Task result: {t.result()}")

# 运行事件循环
asyncio.run(main())

输出:

task3 completed
task2 failed with exception: task2 failed
At least one task is completed or failed
Task result: task3 finished

在这个例子中,asyncio.wait 设置了 return_when=asyncio.FIRST_EXCEPTION,它会在第一个任务抛出异常时停止等待。返回的 done 集合会包含已完成的任务(包括抛出异常的任务)。我们可以通过 task.exception() 获取任务的异常信息。

4. 设置超时
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} completed"

async def main():
    tasks = [
        asyncio.create_task(task('task1', 3)),
        asyncio.create_task(task('task2', 2)),
        asyncio.create_task(task('task3', 1))
    ]
    
    # 等待最多 2 秒,之后返回
    done, pending = await asyncio.wait(tasks, timeout=2)
    
    print("Waiting completed (with timeout)")
    for t in done:
        print(f"{t.result()}")

    # 检查未完成的任务
    for t in pending:
        print(f"Task {t.get_name()} is still pending.")

# 运行事件循环
asyncio.run(main())

输出:

task3 completed
task2 completed
Waiting completed (with timeout)
Task task1 is still pending.

在这个例子中,asyncio.wait 使用了 timeout=2 来设定最多等待 2 秒。在 2 秒内,task1 没有完成,因此它被放入了 pending 集合中,而其他任务已经完成。

总结

  • asyncio.wait() 允许你并行等待多个任务,它会返回两个集合:done(已完成的任务)和 pending(未完成的任务)。
  • 可以通过 return_when 控制何时返回,支持以下选项:
    • asyncio.ALL_COMPLETED:等待所有任务完成。
    • asyncio.FIRST_COMPLETED:等待第一个任务完成。
    • asyncio.FIRST_EXCEPTION:等待第一个任务抛出异常。
  • 你可以设置 timeout 来限制等待的最大时间。
  • done 集合中的任务可以通过 task.result() 获取任务的结果,或通过 task.exception() 获取任务抛出的异常。

5.3 asyncio.timeout(Python 3.11+)

Python 3.11 引入了 asyncio.timeout,这为超时控制提供了更加简洁的语法。

import asyncio

async def long_running_task():
    await asyncio.sleep(5)  # 模拟一个耗时的异步任务
    return "Task completed"

async def main():
    try:
        async with asyncio.timeout(3):  # 设置 3 秒的超时
            result = await long_running_task()  # 如果超时,将抛出 TimeoutError
            print(result)
    except asyncio.TimeoutError:
        print("Task timed out!")

asyncio.run(main())

6. 总结

本文展示了如何使用 Python 3.11 中的 asyncio 进行异步编程。我们从基础的单个任务执行开始,逐步介绍了如何并发执行多个任务、如何使用 TaskGroup 管理任务、以及如何通过 asyncio.create_task() 创建并发任务并调度执行。

asyncio.create_task() 是非常常用的方式,它能够直接将协程转换成任务,并将任务加入事件循环。它比 asyncio.gather() 更灵活,适用于需要更复杂任务调度和控制的场景。希望本文能够帮助你更好地理解 Python 协程和 asyncio 库的使用,提升你在处理并发任务时的效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/916954.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux下MySQL的简单使用

Linux下MySQL的简单使用 导语MySQL安装与配置MySQL安装密码设置 MySQL管理命令myisamchkmysql其他 常见操作 C语言访问MYSQL连接例程错误处理使用SQL 总结参考文献 导语 这一章是MySQL的使用,一些常用的MySQL语句属于本科阶段内容,然后是C语言和MySQl之…

动态规划-背包问题——1049.最后一块石头的重量II

1.题目解析 题目来源 1049.最后一块石头的重量II——力扣 测试用例 2.算法原理 首先需要将该问题转化为0-1背包问题后再做分析 1.状态表示 根据数学中的知识我们知道将一个数字分为两个子数后求这两个子数的最小差值,那么就要求这两个子数尽可能接近于原数字的一…

MarkDown语法入门【保姆级教程】

MarkDown语法介绍 Markdown是一种轻量级标记语言 关于MarkDown语法的定义,官方已经有概述了: Markdown是一种轻量级标记语言,排版语法简洁,让人们更多地关注内容本身而非排版。它使用易读易写的纯文本格式编写文档,可…

5G与4G互通的桥梁:N26接口

5G的商用部署进程将是一个基于4G系统进行的长期的替换、升级、迭代的过程,4G系统是在过渡到5G全覆盖过程中,作为保障用户业务连续性体验这一目的的最好补充。 因此4G/5G融合组网,以及互操作技术将是各大运营商在网络演进中需要重点考虑的问题…

Transformer中的算子:其中Q,K,V就是算子

目录 Transformer中的算子 其中Q,K,V就是算子 一、数学中的算子 二、计算机科学中的算子 三、深度学习中的算子 四、称呼的由来 Transformer中的算子 其中Q,K,V就是算子 “算子”这一称呼源于其在数学、计算机科学以及深度学习等多个领域中的广泛应用和特定功能。以下是…

【UGUI】Unity 游戏开发:背包系统初始化道具教程

在游戏开发中,背包系统是一个非常常见的功能模块。它允许玩家收集、管理和使用各种道具。今天,我们将通过一个简单的示例来学习如何在 Unity 中初始化一个背包系统。我们将使用 Unity 2021.3.7 版本,并结合 C# 脚本来实现这一功能。 1. 场景…

Web端、App端的日志查看

开发和测试过程中,日志是定位问题的重要工具之一。无论是Web端还是App端,日志的作用如同医生的诊断报告,可以帮我们快速找到问题的根源。那么,如何高效查看并分析这些日志呢? 面对Web端和App端的不同特点,…

机器学习基础02_特征工程

目录 一、概念 二、API 三、DictVectorize字典列表特征提取 四、CountVectorize文本特征提取 五、TF-IDF文本1特征词的重要程度特征提取 六、无量纲化预处理 1、MinMaxScaler 归一化 2、StandardScaler 标准化 七、特征降维 1、特征选择 VarianceThreshold 底方差…

SpringCloud-使用FFmpeg对视频压缩处理

在现代的视频处理系统中,压缩视频以减小存储空间、加快传输速度是一项非常重要的任务。FFmpeg作为一个强大的开源工具,广泛应用于音视频的处理,包括视频的压缩和格式转换等。本文将通过Java代码示例,向您展示如何使用FFmpeg进行视…

释放高级功能:Nexusflows Athene-V2-Agent在工具使用和代理用例方面超越 GPT-4o

在不断发展的人工智能领域,Nexusflows 推出了 Athene-V2-Agent 作为其模型系列的强大补充。这种专门的代理模型设计用于在功能调用和代理应用中发挥出色作用,突破了人工智能所能达到的极限。 竞争优势 Athene-V2-Agent 不仅仅是另一种人工智能模型&…

自己动手写Qt Creator插件

文章目录 前言一、环境准备1.先看自己的Qt Creator IDE的版本2.下载源码 二、使用步骤1.参考原本的插件2.编写自定义插件1.cmakelist增加一个模块2.同理,qbs文件也增加一个3.插件源码 三、效果总结 前言 就目前而言,Qt Creator这个IDE,插件比…

网上商城系统设计与Spring Boot框架

3 系统分析 当用户确定开发一款程序时,是需要遵循下面的顺序进行工作,概括为:系统分析–>系统设计–>系统开发–>系统测试,无论这个过程是否有变更或者迭代,都是按照这样的顺序开展工作的。系统分析就是分析系…

【时间之外】IT人求职和创业应知【37】-AIGC私有化

目录 新闻一:2024智媒体50人成都会议暨每经20周年财经媒体峰会召开 新闻二:全球机器学习技术大会在北京召开 新闻三:区块链技术在金融领域的应用取得新突破 不知不觉的坚持了1个月,按照心理学概念,还要坚持2个月&am…

双子数(枚举素数)

#include <iostream> #include <vector> #include <cmath> using namespace std;vector<long long> generate(long long n) {vector<bool> is(n 1, true);// 标记是否为素数&#xff0c;初始值全为 truevector<long long> v;is[0] is[1]…

硬盘物理故障的表现、原因和解决方法全解析

硬盘作为计算机数据存储的核心部件&#xff0c;其稳定性和可靠性直接关系到数据的完整性和系统的正常运行。然而&#xff0c;硬盘在使用过程中可能会遇到各种故障&#xff0c;其中物理故障是最具破坏性和难以修复的一类。 一、硬盘物理故障的表现 1、异常声音 硬盘在运行时发…

如何查看电脑关机时间

要查看电脑的关机时间&#xff0c;可以按照以下步骤进行操作&#xff1a; 1. 打开事件查看器&#xff1a;按下键盘上的Windows键R键&#xff0c;然后在弹出的运行对话框中输入"eventvwr.msc"&#xff0c;并按下Enter键。 2. 在事件查看器窗口中&#xff0c;单击左侧窗…

【MyBatis源码】深入分析TypeHandler原理和源码

&#x1f3ae; 作者主页&#xff1a;点击 &#x1f381; 完整专栏和代码&#xff1a;点击 &#x1f3e1; 博客主页&#xff1a;点击 文章目录 原始 JDBC 存在的问题自定义 TypeHandler 实现TypeHandler详解BaseTypeHandler类TypeReference类型参考器43个类型处理器类型注册表&a…

对话 OpenCV 之父 Gary Bradski:灾难性遗忘和持续学习是尚未解决的两大挑战 | Open AGI Forum

作者 | Annie Xu 采访、责编 | Eric Wang 出品丨GOSIM 开源创新汇 Gary Bradski&#xff0c;旺盛的好奇心、敢于冒险的勇气、独到的商业视角让他成为计算视觉、自动驾驶领域举重若轻的奠基者。 Gary 曾加入 Stanley 的团队&#xff0c;帮助其赢得 2005 年美国穿越沙漠 DA…

IDEA 开发工具常用快捷键有哪些?

‌在IDEA中&#xff0c;输出System.out.println()的快捷键是sout&#xff0c;输入后按回车&#xff08;或Tab键&#xff09;即可自动补全为System.out.println()‌‌。 此外&#xff0c;IDEA中还有一些其他常用的快捷键&#xff1a; 创建main方法的快捷键是psvm&#xff0c;代…

el-table合并单元格之后,再进行隔行换色的且覆盖表格行鼠标移入的背景色的实现

el-table 中有现成的隔行换色功能&#xff0c;只要增加 stripe 属性即可。但是如果有单元格合并的话&#xff0c;这个属性就不可用了。这时候我们就需要动点小心思了。 基于相同字段进行合并 单元格合并&#xff1a;基于表头中的某一列&#xff0c;具有相同值的个数相加进行合…