Consumer的负载均衡

        想要提高Consumer的处理速度,可以启动多个Consumer并发处理,这个时候就涉及如何在多个Consumer之间负载均衡的问题,接下来结合源码分析Consumer的负载均衡实现。

要做负载均衡,必须知道一些全局信息,也就是一个ConsumerGroup里到底有多少个Consumer,知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各个Consumer。在RocketMQ中,负载均衡或者消息分配是在Consumer端代码中完成的,Consumer从Broker处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。

1.DefaultMQPushConsumer的负载均衡

DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个DefaultMQPushConsumer启动后,会马上会触发一个doRebalance动作;而且在同一个ConsumerGroup里加入新的DefaultMQPush-Consumer时,各个Consumer都会被触发doRebalance动作。

如图7-2所示,具体的负载均衡算法有五种,默认用的是第一种AllocateMessageQueueAveragely。负载均衡的结果与Topic的Message Queue数量,以及ConsumerGroup里的Consumer的数量有关。负载均衡的分配粒度只到Message Queue,把Topic下的所有Message Queue分配到不同的Consumer中,所以Message Queue和Consumer的数量关系,或者整除关系影响负载均衡结果。

图7-2 RocketMQ客户端负载均衡策略

以AllocateMessageQueueAveragely策略为例,如果创建Topic的时候,把Message Queue数设为3,当Consumer数量为2的时候,有一个Consumer需要处理Topic三分之二的消息,另一个处理三分之一的消息;当Consumer数量为4的时候,有一个Consumer无法收到消息,其他3个Consumer各处理Topic三分之一的消息。可见Message Queue数量设置过小不利于做负载均衡,通常情况下,应把一个Topic的Message Queue数设置为16。

2.DefaultMQPullConsumer的负载均衡

Pull Consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。

DefaultMQPullConsumer有两个辅助方法可以帮助实现负载均衡,一个是registerMessageQueueListener函数,如代码清单7-5所示。

代码清单7-5 registerMessageQueueListener

