Nacos2.X源码分析:服务注册、服务发现流程

文章目录

  • Nacos2.1.X源码
    • 源码下载
    • 服务注册
      • NacosClient端
      • NacosServer端
    • 服务发现
      • NacosClient端
      • NacosServer端

Nacos2.1.X源码

源码下载

源码下载地址



服务注册

官方文档,对于NamingService接口服务注册方法的说明

Nacos2.X 服务注册总流程图
在这里插入图片描述



NacosClient端

一个小变动,Nacos1.X版本,在spring.factories文件中服务注册相关的bean是在NacosDiscoveryAutoConfiguration这个自动配置类中的,而2.X版本改到了NacosServiceRegistryAutoConfiguration配置类中,当然调用流程没有改,还是父类监听事件 --> bind() --> start() --> register() -->registerInstance()

这里直接就从NamingService接口的registerInstance(...)方法开始

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    // 调用NacosServer端,发送服务注册, Nacos2.X采用的是grpc方式请求,所以这里是去NamingClientProxyDelegate实现类的方法中
    clientProxy.registerService(serviceName, groupName, instance);
}



// 补充说明,对上面一行代码的说明
// 如果在看源码时,遇到了一个方法,实在是不知道应该去看哪一个实现类,也不能debug的情况下,那么就去找调用方定义的位置,比如这里
public class NacosNamingService implements NamingService {
    // clientProxy变量定义的位置
    private NamingClientProxy clientProxy;

    private void init(Properties properties) throws NacosException {
        ...
            // 该变量赋值的位置,也就定位到了原来是要去NamingClientProxyDelegate这个实现类中看方法
            this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
    }

    ...

        @Override
        public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        // 所以这里是去NamingClientProxyDelegate实现类的方法中
        clientProxy.registerService(serviceName, groupName, instance);
    }
}

NamingClientProxyDelegate类的registerService(...)方法中会判断当前要注册的实例是否为临时实例,如果是临时实例就使用grpc的方式请求NacosService,如果是持久化实例就还是使用http的方式请求,一般情况下都是临时实例,所以会采用grpc的方式调用

现在进入到了NamingGrpcClientProxy.registerService(...)方法中了

看完了下面的方法,我们知道这里其实:

  • 构建的一个InstanceRedoData对象,存入一个registeredInstances集合中;

  • 直接发送了一个grpc请求,请求参数是InstanceRequest;

  • 请求发送完后修改registeredInstances集合中InstanceRedoData对象的一个属性值

  • 此时是不是有一个疑问,我的instance怎么没有定时发送心跳的任务嘞?

@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
                       instance);
    // 构建的一个InstanceRedoData对象,存入一个registeredInstances集合中
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);
    // 进行服务注册
    doRegisterService(serviceName, groupName, instance);
}


public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    // NacosClient这边会封装为一个请求对象,
    // 我们这里可以利用InstanceRequest这个类名去NacosServer端找该调用接口具体实现位置,一般的命名就是后面加一个Handler进行拼接
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
                                                  NamingRemoteConstants.REGISTER_INSTANCE, instance);
    // 使用grpc的方式发送请求
    requestToServer(request, Response.class);
    // 这里就从registeredInstances集合中取出上面构建的InstanceRedoData对象,并把它的registered属性设置为true
    redoService.instanceRegistered(serviceName, groupName);
}


// 真正发送grpc请求方法
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
    throws NacosException {
    try {
        request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
        
        // 通过rpcClient发送请求,更具体的实现就没必要去看了,都是grpc相关的内容了
        Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
        
        if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
            throw new NacosException(response.getErrorCode(), response.getMessage());
        }
        if (responseClass.isAssignableFrom(response.getClass())) {
            return (T) response;
        }
        NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                            response.getClass().getName(), responseClass.getName());
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
    }
    throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}



NacosServer端

在Nacos1.X版本时,我们能够根据NacosClient端发送的请求url去找controller,比如http请求的url为/v1/ns/instance,那么就找InstanceController

在Nacos2.X版本中使用了grpc,我们可以根据发送的请求对象类名去找,比如上面NacosClient端使用grpc发送的请求对象是InstanceRequest,那么我们就可以去NacosServer端找InstanceRequestHandler这种直接在请求类名后面拼接一个handler的类。



现在进入到InstanceRequestHandler

