一、引言
上个章节我们简单学习和使用了下Nacos服务自动注册,本文就来分析下Nacos客户端自动注册服务是怎么实现的~
二、目录
目录
一、引言
三、Nacos 源码编译
1.1 拉取代码
1.2 运行起来
四、客户端使用版本选择
五、Nacos客户端项目启动为什么会自动注册服务?
六、this.serviceRegistry 属性怎么被赋值的 ?
七、Nacos 客户端通过什么方式注册服务 ?
八、Nacos 客户端怎么发送服务心跳 ?
总结:
三、Nacos 源码编译
1.1 拉取代码
git地址:GitHub - alibaba/nacos at 1.4.1
a.这里我们选择1.4.1版本的Nacos进行下载,后面也是根据这个版本来进行学习的
b.直接git拉取或者下载zip的压缩包都可以
1.2 运行起来
a.拉下来的项目结构大概就是这个样子
b.我们试着运行起来~
Nacos的启动类在console模块当中,这里我们要给启动类配置成单机启动,方便我们学习测试·
/**
* Nacos starter.
*
* @author nacos
*/
@SpringBootApplication(scanBasePackages = "com.alibaba.nacos")
@ServletComponentScan
@EnableScheduling
public class Nacos {
public static void main(String[] args) {
SpringApplication.run(Nacos.class, args);
}
}
单机启动命令:
-Dnacos.standalone=true
配置启动类:
成功运行:
1.3 测试一下
a.注册地址的ip改为本地的,然后尝试启动起来,看看有没有成功注册到本地Nacos当中~
b.可以看到,order-service服务已经注册进去了
四、客户端使用版本选择
Nacos 服务端使用什么版本,其他组件也有对应使用版本要求。在实际项目使用过程中,遇到版本不一致这种问题太搞事情了,所以一开始在组件选型的时候,我们得先了解一下版本的选择。如下图表:
这里我们 Nacos 使用的版本是 1.4.1,那么对应Spring Cloud Alibaba的版本至少是2.2.4 Release,如下:
<properties>
<java.version>1.8</java.version>
<spring-cloud-alibaba.version>2.2.5.RELEASE</spring-cloud-alibaba.version>
</properties>
五、Nacos客户端项目启动为什么会自动注册服务?
首先我们查看源码前先确定我们的主线任务是:Nacos客户端项目启动为什么会自动注册服务
我们知道项目是引入了spring-cloud-starter-alibaba-nacos-discovery依赖,项目才有了注册服务的功能,那我们就从spring-cloud-starter-alibaba-nacos-discovery依赖的jar包分析,可以看到依赖中有个spring.factories文件,里面都是一些Configuration类。
说明:spring.factories 这个文件是在 Spring Boot 启动时,会扫描到这个文件,从而去创建里面的配置类。
我们看spring.factories文件里都是一些nacos 客户端相关的注册配置类,具体哪个是注册的类,这个时候我们只能去靠猜,然后去验证。NacosServiceRegistryAutoConfiguration 这个起的名字看着就很像,service服务,registry是注册的意思,auto自动,那连起来不就是Nacos 自动注册服务类。当然呀,这也只是我的猜测,接着跟我往下看就知道我猜的对不对了。
NacosServiceRegistryAutoConfiguration 类创建了三个bean对象。
- NacosServiceRegistry
- NacosRegistration
- NacosAutoServiceRegistration
我们先来看第一个bean对象,NacosServiceRegistry:
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
我们先来看一下这个 nacosServiceRegistry 方法的入参,NacosDiscoveryProperties,这一看不就是注册服务的yml配置类。
说明:ConfigurationProperties 映射配置文件中相对应的属性
我们再来看一下 NacosServiceRegistry 类的内容,可以看到有个register 方法,起的名字和内容看着是有点像 真正的注册方法 的。这个register 方法内容大概看了一下,获取一些参数,比如:namingService、serviceId、group、instance,最后调用了namingService.registerInstance()方法。看到这我们就不用往下细看了,别忘了我们这次的主线任务:Nacos客户端为什么启动项目能发起自动注册。
紧接着我们看第二个bean对象,NacosRegistration不正是第刚才register方法的入参嘛。看这个方法的名字感觉跟我们这次主线任务关系不大,直接跳过。
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(registrationCustomizers.getIfAvailable(),
nacosDiscoveryProperties, context);
}
然后我们看第三个bean对象,NacosAutoServiceRegistration,这个名字就很像Nacos自动注册服务了。我们看一下这个方法的入参包含前两个bean的对象,NacosServiceRegistry和NacosRegistration,这样的话一看这个方法就很重要了。
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
我们再来看看方法的内容,大概就是通过构造方法往父类传值,这个我们下一小节再说。接着再往下面看又有一个 register(注册)的方法。
public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
super(serviceRegistry, autoServiceRegistrationProperties);
this.registration = registration;
}
@Override
protected void register() {
if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
log.debug("Registration disabled.");
return;
}
if (this.registration.getPort() < 0) {
this.registration.setPort(getPort().get());
}
super.register();
}
看这个方法的内容不像是真正执行注册的方法,我们看下这个 register 方法哪里调用了,一共两处,第二处是在本方法内调用的父类方法,所以直接看第一处调用。
start这个方法名就感觉像是找到关键注册方法的点了,在看下这个方法哪里调用了。
start方法第一处调用,是一个restart重启方法调用的,很明显不是。
start方法第二处调用,是 bind 方法调用的,bind方法又被onApplicationEvent方法调用的。
看到这里,大概就知道了。很明显是利用 Spirng 监听的事件,在Spring 容器启动的最后 执行 finishRefresh 方法,然后会发布一个事件。这样一说,估计有很多小伙伴不明白,下面我写了个案例演示一下:
/**
* @Author WangYan
* @Date 2024/3/22 09:57
* @Version 1.0
* 在 Spring 容器启动的时候,就会发布事件,这里就可以监听到,从而执行我们的代码
*
*/
@Data
@Component
public class Test01 implements ApplicationListener<WebServerInitializedEvent> {
@Override
public void onApplicationEvent(WebServerInitializedEvent event) {
System.out.println("监听事件开始执行~");
}
}
我在上面自定义了一个监听器代码,在Spring 容器启动的时候发布事件,这里就会监听到,从而执行我们这里的代码。
那我们的主线任务:Nacos客户端项目启动为什么会自动注册服务 ?
看到这就很明了了,就是利用了 Spring 事件监听,在监听中执行了最终执行了 register 方法,进行服务注册的。
六、this.serviceRegistry 属性怎么被赋值的 ?
首先Spring 监听事件启动,先去调用 bind 方法 ---> start 方法 ---> register 方法。最终在 register 方法当中执行了第一个bean “NacosServiceRegistry”类中对应的注册方法。
那我们来分析一下 start 方法当中register 方法里面的内容,是直接调用了serviceRegistry的注册方法,那这个属性是在什么时候进行赋值的呢 ?
protected void register() {
this.serviceRegistry.register(getRegistration());
}
我们在创建第三个bean对象的时候看看需要哪些参数:
- 参数一:NacosServiceRegistry 对应创建第一个bean对象
- 参数二:AutoServiceRegistrationProperties
- 参数三:NacosRegistration 对应创建第二个bean对象
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
就是第一个bean对象传进来就是serviceRegistry属性,通过父类的构造方法进行传值,最后再给ServiceRegistry serviceRegistry; 进行赋值
public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
// 调用父类的构造方法进行传值
super(serviceRegistry, autoServiceRegistrationProperties);
this.registration = registration;
}
// 父类构造方法
protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry,
AutoServiceRegistrationProperties properties) {
// 给ServiceRegistry<R> serviceRegistry; 进行赋值
this.serviceRegistry = serviceRegistry;
this.properties = properties;
}
是不是看看得有点懵逼,画个图理一理就清晰了:
整个流程图小节:
- 通过spring.factories文件,创建相关配置类,NacosServiceRegistryAutoConfiguration类里面定义了三个bean对象
- 创建第三个bean对象时,需要第一个和第二个bean对象,作为参数传入。第一个bean对象当中就包含了真正的注册方法。并且赋值给了第三个bean对象父类中的this.serviceRegistry属性。
- 第三个bean对象的父类实现了监听方法。当Spring 容器启动的时候,发布事件,这个时候就会监听到,从而执行 注册服务的逻辑。
七、Nacos 客户端通过什么方式注册服务 ?
回顾前两张小节:Spring容器启动的时候,监听事件就会去执行 register 方法,那 register方法是怎么实现的呢 ?
本节主线任务:Nacos 客户端底层通过什么方式注册服务的 ? Http ?还是Sofa ?接下来我们一起探索下
那我们就要回顾下创建第一个bean对象,”NacosServiceRegistry“,里面就包含了 register 注册方法。给大家 Debug 看下,证明我们没有分析错误,很明显,最后注册的逻辑走到这个里面来了。
接下来我们看下这个方法的大概内容:
@Override
public void register(Registration registration) {
// serviceId入参校验
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
// 获取NamingService
NamingService namingService = namingService();
// 服务id
String serviceId = registration.getServiceId();
// 分组名称
String group = nacosDiscoveryProperties.getGroup();
// 这个Instance里面包含了ip和port,比较重要的点
Instance instance = getNacosInstanceFromRegistration(registration);
try {
// 上面的话都是分支代码,跟我们本次主线任务无关,可以先了解下就行,后面再细看
// serviceId, group, instance都传进了namingService.registerInstance()这方法,很明显
// namingService.registerInstance() 就是本次的主线任务主要方法
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
// rethrow a RuntimeException if the registration is failed.
// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
}
接下来调用了这个方法NamingService类中 registerInstance接口,并且只有NacosNamingService一个实现类。
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// instance参数校验
NamingUtils.checkInstanceIsLegal(instance);
// serviceName和groupName拼装成指定的字符串,比如:groupName@@serviceName
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 判断为 ephemeral 为true才执行,默认就为 true
if (instance.isEphemeral()) {
// 构建心跳信息,拼装参数
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
// 添加心跳信息
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// 上面都是分支代码,有个印象就行,跟本次主线任务无关
// 很明显 serverProxy.registerService() 这个方法里面有注册服务的逻辑
serverProxy.registerService(groupedServiceName, groupName, instance);
}
紧接着我们来到了第三个调用 serverProxy.registerService() ,分析下方法内容:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
// 拼装Map组成参数
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
// ip、port就是instance里传过来的
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
// 请求地址、参数、请求方式 ,这里很明显就是HTPP的请求方式了
// UtilAndComs.nacosUrlInstance 拼装结果: /nacos/v1/ns/instance
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
当然上面也只是我们的猜测,接下来我们验证下对不对:
官方 Open API 文档截图如下:
可以看到注册实例的请求类型和请求路径,跟我们上面的分析出来的方法一样。就是HTTP请求
看到这本节的主线任务完成了:
- 主线任务:Nacos 客户端底层通过什么方式注册服务的 ?
- 结果:HTTP的请求方式
八、Nacos 客户端怎么发送服务心跳 ?
主线任务:Nacos 客户端怎么发送服务心跳?
现在我们来看下服务心跳的代码块,总块两个部分,一个构建心跳信息,一个去添加发送心跳
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 判断为 ephemeral 为true才执行,默认就为 true
if (instance.isEphemeral()) {
// 构建心跳信息,拼装参数
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
// 主线任务:添加心跳信息
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
serverProxy.registerService(groupedServiceName, groupName, instance);
}
第一部分:我们先来看下构建心跳信息的代码,很简单主要是拼装BeatInfo对象参数。
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
// 拼装BeatInfo对象参数
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(groupedServiceName);
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
// 这个还是有点重要的,period 字段:发送健康检查的间隔秒数
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
return beatInfo;
}
这里来重点说明一下period这个属性的取值方法:getInstanceHeartBeatInterval()
public long getInstanceHeartBeatInterval() {
// 先去获取 PreservedMetadataKeys.HEART_BEAT_INTERVAL 常量的值,为空的话默认返回 5000
return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL,
Constants.DEFAULT_HEART_BEAT_INTERVAL);
}
private long getMetaDataByKeyWithDefault(final String key, final long defaultValue) {
if (getMetadata() == null || getMetadata().isEmpty()) {
return defaultValue;
}
final String value = getMetadata().get(key);
if (!StringUtils.isEmpty(value) && value.matches(NUMBER_PATTERN)) {
return Long.parseLong(value);
}
return defaultValue;
}
第二部分:
构建心跳信息的过完了,接着看下addBeatInfo() 发送健康心跳检查的代码内容。这里executorService是个线程池执行的任务,那么逻辑一定在run方法当中。
注意:executorService线程池上面代码设置成了守护线程,只要不退出当前jvm,用于发送心跳检查的线程会一直执行
public BeatReactor(NamingProxy serverProxy, int threadCount) {
this.serverProxy = serverProxy;
this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
// 设置成守护线程
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
}
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
// 上面都是分支代码,主线任务:executorService.schedule() 发送健康心跳检查
executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
接下来我们看下BeatReactor类的代码:
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
// 判断是否停止
if (beatInfo.isStopped()) {
return;
}
// 获取下一次执行的时间,同样还是5s
long nextTime = beatInfo.getPeriod();
try {
// 主线任务:发送健康心跳检查
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
// 获取服务端返回的code状态码
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
// 如果code = RESOURCE_NOT_FOUND 没有找到
// 很可能之前注册的信息,已经被 Nacos 服务端移除了,所以返回这个错误信息
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
// 重新拼装参数,发起服务注册
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
// 重新注册
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
}
// 循环执行发送健康心跳检查
executorService.schedule(new BeatReactor.BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
接下来我们看下具体发送实例心跳的代码,serverProxy.sendBeat() 方法。
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
// 拼装params参数
Map<String, String> params = new HashMap<String, String>(8);
Map<String, String> bodyMap = new HashMap<String, String>(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
params.put("ip", beatInfo.getIp());
params.put("port", String.valueOf(beatInfo.getPort()));
// 通过Http请求发送实例心跳
// 通过 /nacos/v1/ns/instance/beat 地址发送 put 请求
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}
在官方文档上看下跟我们分析的代码是否一致,请求地址的、请求类型还有参数看样子都是能对的上的
官方地址:Open API 指南
本节小节:
- 主线任务:Nacos 客户端怎么发送服务心跳 ?
- 结果:Nacos客户端发起服务注册的时候,会执行一个 心跳健康检查的延时任务 ,这个任务每5秒会去发送一次心跳,告诉Nacos服务端我还活着,当前服务还是可用状态。
看完源码,把Nacos客户端流程图补充下:
总结:
本文主要围绕Nacos客户端发起自动注册的流程、自动注册的方式、发送服务心跳三个方面来分析源码的。