Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口

基于Dubbo 3.1,详细介绍了Dubbo Consumer服务调用源码。

此前我们学习了Dubbo服务的导出和引入的源码,现在我们来学习Dubbo服务调用的源码。

此前的文章中我们讲过了最上层代理的调用逻辑(服务引用bean的获取以及懒加载原理):业务引入的接口代理对象(ReferenceBean内部的lazyProxy对象)-> 代理目标对象(ReferenceConfig内部的接口代理对象ref),代理目标对象的请求都会被统一转发到内部的InvokerInvocationHandler#invoke方法中去。

所以,我们学习Dubbo服务调用源码入口就是InvokerInvocationHandler#invoke方法。

Dubbo 3.x服务调用源码:

  1. Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口
  2. Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用

Dubbo 3.x服务引用源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
  3. Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
  4. Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
  5. Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
  6. Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
  7. Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
  8. Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
  9. Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
  10. Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
  11. Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url

Dubbo 3.x服务发布源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
  3. Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
  4. Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
  5. Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
  6. Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
  7. Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
  8. Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布

文章目录

  • InvokerInvocationHandler#invoke调用入口
  • InvocationUtil#invoke发起调用
    • getUrl获取consumerUrl
  • MigrationInvoker#invoke决策调用Invoker
  • MockClusterInvoker#invoker本地mock调用
  • AbstractCluster#invoke继续调用
  • CallbackRegistrationInvoker#invoke回调注册
  • CopyOfFilterChainNode#invoke过滤器链式调用
  • ConsumerContextFilter封装信息
    • FutureFilter执行回调方法
    • MonitorFilter收集监控数据
    • RouterSnapshotFilter路由快照zhichi
  • AbstractClusterInvoker#invoke集群容错rpc调用
  • FailoverClusterInvoker#doInvoke失败重试调用
    • AbstractClusterInvoker#select选择invoker
    • AbstractClusterInvoker#doSelect继续选择invoker
    • FailoverClusterInvoker#invokeWithContext调用服务
  • 总结

InvokerInvocationHandler#invoke调用入口

InvokerInvocationHandler#invoke可以看作是消费者发起调用的入口方法。

该方法中,将会构建一个RpcInvocation作为接口方法调用抽象,包含方法名字,接口名,方法参数等信息,然后通过InvocationUtil#invoke方法来发起真正的远程or本地调用。

/**
 * InvokerInvocationHandler的方法
 * <p>
 * 执行代理对象的方法的转发
 *
 * @param proxy  在其上调用方法的代理实例
 * @param method 在代理实例上调用的接口方法
 * @param args   一个对象数组,包含在代理实例的方法调用中传递的参数值,如果接口方法不接受参数,则为null
 * @return 调用代理对象方法执行结果
 */
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    //对于object类的方法,直接反射调用
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    //获取方法名和方法参数类型数组
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    //特殊方法的调用,直接调用invoker对象的同名方法
    if (parameterTypes.length == 0) {
        if ("toString".equals(methodName)) {
            return invoker.toString();
        } else if ("$destroy".equals(methodName)) {
            invoker.destroy();
            return null;
        } else if ("hashCode".equals(methodName)) {
            return invoker.hashCode();
        }
    } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
        return invoker.equals(args[0]);
    }
    //构建一个RpcInvocation作为接口方法调用抽象,包含方法名字,接口名,方法参数等信息
    RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);

    if (serviceModel instanceof ConsumerModel) {
        rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
        rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
    }
    //通过InvocationUtil.invoke方法来发起真正的远程or本地调用
    return InvocationUtil.invoke(invoker, rpcInvocation);
}

InvocationUtil#invoke发起调用

这里的Invoker是经过层层封装之后的Invoker(注意MockClusterInvoker上面还有一个MigrationInvoker没画出来),我们将按照这个顺序从上向下讲解。

在这里插入图片描述

