RocketMQ底层通信机制

        分布式系统各个角色间的通信效率很关键,通信效率的高低直接影响系统性能,基于Socket实现一个高效的TCP通信协议是很有挑战的,本节介绍RocketMQ是如何解决这个问题的。

1.Remoting模块

RocketMQ的通信相关代码在Remoting模块里,先来看看主要类结构,如图4-1所示。


 

图4-1 Remoting模块的类继承关系

RemotingService为最上层接口,定义了三个方法:

·void start();

·void shutdown();

·void registerRPCHook(RPCHook rpcHook);

RemotingClient和RemotingServer继承RemotingService接口,并增加了自己特有的方法。RemotingClient的主要函数定义如代码清单4-6所示。

代码清单4-6 RemotingClient主要函数定义

void registerProcessor(final int requestCode, final NettyRequestProcessor processor,final ExecutorService executor);
RemotingCommand invokeSync(final String addr, final RemotingCommand request, final long timeoutMillis);
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback);
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis);
void updateNameServerAddressList(final List<String> addrs);

 

然后看看具体的实现类,NettyRemotingClient和NettyRemotingServer分别实现了RemotingClient和RemotingServer,而且都继承了NettyRemotingAbstract类。通过上面的封装,RocketMQ各个模块间的通信,可以通过发送统一格式的自定义消息(RemotingCommand)来完成,各个模块间的通信实现简洁明了。比如NameServer模块中,NameServerController有一个remotingServer变量,NameServer在启动时初始化各个变量,然后启动remotingServer即可,剩下NameServer要做的是专心实现处理RemotingCommand的逻辑,如代码清单4-7所示。

代码清单4-7 NameServer处理主流程代码

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    if (log.isDebugEnabled()) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }
    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}

 

在Consumer的源码中,获取消息的底层通信部分同样发送一个RemotingCommand请求,返回的response也是个RemotingCommand类型,如代码清单4-8所示。

代码清单4-8 Consumer请求消息底层实现代码

private PullResult pullMessageSync(//
    final String addr, // 1
    final RemotingCommand request, // 2
    final long timeoutMillis// 3
) throws RemotingException, InterruptedException, MQBrokerException {
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processPullResponse(response);
}

 

从源码中可以看出,RocketMQ中复杂的通信过程,被RemotingCommand统一起来,大部分的逻辑都是通过发送、接受并处理Command来完成的。

2.协议设计和编解码

        RocketMQ自己定义了一个通信协议,使得模块间传输的二进制消息和有意义的内容之间互相转换。协议格式如图4-2所示。

图4-2 RocketMQ的通信协议

1)第一部分是大端4个字节整数,值等于第二、三、四部分长度的总和;

2)第二部分是大端4个字节整数,值等于第三部分的长度;

3)第三部分是通过Json序列化的数据;

4)第四部分是通过应用自定义二进制序列化的数据。

消息的解码过程在RemotingCommand的decode函数里,如代码清单4-9所示。

代码清单4-9 消息解码函数

ublic static RemotingCommand decode(final ByteBuffer byteBuffer) {
    int length = byteBuffer.limit();
    int oriHeaderLen = byteBuffer.getInt();
    int headerLength = getHeaderLength(oriHeaderLen);
    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);
    RemotingCommand cmd = headerDecode(headerData, getProtocolType (oriHeaderLen));
    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;
    return cmd;
}

 

对应的消息编码过程在RemotingCommand的encode函数中,如代码清单4-10所示。

代码清单4-10 消息编码函数

public ByteBuffer encode() {
    // 1> header length size
    int length = 4;
    // 2> header data length
    byte[] headerData = this.headerEncode();
    length += headerData.length;
    // 3> body data length
    if (this.body != null) {
        length += body.length;
    }
    ByteBuffer result = ByteBuffer.allocate(4 + length);
    // length
    result.putInt(length);
    // header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
    // header data
    result.put(headerData);
    // body data;
    if (this.body != null) {
        result.put(this.body);
    }
    result.flip();
    return result;
}

 