@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
    
    private final EphemeralClientOperationServiceImpl clientOperationService;
    
    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }
    
    @Override
    @Secured(action = ActionTypes.WRITE)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        // 创建一个service对象,这里和Nacos1.X有一些小变动,
        // Nacos1.X的Service对象中能同时保存持久化实例和非持久化实例集合
        // Nacos2.X的Service对象只能保存一种了,要么该service对应持久化实例,要么就对应非持久化实例
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        // 判断请求类型,是注册实例还是注销实例,进而调用对应的方法
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                // 调用 下方 注册服务的方法
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        String.format("Unsupported request type %s", request.getType()));
        }
    }
    
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        // 注册实例,会进入到service层的registerInstance()方法
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }
    
    private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
        // 注销实例,会进入到service层的deregisterInstance()方法
        clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
    }
    
}



Service层的deregisterInstance()方法:

  • 将service添加进singletonRepository<Service, Service>namespaceSingletonMaps<namespace, Set<Service>>集合中,

  • 从singletonRepository集合中取出最先添加的service

  • 根据clientId取出Client

  • 将service与instance添加至Client对象中的publishers<Service, InstancePublishInfo>集合中

  • 发布三个事件:ClientChangedEvent、ClientRegisterServiceEvent、InstanceMetadataEvent

此时是不是有一个疑问:service是存在多个instance的,此时service保存在一边的集合中,而instance又保存在Client对象内的集合中。那怎么没有service与多个instance之间的对应关系嘞?或者会不会有一个Map<service,Set<Client>> 这样的集合存在嘞?

public void registerInstance(Service service, Instance instance, String clientId) {
    // 把service存入singletonRepository集合 如果不存在的前提下,并存入namespaceSingletonMaps集合中该服务命名空间对应的set集合中
    // 从singletonRepository集合中取出最先添加的service,因为之后新创建的service不会添加进singletonRepository集合中
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    if (!singleton.isEphemeral()) {
        throw new NacosRuntimeException(...);
    }
    // clientId就是NacosClient发送请求时传递的一个connectionId 。client对象存放的就是NacosClient客户端相关的信息
    // 对于临时实例来说,getClient()就是从clients集合中取,返回的是IpPortBasedClient对象
    // 客户端与服务端建立连接之后,服务端就会生成一个Client对象,服务端会通过客户端传过来的connectionId来找到对应的Client对象
    Client client = clientManager.getClient(clientId);
    // 对客户端进行一些校验
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    // 把客户端封装的instance对象 转换为 服务端这边的instance对象
    InstancePublishInfo instanceInfo = getPublishInfo(instance);
    // 将service以及instanceInfo对象保存至 publishers <Service, InstancePublishInfo>  这个Map集合中
    // 同时发布一个ClientChangedEvent事件
    client.addServiceInstance(singleton, instanceInfo);
    // 设置最后修改时间
    client.setLastUpdatedTime();
    // 再发布两个事件ClientRegisterServiceEvent、InstanceMetadataEvent
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}


// 接下来就是上面方法中调用的一些其他方法的代码
------------------------------------------------------------------------------------------------------------------------
/**
* 两个集合保存的数据:
*      singletonRepository <Service, Service>
*      namespaceSingletonMaps  ConcurrentHashMap<namespace, Set<Service>>
*/
public Service getSingleton(Service service) {
    // 如果singletonRepository集合中没有当前service,那么就存进该集合中
    singletonRepository.putIfAbsent(service, service);
    // 从集合中取出service,这里有两种情况,
    // 1. 如果上面singletonRepository集合中刚开始不存在当前service,那么这里获取的就是方法参数中传过来的对象,
    // 2. 如果上面singletonRepository集合中之前就已经存在了该service,那么这里获取的就是之前存入该集合的service对象
    Service result = singletonRepository.get(service);
    // 该service的命名空间是否在namespaceSingletonMaps集合中存在,如果不存在则创建一个set集合
    namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
    // 将service添加进namespaceSingletonMaps集合中命名空间对应的set集合中
    namespaceSingletonMaps.get(result.getNamespace()).add(result);
    return result;
} 

------------------------------------------------------------------------------------------------------------------------
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
    // parseToHealthCheckInstance() 把服务注册时生成的 InstancePublishInfo 对象 转换为 HealthCheckInstancePublishInfo类型的对象
    return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
}