public class InvocationUtil {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);

    /**
     * 消费者调用
     *
     * @param invoker       可执行器
     * @param rpcInvocation 接口方法调用抽
     * @return 执行结果
     */
    public static Object invoke(Invoker<?> invoker, RpcInvocation rpcInvocation) throws Throwable {
        //消费者url,consumer://10.253.45.126/org.apache.dubbo.demo.GreetingService?application=demo-consumer&background=false&check=false&dubbo=2.0.2&group=greeting&init=false&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=72363&register.ip=10.253.45.126&release=&revision=1.0.0&side=consumer&sticky=false&timestamp=1668394930680&unloadClusterRelated=false&version=1.0.0
        URL url = invoker.getUrl();
        //服务key,{group}/{serviceInterface}:{version}
        String serviceKey = url.getServiceKey();
        //设置目标服务的唯一名称
        rpcInvocation.setTargetServiceUniqueName(serviceKey);
        //设置消费者url到rpc调用上下文中吗格式一个内部的线程本地变量
        // invoker.getUrl() returns consumer url.
        RpcServiceContext.getServiceContext().setConsumerUrl(url);
        //调用性能解析器
        if (ProfilerSwitch.isEnableSimpleProfiler()) {
            //线程本地变量
            ProfilerEntry parentProfiler = Profiler.getBizProfiler();
            ProfilerEntry bizProfiler;
            if (parentProfiler != null) {
                //接收请求。客户端调用开始。
                bizProfiler = Profiler.enter(parentProfiler,
                    "Receive request. Client invoke begin. ServiceKey: " + serviceKey + " MethodName:" + rpcInvocation.getMethodName());
            } else {
                //接收请求。客户端调用开始。
                bizProfiler = Profiler.start("Receive request. Client invoke begin. ServiceKey: " + serviceKey + " " + "MethodName:" + rpcInvocation.getMethodName());
            }
            rpcInvocation.put(Profiler.PROFILER_KEY, bizProfiler);
            try {
                /*
                 * 执行invoker#invoker方法实现调用
                 */
                return invoker.invoke(rpcInvocation).recreate();
            } finally {
                //释放
                Profiler.release(bizProfiler);
                int timeout;
                //timeout附加信息,远程服务调用超时时间(毫秒),默认1000
                Object timeoutKey = rpcInvocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);
                if (timeoutKey instanceof Integer) {
                    timeout = (Integer) timeoutKey;
                } else {
                    timeout = url.getMethodPositiveParameter(rpcInvocation.getMethodName(),
                        TIMEOUT_KEY,
                        DEFAULT_TIMEOUT);
                }
                //调用执行时间
                long usage = bizProfiler.getEndTime() - bizProfiler.getStartTime();
                //是否超时
                if ((usage / (1000_000L * ProfilerSwitch.getWarnPercent())) > timeout) {
                    StringBuilder attachment = new StringBuilder();
                    rpcInvocation.foreachAttachment((entry) -> {
                        attachment.append(entry.getKey()).append("=").append(entry.getValue()).append(";\n");
                    });
                    //超时打印日志
                    logger.warn(String.format(
                        "[Dubbo-Consumer] execute service %s#%s cost %d.%06d ms, this invocation almost (maybe already) timeout. Timeout: %dms\n" + "invocation context:\n%s" + "thread info: \n%s",
                        rpcInvocation.getProtocolServiceKey(),
                        rpcInvocation.getMethodName(),
                        usage / 1000_000,
                        usage % 1000_000,
                        timeout,
                        attachment,
                        Profiler.buildDetail(bizProfiler)));
                }
            }
        }
        /*
         * 抛出异常后重新执行invoker#invoker方法实现调用
         */
        return invoker.invoke(rpcInvocation).recreate();
    }
}

getUrl获取consumerUrl

该方法获取消费者url。例如:consumer://10.253.45.126/org.apache.dubbo.demo.GreetingService?application=demo-consumer&background=false&check=false&dubbo=2.0.2&group=greeting&init=false&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=72363&register.ip=10.253.45.126&release=&revision=1.0.0&side=consumer&sticky=false&timestamp=1668394930680&unloadClusterRelated=false&version=1.0.0

/**
 * MigrationInvoker的方法
 */
@Override
public URL getUrl() {
    if (currentAvailableInvoker != null) {
        //当前激活的Invoker
        return currentAvailableInvoker.getUrl();
    } else if (invoker != null) {
        //接口级别invoker
        return invoker.getUrl();
    } else if (serviceDiscoveryInvoker != null) {
        //应用级别invoker
        return serviceDiscoveryInvoker.getUrl();
    }
    //消费者url
    return consumerUrl;
}

MigrationInvoker#invoke决策调用Invoker

MigrationInvoker是可迁移Invoker,也是最上层Invoker,它可用于选择不同的Invoker执行调用,用于应用级服务发现和接口级服务发现之间的迁移,以及实现灰度调用。

该方法决策使用应用级Invoker还是接口级Invoker执行调用,如果设置了激活的invoker并且类型为应用级优先,那么还会进入灰度策略,如果随机决策值(0-100)大于灰度值,那么走接口级订阅模式,否则走应用级订阅模式。灰度比例功能仅在应用级优先状态下生效。

/**

 * MigrationInvoker的方法
 * 
 * 决策使用应用级Invoker还是接口级Invoker执行调用
 *
 * @param invocation RpcInvocation
 * @return 调用结果
 */
@Override
public Result invoke(Invocation invocation) throws RpcException {
    //如果设置了当前激活的invoker
    if (currentAvailableInvoker != null) {
        //如果类型为应用级优先
        if (step == APPLICATION_FIRST) {
            //如果随机决策值(0-100)大于灰度值,那么走接口模式
            //灰度比例功能仅在应用级优先状态下生效
            if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
                //退回到接口模式调用
                return invoker.invoke(invocation);
            }
            //每次检查调用程序是否可用,应用级invoker优先,然后继续使用invoker调用
            return decideInvoker().invoke(invocation);
        }
        //其他类型
        return currentAvailableInvoker.invoke(invocation);
    }
    //如果没有当前激活的invoker
    switch (step) {
        case APPLICATION_FIRST:
            //每次检查调用程序是否可用,应用级invoker优先
            currentAvailableInvoker = decideInvoker();
            break;
        case FORCE_APPLICATION:
            //应用级invoker
            currentAvailableInvoker = serviceDiscoveryInvoker;
            break;
        case FORCE_INTERFACE:
        default:
            //接口级invoker
            currentAvailableInvoker = invoker;
    }
    //调用
    return currentAvailableInvoker.invoke(invocation);
}

