消息队列批量收发消息,请避开这 5 个坑!

大家好,我是君哥。

使用消息队列时,为了提高生产和消费的性能,有时会开启批量处理。

在生产端,生产者发送的消息先发送到一个消息列表,积累到一定的消息量之后再批量发送给 Broker,如下图:

图片

在消费端,消费者拉取消息后先不立即处理,而是把消息转存到一个内存队列或数据库,由业务线程去处理,如下图:

图片

无论是生产者做批量发送,还是消费者做批量处理,都需要考虑使用批量消息的业务场景,避免踩坑。下面看一下批量操作可能会遇到哪些坑。

批量大小

当生产者采用批量发送的方式来提高发送性能时,一定要考虑发送消息的批量大小。下面是 RocketMQ 批量发送的官方示例:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    //handle the error
}

RocketMQ 默认消息大小是 4M,由 maxMessageSize 参数控制,如果批量消息大小超过 maxMessageSize,则会抛出异常。

如果遇到消息大小超过 maxMessageSize 的情况时,可以用下面方法进行处理:

  • 把这个参数改大,但需要考虑 Broker 的性能和网络带宽;

  • 将消息进行拆分后分批发送;

  • 对消息进行压缩处理。

RabbitMQ 相关的 API 则提供了更加灵活的批量控制,对消息数量和消息大小都做了控制,下面看一下源码:

图片

图片

幂等

消费端可以批量拉取消息进行消费,这样可以减少拉取消息时的 RPC 次数,提升消费性能。比如在 RocketMQ 中,可以通过 Consumer 中的 pullBatchSize 来设置一次拉取的消息数量,通过 consumeMessageBatchMaxSize 参数来设置一次消费的消息数量。

但需要注意的是,如果批量消息中一条消息消费失败了,这一批消息都需要进行重试,已经消费成功的消息会被重复消费,带来业务问题。

为了不对业务造成影响,必须考虑幂等。一个简单的方法是在消息中增加全局唯一 id 属性,对消息消费结果进行记录,消费成功后保存 id。这样在消费消息之前先查询是否存在消费成功的记录,如果存在则直接返回处理成功。

时延

在使用消息队列进行批量操作时,必须要考虑到时延问题。比如我们设置一个批次 100 条消息,积累够 100 条消息后再发送,在消息量小的情况下,可能积累够 100 条消息会很长时间,导致消费端拉取到一条消息时延很大。

虽然消息队列的一个重要作用是削峰填谷,但在一些场景下,对消息的实时性也有要求。比如在车联网的充电场景,车联网平台需要实时感知充电桩的状态,如果充电桩积累够一批消息再上报平台,平台获取到的状态会不准确,如果心跳消息延时太久,平台会认为充电桩离线。

对于有时延要求又需要批量操作的场景,可以设置一个超时时间,超时后即使消息数量不够,也会发送出去。看下 RabbitMQ 的处理:

public synchronized void send(String exchange, String routingKey, Message message, CorrelationData correlationData)
  throws AmqpException {
 if (correlationData != null) {
  //...
  super.send(exchange, routingKey, message, correlationData);
 }
 else {
  if (this.scheduledTask != null) {
   this.scheduledTask.cancel(false);
  }
  MessageBatch batch = this.batchingStrategy.addToBatch(exchange, routingKey, message);
  if (batch != null) {
   super.send(batch.getExchange(), batch.getRoutingKey(), batch.getMessage(), null);
  }
  //这里获取到超时时间,到达超时时间后使用定时器将消息发送出去
  Date next = this.batchingStrategy.nextRelease();
  if (next != null) {
   this.scheduledTask = this.scheduler.schedule((Runnable) () -> releaseBatches(), next);
  }
 }
}

可靠性

使用批处理一定要考虑可靠性的问题。

在消费端,消费者批量拉取一批消息后把消息暂存到一个内存临时队列,然后多线程去临时队列消费消息,如果服务宕机,临时队列中的消息会丢失。

为了避免宕机引发的损失,可以拉取一批消息后保存到数据库,然后给 Broker 返回 ACK,之后业务代码去数据库查询消息并消费,不过要考虑数据库大事务、锁竞争等问题。

当然,对于一些消息丢失不敏感的场景,比如日志收集之类的,可靠性这个指标是不用太关注的。

特殊场景

因为批量消息有一些复杂性,消息队列的部分特性不支持。

