Promise的并发控制 - 从普通并发池到动态并发池

一、场景

        给你一个有200个URL的数组,通过这些URL来发送请求,要求并发请求数不能超过五个。

        这是一道很常考的面试题,接下来让我们来学习一下Promise并发控制 

二、普通并发池的实现

        主要思路就是,判断当前队列是否满,满则等待,有空闲则补齐。

        利用 Promise.race 方法,可以判断一个Promise数组中 “谁最先完成” ,从而让等待中的函数开始运行。

/**Promise并发池,当有大量promise并发时,可以通过这个来限制并发数量
 * @param taskList 任务列表
 * @param max 最大并发数量
 * @param oneFinishCallback 每个完成的回调,参数是当前完成的个数和执行结果,可以用来制作进度条
 * @retrun 返回每个promise的结果,顺序和任务列表相同。 目前是成功和失败都会放入该结果
 */
export const promisePool = <T>(taskList: task<T>[], limit: number) => {
    return new Promise<T[]>(async (resolve, reject) => {
        try {
            const length = taskList.length
            /**当前并发池 */
            const pool: Promise<T>[] = []
            /**结果数组 */
            const res = new Array<T>(length)
            /**完成的数量 */
            let count = 0

            for (let i = 0; i < length; i++) {
                const task = taskList[i]();
                //promise结束的回调
                const handler = (info: T) => {
                    pool.splice(pool.indexOf(task), 1) //任务执行完就删除
                    res[i] = info //不能使用res.push,否则不能保证结果顺序
                    count++
                    if (count === length) {
                        resolve(res)
                    }
                }
                task.then((data) => {
                    handler(data)
                    console.log(`第${i}个任务完成,结果为`, data);
                }, (err) => {
                    handler(err)
                    console.log(`第${i}个任务失败,原因为`, err);
                })


                pool.push(task)

                //如果到达了并发限制,就等到池子中任意一个结束
                if (pool.length >= limit) {
                    await Promise.race(pool)
                }
            }
        } catch (error) {
            console.error('并发池出错', error);
            reject(error)
        }
    })
}

测试用例:


        /**创造一个1s后得到结果的Promise */
    const getTask = () => {
        return async () => {
            await new Promise((resolve) => setTimeout(resolve, 1000))
            return new Date()
        }
    }

//测试用例:
const testIt = async () => {
    const list = new Array(20).fill(0).map(() => getTask())
    const res = await promisePool(list, 5)
    console.log('res', res);
}
testIt()

打印结果:(观察控制台,可以发现是五个五个出现的)

三、让并发池可中断

        好,现在来了个新要求,用户点击了取消按钮后,你需要中断继续往并发池添加任务。 (常见场景:分片上传时,用户点击取消上传按钮)

        问题的关键核心就是,如何从外部 让内部的循环终止。 其实也很简单,设置一个变量,初始为false,当用户点击取消按钮时,变量变为true。在for循环中检测这个变量的值,为true就退出循环

        但是我们不能将这个变量设置为全局变量!否则如果有多处需要使用这个并发池,一处中断,全部遭殃。 在这里,我们就可以利用面向对象的思想,把这个变量作为对象内部的值,每个实例之间独立。“你终止你的,关我什么事? ” 