MockClusterInvoker#invoker本地mock调用

dubbo除了限流措施之外,还支持mock本地伪装。本地伪装常被用于服务降级和熔断。比如某验权服务,当服务提供方全部挂掉后,假如此时服务消费方发起了一次远程调用,那么本次调用将会失败并抛出一个 RpcException 异常。

为了避免出现这种直接抛出异常的情况出现,那么客户端就可以利用本地伪装来提供 Mock 数据返回授权失败。mock的大概配置如下:

  1. false、0、null、N/A:如果没有设置mock属性,或者设置了这些值,表示不启用mock,直接发起远程调用,这是默认策略。
  2. force:强制服务降级,如果mock属性值以“force”开头则使用该策略。服务调用方在调用该接口服务时候会直接执行客户端本地的mock逻辑,不会远程调用服务提供者。比如使用mock=force:return+null表示消费方对该服务的方法调用都直接返回null值,不发起远程调用。
  3. fail策略:服务失败降级,如果mock属性值不符合上面所有的类型,则使用该策略。表示服务调用彻底失败并抛出RPCException之后执行本地mock逻辑,不一定会抛出异常。服务彻底失败具体和设置的集群容错方式有关。如果设置了retries,则是在全部重试次数使用完毕并且抛出RPCException异常之后才会执行mock的逻辑。
  4. true/default:如果mock属性值为true或者default,那么在最终失败并抛出RPCException之后,会在接口同路径下查找interfaceName+Mock名字的类,随后通过无参构造器实例化,随后调用该Mock类对象的对应方法,该方法中提供对应的逻辑。

详细介绍参见官方文档:https://dubbo.apache.org/zh/docs3-v2/java-sdk/advanced-features-and-usage/service/local-mock/#%E5%BC%80%E5%90%AF-mock-%E9%85%8D%E7%BD%AE。

/**
 * MockClusterInvoker的方法
 * <p>
 * mock调用
 *
 * @param invocation RpcInvocation
 * @return 调用结果
 */
@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result result;
    //消费者url获取mock参数
    String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    //false、0、null、N/A,表示不需要mock
    if (ConfigUtils.isEmpty(value)) {
        //no mock
        /*
         * 继续调用下层Invoker#invoke
         */
        result = this.invoker.invoke(invocation);
    }
    //以force开头,表示强制服务降级
    else if (value.startsWith(FORCE_KEY)) {
        if (logger.isWarnEnabled()) {
            logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
        }
        //force:direct mock
        //服务调用方在调用该接口服务时候会直接执行客户端本地的mock逻辑,不会远程调用服务提供者。
        //比如使用mock=force:return+null表示消费方对该服务的方法调用都直接返回null值,不发起远程调用。
        result = doMockInvoke(invocation, null);
    }
    //其他值,先先发起远程调用,当远程服务调用失败时,才会根据配置降级执行mock功能
    else {
        //fail-mock
        try {
            /*
             * 继续调用下层Invoker#invoke
             */
            result = this.invoker.invoke(invocation);

            //fix:#4585
            //最终失败并抛出RPCException
            if (result.getException() != null && result.getException() instanceof RpcException) {
                RpcException rpcException = (RpcException) result.getException();
                if (rpcException.isBiz()) {
                    throw rpcException;
                } else {
                    /*
                     * 执行本地mock调用
                     */
                    result = doMockInvoke(invocation, rpcException);
                }
            }

        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }

            if (logger.isWarnEnabled()) {
                logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
            }
            /*
             * 执行本地mock调用
             */
            result = doMockInvoke(invocation, e);
        }
    }
    return result;
}

AbstractCluster#invoke继续调用

继续向下到这里,首先执行inoker的过滤操作。

/**
 * AbstractCluster的方法
 */
@Override
public Result invoke(Invocation invocation) throws RpcException {
    return filterInvoker.invoke(invocation);
}

CallbackRegistrationInvoker#invoke回调注册

filterInvoker实际类型为FilterChainBuilder的内部类CallbackRegistrationInvoker,它的invoke方法用于添加一个回调,它可以在RPC调用完成时触发,在这回调函数将会倒序执行filters中的过滤器。

/**
 * FilterChainBuilder的内部类CallbackRegistrationInvoker的方法
 */
