流畅的Python(十七)-使用future处理并发

一、核心要义

主要以三个模拟网络下载的代码-分别是依序下载、使用concurrent.futures模块中的Executor.map方法、以及使用该模块的executor.submitfutures.as_completed方法,来展示Python实现并发编程的其中一种方式。

二、代码示例

1、依序下载的脚本

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/3/4 20:10
# @Author  : Maple
# @File    : 01-依序下载的脚本.py
# @Software: PyCharm


from random import randint
from concurrent import futures
import time

"""使用concurrent.futures模块,该类的主要特色:
1. ThreadPoolExecutor和ProcessPoolExecutor类,这两个类实现的接口分别能在不同的线程或进程中执行可调用的对象。
这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列
"""

# 使用futures的map函数


MAX_WORKERS = 3
CC_LIST = [1, 2, 3, 4, 5, 6]


def get_randint():
    return randint(1, 10)


def download_one_img(id):
    print('***', id, '号图片开始下载***')
    download_time = get_randint()
    time.sleep(download_time)
    print('***{}号图片下载完成,花费时长{}s***'.format(id, download_time))
    return str(id) + '号图片内容'


def download_many_img(cc_list):

    for id in cc_list:
        download_one_img(id)

    return len(list(cc_list))


def main(download_many_img):
    t0 = time.time()
    count = download_many_img(CC_LIST)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main(download_many_img)

2、futures模块中Executor.map方法实现并发

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/3/4 20:13
# @Author  : Maple
# @File    : 02-使用future实现并发下载(1).py
# @Software: PyCharm

from random import randint
from concurrent import futures
import time

"""使用concurrent.futures模块,该类的主要特色:
1. ThreadPoolExecutor和ProcessPoolExecutor类,这两个类实现的接口分别能在不同的线程或进程中执行可调用的对象。
这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列
"""

# 使用futures的map函数


MAX_WORKERS = 3
CC_LIST = [1, 2, 3, 4, 5, 6]


def get_randint():
    return randint(1, 10)


def download_one_img(id):
    print('***', id, '号图片开始下载***')
    download_time = get_randint()
    time.sleep(download_time)
    print('***{}号图片下载完成,花费时长{}s***'.format(id, download_time))
    return str(id) + '号图片内容'


