架构师系列-消息中间件(九)- RocketMQ 进阶(三)-消费端消息保障

5.2 消费端保障
5.2.1 注意幂等性

应用程序在使用RocketMQ进行消息消费时必须支持幂等消费,即同一个消息被消费多次和消费一次的结果一样,这一点在使用RoketMQ或者分析RocketMQ源代码之前再怎么强调也不为过。

“至少一次送达”的消息交付策略,和消息重复消费是一对共生的因果关系,要做到不丢消息就无法避免消息重复消费,原因很简单,试想一下这样的场景:客户端接收到消息并完成了消费,在消费确认过程中发生了通讯错误,从Broker的角度是无法得知客户端是在接收消息过程中出错还是在消费确认过程中出错,为了确保不丢消息,重发消息是唯一的选择。

有了消息幂等消费约定的基础,RocketMQ就能够有针对性地采取一些性能优化措施,例如:并行消费、消费进度同步机制等,这也是RocketMQ性能优异的原因之一。

5.2.2 消息消费模式

从不同的维度划分,Consumer支持以下消费模式:

  • 广播消费模式下,消息消费失败不会进行重试,消费进度保存在Consumer端;
  • 集群消费模式下,消息消费失败有机会进行重试,消费进度集中保存在Broker端。
5.2.2.1 集群消费

使用相同 Group ID 的订阅者属于同一个集群,同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点

 

注意事项

  • 消费端集群化部署, 每条消息只需要被处理一次。
  • 由于消费进度在服务端维护, 可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
5.2.2.2 广播消费

广播消费指的是:一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。

 

注意事项

  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 每条消息都需要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。
  • 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
  • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过, 请谨慎选择。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 目前仅 Java 客户端支持广播模式。
  • 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
5.2.2.3 集群模式模拟广播

如果业务需要使用广播模式,也可以创建多个 Group ID,用于订阅同一个 Topic。

 

注意事项

  • 每条消息都需要被多台机器处理,每台机器的逻辑可以相同也可以不一样。
  • 消费进度在服务端维护,可靠性高于广播模式。
  • 对于一个 Group ID 来说,可以部署一个消费端实例,也可以部署多个消费端实例。当部署多个消费端实例时,实例之间又组成了集群模式(共同分担消费消息)。假设 Group ID 1 部署了三个消费者实例 C1、C2、C3,那么这三个实例将共同分担服务器发送给 Group ID 1 的消息。同时,实例之间订阅关系必须保持一致。
5.2.3 消息消费模式

RocketMQ消息消费本质上是基于的拉(pull)模式,consumer主动向消息服务器broker拉取消息。

  • 推消息模式下,消费进度的递增是由RocketMQ内部自动维护的;
  • 拉消息模式下,消费进度的变更需要上层应用自己负责维护,RocketMQ只提供消费进度保存和查询功能。
5.2.3.1 推模式(PUSH)

我们上面使用的消费者都是PUSH模式,也是最常用的消费模式

由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常。

实现方式,代码上使用 DefaultMQPushConsumer

consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。主要用的也是这种方式。

5.2.3.2 拉模式(PULL)

RocketMQ的PUSH模式是由PULL模式来实现的

由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能。

5.2.3.3 注意事项

注意:RocketMQ 4.6.0版本后将弃用DefaultMQPullConsumer

DefaultMQPullConsumer方式需要手动管理偏移量,官方已经被废弃,将在2022年进行删除

 

 

DefaultLitePullConsumer

该类是官方推荐使用的手动拉取的实现类,偏移量提交由RocketMQ管理,不需要手动管理

5.2.4 消息确认机制

consumer的每个实例是靠队列分配来决定如何消费消息的,那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)

为了保证数据不被丢失,RocketMQ支持消息确认机制,即ack。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。

5.2.4.1 确认消费

业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
        execute();//执行真正消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
})
5.2.4.2 消费异常

如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
        execute();//执行真正消费
        return ConsumeConcurrentlyStatus.RECONSUME_LATER
    }
})

 