事务消息

批量消息会增加消息重试的难度,所以对于事务消息,建议使用单条消息,一条消息对应一个事务。

顺序消息

顺序消息的实现思路一般是生产者将消息发送到同一个分区,消费者绑定这个分区并使用单线程消费这个分区的消息。如果对同一个 Topic 下的同一个分区来实现批量发送,难度会增大。所以建议顺序消息使用单条消息进行发送。

延时消息

如果延时消息使用批量进行发送,这一批消息的延时时间必须相同,同时要考虑批量消息的超时时间,超时时间太大会影响延时时间的准确性,生产端实现复杂度大大增加。

总结

使用批量消息,在一定程度上可以提高性能和吞吐量,但是确实也会存在一些问题,使用的时候要结合业务场景避开这些坑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/233823.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【实战教程】PHP与七牛云的完美对接,你值得拥有!

前言&#xff1a; 随着互联网的迅速发展&#xff0c;越来越多的网站和应用程序需要处理大量的图片、视频和其他文件。为了有效地存储和管理这些文件&#xff0c;并提供快速的内容分发服务&#xff0c;开发者们常常依赖于云存储和CDN服务提供商。 七牛云是一家领先的云存储和C…

[LeetCode周赛复盘] 第 375 场周赛20231210

[LeetCode周赛复盘] 第 375 场周赛20231210 一、本周周赛总结100143. 统计已测试设备1. 题目描述2. 思路分析3. 代码实现 100155. 双模幂运算1. 题目描述2. 思路分析3. 代码实现 100137. 统计最大元素出现至少 K 次的子数组1. 题目描述2. 思路分析3. 代码实现 100136. 统计好分…

047:vue加载循环倒计时 示例

第047个 查看专栏目录: VUE ------ element UI 专栏目标 在vue和element UI联合技术栈的操控下&#xff0c;本专栏提供行之有效的源代码示例和信息点介绍&#xff0c;做到灵活运用。 &#xff08;1&#xff09;提供vue2的一些基本操作&#xff1a;安装、引用&#xff0c;模板使…

智能优化算法应用:基于飞蛾扑火算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于飞蛾扑火算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于飞蛾扑火算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.飞蛾扑火算法4.实验参数设定5.算法结果6.…

vue3封装接口

在src下面创建一个文件夹任意名称 我拿这个名字举例子了apiService 相当于创建一个新的文件 // 封装接口 // apiService.js import axios from axios;// 接口前缀 const API_BASE_URL 前缀;接口后缀export const registerUser async (fileData) > {try {const response …

eclipse中maven的配置

Maven下载地址&#xff1a;https://maven.apache.org/download.cgi 下载完成以后解压到非中文目录&#xff0c;建议放一个比较大的盘符下&#xff0c;因为Maven会一直从网上更新各种库存放在这个目录下&#xff0c;慢慢的会变得很大。 Maven环境变量配置 创建环境变量 在桌…

file-saver 的使用

简介 FileSaver.js 是在客户端保存文件的解决方案&#xff0c;非常适合在客户端生成文件的 Web 应用程序 基本使用 以下内容基于官方文档&#xff0c;官方文档传送门https://gitcode.net/mirrors/eligrey/FileSaver.js 注意&#xff1a;存在文件保存的大小限制&#xff0c;具…

1688API接口系列,商品详情数据丨搜索商品列表丨商家订单类丨1688开放平台接口使用方案

1688商品详情接口是指1688平台提供的API接口&#xff0c;用于获取商品详情信息。通过该接口&#xff0c;您可以获取到商品的详细信息&#xff0c;包括商品标题、价格、库存、描述、图片等。 要使用1688商品详情接口&#xff0c;您需要先申请1688的API权限&#xff0c;并获取ac…

Nginx【通俗易懂】《上篇》

目录 1.什么是Nginx&#x1f495;&#x1f495;&#x1f495; 2.Nginx的基本目录&#x1f495;&#x1f495;&#x1f495; 3.基本原理图 &#x1f495;&#x1f495;&#x1f495; 4.Nginx配置 &#x1f495;&#x1f495;&#x1f495; 5.日志的分析 &#x1f495;&…

Matlab数学建模算法之小波神经网络详解

