七、Nacos源码系列:Nacos服务发现

目录

一、服务发现

二、getServices():获取服务列表

2.1、获取服务列表

2.2、总结图

三、getInstances(serviceId):获取服务实例列表 

3.1、从缓存中获取服务信息

3.2、缓存为空,执行订阅服务

3.2.1、调度更新,往线程池中提交一个UpdateTask任务

3.2.2、订阅服务 

 3.2.3、处理服务信息

3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求

3.4、筛选满足条件的实例 

3.5、总结图


一、服务发现

在discovery-provider项目的pom.xml中,我们引入了如下依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    <version>2.2.9.RELEASE</version>
</dependency>

SpringCloud很多功能都是基于SpringBoot项目的自动配置原理来扩展实现的,下面我们查看spring-cloud-starter-alibaba-nacos-discovery-2.2.9.RELEASE.jar包路径下的spring.factories的"org.springframework.boot.autoconfigure.EnableAutoConfiguration"自动装配类配置,如下图:

如上图,跟客户端服务发现有关的有两个自动配置类:NacosDiscoveryClientConfiguration和NacosDiscoveryAutoConfiguration。

相关源码如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,
		CommonsClientAutoConfiguration.class })
// 在NacosDiscoveryAutoConfiguration自动装配类执行完成后才执行
@AutoConfigureAfter(NacosDiscoveryAutoConfiguration.class)
public class NacosDiscoveryClientConfiguration {

    // 创建DiscoveryClient bean对象
	@Bean
	public DiscoveryClient nacosDiscoveryClient(
			NacosServiceDiscovery nacosServiceDiscovery) {
		return new NacosDiscoveryClient(nacosServiceDiscovery);
	}

	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",
			matchIfMissing = true)
	public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);
	}

}


@Configuration(proxyBeanMethods = false)
// spring.cloud.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnDiscoveryEnabled
// spring.cloud.nacos.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean
	public NacosDiscoveryProperties nacosProperties() {
        // 匹配配置文件中以“spring.cloud.nacos.discovery”为前缀的那些属性,
        // 如namespace、username、password、serverAddr等属性
		return new NacosDiscoveryProperties();
	}

    // 创建NacosServiceDiscovery bean对象
	@Bean
	@ConditionalOnMissingBean
	public NacosServiceDiscovery nacosServiceDiscovery(
			NacosDiscoveryProperties discoveryProperties,
			NacosServiceManager nacosServiceManager) {
		return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);
	}

}

在NacosDiscoveryAutoConfiguration自动配置类中,创建了一个NacosServiceDiscovery的bean对象,然后在NacosDiscoveryClientConfiguration自动装配时,创建DiscoveryClient的bean对象,传入前面创建的NacosServiceDiscovery对象。

重点关注NacosDiscoveryClient这个类,NacosDiscoveryClient的源码如下:

public class NacosDiscoveryClient implements DiscoveryClient {

	private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class);

	/**
	 * Nacos Discovery Client Description.
	 */
	public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";

	private NacosServiceDiscovery serviceDiscovery;

	public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {
		this.serviceDiscovery = nacosServiceDiscovery;
	}

	@Override
	public String description() {
		return DESCRIPTION;
	}

	@Override
	public List<ServiceInstance> getInstances(String serviceId) {
		try {
			return serviceDiscovery.getInstances(serviceId);
		}
		catch (Exception e) {
			throw new RuntimeException(
					"Can not get hosts from nacos server. serviceId: " + serviceId, e);
		}
	}

	@Override
	public List<String> getServices() {
		try {
			return serviceDiscovery.getServices();
		}
		catch (Exception e) {
			log.error("get service name from nacos server fail,", e);
			return Collections.emptyList();
		}
	}

}

可以看到,NacosDiscoveryClient实现了SpringCloud的DiscoveryClient接口,重点是getInstances()和getServices()方法,而且都是由NacosServiceDiscovery类去实现。

public class NacosServiceDiscovery {

    // 跟配置文件中以“spring.cloud.nacos.discovery”前缀的属性配置对应上
	private NacosDiscoveryProperties discoveryProperties;

    // nacos服务管理器对象
	private NacosServiceManager nacosServiceManager;

