如何使用队列处理 API 速率限制

对于遇到速率限制的应用程序来说也是一个挑战,因为它需要“放慢速度”或暂停。这是一个典型的场景:

  • 初始请求:当应用程序发起与 API 的通信时,它会请求特定的数据或功能。
  • API 响应: API 处理请求并响应请求的信息或执行所需的操作。
  • 速率限制:如果应用程序已达到限制,通常需要等到下一个指定的时间范围(例如一分钟到一小时)才能发出其他请求。如果它是“软”速率限制并且时间范围已知并且是线性的,则更容易处理。通常,每个区块的等待时间都会增加,需要对每个 API 进行完全不同的自定义处理。
  • 处理超出速率限制:如果应用程序超出速率限制,它可能会收到来自 API 的错误响应(例如“429 Too Many Requests”状态代码)。应用程序需要优雅地处理这个问题,可能是通过对请求进行排队、实施退避策略(在重试之前等待逐渐延长的时间)或通知用户已达到速率限制。

为了在速率限制内有效运行,应用程序通常采用以下策略:

  • 限制:调节传出请求的速率以符合 API 的速率限制。
  • 缓存:将频繁请求的数据存储在本地,以减少重复 API 调用的需要。
  • 指数退避:实施一种策略,使应用程序在达到速率限制后在后续重试之间等待的时间越来越长,以减少服务器负载并防止立即重试。
  • 队列? 下一节将详细介绍

使用队列

由于队列能够系统地处理任务,因此可以作为出色的“助手”或工具来帮助服务管理速率限制。然而,虽然它提供了显着的好处,但它并不是用于此目的的独立解决方案。

在构建健壮的架构时,用于与受速率限制的外部 API 交互的服务或应用程序通常会异步处理任务。该服务通常由从队列派生的任务启动。当服务遇到速率限制时,它可以轻松地将作业返回到主队列或将其分配到指定用于延迟任务的单独队列,并在特定的等待时间(例如 X 秒)后重新访问它。

这种对队列系统的依赖是非常有利的,主要是因为它的临时性质和排序。然而,仅靠队列并不能完全解决速率限制问题;它需要额外的功能或服务本身的帮助才能有效地处理这些限制。

使用队列时可能会出现挑战

  • 重新进入队列的任务可能会比必要的时间更早返回,因为它们的时间不直接由您的服务控制。
  • 由于在有限时间内频繁拨打电话而超出速率限制。这可能需要实施睡眠或等待机制,由于它们对性能和响应能力的潜在影响,通常被认为是不好的做法。

RabbitMQ

const amqp = require('amqplib');
const axios = require('axios');

// Function to make API requests, simulating rate limitations
async function makeAPICall(url) {
  try {
    const response = await axios.get(url);
    console.log('API Response:', response.data);
  } catch (error) {
    console.error('API Error:', error.message);
  }
}

// Connect to RabbitMQ server
async function connect() {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const queue = 'rateLimitedQueue';
    channel.assertQueue(queue, { durable: true });

    // Consume messages from the queue
    channel.consume(queue, async msg => {
      const { url, delayInSeconds } = JSON.parse(msg.content.toString());

      // Simulating rate limitation
      await new Promise(resolve => setTimeout(resolve, delayInSeconds * 1000));

      await makeAPICall(url); // Make the API call

      channel.ack(msg); // Acknowledge message processing completion
    });
  } catch (error) {
    console.error('RabbitMQ Connection Error:', error.message);
  }
}

// Function to send a message to the queue
async function addToQueue(url, delayInSeconds) {
  try {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();

    const queue = 'rateLimitedQueue';
    channel.assertQueue(queue, { durable: true });

    const message = JSON.stringify({ url, delayInSeconds });
    channel.sendToQueue(queue, Buffer.from(message), { persistent: true });

    console.log('Task added to the queue');
  } catch (error) {
    console.error('RabbitMQ Error:', error.message);
  }
}

