引言
处理读写是Pulsar服务端最基本也是最重要的逻辑,今天就重点看看服务端是如何处理的读请求也就是消费者请求
正文
Pulsar服务端处理消费者请求的流程大致如下图所示
- 消费者通过TCP向服务端发起消息拉取请求
- Broker会根据请求中携带的ID来获取在服务端对应的Consumer对象,每个Consumer对象都有一个对应的游标对象,这个游标对象会调用Dispatcher来做数据查询的操作
- Dispatcher会先尝试读取缓存,这个缓存是个跳表结构并且节点数据是存在堆外内存中的,如果命中则直接返回
- 未命中缓存的话会通过Bookkeeper客户端去读取Bookkeeper中的数据,读取到后会通过跟客户端所建立的TCP连接将查到的数据发送过去
整体流程就是这四步,接下来就让咱们看看Pulsar的代码实现吧
处理消费请求
Broker处理的请求基本都是从ServerCnx这里开始的,因为它实现了Netty的ChannelInboundHandlerAdapter类,因此所有TCP的数据写进来时最终都是ServerCnx进行处理的,处理消费的请求时从handleFlow方法开始,因此从这里进行跟踪
protected void handleFlow(CommandFlow flow) {
....
//从当前Broker维护的Consumer列表中获取客户端对应服务端的Consumer对象
CompletableFuture<Consumer> consumerFuture = consumers.get(flow.getConsumerId());
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
if (consumer != null) {
//传入客户端配置的拉取条数,最大默认不会超过1000
consumer.flowPermits(flow.getMessagePermits());
} else {
log.info("[{}] Couldn't find consumer {}", remoteAddress, flow.getConsumerId());
}
}
}
public void flowPermits(int additionalNumberOfMessages) {
....
// 处理消息拉取请求,继续跟进去看看
subscription.consumerFlow(this, additionalNumberOfMessages);
}
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
this.lastConsumedFlowTimestamp = System.currentTimeMillis();
//最终调用者是dispatcher
dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
}
Dispatcher是个接口,在这里选择PersistentDispatcherSingleActiveConsumer的实现进行跟踪
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
//作为一个任务交给线程池处理
executor.execute(() -> internalConsumerFlow(consumer));
}
private synchronized void internalConsumerFlow(Consumer consumer) {
//进行消息的读取
readMoreEntries(consumer);
}
private void readMoreEntries(Consumer consumer) {
....
//通过游标进行数据的读取
cursor.asyncReadEntriesOrWait(messagesToRead,
bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
}
PersistentDispatcherSingleActiveConsumer最终会调用ManagedCursorImpl进行数据的读取,这里要注意PersistentDispatcherSingleActiveConsumer实现了回调接口,也就是它自身实现了数据读取成功的处理逻辑。这里它将自己作为参数传给下一层用于在读取成功后进行回调处理,这也是最常见的异步回调设计方式。
继续跟踪ManagedCursorImpl的数据读取逻辑
public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {
asyncReadEntriesWithSkipOrWait(maxEntries, maxSizeBytes, callback, ctx, maxPosition, null);
}
public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition,
Predicate<PositionImpl> skipCondition) {
....
// 读取数据
asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,
maxPosition, skipCondition);
}
public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
// 封装第二层回调
OpReadEntry op =
OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition);
//核心方法,从这里进去读取
ledger.asyncReadEntries(op);
}
void asyncReadEntries(OpReadEntry opReadEntry) {
....
internalReadFromLedger(currentLedger, opReadEntry);
....
}
private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {
....
// 进行数据读取
asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
}
protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry,
Object ctx) {
if (config.getReadEntryTimeoutSeconds() > 0) {
....
// 封装第三层回调
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
opReadEntry, readOpCount, createdTime, ctx);
lastReadCallback = readCallback;
// 尝试从缓存中读取数据,继续跟踪进去
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(),
readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry,
ctx);
}
}
entryCache有RangeEntryCacheImpl和EntryCacheDisabled两种实现,EntryCacheDisabled相当于不走缓存直接查Bookkeeper,而RangeEntryCacheImpl是会尝试去读取Broker自身的缓存,这里跟着RangeEntryCacheImpl看看实现
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
final ReadEntriesCallback callback, Object ctx) {
//跟进去看
asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx);
}
void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
final ReadEntriesCallback callback, Object ctx) {
//一样,继续跟踪看
asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null);
}
void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) {
....
// 缓存实现是ConcurrentSkipListMap value是堆外内存
Collection<EntryImpl> cachedEntries = entries.getRange(firstPosition, lastPosition);
....
//如果全部命中缓存则直接返回,否则往下走
// 从bookkeeper读
pendingReadsManager.readEntries(lh, firstEntry, lastEntry,
shouldCacheEntry, callback, ctx);
}
void readEntries(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
....
//从Bookkeeper进行数据的读取
CompletableFuture<List<EntryImpl>> readResult = rangeEntryCache.readFromStorage(lh, firstEntry,
lastEntry, shouldCacheEntry);
}
CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
long firstEntry, long lastEntry, boolean shouldCacheEntry) {
....
//这里的lh其实就是Bookkeeper的客户端对象LedgerHandle
CompletableFuture<List<EntryImpl>> readResult = lh.readAsync(firstEntry, lastEntry)
....
}
到这里基本就到了Bookkeeper的内部逻辑了,Bookkeeper相关的后面在单独进行分析。读取逻辑基本就到这了,肯定会有伙伴疑惑🤔,读到数据后怎么将数据发给客户端/消费者呢?请继续往下看
回调处理
刚刚进行代码跟踪的时候应该都看到流程中封住了好几个回调函数,这里就拎最重要的也就是PersistentDispatcherSingleActiveConsumer进行讨论,这里直接从它的回调方法readEntriesComplete进行跟踪
public void readEntriesComplete(final List<Entry> entries, Object obj) {
//作为任务放到线程池去执行
executor.execute(() -> internalReadEntriesComplete(entries, obj));
}
private synchronized void internalReadEntriesComplete(final List<Entry> entries, Object obj) {
....
//分派数据到消费者
dispatchEntriesToConsumer(currentConsumer, entries, batchSizes, batchIndexesAcks, sendMessageInfo, epoch);
}
protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> entries,
EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
SendMessageInfo sendMessageInfo, long epoch) {
//将查到的消息通过TCP写到消费者端
currentConsumer
.sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
redeliveryTracker, epoch)
....
}
public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
int totalMessages, long totalBytes, long totalChunkedMessages,
RedeliveryTracker redeliveryTracker, long epoch) {
....
//通过PulsarCommandSenderImpl进行消息发送,继续跟踪进去
Future<Void> writeAndFlushPromise = cnx.getCommandSender().sendMessagesToConsumer(....);
....
}
public ChannelPromise sendMessagesToConsumer(....) {
....
//通过Netty的TCP将查到的消息数据写到客户端
ctx.write(....);
....
}
到这里基本上服务端的事情就结束了,剩余的其他几个回调函数感兴趣的伙伴可以自行跟踪。
总结
可以看到Pulsar里大量使用了异步回调处理,这样的设计在高并发场景大幅提升服务的性能,尽可能的避免了存在瓶颈的地方。不过带来的另一影响是,代码跟踪起来相对来说容易“迷路”,因此掌握好异步设计的逻辑是很有必要的,可以帮助我们更好的跟踪Pulsar的代码。