Apache pulsar 技术系列-- 消息重推的几种方式

导语

Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制(GEO replication)、快速扩容、灵活容错等特性。在很多场景下,用户需要通过 MQ 实现消息的重新推送能力,比如超时重推、处理异常时重推等,本文介绍 Apache Pulsar 提供的几种消息重推方案。

在 MQ 实际的使用中,用户消费数据时,可能会遇到消息处理异常或者需要推迟处理的场景,这里就涉及到消息的重推逻辑,Pulsar 自己提供了消息重推的能力。本文主要介绍 Pulsar 的消息重推机制。

消息获取(拉取/推送)机制

Pulsar 的消费采用了推、拉结合的消息获取机制,Consumer 获取消息之前会首先通知 Broker(FLOW 请求),Broker 会根据配置的 ReceiveQueue 大小以及 Consumer 当前可以接收的消息数量来推送消息给 Consumer。

详细的交互流程如下图所示:

图片

  1. Consumer 在创建之后,会以 MaxReceiveQueue 的大小作为 Permit 值,这个值就是 Consumer 可以缓存的的最大消息条数。

  2. 然后,Consumer 向 Broker 发起 FLOW 请求,携带 Permit 信息(Consumer Permit 减少到 0),Broker 接收之后会记录这个 Permit 作为 Consumer 的 AvailablePermit,AvailablePermit 决定 Broker 可以向 Consumer 发送数据的数量(实际是在读取数据时判断)。

  3. 如果 AvailablePermit > 0, Broker 开始读取数据(假设有 N 条),然后推送给 Consumer,推送之后,AvailablePermit 自减 N。

  4. Consumer 接收到消息之后,并不会直接返回给用户,而是放在 ReceiveQueue 中,当用户调用 Receive() 方法来获取消息时,Consumer 将 Permit + 1。

  5. 当 Permit > MaxReceiveQueueSize / 2,Consumer 会再次发起 Flow 请求,并且携带当前的 Permit 值。

上述流程,就是 Consumer 和 Broker 的消息传递过程。

在默认的情况下,数据推送给 Consumer 之后,就完全交给用户处理,数据不会重复推送。这种方式满足不了需要重推的场景,下面介绍目前 Pulsar 的几种重推机制。

SDK 统一的重推

一个比较直观的做法是超过一定时间,如果消息没有 Ack 就重新推送。

目前 Pulsar 提供了通过超时时间来控制数据重推的能力,Consumer 可以配置 AckTimeout(默认关闭),在设置了 AckTimeout 之后,Client 会构建一个 UnAckedMessageTracker ,用户 Receive() 的所有的消息都会被 UnAckedMessageTracker 跟踪。用户 Ack 消息时,会从 UnAckedMessageTracker 删除,对于没有 Ack 的消息,UnAckedMessageTracker 会有定时任务来检查,如果已经超过了 AckTimeout 时间,则会触发重推。

重推是通过 RedeliverUnackMessage 来实现的,UnAckedMessageTracker 会主动发起 Redeliver 的请求,Broker 会根据请求的 MessageId 信息重新推送。

AckTimeout 在 Consumer 初始化时设置:

 Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)                  .ackTimeout(10, TimeUnit.SECOND)

用户决策的重推 – NegativeAck

通过 AckTimeout 实现的重推,是 SDK 内部统一实现的,用户不能控制重推的行为,如果用户希望根据自己的使用场景,决定哪些消息需要重推,Pulsar 提供了 NegativeAck 的能力。

NegativeAck 和 AckTimeout 方式类似,有一个 NegativeAcksTracker 来管理消息的重推,NegativeAcksTracker 只会跟踪用户主动调用 NegativeAcknowledge() 方法的 MessageID,重推的逻辑也是通过 RedeliverUnackMessage 实现。

NegativeAck 可以设置 Redelivery 的 Delay 时间。

 Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)                .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS)

使用的时候,需要明确调用。

// call the API to send negative acknowledgmentconsumer.negativeAcknowledge(message);

用户决策的重推 – RLQ

除了 NegativeAck 的方式,用户还可以通过重试队列( RLQ )来实现主动的消息重推,RLQ 一般会使用在用户暂时不能处理某些消息,并且希望之后再处理的场景。

Pulsar 提供了 ReconsumeLater() 方法来实现重试队列,和 Negative 不同的是,RLQ 会创建一个新的 Topic,Topic 的格式是 TopicName-SubscriptionName_RLQ , 每次 ReconsumeLater() 时,都会产生一个新的消息写入到 RLQ Topic 中,并且会对之前的消息 Ack。

设置了 RLQ 的 Consumer,SDK 内部默认会启动 RLQ 的订阅,所以 RLQ 的消息也会被 Consumer 消费到。

RLQ 是通过 DeadLetterPolicy 来配置的(DLQ 下文会解释)。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)     .topic("my-topic")     .subscriptionName("my-subscription")    .subscriptionType(SubscriptionType.Shared)     .enableRetry(true)    .deadLetterPolicy(DeadLetterPolicy.builder()     .maxRedeliverCount(maxRedeliveryCount)    .build())     .subscribe();

