Pulsar服务端处理消费者请求以及源码解析

引言

处理读写是Pulsar服务端最基本也是最重要的逻辑,今天就重点看看服务端是如何处理的读请求也就是消费者请求

正文

Pulsar服务端处理消费者请求的流程大致如下图所示
在这里插入图片描述

  1. 消费者通过TCP向服务端发起消息拉取请求
  2. Broker会根据请求中携带的ID来获取在服务端对应的Consumer对象,每个Consumer对象都有一个对应的游标对象,这个游标对象会调用Dispatcher来做数据查询的操作
  3. Dispatcher会先尝试读取缓存,这个缓存是个跳表结构并且节点数据是存在堆外内存中的,如果命中则直接返回
  4. 未命中缓存的话会通过Bookkeeper客户端去读取Bookkeeper中的数据,读取到后会通过跟客户端所建立的TCP连接将查到的数据发送过去

整体流程就是这四步,接下来就让咱们看看Pulsar的代码实现吧

处理消费请求

Broker处理的请求基本都是从ServerCnx这里开始的,因为它实现了Netty的ChannelInboundHandlerAdapter类,因此所有TCP的数据写进来时最终都是ServerCnx进行处理的,处理消费的请求时从handleFlow方法开始,因此从这里进行跟踪

    protected void handleFlow(CommandFlow flow) {
				....
        //从当前Broker维护的Consumer列表中获取客户端对应服务端的Consumer对象
        CompletableFuture<Consumer> consumerFuture = consumers.get(flow.getConsumerId());

        if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            Consumer consumer = consumerFuture.getNow(null);
            if (consumer != null) {
                //传入客户端配置的拉取条数,最大默认不会超过1000
                consumer.flowPermits(flow.getMessagePermits());
            } else {
                log.info("[{}] Couldn't find consumer {}", remoteAddress, flow.getConsumerId());
            }
        }
    }

	public void flowPermits(int additionalNumberOfMessages) {
    			....
          // 处理消息拉取请求,继续跟进去看看
          subscription.consumerFlow(this, additionalNumberOfMessages);
	}

    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        this.lastConsumedFlowTimestamp = System.currentTimeMillis();
        //最终调用者是dispatcher
        dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
    }

Dispatcher是个接口,在这里选择PersistentDispatcherSingleActiveConsumer的实现进行跟踪

    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        //作为一个任务交给线程池处理
        executor.execute(() -> internalConsumerFlow(consumer));
    }

    private synchronized void internalConsumerFlow(Consumer consumer) {
      	//进行消息的读取
        readMoreEntries(consumer);
    }

		private void readMoreEntries(Consumer consumer) {
      	....
        //通过游标进行数据的读取
        cursor.asyncReadEntriesOrWait(messagesToRead,
                            bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
    }

PersistentDispatcherSingleActiveConsumer最终会调用ManagedCursorImpl进行数据的读取,这里要注意PersistentDispatcherSingleActiveConsumer实现了回调接口,也就是它自身实现了数据读取成功的处理逻辑。这里它将自己作为参数传给下一层用于在读取成功后进行回调处理,这也是最常见的异步回调设计方式。

继续跟踪ManagedCursorImpl的数据读取逻辑

    public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {
        asyncReadEntriesWithSkipOrWait(maxEntries, maxSizeBytes, callback, ctx, maxPosition, null);
    }


		public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
                                               Object ctx, PositionImpl maxPosition,
                                               Predicate<PositionImpl> skipCondition) {
      			....
      			// 读取数据
            asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,
                    maxPosition, skipCondition);
    }


    public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
                                 Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
       // 封装第二层回调
      OpReadEntry op =
                OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition);
      //核心方法,从这里进去读取
      ledger.asyncReadEntries(op);
}


    void asyncReadEntries(OpReadEntry opReadEntry) {
      	....
        internalReadFromLedger(currentLedger, opReadEntry);
        ....
    }


		private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {
      	....
        // 进行数据读取
        asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
    }


    protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry,
            Object ctx) {
        if (config.getReadEntryTimeoutSeconds() > 0) {
            ....
            // 封装第三层回调
            ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
                    opReadEntry, readOpCount, createdTime, ctx);
            lastReadCallback = readCallback;
            // 尝试从缓存中读取数据,继续跟踪进去
            entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(),
                    readCallback, readOpCount);
        } else {
            entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry,
                    ctx);
        }
    }

