文章目录
- 服务注册
- NacosClient找看源码入口
- NacosClient服务注册源码
- NacosServer处理服务注册
服务注册
服务注册 在线流程图
NacosClient找看源码入口
我们启动一个微服务,引入nacos客户端的依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
并配置文件中指定nacosServer的地址,并运行该微服务,那么服务是如何注册进nacosServer中去的嘞?
server:
port: 9002
spring:
application:
name: stock-service
cloud:
nacos:
discovery:
# 指定nacos server的地址
server-addr: localhost:8848
# 指定集群名称
cluster-name: SZ
metadata:
version: v3
我们应该怎么找源码的入口嘞?
我们知道SpringCloud是基于SpringBoot的,我们这里引入了这个nacos客户端nacos-discovery的依赖,我们就直接去找这个jar包下的spring.factories文件
这个文件中记录了很多的自动配置类,如果不知道应该看哪一个的话,那么就优先看和我们导入maven依赖名字相近的类
进入到这个NacosDiscoveryAutoConfiguration
自动配置类后会发现这里会往容器中注入三个bean,看bean的名字好像都是和nacos注册相关的,按照经验来说,带有AutoXXXX
这类方法一般都比较重要,再加上第三个方法还用到了上面两个bean,所以核心方法很大概率就是这个第三个方法。
进入到该类的构造方法之后,我们一般可以查看这个类的继承与实现结构,这样可以让我们更充分的了解该类
从上图中可以发现,NacosAutoServiceRegistration
这个类还实现了ApplicationListener
接口,这个接口我们都知道是Spring发布事件相关的接口
// ApplicationListener接口
package org.springframework.context;
import java.util.EventListener;
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
void onApplicationEvent(E var1);
}
我们再找这个方法的具体实现,会发现NacosAutoServiceRegistration
这个类它没有重写该方法,那么就要在它的父类中找该方法的实现
public void onApplicationEvent(WebServerInitializedEvent event) {
this.bind(event);
}
// 方法调用,进入到bind(...)方法
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
// 有if、并且有返回、并且后面还有业务代码的一般情况下这都是一个分支代码,可以先跳过
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
// 一般情况下,start() init() begin()这种方法名命名的方法一般都是比较重要的代码
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
根据看源码的经验来说:
- 有if,并且有返回值的,并且后面还有业务代码的一般情况下这都是一个分支代码,可以先跳过
- 一般情况下,start() init() begin()这中方法名命名的方法一般都是比较重要的代码
接下来再进入到start()
方法中
public void start() {
// if + return + 后面有业务逻辑,大概率是一个分支代码 可以先跳过。这里实际上也只是记录了一个日志
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
// 核心方法,可以发现在调用这个方法之前和调用方法之后都发布了一个事件
// 正好我们要找的也就是注册相关的方法
// 因为我们就是从NacosDiscoveryAutoConfiguration自动配置类的NacosAutoServiceRegistration这个和注册相关的bean进来的
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
接下来在点进register()
方法,跳转几次之后就会进入到NamingService
这个关键接口的registerInstance(...)
方法中去
NacosClient服务注册源码
关键接口NamingService
,它的registerInstance(...)
方法就是服务注册。程序刚开始会进入到该接口的实现类NacosNamingService
的registerInstance(String serviceName, String groupName, Instance instance)
方法中
// 该方法 往NacosServer中注册一个微服务实例
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// 调用该方法进行微服务注册,instance对象中保存着微服务的各种信息,比如ip、端口、访问权重、健康状态、是否上线等等
serverProxy.registerService(groupedServiceName, groupName, instance);
}
instance对象保存的内容如下图所示
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
// 把instance对象中的数据取出来,封装成一个HashMap
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
// 发送一个post的http请求发送给NacosServer,进行服务注册
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
再通过官方文档 / 接口详细文档就能发现这就是服务注册的接口
NacosServer处理服务注册
NacosClient向NacosServer发送了一个服务注册的post请求,url为/v1/ns/instance
NacosServer这个项目有很多很多的子工程,就拿下图所示,我应该如何去找嘞?
其实我们就可以双击shift键,直接搜索controller,一个一个文件夹找,或者是直接加上关键字,就比如我这里加上了instance这个关键字就找到了真正处理这个请求的controller,类名为InstanceController
还有一种方法就是,我们在NacosClient这边进行服务注册时会频繁的使用到NamingService
,那我就在NacosServer端找有没有和Naming相关的子工程,最终发现还真就找到了
接下来是源码,首先直接查看该controller的post接口
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
// 通过解析request请求参数,封装成instance对象
final Instance instance = parseInstance(request);
// 调用Service层方法进行服务实例注册
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
接下来进入到service层的registerInstance(...)
方法逻辑
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 这里会去创建一个service,如果连该service对应的命名空间都不存在的话就会利用DCL双重锁检测机制去创建一个map<serviceName,service>
// 再去添加service,添加命名空间对应的map<serviceName,service>中。命名空间对应的这个map是存在注册表serviceMap中的
// 这里创建的只是一个空服务,它里面还没有instance服务实例
// 在创建service过程中还会进行初始化,会添加一个延迟定时任务进行该服务下的所有实例的健康检查、修改服务健康状态、删除过时服务
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 这里再获取,就不应该为null了,如果为null也就该抛异常了
// getService()会先根据命名空间取出一个Map,再从这个Map中根据serviceName取出service。service中包含instance微服务实例
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// 进行服务实例instance添加操作
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
在createEmptyService(...)
方法中,首先会去判断当前service是否存在,如果不存在则创建一个空的新的service,并且把这个service添加进namespaceId对应的Map中,如果在这个过程中,namespaceId命名空间不存在则使用DCL双重锁检测去创建一个map集合添加进注册表serviceMap中。接下来在对刚刚创建的service进行init()初始化操作,蛀牙就是开启一个延时的定时任务,定期对该服务中的实例进行健康检查,并修改不健康的状态或移除超时实例
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
// 先从服务注册表serviceMap中获取
Service service = getService(namespaceId, serviceName);
// 如果不存在就创建
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
// 这里采用了DCL双重锁检测机制进行的添加操作
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
private void putServiceAndInit(Service service) throws NacosException {
// 将service添加进对应的NamespaceId中
putService(service);
// 服务进行初始化,主要就是添加延时定时任务,对实例进行健康检查
service.init();
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
public void putService(Service service) {
// DCL双重检测机制
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
public void init() {
// NacosClient客户端的心跳检测任务
// 这里就会开启一个线程去执行任务,clientBeatCheckTask属性就是一个task,run()方法中会进行实例的健康检查,后面会详细介绍
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
接下来就进入到了addInstance(...)
添加实例的方法中了,这个方法中主要做的事就是将当前service的所有实例通过一个key存入一个Map中,然后保存在一个dataStore这个Map中,同时再把这个key和相关的操作类型存入一个阻塞队列中,至此NacosClient发送的请求就处理完成了。之后会有单独一个线程异步处理进行真正的服务注册。
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// 根据ephemeral的值来决定生成上面key,默认情况下NacosClient传递过来的都是true,一般微服务的实例都是临时实例,不是持久化实例
// 如果是持久化实例就没有下面的ephemeral这个字符串拼接
// key = 一些字符串常量 + “ephemeral” + namespaceId + “##” + serviceName
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
// 添加微服务的实例,并返回当前服务所有的实例
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// key主要就是命名空间+服务名,通过这个key添加进consistencyService中
consistencyService.put(key, instances);
}
}
// 上面的Instances实现了Record接口,这里其实也就是一个封装
public void put(String key, Record value) throws NacosException {
// mapConsistencyService(key)这里会根据是否是临时实例进而去调用不同实现类的put()方法,这里以临时实例举例
mapConsistencyService(key).put(key, value);
}
public void put(String key, Record value) throws NacosException {
// 将key和所有服务实例封装的Record对象封装成一个datum对象,并保存到一个map集合中。
// 同时还有一个key和DataOperation枚举操作类型添加进阻塞队列的操作。
// 后续肯定有一个线程从这个注释队列中取出数据,然后根据key把datum对象取出来
onPut(key, value);
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// 这里把key和datum存入了一个dataMap这个集合中
// 这个datum对象中保存着我当前服务所有的服务实例instances
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
// 将key和后面这个枚举参数封装成一个Pair对象,并添加进一个阻塞队列中
notifier.addTask(key, DataOperation.CHANGE);
}
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
// 把key和dataOperation操作枚举添加进一个阻塞队列中,真正进行服务注册的是在下面的run()方法中
tasks.offer(Pair.with(datumKey, action));
}
// 另一个线程异步进行处理,从阻塞队列中获取数据,真正进行服务注册操作
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
// 该线程会一直死循环,从阻塞队列中取数据
for (; ; ) {
try {
Pair<String, DataOperation> pair = tasks.take();
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
接下来就是详细查看这个handle(pair)
方法,查看另一个线程是如何进行服务注册的?
这里会从先判断操作类型,然后从handle()
方法调用到onChange(...)
方法;而在onChange(...)
方法中首先会对服务实例的权重进行处理,再调用到updateIPs()
方法;在updateIPs()
方法中会对service中的cluster做一些处理,最终就会调用到updateIps(...)
方法中,在这个方法中会通过CopyOnWrite思想真正更改保存instance实例的集合
// handle()方法调用到onChange(...)方法
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == DataOperation.CHANGE) {
// dataStore.get(datumKey)取出来的对象是Datum类型的对象,它其中保存在key+所有服务实例instances
// dataStore.get(datumKey).value 就是这个datumKey对应的所有服务实例instances
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
...
}
}
...
} catch (Throwable e) {
...
}
}
// 对服务实例的权重进行处理,再调用到updateIPs()方法,
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
// 对instance服务实例的特殊权重做处理
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
// 会对service中的cluster做一些处理,最终就会调用到updateIps(...)方法中
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
// 遍历所有集群
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
// 实例instance的默认集群名字
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
// 一个新的集群名处理
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
// 实例添加进对应的集群集合中
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
// 微服务实例真正的注册进服务注册表的方法
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
// 服务实例改变之后,会发布一个ServiceChangeEvent事件
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());
}
/**
* Update instance list.
* 下面的方法中有CopyOnWrite的实现思想,真正存储实例的集合是ephemeralInstances,
* 但是这里面基本上都是在围绕oldIpMap这个复制出来的副本集合进行相应的操作,最后拿最新的集合复制给ephemeralInstances
*
* @param ips instance list
* @param ephemeral whether these instances are ephemeral
*/
public void updateIps(List<Instance> ips, boolean ephemeral) {
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
// 先保存一份现有的实例列表
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
// 各个服务实例的ip和端口不一样,所以这里生成的key也不一样
oldIpMap.put(ip.getDatumKey(), ip);
}
// 接下来的很多操作就是拿老的实例集合和新的实例集合做一些数据比对,如果是新产生的实例那么就进行添加操作,如果是已经存在了的实例则是修改操作
// 最后将最终的结果赋值给ephemeralInstances或persistentInstances
List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
if (updatedIPs.size() > 0) {
for (Instance ip : updatedIPs) {
Instance oldIP = oldIpMap.get(ip.getDatumKey());
// do not update the ip validation status of updated ips
// because the checker has the most precise result
// Only when ip is not marked, don't we update the health status of IP:
if (!ip.isMarked()) {
ip.setHealthy(oldIP.isHealthy());
}
if (ip.isHealthy() != oldIP.isHealthy()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
(ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
}
if (ip.getWeight() != oldIP.getWeight()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
ip.toString());
}
}
}
List<Instance> newIPs = subtract(ips, oldIpMap.values());
if (newIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
getName(), newIPs.size(), newIPs.toString());
for (Instance ip : newIPs) {
HealthCheckStatus.reset(ip);
}
}
List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
if (deadIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
getName(), deadIPs.size(), deadIPs.toString());
for (Instance ip : deadIPs) {
HealthCheckStatus.remv(ip);
}
}
toUpdateInstances = new HashSet<>(ips);
// 最终,微服务的实例会保存在下面这个集合中
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}