@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
    
    // 将service与instance添加至Client对象中的publishers<Service, InstancePublishInfo>集合中
    if (null == publishers.put(service, instancePublishInfo)) {
        MetricsMonitor.incrementInstanceCount();
    }
    // 发布ClientChangedEvent事件
    NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
    Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
    return true;
}



至此,NacosClient端请求NacosServer端进行服务注册的一次请求就结束了,但NacosServer端真正处理的逻辑还没有结束,接下来要去看监听事件是如何进行处理的。在idea中使用ctrl+shift+R进行全文搜索,看看某个事件对象到底是在哪里进行处理的。我们这里先看核心的ClientRegisterServiceEvent事件

ClientServiceIndexesManager.onEvent(...),该方法:

  • 进一步判断事件类型,去调用各个事件向对应的方法
  • 在服务注册对应的方法中,将service和clientid存入publisherIndexes<Service, Set<clientId>>集合中
  • 发布一个ServiceChangedEvent事件
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    ...
    @Override
    public void onEvent(Event event) {
        // 根据两类事件去调用对应的方法,再进行更细致的判断
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            // 注册服务、注销服务、订阅服务、取消订阅,相关的事件
            handleClientOperation((ClientOperationEvent) event);
        }
    }
    
    ...
    
    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            // 注册服务
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            // 注销服务
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            // 订阅服务
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            // 取消订阅
            removeSubscriberIndexes(service, clientId);
        }
    }
    
    //服务注册
    private void addPublisherIndexes(Service service, String clientId) {
        // 将service和clientid存入publisherIndexes<Service, Set<clientId>>集合中
        // 也就是此集合保证了service和多个instance之间的对应关系
        publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        publisherIndexes.get(service).add(clientId);
        // 发布一个ServiceChangedEvent事件
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }
    
    ...
}



其实到现在为止,服务注册的主要功能已经实现了,已经将service和instance保存在内存中了。

一个集合专门存放service,singletonRepository <Service, Service> 集合

一个命名空间下所有的service,namespaceSingletonMaps <namespace, Set<Service>>集合

一个集合存放Client,clients <clientId, IpPortBasedClient>集合

Client对象中的publishers <Service, InstancePublishInfo>集合

service和Client的绑定publisherIndexes <Service, Set<clientId>>集合

而对于这里又发布的ServiceChangedEvent事件,其实从名字就能看出来这个一个服务改变的事件,那么我们就可以大胆猜一下这之后的处理逻辑很大概率就是把最新的改动推送给其他位置,其他位置进行更新。

所以我们这里先不继续往下了。



服务发现

Nacos2.X 服务发现在线流程图

在这里插入图片描述



NacosClient端

NamingService接口中,它其中有两类方法是用来获取服务实例的:

  • getAllInstances(...) 获取全部实例
  • selectInstances(...) 根据条件获取过滤后的实例列表。可以获取健康或不健康的服务实例

我们主要看NamingService.selectInstances(...)方法的实现逻辑

该方法的逻辑:

  • 先查本地缓存serviceInfoMap <key, ServiceInfo>,将查询的服务实例进行健康与不健康的筛选
  • 查询本地缓存如果没有查询到,那么就会调用NamingClientProxyDelegate.subscribe(..)方法,该方法会开启一个定时任务去NacosServer端拉取,并更新本地缓存。除此之外还会向NacosServer端发送一个grpc请求
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
                                      boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    String clusterString = StringUtils.join(clusters, ",");
    // subscribe 是否订阅,默认值是true
    if (subscribe) {
        // 先从本地缓存serviceInfoMap中取
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        // 如果本地缓存中没有我们需要的服务实例列表信息,那么就向NacosServer端发送一个grpc请求,进行获取服务实例列表数据
        // 这里还会开启一个任务,每隔一段时间向Nacos拉取服务实例信息
        if (null == serviceInfo) {
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    // 移除与healthy不相同的服务实例,比如healthy为true表示我只要健康的服务实例,那么就要移除不健康实例
    return selectInstances(serviceInfo, healthy);
}

查询本地缓存的方法

public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
    // groupName + @@ + serviceName
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 通过 grouped、ServiceName、clusters 生成一个key
    String key = ServiceInfo.getKey(groupedServiceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // 根据这个key去本地缓存serviceInfoMap中去找
    return serviceInfoMap.get(key);
}



NamingClientProxyDelegate.subscribe(..)方法,该方法会开启一个定时任务去NacosServer端拉取,并更新本地缓存。除此之外还会向NacosServer端发送一个grpc请求

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
    // 生成key
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    // 添加一个延迟1s执行任务,该任务会定期想NacosServer端拉取服务实例信息
    // 该定时任务和Nacos1.X版本一样,没什么改动,最多就是http变为了grpc
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    // 从本地缓存中取
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    // 如果没有就发送grpc请求,因为上面的延迟任务会延迟1s执行,所以这里从本地缓存中取不到数据
    if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    // 将查询的数据存入本地缓存中
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

// ----------------------------------------------------------------------------------------
// 首先看 定时向NacosServer端拉取服务实例的实现
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }

        // 添加一个延迟1s执行任务,该任务会定期想NacosServer端拉取服务实例信息
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}


