Apache Pulsar源码解析之Lookup机制

引言

在学习Pulsar一段时间后,相信大家也或多或少听说Lookup这个词,今天就一起来深入剖析下Pulsar是怎么设计的它吧

Lookup是什么

在客户端跟服务端建立TCP连接前有些信息需要提前获取,这个获取方式就是Lookup机制。所获取的信息有以下几种

  • 应该跟哪台Broker建立连接
  • Topic的Schema信息
  • Topic的分区信息

其中第一个是最重要的,因此今天就针对第一点进行深入剖析,大致流程如下图
在这里插入图片描述

  1. 在创建生产者/消费者时会触发Lookup,一般是通过HTTP请求Broker来获取目标Topic所归属的Broker节点信息,这样才知道跟哪台机器建立TCP连接进行数据交互
  2. Broker接收到Lookup命令,此时会进行限流检查、身份/权限认证、校验集群等检测动作后,根据请求中携带的Namespace信息获取对应的Namespace对象进行处理,这里Namespace会对Topic进行哈希运算并判断它落在数组的哪一个节点,算出来后就根据数组的信息来从Bundle数组中获得对应的Bundle,这个过程其实就是一致性哈希算法寻址过程。
  3. 在获得Bundle后会尝试从本机Cache中查询该Bundle所归属的Broker信息。
  4. 如果在Cache中没有命中,则会去Zookeeper中进行读取,如果发现该Bundle还未归属Broker则触发归属Broker的流程
  5. 获取到该Topic所归属的Broker信息后返回给客户端,客户端解析结果并跟所归属的Broker建立TCP连接,用于后续生产者往Broker节点进行消息写入

补充说明确定Bundle的归属,如果Broker的loadManager使用的是中心化策略,则需要Broker Leader来当裁判决定,否则当前Broker就可当作裁判。虽然Broker是无状态的,但会通过Zookeeper选举出一个Leader用于监控负载、为Bundle分配Broker等事情,裁判Broker通过loadManager查找负载最低的Broker并把Bundle分配给它。

客户端实现原理

Lookup机制是由客户端发起的,在创建生产者/消费者对象时会初始化网络连接,以生产者代码为例进行跟踪看看。无论是创建分区还是非分区生产者,最终都会走到ProducerImpl的构造函数,就从这里开始看吧

   public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
                        CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,
                        ProducerInterceptors interceptors, Optional<String> overrideProducerName) {
				....
        //这里进去就是创建跟Broker的网络连接
        grabCnx();
    }

    void grabCnx() {
      	//实际上是调用ConnectionHandler进行的
        this.connectionHandler.grabCnx();
    }

		protected void grabCnx(Optional<URI> hostURI) {
  		....
      //这里是核心,相当于最终又调用回PulsarClientImpl类的getConnection方法
      cnxFuture = state.client.getConnection(state.topic, (state.redirectedClusterURI.toString()));
  		....
    }


    public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {
        TopicName topicName = TopicName.get(topic);
      	//看到方法名就知道到了Lookup的时候了,所以说好的命名远胜于注释
        return getLookup(url).getBroker(topicName)
                .thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),
                        lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));
    }

    public LookupService getLookup(String serviceUrl) {
        return urlLookupMap.computeIfAbsent(serviceUrl, url -> {
            try {
              	//忽略其他的,直接跟这里进去
                return createLookup(serviceUrl);
            } catch (PulsarClientException e) {
                log.warn("Failed to update url to lookup service {}, {}", url, e.getMessage());
                throw new IllegalStateException("Failed to update url " + url);
            }
        });
    }

    public LookupService createLookup(String url) throws PulsarClientException {
      	//这里可以看到如果咱们在配置客户端的地址是http开头就会通过http方式进行Loopup,否则走二进制协议进行查询
        if (url.startsWith("http")) {
            return new HttpLookupService(conf, eventLoopGroup);
        } else {
            return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(),
                    externalExecutorProvider.getExecutor());
        }
    }

		
    public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
            throws PulsarClientException {
      	//进到可能会误会Pulsar是通过HttpClient工具包进行的HTTP通信,继续看HttpClient构造函数
        this.httpClient = new HttpClient(conf, eventLoopGroup);
        this.useTls = conf.isUseTls();
        this.listenerName = conf.getListenerName();
    }

    protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
			....
      //可以看到实际上最终是调用的AsyncHttpClient进行HTTP通信,这是一个封装Netty的async-http-client-2.12.1.jar的外部包
      httpClient = new DefaultAsyncHttpClient(config);
			....
    }