entryCache有RangeEntryCacheImpl和EntryCacheDisabled两种实现,EntryCacheDisabled相当于不走缓存直接查Bookkeeper,而RangeEntryCacheImpl是会尝试去读取Broker自身的缓存,这里跟着RangeEntryCacheImpl看看实现

    public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
            final ReadEntriesCallback callback, Object ctx) {
      	//跟进去看
      	asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx);
    }

    void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
            final ReadEntriesCallback callback, Object ctx) {
      	//一样,继续跟踪看
        asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null);
    }

    void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
        final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) {
      	....
      	// 缓存实现是ConcurrentSkipListMap value是堆外内存
        Collection<EntryImpl> cachedEntries = entries.getRange(firstPosition, lastPosition);
      	....
      	//如果全部命中缓存则直接返回,否则往下走
        // 从bookkeeper读
        pendingReadsManager.readEntries(lh, firstEntry, lastEntry,
                    shouldCacheEntry, callback, ctx);
    }

    void readEntries(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
                     final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
      	....
        //从Bookkeeper进行数据的读取
        CompletableFuture<List<EntryImpl>> readResult = rangeEntryCache.readFromStorage(lh, firstEntry,
                        lastEntry, shouldCacheEntry);
    }

    CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
                                                       long firstEntry, long lastEntry, boolean shouldCacheEntry) {
     	  ....
        //这里的lh其实就是Bookkeeper的客户端对象LedgerHandle
      	CompletableFuture<List<EntryImpl>> readResult = lh.readAsync(firstEntry, lastEntry)
        ....
    }

到这里基本就到了Bookkeeper的内部逻辑了,Bookkeeper相关的后面在单独进行分析。读取逻辑基本就到这了,肯定会有伙伴疑惑🤔,读到数据后怎么将数据发给客户端/消费者呢?请继续往下看

回调处理

刚刚进行代码跟踪的时候应该都看到流程中封住了好几个回调函数,这里就拎最重要的也就是PersistentDispatcherSingleActiveConsumer进行讨论,这里直接从它的回调方法readEntriesComplete进行跟踪

    public void readEntriesComplete(final List<Entry> entries, Object obj) {
      	//作为任务放到线程池去执行
        executor.execute(() -> internalReadEntriesComplete(entries, obj));
    }


    private synchronized void internalReadEntriesComplete(final List<Entry> entries, Object obj) {
			....
      //分派数据到消费者
      dispatchEntriesToConsumer(currentConsumer, entries, batchSizes, batchIndexesAcks, sendMessageInfo, epoch);
    }


    protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> entries,
                                             EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
                                             SendMessageInfo sendMessageInfo, long epoch) {
      
     		//将查到的消息通过TCP写到消费者端
        currentConsumer
            .sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                    sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
                    redeliveryTracker, epoch)
          ....
    }


    public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,
                                     EntryBatchIndexesAcks batchIndexesAcks,
                                     int totalMessages, long totalBytes, long totalChunkedMessages,
                                     RedeliveryTracker redeliveryTracker, long epoch) {
      	....
        //通过PulsarCommandSenderImpl进行消息发送,继续跟踪进去
        Future<Void> writeAndFlushPromise = cnx.getCommandSender().sendMessagesToConsumer(....);
      	....
    }


    public ChannelPromise sendMessagesToConsumer(....) {
      	....
        //通过Netty的TCP将查到的消息数据写到客户端
        ctx.write(....);
        ....
    }

到这里基本上服务端的事情就结束了,剩余的其他几个回调函数感兴趣的伙伴可以自行跟踪。

总结

可以看到Pulsar里大量使用了异步回调处理,这样的设计在高并发场景大幅提升服务的性能,尽可能的避免了存在瓶颈的地方。不过带来的另一影响是,代码跟踪起来相对来说容易“迷路”,因此掌握好异步设计的逻辑是很有必要的,可以帮助我们更好的跟踪Pulsar的代码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/518377.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Lua 和 Love 2d 教程 二十一点朴克牌 (上篇lua源码)