@Override
public void run() {
    long delayTime = DEFAULT_DELAY;

    try {
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
            serviceKey)) {
            NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
            isCancel = true;
            return;
        }

        // 从本地缓存中取,如果取不到那么就会通过queryInstancesOfService()方法去向NacosServer端发送请求查询服务实例列表
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        if (serviceObj == null) {
            // 发请求
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            // 将结果保存本地缓存
            serviceInfoHolder.processServiceInfo(serviceObj);
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }

        // 如果本地缓存中的ServiceInfo对象的lastRefTime属性 小于等于了 lastRefTime也要发送请求
        // 所以大部分情况下 下面的if都会满足,因为每一次发送获取服务实例列表的请求后都会更新lastRefTime的值,下一次执行该任务这个就会相等
        // 那么就又要继续发送请求
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            // 发请求
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            // 将结果保存本地缓存
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        // 更新lastRefTime值
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        // TODO multiple time can be configured.
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        // 刷新失败次数
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);
    } finally {
        // 嵌套调用自己,6s <= 间隔时间 <= 60s,间隔时间和失败次数有关
        if (!isCancel) {
            executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),TimeUnit.MILLISECONDS);
        }
    }
}

public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
                                           boolean healthyOnly) throws NacosException {
    // 定时任务,查询服务实例列表的请求对象ServiceQueryRequest
    ServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);
    request.setCluster(clusters);
    request.setHealthyOnly(healthyOnly);
    request.setUdpPort(udpPort);
    // 发送请求
    QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);
    return response.getServiceInfo();
}


//----------------------------------------------------------------------------------------
// 接下来看NamingClientProxyDelegate.subscribe(..)方法中,直接发送的grpc请求
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
    }
    redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
    // 向NacosServer发送一个grpc请求,拉取服务实例列表信息,
    return doSubscribe(serviceName, groupName, clusters);
}

public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
    // 封装请求对象SubscribeServiceRequest
    SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,true);
    // 发送grpc请求
    SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
    redoService.subscriberRegistered(serviceName, groupName, clusters);
    return response.getServiceInfo();
}



NacosServer端

上面发送grpc请求对象是SubscribeServiceRequest,所以我们直接在NacosServer端找SubscribeServiceRequestHandler

下方handle方法处理逻辑为:

  • 获取请求参数
  • 将请求参数封装为一个service对象,该对象是被订阅方
  • 创建一个Subscriber对象,该对象是订阅方
  • 通过调用serviceStorage.getData(service)方法获取到ServiceInfo对象,ServiceInfo对象中包含了instance集合
  • 进行订阅的处理逻辑
@Component
public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {

    ...

    @Override
    @Secured(action = ActionTypes.READ)
    public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
        // 获取请求参数
        String namespaceId = request.getNamespace();
        String serviceName = request.getServiceName();
        String groupName = request.getGroupName();
        String app = request.getHeader("app", "unknown");
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        // 将请求参数封装为一个service对象
        Service service = Service.newService(namespaceId, groupName, serviceName, true);
        // 创建一个Subscriber对象,其中存放订阅方的ip信息,以及订阅哪一个service的信息
        Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
                                               namespaceId, groupedServiceName, 0, request.getClusters());
        // 响应给客户端的数据,核心方法就是getData(service)
        ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),
                                                metadataManager.getServiceMetadata(service).orElse(null), subscriber);
        // 是否订阅
        if (request.isSubscribe()) {
            // 查看订阅的逻辑
            clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
        } else {
            clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
        }
        return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
    }
}