通过上面可以看到Lookup服务已经完成初始化,接下来就来看看客户端如何发起Lookup请求,回到PulsarClientImpl的getConnection方法,可以看到这里是链式调用,上面是从getLookup看到了其实是对Lookup进行初始化的过程,那么接下来就跟踪getBroker方法看看是怎么获取的服务端信息

    public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {
        TopicName topicName = TopicName.get(topic);
        return getLookup(url).getBroker(topicName)
                .thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),
                        lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));
    }

   public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
        //判断访问哪个版本的接口
        String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
        String path = basePath + topicName.getLookupName();
        path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);
        //获取要访问的Broker地址
        return httpClient.get(path, LookupData.class)
                .thenCompose(lookupData -> {
            URI uri = null;
            try {
              	//解析服务端返回的数据,本质上就是返回的就是Topic所在Broker的节点IP+端口
                InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
                //HTTP通过Lookup方式访问服务端绝对不会走代理
                return CompletableFuture.completedFuture(new LookupTopicResult(brokerAddress, brokerAddress,
                        false /* HTTP lookups never use the proxy */));
            } catch (Exception e) {
							....
            }
        });
    }

public class LookupTopicResult {
  	//LookupTopicResult是查询Topic归属Broker的结果后包装的一层结果,可以看到这里其实就是Socket信息也就是IP+端口
    private final InetSocketAddress logicalAddress;
    private final InetSocketAddress physicalAddress;
    private final boolean isUseProxy;
}

客户端的流程走到这里基本就结束了,是否有些意犹未尽迫不及待的想知道服务端又是怎么处理的?那么就看看下一节

服务端实现原理

服务端的入口在TopicLookup类的lookupTopicAsync方法,服务端大致步骤是这样的:1. 获取Topic所归属的Bundle 2. 查询Bundle所归属的Broker 3. 返回该Broker的url

    public void lookupTopicAsync(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("topic-domain") String topicDomain, @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @QueryParam("listenerName") String listenerName,
            @HeaderParam(LISTENERNAME_HEADER) String listenerNameHeader) {
        TopicName topicName = getTopicName(topicDomain, tenant, namespace, encodedTopic);
        if (StringUtils.isEmpty(listenerName) && StringUtils.isNotEmpty(listenerNameHeader)) {
            listenerName = listenerNameHeader;
        }
        //可以看得到这里是获取Lookup的,跟踪进去看看
        internalLookupTopicAsync(topicName, authoritative, listenerName)
                .thenAccept(lookupData -> asyncResponse.resume(lookupData))
                .exceptionally(ex -> {
                    ....
                });
    }


 			protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName topicName, boolean authoritative, String listenerName) {
CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
                            //获得目标Broker地址, 继续从这里进去
                            .getBrokerServiceUrlAsync(topicName,
                                    LookupOptions.builder()
                                            .advertisedListenerName(listenerName)
                                            .authoritative(authoritative)
                                            .loadTopicsInBundle(false)
                                            .build());
    }

    public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
        long startTime = System.nanoTime();

        // 获取这个Topic所归属的Bundle
        CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
                .thenCompose(bundle -> {
                    //根据获得的bundle信息查询归属的Broker
                    return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
                      			//如果findRedirectLookupResultAsync方式没查到则走这里进行查询
                            return findBrokerServiceUrl(bundle, options); 
                    });
                });

        future.thenAccept(optResult -> {
            ....
        }).exceptionally(ex -> {
						....
        });

        return future;
    }

先看看是怎么获取Topic所归属的Bundle的吧,就从getBundleAsync方法跟踪进去

    public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
        return bundleFactory.getBundlesAsync(topic.getNamespaceObject())
          			//直接看findBundle,命名意思已经很清晰了
                .thenApply(bundles -> bundles.findBundle(topic));
    }

    public NamespaceBundle findBundle(TopicName topicName) {
        checkArgument(nsname.equals(topicName.getNamespaceObject()));
      	//同理,继续跟踪进去
        return factory.getTopicBundleAssignmentStrategy().findBundle(topicName, this);
    }

    public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
        //计算Topic名称的哈希值
        long hashCode = Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
        //根据哈希值来获取所归属的bundle,一致性哈希的设计。跟进去看看是怎么计算的
        NamespaceBundle bundle = namespaceBundles.getBundle(hashCode);
        if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
            bundle.setHasNonPersistentTopic(true);
        }
        return bundle;
    }

    protected NamespaceBundle getBundle(long hash) {
        //通过数组的二分查找进行计算,数组的元素个数跟存储Bundle的bundles的集合大小是一样的,能获取对应的Bundle
      	//思路其实就是一致性哈希的查找方式,计算出哈希值处于哈希环所处的位置并查找其下一个节点的信息
        int idx = Arrays.binarySearch(partitions, hash);
        int lowerIdx = idx < 0 ? -(idx + 2) : idx;
        return bundles.get(lowerIdx);
    }