为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup,而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列,应用可以监控死信队列来做人工干预。

5.2.5 消息重试机制
5.2.5.1 顺序消息的重试

对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ版会自动不断地进行消息重试(每次间隔时间为1秒),这时,应用会出现消息消费被阻塞的情况,因此,建议您使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

5.2.5.2 无序消息的重试

无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

5.2.5.3 重试次数

消息队列RocketMQ版默认允许每条消息最多重试16次,每次重试的间隔时间如下。

第几次重试与上次重试的间隔时间第几次重试与上次重试的间隔时间
110秒97分钟
230秒108分钟
31分钟119分钟
42分钟1210分钟
53分钟1320分钟
64分钟1430分钟
75分钟151小时
86分钟162小时

如果消息重试16次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试,超过这个时间范围消息将不再重试投递。

5.2.5.4 和生产端重试区别

消费者和生产者的重试还是有区别的,主要有两点

  • 默认重试次数:Product默认是2次,而Consumer默认是16次
  • 重试时间间隔:Product是立刻重试,而Consumer是有一定时间间隔的。它照1S,5S,10S,30S,1M,2M····2H进行重试。

注意:Product在异步情况重试失效,而对于Consumer在广播情况下重试失效。

5.2.5.5 重试配置方式

需要重试

消费失败后,重试配置方式,集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

  • 方式1:返回RECONSUME_LATER(推荐)
  • 方式2:返回Null
  • 方式3:抛出异常

无需重试

集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回Action.CommitMessage,此后这条消息将不会再重试。