3.Netty库

        RocketMQ是基于Netty库来完成RemotingServer和RemotingClient具体的通信实现的,Netty是个事件驱动的网络编程框架,它屏蔽了Java Socket、NIO等复杂细节,用户只需用好Netty,就可以实现一个“网络编程专家+并发编程专家”水平的Server、Client网络程序。应用Netty有一定的门槛,需要了解它的EventLoopGroup、Channel、Handler模型以及各种具体的配置。RocketMQ利用Netty实现的通信类是NettyRemotingServer和NettyRemotingClient,用户也可以参考这两个类的实现来学习使用Netty。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/142910.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

电脑技巧:推荐基于浏览器的远程桌面访问控制工具

一、软件简介 Getscreen.me是一个基于浏览器的远程桌面访问控制工具&#xff0c;可以轻松地远程访问控制特定设备。并且注册登录账户实现允许设置具有永久访问权限的设备&#xff0c;可以通过一键进行快速连接访问&#xff0c;无需共享 ID、密码或任何内容。 Getscreen.me采用…

7.jvm对象内存布局

目录 概述对象里的三个区对象头验证代码控制台输出分析 验证2代码控制台输出 实例数据对其填充 访问对象结束 概述 jvm对象内存布局详解。 相关文章在此总结如下&#xff1a; 文章地址jvm基本知识地址jvm类加载系统地址双亲委派模型与打破双亲委派地址运行时数据区地址运行时数…

光明源@为什么需要智慧厕所,智慧厕所是干什么的?

在当今数字化时代&#xff0c;城市的发展日新月异&#xff0c;城市居民对生活品质和城市服务的期望也与日俱增。在城市规划和基础设施建设中&#xff0c;智慧厕所作为一项创新性的举措&#xff0c;正逐渐崭露头角。本文将探讨为什么需要智慧厕所以及它们的实际功能和意义。 城市…

SAP中销售业务的查询修改及冲销操作手册

目的 物流在销售订单发货开票出问题时进行查询分析及处理冲销的相关操作 触发条件 销售业务出现变更导致需要重新做销售或人为错误 必要条件 订单&#xff0c;交货单&#xff0c;发票己完成并过账 有用提示 在实际冲销业务过程中需要去分析&#xff0c;在了解业务的情况下去…

asp.net实验管理系统VS开发sqlserver数据库web结构c#编程web网页设计

一、源码特点 asp.net 实验管理系统 是一套完善的web设计管理系统&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为vs2010&#xff0c;数据库为sqlserver2008&#xff0c;使用c#语言开发。 asp.net实验管理系统1 应用技术&am…

P6入门:项目初始化6-项目详情之资金Funding

前言 使用项目详细信息查看和编辑有关所选项目的详细信息&#xff0c;在项目创建完成后&#xff0c;初始化项目是一项非常重要的工作&#xff0c;涉及需要设置的内容包括项目名&#xff0c;ID,责任人&#xff0c;日历&#xff0c;预算&#xff0c;资金&#xff0c;分类码等等&…

百度文心一言

1分钟了解一言是谁&#xff1f; 一句话介绍【文心一言】 我是百度研发的人工智能模型&#xff0c;任何人都可以通过输入【指令】和我进行互动&#xff0c;对我提出问题或要求&#xff0c;我能高效地帮助你们获取信息、知识和灵感哦 什么是指令&#xff1f;我该怎么和你互动&am…

SAP-SD-外向交货单交期不符

创建外向交货单时报错 销售订单的交期还没到&#xff0c;所以不能做外向交货单 但是货已经加工完成&#xff0c;现在想交货 查看销售订单的交货期为12月15日&#xff08;va03&#xff09; 在VL01N里修改“选择日期为12月15日”就可以了。

7天入门python系列之第五天python项目练习

第七天 Python项目实操 编者打算开一个python 初学主题的系列文章&#xff0c;用于指导想要学习python的同学。关于文章有任何疑问都可以私信作者。对于初学者想在7天内入门Python&#xff0c;这是一个紧凑的学习计划。但并不是不可完成的。 学到第7天说明你已经对python有了一…

11.13堆的各种操作算法,二叉树的一些性质

