RocketMQ负载均衡机制解析

消费者在消费消息的时候,需要知道从Broker的哪一个消息队列中去获取消息。

所以,在消费者端必须要做负载均衡,即Broker端中多个消费队列分配给同一个消费者组中的哪些消费者消费。

RocketMQ中,在消费者端有一个:Rebalance负载均衡组件

  • 他负责相对均匀的给消费者分配需要拉取的队列信息。

消费者负载均衡

指为消费组下的每个消费者分配订阅主题下的消费队列,分配了消费队列消费者就可以知道去消费哪个消费队列上面的消息。

  • 这里针对集群模式,因为广播模式,所有的消息队列可以被消费组下的每个消费者消费不涉及负载均衡。

而集群模式一个消息队列同一时间只能分配给组内的一个消费者进行消费。

RocketMQ5.0以前是按照队列粒度进行负载均衡的,5.0以后提供了按消息粒度进行负载均衡。

队列粒度负载均衡

队列粒度负载均衡策略中,同一消费者组内的多个消费者将按照队列粒度消费消息,每个队列只能被其中一个消费者消费。

队列粒度负载均衡是在每个消费者端进行的,并不是由某个节点统一进行负载均衡之后将分配结果通知到每个消费者。

消费者增加或者减少会影响消息队列的分配,所以Broker需要感知消费者的上下线情况。

消费者在启动时会向所有的Broker发送心跳包进行注册,通知Broker消费者上线,下线的时候也会向Broker发送取消注册的请求。

Broker会维护消费者信息的注册信息,在消费者发生变更时会通知消费者进行负载均衡。

Rebalance触发时机

消费者启动时触发:

消费者在启动时会进行一次负载均衡,为自己分配消息队列。

Broker发现消费组变更时触发:

处于以下两种情况之一时会被判断为消费组发生了变化,需要进行负载均衡:

  • 某个消费组内有新的消费者向Broker进行了注册。

    • 比如某个消费组原来有两个消费者,现在新增了一个消费者,新增的消费者启动时会向Broker发送注册请求。

  • 消费组订阅的主题信息发生了变化。

    • 比如消费组新增订阅了某个主题或者取消某个主题的订阅,会被判断为主题订阅信息发生了变化。

被判定为变化之后,会触发变更事件,向该消费者下的所有消费者发送发送变更请求,通知组下每个消费者进行负载均衡。

Broker收到消费者下线时触发:

如果有消费者向Broker发送UNREGISTER_CLIENT取消注册请求,并且开启了允许通知变更,会触发变更事件。

变更事件同上,Broker会通知该消费者组下的所有消费者进行一次负载均衡。

消费者定时触发:

消费者本身也会定时执行负载均衡,默认是20s执行一次。

消息粒度负载均衡

RocketMQ5.0之后,增加了消息粒度负载均衡策略,默认且仅使用消息粒度负载均衡策略。

消息粒度负载均衡策略中,同一消费组内的多个消费者将按照消息粒度平均分摊主题中的所有消息。

  • 即同一个队列中的消息,可被平均分配给组内多个消费者共同消费。

消息粒度负载均衡策略保证同一个队列的消息可以被组内多个消费者共同处理。

但是该策略使用的消息分配算法结果是随机的,不能指定消息被哪一个特定的消费者处理。

  • 当消费者获取到某条消息后,服务端会对该消息加锁,保证该消息对其他消费者不可见,直到消息消费成功或者超时。

所以多个消费者同时消费同一个消息队列中的消息,服务端也可以保证消息不会被多个消费者重复消费。

消息粒度负载均衡策略适用于绝大多数在线处理的业务场景,对于流式处理、聚合计算等场景,更适合队列粒度的负载均衡策略。

执行流程

负载均衡服务执行逻辑在doRebalance函数,里面会对每个消费者组执行负载均衡操作。

consumerTable这个map对象里存储了消费者组对应的的消费者实例。

private ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
 
public void doRebalance() {
    //每个消费者组都有负载均衡
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}

由于每个消费者组可能会消费很多topic,每个topic都有自己的不同队列,最终是按topic的维度进行负载均衡。

public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                //按topic维度执行负载均衡
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    this.truncateMessageQueueNotMyTopic();
}

最终负载均衡逻辑处理的实现在:

  • org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic

其中分为广播消息和集群消息模型两种情况处理。

负载均衡核心功能的主流程,主要做了4件事情:

负载均衡策略原理

负载均衡策略顶层接口:

 
/**
 * Strategy Algorithm for message allocating between consumers
 */
