Node.JS多线程PromisePool之promise-pool库实现

什么是Promise Pool

Map-like, concurrent promise processing for Node.js.

Promise-Pool是一个用于管理并发请求的JavaScript库,它可以限制同时进行的请求数量,以避免过多的请求导致服务器压力过大。使用Promise-Pool可以方便地实现对多个异步操作的并发控制。

Promise Pool “承诺池” 包允许您批量运行许多承诺。

承诺池确保并发处理任务的最大数量。

承诺池中的每个任务都是其他任务,这意味着一旦一个任务完成,池就开始处理下一个任务。

此处理可确保了为您的任务进行最佳的批处理。

 

Promise Pool - NPMJS

@supercharge/promise-pool - npm (npmjs.com)icon-default.png?t=N7T8https://www.npmjs.com/package/@supercharge/promise-pool

Promise Pool - Document

Promise Poolicon-default.png?t=N7T8https://superchargejs.com/docs/3.x/promise-pool

 

怎么使用PromisePool

Install 安装

so easy , just install it

npm i @supercharge/promise-pool

Usage用例

Using the promise pool is pretty straightforward. The package exposes a class and you can create a promise pool instance using the fluent interface.

使用promise pool承诺池非常简单。该包公开了一个类,您可以使用流畅的接口创建一个承诺池实例。

Here’s an example using a concurrency of 2:

import { PromisePool } from '@supercharge/promise-pool'

const users = [
  { name: 'Marcus' },
  { name: 'Norman' },
  { name: 'Christian' }
]

const { results, errors } = await PromisePool
  .withConcurrency(2)
  .for(users)
  .process(async (userData, index, pool) => {
    const user = await User.createIfNotExisting(userData)

    return user
  })

The promise pool uses a default concurrency of 10

默认是十个线程,请按照自己的实际情况(业务+架构)处理

 

在以下示例中,我们创建了一个包含5个worker的线程池。然后,我们向线程池添加了10个任务。线程池会并发执行这些任务,但最多只能有5个任务同时运行。当一个任务完成时,线程池会自动分配下一个任务给空闲的worker。

const PromisePool = require('promise-pool');

// 创建一个包含5个worker的线程池
const pool = new PromisePool(5, (task) => {
  return new Promise((resolve, reject) => {
    // 模拟一个耗时操作
    setTimeout(() => {
      console.log('Task completed:', task);
      resolve();
    }, 1000);
  });
});

// 添加任务到线程池
for (let i = 0; i < 10; i++) {
  pool.addTask(i).then(() => {
    console.log('Task finished:', i);
  }).catch((err) => {
    console.error('Error:', err);
  });
}

//zhengkai.blog.csdn.net

Manually Stop the Pool 手工停止

You can stop the processing of a promise pool using the pool instance provided to the .process() and .handleError() methods. Here’s an example how you can stop an active promise pool from within the .process() method:

await PromisePool
  .for(users)
  .process(async (user, index, pool) => {
    if (condition) {
      return pool.stop()
    }

    // processes the `user` data
  })

You may also stop the pool from within the .handleError() method in case you need to:

import { PromisePool } from '@supercharge/promise-pool'

await PromisePool
  .for(users)
  .handleError(async (error, user, pool) => {
    if (error instanceof SomethingBadHappenedError) {
      return pool.stop()
    }

    // handle the given `error`
  })
  .process(async (user, index, pool) => {
    // processes the `user` data
  })

Bring Your Own Error Handling

The promise pool allows for custom error handling. You can take over the error handling by implementing an error handler using the .handleError(handler).

If you provide an error handler, the promise pool doesn’t collect any errors. You must then collect errors yourself.

Providing a custom error handler allows you to exit the promise pool early by throwing inside the error handler function. Throwing errors is in line with Node.js error handling using async/await.

承诺池允许自定义错误处理。

您可以通过使用.手柄错误(处理程序)实现错误处理程序来接管错误处理。

如果您提供了一个错误处理程序,则承诺池不会收集任何错误。

然后,您必须自己收集错误。

提供了一个自定义的错误处理程序,允许您通过抛出错误处理程序函数来提前退出承诺池。

抛出错误与Node.js错误处理使用异步/等待相一致。

import { PromisePool } from '@supercharge/promise-pool'

