SpringBoot自动注入
项目启动的时候会通过自动注入的机制将
NacosDiscoveryClientConfiguration注入
当注入NacosDiscoveryClientConfiguration的时候会将DiscoveryClient一起注入Bean
DiscoveryClient实现了SpringCloud的DiscoveryClient接口,重点是getInstances和getServices方法,而且都是由NacosServiceDiscovery实现
获取实例信息
NacosDiscoveryClient
private NacosServiceDiscovery serviceDiscovery;
public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {
this.serviceDiscovery = nacosServiceDiscovery;
}
@Override
public List<ServiceInstance> getInstances(String serviceId) {
try {
return serviceDiscovery.getInstances(serviceId);
}
catch (Exception e) {
throw new RuntimeException(
"Can not get hosts from nacos server. serviceId: " + serviceId, e);
}
}
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
String group = discoveryProperties.getGroup();
List<Instance> instances = namingService().selectInstances(serviceId, group,
true);
return hostToServiceInstanceList(instances, serviceId);
}
NacosServiceDiscovery
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
//获取分组
String group = discoveryProperties.getGroup();
//查询服务下的实例
List<Instance> instances = namingService().selectInstances(serviceId, group,
true);
//填充返回的实例信息数据
return hostToServiceInstanceList(instances, serviceId);
}
@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
return selectInstances(serviceName, groupName, healthy, true);
}
@Override
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe)
throws NacosException {
return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
}
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
// 默认是订阅的
if (subscribe) {
//从缓存中获取实例信息
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
//如果获取不到则从服务端拉取
if (null == serviceInfo) {
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
// 如果未订阅服务信息,则直接从服务器进行查询
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
//获取服务中的实例信息
return selectInstances(serviceInfo, healthy);
}
从缓存中拿数据
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
String key = ServiceInfo.getKey(groupedServiceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
return serviceInfoMap.get(key);
}
获取服务的实例信息
private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
Iterator<Instance> iterator = list.iterator();
while (iterator.hasNext()) {
Instance instance = iterator.next();
// 保留 健康、启用、权重大于0 的实例
if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {
iterator.remove();
}
}
return list;
}
GRPC请求拉取服务实例信息
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
//定时同步服务端serviceInfo
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
//获取ServiceInfo 信息
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
//如果没有则从服务端拿
if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
//GRPC请求
result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
}
//填充进Map中 这里可以看服务注册最后那部分代码最后也是调用serviceInfoHolder保存的
serviceInfoHolder.processServiceInfo(result);
return result;
}
定时同步服务端ServiceInfo
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
if (futureMap.get(serviceKey) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(serviceKey) != null) {
return;
}
//构建任务放到ScheduledFuture执行
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
futureMap.put(serviceKey, future);
}
}
缓存订阅信息
public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {
//拿服务当key
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);
//构建需要缓存的订阅信息
SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster);
//缓存订阅信息
synchronized (subscribes) {
subscribes.put(key, redoData);
}
}
执行订阅
public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
//构建订阅请求
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,
true);
//执行订阅
SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
//设置已订阅
redoService.subscriberRegistered(serviceName, groupName, clusters);
return response.getServiceInfo();
}
设置订阅信息已订阅
public void subscriberRegistered(String serviceName, String groupName, String cluster) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);
synchronized (subscribes) {
SubscriberRedoData redoData = subscribes.get(key);
// 标记订阅数据已订阅
if (null != redoData) {
redoData.setRegistered(true);
}
}
}
Nacos订阅机制
Nacos的订阅机制,如果用一句话来描述就是:Nacos客户端通过一个定时任务,每6秒从注册中心获取实例列表,当发现实例发生变化时,发布变更事件,订阅者进行业务处理。该更新实例的更新实例,该更新本地缓存的更新本地缓存。
UpdateTask
public class UpdateTask implements Runnable {
public void run() {
long delayTime = DEFAULT_DELAY;
try {
//校验订阅任务是否不对 如果不对就不处理
if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(
serviceKey)) {
NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
isCancel = true;
return;
}
//从缓存中拿 Service信息
ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
//如果拿不到则去服务端拉取
if (serviceObj == null) {
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
//然后再填充进缓存
serviceInfoHolder.processServiceInfo(serviceObj);
//更新下事件
lastRefTime = serviceObj.getLastRefTime();
return;
}
// 过期服务(服务的最新更新时间小于等于缓存刷新时间),从注册中心重新查询
if (serviceObj.getLastRefTime() <= lastRefTime) {
//服务过期了重新查
serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); //在缓存进去
serviceInfoHolder.processServiceInfo(serviceObj);
}
// 刷新更新时间
lastRefTime = serviceObj.getLastRefTime();
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
// 下次更新缓存时间设置,默认为6秒
// TODO multiple time can be configured.
delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
// 重置失败数量为0
resetFailCount();
} catch (Throwable e) {
incFailCount();
NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);
} finally {
// 下次调度刷新时间,下次执行的时间与failCount有关
// failCount=0,则下次调度时间为6秒,最长为1分钟
// 即当无异常情况下缓存实例的刷新时间是6秒
if (!isCancel) {
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),
TimeUnit.MILLISECONDS);
}
}
}
}
实例变更事件处理
监听事件的注册
在NacosNamingService的subscribe方法中,通过如下方式进行了监听事件的注册:
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
if (null == listener) {
return;
}
String clusterString = StringUtils.join(clusters, ",");
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
clientProxy.subscribe(serviceName, groupName, clusterString);
}
这里的changeNotifier.registerListener便是进行具体的事件注册逻辑
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
//这里用到了双重检查锁机制
if (eventListeners == null) {
synchronized (lock) {
eventListeners = listenerMap.get(key);
if (eventListeners == null) {
eventListeners = new ConcurrentHashSet<EventListener>();
listenerMap.put(key, eventListeners);
}
}
}
eventListeners.add(listener);
}
可以看出,事件的注册便是将EventListener存储在InstancesChangeNotifier的listenerMap属性当中了。
这里的数据结构为Map,key为服务实例信息的拼接,value为监听事件的集合。
监听服务变更事件
因为UpdateTask 中假如没有从缓存中拿到服务信息则会通过grpc协议从服务端拉取然后会执行serviceInfoHolder.processServiceInfo方法缓存服务信息,当实例发生变化的话这个方法最终会发送一个InstancesChangeEvent 事件 所以这里会监听InstancesChangeEvent 事件进行处理
InstancesChangeNotifier
public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();
@Override
public void onEvent(InstancesChangeEvent event) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (CollectionUtils.isEmpty(eventListeners)) {
return;
}
for (final EventListener listener : eventListeners) {
//[] final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
final com.alibaba.nacos.api.naming.listener.Event namingEvent = new NamingEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(),
instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts());
// 最终调度执行listener.onEvent(namingEvent),只在NacosWatch#start找到了有效的EventListener,见下文
if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
} else {
listener.onEvent(namingEvent);
}
}
}
}
}
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle, DisposableBean {
private Map<String, EventListener> listenerMap = new ConcurrentHashMap(16);
private final AtomicBoolean running = new AtomicBoolean(false);
public void start() {
if (this.running.compareAndSet(false, true)) {
EventListener eventListener = (EventListener)this.listenerMap.computeIfAbsent(this.buildKey(), (event) -> {
return new EventListener() {
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
List instances = ((NamingEvent)event).getInstances();
//[] Optional instanceOptional = NacosWatch.this.selectCurrentInstance(instances);
// 按IP和端口选择第一个instance作为当前的instance
Optional instanceOptional = instances.stream().filter((instance) -> {
return this.properties.getIp().equals(instance.getIp()) && this.properties.getPort() == instance.getPort();
}).findFirst()
instanceOptional.ifPresent((currentInstance) -> {
//[] NacosWatch.this.resetIfNeeded(currentInstance);
// 重新设置properties的metadata
if (!this.properties.getMetadata().equals(instance.getMetadata())) {
this.properties.setMetadata(instance.getMetadata());
}
});
}
}
};
});
}
}
获取服务信息
NacosDiscoveryClient
@Override
public List<String> getServices() {
try {
return serviceDiscovery.getServices();
}
catch (Exception e) {
log.error("get service name from nacos server fail,", e);
return Collections.emptyList();
}
}
public List<String> getServices() throws NacosException {
//获取分组
String group = discoveryProperties.getGroup();
//获取服务信息
ListView<String> services = namingService().getServicesOfServer(1,
Integer.MAX_VALUE, group);
return services.getData();
}
@Override
public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName) throws NacosException {
return getServicesOfServer(pageNo, pageSize, groupName, null);
}
@Override
public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector)
throws NacosException {
return clientProxy.getServiceList(pageNo, pageSize, groupName, selector);
}
GRPC拉取信息
@Override
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)
throws NacosException {
//构建请求
ServiceListRequest request = new ServiceListRequest(namespaceId, groupName, pageNo, pageSize);
if (selector != null) {
if (SelectorType.valueOf(selector.getType()) == SelectorType.label) {
request.setSelector(JacksonUtils.toJson(selector));
}
}
//采用GRPC协议拉取信息
ServiceListResponse response = requestToServer(request, ServiceListResponse.class);
ListView<String> result = new ListView<String>();
result.setCount(response.getCount());
result.setData(response.getServiceNames());
return result;
}
服务端处理GRPC请求
接收获取服务的请求
ServiceListRequestHandler
@Override
@Secured(action = ActionTypes.READ)
public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException {
//根据命名空间获取这个命名空间下的所有服务信息 erviceManager.getInstance().getSingletons这个方法服务注册的时候里有
Collection<Service> serviceSet = ServiceManager.getInstance().getSingletons(request.getNamespace());
//构建返回信息
ServiceListResponse result = ServiceListResponse.buildSuccessResponse(0, new LinkedList<>());
//服务信息不等于空填充返回信息
if (!serviceSet.isEmpty()) {
Collection<String> serviceNameSet = selectServiceWithGroupName(serviceSet, request.getGroupName());
// TODO select service by selector
List<String> serviceNameList = ServiceUtil
.pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet);
result.setCount(serviceNameSet.size());
result.setServiceNames(serviceNameList);
}
return result;
}
订阅服务请求
SubscribeServiceRequestHandler
@Override
@Secured(action = ActionTypes.READ)
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
//命名空间
String namespaceId = request.getNamespace();
//服务名称
String serviceName = request.getServiceName();
//分组名称
String groupName = request.getGroupName();
String app = request.getHeader("app", "unknown");
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
//构建服务信息
Service service = Service.newService(namespaceId, groupName, serviceName, true);
//组装订阅请求
Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),
namespaceId, groupedServiceName, 0, request.getClusters());
//获取健康的实例
ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service), //服务元数据信息
metadataManager.getServiceMetadata(service).orElse(null), subscriber);
//是否订阅
if (request.isSubscribe()) {
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
} else {
clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
}
//构建返回数据
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
发送订阅事件 后续事件监听可参考服务事件处理的那篇文章
@Override
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
client.addServiceSubscriber(singleton, subscriber);
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}
发送取消订阅事件 后续事件监听可参考服务事件处理的那篇文章
@Override
public void unsubscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
client.removeServiceSubscriber(singleton);
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientUnsubscribeServiceEvent(singleton, clientId));
}
服务查询请求
ServiceQueryRequestHandler
@Override
@Secured(action = ActionTypes.READ)
public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {
//获取命名空间
String namespaceId = request.getNamespace();
//分组明
String groupName = request.getGroupName();
//服务名
String serviceName = request.getServiceName();
//创建服务信息
Service service = Service.newService(namespaceId, groupName, serviceName);
//集群
String cluster = null == request.getCluster() ? "" : request.getCluster();
boolean healthyOnly = request.isHealthyOnly();
//获取服务信息
ServiceInfo result = serviceStorage.getData(service);
//获取服务元数据信息
ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
// 获取有保护机制的健康实例
result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true,
meta.getClientIp());
//构建返回信息
return QueryServiceResponse.buildSuccessResponse(result);
}
public ServiceInfo getData(Service service) {
return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
}
public Optional<ServiceMetadata> getServiceMetadata(Service service) {
return Optional.ofNullable(serviceMetadataMap.get(service));
}