def download_many_img(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    # excutor的__exit__方法会在调用executor.shutdown(wait=True)方法,它会在所有线程都执行完毕前阻塞线程
    with futures.ThreadPoolExecutor(workers) as excutor:
        # map方法返回一个生成器,因此可以通过迭代,获取每个函数的返回值
        # res是一个迭代器,通过遍历获取每一个函数的返回值
        # 调用map函数,任务立刻就开始执行,但是因为workers = 3,所以最开始只会有3个任务启动执行
        #  直到某一个任务执行完成,该线程才会退让给另外一个任务
        res = excutor.map(download_one_img, cc_list)

    return len(list(res))


def main(download_many_img):
    t0 = time.time()
    count = download_many_img(CC_LIST)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main(download_many_img)

执行阶段1:3个线程,开始执行前3个任务 

执行阶段2:1号图片完成下载,被占用的线程被释放,开始下载4号图片

后续阶段,与上面类似。总之,最多同时只有3个任务执行。

3、futures模块中Executor.submit和as_completed方法实现并发

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/3/4 20:13
# @Author  : Maple
# @File    : 02-使用future实现并发下载(1).py
# @Software: PyCharm

from random import randint
from concurrent import futures
import time
import threading

"""使用concurrent.futures模块,该类的主要特色:
1. ThreadPoolExecutor和ProcessPoolExecutor类,这两个类实现的接口分别能在不同的线程或进程中执行可调用的对象。
这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列
"""

# 使用futures的submit和as_completed函数

MAX_WORKERS = 3
CC_LIST = [1, 2, 3, 4, 5, 6]


def get_randint():
    return randint(1, 10)


def download_one_img(id):
    t = threading.current_thread()
    print('***{}号图片开始下载,线程id为{}***'.format(id,t.ident))
    download_time = get_randint()
    time.sleep(download_time)
    print('***{}号图片下载完成,花费时长{}s***'.format(id, download_time))
    return str(id) + '号图片内容'


def download_many_img(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    # excutor的__exit__方法会在调用executor.shutdown(wait=True)方法,它会在所有线程都执行完毕前阻塞线程
    with futures.ThreadPoolExecutor(max_workers=workers) as excutor:
        to_do = []
        for cc in sorted(cc_list):
            # 获取当前线程
            t = threading.current_thread()
            # 提交任务,任务立刻就开始执行,但是因为workers = 3,所以最开始只会有3个任务启动执行
            # 直到某一个任务执行完成,该线程才会退让给另外一个任务
            future = excutor.submit(download_one_img, cc)
            to_do.append(future)
            msg = 'Scheduled for {}: {},thread_id:{}'

            print(msg.format(cc, future,t.ident))

        results = []
        for furture in futures.as_completed(to_do):
            res = furture.result()
            msg = '{} result: {!r}'
            print(msg.format(future, res))
            results.append(res)

        return len(results)


def main(download_many_img):
    t0 = time.time()
    count = download_many_img(CC_LIST)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main(download_many_img)

注意观察结果:

(1) 与Executor.map方法类似,必须等到3号图片下载完成,才会开始下载4号图片

(2) 通过观察线程id,也可以发现一共有3个线程负责图片下载任务

(3) download_one_img的调用方download_many_img,启动的则是另外一个单独的线程

4、futures模块中Executor.map方法详测

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/3/5 19:50
# @Author  : Maple
# @File    : 04-补充测试Executor-map方法(1).py
# @Software: PyCharm


from random import randint
from time import strftime, sleep
from concurrent import futures


def get_randint():
    return randint(1, 10)


def display(*args):
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)


def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s'
    display(msg.format('\t' * n, n, n))
    if n == 0:
        sleep(10)
    else:
        sleep(n)
    msg = '{} loiter({}): done'
    display(msg.format('\t' * n, n))
    return n * 10


def main():
    display('Script starting')
    executor = futures.ThreadPoolExecutor(max_workers=3)

    # executor.map,被调用函数返回结果的顺序和调用的顺序必然一致
    # 比如此例中第一个调用的函数时loiter(0),因为其会sleep(10),导致虽然在这10s内其它的函数已经执行完毕了
    # 都必须等其执行返回,返回结果后,才依序返回其它函数的结果

    results = executor.map(loiter, range(5))
    display('results:', results)
    display('waiting for individual result')
    for i, result in enumerate(results):
        display('result {}:{}'.format(i, result))


if __name__ == '__main__':

    main()

 执行阶段1:主线程先打印Script starting, 然后执行到executor.map(开启另外3个子线程, 由于最大线程数是3,会立刻先启动3个task) , 然后主程序继续向下执行,打印results:xx 和 waiting for....

执行阶段2:等到第一个子线程的任务执行完成(注意,此时并没有返回结果),该线程会开启下一个任务,之后的阶段都相同。

执行阶段3: 按照调用任务的顺序,返回任务的结果。注意:特意为`编号为0`的任务设置了一个比较长的休眠时间(10s), 导致后面所有任务的返回结果都阻塞, 要一直等到该任务休眠结束,返回结果,后面任务的结果才依序返回。

5、futures模块中Executor.submit和as_complete方法详测

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/3/5 19:50
# @Author  : Maple
# @File    : 04-补充测试Executor-map方法(1).py
# @Software: PyCharm


from random import randint
from time import strftime, sleep
from concurrent import futures


def get_randint():
    return randint(1, 10)


def display(*args):
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)


def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s'
    display(msg.format('\t' * n, n, n))
    if n == 0:
        sleep(10)
    else:
        sleep(n)
    msg = '{} loiter({}): done'
    display(msg.format('\t' * n, n))
    # 返回一个元组:第一个元素记录调用函数的id
    return (n,n * 10)


def main():
    display('Script starting')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    to_do = []



    for i in sorted(range(5)):
        # 通过submit和as_completed方式,函数返回结果的顺序,不受调用顺序的限制:谁先执行完毕,就可以先返回结果
        future = executor.submit(loiter, i)
        to_do.append(future)

    for i,furture in enumerate(futures.as_completed(to_do)):
        res = furture.result()
        # 记录返回结果的id序号及结果
        display('result {}:{}'.format(i,res))



if __name__ == '__main__':

    main()

第一阶段:与Executor.map方法相同

第二阶段:与Executor.map方法不同的地方在于,任务1执行完毕(done)后,直接先返回结果了。不需要等任务编号为0的任务休眠完毕(10s),再返回result结果。

6、futures并发编程捕获异常

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2024/3/6 20:46
# @Author  : Maple
# @File    : 07-并发异常捕获.py
# @Software: PyCharm
import threading
import time
from concurrent import futures
from random import randint

MAX_WORKERS = 3
CC_LIST = [1, 2, 3, 4, 5, 6]