我们首先来看serviceStorage.getData(service)方法获取到ServiceInfo对象,看看该方法的处理逻辑

  • 查询serviceDataIndexes缓存,是否有我们要查询service对应的ServiceInfo对象
  • 如果缓存中没有那就创建一个新的ServiceInfo对象,并去服务注册表中找该服务所有的实例
  • 先根据service,从publisherIndexes <Service, Set<clientId>>集合中找出所有的clientId
  • 在遍历clientId,根据clientId从clients <clientId, IpPortBasedClient>集合中找对应的Client对象
  • 再从Client对象中publishers <Service, InstancePublishInfo>集合,找出InstancePublishInfo对象
  • 再把InstancePublishInfo转换为instance对象,最后得到Set<Instance>集合,存入ServiceInfo对象中
  • 在上面的过程中,会创建一个service与cluster的对应关系集合 serviceClusterIndex <Service, Set<clusters>>
  • 将serviceInfo对象存入缓存serviceDataIndexes <Service, ServiceInfo>
public ServiceInfo getData(Service service) {
    // serviceDataIndexes集合缓存中是否有我们要查询的service,如果有就直接返回,如果没有就调用getPushData()方法
    // serviceDataIndexes就相当于是注册表的一份缓存数据,实现了读写分离,服务注册时添加 publisherIndexes <Service, Set<clientId>>集合
    // 服务发现时读取serviceDataIndexes <Service, ServiceInfo>集合
    return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
}

public ServiceInfo getPushData(Service service) {
    // 通过service对象构建出一个ServiceInfo对象
    ServiceInfo result = emptyServiceInfo(service);
    if (!ServiceManager.getInstance().containSingleton(service)) {
        return result;
    }
    // hosts属性就是instance集合,所以核心看getAllInstancesFromIndex()关键方法
    result.setHosts(getAllInstancesFromIndex(service));
    // 存入缓存中  serviceDataIndexes <Service, ServiceInfo>  ServiceInfo中存的是List<Instance>
    serviceDataIndexes.put(service, result);
    return result;
}

private List<Instance> getAllInstancesFromIndex(Service service) {
    Set<Instance> result = new HashSet<>();
    Set<String> clusters = new HashSet<>();
    // 遍历service对应的所有clientId
    for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
        // 从client对象中取出instance封装之后的InstancePublishInfo对象
        Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
        if (instancePublishInfo.isPresent()) {
            // 得到instance对象,添加进集合
            Instance instance = parseInstance(service, instancePublishInfo.get());
            result.add(instance);
            clusters.add(instance.getClusterName());
        }
    }
    // cache clusters of this service
    // 存入缓存  serviceClusterIndex  <Service, Set<clusters>>
    serviceClusterIndex.put(service, clusters);
    return new LinkedList<>(result);
}



我们接下来再来看订阅相关的处理逻辑,这里会调用到EphemeralClientOperationServiceImpl.subscribeService(..)方法中,该方法业务逻辑:

  • singletonRepository <Service, Service> 集合中取出被订阅方的service对象
  • 在根据clientId获取到Client对象,它代表着订阅方
  • 将service和subscriber 保存在Client对象中的subscribers <Service, Subscriber> 集合中
  • 发布ClientSubscribeServiceEvent事件
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
    // 从singletonRepository这个集合中把service对象取出来,如果集合中不存在的话那就用新创建的
    // 这个singleton 是被订阅方
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    // 获取Client,是订阅方
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    // 将service和subscriber 保存在Client对象中的subscribers <Service, Subscriber> 集合中
    // 表示value service 订阅了 key service
    client.addServiceSubscriber(singleton, subscriber);
    // 修改最后更新时间
    client.setLastUpdatedTime();
    // 发布ClientSubscribeServiceEvent事件
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

处理该事件的位置和处理服务注册的位置是一样的,都是ClientServiceIndexesManager.onEvent(...),方法:

  • 和服务注册的处理逻辑一样,创建一个集合,将service和多个订阅方进行绑定subscriberIndexes <Service, Set<subscriberClientId>>
  • 发布ServiceSubscribedEvent事件
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    ...
        @Override
        public void onEvent(Event event) {
        // 根据两类事件去调用对应的方法,再进行更细致的判断
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            // 注册服务、注销服务、订阅服务、取消订阅,相关的事件
            handleClientOperation((ClientOperationEvent) event);
        }
    }

    ...

        private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            // 注册服务
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            // 注销服务
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            // 订阅服务
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            // 取消订阅
            removeSubscriberIndexes(service, clientId);
        }
    }

    private void addSubscriberIndexes(Service service, String clientId) {
        // 服务订阅的处理
        // 这里会有一个集合  subscriberIndexes <Service, Set<subscriberClientId>>
        // 把这个service和多个订阅方进行绑定
        subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        // Fix #5404, Only first time add need notify event.
        if (subscriberIndexes.get(service).add(clientId)) {
            // 订阅方的clientId添加完后就会发布一个ServiceSubscribedEvent事件,此时订阅的主流程已经结束
            NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
        }
    }

    ...
}