知道Bundle之后,下一步就是根据这个Bundle来查询其所归属的Broker节点,也就是上面的NamespaceService类的findRedirectLookupResultAsync方法,这里一路跟下去就是查询缓存中获取映射信息的地方了,感兴趣的伙伴可以继续跟下去

    private CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync(ServiceUnitId bundle) {
        if (isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
            return CompletableFuture.completedFuture(Optional.empty());
        }
        return redirectManager.findRedirectLookupResultAsync();
    }

总结

以上就是Pulsar的Lookup机制的实现流程,在寻址的过程中,需要阅读的伙伴具备一致性哈希的知识,因为Pulsar的Topic归属就是引入了一致性哈希算法来实现的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/519888.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[机器学习]人工智能为小米智架保驾护航

前言 小米汽车作为小米集团进军汽车行业的新尝试&#xff0c;吸引了广泛的关注。其结合了小米在科技和创新方面的优势&#xff0c;以及对智能出行的愿景&#xff0c;为汽车行业注入了新的活力。虽然小米汽车工厂还处于初期阶段&#xff0c;但其积极采用人工智能和机器学习等前沿…

基于Pytorch+昇腾NPU部署baichuan2-7B大模型

一、模型介绍 Baichuan 2 是百川智能推出的新一代开源大语言模型&#xff0c;采用 2.6 万亿 Tokens 的高质量语料训练。Baichuan 2 在多个权威的中文、英文和多语言的通用、领域 benchmark 上取得同尺寸最佳的效果。 它基于 Transformer 结构&#xff0c;在大约1.2万亿 tokens…

docker进行jenkins接口自动化测试持续集成实战

文章目录 一、接口功能自动化测试项目源码讲解二、接口功能自动化测试运行环境配置1、下载jdk&#xff0c;maven&#xff0c;git&#xff0c;allure并配置对应的环境变量2、使用docker安装jenkins3、配置接口测试的运行时环境选择对应节点4、jenkins下载插件5、jenkins配置环境…

解决element-plus table组件 fixed=“right“(left)浮动后横向滚动文字穿透的问题

BUG 版本&#xff1a;element-plus 2.6.1 浏览器&#xff1a;360极速浏览器22.1 (Chromium内核) 组件&#xff1a;el-table组件 问题&#xff1a;在头部/尾部浮动加上斑马条纹后&#xff0c;横向滚动存在文字穿透的问题。具体如图&#xff1a; 白色背景行的文字&#xff0c…

【关于窗口移动求和的两种计算方法】

窗口移动计算方法 例子方法1方法2运行结果: 例子 在很多算法中都会涉及到窗口滑动&#xff0c;比如基于新息序列更新的自适应卡尔曼滤波器算法中便会使用到。 已知一个数列&#xff1a;OCV [1;2;3;4;5;6;7;8;9;10;11;12;13;14;15]&#xff0c;定义窗口长度为5&#xff0c;每次…

Python自带的集成开发和学习环境IDLE 中安装工具包的pip文件修复和重置解决方法————以win 7系统下Python 3.8 32-bit为例

Python自带的集成开发和学习环境IDLE 中安装工具包的pip文件修复和重置解决方法————以win 7系统下Python 3.8 32-bit为例 目录 Python自带的集成开发和学习环境IDLE 中安装工具包的pip文件修复和重置解决方法————以win 7系统下Python 3.8 32-bit为例一、IDLE简介和特点…

软考111-上午题-【计算机网络】-URL和DNS

一、URL解析 org&#xff1a;各类组织结构&#xff08;非盈利团队&#xff09; 1-1、顶级域 顶级域名是域名的最后一个部分&#xff0c;即是域名最后一点之后的字母&#xff0c;例如&#xff1a;www.baidu.com这个域名中&#xff0c;顶级域是.com&#xff08;或.COM&#xff…

2024/4/5 ACM格式练习

一、知识点&#xff1a; &#xff08;1&#xff09;行数不固定&#xff1a;用Scanf的err返回值判断是否读到EOF。 &#xff08;2&#xff09;每行数据个数不固定&#xff1a;一个一个读数据和它后面的字符&#xff0c;判断后面的字符是否是换行符。如果是就说明读完了一行数据…

重读Java设计模式: 桥接模式详解