public interface AllocateMessageQueueStrategy {
 
    /**
     * Allocating by consumer id
     * 给消费者id分配消费队列
     */
    List<MessageQueue> allocate(
        final String consumerGroup, //消费者组
        final String currentCID, //当前消费者id
        final List<MessageQueue> mqAll, //所有的队列
        final List<String> cidAll //所有的消费者
    );
 
}

他默认共有7种负载均衡策略实现。

最常用的两种平均分配算法:

AllocateMessageQueueAveragely

是用总数除以消费者个数,余数按消费者顺序分配给消费者。

AlocateMessageQueueAveragelyByCircle

轮流一个一个分配。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/925064.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

架构-微服务-环境搭建

文章目录 前言一、案例准备1. 技术选型2. 模块设计3. 微服务调用 二、创建父工程三、创建基础模块四、创建用户微服务五、创建商品微服务六、创建订单微服务 前言 ‌微服务环境搭建‌ 使用的电商项目中的商品、订单、用户为案例进行讲解。 一、案例准备 1. 技术选型 maven&a…

Microsoft Excel如何插入多行

1.打开要编辑的excel表&#xff0c;在指定位置&#xff0c;鼠标右键点击“插入”一行 2.按住shift键&#xff0c;鼠标的光标箭头会变化成如下图所示 3.一直按住shift键和鼠标左键&#xff0c;往下拖动&#xff0c;直至到插入足够的行

【K8s】专题十五(4):Kubernetes 网络之 Calico 插件安装、切换网络模式、卸载

本文内容均来自个人笔记并重新梳理&#xff0c;如有错误欢迎指正&#xff01; 如果对您有帮助&#xff0c;烦请点赞、关注、转发、订阅专栏&#xff01; 专栏订阅入口 | 精选文章 | Kubernetes | Docker | Linux | 羊毛资源 | 工具推荐 | 往期精彩文章 【Docker】&#xff08;全…

Web 端语音对话 AI 示例:使用 Whisper 和 llama.cpp 构建语音聊天机器人

大语言模型&#xff08;LLM&#xff09;为基于文本的对话提供了强大的能力。那么&#xff0c;能否进一步扩展&#xff0c;将其转化为语音对话的形式呢&#xff1f;本文将展示如何使用 Whisper 语音识别和 llama.cpp 构建一个 Web 端语音聊天机器人。 系统概览 如上图所示&…

Excel把其中一张工作表导出成一个新的文件

excel导出一张工作表 一个Excel表里有多个工作表&#xff0c;怎么才能导出一个工作表&#xff0c;让其生成新的Excel文件呢&#xff1f; 第一步&#xff1a;首先打开Excel表格&#xff0c;然后选择要导出的工作表的名字&#xff0c;比如“Sheet1”&#xff0c;把鼠标放到“She…

Django+Nginx+uwsgi网站Channels+redis+daphne多人在线聊天实现粘贴上传图片

在DjangoNginxuwsgi网站Channelsredisdaphne多人在线的基础上&#xff08;详见DjangoNginxuwsgi网站使用Channelsredisdaphne实现简单的多人在线聊天及消息存储功能-CSDN博客&#xff09;&#xff0c;实现在输入框粘贴或打开本地图片&#xff0c;上传到网站后返回图片路径&…

【Git】Git 命令参考手册

目录 Git 命令参考手册1. 创建仓库1.1 创建一个新的本地仓库1.2 克隆一个仓库1.3 克隆仓库到指定目录 2. 提交更改2.1 显示工作目录中已修改的文件&#xff0c;准备提交2.2 将文件添加到暂存区&#xff0c;准备提交2.3 将所有已修改的文件添加到暂存区&#xff0c;准备提交2.4 …

TDengine在debian安装

参考官网文档&#xff1a; 官网安装文档链接 从列表中下载获得 Deb 安装包&#xff1b; TDengine-server-3.3.4.3-Linux-x64.deb (61 M) 进入到安装包所在目录&#xff0c;执行如下的安装命令&#xff1a; sudo dpkg -i TDengine-server-<version>-Linux-x64.debNOTE 当…

linux安装mysql8.0.40

一、下载MySQL安装包 1.查看glibc版本 rpm -qa | grep glibc 2.到mysql官网下载安装包 ​ 二、解压安装 1.上传压缩包纸/usr/local 目录下&#xff0c;解压&#xff1a; tar -xvf mysql-8.0.40-linux-glibc2.17-x86_64.tar.xz 2.重命名&#xff1a; mv mysql-8.0.40-linux-…