	public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,
			NacosServiceManager nacosServiceManager) {
		this.discoveryProperties = discoveryProperties;
		this.nacosServiceManager = nacosServiceManager;
	}

	/**
	 * 返回指定group和servic的所有实例
	 */
	public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
		// 配置文件中配置的group组名
        String group = discoveryProperties.getGroup();
		// namingService(): 通过反射创建一个NacosNamingService对象
        // 最终会调用NacosNamingService#selectInstances()方法
        List<Instance> instances = namingService().selectInstances(serviceId, group,
				true);
        // 将Instance包装成NacosServiceInstance对象返回
		return hostToServiceInstanceList(instances, serviceId);
	}

	/**
	 * 返回指定group的所有服务名称
	 */
	public List<String> getServices() throws NacosException {
        // 配置文件中配置的group组名
		String group = discoveryProperties.getGroup();
        // namingService(): 通过反射创建一个NacosNamingService对象
        // 最终会调用NamingGrpcClientProxy#getServiceList()方法
		ListView<String> services = namingService().getServicesOfServer(1,
				Integer.MAX_VALUE, group);
    	// 返回所有服务名称
		return services.getData();
	}

	public static List<ServiceInstance> hostToServiceInstanceList(
			List<Instance> instances, String serviceId) {
		List<ServiceInstance> result = new ArrayList<>(instances.size());
		for (Instance instance : instances) {
			ServiceInstance serviceInstance = hostToServiceInstance(instance, serviceId);
			if (serviceInstance != null) {
				result.add(serviceInstance);
			}
		}
		return result;
	}

	public static ServiceInstance hostToServiceInstance(Instance instance,
			String serviceId) {
		if (instance == null || !instance.isEnabled() || !instance.isHealthy()) {
			return null;
		}
		NacosServiceInstance nacosServiceInstance = new NacosServiceInstance();
		nacosServiceInstance.setHost(instance.getIp());
		nacosServiceInstance.setPort(instance.getPort());
		nacosServiceInstance.setServiceId(serviceId);

		Map<String, String> metadata = new HashMap<>();
		metadata.put("nacos.instanceId", instance.getInstanceId());
		metadata.put("nacos.weight", instance.getWeight() + "");
		metadata.put("nacos.healthy", instance.isHealthy() + "");
		metadata.put("nacos.cluster", instance.getClusterName() + "");
		if (instance.getMetadata() != null) {
			metadata.putAll(instance.getMetadata());
		}
		metadata.put("nacos.ephemeral", String.valueOf(instance.isEphemeral()));
		nacosServiceInstance.setMetadata(metadata);

		if (metadata.containsKey("secure")) {
			boolean secure = Boolean.parseBoolean(metadata.get("secure"));
			nacosServiceInstance.setSecure(secure);
		}
		return nacosServiceInstance;
	}

	private NamingService namingService() {
		return nacosServiceManager.getNamingService();
	}

}

接下来,我们分析前面介绍到的两个重要方法:getInstances(serviceId)和getServices()。

二、getServices():获取服务列表

2.1、获取服务列表

// namingService(): 通过反射创建一个NacosNamingService对象
// NamingFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        return (NamingService) constructor.newInstance(properties);
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}
//getServices()调用栈大体如下:
 namingService().getServicesOfServer(1, Integer.MAX_VALUE, group);
    NacosNamingService#getServicesOfServer
        clientProxy.getServiceList(pageNo, pageSize, groupName, selector)
            NamingClientProxyDelegate#getServiceList
                grpcClientProxy.getServiceList(pageNo, pageSize, groupName, selector);

// 最终会调用NamingGrpcClientProxy#getServiceList()方法
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)
        throws NacosException {
    // 构建ServiceListRequest请求(服务列表请求),指定命名空间ID、服务组名
    ServiceListRequest request = new ServiceListRequest(namespaceId, groupName, pageNo, pageSize);
    if (selector != null) {
        if (SelectorType.valueOf(selector.getType()) == SelectorType.label) {
            request.setSelector(JacksonUtils.toJson(selector));
        }
    }
    // 发送服务列表请求给Nacos服务端,接下来由服务端处理
    ServiceListResponse response = requestToServer(request, ServiceListResponse.class);
    // 组装返回值出去
    ListView<String> result = new ListView<>();
    result.setCount(response.getCount());
    result.setData(response.getServiceNames());
    return result;
}