// Usage example
addToQueue('https://api.example.com/data', 5); // Add an API call with a delay of 5 seconds

// Start the consumer
connect();

Kafka

const { Kafka } = require('kafkajs');
const axios = require('axios');

// Function to make API requests, simulating rate limitations
async function makeAPICall(url) {
  try {
    const response = await axios.get(url);
    console.log('API Response:', response.data);
  } catch (error) {
    console.error('API Error:', error.message);
  }
}

// Kafka configuration
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'], // Replace with your Kafka broker address
});

// Create a Kafka producer
const producer = kafka.producer();

// Connect to Kafka and send messages
async function produceToKafka(topic, message) {
  await producer.connect();
  await producer.send({
    topic,
    messages: [{ value: message }],
  });
  await producer.disconnect();
}

// Create a Kafka consumer
const consumer = kafka.consumer({ groupId: 'my-group' });

// Consume messages from Kafka topic
async function consumeFromKafka(topic) {
  await consumer.connect();
  await consumer.subscribe({ topic });
  await consumer.run({
    eachMessage: async ({ message }) => {
      const { url, delayInSeconds } = JSON.parse(message.value.toString());

      // Simulating rate limitation
      await new Promise(resolve => setTimeout(resolve, delayInSeconds * 1000));

      await makeAPICall(url); // Make the API call
    },
  });
}

// Usage example - Sending messages to Kafka topic
async function addToKafka(topic, url, delayInSeconds) {
  const message = JSON.stringify({ url, delayInSeconds });
  await produceToKafka(topic, message);
  console.log('Message added to Kafka topic');
}

// Start consuming messages from Kafka topic
const kafkaTopic = 'rateLimitedTopic';
consumeFromKafka(kafkaTopic);

// Usage example - Adding messages to Kafka topic
addToKafka('rateLimitedTopic', 'https://api.example.com/data', 5); // Add an API call with a delay of 5 seconds

这两种方法都是合法的,但它们需要您的服务包含“睡眠”机制。
借助 Memphis,您可以使用专门为此目的而设计的称为“延迟消息”的简单功能,将延迟从客户端转移到队列。当您的消费者应用程序需要额外的处理时间时,延迟消息允许您将收到的消息发送回代理。

孟菲斯实施的独特之处在于消费者能够独立且原子地控制这种延迟。
在站内,未消费消息的计数不会影响延迟消息的消费。例如,如果需要 60 秒的延迟,它会精确配置该特定消息的不可见时间。

Memphis.dev 延迟消息

  1. 消费者组收到一条消息。
  2. 发生事件,提示消费者组暂停处理消息。
  3. 假设maxMsgDeliveries尚未达到其限制,消费者将激活message.delay(delayInMilliseconds),绕过消息。代理不会立即重新处理同一消息,而是将其保留指定的持续时间。
  4. 后续消息将被消费。
  5. 一旦请求delayInMilliseconds通过,代理将停止主要消息流并将延迟的消息重新引入循环。

孟菲斯

const { memphis } = require('memphis-dev');

// Function to make API requests, simulating rate limitations 
async function makeAPICall(message) 
{ 
  try { 
    const response = await axios.get(message.getDataAsJson()['url']); 
    console.log('API Response:', response.data); 
    message.ack();
  } catch (error) { 
    console.error('API Error:', error.message); 
    console.log("Delaying message for 1 minute"); 
    message.delay(60000);
  } 
}

(async function () {
    let memphisConnection;

    try {
        memphisConnection = await memphis.connect({
            host: '<broker-hostname>',
            username: '<application-type username>',
            password: '<password>'
        });

        const consumer = await memphisConnection.consumer({
            stationName: '<station-name>',
            consumerName: '<consumer-name>',
            consumerGroup: ''
        });

        consumer.setContext({ key: "value" });
        consumer.on('message', (message, context) => {
            await makeAPICall(url, message);
        });

        consumer.on('error', (error) => { });
    } catch (ex) {
        console.log(ex);
        if (memphisConnection) memphisConnection.close();
    }
})();