在服务发现过程中出现的集合:

服务注册表的副本,serviceDataIndexes <Service, ServiceInfo> ServiceInfo中存的是List<Instance>

服务与cluster的对应关系 serviceClusterIndex <Service, Set<clusters>>

Client对象中的subscribers <Service, Subscriber> 集合,保存着Subscriber订阅了Service服务

service和多个订阅方进行绑定 subscriberIndexes <Service, Set<subscriberClientId>>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/781091.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

华为OSPF配置DR和BDR与指定DR

基础配置 <Huawei>sys #进入配置模式 Enter system view, return user view with CtrlZ. [Huawei]un in en #关闭报文弹窗 Info: Information center is disabled. [Huawei]sys R1 #设备名更改为R1 [R1]int g0/0/0 …

智能物联网鱼缸

硬件部分及接线图 工具 继电器、开发板、物联网os、云平台 微信小程序 结构&#xff1a;images、pages两个为主体。 标题头部分 <view class"container"> <view class"head_box"> <image src"/images/面性鱼缸.png"><…

【Java】详解String类中的各种方法

创建字符串 常见的创建字符串的三种方式&#xff1a; // 方式一 String str "hello world"; // 方式二 String str2 new String("hello world"); // 方式三 char[] array {a, b, c}; String str3 new String(array); "hello" 这样的字符串字…

昇思学习打卡-8-FCN图像语义分割

目录 FCN介绍FCN所用的技术训练数据的可视化模型训练模型推理FCN的优点和不足优点不足 FCN介绍 FCN主要用于图像分割领域&#xff0c;是一种端到端的分割方法&#xff0c;是深度学习应用在图像语义分割的开山之作。通过进行像素级的预测直接得出与原图大小相等的label map。因…

3-4 优化器和学习率

3-4 优化器和学习率 主目录点这里 优化器是机器学习和深度学习模型训练过程中用于调整模型参数的方法。它的主要目标是通过最小化损失函数来找到模型参数的最优值&#xff0c;从而提升模型的性能。 在深度学习中&#xff0c;优化器使用反向传播算法计算损失函数相对于模型参数…

C++ 函数高级——函数的占位参数

C中函数的形参列表里可以有占位参数&#xff0c;用来做占位&#xff0c;调用函数时必须填补改位置 语法&#xff1a; 返回值类型 函数名&#xff08;数据类型&#xff09;{ } 在现阶段函数的占位参数存在意义不大&#xff0c;但是后面的课程中会用到该技术 示例&#xff1a;…

TypeScript:5分钟上手创建一个简单的Web应用

一、练习TypeScript实例 1.1 在src目录里创建greeter.ts greeter.ts文件代码 // https://www.tslang.cn/docs/handbook/typescript-in-5-minutes.html // 格式化快捷键&#xff1a;https://blog.csdn.net/Dontla/article/details/130255699 function greeter(name: string) …

Windows电脑下载、安装VS Code的方法

本文介绍Visual Studio Code&#xff08;VS Code&#xff09;软件在Windows操作系统电脑中的下载、安装、运行方法。 Visual Studio Code&#xff08;简称VS Code&#xff09;是一款由微软开发的免费、开源的源代码编辑器&#xff0c;支持跨平台使用&#xff0c;可在Windows、m…

采煤机作业3D虚拟仿真教学线上展示增强应急培训效果

在化工行业的生产现场&#xff0c;安全永远是首要之务。为了加强从业人员的应急响应能力和危机管理能力&#xff0c;纷纷引入化工行业工艺VR模拟培训&#xff0c;让应急演练更加生动、高效。 化工行业工艺VR模拟培训软件基于真实的厂区环境&#xff0c;精确还原了各类事件场景和…

vue2 webpack使用optimization.splitChunks分包,实现按需引入,进行首屏加载优化