GitCode - 开发者的代码家园 Lua版完整原码 规则 庄家和玩家各发两张牌。庄家的第一张牌对玩家是隐藏的。 玩家可以拿牌&#xff08;即拿另一张牌&#xff09;或 停牌&#xff08;即停止拿牌&#xff09;。 如果玩家手牌的总价值超过 21&#xff0c;那么他们就爆掉了。 面牌…

WIFI|软体 茶凳浅谈 高通WIN QSDK - IPQ6000 与 88Q2112 的相遇

Qualcomm IPQ 系列的Ethernet IC 搭配的有 QCA8075, QCA8081 … 等等Qualcomm自家出产的芯片。QSDK中内建可以支持的3rd party芯片&#xff0c;却寥寥可数。日前&#xff0c;客户使用车载以太网 - 88Q2112 - Marvell与IPQ6000做搭配。将之记录下来&#xff0c;以供参考。 方…

传输层 --- TCP (下篇)

目录 1. 超时重传 1.1. 数据段丢包 1.2. 接收方发送的ACK丢包 1.3. 超时重传的超时时间如何设置 2. 流量控制 3. 滑动窗口 3.1. 初步理解滑动窗口 3.2. 滑动窗口的完善理解 3.3. 关于快重传的补充 3.4. 快重传和超时重传的区别 4. 拥塞控制 4.1. 拥塞控制的宏观认识…

2024年华为OD机试真题-推荐多样性-Java-OD统一考试(C卷)

题目描述&#xff1a; 推荐多样性需要从多个列表中选择元素&#xff0c;一次性要返回N屏数据&#xff08;窗口数量&#xff09;&#xff0c;每屏展示K个元素&#xff08;窗口大小&#xff09;&#xff0c;选择策略&#xff1a; 1. 各个列表元素需要做穿插处理&#xff0c;即先从…

新版HI3559AV100开发注意事项(三)

新版HI3559AV100开发注意事项&#xff08;三&#xff09; 十九、用的sdk是Hi3559V200_MobileCam_SDK_V1.0.1.5 播放AAC音频文件&#xff0c;adec->ao;adec的初始化里面包括了aaclc解码器的注册&#xff0c;可是在HI_MPI_ADEC_RegisterDecoder(&s32Handle, &stAac);…

一篇文章带你学会7大基本算法(2024最新保姆级教程)

&#x1f3e0;个人主页&#xff1a;尘觉主页 文章目录 算法 - 排序约定选择排序冒泡排序插入排序希尔排序归并排序1. 归并方法2. 自顶向下归并排序3. 自底向上归并排序 快速排序1. 基本算法2. 切分3. 性能分析4. 算法改进4.1 切换到插入排序4.2 三数取中4.3 三向切分 5. 基于切…

vue 打包 插槽 inject reactive draggable 动画 foreach pinia状态管理

在Vue项目中&#xff0c;当涉及到打包、插槽&#xff08;Slots&#xff09;、inject/reactive、draggable、transition、foreach以及pinia时&#xff0c;这些都是Vue框架的不同特性和库&#xff0c;它们各自在Vue应用中有不同的用途。下面我将逐一解释这些概念&#xff0c;并说…

用 Wireshark 解码 H.264

H264&#xff0c;你不知道的小技巧-腾讯云开发者社区-腾讯云 这篇文章写的非常好 这里仅做几点补充 init.lua内容&#xff1a; -- Set enable_lua to false to disable Lua support. enable_lua trueif not enable_lua thenreturn end-- If false and Wireshark was start…

Vue使用高德地图(快速上手)

1.在高德平台注册账号 2.我的 > 管理管理中添加Key 3.安装依赖 npm i amap/amap-jsapi-loader --save 或 yarn add amap/amap-jsapi-loader --save 4.导入 AMapLoade import AMapLoader from amap/amap-jsapi-loader; 5.直接上代码&#xff0c;做好了注释&#xff08;初…

单细胞RNA测序(scRNA-seq)SRA数据下载及fastq-dumq数据拆分