RLQ Topic 中的消息属性中会添加一下信息:

Special propertyDescription
REAL_TOPIC原始 Topic 名称
ORIGIN_MESSAGE_ID原始 MessageId
RECONSUMETIMES重复消费的次数
DELAY_TIME投递的延迟时间

RLQ 也需要主动调用: consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS)。

为重推次数加上限制–DLQ

对于数据持续处理失败,一直重试并不是一个很好的策略,此时死信队列(DLQ)就是一个比较好的选择,DLQ 允许用户将持续处理失败的数据写入到一个独立的 Dead Letter Topic 中,DLQ 的数据需要单独的订阅来消费。

DLQ Topic 的格式为 TopicName-SubscriptionName_DLQ。DLQ 需要为重试设置一个上限,当重试次数超过上限之后,就会被写入到 DLQ Topic 中。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)       .topic("my-topic")      .subscriptionName("my-subscription")      .subscriptionType(SubscriptionType.Shared)         .deadLetterPolicy(DeadLetterPolicy.builder()             .maxRedeliverCount(maxRedeliveryCount)             .build())         .subscribe();

几种重推和 DLQ 的关系

如果配置了 DLQ,那么使用 AckTimeout、NegativeAck 或者 ReconsumeLater 引起的数据重推都会触发 DLQ,也就是说重试的次数达到上限之后,都会被写入到 DLQ topic 里。

重试次数的统计有所区别:

AckTimeout 和 NegativeAck 都是通过 Redelivery 机制来计数的,SDK 发起 Redelivery 请求之后,Broker 侧的 RedeliveryTracker 会记录重推的次数,并且在推送给 Consumer 的 Message 中会包含 RedeliveryCount 的字段。

对于 RLQ,则是从 RECONSUMETIMES 属性中获取重复消费的次数,这个属性在 Client 生成,并且也是在 Client 计数。

总的来说,Apache Pulsar 提供了多种消息重推的方式,用户可以结合自己的场景,灵活使用,满足自己的业务需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/45689.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

RTI无线电层析成像Matlab仿真数据生成

文章目录 概述初始化环境参数 概述 无线电层析成像是一种通过获取一定区域内多对相对固定的无线通信节点间的某种测量数据后,按照一定的数学处理方法,对区域内的障碍物目标以图像的形式 展现出来的成像技术。 开山之作&#xff1a; J. Wilson and N. Patwari, “Radio tomogra…

教师ChatGPT的23种用法

火爆全网的ChatGPT&#xff0c;作为教师应该如何正确使用&#xff1f;本文梳理了教师ChatGPT的23种用法&#xff0c;一起来看看吧&#xff01; 1、回答问题 ChatGPT可用于实时回答问题&#xff0c;使其成为需要快速获取信息的学生的有用工具。 从这个意义上说&#xff0c;Cha…

天气越热越不能开空调,这是什么道理?

如今正值盛夏&#xff0c;炎热的太阳仿佛要把人烤化。相信很多小伙伴一回到家都会迫不及待地打开空调&#xff0c;在干爽的凉风中完成“自我复活”。然而需要警惕的是&#xff0c;相对密闭的空调房其实早已“暗藏杀机”&#xff0c;VOC、细菌、灰尘等室内“健康杀手”在房间里不…

Latex | 将MATLAB图并导入Latex中的方法

一、问题描述 用Latex时写paper时&#xff0c;要导入MATLAB生成的图进去 二、解决思路 &#xff08;1&#xff09;在MATLAB生成图片的窗口中&#xff0c;导出.eps矢量图 &#xff08;2&#xff09;把图上传到overleaf的目录 &#xff08;3&#xff09;在文中添加相应代码 三…

[ 容器 ] consul 容器服务更新与发现

目录 什么是服务注册与发现什么是consulconsul 部署consul 服务器 registrator服务器consul-templateconsul 多节点 什么是服务注册与发现 服务注册与发现是微服务架构中不可或缺的重要组件。起初服务都是单节点的&#xff0c;不保障高可用性&#xff0c;也不考虑服务的压力承…

ftp和sftp区别,以及xftp的使用

网上找链接找的很辛苦对吧&#xff01; 网上下载的破解版还不用。而且用没多久又说要更新了&#xff0c;又得重新找。 这下直接把官方免费获取链接发给你&#xff0c;就不用在被这种事情麻烦了。 家庭/学校免费 - NetSarang Website (xshell.com):家庭/学校免费 - NetSarang W…

CAN bus off ——ISO11898

什么是can bus off&#xff1f; CAN总线关闭&#xff08;CAN bus off&#xff09;是指CAN节点进入一种错误状态&#xff0c;无法继续正常的数据通信。当一个CAN节点的错误计数器超过了设定的阈值时&#xff0c;该节点将进入CAN总线关闭状态。在这种状态下&#xff0c;该节点将停…

NoSQL之Redis配置与优化