接下来,我们看看服务端怎么处理这个服务列表请求的。通过对ServiceListRequest类引用的追踪,我们发现是在com.alibaba.nacos.naming.remote.rpc.handler.ServiceListRequestHandler#handle这个方法中对客户端提交的服务列表请求进行处理的。

// 处理客户端提交的服务列表请求
public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException {
    // ServiceManager.getInstance()通过单例返回一个ServiceManager对象
   
    /**
     * 获取指定命令空间下的所有服务,在ServiceManager中存在一个map保存着每个命名空间中的所有服务。
     * ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2)
     * key: namespaceId
     * value: Set<Service>
     * 注册实例的时候,就往这个map写入了数据
     *
     * ServiceManager.getInstance().getSingletons(request.getNamespace())相当于执行:
     * namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1))
     */
    Collection<Service> serviceSet = ServiceManager.getInstance().getSingletons(request.getNamespace());

    // 构建响应结果
    ServiceListResponse result = ServiceListResponse.buildSuccessResponse(0, new LinkedList<>());
    if (!serviceSet.isEmpty()) {
        // 过滤指定分组的Service,添加groupServiceName,格式如:groupA@@serviceA
        Collection<String> serviceNameSet = selectServiceWithGroupName(serviceSet, request.getGroupName());
        // 按分页裁剪serviceNameSet
        List<String> serviceNameList = ServiceUtil
                .pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet);
        result.setCount(serviceNameSet.size());
        result.setServiceNames(serviceNameList);
    }
    return result;
}

从源码可以看出,Nacos服务端从ServiceManager管理器中的一个map(namespaceSingletonMaps)中拿出指定命名空间那些Service,并根据筛选条件过滤满足条件的Service,然后组装好groupServiceName(格式如:groupA@@serviceA)并返回。

2.2、总结图

三、getInstances(serviceId):获取服务实例列表 

// 调用栈如下:
// namingService().selectInstances(serviceId, group,true);
	// NamingService#selectInstances(serviceName, groupName, healthy, true)
		// NamingService#selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, true)
// getInstances(serviceId)方法最终会调用NacosNamingService#selectInstances()获取实例信息。
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
        boolean subscribe) throws NacosException {
    
    ServiceInfo serviceInfo;
    // 集群名称,使用逗号分隔
    String clusterString = StringUtils.join(clusters, ",");

    // 是否订阅,默认是订阅的
    if (subscribe) {
        /**
         * 1.从缓存中获取ServiceInfo
         * ConcurrentMap<String, ServiceInfo> serviceInfoMap
         * key:  groupName@@serviceName  或者  groupName@@serviceName@@clusterString
         * value: ServiceInfo
         */

        // 示例:serviceName=discovery-provider   groupName=DEFAULT_GROUP
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        // 2.缓存为空,执行订阅服务
        if (null == serviceInfo) {
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        // 3.非订阅,通过grpc发送ServiceQueryRequest服务查询请求
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    // 4.筛选满足条件的实例
    return selectInstances(serviceInfo, healthy);
}

上述流程的基本逻辑为:

  • 如果是订阅模式,则直接从本地缓存获取服务信息(ServiceInfo),然后从中获取实例列表,这是因为订阅机制会自动同步服务器实例的变化到本地。如果本地缓存中没有,那说明是首次调用,则进行订阅,在订阅完成后会获得到服务信息。
  • 如果是非订阅模式,那就直接请求服务器端,获得服务信息。

3.1、从缓存中获取服务信息

public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
    // 组装服务名(带组名):groupName@@serviceName
    // 例如:DEFAULT_GROUP@@discovery-provider
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 如果指定了集群,那么key还会加上"@@clusters"
    String key = ServiceInfo.getKey(groupedServiceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // ConcurrentMap<String, ServiceInfo> serviceInfoMap
    // 从缓存中获取服务信息
    return serviceInfoMap.get(key);
}

3.2、缓存为空,执行订阅服务

serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);

// clientProxy在构造方法中初始化为:NamingClientProxyDelegate
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);