optimization.splitChunks的具体功能和配置信息可以去网上自行查阅。 这边简单讲一下他的使用场景、作用、如何使用&#xff1a; 1、没用使用splitChunks进行分包之前&#xff0c;所有模块都揉在一个文件里&#xff0c;那么当这个文件足够大、网速又一般的时候&#xff0c;首…

Transformer-LSTM预测 | Matlab实现Transformer-LSTM多变量时间序列预测

Transformer-LSTM预测 | Matlab实现Transformer-LSTM多变量时间序列预测 目录 Transformer-LSTM预测 | Matlab实现Transformer-LSTM多变量时间序列预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab实现Transformer-LSTM多变量时间序列预测&#xff0c;Transf…

玫瑰千层烤饼:味蕾的芬芳盛宴

在美食的缤纷世界里&#xff0c;有一种独特的存在&#xff0c;它融合了玫瑰的芬芳与烤饼的酥脆&#xff0c;那便是令人陶醉的甘肃美食玫瑰千层烤饼。食家巷玫瑰千层烤饼&#xff0c;宛如一件精心雕琢的艺术品。每一层薄如纸张的面皮&#xff0c;都承载着制作者的细腻与用心。层…

MySQL Binlog详解:提升数据库可靠性的核心技术

文章目录 1. 引言1.1 什么是MySQL Bin Log&#xff1f;1.2 Bin Log的作用和应用场景 2. Bin Log的基本概念2.1 Bin Log的工作原理2.2 Bin Log的三种格式 3. 配置与管理Bin Log3.1 启用Bin Log3.2 配置Bin Log参数3.3 管理Bin Log文件3.4 查看Bin Log内容3.5 使用mysqlbinlog工具…

vue3项目 前端blocked:mixed-content问题解决方案

一、问题分析 blocked:mixed-content其实浏览器不允许在https页面里嵌入http的请求&#xff0c;现在高版本的浏览器为了用户体验&#xff0c;都不会弹窗报错&#xff0c;只会在控制台上打印一条错误信息。一般出现这个问题就是在https协议里嵌入了http请求&#xff0c;解决方法…

多线程(进阶)

前言&#x1f440;~ 上一章我们介绍了线程池的一些基本概念&#xff0c;今天接着分享多线程的相关知识&#xff0c;这些属于是面试比较常见的&#xff0c;大部分都是文本内容 常见的锁策略 乐观锁 悲观锁 轻量锁 重量级锁 自旋锁 挂起等待锁 可重入锁和不可重入锁 互斥…

优化后Day53 动态规划part11

LC1143最长公共子序列 1.dp数组的含义&#xff1a;dp[i][j]表示以下标i结尾的text1子序列和以下标j结尾的text2子序列的最长公共子序列 2. 初始化&#xff1a;跟LC718一样&#xff0c;i结尾的需要初始化&#xff0c;i-1结尾不需要初始化 3. 递推公式 如果charAt(i)charAt(j)&…

C++ 函数高级——函数重载——注意事项

1.引用作为重载条件 2.函数重载碰到函数默认参数 示例&#xff1a; 运行结果&#xff1a;

【IMU】 确定性误差与IMU_TK标定原理

1、确定性误差 MEMS IMU确定性误差模型 K 为比例因子误差 误差来源:器件的输出往往为脉冲值或模数转换得到的值,需要乘以一个刻度系数才能转换成角速度或加速度值,若该系数不准,便存在刻度系数误差。 T 为交轴耦合误差 误差来源:如下图,b坐标系是正交的imu坐标系,s坐标系的三…

UE C++ 多镜头设置缩放 平移

一.整体思路 首先需要在 想要控制的躯体Pawn上&#xff0c;生成不同相机对应的SpringArm组件。其次是在Controller上&#xff0c;拿到这个Pawn&#xff0c;并在其中设置输入响应&#xff0c;并定义响应事件。响应事件里有指向Pawn的指针&#xff0c;并把Pawn的缩放平移功能进行…

暄桐教练日课·21天《线的初识》即将开始 一起感受线描的乐趣

林曦老师的直播课&#xff0c;是暄桐教室的必修课。而教练日课是丰富多彩的选修课&#xff0c;它会选出书法史/美术史上重要的、有营养的碑帖和画儿&#xff0c;与你一起&#xff0c;高效练习。而且暄桐教练日课远不止书法、国画&#xff0c;今后还会有更多有趣的课程陆续推出&…