单细胞RNA测序&#xff08;scRNA-seq&#xff09;入门可查看以下文章&#xff1a; 单细胞RNA测序&#xff08;scRNA-seq&#xff09;工作流程入门 单细胞RNA测序&#xff08;scRNA-seq&#xff09;细胞分离与扩增 1. NCBI查询scRNA-seq SRA数据 NCBI地址&#xff1a; https…

前视声呐目标识别定位(六)-代码解析之目标截图并传输

前视声呐目标识别定位&#xff08;一&#xff09;-基础知识 前视声呐目标识别定位&#xff08;二&#xff09;-目标识别定位模块 前视声呐目标识别定位&#xff08;三&#xff09;-部署至机器人 前视声呐目标识别定位&#xff08;四&#xff09;-代码解析之启动识别模块 …

51单片机实验02- P0口流水灯实验

目录 一、实验的背景和意义 二、实验目的 三、实验步骤 四、实验仪器 五、实验任务及要求 1&#xff0c;从led4开始右移 1&#xff09;思路 ①起始灯 &#xff08;led4&#xff09; ②右移 2&#xff09;效果 3&#xff09;代码☀ 2&#xff0c;从其他小灯并向右依…

服务器设置了端口映射之后外网还是访问不了服务器

目录 排查思路参考&#xff1a; 1、确认服务是否在运行 2、确认端口映射设置是否正确 3、使用防火墙测试到服务器的连通性 4、检查服务内部的配置 5、解决办法 6、学习小分享 我们在一个完整的网络数据存储服务系统设备中都会存有业务服务器、防火墙、交换机、路由器&a…

【Laravel】09 用模型批量赋值简化代码 数据库关系

【Laravel】09 用模型批量赋值简化代码 & 数据库关系 1. 用模型批量赋值简化代码2. 数据库关系 1. 用模型批量赋值简化代码 原来存储一个值 2. 数据库关系 这里可以看到两个SQL是一样的

STM32之HAL开发——不同系列SPI功能对比(附STM32Cube配置)

不同系列STM32——SPI框图 F1系列框图 F4系列框图 TI模式时序图特性 F7系列框图 H7系列框图 注意&#xff1a;F7系列以及H7系列支持Quad-SPI模式&#xff0c;可以连接单&#xff0c;双或者四条数据线的Flash存储介质。 SPI——Cube配置流程 RCC时钟源配置 SYS系统调试模式配…

Spring 详细总结

文章目录 第一章 IOC容器第一节 Spring简介1、一家公司2、Spring旗下的众多项目3、Spring Framework①Spring Framework优良特性②Spring Framework五大功能模块 第二节 IOC容器概念1、普通容器①生活中的普通容器②程序中的普通容器 2、复杂容器①生活中的复杂容器②程序中的复…

MySQL、Oracle查看字节和字符长度个数的函数

目录 0. 总结1. MySQL1.1. 造数据1.2. 查看字符/字节个数 2. Oracle2.1. 造数据2.2. 查看字符/字节个数 0. 总结 databasecharbyteMySQLchar_length()length()Oraclelength()lengthB() 1. MySQL 1.1. 造数据 sql drop table if exists demo; create table demo (id …

Cesium 批量种树

1、准备树种建模 分各种级别建模LOD1-LODN 其中meta.json长这样&#xff1a; Gltf再3Dmax中导出Obj,再通过ObjToGltf的工具转换&#xff0c;参考 https://editor.csdn.net/md/?articleId96484597 2、准备shp点数据。&#xff08;shp中的点位就是种树的位置&#xff09; 3、准…

【并发编程】线程安全

线程安全 1. 讲一下 synchronized 关键字的底层原理 1.1 基本使用 如下抢票的代码&#xff0c;如果不加锁&#xff0c;就会出现超卖或者一张票卖给多个人 synchronized&#xff0c;同步【对象锁】采用互斥的方式让同一时刻至多只有一个线程能持有【对象锁】 其它线程再想获…

多模态AI全解析:概念、应用与风险

大家好&#xff0c;在人工智能的快速发展浪潮中&#xff0c;多模态学习作为一项革命性技术&#xff0c;正逐渐改变着我们与机器交互的方式。 自OpenAI推出ChatGPT以来&#xff0c;人工智能已经从处理单一文本输入的单模态工具&#xff0c;迈向了能够理解和生成包括文本、图像、…