/**Promise并发池 - 可终止 - 每次都创建一个实例,避免另一个池子的取消导致这个池子的取消 */
export class PromisePoolStatic<T, Err>{
    /**是否取消。在循环中若发现这个变成了true,就会中断 */
    private isStop = false
    /**运行静态Promise并发池,当有大量promise并发时,可以通过这个来限制并发数量
     * @param taskList 任务列表
     * @param max 最大并发数量
     * @retrun 返回每个promise的结果,顺序和任务列表相同。 目前是成功和失败都会放入该结果
     */
    run = async (taskList: task<T>[], max: number) => {
        return new Promise<Array<T | Err>>(async (resolve, reject) => {
            type resType = T | Err
            try {
                this.isStop = false //开始的时候设为false
                const length = taskList.length
                const pool: Promise<resType>[] = []//并发池 
                let count = 0//当前结束了几个
                const res = new Array<resType>(length)
                for (let i = 0; i < length; i++) {
                    let task = taskList[i]();
                    if (this.isStop) return reject('并发池终止')
                    //成功和失败都要执行的函数
                    const handler = (_res: resType) => {
                        pool.splice(pool.indexOf(task), 1) //每当并发池跑完一个任务,从并发池删除个任务
                        res[i] = _res //放入结果数组
                        count++
                        if (count === length) {
                            return resolve(res)
                        }
                    }

                    task.then((data) => {
                        handler(data)
                        console.log(`第${i}个任务完成,结果为`, data);
                    }, (err) => {
                        handler(err)
                        console.log(`第${i}个任务失败,原因为`, err);
                    })

                    pool.push(task);

                    if (pool.length === max) {
                        //利用Promise.race方法来获得并发池中某任务完成的信号,当有任务完成才让程序继续执行,让循环把并发池塞满
                        await Promise.race(pool)
                    }
                }

            } catch (error) {
                console.error('promise并发池出错', error);
                reject(error)
            }
        })
    }
    /**停止并发池运行 */
    stop = () => {
        this.isStop = true
    }
}

测试用例:

/**可终止的并发池测试用例 */
const promisePoolStaticTest = () => {
    const list = new Array(18).fill(0).map(() => getTask())
    const pool = new PromisePoolStatic()
    pool.run(list, 3).catch((err) => {
        console.log('可终止的并发池测试用例出错 -- ', err)
    })
    //18个任务,每个花费1s完成,并发数量为3,共需要6s完成
    //我们在第三秒的时候中断
    setTimeout(() => pool.stop(), 3000)
}
promisePoolStaticTest()

结果如下:

        可以看到第九个任务结束之后,并发池没有进入新的任务了。 但是为什么已经终止了,还有Promise完成的回调打印出来? 因为执行终止函数时,并发池内仍有三个函数在运行,而正在运行的Promise无法终止,所以只能阻止新任务进入并发池  (虽然无法终止Promise,但是可以终止Promise完成后的操作,这里不阐述)