&#x1f517; 运行环境&#xff1a;Matlab &#x1f6a9; 撰写作者&#xff1a;左手の明天 &#x1f947; 精选专栏&#xff1a;《python》 &#x1f525; 推荐专栏&#xff1a;《算法研究》 &#x1f510;#### 防伪水印——左手の明天 ####&#x1f510; &#x1f497; 大家…

Linux6-配置网络、源码包的编译和安装

配置 linux 网络 配置主机名 修改/etc/hostname 配置文件&#xff0c;永久配置主机名 [rootlocalhost ~]# vim /etc/hostname svr7.tedu.cn [rootlocalhost ~]# cat /etc/hostname svr7.tedu.cn [rootlocalhost ~]# reboot #重启生效命令行永久修改主机名 [rootlocalhost ~…

vivado时序方法检查11

TIMING-47 &#xff1a; 同步时钟之间的伪路径、异步时钟组或仅最 大延迟数据路径约束 在 <clock_group> 与 <clock_group> 这两个时钟之间设置了 <message_string> 时序约束 &#xff08; 请参阅 VivadoIDE 的“ Timing Constraint ”窗口中的约束位…

Docker部署开源分布式任务调度平台DolphinScheduler并实现远程访问办公

文章目录 前言1. 安装部署DolphinScheduler1.1 启动服务 2. 登录DolphinScheduler界面3. 安装内网穿透工具4. 配置Dolphin Scheduler公网地址5. 固定DolphinScheduler公网地址 前言 本篇教程和大家分享一下DolphinScheduler的安装部署及如何实现公网远程访问&#xff0c;结合内…

整合,降价,官司……2023休闲零食行业大变局

休闲零食行业正从暗战走向明战。 11月27日&#xff0c;宁波广源聚亿投资有限公司&#xff08;以下简称“广源聚亿”&#xff09;因被投企业宜春赵一鸣食品科技有限公司&#xff08;以下简称“赵一鸣”&#xff09;在双方合作期内&#xff0c;刻意隐瞒公司重大事项&#xff0c;…

什么叫应用加速,什么情况需要用到应用加速

应用加速的定义 应用加速依赖节点之间的高速通道、转发集群及智能路由技术&#xff0c;实现各地用户的就近接入&#xff0c;通过高速通道直达源站区域&#xff0c;帮助业务解决用户访问卡顿或者延迟过高的问题。使用高速网络和全球范围内的节点技术&#xff0c;确保传输时间和…

1829_org-mode的标记语法

org-mode的标记语法 Grey 全部学习汇总&#xff1a; GitHub - GreyZhang/g_org: my learning trip for org-mode 如果用org-mode来做笔记&#xff0c;现在看起来还是有一些短板让我觉得不是很方便。不过&#xff0c;我发现采用这种方式整理信息的时候会有较为深度的思考。因…

flstudio21.3完整高级版怎么下载?有哪些新功能

flstudio高级版是一款适用于广泛领域的音频编辑软件。它支持多通道混音器和VST插件&#xff0c;包括数百种乐器和效果插件。它还为您提供了一个乐谱编辑器&#xff0c;需要对不同乐器的节奏进行必要的编辑。Flstudio具有许多内置电子合成声音&#xff0c;可提供更广泛的电子声音…

MySQL - 表达式With as 语句的使用及练习

目录 8.1 WITH AS 的含义 8.2 WITH AS语法的基本结构如下&#xff1a; 8.3 练习题1 8.4 牛客练习题 8.1 WITH AS 的含义 WITH AS 语法是MySQL中的一种临时结果集&#xff0c;它可以在SELECT、INSERT、UPDATE或DELETE语句中使用。通过使用WITH AS语句&#xff0c;可以将一个查…

荒野大镖客emp.dll文件丢失怎么办?6种有效修复方法教你如何解决问题

在计算机运行软件游戏的使用过程中&#xff0c;我们经常会遇到一些错误提示&#xff0c;其中之一就是“找不到emp.dll&#xff0c;无法继续执行代码”。这个问题的出现往往会导致程序无法正常运行&#xff0c;给用户带来困扰。本文将详细介绍emp.dll丢失的原因、对电脑的影响以…

力扣思维题/经典面试题——下一个排序

https://leetcode.cn/problems/next-permutation/description/ 字节面试题&#xff0c;非常经典的逻辑思维题 1、找到第一个下降点&#xff0c;说明这个点可以变得稍微大一点以至于让整个排列变得更加大 为什么&#xff0c;仔细想想&#xff0c;后面都是倒序了怎么都不可能变得…