结论

了解并遵守速率限制对于使用 API 的应用程序开发人员至关重要。它涉及管理请求频率、达到限制时处理错误、实施退避策略以防止 API 服务器过载以及利用 API 提供的速率限制信息来优化应用程序性能,现在您也知道如何使用队列来做到这一点!


作者:Idan Asulin

更多技术干货请关注公号【云原生数据库

squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。

irds.cn,多数据库管理平台(私有云)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/271826.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

linxu重启网络服务失败——Failed to start LSB: Bring up/down networking.

一、出现问题的场景 在虚拟机中的Linux系统启动后&#xff0c;发现没有网络&#xff0c;执行ifconfig 发现自己配置的ens33网卡没有启动。 接着&#xff0c;执行systemctl restart network 重启网络服务失败 查看网络状态&#xff0c;执行 systemctl status network 发现报错…

有什么好用的C/C++源代码混淆工具?

​ 有什么好用的C/C源代码混淆工具&#xff1f; 开始使用ipaguard 前言 iOS加固保护是直接针对ios ipa二进制文件的保护技术&#xff0c;可以对iOS APP中的可执行文件进行深度混淆、加密。使用任何工具都无法逆向、破解还原源文件。对APP进行完整性保护&#xff0c;防止应用…

圣诞树(动态效果)

一、运行效果 二、制作方法 1.复制代码到Dreamweaver或HBuilder或vscode中 2.点击运行---运行到浏览器---选择你要打开的浏览器 3.打开后会出现这个界面&#xff0c;前四个是固定音乐&#xff0c;最后一个是自主选择的音乐&#xff0c;你可以选择你电脑上的歌曲&#xff0c…

CCF-CSP真题《202309-2 坐标变换(其二)》思路+python,c++满分题解

想查看其他题的真题及题解的同学可以前往查看&#xff1a;CCF-CSP真题附题解大全 试题编号&#xff1a;202309-2试题名称&#xff1a;坐标变换&#xff08;其二&#xff09;时间限制&#xff1a;2.0s内存限制&#xff1a;512.0MB问题描述&#xff1a; 问题描述 对于平面直角坐标…

vue项目中使用axios发送http请求添加header自定义变量出现跨域问题

request代码片段&#xff1a; export const request (api,method,params {},config,responseType {} ) > {let apiToken localStorage.getItem("token");let headers {Authorization: ${apiToken},};if (config?.headers) {headers {...headers,...config…

java:4-7运算符优先级

运算符优先级 运算符有不同的优先级&#xff0c;所谓优先级就是表达式运算中的运算顺序。如右表&#xff0c;上一行运算符总优先于下一行。只有单目运算符&#xff08;第二行&#xff09;、赋值运算符&#xff08;倒数3行&#xff09;是从右向左运算的。一览表, 不要背&#x…

SecuSphere:一款功能强大的一站式高效DevSecOps安全框架

关于SecuSphere SecuSphere是一款功能强大的一站式高效DevSecOps解决方案&#xff0c;DevSecOps作为一个经过针对性设计的集中式平台&#xff0c;可以帮助广大研究人员管理和优化漏洞管理、CI/CD管道集成、安全评估和DevSecOps实践。 SecuSphere是一个功能全面的DevSecOps平台…

公司办公文件数据\资料防泄密软件系统——自动智能透明加密保护核心数据

天锐绿盾办公文件数据防泄密软件系统是一款自动智能透明加密保护核心数据的软件系统。 PC访问地址&#xff1a; https://isite.baidu.com/site/wjz012xr/2eae091d-1b97-4276-90bc-6757c5dfedee 以下是该系统的几个核心特点&#xff1a; 自动智能加密&#xff1a;系统采用自动…

K8s 源码剖析及debug实战(一):Minikube 安装及源码准备

