Pulsar客户端如何控制内存使用

Pulsar客户端如何控制内存使用

一、使用场景

在实际应用中,Pulsar客户端的内存使用控制是一个重要的性能优化点。假设有一个搜索类业务需要记录用户搜索请求,以便后续分析搜索热点和优化搜索效果。以下是一个简化的代码示例:

PulsarClient pulsarClient = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();
Producer<byte[]> producer = pulsarClient.newProducer()
    .topic("search-activities")
    .create();
try {
    MessageId messageId = producer.send(/* message payload here */);
    log.debug("Search activity messageId={}", messageId);
} catch (Exception e) {
    log.error("Failed to record search activity", e);
}

在这个场景中,pulsarClientproducer 支持复用,推荐这么做,这里只是为了演示写到了一起。producer.send 是阻塞方式发送消息,线程会卡在这里等待发送结果返回。在现实中,根据消息在实际业务中的需要,可以选择阻塞和非阻塞两种方式。例如,业务上对搜索请求事件并无强依赖,因此使用阻塞方式发消息不太适合,从性能上考虑会加长整体的搜索延迟,从稳定性上考虑会增加搜索执行过程中的不确定性。因此,可以优化为非阻塞方式,将记录搜索事件放到其他线程中完成:

producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> {
    if (ex != null) {
        log.error("Failed to record search activity", ex);
    } else {
        log.debug("Search activity messageId={}", msgId);
    }
});

在高TPS(例如单实例超过1000QPS)和大消息内容(例如100KB甚至1MB)的情况下,上述代码可能会遇到 MemoryBufferIsFullError 异常:

org.apache.pulsar.client.api.PulsarClientException$MemoryBufferIsFullError: Client memory buffer is full

此外,如果服务与Pulsar的broker之间出现网络波动,或者Pulsar服务内部组件之间出现网络波动,导致整体producer写入延迟升高,亦或是短时间出现大量写入,还可能会遇到 ProducerQueueIsFullError 异常:

org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full

二、Producer的内存控制

1. 配置项分析

在构建Producer时,ProducerBuilder 中与内存使用有关的配置项包括:

  • maxPendingMessages(int maxPendingMessages):控制producer内部队列中正在发送但还没有接收到broker确认的消息数量。若队列大小超出这个限制,默认行为是抛出 ProducerQueueIsFullError 异常。可以通过设置 blockIfQueueFull=true 调整为阻塞等待队列中空出新的空间。
  • maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions):控制整个topic所有分区的总pending消息数量。最终到各个分区内部producer取 maxPendingMessages 和 maxPendingMessagesAcrossPartitions / partitions 的较小值。

2. 内存限制配置

在现实应用场景中,不同业务的消息大小差异很大,单纯基于消息数量控制内存使用是不切实际的。因此,在 PIP-74 中,ClientBuilder 提供了一个面向整个client实例统一的内存限制配置:

ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);

当客户端所有producer中所有pending的消息大小总和超过这个限制时,默认会抛出 MemoryBufferIsFullError 异常。若同时配置了 blockIfQueueFull=true,则当前线程会阻塞等待前面pending的消息发送完成。

3. blockIfQueueFull 配置的使用

blockIfQueueFull 配置是为了限制客户端producer内存使用的同时,让开发者简化处理队列或者内存buffer满了的情况可以继续发送消息。然而,一旦配置为 true,不论是应用发送消息调用的是阻塞的 Producer.send 方法还是非阻塞的 Producer.sendAsync 方法都会出现阻塞等待,这可能会阻塞当前线程,对于某些业务场景是不可接受的。

4. 默认配置

PIP-120 对 2.10.0 以及之后版本的客户端中,默认启用了 memoryLimit 配置,其默认值为 64MB,同时默认禁用了 maxPendingMessagesmaxPendingMessagesAcrossPartitions 配置(默认值修改为0),并将 maxPendingMessagesAcrossPartitions 配置标记为 Deprecated

三、Consumer的内存控制

1. 配置项分析

在构造一个Consumer时,ConsumerBuilder 提供的与内存使用有关的选项包括:

  • receiverQueueSize(int receiverQueueSize):控制每个分区consumer的接收队列大小。
  • maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions):控制所有分区consumer和parent consumer的接收队列总大小。

Pulsar客户端通过预接收队列临时存放broker推送过来的消息,以便应用程序调用 Consumer#receive 或者 Consumer#receiveAsync 方法时直接从内存中返回消息,这是出于消费吞吐的考虑,本质上是一种以空间换取时间的策略。

2. 自动扩展接收队列

在 PIP-74 中提出了一个新的控制Consumer内存使用的方案,即 autoScaledReceiverQueueSizeEnabled

ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);