try {
  const errors = []

  const { results } = await PromisePool
    .for(users)
    .withConcurrency(4)
    .handleError(async (error, user) => {
      if (error instanceof ValidationError) {
        errors.push(error) // you must collect errors yourself
        return
      }

      if (error instanceof ThrottleError) { // Execute error handling on specific errors
        await retryUser(user)
        return
      }

      throw error // Uncaught errors will immediately stop PromisePool
    })
    .process(async data => {
      // the harder you work for something,
      // the greater you’ll feel when you achieve it
    })

  await handleCollected(errors) // this may throw

  return { results }
} catch (error) {
  await handleThrown(error)
}

Callback for Started and Finished Tasks 开始和结束任务的回调

You can use the onTaskStarted and onTaskFinished methods to hook into the processing of tasks. The provided callback for each method will be called when a task started/finished processing:

您可以使用任务启动和任务完成的方法来连接到任务的处理中。

当任务启动/完成处理时,将调用为每个方法提供的回调:

import { PromisePool } from '@supercharge/promise-pool'

await PromisePool
  .for(users)
  .onTaskStarted((item, pool) => {
    console.log(`Progress: ${pool.processedPercentage()}%`)
    console.log(`Active tasks: ${pool.processedItems().length}`)
    console.log(`Active tasks: ${pool.activeTasksCount()}`)
    console.log(`Finished tasks: ${pool.processedItems().length}`)
    console.log(`Finished tasks: ${pool.processedCount()}`)
  })
  .onTaskFinished((item, pool) => {
    // update a progress bar or something else :)
  })
  .process(async (user, index, pool) => {
    // processes the `user` data
  })
You can also chain multiple onTaskStarted and onTaskFinished handling (in case you want to separate some functionality):

import { PromisePool } from '@supercharge/promise-pool'

await PromisePool
  .for(users)
  .onTaskStarted(() => {})
  .onTaskStarted(() => {})
  .onTaskFinished(() => {})
  .onTaskFinished(() => {})
  .process(async (user, index, pool) => {
    // processes the `user` data
  })

Task Timeouts 超时设置

有时,配置一个任务必须完成处理的超时时间是很有用的。

一个超时的任务被标记为失败。

您可以使用与任务超时(<毫秒>)方法来配置任务的超时:

Sometimes it’s useful to configure a timeout in which a task must finish processing. A task that times out is marked as failed. You may use the withTaskTimeout(<milliseconds>) method to configure a task’s timeout:

import { PromisePool } from '@supercharge/promise-pool'

await PromisePool
  .for(users)
  .withTaskTimeout(2000) // milliseconds
  .process(async (user, index, pool) => {
    // processes the `user` data
  })

Notice: a configured timeout is configured for each task, not for the whole pool. The example configures a 2-second timeout for each task in the pool.

注意:为每个任务配置了一个已配置的超时,而不是为整个池。

该示例为池中的每个任务配置一个2秒的超时。

Correspond Source Items and Their Results 正确响应每个请求

有时,您希望处理后的结果与源项保持一致。

结果项在结果数组中的位置应该与其相关的源项相同。

使用使用对应结果方法来应用此行为:

Sometimes you want the processed results to align with your source items. The resulting items should have the same position in the results array as their related source items. Use the useCorrespondingResults method to apply this behavior:

import { setTimeout } from 'node:timers/promises'
import { PromisePool } from '@supercharge/promise-pool'

const { results } = await PromisePool
  .for([1, 2, 3])
  .withConcurrency(5)
  .useCorrespondingResults()
  .process(async (number, index) => {
    const value = number * 2

    return await setTimeout(10 - index, value)
  })

/**
 * source array: [1, 2, 3]
 * result array: [2, 4 ,6]
 * --> result values match the position of their source items
 */

For example, you may have three items you want to process. Using corresponding results ensures that the processed result for the first item from the source array is located at the first position in the result array (=index 0). The result for the second item from the source array is placed at the second position in the result array, and so on …

例如,您可能有三个要处理的项目。

使用相应的结果可以确保从源数组中得到的第一个项的处理结果位于结果数组中的第一个位置(=索引0)。

来自源数组的第二个项的结果被放置在结果数组中的第二个位置,以此类推。

Return Values When Using Corresponding Results 在使用相应的结果时,请返回相应的值

The results array returned by the promise pool after processing has a mixed return type. Each returned item is one of this type:

  • the actual value type: for results that successfully finished processing
  • Symbol('notRun'): for tasks that didn’t run
  • Symbol('failed'): for tasks that failed processing

The PromisePool exposes both symbols and you may access them using

  • Symbol('notRun'): exposed as PromisePool.notRun
  • Symbol('failed'): exposed as PromisePool.failed