@Override
public Result invoke(Invocation invocation) throws RpcException {
    /*
     * 继续调用下层invoker#invoke
     */
    Result asyncResult = filterInvoker.invoke(invocation);
    //添加一个回调,它可以在RPC调用完成时触发。
    asyncResult.whenCompleteWithContext((r, t) -> {
        RuntimeException filterRuntimeException = null;
        //过滤器倒序执行
        for (int i = filters.size() - 1; i >= 0; i--) {
            FILTER filter = filters.get(i);
            try {
                InvocationProfilerUtils.releaseDetailProfiler(invocation);
                if (filter instanceof ListenableFilter) {
                    //执行过滤器
                    ListenableFilter listenableFilter = ((ListenableFilter) filter);
                    Filter.Listener listener = listenableFilter.listener(invocation);
                    try {
                        if (listener != null) {
                            if (t == null) {
                                listener.onResponse(r, filterInvoker, invocation);
                            } else {
                                listener.onError(t, filterInvoker, invocation);
                            }
                        }
                    } finally {
                        listenableFilter.removeListener(invocation);
                    }
                } else if (filter instanceof FILTER.Listener) {
                    FILTER.Listener listener = (FILTER.Listener) filter;
                    if (t == null) {
                        listener.onResponse(r, filterInvoker, invocation);
                    } else {
                        listener.onError(t, filterInvoker, invocation);
                    }
                }
            } catch (RuntimeException runtimeException) {
                LOGGER.error(String.format("Exception occurred while executing the %s filter named %s.", i, filter.getClass().getSimpleName()));
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("Whole filter list is: %s", filters.stream().map(tmpFilter -> tmpFilter.getClass().getSimpleName()).collect(Collectors.toList())));
                }
                filterRuntimeException = runtimeException;
                t = runtimeException;
            }
        }
        if (filterRuntimeException != null) {
            throw filterRuntimeException;
        }
    });

    return asyncResult;
}

CopyOfFilterChainNode#invoke过滤器链式调用

FilterChainBuilder的内部类,一个CopyOfFilterChainNode实例表示一个过滤器节点,将会首先执行全部的过滤器,执行完毕之后,才会继续后面真正的rpc调用。

这个版本默认4个filter,分别是:ConsumerContextFilter、FutureFilter、MonitorFilter、RouterSnapshotFilter。

ConsumerContextFilter封装信息

ConsumerContextFilter为consumer invoker序设置当前RpcContext,包括invoker,invocation, local host, remote host and port。它这样做是为了让执行线程的RpcContext包含这些必要的信息,方便后续直接获取。

/**
 * ConsumerContextFilter的方法
 *
 * 始终在实现中调用invoker.invoke()将请求转交给下一个筛选器节点。
 *
 * @param invoker 下一个invoker节点
 */
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    RpcContext.RestoreServiceContext originServiceContext = RpcContext.storeServiceContext();
    try {
        //设置相关属性
        RpcContext.getServiceContext()
            //下一个invoker
            .setInvoker(invoker)
            //调用抽象
            .setInvocation(invocation)
            //本地地址
            .setLocalAddress(NetUtils.getLocalHost(), 0);

        RpcContext context = RpcContext.getClientAttachment();
        context.setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getApplication());
        if (invocation instanceof RpcInvocation) {
            //下一个invoker
            ((RpcInvocation) invocation).setInvoker(invoker);
        }

        if (CollectionUtils.isNotEmpty(supportedSelectors)) {
            for (PenetrateAttachmentSelector supportedSelector : supportedSelectors) {
                Map<String, Object> selected = supportedSelector.select();
                if (CollectionUtils.isNotEmptyMap(selected)) {
                    ((RpcInvocation) invocation).addObjectAttachments(selected);
                }
            }
        } else {
            ((RpcInvocation) invocation).addObjectAttachments(RpcContext.getServerAttachment().getObjectAttachments());
        }

        Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
            /**
             * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
             * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
             * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
             * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
             */
            ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
        }

        // pass default timeout set by end user (ReferenceConfig)
        Object countDown = context.getObjectAttachment(TIME_COUNTDOWN_KEY);
        if (countDown != null) {
            TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
            if (timeoutCountDown.isExpired()) {
                return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
                    "No time left for making the following call: " + invocation.getServiceName() + "."
                        + invocation.getMethodName() + ", terminate directly."), invocation);
            }
        }

        RpcContext.removeServerContext();
        //调用下一个invoker节点
        return invoker.invoke(invocation);
    } finally {
        RpcContext.restoreServiceContext(originServiceContext);
    }
}

FutureFilter执行回调方法

这个过滤器主要是获取异步方法调用信息AsyncMethodInfo,AsyncMethodInfo内部包含在执行Invoker的各个阶段的回调方法和实例,这里将会执行invoker调用时(调用之前)的回调方法。

/**
 * FutureFilter的方法
 *
 * 始终在实现中调用invoker.invoke()将请求转交给下一个筛选器节点。
 *
 * @param invoker 下一个invoker节点
 */
@Override
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
    //获取Invoker回调方法,并且执行
    fireInvokeCallback(invoker, invocation);
    // need to configure if there's return value before the invocation in order to help invoker to judge if it's
    // necessary to return future.
    //调用下一个invoker节点
    return invoker.invoke(invocation);
}

private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
    //获取异步方法调用信息AsyncMethodInfo,并且执行invoker调用时的回调方法
    final AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
    //如果为null直接返回
    if (asyncMethodInfo == null) {
        return;
    }
    //调用异步调用时的回调方法
    final Method onInvokeMethod = asyncMethodInfo.getOninvokeMethod();
    //调用异步调用时的回调实例
    final Object onInvokeInst = asyncMethodInfo.getOninvokeInstance();

    if (onInvokeMethod == null && onInvokeInst == null) {
        return;
    }
    if (onInvokeMethod == null || onInvokeInst == null) {
        throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a oninvoke callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
    }
    if (!onInvokeMethod.isAccessible()) {
        onInvokeMethod.setAccessible(true);
    }

    Object[] params = invocation.getArguments();
    try {
        //反射执行invoker调用时的回调方法
        onInvokeMethod.invoke(onInvokeInst, params);
    } catch (InvocationTargetException e) {
        fireThrowCallback(invoker, invocation, e.getTargetException());
    } catch (Throwable e) {
        fireThrowCallback(invoker, invocation, e);
    }
}