文章目录 0. 引言1. 什么是 Minikube2. 安装 Minikube3. 下载 Go4. 下载 Goland5. 下载 K8s 源码6. 后续 0. 引言 欢迎关注本专栏&#xff0c;本专栏主要从 K8s 源码出发&#xff0c;深入理解 K8s 一些组件底层的代码逻辑&#xff0c;同时借助 debug Minikube 来进一步了解 K8…

HarmonyOS Watch状态变量监听

今天 我们要将 Watch装饰器 状态变量更改通知 那么 关键点 状态变量 就是 更改后页面会响应式更新的响应式数据 我们可以这样写 Entry Component struct Index {State Watch("setName") name:string "小猫猫";setName() {console.log("变量改变&q…

为什么 C 语言被广泛应用于嵌入式系统开发?

为什么 C 语言被广泛应用于嵌入式系统开发&#xff1f; 在开始前我有一些资料&#xff0c;是我根据自己从业十年经验&#xff0c;熬夜搞了几个通宵&#xff0c;精心整理了一份「C 语言的资料从专业入门到高级教程工具包」&#xff0c;点个关注&#xff0c;全部无偿共享给大家&a…

超纯水抛光树脂:光伏行业新技术应用

在清洁能源的领域中&#xff0c;高效太阳能电池&#xff0c;尤其是单晶硅电池&#xff0c;正日益受到重视。这些电池不仅转换效率高&#xff0c;而且耐用性强。然而&#xff0c;它们的制造过程对水质有着极高的要求&#xff0c;这就引入了超纯水的重要性。那么&#xff0c;超纯…

一套UWB精准定位系统源码,java语言开发,基于UWB技术自主研发的高精度人员定位系统源码

一套UWB精准定位系统源码&#xff0c;基于UWB技术自主研发的室内外人员定位系统源码 随着经济的高速发展&#xff0c;现代制造业生产设备日益繁多&#xff0c;生产车间面积广阔&#xff0c;生产工人数量多&#xff0c;存在难以进行有效管理和不便实施全方位风险管控的难题。 人…

kubernetes(k8s) Yaml 文件详解

YAML格式&#xff1a;用于配置和管理&#xff0c;YAML是一种简洁的非标记性语言&#xff0c;内容格式人性化&#xff0c;较易读。 1、查看API 资源版本标签 kubectl api-versions 2、编写资源配置清单 kubectl create -f nginx-test.yaml --validatefalse 2.3 查看创建的po…

MySQL报错:1054 - Unknown column ‘xx‘ in ‘field list的解决方法

我在操作MySQL遇到1054报错&#xff0c;报错内容&#xff1a;1054 - Unknown column Cindy in field list&#xff0c;下面演示解决方法&#xff0c;非常简单。 根据箭头指示&#xff0c;Cindy对应的应该是VARCHAR文本数字类型&#xff0c;字符串要用引号&#xff0c;所以解决方…

与供应商合作:成功供应商管理的六种最佳实践

许多企业低估了他们对外部供应商的依赖程度&#xff0c;也小看了这些供应商关系所涉及的风险。本文将探索企业与外部供应商合作的六种最佳实践&#xff0c;利用它们创建有效的供应商管理流程&#xff0c;从而降低成本和风险&#xff0c;并提高盈利能力。 供应商管理为何重要&a…

jsp页面bootstrap表格设置页面跳转一直在底部

首先介绍一下bootStrap和bootStrap table&#xff1a; bootStrap: Bootstrap是Twitter推出的一个用于前端开发的开源工具包。 它由Twitter的设计师Mark Otto和Jacob Thornton合作开发,是一个CSS/HTML框架。 bootStrap table: Bootstrap table 是一款基于 Bootstrap 的 jQue…

web组态--新一代全流程低代码物联网平台

先上图&#xff0c;实际完成效果&#xff1a; 1.添加应用图纸 登录by组态后台&#xff1a;http://www.byzt.net:90 ​ 点击组态管理-画面管理&#xff0c;先新建一个组态画面&#xff0c;填写画面名称&#xff0c;保存&#xff0c;进入组态画面。 ​ 选择画面管理&#xff0…