引言 在软件开发中&#xff0c;经常会遇到需要在抽象与实现之间建立连接的情况。当系统需要支持多个维度的变化时&#xff0c;使用传统的继承方式往往会导致类爆炸和耦合度增加的问题。为了解决这一问题&#xff0c;我们可以使用桥接模式。桥接模式是一种结构型设计模式&#…

计算机网络—HTTP协议:深入解析与应用实践

​ &#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 ♈️今日夜电波&#xff1a;ヒステリックナイトガール 1:03━━━━━━️&#x1f49f;──────── 5:06 &#x1f504; ◀️ ⏸ ▶️ ☰…

vue 实现的h5 页面,如何设置页面中的 title

修改页面中的title 公共修改方式在App.vue 中&#xff1a; created() {document.title "测试标题"; },单个页面修改&#xff0c;就在单个页面编写就ok

二百二十九、离线数仓——离线数仓Hive从Kafka、MySQL到ClickHouse的完整开发流程

一、目的 为了整理离线数仓开发的全流程&#xff0c;算是温故知新吧 离线数仓的数据源是Kafka和MySQL数据库&#xff0c;Kafka存业务数据&#xff0c;MySQL存维度数据 采集工具是Kettle和Flume&#xff0c;Flume采集Kafka数据&#xff0c;Kettle采集MySQL数据 离线数仓是Hi…

JVM专题——垃圾回收

本文部分内容节选自Java Guide和《深入理解Java虚拟机》, Java Guide地址: https://javaguide.cn/java/jvm/jvm-garbage-collection.html &#x1f680; 基础&#xff08;上&#xff09; → &#x1f680; 基础&#xff08;中&#xff09; → &#x1f680;基础&#xff08;下&…

2024-04-05 问AI: 介绍一下深度学习中的Leaky ReLU函数

文心一言 Leaky ReLU&#xff08;Leaky Rectified Linear Unit&#xff09;函数是深度学习领域中的一种激活函数&#xff0c;主要用于解决ReLU&#xff08;Rectified Linear Unit&#xff09;函数在负值区域为零的问题。ReLU函数在输入为正时保持原样&#xff0c;而在输入为负…

(学习日记)2024.04.07:UCOSIII第三十五节:互斥量实验

写在前面&#xff1a; 由于时间的不足与学习的碎片化&#xff0c;写博客变得有些奢侈。 但是对于记录学习&#xff08;忘了以后能快速复习&#xff09;的渴望一天天变得强烈。 既然如此 不如以天为单位&#xff0c;以时间为顺序&#xff0c;仅仅将博客当做一个知识学习的目录&a…

通过 Cookie、Redis共享Session 和 Spring 拦截器技术,实现对用户登录状态的持有和清理(三)

本篇内容对应 “2.4 生成验证码” 小节 和 “4.7 优化登陆模块”小节 视频链接 1 Kaptcha介绍 Kaotcga是一个生成验证码的工具。 你的网站验证码是什么&#xff1f; 在我们这个牛客论坛项目&#xff0c;验证码分为两部分 给用户看的是图片&#xff0c;用户根据图片上显示的…

跨境电商独立站是什么?为什么要做独立站?

跨境电商独立站就是跨境电商自行搭建的销售网站&#xff0c;服务器、域名都是自主购买的&#xff0c;并由跨境电商独立运营与营销推广。 近些年来&#xff0c;各类第三方电商平台虽然流量大&#xff0c;但是随着进驻电商数量的增加&#xff0c;流量竞争也愈发激烈&#xff0c;…

基于顺序表实现通讯管理系统!(有完整源码!)

​​​​​​​ 个人主页&#xff1a;秋风起&#xff0c;再归来~ 文章专栏&#xff1a;C语言实战项目 个人格言&#xff1a;悟已往之不谏&#xff0c;知来者犹可追 克心守己&#xff0c;律己则安&#xff01;​​​​​​​ 目录 1、实现思路 ​…

C语言中strlen函数的实现

C语言中strlen函数的实现 为了便于和strlen函数区别&#xff0c;以下命令为_strlen。 描述&#xff1a;实现strlen&#xff0c;获取字符串的长度&#xff0c;函数原型如下&#xff1a; size_t strlen(const char *str);_strlen实现&#xff1a; size_t _strlen(const char*…

彩虹聚合DNS管理系统,附带系统搭建教程

聚合DNS管理系统&#xff0c;可以实现在一个网站内管理多个平台的域名解析&#xff0c;目前已支持的域名平台有&#xff1a;阿里云、腾讯云、华为云、西部数码、CloudFlare。 本系统支持多用户&#xff0c;每个用户可分配不同的域名解析权限&#xff1b;支持API接口&#xff0…