MonitorFilter收集监控数据

监控拦截器,它将收集关于此调用的调用数据并将其发送到监控中心。

/**
 * MonitorFilter的方法
 *
 * 始终在实现中调用invoker.invoke()将请求转交给下一个筛选器节点。
 *
 * @param invoker 下一个invoker节点
 */
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    //如果存在monitor,则设置监控数据
    if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
        invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
        invocation.put(MONITOR_REMOTE_HOST_STORE, RpcContext.getServiceContext().getRemoteHost());
        // count up
        getConcurrent(invoker, invocation).incrementAndGet();
    }
    //调用下一个invoker节点
    return invoker.invoke(invocation);
}

RouterSnapshotFilter路由快照zhichi

匹配路由器快照切换器RouterSnapshotSwitcher,用于打印日志以及路由快照。

/**
 * RouterSnapshotFilter的方法
 *
 * 始终在实现中调用invoker.invoke()将请求转交给下一个筛选器节点。
 *
 * @param invoker 下一个invoker节点
 */
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    //默认不支持
    if (!switcher.isEnable()) {
        //调用下一个invoker节点
        //如果后面没有其他filter,那么该节点就是真正的invoker节点
        return invoker.invoke(invocation);
    }

    if (!logger.isInfoEnabled()) {
        return invoker.invoke(invocation);
    }

    if (!switcher.isEnable(invocation.getServiceModel().getServiceKey())) {
        return invoker.invoke(invocation);
    }

    RpcContext.getServiceContext().setNeedPrintRouterSnapshot(true);
    return invoker.invoke(invocation);
}

如果后面没有其他filter,那么下一个节点就是真正的invoker节点,即具有集群容错策略的ClusterInvoker,默认FailoverClusterInvoker。


AbstractClusterInvoker#invoke集群容错rpc调用

该方法是执行rpc调用的骨干方法。大概步骤为:

  1. 调用list方法,从服务目录Directory中根据路由规则或滤出满足规则的服务提供者invoker列表。最后会调用每个路由器的route方法进行服务路由,如果没有路由器则返回全部invoker列表。具体源码后面再说。
  2. 调用initLoadBalance方法,获取负载均衡策略实例,默认RandomLoadBalance。具体源码后面再说。
  3. 调用doinvoke方法,继续向下执行rpc调用的逻辑。
/**
     * AbstractClusterInvoker的方法
     * <p>
     * rpc调用的模版方法实现
     */
    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        //销毁检测
        checkWhetherDestroyed();

        // binding attachments into invocation.
//        Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
//        if (contextAttachments != null && contextAttachments.size() != 0) {
//            ((RpcInvocation) invocation).addObjectAttachmentsIfAbsent(contextAttachments);
//        }

        InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Router route.");
        /*
         * 1 从服务目录中根据路由规则或滤出满足规则的服务提供者invoker列表
         */
        List<Invoker<T>> invokers = list(invocation);
        InvocationProfilerUtils.releaseDetailProfiler(invocation);
        /*
         * 2 获取负载均衡策略实现
         */
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Cluster " + this.getClass().getName() + " invoke.");
        try {
            /*
             * 3 继续执行rpc调用
             */
            return doInvoke(invocation, invokers, loadbalance);
        } finally {
            InvocationProfilerUtils.releaseDetailProfiler(invocation);
        }
    }

FailoverClusterInvoker#doInvoke失败重试调用

此前我们通过服务路由获取了符合规则的服务提供者invoker列表,以及获取了服务负载均衡实例,下面将会进行带有容错机制的rpc调用。

FailoverClusterInvoker采用失败自动切换机制,Dubbo默认的容错策略。服务消费方调用失败后自动切换到其他服务提供者的服务器进行重试。客户端等待服务端的处理时间超过了设定的超时时间时,也算做失败,将会重试。可通过 retries属性来设置重试次数(不含第一次),默认重试两次。

通常用于读操作或者具有幂等的写操作,需要注意的是重试会带来更长延迟。大概步骤为:

  1. 计算最大调用次数,默认3,包括两次失败重试,最小1次。在一个循环中执行调用。
  2. 每次重试前,重新调用list方法从服务目录中根据路由规则或滤出满足规则的服务提供者invoker列表。可想而知,如果此时服务有变更,那么invoked列表将失去准确性。
  3. 基于负载均衡策略选择一个服务提供者invoker。
  4. 通过服务提供者invoker执行rpc调用获取结果。
