基于Dubbo 3.1,详细介绍了Dubbo服务的发布与引用的源码。
此前我们学习了调用createProxy方法,根据服务引用参数map创建服务接口代理引用对象的整体流程,我们知道会调用createInvokerForRemote方法创建远程引用Invoker,这是Dubbo 3 服务引用的核心方法,我们现在来接着学习createInvokerForRemote方法。
Dubbo 3.x服务引用源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
- Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
- Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
Dubbo 3.x服务发布源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
- Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
- Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
- Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
- Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
- Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
文章目录
- 1 createInvokerForRemote创建远程引用Invoker
- 2 Protocol$Adaptive自适应Protocol
- 3 ProtocolSerializationWrapper协议序列化包装器
- 4 ProtocolFilterWrapper协议过滤器包装器
- 5 ProtocolListenerWrapper协议监听器包装器
- 6 总结
1 createInvokerForRemote创建远程引用Invoker
远程引用或者直连引用情况下,将会调用该方法,创建远程引用Invoker。该方法对于一个注册中心url和多个注册中心url的处理不一样,我们仅看一个注册中心的情况。
一个注册中心的情况下,该方法主要逻辑就是执行protocolSPI.refer方法,通过协议protocolSPI引用服务Invoker。
这里的protocolSPI是Protocol的自适应扩展实现,即Protocol$Adaptive,将会根据url的协议选择Protocol实现,然后调用Protocol#refer方法引用服务。
我们下面主要看protocolSPI#refer方法方法的相关源码,这也是Dubbo服务引入的核心流程之一。
/**
* ReferenceConfig的方法
* <p>
* 创建远程引用Invoker
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private void createInvokerForRemote() {
//一个url,表示一个注册中心或者直连地址,这是大多数情况
//url例如: registry://47.94.229.245:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=demo1&application=demo-consumer&dubbo=2.0.2&pid=34457®istry=zookeeper&timeout=20000×tamp=1666854892012
if (urls.size() == 1) {
//获取注册中心协议url
URL curUrl = urls.get(0);
/*
* 通过协议protocolSPI引用服务,该方法固定返回InjvmInvoker实例,内部没有NettyClient,因为不需要发起网络调用
*
* 这里的protocolSPI是Protocol的自适应扩展实现,即Protocol$Adaptive
* 将会根据url的协议选择Protocol实现,然后调用Protocol#refer方法引用服务
*/
invoker = protocolSPI.refer(interfaceClass, curUrl);
// registry url, mesh-enable and unloadClusterRelated is true, not need Cluster.
//如果是registry url(非直连地址), 或者unloadClusterRelated为true,那么不需要Cluster,否则需要Cluster包装
//对于远程注册中心协议来说,在protocolSPI.refer方法中就已经进行了Cluster包装,这里不再需要了
if (!UrlUtils.isRegistry(curUrl) &&
!curUrl.getParameter(UNLOAD_CLUSTER_RELATED, false)) {
List<Invoker<?>> invokers = new ArrayList<>();
invokers.add(invoker);
invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
}
}
//多个url,表示多个注册中心或者直连地址
else {
List<Invoker<?>> invokers = new ArrayList<>();
URL registryUrl = null;
for (URL url : urls) {
// For multi-registry scenarios, it is not checked whether each referInvoker is available.
// Because this invoker may become available later.
invokers.add(protocolSPI.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
// use last registry url
registryUrl = url;
}
}
if (registryUrl != null) {
// registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
String cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker
// (RegistryDirectory, routing happens here) -> Invoker
invoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);
} else {
// not a registry url, must be direct invoke.
if (CollectionUtils.isEmpty(invokers)) {
throw new IllegalArgumentException("invokers == null");
}
URL curUrl = invokers.get(0).getUrl();
String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);
invoker = Cluster.getCluster(scopeModel, cluster).join(new StaticDirectory(curUrl, invokers), true);
}
}
}
2 Protocol$Adaptive自适应Protocol
Protocol$Adaptive的refer方法将会根据url的协议基于Dubbo SPI机制选择Protocol实现,然后调用Protocol#refer方法引用服务,默认dubbo协议。
/**
* Protocol$Adaptive的方法
* 基于url协议参数的自适应引用服务
*
* @param arg0 服务class
* @param arg1 远程服务的url地址
* @return Invoker
*/
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
//获取url协议作为扩展名,默认dubbo
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.Protocol.class);
//基于DUbbo SPI机制查找指定扩展名的Protocol实现类,默认DubboProtocol,这里会进行wrapper包装
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) scopeModel.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
//通过具体的Protocol包装实现类的refer方法实现服务的引用
return extension.refer(arg0, arg1);
}
同服务导出时一样,获取Protocol的时候,将会经过wrapper的包装。以InjvmProtocol协议为例,可以看到经过了三层包装,调用时由外向内调用,即ProtocolSerializationWrapper -> ProtocolFilterWrapper -> ProtocolListenerWrapper -> InjvmProtocol(具体的Protocol实现)。实际上Dubbo正式采用warpper机制和装饰设计模式实现类似aop的功能。
本地引入的injvm协议对应InjvmProtocol,需要引入远程接口级注册中心的registry对应InterfaceCompatibleRegistryProtocol,需要引入远程应用级注册中心的service-discovery-registry对应RegistryProtocol。
下面我们分别讲解这些wrapper和protocol是如何进行服务引入的!
3 ProtocolSerializationWrapper协议序列化包装器
protocol的最外层wrapper,它仅会在导出服务的export方法中起作用,在引入服务的refer方法中没有其他处理。
/**
* ProtocolSerializationWrapper的方法
*/
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//无特殊处理
return protocol.refer(type, url);
}
4 ProtocolFilterWrapper协议过滤器包装器
这个包装器首先会判断如果是注册中心的协议,例如registry或者service-discovery-registry,那么直接调用下一层refer方法。
否则,获取服务url对应的Filter并且构建为一个InvokerChain对象返回,内部包含了一个下层refer方法的Invoker和一条过滤器调用链。
/**
* ProtocolFilterWrapper的方法
*/
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//如果是注册中心的协议,例如registry或者service-discovery-registry,那么直接调用下一层refer方法
if (UrlUtils.isRegistry(url)) {
return protocol.refer(type, url);
}
//获取服务url对应的Filter并且构建为一个InvokerChain对象返回,内部包含了一个下层refer方法的Invoker和一条过滤器调用链。
FilterChainBuilder builder = getFilterChainBuilder(url);
return builder.buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}
5 ProtocolListenerWrapper协议监听器包装器
这个包装器首先会判断如果是注册中心的协议,例如registry或者service-discovery-registry,那么直接调用下一层refer方法。
否则,调用下一层refer方法获取返回的Invoker,如果url不包含registry-cluster-type参数,将返回的Invoker包装为ListenerInvokerWrapper,内部包含了一个Invoker和一个监听器列表。
/**
* ProtocolListenerWrapper的方法
*/
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//如果是注册中心的协议,例如registry或者service-discovery-registry,那么直接调用下一层refer方法
if (UrlUtils.isRegistry(url)) {
return protocol.refer(type, url);
}
//调用下一层refer方法获取返回的Invoker
Invoker<T> invoker = protocol.refer(type, url);
//如果url不包含registry-cluster-type参数
if (StringUtils.isEmpty(url.getParameter(REGISTRY_CLUSTER_TYPE_KEY))) {
//将返回的Invoker包装为ListenerInvokerWrapper,内部包含了一个Invoker和一个监听器列表
invoker = new ListenerInvokerWrapper<>(invoker,
Collections.unmodifiableList(
ScopeModelUtil.getExtensionLoader(InvokerListener.class, invoker.getUrl().getScopeModel())
.getActivateExtension(url, INVOKER_LISTENER_KEY)));
}
return invoker;
}
6 总结
本次我们学习了createInvokerForRemote方法中的Wrapper有哪些以及作用,接下来我们将会的学习真正的本地、应用级别、接口级别的Protocol的引入逻辑。