处理后由承诺池返回的结果数组具有混合返回类型。

每个返回的项目都是以下类型之一:

实际值类型:对于成功完成处理的结果

符号(“notRun”):用于未运行的任务

符号(“failed”):用于处理失败的任务

承诺池公开了这两个符号,您可以使用

符号(“notRun”):公开为PromisePool.notRun

符号(“failed”):公开为PromisePool.failed

您可以对所有未运行或失败的任务重复处理:

You may repeat processing for all tasks that didn’t run or failed:

import { PromisePool } from '@supercharge/promise-pool'

const { results, errors } = await PromisePool
  .for([1, 2, 3])
  .withConcurrency(5)
  .useCorrespondingResults()
  .process(async (number) => {
    // …
  })

const itemsNotRun = results.filter(result => {
  return result === PromisePool.notRun
})

const failedItems = results.filter(result => {
  return result === PromisePool.failed
})

When using corresponding results, you need to go through the errors array yourself. The default error handling (collect errors) stays the same and you can follow the described error handling section above.

当使用相应的结果时,您需要自己检查错误数组。

默认的错误处理(收集错误)保持不变,您可以按照上面描述的错误处理部分进行操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/521499.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Mysql中的 IFNULL 函数的详解

Mysql中的 IFNULL 函数的详解 概念 在mysql中IFNULL() 函数用于判断第一个表达式是否为 NULL&#xff0c;如果第一个值不为NULL就执行第一个值。第一个值为 NULL 则返回第二个参数的值。 语法 IFNULL(a, b)Demo 举例说明 创建表 create table student_one( sno varchar(20)…

Day106:代码审计-PHP原生开发篇文件安全上传监控功能定位关键搜索1day挖掘

目录 emlog-文件上传&文件删除 emlog-模板文件上传 emlog-插件文件上传 emlog-任意文件删除 通达OA-文件上传&文件包含 知识点&#xff1a; PHP审计-原生开发-文件上传&文件删除-Emlog PHP审计-原生开发-文件上传&文件包含-通达OA emlog-文件上传&文件…

HUD抬头显示器阳光倒灌实验一般步骤

概述 汽车HUD&#xff08;Head-Up Display&#xff0c;即抬头显示器&#xff09;阳光倒灌实验是一种用于评估汽车抬头显示器在阳光直射条件下显示效果的测试。该实验的目的是确保HUD系统在强烈的阳光下依然能够清晰地显示信息&#xff0c;不影响驾驶员的视线和驾驶安全。 一般…

单链表经典oj题 (一) 简单

1.删除指定节点数据&#xff08;非尾节点&#xff09;&#xff0c;要求时间复杂度为O(1) . - 力扣&#xff08;LeetCode&#xff09; 在之前我们将单链表删除指定节点的操作是先遍历找到pos的前一个结点&#xff0c;然后再进行删除&#xff0c;但是现在要求再O(1)时间内完成&am…

博客部署001-centos安装docker

1、安装docker 1.1 卸载旧版本的 Docker sudo yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine1.2 设置 Docker 仓库 安装 Docker Engine 之前&#xff0c;首先需要设置…

力扣Lc29---- 541. 反转字符串 II(java版)-2024年4月06日

1.题目描述 2.知识点 &#xff08;1&#xff09;执行步骤如下&#xff1a; 初始化 s “abcdefg” 和 k 2 将字符串分割成长度为 2k 4 的块。 对每个块中的前 k 2 个字符进行反转。 执行过程 1&#xff09;第一次循环&#xff08;i 0&#xff09; start 0 end Math.min(0…

心法利器[112] | 考古RAG-20年RAG概念提出的论文

心法利器 本栏目主要和大家一起讨论近期自己学习的心得和体会。具体介绍&#xff1a;仓颉专项&#xff1a;飞机大炮我都会&#xff0c;利器心法我还有。 2023年新的文章合集已经发布&#xff0c;获取方式看这里&#xff1a;又添十万字-CS的陋室2023年文章合集来袭&#xff0c;更…

计算机视觉——基于傅里叶幅度谱文档倾斜度检测与校正

概述 在计算机视觉领域&#xff0c;处理文档数据时&#xff0c;OCR算法的性能往往会受到文档的倾斜度影响。如果文档在输入到模型之前没有经过恰当的校正&#xff0c;模型就无法期待模型能够提供准确的预测结果&#xff0c;或者模型预测的精度会降低。例如&#xff0c;在信息提…