/**
 * FailoverClusterInvoker的方法
 * 
 * 失败重试的容错策略
 *
 * @param invocation  方法调用抽象
 * @param invokers    服务提供者invoker列表,已经经过了路由过滤
 * @param loadbalance 负载均衡
 * @return 执行结果
 */
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    //可用invoker列表不能为空,否则直接抛出异常
    checkInvokers(copyInvokers, invocation);
    //方法名
    String methodName = RpcUtils.getMethodName(invocation);
    /*
     * 计算最大调用次数,默认3,包括两次失败重试,最小1次
     * retries属性
     */
    int len = calculateInvokeTimes(methodName);
    // retry loop.
    RpcException le = null; // last exception.
    //已调用过的invoker列表
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    //已调用过的provider列表
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //重试前重新从服务目录中根据路由规则或滤出满足规则的服务提供者invoker列表
        //如果此时服务有变更,那么invoked列表将失去准确性
        if (i > 0) {
            checkWhetherDestroyed();
            //重新服务路由
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        /*
         * 基于负载均衡策略选择一个invoker
         */
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        //添加到已调用列表
        invoked.add(invoker);
        RpcContext.getServiceContext().setInvokers((List) invoked);
        boolean success = false;
        try {
            /*
             * 使用服务提供者invoker执行rpc服务调用
             */
            Result result = invokeWithContext(invoker, invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName
                    + " in the service " + getInterface().getName()
                    + " was successful by the provider " + invoker.getUrl().getAddress()
                    + ", but there have been failed providers " + providers
                    + " (" + providers.size() + "/" + copyInvokers.size()
                    + ") from the registry " + directory.getUrl().getAddress()
                    + " on the consumer " + NetUtils.getLocalHost()
                    + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                    + le.getMessage(), le);
            }
            success = true;
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            //如果未成功
            if (!success) {
                //加入已调用过的provider列表
                providers.add(invoker.getUrl().getAddress());
            }
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method "
        + methodName + " in the service " + getInterface().getName()
        + ". Tried " + len + " times of the providers " + providers
        + " (" + providers.size() + "/" + copyInvokers.size()
        + ") from the registry " + directory.getUrl().getAddress()
        + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
        + Version.getVersion() + ". Last error is: "
        + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

AbstractClusterInvoker#select选择invoker

基于负载均衡策略选择一个invoker。

/**
 * AbstractClusterInvoker的方法
 * 
 * 基于负载均衡策略选择一个invoker
 *
 * @param loadbalance 负载均衡策略
 * @param invocation  调用方法抽象
 * @param invokers    全部服务提供者invoker列表,已经经过了路由过滤
 * @param selected    已被调用过的服务提供者invoker列表
 * @return
 * @throws RpcException
 */
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    //调用方法名
    String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();
    //默认false,设置true 该接口上的所有方法使用同一个provider.如果需要更复杂的规则,请使用路由
    boolean sticky = invokers.get(0).getUrl()
        .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);

    //ignore overloaded method 忽略重载方法
    if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
        stickyInvoker = null;
    }
    //ignore concurrency problem 忽略并发问题
    if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
        if (availableCheck && stickyInvoker.isAvailable()) {
            return stickyInvoker;
        }
    }
    /*
     * 继续选择invoker
     */
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

    if (sticky) {
        stickyInvoker = invoker;
    }

    return invoker;
}

AbstractClusterInvoker#doSelect继续选择invoker

  1. 如果只有一个服务提供者invoker,那么返回。
  2. 调用loadbalance#select方法基于负载均衡策略选择一个invoker。这是核心方法,Dubbo提供有很多的负载均衡实现,具体的源码我们后面单独分析。
  3. 如果invoker在已被调用过的服务提供者invoker列表中,那么调用reselect方法重新选择一个。
  4. 重新选择仍然失败,采用兜底策略。检查当前选中的invoker的索引,如果不是最后一个,则选择索引+1的invoker,否则返回第一个。
