引言
在学习Pulsar一段时间后,相信大家也或多或少听说Lookup这个词,今天就一起来深入剖析下Pulsar是怎么设计的它吧
Lookup是什么
在客户端跟服务端建立TCP连接前有些信息需要提前获取,这个获取方式就是Lookup机制。所获取的信息有以下几种
- 应该跟哪台Broker建立连接
- Topic的Schema信息
- Topic的分区信息
其中第一个是最重要的,因此今天就针对第一点进行深入剖析,大致流程如下图
- 在创建生产者/消费者时会触发Lookup,一般是通过HTTP请求Broker来获取目标Topic所归属的Broker节点信息,这样才知道跟哪台机器建立TCP连接进行数据交互
- Broker接收到Lookup命令,此时会进行限流检查、身份/权限认证、校验集群等检测动作后,根据请求中携带的Namespace信息获取对应的Namespace对象进行处理,这里Namespace会对Topic进行哈希运算并判断它落在数组的哪一个节点,算出来后就根据数组的信息来从Bundle数组中获得对应的Bundle,这个过程其实就是一致性哈希算法寻址过程。
- 在获得Bundle后会尝试从本机Cache中查询该Bundle所归属的Broker信息。
- 如果在Cache中没有命中,则会去Zookeeper中进行读取,如果发现该Bundle还未归属Broker则触发归属Broker的流程
- 获取到该Topic所归属的Broker信息后返回给客户端,客户端解析结果并跟所归属的Broker建立TCP连接,用于后续生产者往Broker节点进行消息写入
补充说明确定Bundle的归属,如果Broker的loadManager使用的是中心化策略,则需要Broker Leader来当裁判决定,否则当前Broker就可当作裁判。虽然Broker是无状态的,但会通过Zookeeper选举出一个Leader用于监控负载、为Bundle分配Broker等事情,裁判Broker通过loadManager查找负载最低的Broker并把Bundle分配给它。
客户端实现原理
Lookup机制是由客户端发起的,在创建生产者/消费者对象时会初始化网络连接,以生产者代码为例进行跟踪看看。无论是创建分区还是非分区生产者,最终都会走到ProducerImpl的构造函数,就从这里开始看吧
public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,
ProducerInterceptors interceptors, Optional<String> overrideProducerName) {
....
//这里进去就是创建跟Broker的网络连接
grabCnx();
}
void grabCnx() {
//实际上是调用ConnectionHandler进行的
this.connectionHandler.grabCnx();
}
protected void grabCnx(Optional<URI> hostURI) {
....
//这里是核心,相当于最终又调用回PulsarClientImpl类的getConnection方法
cnxFuture = state.client.getConnection(state.topic, (state.redirectedClusterURI.toString()));
....
}
public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {
TopicName topicName = TopicName.get(topic);
//看到方法名就知道到了Lookup的时候了,所以说好的命名远胜于注释
return getLookup(url).getBroker(topicName)
.thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),
lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));
}
public LookupService getLookup(String serviceUrl) {
return urlLookupMap.computeIfAbsent(serviceUrl, url -> {
try {
//忽略其他的,直接跟这里进去
return createLookup(serviceUrl);
} catch (PulsarClientException e) {
log.warn("Failed to update url to lookup service {}, {}", url, e.getMessage());
throw new IllegalStateException("Failed to update url " + url);
}
});
}
public LookupService createLookup(String url) throws PulsarClientException {
//这里可以看到如果咱们在配置客户端的地址是http开头就会通过http方式进行Loopup,否则走二进制协议进行查询
if (url.startsWith("http")) {
return new HttpLookupService(conf, eventLoopGroup);
} else {
return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
}
}
public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
throws PulsarClientException {
//进到可能会误会Pulsar是通过HttpClient工具包进行的HTTP通信,继续看HttpClient构造函数
this.httpClient = new HttpClient(conf, eventLoopGroup);
this.useTls = conf.isUseTls();
this.listenerName = conf.getListenerName();
}
protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
....
//可以看到实际上最终是调用的AsyncHttpClient进行HTTP通信,这是一个封装Netty的async-http-client-2.12.1.jar的外部包
httpClient = new DefaultAsyncHttpClient(config);
....
}
通过上面可以看到Lookup服务已经完成初始化,接下来就来看看客户端如何发起Lookup请求,回到PulsarClientImpl的getConnection方法,可以看到这里是链式调用,上面是从getLookup看到了其实是对Lookup进行初始化的过程,那么接下来就跟踪getBroker方法看看是怎么获取的服务端信息
public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {
TopicName topicName = TopicName.get(topic);
return getLookup(url).getBroker(topicName)
.thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),
lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));
}
public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
//判断访问哪个版本的接口
String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
String path = basePath + topicName.getLookupName();
path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);
//获取要访问的Broker地址
return httpClient.get(path, LookupData.class)
.thenCompose(lookupData -> {
URI uri = null;
try {
//解析服务端返回的数据,本质上就是返回的就是Topic所在Broker的节点IP+端口
InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
//HTTP通过Lookup方式访问服务端绝对不会走代理
return CompletableFuture.completedFuture(new LookupTopicResult(brokerAddress, brokerAddress,
false /* HTTP lookups never use the proxy */));
} catch (Exception e) {
....
}
});
}
public class LookupTopicResult {
//LookupTopicResult是查询Topic归属Broker的结果后包装的一层结果,可以看到这里其实就是Socket信息也就是IP+端口
private final InetSocketAddress logicalAddress;
private final InetSocketAddress physicalAddress;
private final boolean isUseProxy;
}
客户端的流程走到这里基本就结束了,是否有些意犹未尽迫不及待的想知道服务端又是怎么处理的?那么就看看下一节
服务端实现原理
服务端的入口在TopicLookup类的lookupTopicAsync方法,服务端大致步骤是这样的:1. 获取Topic所归属的Bundle 2. 查询Bundle所归属的Broker 3. 返回该Broker的url
public void lookupTopicAsync(
@Suspended AsyncResponse asyncResponse,
@PathParam("topic-domain") String topicDomain, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("listenerName") String listenerName,
@HeaderParam(LISTENERNAME_HEADER) String listenerNameHeader) {
TopicName topicName = getTopicName(topicDomain, tenant, namespace, encodedTopic);
if (StringUtils.isEmpty(listenerName) && StringUtils.isNotEmpty(listenerNameHeader)) {
listenerName = listenerNameHeader;
}
//可以看得到这里是获取Lookup的,跟踪进去看看
internalLookupTopicAsync(topicName, authoritative, listenerName)
.thenAccept(lookupData -> asyncResponse.resume(lookupData))
.exceptionally(ex -> {
....
});
}
protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName topicName, boolean authoritative, String listenerName) {
CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
//获得目标Broker地址, 继续从这里进去
.getBrokerServiceUrlAsync(topicName,
LookupOptions.builder()
.advertisedListenerName(listenerName)
.authoritative(authoritative)
.loadTopicsInBundle(false)
.build());
}
public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
long startTime = System.nanoTime();
// 获取这个Topic所归属的Bundle
CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
.thenCompose(bundle -> {
//根据获得的bundle信息查询归属的Broker
return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
//如果findRedirectLookupResultAsync方式没查到则走这里进行查询
return findBrokerServiceUrl(bundle, options);
});
});
future.thenAccept(optResult -> {
....
}).exceptionally(ex -> {
....
});
return future;
}
先看看是怎么获取Topic所归属的Bundle的吧,就从getBundleAsync方法跟踪进去
public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
return bundleFactory.getBundlesAsync(topic.getNamespaceObject())
//直接看findBundle,命名意思已经很清晰了
.thenApply(bundles -> bundles.findBundle(topic));
}
public NamespaceBundle findBundle(TopicName topicName) {
checkArgument(nsname.equals(topicName.getNamespaceObject()));
//同理,继续跟踪进去
return factory.getTopicBundleAssignmentStrategy().findBundle(topicName, this);
}
public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
//计算Topic名称的哈希值
long hashCode = Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
//根据哈希值来获取所归属的bundle,一致性哈希的设计。跟进去看看是怎么计算的
NamespaceBundle bundle = namespaceBundles.getBundle(hashCode);
if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
bundle.setHasNonPersistentTopic(true);
}
return bundle;
}
protected NamespaceBundle getBundle(long hash) {
//通过数组的二分查找进行计算,数组的元素个数跟存储Bundle的bundles的集合大小是一样的,能获取对应的Bundle
//思路其实就是一致性哈希的查找方式,计算出哈希值处于哈希环所处的位置并查找其下一个节点的信息
int idx = Arrays.binarySearch(partitions, hash);
int lowerIdx = idx < 0 ? -(idx + 2) : idx;
return bundles.get(lowerIdx);
}
知道Bundle之后,下一步就是根据这个Bundle来查询其所归属的Broker节点,也就是上面的NamespaceService类的findRedirectLookupResultAsync方法,这里一路跟下去就是查询缓存中获取映射信息的地方了,感兴趣的伙伴可以继续跟下去
private CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync(ServiceUnitId bundle) {
if (isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
return CompletableFuture.completedFuture(Optional.empty());
}
return redirectManager.findRedirectLookupResultAsync();
}
总结
以上就是Pulsar的Lookup机制的实现流程,在寻址的过程中,需要阅读的伙伴具备一致性哈希的知识,因为Pulsar的Topic归属就是引入了一致性哈希算法来实现的。