当启用这个特性后,receiverQueueSize 会从1开始呈2的指数倍增长,直至达到 receiverQueueSize 的限制或达到client的 memoryLimit 限制,其目标是在有限制的内存使用下,达到最大的吞吐效率。

四、番外:ackTimeout 和 ackTimeoutTickTime 的配置

除了Producer和Consumer在生产和消费过程中的内存使用之外,还有一个容易被忽视的点是创建Consumer时 ackTimeoutackTimeoutTickTime 的配置如果不匹配,会消耗较多堆内内存。

ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);

若Consumer配置了 ackTimeout 并且配置了较大的时间窗口(例如1小时或者更长),应适当调大 ackTimeoutTickTime,这是因为Consumer内部使用了一个简单时间轮的算法对消息的处理时间计时,若 ackTimeout 时间窗口很大,ackTimeoutTickTime 仍然使用其默认值 1s,时间轮本身将会占用大量堆内存空间。具体细节可参考客户端源码 UnAckedMessageTracker.java

五、总结

  1. 使用 sendAsync 非阻塞方法要注意其不能保证消息一定发送成功,特别是开启了 blockIfQueueFull 之后,它会在特定情况下演变成阻塞方法。
  2. 对于同时使用到了Producer和Consumer的应用,推荐创建两个client,分别用来创建Producer和Consumer,做读写分离,避免由于共用 memoryLimit 导致相互影响

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/952691.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Flutter项目适配鸿蒙

Flutter项目适配鸿蒙 前言Flutter项目适配鸿蒙新工程直接支持ohos构建新项目编译运行 适配已有的Flutter项目 前言 目前市面上使用Flutter技术站的app不在少数&#xff0c;对于Flutter的项目&#xff0c;可能更多的是想直接兼容Harmonyos&#xff0c;而不是直接在重新开发一个…

我的128天创作之路:回顾与展望

大家好呀&#xff01;今天来和你们分享一下我的创作历程&#x1f601;。 一、机缘 最开始创作呢&#xff0c;是因为在学习 C 的 STL 时&#xff0c;像 string、list、vector 这些模板可把我折腾得够呛&#xff0c;但也让我学到了超多东西&#xff01;我就想&#xff0c;要是把我…

AI刷题-数列推进计算任务、数组中的幸运数问题

目录 一、数列推进计算任务 问题描述 测试样例 解题思路&#xff1a; 问题理解 数据结构选择 算法步骤 优化思路 最终代码&#xff1a; 运行结果&#xff1a; 二、数组中的幸运数问题 问题描述 测试样例 解题思路&#xff1a; 问题理解 数据结构选择 算法步…

Helm部署activemq

1.helm create activemq 创建helm文件目录 2.修改values.yaml 修改image和port 3. helm template activemq 渲染并输出 4. helm install activemq activemq/ -n chemical-park // 安装 5.启动成功

Windows 下Mamba2 / Vim / Vmamba 环境安装问题记录及解决方法终极版(无需绕过triton)

导航 安装教程导航 Mamba 及 Vim 安装问题参看本人博客&#xff1a;Mamba 环境安装踩坑问题汇总及解决方法&#xff08;初版&#xff09;Linux 下Mamba 及 Vim 安装问题参看本人博客&#xff1a;Mamba 环境安装踩坑问题汇总及解决方法&#xff08;重置版&#xff09;Windows …

力扣25. K个一组反转链表

给你链表的头节点 head &#xff0c;每 k 个节点一组进行翻转&#xff0c;请你返回修改后的链表。 k 是一个正整数&#xff0c;它的值小于或等于链表的长度。如果节点总数不是 k 的整数倍&#xff0c;那么请将最后剩余的节点保持原有顺序。 示例 1&#xff1a; 输入&#xff…

动态规划练习五(子序列问题)

一、解题心得 首先子序列是不连续的&#xff0c;所以一定不会在 i - 1 位置去推 i 位置的 dp 值了&#xff0c;所以一般子序列问题是 O(n^2) 复杂度。但是可以通过哈希表等方式降成 O(n)。 以我带来的例题其实子序列问题可以分成两种&#xff1a; 1、以 i 位置为结尾&#x…

图像处理|膨胀操作

在图像处理领域&#xff0c;形态学操作是一种基于图像形状的操作&#xff0c;用于分析和处理图像中对象的几何结构。**膨胀操作&#xff08;Dilation&#xff09;**是形态学操作的一种&#xff0c;它能够扩展图像中白色区域&#xff08;前景&#xff09;或减少黑色区域&#xf…

汉图科技XP356DNL高速激光打印一体机综合性能测评