/**
 * AbstractClusterInvoker的方法
 * <p>
 * 基于负载均衡策略选择一个invoker
 *
 * @param loadbalance 负载均衡策略
 * @param invocation  调用方法抽象
 * @param invokers    全部服务提供者invoker列表,已经经过了路由过滤
 * @param selected    已被调用过的服务提供者invoker列表
 */
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    //如果只有一个服务提供者invoker,那么返回
    if (invokers.size() == 1) {
        Invoker<T> tInvoker = invokers.get(0);
        checkShouldInvalidateInvoker(tInvoker);
        return tInvoker;
    }
    /*
     * 基于负载均衡策略选择一个invoker
     * 核心方法
     */
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
    //如果invoker在selected中,则重新选择。
    boolean isSelected = selected != null && selected.contains(invoker);
    //如果availableCheck=true,并且invoker不可用,则重新选择。
    boolean isUnavailable = availableCheck && !invoker.isAvailable() && getUrl() != null;

    if (isUnavailable) {
        invalidateInvoker(invoker);
    }
    //重新选择
    if (isSelected || isUnavailable) {
        try {
            /*
             * 重新基于负载均衡策略选择一个invoker
             */
            Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availableCheck);
            if (rInvoker != null) {
                invoker = rInvoker;
            } 
            //重新选择仍然失败,兜底策略
            else {
                //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                //检查当前选中的invoker的索引,如果不是最后一个,则选择索引+1的invoker
                int index = invokers.indexOf(invoker);
                try {
                    //Avoid collision
                    invoker = invokers.get((index + 1) % invokers.size());
                } catch (Exception e) {
                    logger.warn("2-5", "select invokers exception", "", e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("2-5", "failed to reselect invokers", "", "cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }

    return invoker;
}

FailoverClusterInvoker#invokeWithContext调用服务

将当前invoker设置到服务调用上下文RpcContext中,RpcContext是一个线程本地变量。随后使用服务提供者invoker#invoke执行rpc服务调用。

/**
 * AbstractClusterInvoker
 * <p>
 * 使用服务提供者invoker执行rpc服务调用
 *
 * @param invoker    服务提供者invoker
 * @param invocation 方法调用抽象
 * @return 调用结果
 */
protected Result invokeWithContext(Invoker<T> invoker, Invocation invocation) {
    //将当前invoker设置到服务调用上下文RpcContext中,RpcContext是一个线程本地变量
    setContext(invoker);
    Result result;
    try {
        if (ProfilerSwitch.isEnableSimpleProfiler()) {
            InvocationProfilerUtils.enterProfiler(invocation, "Invoker invoke. Target Address: " + invoker.getUrl().getAddress());
        }
        /*
         * 使用服务提供者invoker执行rpc服务调用
         */
        result = invoker.invoke(invocation);
    } finally {
        //清除上下文中的invoker属性
        clearContext(invoker);
        InvocationProfilerUtils.releaseSimpleProfiler(invocation);
    }
    return result;
}

总结

本次我们学习了,Dubbo 发起服务调用的上半部分源码,实际上就是按照Invoker一层层的调用,每一层的Invoker又不同的功能,通过Invoker层层包装实现一些类似于AOP的能力。

调用过程中也会执行一个Filter链表,用于执行一些额外的逻辑,最终默认会执行到FailoverClusterInvoker,这是一个失败重试的Invoker,是Dubbo默认的容错策略,会给予负载均衡策略选择一个真实的服务提供者Invoker发起远程RPC调用。

调用过程中,Invoker全程参与,这里我们也能明白为什么Invoker被称为可执行体,因为其内部封装了Dubbo远程RPC调用的各种逻辑:服务路由、负载均衡、失败容错等等,也能明白Invoker作为Dubbo核心模型的重要性了。

整体看下来,Dubbo服务调用的源码是不是比此前服务注册和服务发现的源码简单得多了呢?下文我们学习Dubbo 发起服务调用的下半部分源码,也就是真正的RPC调用的源码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/966631.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【信息系统项目管理师-案例真题】2016下半年案例分析答案和详解

更多内容请见: 备考信息系统项目管理师-专栏介绍和目录 文章目录 试题一【问题1】4 分【问题2】12 分【问题3】3 分【问题4】6 分试题二【问题1】3 分【问题2】4 分【问题3】8 分【问题4】5 分【问题5】5 分试题三【问题1】4 分【问题2】8 分【问题3】5 分【问题4】8 分试题一…

pytest-xdist 进行多进程并发测试!

在软件开发过程中&#xff0c;测试是确保代码质量和可靠性的关键步骤。随着项目规模的扩大和复杂性的增加&#xff0c;测试用例的执行效率变得尤为重要。为了加速测试过程&#xff0c;特别是对于一些可以并行执行的测试用 例&#xff0c;pytest-xdist 提供了一种强大的工具&…

【R语言】数据分析

一、描述性统计量 借助R语言内置的airquality数据集进行简单地演示&#xff1a; 1、集中趋势&#xff1a;均值和中位数 head(airquality) # 求集中趋势 mean(airquality$Ozone, na.rmT) # 求均值 median(airquality$Ozone, na.rmT) # 求中位数 2、众数 众数&#xff08;mod…

kafka服务端之日志存储

文章目录 日志布局日志索引日志清理日志删除基于时间基千日志大小基于日志起始偏移量 日志压缩总结 日志布局 Ka饮a 中的消息是以主题为基本单位进行归类的&#xff0c; 各个主题在逻辑 上相互独立。 每个主题又可以分为一个或多个分区&#xff0c; 分区的数量可以在主题创建的…

家用报警器的UML 设计及其在C++和VxWorks 上的实现01

M.W.Richardson 著&#xff0c;liuweiw 译 论文描述了如何运用 UML&#xff08;统一建模语言&#xff09;设计一个简单的家用报警器&#xff0c;并实现到 VxWorks 操作系统上。本文分两个部分&#xff0c;第一部分描述了如何用 UML 设计和验证家用报警器的模型&#xff0c;以使…

数据结构与算法-链表

单向链表&#xff08;带哨兵&#xff09; public class SinglyLinkedList {private Node head new Node(Integer.MIN_VALUE, null); // 定义一个哨兵节点作为头部节点&#xff0c;避免对头节点进行特殊处理// 节点类&#xff0c;包含值和指向下一个节点的引用private static …

剪辑学习整理

文章目录 1. 剪辑介绍 1. 剪辑介绍 剪辑可以干什么&#xff1f;剪辑分为哪些种类&#xff1f; https://www.bilibili.com/video/BV15r421p7aF/?spm_id_from333.337.search-card.all.click&vd_source5534adbd427e3b01c725714cd93961af 学完剪辑之后如何找工作or兼职&#…

自动驾驶数据集三剑客:nuScenes、nuImages 与 nuPlan 的技术矩阵与生态协同

目录 1、引言 2、主要内容 2.1、定位对比&#xff1a;感知与规划的全维覆盖 2.2、数据与技术特性对比 2.3、技术协同&#xff1a;构建全栈研发生态 2.4、应用场景与评估体系 2.5、总结与展望 3、参考文献 1、引言 随着自动驾驶技术向全栈化迈进&#xff0c;Motional 团…

快速提取Excel工作簿中所有工作表的名称?

大家好&#xff0c;我是小鱼。 在Excel表格中如何快速提取工作簿中所有工作表的名称&#xff1f;这个问题在日常工作中也经常遇到&#xff0c;比如说经常使用的INDIRECT函数跨工作表汇总或者制作类似于导航的工作表快捷跳转列表&#xff0c;就需要每个工作表的名称。如果工作表…

【数据结构】(7) 栈和队列

一、栈 Stack 1、什么是栈 栈是一种特殊的线性表&#xff0c;它只能在固定的一端&#xff08;栈顶&#xff09;进行出栈、压栈操作&#xff0c;具有后进先出的特点。 2、栈概念的例题 答案为 C&#xff0c;以C为例进行讲解&#xff1a; 第一个出栈的是3&#xff0c;那么 1、…

从运输到植保:DeepSeek大模型探索无人机智能作业技术详解

DeepSeek&#xff0c;作为一家专注于深度学习与人工智能技术研究的企业&#xff0c;近年来在AI领域取得了显著成果&#xff0c;尤其在无人机智能作业技术方面展现了其大模型的强大能力。以下是从运输到植保领域&#xff0c;DeepSeek大模型探索无人机智能作业技术的详解&#xf…

qt部分核心机制

作业 1> 手动将登录项目实现&#xff0c;不要使用拖拽编程 并且&#xff0c;当点击登录按钮时&#xff0c;后台会判断账号和密码是否相等&#xff0c;如果相等给出登录成功的提示&#xff0c;并且关闭当前界面&#xff0c;发射一个跳转信号&#xff0c;如果登录失败&#…

【Spring】什么是Spring?

什么是Spring&#xff1f; Spring是一个开源的轻量级框架&#xff0c;是为了简化企业级开发而设计的。我们通常讲的Spring一般指的是Spring Framework。Spring的核心是控制反转(IoC-Inversion of Control)和面向切面编程(AOP-Aspect-Oriented Programming)。这些功能使得开发者…

【专题】2024-2025人工智能代理深度剖析:GenAI 前沿、LangChain 现状及演进影响与发展趋势报告汇总PDF洞察(附原数据表)

原文链接&#xff1a;https://tecdat.cn/?p39630 在科技飞速发展的当下&#xff0c;人工智能代理正经历着深刻的变革&#xff0c;其能力演变已然成为重塑各行业格局的关键力量。从早期简单的规则执行&#xff0c;到如今复杂的自主决策与多智能体协作&#xff0c;人工智能代理…

oCam:免费且强大的录屏软件

今天给大家推荐一个非常好的录屏软件。几乎可以满足你日常工作的需求。而且软件完全免费&#xff0c;没有任何的广告。 oCam&#xff1a;免费且强大的录屏软件 oCam是一款功能强大的免费录屏软件&#xff0c;支持屏幕录制、游戏录制和音频录制等多种模式&#xff0c;能够满足不…

spring学习(spring 配置文件详解)

一 了解如何创建基本的spring 配置文件 步骤 1 导入 spring-context 依赖 <!-- https://mvnrepository.com/artifact/org.springframework/spring-context --><dependency><groupId>org.springframework</groupId><artifactId>spring-context&l…

C++Primer学习(2.2)

2.2 变量 变量提供一个具名的、可供程序操作的存储空间。C中的每个变量都有其数据类型,数据类型决定着变量所占内存空间的大小和布局方式、该空间能存储的值的范围&#xff0c;以及变量能参与的运算。对C程序员来说,“变量(variable)”和“对象(object)”一般可以互换使用。 术…

无须付费,安装即是完全版!

不知道大家有没有遇到过不小心删掉了电脑上超重要的文件&#xff0c;然后急得像热锅上的蚂蚁&#xff1f; 别担心&#xff0c;今天给大家带来一款超给力的数据恢复软件&#xff0c;简直就是拯救文件的“救星”&#xff01; 数据恢复 专业的恢复数据软件 这款软件的界面设计得特…

【Ubuntu】本地部署Deep Seek(深度求索)大模型的保姆级教程 | 详细教程

杭州深度求索人工智能基础技术研究有限公司(简称“深度求索”或“DeepSeek”)&#xff0c;成立于2023年&#xff0c;DeepSeek是一家专注通用人工智能&#xff08;AGI&#xff09;的中国科技公司&#xff0c;主攻大模型研发与应用&#xff0c;经营范围包括技术服务、技术开发、软…

Ollama + AnythingLLM + Deepseek r1 实现本地知识库

1、Ollama&#xff1a;‌是一个开源的大型语言模型 (LLM)服务工具&#xff0c;旨在简化在本地运行大语言模型的过程&#xff0c;降低使用大语言模型的门槛‌。 2、AnythingLLM&#xff1a;是由Mintplex Labs Inc. 开发的一款全栈应用程序&#xff0c;旨在构建一个高效、可定制、…