实际上调用的是NamingClientProxyDelegate#subscribe()方法:

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
    // 服务名称(带组名)  格式:groupName@@serviceName
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    // 如果集群名称非空,key还需要拼接上集群名称
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    // 调度更新,往线程池中提交一个UpdateTask任务
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    // 获取缓存中的服务信息
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
        // 缓存中不存在对应的服务信息 或者 SubscriberRedoData还未注册,则执行订阅
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    // 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

主要逻辑有下面三个,分析如下。

3.2.1、调度更新,往线程池中提交一个UpdateTask任务

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    if (!asyncQuerySubscribeService) {
        return;
    }
    // 组装key   格式:groupName@@serviceName@@clusters
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    // Map<String, ScheduledFuture<?>> futureMap = new HashMap<>()
    // futureMap用于保存UpdateTask线程池任务的执行结果
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    synchronized (futureMap) {
        // double check双重检查,如果非空,直接返回,也就是相同的groupName@@serviceName@@clusters,只会存在一个UpdateTask任务
        if (futureMap.get(serviceKey) != null) {
            return;
        }

        // 往线程池中添加一个更新任务
        // UpdateTask实现了Runnable接口,需要关注其run()方法
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}

private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    // 延迟1s执行UpdateTask
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

可以看到,使用了一个map来保存线程池任务的响应,延迟1s执行调度更新任务。我们看下UpdateTask的源码:

public class UpdateTask implements Runnable {
    
    long lastRefTime = Long.MAX_VALUE;
    
    private boolean isCancel;
    
    private final String serviceName;
    
    private final String groupName;
    
    private final String clusters;
    
    private final String groupedServiceName;
    
    private final String serviceKey;
    
    /**
     * the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
     */
    private int failCount = 0;
    
    public UpdateTask(String serviceName, String groupName, String clusters) {
        this.serviceName = serviceName;
        this.groupName = groupName;
        this.clusters = clusters;
        this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
    }
    
    @Override
    public void run() {
        long delayTime = DEFAULT_DELAY;
        
        try {
            if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
                    serviceKey)) {
                NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
                isCancel = true;
                return;
            }
            
            ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
            if (serviceObj == null) {
                // 使用grpc向服务端发送ServiceQueryRequest服务查询请求
                serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                // 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件
                serviceInfoHolder.processServiceInfo(serviceObj);
                lastRefTime = serviceObj.getLastRefTime();
                return;
            }
            
            if (serviceObj.getLastRefTime() <= lastRefTime) {
                // 使用grpc向服务端发送ServiceQueryRequest服务查询请求
                serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
                // 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件
                serviceInfoHolder.processServiceInfo(serviceObj);
            }
            lastRefTime = serviceObj.getLastRefTime();
            if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                // 记录失败次数
                incFailCount();
                return;
            }
            // TODO multiple time can be configured.
            delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
            // 重置失败次数
            resetFailCount();
        } catch (NacosException e) {
            handleNacosException(e);
        } catch (Throwable e) {
            handleUnknownException(e);
        } finally {
            if (!isCancel) {
                // 注意:延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),
                        TimeUnit.MILLISECONDS);
            }
        }
    }
    
    private void handleNacosException(NacosException e) {
        incFailCount();
        int errorCode = e.getErrCode();
        if (NacosException.SERVER_ERROR == errorCode) {
            handleUnknownException(e);
        }
        NAMING_LOGGER.warn("Can't update serviceName: {}, reason: {}", groupedServiceName, e.getErrMsg());
    }
    
    private void handleUnknownException(Throwable throwable) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, throwable);
    }
    
    private void incFailCount() {
        int limit = 6;
        if (failCount == limit) {
            return;
        }
        failCount++;
    }
    
    private void resetFailCount() {
        failCount = 0;
    }
}

run()方法主要逻辑就是使用grpc向服务端发送ServiceQueryRequest服务查询请求,然后处理服务信息,获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件。

这里有重试机制,最多重试6次,延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)。

3.2.2、订阅服务 

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
    }
    // 缓存SubscriberRedoData重做数据,定时使用redoData重新订阅,
    // 具体实现在RedoScheduledTask(由NamingGrpcRedoService定时调度),最终调用的也是NamingGrpcClientProxy#doSubscribe

    // 缓存重做数据保存在map中:private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();
    redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
    // 使用grpc发送服务订阅请求
    return doSubscribe(serviceName, groupName, clusters);
}