汉图科技XP356DNL高速激光打印一体机效率方面表现出色&#xff0c;支持A4纸型的高速打印&#xff0c;单面打印速度高达35页/分钟&#xff0c;自动双面打印速度可达32面/分钟&#xff0c;这样的速度在日常办公中能够极大地提高打印效率&#xff0c;减少等待时间&#xff0c;满足…

Unity + Firebase + GoogleSignIn 导入问题

我目前使用 Unity版本&#xff1a;2021.3.33f1 JDK版本为&#xff1a;1.8 Gradle 版本为&#xff1a;6.1.1 Firebase 版本: 9.6.0 Google Sign In 版本为&#xff1a; 1.0.1 问题1 &#xff1a;手机点击登录报错 apk转化成zip&#xff0c;解压&#xff0c;看到/lib/armeabi-v…

如何搭建 Vue.js 开源项目的 CI/CD 流水线

网罗开发 &#xff08;小红书、快手、视频号同名&#xff09; 大家好&#xff0c;我是 展菲&#xff0c;目前在上市企业从事人工智能项目研发管理工作&#xff0c;平时热衷于分享各种编程领域的软硬技能知识以及前沿技术&#xff0c;包括iOS、前端、Harmony OS、Java、Python等…

Elasticsarch:使用全文搜索在 ES|QL 中进行过滤 - 8.17

8.17 在 ES|QL 中引入了 match 和 qstr 函数&#xff0c;可用于执行全文过滤。本文介绍了它们的作用、使用方法、与现有文本过滤方法的区别、当前的限制以及未来的改进。 ES|QL 现在包含全文函数&#xff0c;可用于使用文本查询过滤数据。我们将回顾可用的文本过滤方法&#xf…

ISP流程--去马赛克详解

前言 本期我们将深入讨论ISP流程中的去马赛克处理。我们熟知&#xff0c;彩色图像由一个个像元组成&#xff0c;每个像元又由红、绿、蓝&#xff08;RGB&#xff09;三通道构成。而相机传感器只能感知光的强度&#xff0c;无法直接感知光谱信息&#xff0c;即只有亮暗而没有颜色…

晨辉面试抽签和评分管理系统之七:面试成绩核算的三种方式

晨辉面试抽签和评分管理系统&#xff08;下载地址:www.chenhuisoft.cn&#xff09;是公务员招录面试、教师资格考试面试、企业招录面试等各类面试通用的考生编排、考生入场抽签、候考室倒计时管理、面试考官抽签、面试评分记录和成绩核算的面试全流程信息化管理软件。提供了考生…

FastApi Swagger 序列化问题

问题 错误现象&#xff1a; fastapi的 swagger 界面无法正常打开控制台报错&#xff1a;raise PydanticInvalidForJsonSchema(fCannot generate a JsonSchema for {error_info}) 详细报错&#xff1a; File "d:\Envs\miniconda3\envs\xdagent\lib\site-packages\pydan…

Browser-Use Web UI:浏览器自动化与AI的完美结合

Browser-Use Web UI:浏览器自动化与AI的完美结合 前言简介一、克隆项目二、安装与环境配置1. Python版本要求2. 安装依赖3. 安装 Playwright4. 配置环境变量(非必要步骤)三、启动 WebUI四、配置1. Agent设置2. 大模型设置3. 浏览器相关设置4. 运行 Agent结语前言 Web UI是在…

秒懂虚拟化(三):桌面拟化、用户体验虚拟化、应用程序虚拟化全解析,通俗解读版

秒懂虚拟化&#xff08;二&#xff09;&#xff1a;服务器虚拟化、操作系统虚拟化、服务虚拟化全解析&#xff0c;通俗解读版-CSDN博客这篇文章学习了服务器虚拟化、操作系统虚拟化、服务器虚拟化&#xff0c;本节将继续学习桌面虚拟化、用户体验虚拟化、应用程序虚拟化。 1、…

UVM RAL Register Abstraction Layer:寄存器抽象层

topic 没有RAL的TB 有RAL的TB RAL介绍 summary

扬帆数据结构算法之舟,启航C++探索征途——LeetCode深度磨砺:顺序表技术精进实践

人无完人&#xff0c;持之以恒&#xff0c;方能见真我&#xff01;&#xff01;&#xff01; 共同进步&#xff01;&#xff01; 文章目录 顺序表练习1.移除数组中指定的元素方法1&#xff08;顺序表&#xff09;方法2&#xff08;双指针&#xff09; 2.删除有序数组中的重复项…

【Linux网络编程】网络层 | IP协议 | 网段划分 | 私有IP和公有IP | NAT技术

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站 &#x1f308;个人主页&#xff1a; 南桥几晴秋 &#x1f308;C专栏&#xff1a; 南桥谈C &#x1f308;C语言专栏&#xff1a; C语言学习系…