目录 关系数据库和非关系数据库 关系型数据库 非关系型数据库 关系数据库和非关系数据库的区别 Redis安装部署 优点 Redis数据库常用命令 Redis持久化 Redis性能管理 非关系数据库产生背景 总结关系与非关系 了解redis redis优点 redis为什么这么快 1、线程池优化…

Doris(二) -通过外部表同步数据

前言 参考网址 1.官网 2.ODBC External Table Of Doris 3.Apache doris ODBC外表使用方式 第一步 创建 RESOURCE DROP RESOURCE IF EXISTS mysql_test_odbc; CREATE EXTERNAL RESOURCE mysql_test_odbc PROPERTIES ( "type" "odbc_catalog", "…

java 支持jsonschema

入参校验产品化 schema_xsd可视化编辑器_个人渣记录仅为自己搜索用的博客-CSDN博客 jsonchema的生成 支持v4的jackson-jsonSchema GitHub - mbknor/mbknor-jackson-jsonSchema: Generate JSON Schema with Polymorphism using Jackson annotations jackson-module-jsonSchema …

WPF实现DiagramChart

1、文件架构 2、FlowChartStencils.xaml <ResourceDictionary xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:s"clr-namespace:DiagramDesigner"xmlns:c&…

大语言模型LLM技术赋能软件项目管理和质量保障︱微软中国高级研发经理步绍鹏

微软中国高级研发经理步绍鹏先生受邀为由PMO评论主办的2023第十二届中国PMO大会演讲嘉宾&#xff0c;演讲议题&#xff1a;大语言模型LLM技术赋能软件项目管理和质量保障。大会将于8月12-13日在北京举办&#xff0c;敬请关注&#xff01; 议题内容简要&#xff1a; 本次分享将…

DSSAT模型教程

详情点击链接&#xff1a;R语言与作物模型&#xff08;DSSAT模型&#xff09;教程 前言 随着基于过程的作物生长模型&#xff08;Process-based Crop Growth Simulation Model&#xff09;的发展&#xff0c;R语言在作物生长模型和数据分析、挖掘和可视化中发挥着越来越重要的…

mac brew安装 node 踩坑日记- n切换node不生效

最近用了一个旧电脑开发&#xff0c;发现里面node管理混乱&#xff0c;有nvm、n和homebrew&#xff0c;导致切换node 切换不了&#xff0c;开发也有莫名其妙的错误。所以我打算重新装一下node&#xff0c;使用n做为管理工具。 1. 删除nvm cd ~ rm -rf .nvm2. 删除n sudo rm -…

只需要5个技巧,就能让你的独立站更吸引客户

人类很简单——我们都更喜欢第一眼看起来令人心情愉悦的的东西。独立站卖家自由度更高&#xff0c;可以添加很多令人惊叹的产品、调整更优惠的价格并设置多种语言选择来吸引访问者购买&#xff0c;但网站的设计仍然是大多数访问者判断独立站品牌的第一要素。根据可靠调研得知&a…

SpringBoot实战(二十二)集成 Sleuth、Zipkin

目录 一、简介1.Sleuth2.Zipkin 二、搭建 zipkin-server1.jar包启动2.docker启动3.自己搭建**Maven依赖**添加启动类注解 4.页面截图 三、搭建 sleuth-zipkin1.Maven 依赖2.yaml配置3.代码实现DemoController.javaDemoFeignClient.java 4.测试 一、简介 1.Sleuth 官方文档&am…

7-17-7.24 三维重建

3D重建 context capture中无人机航拍建模的基本原理 如何测量物体的实际长宽数据 1.添加用户控制点 2.添加范围约束&#xff0c;标定两个控制点之间的实际距离 无人机航拍中坐标和经纬度信息是如何确定的 pos数据由飞控系统在相机拍照时生成&#xff0c;与相片一一对应&#…

python与深度学习(五):CNN和手写数字识别

目录 1. 说明2. 卷积运算3. 填充4. 池化5. 卷积神经网络实战-手写数字识别的CNN模型5.1 导入相关库5.2 加载数据5.3 数据预处理5.4 数据处理5.5 构建网络模型5.6 模型编译5.7 模型训练、保存和评价5.8 模型测试5.9 模型训练结果的可视化 6. 手写数字识别的CNN模型可视化结果图7…

数学建模学习(2):数学建模各类常用的算法全解析

一、评价类算法 常见的评价算法 1.层次分析法 基本思想 是定性与定量相结合的多准则决策、评价方法。将决策的有关元素分解成 目标层、准则层和方案层 &#xff0c;并通过人们的 判断对决策方案的 优劣进行排序 &#xff0c;在此基础上进行定性和定量分析。它把人的思维过程…

AWS IAM介绍

前言 AWS是世界上最大的云服务提供商&#xff0c;它提供了很多组件供消费者使用&#xff0c;其中进行访问控制的组件叫做IAM(Identity and Access Management)&#xff0c; 用来进行身份验证和对AWS资源的访问控制。 功能 IAM的功能总结来看&#xff0c;主要分两种&#xff1…