public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);
    SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster);
    // private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();
    synchronized (subscribes) {
        subscribes.put(key, redoData);
    }
}

订阅服务首先会缓存SubscriberRedoData重做数据,实际上就是保存在一个map中,后续可以定时使用SubscriberRedoData重做数据来重新订阅,然后使用grpc发送服务订阅请求。

我们来看下如何订阅服务的。

public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
    // 构建一个SubscribeServiceRequest客户端订阅请求
    // 服务端处理代码: com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler.handle
    SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,
            true);
    // grpc请求Nacos服务端进行订阅
    SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
    // 标记SubscriberRedoData重做数据为已订阅
    redoService.subscriberRegistered(serviceName, groupName, clusters);
    return response.getServiceInfo();
}

通过grpc向Nacos服务端发起一个订阅请求,服务端真正的处理是在:com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler#handle()方法。

public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
    String namespaceId = request.getNamespace();
    String serviceName = request.getServiceName();
    String groupName = request.getGroupName();
    String app = request.getHeader("app", "unknown");
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 构建一个Service服务,指定为临时实例
    Service service = Service.newService(namespaceId, groupName, serviceName, true);
    // 构建Subscriber订阅者对象
    Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
            namespaceId, groupedServiceName, 0, request.getClusters());
    // serviceStorage.getData(service): 从缓存中获取serviceInfo
    // metadataManager.getServiceMetadata(service).orElse(null): 从内存(map)获取ServiceMetadata
    // ServiceUtil.selectInstancesWithHealthyProtection(): 仅包含有保护机制的健康实例
    ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
            metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,
            true, subscriber.getIp());
    if (request.isSubscribe()) {
        // 订阅服务
        clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
        NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
    } else {
        // 取消订阅服务
        clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
        NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
    }
    return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}

我们重点关注订阅服务的方法:

public void subscribeService(Service service, Subscriber subscriber, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    // 添加到订阅者列表中,实际上就是保存在map中
    // 订阅者列表: protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
    client.addServiceSubscriber(singleton, subscriber);
    client.setLastUpdatedTime();
    // 发布客户端订阅事件
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

将service添加到订阅者列表中,然后发布客户端订阅事件,这个在之前分析过,客户端订阅事件是在com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation进行处理,

核心逻辑就是将服务添加到ClientServiceIndexesManager的subscriberIndexes订阅者列表中:

private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

 3.2.3、处理服务信息

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) {
        return null;
    }
    // 获取老的服务
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (isEmptyOrErrorPush(serviceInfo)) {
        //empty or error push, just ignore
        return oldService;
    }
    // 重新存入客户端缓存中
    serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    // 对比下服务信息是否发生变更
    boolean changed = isChangedServiceInfo(oldService, serviceInfo);
    if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
        serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
    }
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    if (changed) {
        NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
                JacksonUtils.toJson(serviceInfo.getHosts()));
        // 如果发生改变,发送实例变更事件,处理源码在:InstancesChangeNotifier
        NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        // 同步serviceInfo数据到本地文件
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}

3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求

真正执行的是com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate#queryInstancesOfService():

public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
        boolean healthyOnly) throws NacosException {
    return grpcClientProxy.queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}

public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
        boolean healthyOnly) throws NacosException {
    // 构建服务查询请求
    // Nacos服务端处理是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler.handle
    ServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);
    request.setCluster(clusters);
    request.setHealthyOnly(healthyOnly);
    request.setUdpPort(udpPort);
    // 通过grpc请求Nacos服务端处理
    QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);
    return response.getServiceInfo();
}

queryInstancesOfService()核心就是构建了一个服务查询请求,通过grpc请求Nacos服务端,接下来我们直接看服务端的处理代码,具体是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler#handle。

public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {
    String namespaceId = request.getNamespace();
    String groupName = request.getGroupName();
    String serviceName = request.getServiceName();
    Service service = Service.newService(namespaceId, groupName, serviceName);
    String cluster = null == request.getCluster() ? "" : request.getCluster();
    boolean healthyOnly = request.isHealthyOnly();
    // 从缓存中获取serviceInfo
    ServiceInfo result = serviceStorage.getData(service);
    // 从内存(map)获取ServiceMetadata
    ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
    // 获取有保护机制的健康实例
    result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true,
            meta.getClientIp());
    return QueryServiceResponse.buildSuccessResponse(result);
}