class Status_404(Exception):
    def __init__(self,msg):
        self.msg = msg

class MyConnectionError(Exception):
    def __init__(self,msg):
        self.msg = msg

def get_randint():
    return randint(1, 10)


def download_one_img(id):
    try:
        if id !=3 and id !=4:
            print('***', id, '号图片开始下载***\n')
        elif id == 4:
            # 给4号图片制造一个连接异常
            print('***', id, '号图片开始下载***\n')
            raise MyConnectionError("遇到一个连接异常")
        else:
            # 给3号图片制造一个404异常
            print('***', id, '号图片开始下载***\n')
            raise Status_404("遇到了一个404异常")

    except Status_404 as e:
        # 捕获404异常
        print(e.msg)

    except Exception as e:
        # 其它异常往上抛(调用方)
        raise

    else:
        download_time = get_randint()
        time.sleep(download_time)
        print('***{}号图片下载完成,花费时长{}s***'.format(id, download_time))

    return str(id) + '号图片内容'


def download_many_img(cc_list):

    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(max_workers=workers) as excutor:
        to_do = []
        for cc in sorted(cc_list):
            future = excutor.submit(download_one_img, cc)
            to_do.append(future)

        results = []
        for furture in futures.as_completed(to_do):
            try:
                res = furture.result()
                msg = '{} result: {!r}'
                print(msg.format(future, res))
                results.append(res)

            # 捕获连接异常
            except MyConnectionError as e:
                print(e.msg)

        return len(results)


def main(download_many_img):
    t0 = time.time()
    count = download_many_img(CC_LIST)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))



if __name__ == '__main__':

    main(download_many_img)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/434045.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JS数组,if等结构语序

目录 浏览器的断点调试: 流程控制: 顺序流程控制:流程代码会逐行向下进行。 分支流程控制: IF语句: Switch语句: Switch和if的区别: 三元表达式: 循环: for循环…

XSS漏洞--概念、类型、实战--分析与详解[结合靶场pikachu]