用Pycharm安装manim

由于版本和工具的差异&#xff0c;manim的安装方式不尽相同。本文用Pycharm来安装manim. 一、准备工作&#xff1a;安装相应版本的python、pycharm和ffmpeg. 此处提供一种安装ffmpeg的方式 下载地址&#xff1a;FFmpeg 下载后&#xff0c;解压到指定目录。 配置环境变量&am…

运维面试整理总结

面试题可以参考:面试题总结 查看系统相关信息 查看系统登陆成功与失败记录 成功&#xff1a;last失败&#xff1a;lastb 查看二进制文件 hexdump查看进程端口或连接 netstat -nltp ss -nltp补充&#xff1a;pidof与lsof命令 pidof [进程名] #根据 进程名 查询进程id ls…

Kubernetes 之 Ingress 和 Service 的异同点

1. 概念与作用 1.1 Ingress Ingress 是什么&#xff1f; Ingress主要负责七层负载&#xff0c;将外部 HTTP/HTTPS 请求路由到集群内部的服务。它可以基于域名和路径定义规则&#xff0c;从而将外部请求分配到不同的服务。 ingress作用 提供 基于 HTTP/HTTPS 的路由。 支持 …

初识 Django

声明 适用于想要快速入门的开发者&#xff0c;有前后端开发以及语言基础&#xff0c;想要学习语法或者特性。 想要学会快速开发&#xff0c;快速入门&#xff0c;请看博客【实用向】Django 框架入门并结合本篇文章。 命令 命令描述startproject创建一个 Django 项目startapp…

python简单算法

冒泡 def boll(lis):i 0while i<len(lis)-1:j 0while j<len(lis)-1-i:if lis[j] > lis[j1]:lis[j],lis[j 1] lis[j1],lis[j]j1i1选择排序 def selct1(lit):i 0while i<len(lit)-1:j i1min1 iwhile j < len(lit):if lit[j] < lit[min1]:min1 jj 1li…

【大模型】基于LLaMA-Factory的模型高效微调

LLaMA-Factory项目介绍 LLaMA Factory 是一个简单易用且高效的大型语言模型&#xff08;Large Language Model&#xff09;训练与微调平台。通过 LLaMA Factory&#xff0c;可以在无需编写任何代码的前提下&#xff0c;在本地完成上百种预训练模型的微调&#xff0c;框架特性包…

电脑中的vcruntime140_1.dll文件有问题要怎么解决?一键修复vcruntime140_1.dll

遇到“vcruntime140_1.dll无法继续执行代码”的错误通常表明电脑中的vcruntime140_1.dll文件有问题。这个文件属于Visual C Redistributable&#xff0c;对很多程序的运行至关重要。本文将提供几个步骤&#xff0c;帮助你迅速修复这一错误&#xff0c;使电脑恢复正常工作状态。…

鼠标前进后退键改双击,键盘映射(AutoHotkey)

初衷&#xff1a; 1.大部分鼠标为不可自定义按键&#xff0c;可以自定义的又很贵。 鼠标左键是双击是很频类很高的操作&#xff0c;鼠标前进/后退按键个人感觉使用频率很低&#xff0c;因此把鼠标前进/后退改为双击还是很合适的。 2.有些短款的键盘没有Home或End键&#xff0c;…

VSCode Terminal无法运行node以及node-gyp等指令

无法使用node指令&#xff0c;使用管理员权限启动VSCode即可&#xff0c;或者右键VSCode属性&#xff0c;修改兼容性中使用管理员权限打开。 运行node-gyp等指令出现因为在此系统上禁止运行脚本。有关详细信息&#xff0c;请参阅 https:/go.microsoft.com/fwlink/?LinkID1351…

npm install -g@vue/cli报错解决:npm error code ENOENT npm error syscall open

这里写目录标题 报错信息1解决方案 报错信息2解决方案 报错信息1 使用npm install -gvue/cli时&#xff0c;发生报错&#xff0c;报错图片如下&#xff1a; 根据报错信息可以知道&#xff0c;缺少package.json文件。 解决方案 缺什么补什么&#xff0c;这里我们使用命令npm…

Elasticsearch:Retrievers 介绍

检索器&#xff08;retrievers&#xff09;是 Elasticsearch 中搜索 API 中添加的新抽象层。它们提供了在单个 _search API 调用中配置多阶段检索管道的便利。此架构通过消除对复杂搜索查询的多个 Elasticsearch API 调用的需求&#xff0c;简化了应用程序中的搜索逻辑。它还减…