3.4、筛选满足条件的实例 

private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<>();
    }

    // 遍历所有实例,直接移除掉不满足条件的实例
    Iterator<Instance> iterator = list.iterator();
    while (iterator.hasNext()) {
        Instance instance = iterator.next();
        // 筛选出健康、启用、权重大于0的实例
        if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {
            iterator.remove();
        }
    }
    
    return list;
}

3.5、总结图

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/378192.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

DC-8靶机渗透详细流程

信息收集&#xff1a; 1.存活扫描&#xff1a; arp-scan -I eth0 -l └─# arp-scan -I eth0 -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:dd:ee:6a, IPv4: 192.168.10.129 Starting arp-scan 1.10.0 with 256 hosts (https://github.com/royhills/arp-scan) 192.168.10…

uni使用openlayer加载本机离线地图

manifest.json添加配置 "runmode": "liberate"(默认为normal) 把地图打包进apk&#xff0c;这样手机每次访问地图就可以访问到工程文件夹的地图资源了&#xff0c;不用每次都请求云资源&#xff0c;消耗流量太大了

第9章 安全漏洞、威胁和对策(9.11-9.16)

9.11 专用设备 专用设备王国疆域辽阔&#xff0c;而且仍在不断扩张。 专用设备是指为某一特定目的而设计&#xff0c;供某一特定类型机构使用或执行某一特定功能的任何设备。 它们可被看作DCS、物联网、智能设备、端点设备或边缘计算系统的一个类型。 医疗设备、智能汽车、…

《MySQL 简易速速上手小册》第3章:性能优化策略(2024 最新版)

文章目录 3.1 查询优化技巧3.1.1 基础知识3.1.2 重点案例3.1.3 拓展案例 3.2 索引和查询性能3.2.1 基础知识3.2.2 重点案例3.2.3 拓展案例 3.3 优化数据库结构和存储引擎3.3.1 基础知识3.3.2 重点案例3.3.3 拓展案例 3.1 查询优化技巧 让我们来聊聊如何让你的 MySQL 查询跑得像…

【python】if __name__ == ‘__main__‘:

if __name__ __main__: 是一个Python脚本中使用的常见结构&#xff0c;用来判断该脚本文件是直接运行的还是被导入到其他文件中运行的。 当一个Python文件被运行时&#xff0c;Python解释器会自动创建一些特殊的变量&#xff0c;__name__就是其中之一。如果这个文件是作为主程…

米贸搜|Facebook在购物季使用的Meta广告投放流程

一、账户简化 当广告系列开始投放后&#xff0c;每个广告组都会经历一个初始的“机器学习阶段”。简化账户架构可以帮助AI系统更快获得广告主所需的成效。例如&#xff1a; 每周转化次数超过50次的广告组&#xff0c;其单次购物费用要低28%&#xff1b;成功结束机器学习阶段的…

Ondo宣布将其原生稳定币USDY带入Sui生态

重要提示&#xff1a;USDY是由短期美国国债支持的token化票据&#xff0c;持有者享受稳定币的实用性同时获得收益。USDY不得在美国或向美国人出售或以其他方式提供。USDY也未根据1933年美国证券法注册。 不到一年的时间&#xff0c;Sui已经成为全链TVL排名前十的区块链&#xf…

Netty源码 之 ByteBuf自适应扩缩容源码

Netty体系如何使得ByteBuf根据实际IO收发数据场景进行自适应扩容缩容的&#xff1f; IO收发数据的过程&#xff1a; read 读取&#xff08;"I"&#xff09;&#xff1a;网卡硬件通过网络传输介质读取对端传输过来的数据&#xff0c;网卡硬件再把数据写到recv-socke…

Flask 入门7:使用 Flask-Moment 本地化日期和时间

如果Web应用的用户来自世界各地&#xff0c;那么处理日期和时间可不是一个简单的任务。服务器需要统一时间单位&#xff0c;这和用户所在的地理位置无关&#xff0c;所以一般使用协调世界时&#xff08;UTC&#xff09;。不过用户看到 UTC 格式的时间会感到困惑&#xff0c;他们…