目录 一、XSS概念简述 1、XSS简介: 2、XSS基本原理: 3、XSS攻击流程: 4、XSS漏洞危害: 二、XSS类型: 1、反射型XSS: 2、存储型XSS: 3、DOM型XSS: 三、靶场漏洞复现(pikach…

数据结构之顺序表及其实现!

目录 ​编辑 1. 顺序表的概念及结构 2. 接口的实现 2.1 顺序表的初始化 2.2 检查顺序表容量是否已满 2.3 顺序表的尾插 ​编辑 2.4 顺序表的尾删 2.5 顺序表的头插 2.6 顺序表的头删 2.7 顺序表在pos位置插入 2.8 顺序表在pos位置删除 2.9 顺序表的查找 2.10 顺…

喜报|3DCAT成为国内首批适配Vision Pro内容开发者

近日,苹果在上海总部举办了国内首场 Apple Vision Pro 开发者实验室活动,3DCAT作为国内领先的实时渲染云平台参与了此次活动,成为国内首批适配 Vision Pro 的内容开发者之一。 Vision Pro是苹果于2023年6月发布的首个空间计算设备&#xff0…

Vue+SpringBoot打造超市账单管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统设计3.1 总体设计3.2 前端设计3.3 后端设计在这里插入图片描述 四、系统展示五、核心代码5.1 查询供应商5.2 查询商品5.3 新增超市账单5.4 编辑超市账单5.5 查询超市账单 六、免责说明 一、摘要 1.1 项目介绍 基于…

Redis的三种集群模式(图解)

主从复制模式 一个主节点和多个从节点。主节点提供写入和读取功能,但是从属节点只提供读取功能。 主从复制的数据同步过程如下: (1)首先主节点启动,然后从属节点启动,从属节点会连接主节点并发送SYNC命令以…

JAVA WEB案例-登录校验-日志记录

一 前言 在现代社会中,随着互联网的快速发展,WEB应用的安全性问题变得越来越突出。作为一名程序员,我们不仅要注重WEB应用的功能实现,还需要重视安全性问题。在实际开发中,登录校验是非常重要的安全措施,能…

element-ui配置

全局配置 完整引入 Element: import Vue from vue; import Element from element-ui; Vue.use(Element, { size: small, zIndex: 3000 });按需引入 Element Vue.prototype.$ELEMENT { size: small, zIndex: 3000 };如果是vue.config.js中配置了externals 使用按…

【论文精读】Attention Bottlenecks for Multimodal Fusion 视频分类任务

本文并非逐句翻译,添加个人理解与疑惑,如有需要,请自行阅读原文。 Attention Bottlenecks for Multimodal Fusion 多模态融合的注意力瓶颈 会议:NIPS2021 Benchmark:Audioset、Epic Kitchens和VGGSound等 Backbone&…

数组常见算法

一、数组排序 冒泡排序 本篇我们介绍最基本的排序方法:冒泡排序。 实现步骤 1、比较两个相邻元素,如果第一个比第二个大,就交换位置 2、对每一对相邻元素进行同样的操作,除了最后一个元素 特点 每一轮循环后都会把最大的一个…

2024/3/6打卡最短编辑距离---线性DP

题目: 给定两个字符串 A 和 B,现在要将 A 经过若干操作变为 B,可进行的操作有: 删除–将字符串 A 中的某个字符删除。插入–在字符串 A 的某个位置插入某个字符。替换–将字符串 A 中的某个字符替换为另一个字符。 现在请你求出&a…

蓝桥杯练习题——前缀和

1.壁画 思路 1.求最坏情况下&#xff0c;画的墙总和是多少 2.画的墙在中间连续一段&#xff0c;画了的墙长度是 n / 2 向上取整 3.取最大的 n / 2 向上取整区间和 #include<iostream> using namespace std; const int N 5e6 10; char s[N]; int a[N]; int t, n;int m…

重磅:2024广州国际酒店工程照明展览会

2024广州国际酒店工程照明展览会 Guangzhou international hotel engineering lighting exhibition 2024 时间&#xff1a;2024年12月19-21日 地点&#xff1a;广州.中国进出口商品交易会展馆 承办单位&#xff1a;广州佛兴英耀展览服务有限公司 上海昶文展览服务有限公司…

【详识C语言】自定义类型之二:枚举

本章重点 枚举 枚举类型的定义 枚举的优点 枚举的使用 枚举 枚举顾名思义就是一一列举。 把可能的取值一一列举。 比如我们现实生活中&#xff1a; 一周的星期一到星期日是有限的7天&#xff0c;可以一一列举。 性别有&#xff1a;男、女、保密&#xff0c;也可以一一列举。…

【深度学习笔记】5_8 网络中的网络NiN

注&#xff1a;本文为《动手学深度学习》开源内容&#xff0c;部分标注了个人理解&#xff0c;仅为个人学习记录&#xff0c;无抄袭搬运意图 5.8 网络中的网络&#xff08;NiN&#xff09; 前几节介绍的LeNet、AlexNet和VGG在设计上的共同之处是&#xff1a;先以由卷积层构成的…

Vue.js+SpringBoot开发森林火灾预警系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 系统基础模块2.3 烟雾传感器模块2.4 温度传感器模块2.5 历史记录模块2.6 园区数据模块 三、系统设计3.1 用例设计3.1.1 森林园区基础系统用例设计3.1.2 森林预警数据用例设计 3.2 数据库设计3.2.1 烟雾…

[PyQt5]PyQt5连接MYSQL时显示Driver not loaded解决方案

在第一次用PyQt5的 QSqlDatabase.addDatabase 连接mysql的时候&#xff0c;可能会出现Driver not loaded的情况&#xff0c;如下&#xff1a; from PyQt5.QtSql import QSqlQuery, QSqlDatabase from PyQt5.QtWidgets import QApplication import sysapp QApplication(sys.ar…

2024.3.6

#include<myhead.h>int do_add(sqlite3* ppDb) {int numb;char name[10];int salary;printf("请输入员工的信息&#xff1a;");scanf("%d %s %d",&numb, name, &salary);//将员工的信息存储到数组char sql_add[128] "";sprintf(s…

【K哥爬虫普法】二十五岁 人大本硕 腾讯在职 爬虫被捕!

我国目前并未出台专门针对网络爬虫技术的法律规范&#xff0c;但在司法实践中&#xff0c;相关判决已屡见不鲜&#xff0c;K 哥特设了“K哥爬虫普法”专栏&#xff0c;本栏目通过对真实案例的分析&#xff0c;旨在提高广大爬虫工程师的法律意识&#xff0c;知晓如何合法合规利用…

你知道katalon studio 如何完成 get/post 请求发送吗?

katalon studio作为目前最火的自动化测试工具之一&#xff0c;不仅仅只能完成webUI自动化&#xff0c;更是能完成api、app以及桌面应用程序的自动化测试。 本文将讲解一下katalon studio是如果完成接口测试的。 请求发送 get请求 1、先在object repository里new一个请求 2、…