Consumer.registerMessageQueueListener("TOPICNAME", new MessageQueue-Listener() {
public void MessageQueueChanged(String Topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) }

 

registerMessageQueueListener函数在有新的Consumer加入或退出时被触发。另一个辅助工具是MQPullConsumerScheduleService类,使用这个Class类似使用DefaultMQPushConsumer,但是它把Pull消息的主动性留给了使用者,如代码清单7-6所示。

代码清单7-6 使用MQPullConsumerScheduleService示例

public class PullConsumerServiceTest {
    public static void main(String[] args) throws MQClientException {
        final MQPullConsumerScheduleService scheduleService = new MQPull-ConsumerScheduleService("PullConsumerService1");
        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("localh-ost:9876");
        scheduleService.setMessageModel(MessageModel.CLUSTERING );
        scheduleService.registerPullTaskCallback("testPullConsumer", new PullTaskCallback() {
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer Consumer = context.getPullConsumer();
                try {
                    long Offset = Consumer.fetchConsumeOffset(mq, false);
                    if (Offset < 0)
                        Offset = 0;
                    PullResult pullResult = Consumer.pull(mq, "*", Offset, 32);
                    System.out.printf("%s%n", Offset + "\t" + mq + "\t" + pullResult);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                    Consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                    context.setPullNextDelayTimeMillis(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        scheduleService.start();
    }
}

 

然后我们看一看在MQPullConsumerScheduleService类的实现里,实现负载均衡的代码,如代码清单7-7所示。

代码清单7-7 MQPullConsumerScheduleService的负载均衡实现

class MessageQueueListenerImpl implements MessageQueueListener {
    @Override
    public void MessageQueueChanged(String Topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageModel MessageModel =
            MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
        switch (MessageModel) {
            case BROADCASTING:
                MQPullConsumerScheduleService.this.putTask(Topic, mqAll);
                break;
            case CLUSTERING :
                MQPullConsumerScheduleService.this.putTask(Topic, mqDivided);
                break;
            default:
                break;
        }
    }
}

 

从源码中可以看出,用户通过更改MessageQueueListenerImpl的实现来做自己的负载均衡策略。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/156075.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Accelerate 0.24.0文档 四:Megatron-LM

参考《Megatron-LM》 文章目录 一、Megatron-LM集成简介二、环境配置设置conda环境的步骤&#xff1a; 二、Accelerate Megatron-LM Plugin三、自定义训练过程四、检查点转换五、文本生成六、支持ROPE 、 ALiBi和Multi-Query Attention七、注意事项 一、Megatron-LM集成简介 在…

SystemVerilog学习(8)——包的使用

目录 一、包的定义 二、导出包的内容 1、可以通过域的索引符::号直接引用 2、可以指定索引一些需要的包中定义的类型到指定的容器中 3、通过通配符*来将包中所有的类别导入到指定容器中 三、包的使用 在进行本文的学习之前&#xff0c;首先需要对SV中类相关的内容有充分的认识…

mount /dev/mapper/centos-root on sysroot failed处理

今天发现centos7重启开不进去系统 通过查看日志主要告警如下 修复挂载目录 xfs_repair /dev/mapper/centos-root不行加-L参数 xfs_repair -L /dev/mapper/centos-root重启 reboot

2023年中国开式冷却塔应用现状及行业市场规模前景分析[图]

开式塔是目前应用最广、类型最多的一种冷却系统。循环水移走工艺介质或换热设备所散发的热量后成为热水&#xff0c;热水进入冷却塔后和空气直接接触&#xff0c;大部分热水得到冷却后&#xff0c;再循环使用。开式冷却塔又可以分为逆流式冷却塔和横流式冷却塔&#xff0c;按照…

基于Vue+SpringBoot的海南旅游景点推荐系统 开源项目

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 用户端2.2 管理员端 三、系统展示四、核心代码4.1 随机景点推荐4.2 景点评价4.3 协同推荐算法4.4 网站登录4.5 查询景点美食 五、免责说明 一、摘要 1.1 项目介绍 基于VueSpringBootMySQL的海南旅游推荐系统&#xff…

【Linux】一

本文使用的是云服务器来获取Linux环境 (使用虚拟机同样可以学习使用命令), 并且介绍了常用的Linux 命令. 获取Linux环境 使用xshell连接到云服务器 1.新建会话 输入主机号(云服务器的外网ip) 2.输入用户名/密码 centos的用户名:root 密码就是在后台设置的 3.成功进入 ~描…

01ctfer 文件上传

01ctfer 文件上传 启动靶场 访问该地址 代码审计 <?php header("Content-Type:text/html; charsetutf-8"); // 每5分钟会清除一次目录下上传的文件 require_once(pclzip.lib.php);if(!$_FILES){echo <!DOCTYPE html> <html lang"zh">…

【Linux】Linux进程间通信(二)

​ ​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;Linux &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 上一篇博客&#xff1a;【Linux】…

VUE指令、computed计算属性和watch 侦听器(附带详细案例)

文章目录 前言一、指令补充1. 指令修饰符2. v-bind对于样式操作的增强 - class3. 案例 - 京东秒杀 tab 导航高亮4. v-bind对于样式操作的增强 - style5. v-model应用于其他表单元素 二、computed计算属性1. 基础语法2. 计算属性 vS method 方法3. 完整写法4. 成绩案例 三、watc…

android PopupWindow设置

记录一个小功能&#xff0c;使用场景&#xff0c;列表项点击弹出 如图&#xff1a; java类代码&#xff1a; public class PopupUtil extends PopupWindow {private Activity context;private View view;private ListView listView;private TextView m_tv_reminderm, m_tv_Wa…

4.1 Windows驱动开发:内核中进程与句柄互转

在内核开发中&#xff0c;经常需要进行进程和句柄之间的互相转换。进程通常由一个唯一的进程标识符&#xff08;PID&#xff09;来标识&#xff0c;而句柄是指对内核对象的引用。在Windows内核中&#xff0c;EProcess结构表示一个进程&#xff0c;而HANDLE是一个句柄。 为了实…

Java集合List报错,java.lang.UnsupportedOperationException

目录 一、点击Arrays.asList源码&#xff0c;一探究竟二、习惯了Arrays.asList&#xff0c;就是想用.add()添加元素&#xff0c;怎么办&#xff1f;三、又有一个同事&#xff0c;是这样写的四、重新点击Arrays.asList源码&#xff0c;一探究竟五、全是坑&#xff0c;怎么办&…

muduo源码剖析之TcpServer服务端

简介 TcpServer拥有Acceptor类&#xff0c;新连接到达时new TcpConnection后续客户端和TcpConnection类交互。TcpServer管理连接和启动线程池&#xff0c;用Acceptor接受连接。 服务端封装 - muduo的server端维护了多个tcpconnection 注意TcpServer本身不带Channel&#xff0…

element-plus使用el-date-picker组件时,如何禁止用户选择当前时间之后的日时分秒

element-plus使用el-date-picker组件时&#xff0c;如何禁止用户选择当前时间之后的日时分秒 例&#xff1a; 当前时间为2023-11-15 14.24&#xff0c;不能选择这之后的时分秒。&#xff08;禁止用户选择2023-11-15 14.28&#xff09; <el-date-pickerv-model"form.s…

腾讯混元大模型与GPT3.5代码能力对比

今日&#xff0c;别的事情不干&#xff0c;来使用一下"腾讯混元大模型"。对比一下"GPT3.5"&#xff0c;看看效果。据说"腾讯混元大模型"代码方面是强项&#xff0c;特意申请了一个来体验一波。 ✨本章仅以Python为主题&#xff0c;展开体验。最后…

解决Jira导出csv最大限度是1000的问题

JIRA为了防止过多影响性能&#xff0c; 设置了导出CSV的上线为1000&#xff0c;影响了搜索结果导出以及RestAPI。 可以通过以下配置参数修改此限制&#xff1a; 通过JIRA管理界面的"高级设置 “设置以下参数 系统管理 > 系统 > 一般设置>高级设置找到 jira.sea…

如何将图片转为excel或word?(客户端)

演示软件&#xff1a;金鸣表格文字识别大师3.6.1&#xff08;新版本界面可能会略有不同&#xff09; 第一部分 将图片转为excel或文表混合的word 一般的软件要将图片转为可编辑的excel&#xff0c;都需要待识别的图片要有明显清晰的表格线&#xff0c;但我们程序现已克服了这…

vue-组件生命周期+网络请求

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;Vue篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来vue篇专栏内容:vue-组件生命周期网络请求 目录 组件生命周期 1. Vue的生命周期 2. Vue 子组件和父组件执行顺序…

消息通讯-MQTT WebHookSpringBoot案例

一、EMQX WebHook介绍 1、EMQX WebHook 是由 emqx_web_hook (opens new window)插件提供的将EMQX中的钩子事件通知到某个Web服务的功能。 2、WebHook 的内部实现是基于钩子&#xff0c;借助 Webhook 可以完成设备在线、上下线记录&#xff0c;订阅与消息存储、消息送达确认等诸…

【开源】基于Vue和SpringBoot的固始鹅块销售系统

项目编号&#xff1a; S 060 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S060&#xff0c;文末获取源码。} 项目编号&#xff1a;S060&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 鹅块类型模块2.3 固…