//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        //消息处理逻辑抛出异常,消息将重试。
        try {
            doConsumeMessage(list);
        }catch (Exception e){
            //捕获消费逻辑中的所有异常,并返回Action.CommitMessage;
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        //业务方正常消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

 

5.3 死信队列

在正常情况下无法被消费(超过最大重试次数)的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列就称为死信队列(Dead-Letter Queue)

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次 数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

5.3.1 死信特性
5.3.1.1 死信消息特性
  • 不会再被消费者正常消费
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除,故死信消息应在产生的 3 天内及时处理
5.3.1.2 死信队列特性
  • 一个死信队列对应一个消费者组,而不是对应单个消费者实例
  • 一个死信队列包含了对应的 Group ID 所产生的所有死信消息,不论该消息属于哪个 Topic
  • 若一个 Group ID 没有产生过死信消息,则 RocketMQ 不会为其创建相应的死信队列

6. Redis 轮询队列

redis队列中存放车辆信息,调度系统从队列中获取车辆信息,打车完成后再将车辆信息放回队列中

 

6.1 相关代码
6.1.1 redis获取车辆

从list左侧弹出一个车辆

 

/**
  * 从Redis List列表中拿取一个车辆ID
  * 如果没有获取到延时10S
  *
  * @return
  */
public String takeVehicle() {
    //从Redis List列表中拿取一个车辆ID
    return redisTemplate.opsForList().leftPop(DispatchConstant.VEHICLE_QUEUE, 1, TimeUnit.SECONDS);
}
6.1.2 redis压入车辆

检查车辆状态,并从右侧压入车辆

/**
  * 设置车辆状态为Ready
  *
  * @param vehicleId
  */
public void readyDispatch(String vehicleId) {
    //检查车辆状态
    DispatchConstant.DispatchType vehicleDispatchType = taxiVehicleStatus(vehicleId);
    //如果车辆时运行状态
    if (vehicleDispatchType.isRunning() || vehicleDispatchType.isReady()) {
        redisTemplate.opsForValue().set(DispatchConstant.VEHICLE_STATUS_PREFIX + vehicleId, DispatchConstant.DispatchType.READY.toString());
        //从右侧压入车辆
        redisTemplate.opsForList().rightPush(DispatchConstant.VEHICLE_QUEUE, vehicleId);
    }
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/574234.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

开启医疗数据新纪元:山海鲸可视化智慧医疗解决方案

在数字化浪潮席卷而来的今天&#xff0c;智慧医疗作为医疗行业的创新力量&#xff0c;正以其独特的技术优势&#xff0c;推动着医疗服务的升级和变革。而在这场变革中&#xff0c;山海鲸可视化以其出色的数据可视化能力&#xff0c;为智慧医疗提供了强大的技术支持&#xff0c;…

用Python和Pygame实现简单贪吃蛇游戏

1.pip安装pygame pygam插件安装 pip install 插件名字 # 安装 pip uninstall 插件名字 # 卸载 pip install 插件名字 -i 指定下载的镜像网址 pip show 插件名字 # 查看插件名字 pip install pygame -i https://pypi.tuna.tsinghua.edu.cn/simple pip show p…

【网络编程】网络编程概念 | TCP和UDP的区别 | UDP数据报套接字编程 | Socket

文章目录 网络编程一、什么是网络编程1.TCP和UDP的区别 二、UDP数据报套接字编程DatagramSocketDatagramPacket回显服务器&#xff08;echo server&#xff09; 网络编程 一、什么是网络编程 通过网络&#xff0c;让两个主机之间能够进行通信。基于通信来完成一定的功能。 ​…

MacOS 下gif 文件的几种压缩方法

categories: Tips tags: Tips GIF 写在前面 最近想转换几个 tg 的 tgs 文件到 gif, 然后上传到微信, 所以又涉及到了 gif 的操作了. 工具介绍 安装 brew install imagemagick gifsicleimagemagick 是专业的图像处理工具, gifsicle 是专门处理 gif 的小工具 ,都是开源的. …

C++之AVL树的使用以及原理详解

1.AVL树 1.1AVL树的概念 1.2AVL树的定义 1.3AVL树的插入 1.4AVL树的旋转 1. 右单旋 2. 左单旋 3. 左右双旋 4. 右左双旋 1.5AVL树的验证 1.6AVL的实现 在之前对map/multimap/set/multiset进行了简单的介绍&#xff08;C之map_set的使用-CSDN博客&#xff09;&#xff0c;…

说说2024年暑期三下乡社会实践工作新闻投稿经验

作为一名在校大学生,我有幸自去年起参与学院组织的暑期大学生三下乡社会实践团活动。这项活动不仅是我们深入基层、服务社会的重要平台,也是展现当代大学生风采、传递青春正能量的有效途径。然而,如何将这些生动鲜活的实践故事、感人至深的瞬间传播出去,让更多人了解并受到启发…

火绒安全的应用介绍

火绒安全软件是一款集成了杀毒、防御和管控功能的安全软件&#xff0c;旨在为用户提供全面的计算机安全保障。以下是火绒安全软件的一些详细介绍&#xff1a; 系统兼容性强&#xff1a;该软件支持多种操作系统&#xff0c;包括Windows 11、Windows 10、Windows 8、Windows 7、…

xgp加速器免费 微软商店xgp用什么加速器

2001年11月14日深夜&#xff0c;比尔盖茨亲自来到时代广场&#xff0c;在午夜时分将第一台Xbox交给了来自新泽西的20岁年轻人爱德华格拉克曼&#xff0c;后者在回忆中说&#xff1a;“比尔盖茨就是上帝。”性能超越顶级PC的Xbox让他们趋之若鹜。2000年3月10日&#xff0c;微软宣…

25-代码随想录第454题.四数相加II

25-代码随想录第454题.四数相加II 给定四个包含整数的数组列表 A , B , C , D ,计算有多少个元组 (i, j, k, l) &#xff0c;使得 A[i] B[j] C[k] D[l] 0。 为了使问题简单化&#xff0c;所有的 A, B, C, D 具有相同的长度 N&#xff0c;且 0 ≤ N ≤ 500 。所有整数的范…

python 笔记ast.literal_eval

1 介绍 ast.literal_eval 是 Python 标准库 ast 模块中的一个函数&#xff0c;用于安全地评估表示 Python 字面量或容器&#xff08;如列表、字典、元组、集合&#xff09;的字符串 import ast # 解析并执行一个数字表达式 num ast.literal_eval("3.14") prin…

OpenFeign微服务调用组件!!!

1.Feign是什么 GitHub - OpenFeign/feign: Feign makes writing java http clients easierFeign makes writing java http clients easier. Contribute to OpenFeign/feign development by creating an account on GitHub.https://github.com/OpenFeign/feignFeign是Netflix开…

项目十一:爬取热搜榜(小白实战级)

首先&#xff0c;恭喜各位也恭喜自已学习爬虫基础到达圆满级&#xff0c;今后的自已python爬虫之旅会随着网络发展而不断进步。回想起来&#xff0c;我学过请求库requests模块、解析库re模块、lmxl模块到数据保存的基本应用方法&#xff0c;这一次的学习python爬虫之旅收获很多…

Vu3+QuaggaJs实现web页面识别条形码

一、什么是QuaggaJs QuaggaJS是一个基于JavaScript的开源图像识别库&#xff0c;可用于识别条形码。 QuaggaJs的作用主要体现在以下几个方面&#xff1a; 实时图像处理与识别&#xff1a;QuaggaJs是一款基于JavaScript的开源库&#xff0c;它允许在Web浏览器中实现实时的图像…

ASP.NET Core 3 高级编程(第8版) 学习笔记 03

本篇介绍原书的第 18 章&#xff0c;为 19 章 Restful Service 编写基础代码。本章实现了如下内容&#xff1a; 1&#xff09;使用 Entity Framework Core 操作 Sql Server 数据库 2&#xff09;Entity Framework Core 数据库迁移和使用种子数据的方法 3&#xff09;使用中间件…

Qt Quick centerIn和fill 的用法

1&#xff09;Qt Quick centerIn和fill 的用法&#xff1a; import QtQuick 2.5 Rectangle { width:300; height:200; Rectangle { color: "blue"; anchors.fill: parent; border.width: 6; border.co…

详解工业网关在线探测功能及用途

工业网关专为工业物联网应用设计&#xff0c;可实现包括不同通讯协议之间的兼容和转换&#xff0c;提供软硬件加密保障工业数据安全传输&#xff0c;发挥强大算力实现数据边缘预处理&#xff0c;联动联调工业网络设备实现高效协同等。在线探测功能是佰马工业网关的一项重要功能…

unity学习(89)——unity塞满c盘!--删除editor下的log文件

卸了一个视频后强制续命打开详细信息&#xff1a; 这个再往下找也是没用的&#xff01; 显示隐藏文件夹后&#xff01;执行如下操作&#xff01; 30个g&#xff01; 其中unity占23g editer占了21g 删除C:\Users\王栋林\AppData\Local\Unity\Editor下的log文件 恢复到之前的水…

使用 Flask 和 WTForms 构建一个用户注册表单

在这篇技术博客中&#xff0c;我们将使用 Flask 和 WTForms 库来构建一个用户注册表单。我们将创建一个简单的 Flask 应用&#xff0c;并使用 WTForms 定义一个注册表单&#xff0c;包括用户名、密码、确认密码、邮箱、性别、城市和爱好等字段。我们还将为表单添加验证规则&…

【C 数据结构】图

文章目录 【 1. 基本原理 】1.1 无向图1.2 有向图1.3 基本知识 【 2. 图的存储结构 】2.1 完全图2.2 稀疏图和稠密图2.3 连通图2.3.1 (普通)连通图连通图 - 无向图非连通图 的 连通分量 2.3.2 强连通图强连通图 - 有向图非强连通有向图 的 强连通分量 2.3.3 生成树 - 连通图2.3…

美区视频带货“一哥”,一周销量狂干三十万美金!

“超店有数显示&#xff0c;Tybuggy上周带货狂揽34.3万美金&#xff0c;超出第二名近30倍。” TikTok风波再起&#xff0c;4月17日&#xff0c;美众议院推出援乌援以军事议案&#xff0c;值得注意的是&#xff0c;TikTok剥离法案被“打包”夹带其中&#xff0c;以此加大再参议…