文章目录
- 1.前言
- 2. 服务注册的基本流程
- 3. 服务注册的核心代码分析
- 3.1. `NacosNamingService`
- NamingProxy 服务端通信的核心类
- NamingClientProxy nacos 2.x 版本服务端通信核心接口
- 3.2 NamingGrpcClientProxy 详解
- RpcClient类
- RpcClient类核心方法 `start`
- 3.3 NamingHttpClientProxy详解
- 方法 callServer
- 4. 心跳机制
- nacos v1.x 版本
- Nacos 2.x 版本
1.前言
本章我们来聊聊nacos 服务注册流程和原理解析。在微服务架构中,服务之间需要相互通信,因此服务如何发现彼此变得非常重要。这就需要使用到服务注册中心,在 Spring Cloud Alibaba 中,Nacos 就充当了这个角色。
Nacos 提供了服务发现、服务配置和服务元数据等服务治理功能。作为服务注册中心,Nacos 可以实现服务的自动注册、发现和健康检查等功能。
本章, 将深入剖析 Nacos 的服务注册流程和实现原理 将从 Nacos 服务启动注册开始,逐步解析其后续的心跳维持、服务发现等过程,以及在出现故障时 Nacos 如何进行处理。
2. 服务注册的基本流程
Nacos服务注册流程大致如下:
服务提供者启动: 当服务提供者启动后,会向Nacos Server发送一个注册请求,请求中包含了服务的基本信息,例如服务名称,服务地址,服务端口等。
Nacos Server接收请求: Nacos Server接收到注册请求后,会对服务信息进行处理。处理包括存储服务信息,检查服务是否已存在等。
服务提供者心跳检测: 服务提供者在注册成功后,会定期向Nacos Server发送心跳,以证明自己还在运行。如果Nacos Server在一定时间内没有收到服务提供者的心跳,会认为该服务已经不可用,并将其从注册列表中移除。
服务消费者查询: 服务消费者需要调用某个服务时,会向Nacos Server查询该服务的信息。Nacos Server会返回存储的服务提供者的信息给服务消费者。
服务消费者调用: 服务消费者根据Nacos Server返回的服务信息,选择一个合适的服务提供者进行调用。
Nacos服务注册流程包括服务提供者的注册,心跳检测,以及服务消费者的查询和调用。这个过程通过Nacos Server的管理,实现了服务的高可用和负载均衡。
3. 服务注册的核心代码分析
3.1. NacosNamingService
该类实现了NamingService
接口,提供了服务注册、注销、查询等方法。
NacosNamingService是Nacos中负责服务注册与发现的核心组件, NacosNamingService的一些主要功能和相关的源码解析:
- 服务注册:
registerInstance(String serviceName, String groupName, Instance instance)
方法允许服务提供者将自己的实例注册到指定的服务名下。
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
Instance instanceForServer = new Instance();
instanceForServer.setIp(instance.getIp());
instanceForServer.setPort(instance.getPort());
instanceForServer.setWeight(instance.getWeight());
instanceForServer.setMetadata(instance.getMetadata());
instanceForServer.setClusterName(instance.getClusterName());
instanceForServer.setServiceName(serviceName);
instanceForServer.setEnabled(instance.isEnabled());
instanceForServer.setHealthy(instance.isHealthy());
instanceForServer.setEphemeral(instance.isEphemeral());
serverProxy.registerService(serverName, groupName, instanceForServer);
}
- 服务发现:
selectInstances(String serviceName, String groupName, boolean healthy)
方法允许服务消费者发现指定服务名下的所有实例。
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
return serverProxy.queryInstancesOfService(serviceName, groupName, 0, healthy);
}
- 服务下线:
deregisterInstance(String serviceName, String groupName, Instance instance)
方法允许服务提供者将自己的实例从指定的服务名下注销。
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
serverProxy.deregisterService(serviceName, instance);
}
- 获取服务列表:
getServicesOfServer(int pageNo, int pageSize, String groupName)
方法允许获取指定分页和组名下的服务列表。
public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName) throws NacosException {
return serverProxy.getServiceList(pageNo, pageSize, groupName);
}
NamingProxy 服务端通信的核心类
所有这些方法的实现都依赖于serverProxy
对象,这个对象是NamingProxy
类的实例,它包含了与Nacos服务器进行RPC通信的逻辑。在NacosNamingService进行服务注册或发现的时候,实际上是通过NamingProxy将请求发送到Nacos服务器,然后接收并处理服务器的响应。
我之前在1.x版本中看源码的时候 与服务端通信的代理工具类命名为 NamingProxy serverProxy;
但是在新版本中已经变为NamingClientProxy clientProxy;
这个变化可能在大家阅读源码的时候会注意到,从表面看是命名好像变了,大部分方法的实现都是一样,其实底层是发生了很多变化。
在早期的版本中,Nacos使用NamingProxy ServerProxy这个对象来处理与Nacos服务器的交互。然而,这种方式在处理复杂交互逻辑时,例如服务实例的注册、注销和查询等操作,可能会变得复杂并且容易出错
。
Nacos的开发者引入了NamingClientProxy接口,来封装与Nacos服务器交互的逻辑。通过实现这个接口,可以创建多个代理,每个代理处理一种特定的交互方式,例如HTTP或者gRPC。这样做的好处是可以将交互逻辑的实现细节封装在代理之中,使得对外提供的API变得更加清晰和简单。
我们通过源码可以看到
NamingProxy
和NamingClientProxy
都是Nacos在不同时期设计的与服务端通信的对象,用于处理与Nacos服务器的交互。
-
设计上的差异:
NamingProxy
是一个具体的类,用于处理与Nacos服务器之间的HTTP交互。而NamingClientProxy
是一个接口,定义了所有与Nacos服务器交互的对象应该具备的方法。通过实现这个接口,可以创建多个代理来处理不同类型的交互,例如HTTP和gRPC。 -
使用上的差异:在早期版本的Nacos中,
NamingProxy
负责所有的与Nacos服务器的交互。在后续版本中,这一角色被NamingClientProxy
接口和它的实现类取代。这意味着,新的交互方式可以通过添加新的NamingClientProxy
实现来支持,而不需要修改NamingProxy
的代码,可以以多态的形式进行扩展,抛弃了最早单一的类实现。
这个变化使得Nacos的代码更加具有可维护性和扩展性。
此外,使用NamingClientProxy还有一个好处是,可以更容易的添加新的交互方式。只需要实现新的NamingClientProxy,就可以处理新的交互方式,而不需要修改现有的代码。
NamingClientProxy nacos 2.x 版本服务端通信核心接口
服务注册和发现默认使用gRPC协议进行通信, 使用NamingGrpcClientProxy
。
NamingClientProxy
有两个实现类
NamingGrpcClientProxy
和NamingHttpClientProxy
都是NamingClientProxy
接口的实现类,用于定义具体的和Nacos Server进行交互的方法。不同的是,他们使用的通信协议不同。
-
NamingGrpcClientProxy
:这个类使用gRPC协议与Nacos服务器进行交互。gRPC是一个高性能、开源和通用的RPC框架,设计初衷是实现微服务之间的通信。gRPC协议在大规模服务间的通信、低延迟的场景下可能会有更好的性能。在Nacos 2.2.0版本开始,服务注册和发现默认使用gRPC协议进行通信。 -
NamingHttpClientProxy
:这个类使用HTTP协议与Nacos服务器进行交互。HTTP协议是Internet上应用最为广泛的一种网络协议,更加通用。在某些情况下,例如网络环境限制只能使用HTTP协议时,可以选择NamingHttpClientProxy
。
这两个实现类均定义了服务注册、服务注销、服务发现、服务列表查询等方法,只是底层通信协议的差异。在Nacos的客户端创建时,会根据配置选择使用哪个实现类,然后通过这个代理类与Nacos Server进行交互。
根据一位同学的测试,发现 使用NamingHttpClientProxy 频繁注册和注销和心跳 会导致服务端的内存短时间暴增,大家注意避雷,能升版本就升版本。详细issue 可以参考高频心跳导致nacos server内存持续高位且心跳停止内存未回收 #11424
场景
使用nacos 2.2.3版本
注册100个服务 使用api 发送心跳 nacos server内存持续增长,且心跳停止以后,内存未回收
3.2 NamingGrpcClientProxy 详解
在新版本中,Nacos 默认使用 gRPC 作为通信协议替换了 HTTP。下面是几个主要原因:
- 性能:相比
HTTP/1.1
,gRPC 基于 HTTP/2
,支持多路复用
、请求优先级
、双向流
等特性,能够支持更高的并发连接,而且延迟更低。- 语言无关:
gRPC
支持多种语言,包括Java、C++、Python、Go、Node.js、Ruby
等,方便在不同语言的环境中使用。- 强类型:
gRPC
使用Protobuf
作为序列化工具,定义服务接口和消息类型,生成强类型的代码,避免手动处理JSON
。- 易于扩展:支持拦截器和自定义元数据,方便进行认证、负载均衡、日志记录等操作。
- 流控制:
gRPC
内置了流控制机制,可以有效地控制消息的发送速率,防止因流量过大而导致的服务故障。
因此,新版本的Nacos采用gRPC作为通信协议,可以提供更高的性能,更好的兼容性和扩展性,同时也可以带来更好的开发体验。
源码解析
requestToServer
方法是NamingGrpcClientProxy
类中的核心方法,负责将请求发送到Nacos服务器,并得到返回的响应。
requestToServer将请求的安全性相关的头信息添加到请求中,然后根据是否设置了请求超时时间,决定使用哪种方式发送请求。接着,检查服务器返回的响应是否成功,如果成功,则检查响应的类型是否和期望的响应类匹配,如果匹配,则返回响应,如果不匹配,则记录错误日志。在请求过程中如果发生异常。
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
throws NacosException {
try {
// 添加安全相关的头信息到请求中
request.putAllHeader(
getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
// 根据是否设置了请求超时时间,来决定使用哪种方式来发送请求
Response response =
requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
// 检查服务器返回的响应是否成功
if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
throw new NacosException(response.getErrorCode(), response.getMessage());
}
// 检查服务器返回的响应类型是否和期望的响应类匹配
if (responseClass.isAssignableFrom(response.getClass())) {
return (T) response;
}
// 如果不匹配,记录错误日志
NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
response.getClass().getName(), responseClass.getName());
} catch (NacosException e) {
throw e;
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
}
// 如果服务器返回的响应无效,抛出NacosException
throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}
getSecurityHeaders 主要是获取请求头信息,如果我们设置了账号密码鉴权,发送的所有请求都需要携带token。如果我们跟源码会发现。核心的是SecurityProxy
,主要用于处理Nacos客户端与服务端之间的安全认证相关的逻辑。
在Nacos中,为了保障系统的安全,通常会进行一些安全认证的操作,比如登录认证、权限认证等。这些安全认证的操作通常涉及到一些复杂的逻辑,比如密码的加密、Token的生成和验证等。
SecurityProxy
类就是用来封装这些安全认证相关的逻辑的。比如在Nacos客户端启动时,SecurityProxy
会负责进行登录操作,向Nacos服务端发送用户名和密码,获取登录后的Token;在发送请求到Nacos服务端时,SecurityProxy
会将Token添加到请求的Header中,用于服务端验证请求的合法性。
此外,SecurityProxy
还封装了一些其他的功能,比如定时刷新Token,获取当前的安全信息等。通过SecurityProxy
,可以使Nacos的其他模块不需要关心具体的安全认证逻辑,只需要通过SecurityProxy
提供的接口进行操作即可,从而降低了系统的复杂性。
public class SecurityProxy implements Closeable {
private ClientAuthPluginManager clientAuthPluginManager;
public SecurityProxy(List<String> serverList, NacosRestTemplate nacosRestTemplate) {
clientAuthPluginManager = new ClientAuthPluginManager();
clientAuthPluginManager.init(serverList, nacosRestTemplate);
}
public void login(Properties properties) {
if (clientAuthPluginManager.getAuthServiceSpiImplSet().isEmpty()) {
return;
}
for (ClientAuthService clientAuthService : clientAuthPluginManager.getAuthServiceSpiImplSet()) {
clientAuthService.login(properties);
}
}
public Map<String, String> getIdentityContext(RequestResource resource) {
Map<String, String> header = new HashMap<>(1);
for (ClientAuthService clientAuthService : clientAuthPluginManager.getAuthServiceSpiImplSet()) {
LoginIdentityContext loginIdentityContext = clientAuthService.getLoginIdentityContext(resource);
for (String key : loginIdentityContext.getAllKey()) {
header.put(key, loginIdentityContext.getParameter(key));
}
}
return header;
}
@Override
public void shutdown() throws NacosException {
clientAuthPluginManager.shutdown();
}
}
我们还看到一个核心类RpcClient
RpcClient类
RpcClient是远程客户端的抽象类,用于连接到服务器并发送请求。它具有启动、关闭、发送请求等功能,并提供了用于处理连接事件和重连的线程池。它还可以注册连接事件监听器和服务器请求处理器。
start()
: 启动客户端,并连接到服务器。 会创建一个ScheduledExecutorService,用于处理连接事件和重连事件。然后,尝试连接服务器,如果连接成功,则设置当前连接为新连接,并将客户端状态设置为RUNNING;如果连接失败,则调用switchServerAsync()
进行切换服务器。
shutdown()
: 关闭客户端。该方法将客户端状态设置为SHUTDOWN,并关闭客户端的事件执行器和当前连接。
isWaitInitiated()
: 检查客户端是否等待初始化。
isRunning()
: 检查客户端是否正在运行。
isShutdown()
: 检查客户端是否已关闭。
request()
: 发送同步请求到服务器并返回响应。
asyncRequest()
: 发送异步请求到服务器。
requestFuture()
: 发送异步请求到服务器,并返回一个RequestFuture对象,可以用于获取响应。
connectToServer()
: 连接到指定的服务器。
handleServerRequest()
: 处理从服务器接收到的请求。
registerConnectionListener()
: 注册连接事件监听器。
registerServerRequestHandler()
: 注册服务器请求处理器。
nextRpcServer()
: 获取下一个RPC服务器。
currentRpcServer()
: 获取当前RPC服务器。
resolveServerInfo()
: 解析服务器地址。
getConnectionType()
: 获取客户端的连接类型。
rpcPortOffset()
: 获取RPC端口的偏移量。
getLabels()
: 获取客户端的标签。
RpcClient类核心方法 start
详细代码略了,有兴趣可以直接看源码。此处只是简要说明一下流程和设计
rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
这段是使用CAS(Compare And Swap)操作来改变客户端的状态,从初始化状态到启动状态,这是一种保证线程安全的方式。- 创建线程池clientEventExecutor,用于处理客户端的任务。
- 提交一个任务到线程池,这个任务是一个无限循环,主要任务是监听事件队列,当接收到连接事件时,会根据连接的状态进行通知。
- 提交另一个任务到线程池,这个任务也是一个无限循环,主要任务是处理重连的上下文,如果没有重连上下文,那么就进行健康检查,如果检查失败,那么就设置客户端的状态为
UNHEALTHY
,并创建一个新的重连上下文。- 尝试与服务器建立连接,如果连接失败,那么就进行重试。如果所有的重试都失败,那么就异步切换服务器。如果连接成功,那么就设置当前的连接,并改变客户端状态为
RUNNING
,并通知事件队列已经连接。- 注册处理连接重置请求的处理器。
- 注册处理客户端检测请求的处理器。
start这个方法设计思想主要体现在两个方面
:
一是采用了状态机模式来管理RPC客户端的状态,通过CAS操作保证状态的线程安全性
;二是通过线程池和阻塞队列来处理事件和进行重连操作,以实现事件驱动,使代码逻辑更清晰,同时提高代码的可维护性和可扩展性
。
public final void start() throws NacosException {
// 使用CAS操作确保start()只执行一次
boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
if (!success) {
return;
}
// 创建一个定时线程池,用于处理客户端事件和重连
clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.remote.worker");
t.setDaemon(true);
return t;
});
// 提交一个任务到线程池,这个任务一直监听事件队列,当发现连接事件时,会根据连接的状态进行相应的通知
clientEventExecutor.submit(() -> {
while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
ConnectionEvent take;
try {
take = eventLinkedBlockingQueue.take();
if (take.isConnected()) {
notifyConnected();
} else if (take.isDisConnected()) {
notifyDisConnected();
}
} catch (Throwable e) {
}
}
});
// 提交另一个任务到线程池,这个任务主要进行客户端的重连
clientEventExecutor.submit(() -> {
while (true) {
try {
if (isShutdown()) {
break;
}
ReconnectContext reconnectContext = reconnectionSignal
.poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
// check alive time.
// 如果获取不到重连上下文,那么就进行健康检查,如果检查失败,那么就设置客户端的状态为UNHEALTHY,并创建一个新的重连上下文
// 如果健康检查通过,那么更新最后活跃时间戳
}
if (reconnectContext.serverInfo != null) {
// clear recommend server if server is not in server list.
// 如果获取到重连上下文,那么就进行重连。在重连前会检查推荐的服务器是否在服务器列表中,如果不在就忽略
}
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
} catch (Throwable throwable) {
// 忽略异常
}
}
});
// 尝试连接到服务器,如果连接失败会尝试重试,所有重试失败后异步切换服务器,连接成功后改变客户端状态为RUNNING
// 偿试连接到服务器, 并设置启动状态
int startUpRetryTimes = rpcClientConfig.retryTimes();
while (startUpRetryTimes > 0 && connectToServer == null) {
// 详细代码略了,有兴趣可以直接看源码。此处只是简要说明一下流程和设计读取下一个服务信息并尝试连接,如果失败就打印日志并重试
}
// 如果连接成功,设置当前连接为成功的连接,并改变状态为RUNNING,并通知事件队列已经连接
// 否则,异步切换服务器
if (connectToServer != null) {
this.currentConnection = connectToServer;
rpcClientStatus.set(RpcClientStatus.RUNNING);
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
} else {
switchServerAsync();
}
// 注册处理连接重置请求的处理器和客户端检测请求的处理器
registerServerRequestHandler(new ConnectResetRequestHandler());
registerServerRequestHandler(request -> {
if (request instanceof ClientDetectionRequest) {
return new ClientDetectionResponse();
}
return null;
});
}
3.3 NamingHttpClientProxy详解
NamingHttpClientProxy
是Nacos服务发现功能中的一个重要组件,主要负责和Nacos服务器的HTTP交互,包括注册服务、注销服务、查询服务和更新服务等操作。 但是由于其有很多弊端在 新版本中已经默认弃用。默认使用grpc。
NamingHttpClientProxy
使用 HTTP 协议进行通信,虽然 HTTP 是一种简单、通用的协议,但在一些场景下, 存在以下弊端:
- 性能问题:HTTP 基于文本,数据传输效率较低;而且每次请求都需要建立连接,对服务器的资源消耗大,当并发量大时,性能可能会下降。
- 长连接问题:HTTP 本身不支持长连接,需要通过一些手段(如 Keep-Alive、Websocket 等)来实现,但这样会增加复杂性。
- 缺乏实时性:HTTP 是一种请求-响应模型,客户端主动发送请求,服务器才会响应,不适合需要实时更新的场景。 对于需要高并发、低延迟、实时更新、强类型和元数据支持的场景,
NamingHttpClientProxy
已不是最佳选择。所以在新版本中,Nacos 使用 gRPC 替代了 HTTP,以解决这些问题。
下面是NamingHttpClientProxy
类的部分源码:
可以看到,NamingHttpClientProxy
主要使用了HTTP协议进行通信,通过发送不同的HTTP请求(例如GET、POST和DELETE)实现不同的操作。同时,它还负责将HTTP响应的JSON字符串解析为Java对象,方便后续的处理。
需要注意的是,这个类并不负责维护和Nacos服务器的连接,而只是发送HTTP请求。连接的维护由其他部分的代码负责。
public class NamingHttpClientProxy {
...
public void registerService(String serviceName, Instance instance) throws NacosException {
Map<String, String> params = ...; // 构建请求参数
// 向Nacos服务器发送注册服务的请求
reqAPI(UtilAndComs.namingServletPath + "/instance", headers, params, "POST");
}
...
public void deregisterService(String serviceName, Instance instance) throws NacosException {
Map<String, String> params = ...; // 构建请求参数
// 向Nacos服务器发送注销服务的请求
reqAPI(UtilAndComs.namingServletPath + "/instance", headers, params, "DELETE");
}
...
public ServiceInfo queryInstancesOfService(String serviceName, ...) throws NacosException {
Map<String, String> params = ...; // 构建请求参数
// 向Nacos服务器发送查询服务的请求
String result = reqAPI(UtilAndComs.namingServletPath + "/instance/list", headers, params);
// 解析返回的JSON字符串为ServiceInfo对象
return JacksonUtils.toObj(result, ServiceInfo.class);
}
...
}
方法 callServer
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException {
long start = System.currentTimeMillis(); // 记录请求开始的时间
long end = 0;
String namespace = params.get(CommonParams.NAMESPACE_ID); // 从参数中获取命名空间
String group = params.get(CommonParams.GROUP_NAME); // 从参数中获取分组名
String serviceName = params.get(CommonParams.SERVICE_NAME); // 从参数中获取服务名
params.putAll(getSecurityHeaders(namespace, group, serviceName)); // 添加安全头
Header header = NamingHttpUtil.builderHeader();
// 构建请求 URL
String url;
if (curServer.startsWith(HTTPS_PREFIX) || curServer.startsWith(HTTP_PREFIX)) {
url = curServer + api;
} else {
if (!InternetAddressUtil.containsPort(curServer)) {
curServer = curServer + InternetAddressUtil.IP_PORT_SPLITER + serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
}
try {
// 发送 HTTP 请求,并获取响应结果
HttpRestResult<String> restResult = nacosRestTemplate
.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();
// 记录请求的指标
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
.observe(end - start);
// 根据响应结果处理并返回结果
if (restResult.ok()) {
return restResult.getData();
}
if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return StringUtils.EMPTY;
}
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (NacosException e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw e;
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
4. 心跳机制
服务提供者通过定期发送心跳请求来告知注册中心自身的健康状态。注册中心根据心跳信息判断服务提供者的可用性,并及时更新注册表。
nacos v1.x 版本
使用http 请求发送心跳导致很多问题。具体源码在BeatReactor
在 Nacos 1.x 中,Nacos 客户端周期性地向 Nacos 服务器发送心跳,以保持服务实例的存活状态。这个心跳机制主要通过 com.alibaba.nacos.client.naming.beat.BeatReactor
类实现。
当 Nacos 客户端注册服务实例时,BeatReactor
会为该服务实例创建一个心跳任务,并将这个任务加入到定时任务队列中。这个任务会周期性地向 Nacos 服务器发送心跳。
具体的代码如下:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
SERVICE_BEAT_MAP.putIfAbsent(serviceName, new ConcurrentHashMap<>());
SERVICE_BEAT_MAP.get(serviceName).put(beatInfo.getIp() + "#" + beatInfo.getPort(), beatInfo);
beatInfo.setPeriod(beatInfo.getPeriod() > 0 ? beatInfo.getPeriod() : DEFAULT_HEART_BEAT_INTERVAL);
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
}
BeatTask
是一个 Runnable
,它的 run
方法会发送心跳:
@Override
public void run() {
long nextTime = beatInfo.getPeriod();
try {
int code = namingProxy.sendBeat(beatInfo, false);
long interval = switchDomain.getBeatInterval();
if (code == HttpURLConnection.HTTP_OK) {
...
} else if (code == HttpURLConnection.HTTP_NOT_MODIFIED) {
...
} else if (code == HttpURLConnection.HTTP_FORBIDDEN) {
...
}
nextTime = interval;
beatInfo.setPeriod(nextTime);
} catch (Exception e) {
LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}", beatInfo, e);
} finally {
executorService.schedule(this, nextTime, TimeUnit.MILLISECONDS);
}
}
可以看到,在 BeatTask
的 run
方法中,首先会调用 namingProxy.sendBeat
方法发送心跳,然后根据发送结果决定下一次心跳的间隔时间,最后调度下一次的心跳任务。
/**
* @author harold
*/
public class BeatReactor {
private ScheduledExecutorService executorService;
private NamingProxy serverProxy;
public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();
public BeatReactor(NamingProxy serverProxy) {
this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
}
public BeatReactor(NamingProxy serverProxy, int threadCount) {
this.serverProxy = serverProxy;
executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
}
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
public void removeBeatInfo(String serviceName, String ip, int port) {
NAMING_LOGGER.info("[BEAT] removing beat: {}:{}:{} from beat map.", serviceName, ip, port);
BeatInfo beatInfo = dom2Beat.remove(buildKey(serviceName, ip, port));
if (beatInfo == null) {
return;
}
beatInfo.setStopped(true);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
private String buildKey(String serviceName, String ip, int port) {
return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER
+ ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port;
}
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
long result = serverProxy.sendBeat(beatInfo);
long nextTime = result > 0 ? result : beatInfo.getPeriod();
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}
Nacos 2.x 版本
在 Nacos 2.x 版本中,心跳机制的实现发生了一些改变。在最新的 2.x 版本中,Nacos 客户端会与服务器建立 gRPC 长连接,通过这个连接,服务器可以实时感知到客户端的运行状态,无需客户端周期性地发送心跳。
这种新的设计方式减少了网络请求的数量,提高了系统的性能和稳定性。所以在 Nacos 2.x 的源码中可能找不到心跳相关的代码,因为不再需要客户端主动发送心跳。
并不意味着心跳机制在 Nacos 2.x 中完全消失。实际上,Nacos 服务器仍然会定期检测与每个客户端的 gRPC 长连接的状态,以确定客户端的存活状态。这种被动的“心跳”检测方式,与传统的心跳机制在效果上是一致的。
RpcClient 中的健康检查,是向服务端请求一个HealthCheckRequest对象空数据,也可以把此作为心跳类型标识。
实际的消息体
实际的请求体为
metadata {
type: "HealthCheckRequest"
clientIp: "172.24.36.36"
}
body {
value: "{\"headers\":{},\"module\":\"internal\"}"
}
·healthCheck 方法·
private boolean healthCheck() {
HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
if (this.currentConnection == null) {
return false;
}
int reTryTimes = rpcClientConfig.healthCheckRetryTimes();
while (reTryTimes >= 0) {
reTryTimes--;
try {
Response response = this.currentConnection
.request(healthCheckRequest, rpcClientConfig.healthCheckTimeOut());
// not only check server is ok, also check connection is register.
return response != null && response.isSuccess();
} catch (NacosException e) {
// ignore
}
}
return false;
}
此处就设计的比较巧妙了,但是这样心跳的间隔目前被硬编码到代码里了。心跳间隔为 rpcClientConfig.connectionKeepAlive()
.硬编码在DefaultGrpcClientConfig
里面 默认为5000毫秒。