Qt Creator 新建项目

&#x1f40c;博主主页&#xff1a;&#x1f40c;​倔强的大蜗牛&#x1f40c;​ &#x1f4da;专栏分类&#xff1a;QT❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、使用 Qt Creator 新建项目 1、新建项目 2、选择项目模板 3、选择项目路径 4、选择构建系统 5…

usb_camera传输视频流编码的问题记录!

前言&#xff1a; 大家好&#xff0c;今天给大家分享的内容是&#xff0c;一个vip课程付费的朋友&#xff0c;在学习过程中遇到了一个usb采集的视频数据流&#xff0c;经过ffmpeg编码&#xff0c;出现了问题&#xff1a; 问题分析&#xff1a; 其实这个问题不难&#xff0c;关键…

漂亮国的无人餐厅的机器人骚操作

导语 大家好&#xff0c;我是智能仓储物流技术研习社的社长&#xff0c;你的老朋友&#xff0c;老K。行业群 新书《智能物流系统构成与技术实践》 知名企业 读者福利&#xff1a; &#x1f449;抄底-仓储机器人-即买即用-免调试 智能制造-话题精读 1、西门子、ABB、汇川&#x…

Linux--03---虚拟机网络配置、拍摄快照和克隆

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 1.虚拟机网络配置1.虚拟机的联网模式模式1 仅主机模式特点模式2 桥接模式特点模式3 NAT模式特点关于模式的选择 2. 修改网络配置信息3.修改虚拟机ens33网卡的网络配…

【CNN】ConvMixer探究ViT的Patch Embedding: Patches Are All You Need?

Patches Are All You Need? 探究Patch Embedding在ViT上的作用&#xff0c;CNN是否可用该操作提升性能&#xff1f; 论文链接&#xff1a;https://openreview.net/pdf?idTVHS5Y4dNvM 代码链接&#xff1a;https://github.com/tmp-iclr/convmixer 1、摘要 ViT的性能是由于T…

【二分查找】Leetcode 在排序数组中查找元素的第一个和最后一个位置

题目解析 34. 在排序数组中查找元素的第一个和最后一个位置 我们使用暴力方法进行算法演化&#xff0c;寻找一个数字的区间&#xff0c;我们可以顺序查找&#xff0c;记录最终结果 首先数组是有序的&#xff0c;所以使用二分法很好上手&#xff0c;但是我们就仅仅使用上一道题…

2. Django配置信息

第2章 Django配置信息 Django的配置文件settings.py用于配置整个网站的环境和功能, 核心配置必须有项目路径, 密钥配置, 域名访问权限, App列表, 中间件, 资源文件, 模板配置, 数据库的连接方式.* 项目运行时, 如果修改代码, 项目会自动检测发现改动后会重新运行, 除非报错否…

xss.pwnfunction-Jefff

在eval中可以直接执行命令所以直接把"直接闭合在结尾再加上一个"因为后面的"没闭和会报错 ?jeffa";alert(1);" 或 ?jeffa"-alert(1)-" -是分隔符

根据状态转移表实现时序电路

描述 某同步时序电路转换表如下&#xff0c;请使用D触发器和必要的逻辑门实现此同步时序电路&#xff0c;用Verilog语言描述。 电路的接口如下图所示。 输入描述 input A ,input clk ,input rst_n 输出描述 output …

Windows11下Docker使用记录(二)

Docker使用记录&#xff08;二&#xff09; 1. 常用指令2. Dockerfile示例3. 构建docker image Docker中container&#xff0c;image&#xff0c;dockerfile 以及docker hub的关系如图所示&#xff0c;详细可见上一篇帖子。 本文主要记录Dockerfile相关。 1. 常用指令 常用指令…

matlab:五点中心差分求解Navier边界的Biharmonic方程(具有纳维尔边界的双调和方程)

我们考虑如下形式的双调和方程的数值解 其中&#xff0c;Ω是欧氏空间中的多边形或多面体域&#xff0c;在其中&#xff0c;d为维度&#xff0c;具有分段利普希茨边界&#xff0c;满足内部锥条件&#xff0c;f(x) ∈ L2(Ω)是给定的函数&#xff0c;∆是标准的拉普拉斯算子。算…

Open3D (C++) 从.txt文件中读取数据到矩阵

目录 一、算法概述二、代码实现三、结果展示四、测试数据本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT。 一、算法概述 在进行实验的时候有时需要借助不同的工具来实现一些比较复杂的操作,比如使用matlab中自带的拉…