Linux系统安装(CentOS Vmware)

学习环境安装 VMware安装 VMware下载&安装 访问官网&#xff1a;https://www.vmware.com 在此处可以选择语言 点击China&#xff08;简体中文&#xff09; 点击产品&#xff0c;点击Workstation Pro 下滑&#xff0c;点击下载试用版 下滑找到Workstation 17 Pro for Wi…

如何查看端口映射?

端口映射是一种用于实现远程访问的技术。通过将外网端口与内网设备的特定端口关联起来&#xff0c;可以使外部网络用户能够通过互联网访问内部网络中的设备和服务。在网络中使用端口映射可以解决远程连接需求&#xff0c;使用户能够远程访问设备或服务&#xff0c;无论是在同一…

彻底学会系列:一、机器学习之线性回归(一)

1.基本概念(basic concept) 线性回归&#xff1a; 有监督学习的一种算法。主要关注多个因变量和一个目标变量之间的关系。 因变量&#xff1a; 影响目标变量的因素&#xff1a; X 1 , X 2 . . . X_1, X_2... X1​,X2​... &#xff0c;连续值或离散值。 目标变量&#xff1a; …

DAY7 作业

1.简易QQ登录页面 实际效果 qss界面代码 #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent) :QWidget(parent),ui(new Ui::Widget) {ui->setupUi(this);this->setWindowTitle("QQ"); /*设置窗口标题*/this->…

python官网下载慢怎么办?这里是一些解决方法

为什么Python官网下载速度慢&#xff1f; Python官网是开源软件的官方网站&#xff0c;提供了Python编程语言的最新版本和相关资源供开发者下载。然而&#xff0c;由于全球用户访问量较大&#xff0c;有时候会导致Python官网的下载速度变慢或不稳定。这对于急需获取Python的开…

idea设置terminal为git

要在IntelliJ IDEA中设置终端为Git Bash&#xff0c;请按照以下步骤操作&#xff1a; 打开 Settings&#xff08;设置&#xff09;。点击 Tools&#xff08;工具&#xff09;选项卡。进入 Terminal&#xff08;终端&#xff09;界面。在 Shell Path 下选择 Browse&#xff08;…

如何在 Microsoft Azure 上部署和管理 Elastic Stack

作者&#xff1a;来自 Elastic Osman Ishaq Elastic 用户可以从 Azure 门户中查找、部署和管理 Elasticsearch。 此集成提供了简化的入门体验&#xff0c;所有这些都使用你已知的 Azure 门户和工具&#xff0c;因此你可以轻松部署 Elastic&#xff0c;而无需注册外部服务或配置…

【git】本地项目推送到github、合并分支的使用

1. github上创建仓库信息 点击个人头像&#xff0c;选择【你的仓库】 点击【新增】 填写仓库信息 2. 本地项目执行的操作 1.生成本地的git管理 (会生成一个.git的文件夹) git init 2.正常提交到暂存区&#xff0c;并填写提交消息 git add . git commit -m "init…

c语言--指针数组(详解)

目录 一、什么是指针数组&#xff1f;二、指针数组模拟二维数组 一、什么是指针数组&#xff1f; 指针数组是指针还是数组&#xff1f; 我们类比一下&#xff0c;整型数组&#xff0c;是存放整型的数组&#xff0c;字符数组是存放字符的数组。 那指针数组呢&#xff1f;是存放…

[每日一题] 02.07 - 小鱼比可爱

小鱼比可爱 n int(input()) lis list(map(int,input().split())) res [0] for i in range(1,n):count 0for j in range(i):if lis[i] > lis[j]:count 1res.append(count)a .join(str(i) for i in res) print(a[:-1])

【大模型上下文长度扩展】MedGPT:解决遗忘 + 永久记忆 + 无限上下文

MedGPT&#xff1a;解决遗忘 永久记忆 无限上下文 问题&#xff1a;如何提升语言模型在长对话中的记忆和处理能力&#xff1f;子问题1&#xff1a;有限上下文窗口的限制子问题2&#xff1a;复杂文档处理的挑战子问题3&#xff1a;长期记忆的维护子问题4&#xff1a;即时信息检…