基于Dubbo 3.1,详细介绍了Dubbo Consumer服务调用源码。
此前我们学习了Dubbo服务的导出和引入的源码,现在我们来学习Dubbo服务调用的源码。
此前的文章中我们讲过了最上层代理的调用逻辑(服务引用bean的获取以及懒加载原理):业务引入的接口代理对象(ReferenceBean内部的lazyProxy对象)-> 代理目标对象(ReferenceConfig内部的接口代理对象ref),代理目标对象的请求都会被统一转发到内部的InvokerInvocationHandler#invoke方法中去。
所以,我们学习Dubbo服务调用源码入口就是InvokerInvocationHandler#invoke方法。
Dubbo 3.x服务调用源码:
- Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口
- Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用
Dubbo 3.x服务引用源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
- Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
- Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
- Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
- Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
- Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
- Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
- Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
- Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
- Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url
Dubbo 3.x服务发布源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
- Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
- Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
- Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
- Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
- Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
- Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布
文章目录
- InvokerInvocationHandler#invoke调用入口
- InvocationUtil#invoke发起调用
- getUrl获取consumerUrl
- MigrationInvoker#invoke决策调用Invoker
- MockClusterInvoker#invoker本地mock调用
- AbstractCluster#invoke继续调用
- CallbackRegistrationInvoker#invoke回调注册
- CopyOfFilterChainNode#invoke过滤器链式调用
- ConsumerContextFilter封装信息
- FutureFilter执行回调方法
- MonitorFilter收集监控数据
- RouterSnapshotFilter路由快照zhichi
- AbstractClusterInvoker#invoke集群容错rpc调用
- FailoverClusterInvoker#doInvoke失败重试调用
- AbstractClusterInvoker#select选择invoker
- AbstractClusterInvoker#doSelect继续选择invoker
- FailoverClusterInvoker#invokeWithContext调用服务
- 总结
InvokerInvocationHandler#invoke调用入口
InvokerInvocationHandler#invoke可以看作是消费者发起调用的入口方法。
该方法中,将会构建一个RpcInvocation作为接口方法调用抽象,包含方法名字,接口名,方法参数等信息,然后通过InvocationUtil#invoke方法来发起真正的远程or本地调用。
/**
* InvokerInvocationHandler的方法
* <p>
* 执行代理对象的方法的转发
*
* @param proxy 在其上调用方法的代理实例
* @param method 在代理实例上调用的接口方法
* @param args 一个对象数组,包含在代理实例的方法调用中传递的参数值,如果接口方法不接受参数,则为null
* @return 调用代理对象方法执行结果
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//对于object类的方法,直接反射调用
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
//获取方法名和方法参数类型数组
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
//特殊方法的调用,直接调用invoker对象的同名方法
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
//构建一个RpcInvocation作为接口方法调用抽象,包含方法名字,接口名,方法参数等信息
RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);
if (serviceModel instanceof ConsumerModel) {
rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
}
//通过InvocationUtil.invoke方法来发起真正的远程or本地调用
return InvocationUtil.invoke(invoker, rpcInvocation);
}
InvocationUtil#invoke发起调用
这里的Invoker是经过层层封装之后的Invoker(注意MockClusterInvoker上面还有一个MigrationInvoker没画出来),我们将按照这个顺序从上向下讲解。
public class InvocationUtil {
private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
/**
* 消费者调用
*
* @param invoker 可执行器
* @param rpcInvocation 接口方法调用抽
* @return 执行结果
*/
public static Object invoke(Invoker<?> invoker, RpcInvocation rpcInvocation) throws Throwable {
//消费者url,consumer://10.253.45.126/org.apache.dubbo.demo.GreetingService?application=demo-consumer&background=false&check=false&dubbo=2.0.2&group=greeting&init=false&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=72363®ister.ip=10.253.45.126&release=&revision=1.0.0&side=consumer&sticky=false×tamp=1668394930680&unloadClusterRelated=false&version=1.0.0
URL url = invoker.getUrl();
//服务key,{group}/{serviceInterface}:{version}
String serviceKey = url.getServiceKey();
//设置目标服务的唯一名称
rpcInvocation.setTargetServiceUniqueName(serviceKey);
//设置消费者url到rpc调用上下文中吗格式一个内部的线程本地变量
// invoker.getUrl() returns consumer url.
RpcServiceContext.getServiceContext().setConsumerUrl(url);
//调用性能解析器
if (ProfilerSwitch.isEnableSimpleProfiler()) {
//线程本地变量
ProfilerEntry parentProfiler = Profiler.getBizProfiler();
ProfilerEntry bizProfiler;
if (parentProfiler != null) {
//接收请求。客户端调用开始。
bizProfiler = Profiler.enter(parentProfiler,
"Receive request. Client invoke begin. ServiceKey: " + serviceKey + " MethodName:" + rpcInvocation.getMethodName());
} else {
//接收请求。客户端调用开始。
bizProfiler = Profiler.start("Receive request. Client invoke begin. ServiceKey: " + serviceKey + " " + "MethodName:" + rpcInvocation.getMethodName());
}
rpcInvocation.put(Profiler.PROFILER_KEY, bizProfiler);
try {
/*
* 执行invoker#invoker方法实现调用
*/
return invoker.invoke(rpcInvocation).recreate();
} finally {
//释放
Profiler.release(bizProfiler);
int timeout;
//timeout附加信息,远程服务调用超时时间(毫秒),默认1000
Object timeoutKey = rpcInvocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);
if (timeoutKey instanceof Integer) {
timeout = (Integer) timeoutKey;
} else {
timeout = url.getMethodPositiveParameter(rpcInvocation.getMethodName(),
TIMEOUT_KEY,
DEFAULT_TIMEOUT);
}
//调用执行时间
long usage = bizProfiler.getEndTime() - bizProfiler.getStartTime();
//是否超时
if ((usage / (1000_000L * ProfilerSwitch.getWarnPercent())) > timeout) {
StringBuilder attachment = new StringBuilder();
rpcInvocation.foreachAttachment((entry) -> {
attachment.append(entry.getKey()).append("=").append(entry.getValue()).append(";\n");
});
//超时打印日志
logger.warn(String.format(
"[Dubbo-Consumer] execute service %s#%s cost %d.%06d ms, this invocation almost (maybe already) timeout. Timeout: %dms\n" + "invocation context:\n%s" + "thread info: \n%s",
rpcInvocation.getProtocolServiceKey(),
rpcInvocation.getMethodName(),
usage / 1000_000,
usage % 1000_000,
timeout,
attachment,
Profiler.buildDetail(bizProfiler)));
}
}
}
/*
* 抛出异常后重新执行invoker#invoker方法实现调用
*/
return invoker.invoke(rpcInvocation).recreate();
}
}
getUrl获取consumerUrl
该方法获取消费者url。例如:consumer://10.253.45.126/org.apache.dubbo.demo.GreetingService?application=demo-consumer&background=false&check=false&dubbo=2.0.2&group=greeting&init=false&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=72363®ister.ip=10.253.45.126&release=&revision=1.0.0&side=consumer&sticky=false×tamp=1668394930680&unloadClusterRelated=false&version=1.0.0
/**
* MigrationInvoker的方法
*/
@Override
public URL getUrl() {
if (currentAvailableInvoker != null) {
//当前激活的Invoker
return currentAvailableInvoker.getUrl();
} else if (invoker != null) {
//接口级别invoker
return invoker.getUrl();
} else if (serviceDiscoveryInvoker != null) {
//应用级别invoker
return serviceDiscoveryInvoker.getUrl();
}
//消费者url
return consumerUrl;
}
MigrationInvoker#invoke决策调用Invoker
MigrationInvoker是可迁移Invoker,也是最上层Invoker,它可用于选择不同的Invoker执行调用,用于应用级服务发现和接口级服务发现之间的迁移,以及实现灰度调用。
该方法决策使用应用级Invoker还是接口级Invoker执行调用,如果设置了激活的invoker并且类型为应用级优先,那么还会进入灰度策略,如果随机决策值(0-100)大于灰度值,那么走接口级订阅模式,否则走应用级订阅模式。灰度比例功能仅在应用级优先状态下生效。
/**
* MigrationInvoker的方法
*
* 决策使用应用级Invoker还是接口级Invoker执行调用
*
* @param invocation RpcInvocation
* @return 调用结果
*/
@Override
public Result invoke(Invocation invocation) throws RpcException {
//如果设置了当前激活的invoker
if (currentAvailableInvoker != null) {
//如果类型为应用级优先
if (step == APPLICATION_FIRST) {
//如果随机决策值(0-100)大于灰度值,那么走接口模式
//灰度比例功能仅在应用级优先状态下生效
if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
//退回到接口模式调用
return invoker.invoke(invocation);
}
//每次检查调用程序是否可用,应用级invoker优先,然后继续使用invoker调用
return decideInvoker().invoke(invocation);
}
//其他类型
return currentAvailableInvoker.invoke(invocation);
}
//如果没有当前激活的invoker
switch (step) {
case APPLICATION_FIRST:
//每次检查调用程序是否可用,应用级invoker优先
currentAvailableInvoker = decideInvoker();
break;
case FORCE_APPLICATION:
//应用级invoker
currentAvailableInvoker = serviceDiscoveryInvoker;
break;
case FORCE_INTERFACE:
default:
//接口级invoker
currentAvailableInvoker = invoker;
}
//调用
return currentAvailableInvoker.invoke(invocation);
}
MockClusterInvoker#invoker本地mock调用
dubbo除了限流措施之外,还支持mock本地伪装。本地伪装常被用于服务降级和熔断。比如某验权服务,当服务提供方全部挂掉后,假如此时服务消费方发起了一次远程调用,那么本次调用将会失败并抛出一个 RpcException 异常。
为了避免出现这种直接抛出异常的情况出现,那么客户端就可以利用本地伪装来提供 Mock 数据返回授权失败。mock的大概配置如下:
- false、0、null、N/A:如果没有设置mock属性,或者设置了这些值,表示不启用mock,直接发起远程调用,这是默认策略。
- force:强制服务降级,如果mock属性值以“force”开头则使用该策略。服务调用方在调用该接口服务时候会直接执行客户端本地的mock逻辑,不会远程调用服务提供者。比如使用mock=force:return+null表示消费方对该服务的方法调用都直接返回null值,不发起远程调用。
- fail策略:服务失败降级,如果mock属性值不符合上面所有的类型,则使用该策略。表示服务调用彻底失败并抛出RPCException之后执行本地mock逻辑,不一定会抛出异常。服务彻底失败具体和设置的集群容错方式有关。如果设置了retries,则是在全部重试次数使用完毕并且抛出RPCException异常之后才会执行mock的逻辑。
- true/default:如果mock属性值为true或者default,那么在最终失败并抛出RPCException之后,会在接口同路径下查找interfaceName+Mock名字的类,随后通过无参构造器实例化,随后调用该Mock类对象的对应方法,该方法中提供对应的逻辑。
详细介绍参见官方文档:https://dubbo.apache.org/zh/docs3-v2/java-sdk/advanced-features-and-usage/service/local-mock/#%E5%BC%80%E5%90%AF-mock-%E9%85%8D%E7%BD%AE。
/**
* MockClusterInvoker的方法
* <p>
* mock调用
*
* @param invocation RpcInvocation
* @return 调用结果
*/
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result;
//消费者url获取mock参数
String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
//false、0、null、N/A,表示不需要mock
if (ConfigUtils.isEmpty(value)) {
//no mock
/*
* 继续调用下层Invoker#invoke
*/
result = this.invoker.invoke(invocation);
}
//以force开头,表示强制服务降级
else if (value.startsWith(FORCE_KEY)) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
}
//force:direct mock
//服务调用方在调用该接口服务时候会直接执行客户端本地的mock逻辑,不会远程调用服务提供者。
//比如使用mock=force:return+null表示消费方对该服务的方法调用都直接返回null值,不发起远程调用。
result = doMockInvoke(invocation, null);
}
//其他值,先先发起远程调用,当远程服务调用失败时,才会根据配置降级执行mock功能
else {
//fail-mock
try {
/*
* 继续调用下层Invoker#invoke
*/
result = this.invoker.invoke(invocation);
//fix:#4585
//最终失败并抛出RPCException
if (result.getException() != null && result.getException() instanceof RpcException) {
RpcException rpcException = (RpcException) result.getException();
if (rpcException.isBiz()) {
throw rpcException;
} else {
/*
* 执行本地mock调用
*/
result = doMockInvoke(invocation, rpcException);
}
}
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
}
/*
* 执行本地mock调用
*/
result = doMockInvoke(invocation, e);
}
}
return result;
}
AbstractCluster#invoke继续调用
继续向下到这里,首先执行inoker的过滤操作。
/**
* AbstractCluster的方法
*/
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filterInvoker.invoke(invocation);
}
CallbackRegistrationInvoker#invoke回调注册
filterInvoker实际类型为FilterChainBuilder的内部类CallbackRegistrationInvoker,它的invoke方法用于添加一个回调,它可以在RPC调用完成时触发,在这回调函数将会倒序执行filters中的过滤器。
/**
* FilterChainBuilder的内部类CallbackRegistrationInvoker的方法
*/
@Override
public Result invoke(Invocation invocation) throws RpcException {
/*
* 继续调用下层invoker#invoke
*/
Result asyncResult = filterInvoker.invoke(invocation);
//添加一个回调,它可以在RPC调用完成时触发。
asyncResult.whenCompleteWithContext((r, t) -> {
RuntimeException filterRuntimeException = null;
//过滤器倒序执行
for (int i = filters.size() - 1; i >= 0; i--) {
FILTER filter = filters.get(i);
try {
InvocationProfilerUtils.releaseDetailProfiler(invocation);
if (filter instanceof ListenableFilter) {
//执行过滤器
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} catch (RuntimeException runtimeException) {
LOGGER.error(String.format("Exception occurred while executing the %s filter named %s.", i, filter.getClass().getSimpleName()));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Whole filter list is: %s", filters.stream().map(tmpFilter -> tmpFilter.getClass().getSimpleName()).collect(Collectors.toList())));
}
filterRuntimeException = runtimeException;
t = runtimeException;
}
}
if (filterRuntimeException != null) {
throw filterRuntimeException;
}
});
return asyncResult;
}
CopyOfFilterChainNode#invoke过滤器链式调用
FilterChainBuilder的内部类,一个CopyOfFilterChainNode实例表示一个过滤器节点,将会首先执行全部的过滤器,执行完毕之后,才会继续后面真正的rpc调用。
这个版本默认4个filter,分别是:ConsumerContextFilter、FutureFilter、MonitorFilter、RouterSnapshotFilter。
ConsumerContextFilter封装信息
ConsumerContextFilter为consumer invoker序设置当前RpcContext,包括invoker,invocation, local host, remote host and port。它这样做是为了让执行线程的RpcContext包含这些必要的信息,方便后续直接获取。
/**
* ConsumerContextFilter的方法
*
* 始终在实现中调用invoker.invoke()将请求转交给下一个筛选器节点。
*
* @param invoker 下一个invoker节点
*/
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext.RestoreServiceContext originServiceContext = RpcContext.storeServiceContext();
try {
//设置相关属性
RpcContext.getServiceContext()
//下一个invoker
.setInvoker(invoker)
//调用抽象
.setInvocation(invocation)
//本地地址
.setLocalAddress(NetUtils.getLocalHost(), 0);
RpcContext context = RpcContext.getClientAttachment();
context.setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getApplication());
if (invocation instanceof RpcInvocation) {
//下一个invoker
((RpcInvocation) invocation).setInvoker(invoker);
}
if (CollectionUtils.isNotEmpty(supportedSelectors)) {
for (PenetrateAttachmentSelector supportedSelector : supportedSelectors) {
Map<String, Object> selected = supportedSelector.select();
if (CollectionUtils.isNotEmptyMap(selected)) {
((RpcInvocation) invocation).addObjectAttachments(selected);
}
}
} else {
((RpcInvocation) invocation).addObjectAttachments(RpcContext.getServerAttachment().getObjectAttachments());
}
Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
* by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
* a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
*/
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
// pass default timeout set by end user (ReferenceConfig)
Object countDown = context.getObjectAttachment(TIME_COUNTDOWN_KEY);
if (countDown != null) {
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
if (timeoutCountDown.isExpired()) {
return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " + invocation.getServiceName() + "."
+ invocation.getMethodName() + ", terminate directly."), invocation);
}
}
RpcContext.removeServerContext();
//调用下一个invoker节点
return invoker.invoke(invocation);
} finally {
RpcContext.restoreServiceContext(originServiceContext);
}
}
FutureFilter执行回调方法
这个过滤器主要是获取异步方法调用信息AsyncMethodInfo,AsyncMethodInfo内部包含在执行Invoker的各个阶段的回调方法和实例,这里将会执行invoker调用时(调用之前)的回调方法。
/**
* FutureFilter的方法
*
* 始终在实现中调用invoker.invoke()将请求转交给下一个筛选器节点。
*
* @param invoker 下一个invoker节点
*/
@Override
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
//获取Invoker回调方法,并且执行
fireInvokeCallback(invoker, invocation);
// need to configure if there's return value before the invocation in order to help invoker to judge if it's
// necessary to return future.
//调用下一个invoker节点
return invoker.invoke(invocation);
}
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
//获取异步方法调用信息AsyncMethodInfo,并且执行invoker调用时的回调方法
final AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
//如果为null直接返回
if (asyncMethodInfo == null) {
return;
}
//调用异步调用时的回调方法
final Method onInvokeMethod = asyncMethodInfo.getOninvokeMethod();
//调用异步调用时的回调实例
final Object onInvokeInst = asyncMethodInfo.getOninvokeInstance();
if (onInvokeMethod == null && onInvokeInst == null) {
return;
}
if (onInvokeMethod == null || onInvokeInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a oninvoke callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (!onInvokeMethod.isAccessible()) {
onInvokeMethod.setAccessible(true);
}
Object[] params = invocation.getArguments();
try {
//反射执行invoker调用时的回调方法
onInvokeMethod.invoke(onInvokeInst, params);
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
}
}
MonitorFilter收集监控数据
监控拦截器,它将收集关于此调用的调用数据并将其发送到监控中心。
/**
* MonitorFilter的方法
*
* 始终在实现中调用invoker.invoke()将请求转交给下一个筛选器节点。
*
* @param invoker 下一个invoker节点
*/
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
//如果存在monitor,则设置监控数据
if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
invocation.put(MONITOR_REMOTE_HOST_STORE, RpcContext.getServiceContext().getRemoteHost());
// count up
getConcurrent(invoker, invocation).incrementAndGet();
}
//调用下一个invoker节点
return invoker.invoke(invocation);
}
RouterSnapshotFilter路由快照zhichi
匹配路由器快照切换器RouterSnapshotSwitcher,用于打印日志以及路由快照。
/**
* RouterSnapshotFilter的方法
*
* 始终在实现中调用invoker.invoke()将请求转交给下一个筛选器节点。
*
* @param invoker 下一个invoker节点
*/
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
//默认不支持
if (!switcher.isEnable()) {
//调用下一个invoker节点
//如果后面没有其他filter,那么该节点就是真正的invoker节点
return invoker.invoke(invocation);
}
if (!logger.isInfoEnabled()) {
return invoker.invoke(invocation);
}
if (!switcher.isEnable(invocation.getServiceModel().getServiceKey())) {
return invoker.invoke(invocation);
}
RpcContext.getServiceContext().setNeedPrintRouterSnapshot(true);
return invoker.invoke(invocation);
}
如果后面没有其他filter,那么下一个节点就是真正的invoker节点,即具有集群容错策略的ClusterInvoker,默认FailoverClusterInvoker。
AbstractClusterInvoker#invoke集群容错rpc调用
该方法是执行rpc调用的骨干方法。大概步骤为:
- 调用list方法,从服务目录Directory中根据路由规则或滤出满足规则的服务提供者invoker列表。最后会调用每个路由器的route方法进行服务路由,如果没有路由器则返回全部invoker列表。具体源码后面再说。
- 调用initLoadBalance方法,获取负载均衡策略实例,默认RandomLoadBalance。具体源码后面再说。
- 调用doinvoke方法,继续向下执行rpc调用的逻辑。
/**
* AbstractClusterInvoker的方法
* <p>
* rpc调用的模版方法实现
*/
@Override
public Result invoke(final Invocation invocation) throws RpcException {
//销毁检测
checkWhetherDestroyed();
// binding attachments into invocation.
// Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
// if (contextAttachments != null && contextAttachments.size() != 0) {
// ((RpcInvocation) invocation).addObjectAttachmentsIfAbsent(contextAttachments);
// }
InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Router route.");
/*
* 1 从服务目录中根据路由规则或滤出满足规则的服务提供者invoker列表
*/
List<Invoker<T>> invokers = list(invocation);
InvocationProfilerUtils.releaseDetailProfiler(invocation);
/*
* 2 获取负载均衡策略实现
*/
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Cluster " + this.getClass().getName() + " invoke.");
try {
/*
* 3 继续执行rpc调用
*/
return doInvoke(invocation, invokers, loadbalance);
} finally {
InvocationProfilerUtils.releaseDetailProfiler(invocation);
}
}
FailoverClusterInvoker#doInvoke失败重试调用
此前我们通过服务路由获取了符合规则的服务提供者invoker列表,以及获取了服务负载均衡实例,下面将会进行带有容错机制的rpc调用。
FailoverClusterInvoker采用失败自动切换机制,Dubbo默认的容错策略。服务消费方调用失败后自动切换到其他服务提供者的服务器进行重试。客户端等待服务端的处理时间超过了设定的超时时间时,也算做失败,将会重试。可通过 retries属性来设置重试次数(不含第一次),默认重试两次。
通常用于读操作或者具有幂等的写操作,需要注意的是重试会带来更长延迟。大概步骤为:
- 计算最大调用次数,默认3,包括两次失败重试,最小1次。在一个循环中执行调用。
- 每次重试前,重新调用list方法从服务目录中根据路由规则或滤出满足规则的服务提供者invoker列表。可想而知,如果此时服务有变更,那么invoked列表将失去准确性。
- 基于负载均衡策略选择一个服务提供者invoker。
- 通过服务提供者invoker执行rpc调用获取结果。
/**
* FailoverClusterInvoker的方法
*
* 失败重试的容错策略
*
* @param invocation 方法调用抽象
* @param invokers 服务提供者invoker列表,已经经过了路由过滤
* @param loadbalance 负载均衡
* @return 执行结果
*/
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
//可用invoker列表不能为空,否则直接抛出异常
checkInvokers(copyInvokers, invocation);
//方法名
String methodName = RpcUtils.getMethodName(invocation);
/*
* 计算最大调用次数,默认3,包括两次失败重试,最小1次
* retries属性
*/
int len = calculateInvokeTimes(methodName);
// retry loop.
RpcException le = null; // last exception.
//已调用过的invoker列表
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
//已调用过的provider列表
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//重试前重新从服务目录中根据路由规则或滤出满足规则的服务提供者invoker列表
//如果此时服务有变更,那么invoked列表将失去准确性
if (i > 0) {
checkWhetherDestroyed();
//重新服务路由
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
/*
* 基于负载均衡策略选择一个invoker
*/
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
//添加到已调用列表
invoked.add(invoker);
RpcContext.getServiceContext().setInvokers((List) invoked);
boolean success = false;
try {
/*
* 使用服务提供者invoker执行rpc服务调用
*/
Result result = invokeWithContext(invoker, invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
success = true;
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
//如果未成功
if (!success) {
//加入已调用过的provider列表
providers.add(invoker.getUrl().getAddress());
}
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
AbstractClusterInvoker#select选择invoker
基于负载均衡策略选择一个invoker。
/**
* AbstractClusterInvoker的方法
*
* 基于负载均衡策略选择一个invoker
*
* @param loadbalance 负载均衡策略
* @param invocation 调用方法抽象
* @param invokers 全部服务提供者invoker列表,已经经过了路由过滤
* @param selected 已被调用过的服务提供者invoker列表
* @return
* @throws RpcException
*/
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
//调用方法名
String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();
//默认false,设置true 该接口上的所有方法使用同一个provider.如果需要更复杂的规则,请使用路由
boolean sticky = invokers.get(0).getUrl()
.getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);
//ignore overloaded method 忽略重载方法
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
//ignore concurrency problem 忽略并发问题
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
if (availableCheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
/*
* 继续选择invoker
*/
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}
AbstractClusterInvoker#doSelect继续选择invoker
- 如果只有一个服务提供者invoker,那么返回。
- 调用loadbalance#select方法基于负载均衡策略选择一个invoker。这是核心方法,Dubbo提供有很多的负载均衡实现,具体的源码我们后面单独分析。
- 如果invoker在已被调用过的服务提供者invoker列表中,那么调用reselect方法重新选择一个。
- 重新选择仍然失败,采用兜底策略。检查当前选中的invoker的索引,如果不是最后一个,则选择索引+1的invoker,否则返回第一个。
/**
* AbstractClusterInvoker的方法
* <p>
* 基于负载均衡策略选择一个invoker
*
* @param loadbalance 负载均衡策略
* @param invocation 调用方法抽象
* @param invokers 全部服务提供者invoker列表,已经经过了路由过滤
* @param selected 已被调用过的服务提供者invoker列表
*/
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
//如果只有一个服务提供者invoker,那么返回
if (invokers.size() == 1) {
Invoker<T> tInvoker = invokers.get(0);
checkShouldInvalidateInvoker(tInvoker);
return tInvoker;
}
/*
* 基于负载均衡策略选择一个invoker
* 核心方法
*/
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
//如果invoker在selected中,则重新选择。
boolean isSelected = selected != null && selected.contains(invoker);
//如果availableCheck=true,并且invoker不可用,则重新选择。
boolean isUnavailable = availableCheck && !invoker.isAvailable() && getUrl() != null;
if (isUnavailable) {
invalidateInvoker(invoker);
}
//重新选择
if (isSelected || isUnavailable) {
try {
/*
* 重新基于负载均衡策略选择一个invoker
*/
Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availableCheck);
if (rInvoker != null) {
invoker = rInvoker;
}
//重新选择仍然失败,兜底策略
else {
//Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
//检查当前选中的invoker的索引,如果不是最后一个,则选择索引+1的invoker
int index = invokers.indexOf(invoker);
try {
//Avoid collision
invoker = invokers.get((index + 1) % invokers.size());
} catch (Exception e) {
logger.warn("2-5", "select invokers exception", "", e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
logger.error("2-5", "failed to reselect invokers", "", "cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}
FailoverClusterInvoker#invokeWithContext调用服务
将当前invoker设置到服务调用上下文RpcContext中,RpcContext是一个线程本地变量。随后使用服务提供者invoker#invoke执行rpc服务调用。
/**
* AbstractClusterInvoker
* <p>
* 使用服务提供者invoker执行rpc服务调用
*
* @param invoker 服务提供者invoker
* @param invocation 方法调用抽象
* @return 调用结果
*/
protected Result invokeWithContext(Invoker<T> invoker, Invocation invocation) {
//将当前invoker设置到服务调用上下文RpcContext中,RpcContext是一个线程本地变量
setContext(invoker);
Result result;
try {
if (ProfilerSwitch.isEnableSimpleProfiler()) {
InvocationProfilerUtils.enterProfiler(invocation, "Invoker invoke. Target Address: " + invoker.getUrl().getAddress());
}
/*
* 使用服务提供者invoker执行rpc服务调用
*/
result = invoker.invoke(invocation);
} finally {
//清除上下文中的invoker属性
clearContext(invoker);
InvocationProfilerUtils.releaseSimpleProfiler(invocation);
}
return result;
}
总结
本次我们学习了,Dubbo 发起服务调用的上半部分源码,实际上就是按照Invoker一层层的调用,每一层的Invoker又不同的功能,通过Invoker层层包装实现一些类似于AOP的能力。
调用过程中也会执行一个Filter链表,用于执行一些额外的逻辑,最终默认会执行到FailoverClusterInvoker,这是一个失败重试的Invoker,是Dubbo默认的容错策略,会给予负载均衡策略选择一个真实的服务提供者Invoker发起远程RPC调用。
调用过程中,Invoker全程参与,这里我们也能明白为什么Invoker被称为可执行体,因为其内部封装了Dubbo远程RPC调用的各种逻辑:服务路由、负载均衡、失败容错等等,也能明白Invoker作为Dubbo核心模型的重要性了。
整体看下来,Dubbo服务调用的源码是不是比此前服务注册和服务发现的源码简单得多了呢?下文我们学习Dubbo 发起服务调用的下半部分源码,也就是真正的RPC调用的源码。