算法 二叉堆的上调 在树上进行的插入排序 。循环次数不会超过树的高度&#xff0c;即插入交换次数不会超过ologn&#xff0c;n是结点个数 要么到根节点&#xff0c;即i1结束&#xff0c;要么当前元素还比上面的元素小&#xff0c;直到不比上面的元素小&#xff0c;即h[i/2]&l…

关于三维模型几何坐标修正的技术方法探究

关于三维模型几何坐标修正的技术方法探究 倾斜摄影三维模型的几何坐标修正是保证模型准确性和一致性的重要步骤。下面将探讨几种常见的技术方法用于倾斜摄影三维模型几何坐标修正。 1、块内坐标转换&#xff1a;在倾斜摄影中&#xff0c;可以将整个场景划分为多个块&#xff0…

k8s的service自动发现服务:实战版

Service服务发现的必要性: 对于kubernetes整个集群来说&#xff0c;Pod的地址也可变的&#xff0c;也就是说如果一个Pod因为某些原因退出了&#xff0c;而由于其设置了副本数replicas大于1&#xff0c;那么该Pod就会在集群的任意节点重新启动&#xff0c;这个重新启动的Pod的I…

LLM App ≈ 数据ETL管线

虽然现有的 LLM 应用程序工具&#xff08;例如 LangChain 和 LlamaIndex&#xff09;对于构建 LLM 应用程序非常有用&#xff0c;但在初始实验之外不建议使用它们的数据加载功能。 当我构建和测试我的LLM应用程序管道时&#xff0c;我能够感受到一些尚未开发和破解的方面的痛苦…

Spring事务之AOP导致事务失效问题

情况说明 首先开启了AOP&#xff0c;并且同时开启了事务。下面这个TransactionAspect就是一个简单的AOP切面&#xff0c;有一个Around通知。 Aspect Component public class TransactionAspect {Pointcut("execution(* com.qhyu.cloud.datasource.service.TransactionSe…

基于 Letterize.js + Anime.js 实现炫酷文本特效

如上面gif动图所示&#xff0c;这是一个很炫酷的文字动画效果&#xff0c;文字的每个字符呈波浪式的扩散式展开。本次文章将解读如何实现这个炫酷的文字效果。 基于以上的截图效果可以分析出以下是本次要实现的主要几点&#xff1a; 文案呈圆环状扩散开&#xff0c;扩散的同时…

YOLOv8-Seg改进:分割注意力系列篇 | 轻量级上采样CARAFE算子,助力小目标分割

🚀🚀🚀本文改进:轻量级上采样CARAFE算子,引入到YOLOv8,neck处 Upsanple替换为CARAFE 🚀🚀🚀CARAFE在小目标分割领域中应用广泛 🚀🚀🚀YOLOv8-seg创新专栏:http://t.csdnimg.cn/KLSdv 学姐带你学习YOLOv8,从入门到创新,轻轻松松搞定科研; 1)手把手教…

LeetCode | 20. 有效的括号

LeetCode | 20. 有效的括号 OJ链接 这道题可以使用栈来解决问题~~ 思路&#xff1a; 首先我们要使用我们之前写的栈的实现来解决此问题~~如果左括号&#xff0c;就入栈如果右括号&#xff0c;出栈顶的左括号跟右括号判断是否匹配 如果匹配&#xff0c;继续如果不匹配&#…

算法笔记-第五章-质因子分解

算法笔记-第五章-质因子分解 小试牛刀质因子2的个数丑数 质因子分解最小最大质因子约数个数 小试牛刀 质因子2的个数 #include<cstdio> int main() {int n; scanf_s("%d", &n); int count 0; while (n % 2 0) {count; n / 2; }printf("%…

【Mysql系列】Mysql基础篇

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

部署 KVM 虚拟化平台

虚拟化技术的演变过程分为软件模拟、虚拟化层翻译、容器虚拟化三个阶段 1 软件模拟的技术方式 软件模拟是通过软件完全模拟CPU、网卡、芯片组、磁盘等计算机硬件&#xff0c;因为是软件模拟&#xff0c;所以理论上可以模拟任何硬件&#xff0c;甚至不存在的硬件。但是由于是软…