四、动态并发池

        现在前面完成的操作,都是已经确定好了任务列表,才进行并发控制。如果我们需要动态添加任务的效果,如果队列没满就运行,队满则挂起等待,应该怎么做呢? (常见场景:全局axios请求并发控制

        主要思路: 队未满则直接运行,队满则加入等待队列。任务完成后,检查等待队列是否有任务


type resolve<T> = (value?: T | PromiseLike<T>) => void
type reject = (reason?: any) => void
/**装着任务和它的resolve与reject函数 */
type taskWithCallbacks<T> = { task: task<T>; resolve: resolve<T>; reject: reject }


/**动态并发池 */
export class PromisePoolDynamic<T> {
    /**最大并发数量 */
    private limit: number;
    /**当前正在跑的数量 */
    private runningCount: number;
    /**等待队列 */
    private queue: Array<taskWithCallbacks<T>>;

    /**动态并发池 - 构造函数
     * @param maxConcurrency 最大并发数量
     */
    constructor(maxConcurrency: number) {
        this.limit = maxConcurrency;
        this.runningCount = 0;
        this.queue = [];
    }

    /**添加任务
     * @param task 任务,() => Promise<T>
     * @returns 结果
     */
    addTask(task: task<T>) {
        //返回一个新的Promise实例,在任务完成前,会一直是pending状态
        return new Promise<T>((resolve, reject) => {
            const taskWithCallbacks = { task, resolve, reject } as taskWithCallbacks<T>;
            if (this.runningCount < this.limit) {//并发数量没满则运行
                console.log('任务添加:当前并发数', this.runningCount, '并发数量未满,直接运行');
                this.runTask(taskWithCallbacks);
            } else {//并发数量满则加入等待队列
                console.log('任务添加:当前并发数', this.runningCount, '并发数量满,挂起等待');
                this.queue.push(taskWithCallbacks);
            }
        });
    }
    /**运行任务
     * @param taskWithCallback 带有resolve和reject的任务
     */
    private runTask(taskWithCallback: taskWithCallbacks<T>) {
        this.runningCount++;//当前并发数++
        taskWithCallback.task()//从对象中取出任务执行
            .then(result => {
                this.runningCount--;
                taskWithCallback.resolve(result);
                console.log('任务完成', result, '当前并发数', this.runningCount);
                this.checkQueue();
            })
            .catch(error => {
                this.runningCount--;
                taskWithCallback.reject(error);
                this.checkQueue();
            });
    }
    /**运行完成后,检查队列,看看是否有在等待的,有就取出第一个来运行 */
    private checkQueue() {
        if (this.queue.length > 0 && this.runningCount < this.limit) {
            const nextTask = this.queue.shift()!;
            console.log('并发池出现空位,取出等待队列的任务', nextTask);
            this.runTask(nextTask);
        }
    }
}

测试用例:

/**动态并发池的测试用例 */
const promisePoolDynamicTest = () => {
    const promisePoolDynamic = new PromisePoolDynamic(3) //一个最大并发3的动态并发池
    //最大并发3,我一次性添加7个任务
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
}
promisePoolDynamicTest()

测试结果:

五、结语

        关于并发池就到这里了。除了利用Promise.race,其实还可以递归等方式,不过Promise.race是最简单也是最容易理解的。

        如果代码中有哪里出现的不对,欢迎指出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/115902.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【word技巧】ABCD选项如何对齐?

使用word文件制作试卷&#xff0c;如何将ABCD选项全部设置对齐&#xff1f;除了一直按空格或者Tab键以外&#xff0c;还有其他方法吗&#xff1f;今天分享如何将ABCD选项对齐。 首先&#xff0c;我们打开【替换和查找】&#xff0c;在查找内容输入空格&#xff0c;然后点击全部…

基于野狗算法的无人机航迹规划-附代码

基于野狗算法的无人机航迹规划 文章目录 基于野狗算法的无人机航迹规划1.野狗搜索算法2.无人机飞行环境建模3.无人机航迹规划建模4.实验结果4.1地图创建4.2 航迹规划 5.参考文献6.Matlab代码 摘要&#xff1a;本文主要介绍利用野狗算法来优化无人机航迹规划。 1.野狗搜索算法 …

微信小程序:实现多个按钮提交表单

效果 核心步骤 通过data-type给不同按钮进行设置&#xff0c;便于很好的区分不同按钮执行不同功能 data-type"" 完整代码 wxml <form action"" bindsubmit"formSubmit"><button style"margin-bottom:5%" data-type"pa…

​C++内存模型

c语言分区:栈、堆、全局/静态存储区、常量存储区、代码区(.text段)、自由存储区 1、栈区&#xff08;stack&#xff09;— 由编译器自动分配释放&#xff0c;存放函数的参数值&#xff0c;局部变量的值等。其操作方式类似于数据结构中的栈。向下生长 2、堆区&#xff08;heap&…

超声波俱乐部分享:百度世界大会点燃AI创业者新希望

10月22日&#xff0c;2023年第十三期超声波俱乐部内部分享会在北京望京举行。本期的主题是&#xff1a;百度世界大会点燃AI创业者新希望。 到场的嘉宾有&#xff1a;超声波创始人杨子超&#xff0c;超声波联合创始人、和牛商业创始人刘思雨&#xff0c;中国国际经济交流中心研…

MFC String类的初始化学习

之前写过CString的用法&#xff1b; VC CString 编程实例图解_bcbobo21cn, cstring-CSDN博客 下面单独看一下CString的各种初始化方式&#xff1b; void CTest2View::OnDraw(CDC* pDC) {CTest2Doc* pDoc GetDocument();ASSERT_VALID(pDoc);// TODO: add draw code for nati…

vscode1.83远程连接失败

&#xff08;报错信息忘记截图了 总之卡在vscode-server.tar.gz的下载那里&#xff0c;一直404&#xff0c;删了C:\Users\Administrator\.ssh\known_hosts也不管用 看了一下vscode1.83的commitID为a6606b6ca720bca780c2d3c9d4cc3966ff2eca12&#xff0c;网友说可以通过以下网…

C++笔记之动态数组的申请和手动实现一个简单的vector

C笔记之动态数组的申请和手动实现一个简单的vector code review! 文章目录 C笔记之动态数组的申请和手动实现一个简单的vector1.C语言中动态数组的申请与使用1.动态数组的申请使用new和delete使用std::vector 1.std::vector的底层实现2.手动实现一个简单的vector:使用一个指向…

Java设计模式之观察者模式

目录 定义 结构 案例 优点 缺点 使用场景 JDK源码解析 定义 又被称为发布-订阅&#xff08;Publish/Subscribe&#xff09;模式&#xff0c;它定义了一种一对多的依赖关系&#xff0c;让多个观察者对象同时监听某一个主题对象。这个主题对象在状态变化时&#xff0c;会…

iSlide2024一款基于PPT的插件工具包含38个设计辅助功能

根据使用者情况表明iSlide 是一款拥有30W素材的PPT高效设计软件&#xff0c;可提高90%工作效率&#xff0c;现全球已有超过1400万使用者&#xff0c;智能排版原创高品模板可商用图形&#xff0c;真正摆脱PPT的束缚&#xff0c;把精力用在该用的地方。我们都明白islide插件功能特…

PYTHON学习

元组不可修改&#xff1a; 元组支持下标索引。 字符串也是容器&#xff0c;不支持修改。

selenium自动化测试入门 —— 设置等待时间

time.sleep(3) 固定等待3秒 driver.implicitly_wait(10) 隐性的等待&#xff0c;对应全局 WebDriverWait( driver, timeout).until(‘有返回值的__call__()方法或函数’) 显性的等待&#xff0c;对应到元素 一、time.sleep(seconds) 固定等待 import time time.sleep(3) #…

Gopro hero5运动相机格式化后恢复案例

Gopro运动相机以稳定著称&#xff0c;旗下的Hero系列销售全球。下面我们来看一个Hero5格式化后拍了少量素材的恢复案例。 故障存储:64G MicroSD卡 Exfat文件系统 故障现象: 64G的卡没备份数据时做了格式化操作又拍了一条&#xff0c;发现数据没有备份&#xff0c;客户自行使…

Pytorch 文本情感分类案例

一共六个脚本,分别是: ①generateDictionary.py用于生成词典 ②datasets.py定义了数据集加载的方法 ③models.py定义了网络模型 ④configs.py配置一些参数 ⑤run_train.py训练模型 ⑥run_test.py测试模型 数据集https://download.csdn.net/download/Victor_Li_/88486959?spm1…

《研发效能(DevOps)工程师》课程简介(三)丨IDCF

在研发效能领域中&#xff0c;【开发与交付】的学习重点在于掌握高效的开发工具和框架&#xff0c;了解敏捷开发方法&#xff0c;掌握持续集成与持续交付技术&#xff0c;以及如何保证应用程序的安全性和合规性等方面。 由国家工业和信息化部教育与考试中心颁发的职业技术证书…

【多线程】龟兔赛跑

package org.example;public class Race implements Runnable {//胜利者private static String winner;Overridepublic void run() {for(int i0;i<100;i){boolean flag gameOver(i);//如果flag>100,结束比赛if(flag){break;}System.out.println(Thread.currentThread().g…

Vue实现消费清单明细饼图展示

功能 可以进行消费项增删消费额大于500会标红消费金额合计饼图展示消费项 代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-…

U盘显示无媒体怎么办?方法很简单

当出现U盘无媒体情况时&#xff0c;您可以在磁盘管理工具中看到一个空白的磁盘框&#xff0c;并且在文件资源管理器中不会显示出来。那么&#xff0c;导致这种问题的原因是什么呢&#xff1f;我们又该怎么解决呢&#xff1f; 导致U盘无媒体的原因是什么&#xff1f; 当您遇到上…

错误号码2058 Plugin caching_sha2_password could not be loaded:vX八白白白白白令自砸

sqlyog连接数据库时报错&#xff1a; 错误号码2058 Plugin caching_sha2_password could not be loaded:vX八白白白白白令自砸 网上查了资料&#xff0c;是MySQL 从 8.0 版本开始加密方式改变导致的原因。具体的咋也不再这里分析了&#xff0c;就直说如何解决这个问题。下面三…

软考之软件工程基础理论知识

软件工程基础 软件开发方法 结构化方法 将整个系统的开发过程分为若干阶段&#xff0c;然后依次进行&#xff0c;前一阶段是后一阶段的工作依据按顺序完成。应用最广泛。特点是注重开发过程的整体性和全局性。缺点是开发周期长文档设计说明繁琐&#xff0c;工作效率低开发前要…