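Contents
1. Introduction to Health Checks
2. Client-Side Health Checks
2.1 Health Checks for Ephemeral Instances
2.2 Health Checks for Persistent Instances
3. Server-Side Health Checks
3.1 Health Checks for Ephemeral Instances
3.2 Server-Side Health Checks for Persistent Instances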
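1. Introduction to Health Checks
Once a service instance is registered with Nacos, other services can look it up in Nacos and start calling it.
If the provider then goes down, callers that already fetched its address will fail to reach it. The service information in Nacos therefore needs an update mechanism that removes instances that have died.
How should this registration information be maintained? By determining whether an instance has a problem, and evicting it once a problem is detected.
And how do we determine whether an instance has a problem? That is exactly the job of the health check: it inspects each instance's health status and takes unhealthy instances offline.
Let's look at what the client and the server each do to implement health checking.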
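2. Client-Side Health Checks
2.1 Health Checks for Ephemeral Instances
Starting with Nacos 2.x, service registration for ephemeral instances switched from HTTP to a long-lived gRPC connection. The Nacos Client and the Nacos Server keep an RPC connection open between them, and operations such as registering and deregistering a service are sent to the server as gRPC messages over it.
The gRPC connection stays open. As long as it exists, the link between Nacos Client and Nacos Server is up and the client is considered online. If the client dies because of a network failure or any other problem, the connection is torn down as well.
So what makes a service instance healthy? Simply that its connection is still alive.
If the connection is broken, every instance registered through that client is considered unhealthy.
To keep a gRPC connection alive, heartbeat packets must be sent periodically to show that it is still active; otherwise the connection is closed after a period of inactivity.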
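To make the keep-alive idea concrete, here is a minimal Java sketch. It is not Nacos code: the class name KeepAliveTracker, its methods, and the 5 s / 15 s values used in the usage check are hypothetical. It only records when traffic was last seen, decides when a heartbeat is due, and decides when the link should be treated as dead.

```java
/**
 * Hypothetical sketch (not a Nacos class): tracks activity on a long-lived connection.
 * Time is passed in explicitly so the logic is deterministic and easy to test.
 */
final class KeepAliveTracker {
    private final long heartbeatIntervalMillis; // idle time after which a heartbeat is sent
    private final long deadAfterMillis;         // silence after which the link counts as broken
    private long lastActiveMillis;

    KeepAliveTracker(long heartbeatIntervalMillis, long deadAfterMillis, long nowMillis) {
        if (deadAfterMillis <= heartbeatIntervalMillis) {
            throw new IllegalArgumentException("deadAfter must exceed the heartbeat interval");
        }
        this.heartbeatIntervalMillis = heartbeatIntervalMillis;
        this.deadAfterMillis = deadAfterMillis;
        this.lastActiveMillis = nowMillis;
    }

    /** Call whenever a request, a response or a heartbeat ack is observed. */
    void markActive(long nowMillis) {
        lastActiveMillis = nowMillis;
    }

    /** True when the connection has been idle long enough that a heartbeat should be sent. */
    boolean shouldSendHeartbeat(long nowMillis) {
        return nowMillis - lastActiveMillis >= heartbeatIntervalMillis;
    }

    /** True when nothing has been heard for so long that the connection should be dropped. */
    boolean isDead(long nowMillis) {
        return nowMillis - lastActiveMillis >= deadAfterMillis;
    }
}
```

A real client would call shouldSendHeartbeat from a scheduled task and markActive from its I/O path; keeping the clock outside the class is a design choice that makes the thresholds easy to reason about.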
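Next, let's see how NacosClient opens the RPC connection.
NacosClient works with the registry through the NamingService API.
The NacosNamingService constructor calls the init method.
Stepping into the last line of init eventually leads to new NamingGrpcClientProxy(...); here is its source: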
public class NamingGrpcClientProxy {
    public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
            NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        // some code omitted
        // create the RPC client
        this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
        // start the RPC client
        start(serverListFactory, serviceInfoHolder);
    }

    private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        rpcClient.serverListFactory(serverListFactory);
        rpcClient.registerConnectionListener(redoService);
        rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
        // start it
        rpcClient.start();
    }
}
Let's look at what RpcClient.start does:
public abstract class RpcClient {
    protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();

    public final void start() {
        // some code omitted
        // connection event consumer.
        // (exception handling omitted here: take() can throw InterruptedException)
        clientEventExecutor.submit(() -> {
            while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
                ConnectionEvent take = eventLinkedBlockingQueue.take();
                if (take.isConnected()) {
                    notifyConnected();
                } else if (take.isDisConnected()) {
                    notifyDisConnected();
                }
            }
        });
        Connection connectToServer = null;
        // set the status to STARTING
        rpcClientStatus.set(RpcClientStatus.STARTING);
        int startUpRetryTimes = rpcClientConfig.retryTimes();
        while (startUpRetryTimes > 0 && connectToServer == null) {
            startUpRetryTimes--;
            ServerInfo serverInfo = nextRpcServer();
            // try to connect
            connectToServer = connectToServer(serverInfo);
        }
        this.currentConnection = connectToServer;
        // set the status to RUNNING
        rpcClientStatus.set(RpcClientStatus.RUNNING);
        eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
        // some code omitted
    }
}
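The connect-with-retry loop in start() can be sketched in isolation as follows. This is a simplified, hypothetical model: RetryingConnector is not a Nacos class, and plain round-robin over the list is an assumption about how nextRpcServer() chooses the next node. As the loop above expects, the connector function returns null when a connection attempt fails.

```java
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical sketch of the start-up loop: walk the server list in round-robin order
 * and stop at the first successful connection or when the retry budget runs out.
 * C stands in for the connection type.
 */
final class RetryingConnector<C> {
    private final List<String> servers;
    private int cursor = 0;

    RetryingConnector(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("empty server list");
        }
        this.servers = servers;
    }

    /** Plays the role of nextRpcServer(): returns the next server, wrapping around. */
    String nextServer() {
        String server = servers.get(cursor);
        cursor = (cursor + 1) % servers.size();
        return server;
    }

    /** Plays the role of the while loop in start(); returns null if every attempt failed. */
    C connect(int retryTimes, Function<String, C> connector) {
        C connection = null;
        while (retryTimes > 0 && connection == null) {
            retryTimes--;
            connection = connector.apply(nextServer());
        }
        return connection;
    }
}
```

Because the cursor survives between calls, a later reconnect attempt continues from the node after the last one tried instead of hammering the first node again.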
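Some code is omitted; this time we focus only on two core points.
1. Find the next RPC server and connect to it
Because Nacos supports cluster deployment, the RPC server list is really the list of cluster nodes. The client picks the next node in the cluster and tries to connect to it; if that fails, the next loop iteration picks the following node and tries again, for as long as retries remain.
2. Add an item to eventLinkedBlockingQueue
eventLinkedBlockingQueue holds ConnectionEvent objects. A ConnectionEvent represents a connection event, which is either a connected event or a disconnected event.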
public class ConnectionEvent {
    public static final int CONNECTED = 1;
    public static final int DISCONNECTED = 0;

    int eventType;

    public ConnectionEvent(int eventType) {
        this.eventType = eventType;
    }

    public boolean isConnected() {
        return eventType == CONNECTED;
    }

    public boolean isDisConnected() {
        return eventType == DISCONNECTED;
    }
}
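As the source above shows, once the connection is established, a CONNECTED event is pushed onto the queue.
Where is the consumer of this queue?
It is defined at the very beginning of the start method: a while loop keeps taking events from the queue and sends the matching notification for each event type.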
public final void start() {
    // some code omitted
    // connection event consumer.
    clientEventExecutor.submit(() -> {
        while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
            ConnectionEvent take = eventLinkedBlockingQueue.take();
            if (take.isConnected()) {
                // notify: connected
                notifyConnected();
            } else if (take.isDisConnected()) {
                // notify: disconnected
                notifyDisConnected();
            }
        }
    });
}
notifyConnected and notifyDisConnected are implemented almost identically, so only one of them is shown here.
protected List<ConnectionEventListener> connectionEventListeners = new ArrayList<>();

protected void notifyConnected() {
    // some code omitted
    if (connectionEventListeners.isEmpty()) {
        return;
    }
    // call back every registered listener, one by one
    for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
        connectionEventListener.onConnected();
    }
}
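The queue-plus-listeners pattern described above (producers offer CONNECTED or DISCONNECTED events, one consumer thread takes them and calls every registered listener) can be sketched in plain Java as follows. ConnListener and ConnectionEventBus are hypothetical names, and a daemon Thread stands in for clientEventExecutor.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical listener interface modelled on ConnectionEventListener. */
interface ConnListener {
    void onConnected();
    void onDisConnect();
}

/** Minimal sketch: a blocking queue of events drained by a single consumer thread. */
final class ConnectionEventBus {
    static final int DISCONNECTED = 0;
    static final int CONNECTED = 1;

    private final BlockingQueue<Integer> events = new LinkedBlockingQueue<>();
    private final List<ConnListener> listeners = new CopyOnWriteArrayList<>();
    private final Thread consumer;

    ConnectionEventBus() {
        consumer = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    int event = events.take(); // blocks until an event arrives
                    for (ConnListener listener : listeners) {
                        if (event == CONNECTED) {
                            listener.onConnected();
                        } else {
                            listener.onDisConnect();
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdown was requested
            }
        }, "connection-event-consumer");
        consumer.setDaemon(true);
        consumer.start();
    }

    void register(ConnListener listener) {
        listeners.add(listener);
    }

    void publish(int event) {
        events.offer(event);
    }

    void shutdown() {
        consumer.interrupt();
    }
}
```

Routing every event through one consumer thread means listeners see events in the order they were published, and the thread that detects a connection change never blocks on slow listener code.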
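When are entries added to connectionEventListeners?
In NamingGrpcClientProxy.start: the call rpcClient.registerConnectionListener(redoService) registers redoService, a NamingGrpcRedoService, as a connection listener: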
public class NamingGrpcRedoService implements ConnectionEventListener {
    // some code omitted (including the registeredInstances and subscribes caches)
    private volatile boolean connected = false;

    @Override
    public void onConnected() {
        // connection established: update the connection state
        connected = true;
    }

    @Override
    public void onDisConnect() {
        connected = false;
        // mark every cached entry held by redoService as no longer registered
        synchronized (registeredInstances) {
            registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false));
        }
        synchronized (subscribes) {
            subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false));
        }
    }
}
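When the gRPC connection drops, the onDisConnect callback is invoked, and it flips every cached entry to unregistered via setRegistered(false).
The previous article, Nacos 注册中心 - 服务注册源码 ("Nacos Registry: Service Registration Source Code"), explained what redoService is for.
At this point, the instances go through the deregistration flow.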
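A minimal sketch of the redo idea follows, assuming (as the name suggests) that "redo" means replaying lost registrations once the connection is back. RedoCache and its methods are hypothetical and greatly simplified compared with NamingGrpcRedoService; a disconnect only flips flags, and a later redo pass re-registers whatever is flagged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical sketch: remember registrations and replay the ones lost on disconnect. */
final class RedoCache {
    // instance key -> whether the server currently knows about this registration
    private final Map<String, Boolean> registered = new ConcurrentHashMap<>();
    private volatile boolean connected;

    void recordRegistered(String instanceKey) {
        registered.put(instanceKey, true);
    }

    void onConnected() {
        connected = true;
    }

    /** Same idea as onDisConnect above: every cached registration is marked as lost. */
    void onDisConnect() {
        connected = false;
        registered.replaceAll((key, value) -> false);
    }

    /** Re-registers entries marked as lost; returns the keys that were replayed. */
    List<String> redo(Consumer<String> register) {
        List<String> replayed = new ArrayList<>();
        if (!connected) {
            return replayed; // nothing can be sent without a connection
        }
        for (Map.Entry<String, Boolean> entry : registered.entrySet()) {
            if (!entry.getValue()) {
                register.accept(entry.getKey());
                entry.setValue(true); // writes through to the ConcurrentHashMap
                replayed.add(entry.getKey());
            }
        }
        return replayed;
    }
}
```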
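2.2 Health Checks for Persistent Instances
For a persistent instance, the client's whole job is to submit the registration request.
The health checking itself is done by the server.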
3. Server-Side Health Checks
After the server receives the callback for a client's connection event, it calls the client's init method:
public class IpPortBasedClient extends AbstractClient {
    public void init() {
        if (ephemeral) {
            beatCheckTask = new ClientBeatCheckTaskV2(this);
            HealthCheckReactor.scheduleCheck(beatCheckTask);
        } else {
            healthCheckTaskV2 = new HealthCheckTaskV2(this);
            HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
        }
    }
}
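For an ephemeral instance, ClientBeatCheckTaskV2 handles the health check.
For a persistent instance, HealthCheckTaskV2 handles the health check.
The task is then handed to a thread pool and executed periodically.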
3.1 Health Checks for Ephemeral Instances
Let's see how ClientBeatCheckTaskV2 is implemented:
public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCheckTask, NacosHealthCheckTask {
    // some code omitted
    private final IpPortBasedClient client;
    private final InstanceBeatCheckTaskInterceptorChain interceptorChain;

    // perform the health check
    @Override
    public void doHealthCheck() {
        // all services published by this client
        Collection<Service> services = client.getAllPublishedService();
        for (Service each : services) {
            HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client
                    .getInstancePublishInfo(each);
            // run each service's instance through the interceptor chain
            interceptorChain.doInterceptor(new InstanceBeatCheckTask(client, each, instance));
        }
    }

    @Override
    public void run() {
        doHealthCheck();
    }
}
Now let's see how the interceptor chain is implemented.
The interceptor chain here is a textbook Chain of Responsibility:
public abstract class AbstractNamingInterceptorChain<T extends Interceptable>
        implements NacosNamingInterceptorChain<T> {
    // some code omitted (including the interceptors list)

    @Override
    public void doInterceptor(T object) {
        for (NacosNamingInterceptor<T> each : interceptors) {
            if (!each.isInterceptType(object.getClass())) {
                continue;
            }
            // this interceptor takes responsibility and handles the object itself
            if (each.intercept(object)) {
                object.afterIntercept();
                // stop: later interceptors are not consulted
                return;
            }
        }
        // no interceptor took responsibility, so call passIntercept
        object.passIntercept();
    }
}
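Stripped of the Nacos types, the chain above boils down to the sketch below. InterceptableTask and InterceptorChain are hypothetical names, each interceptor is modelled as a Predicate, and the isInterceptType filtering step is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical task type: passIntercept runs the real checks, afterIntercept is the skip path. */
interface InterceptableTask {
    void passIntercept();
    void afterIntercept();
}

/** Chain of Responsibility: the first interceptor returning true stops the chain. */
final class InterceptorChain<T extends InterceptableTask> {
    private final List<Predicate<T>> interceptors = new ArrayList<>();

    InterceptorChain<T> add(Predicate<T> interceptor) {
        interceptors.add(interceptor);
        return this;
    }

    void doInterceptor(T task) {
        for (Predicate<T> interceptor : interceptors) {
            if (interceptor.test(task)) {
                task.afterIntercept(); // intercepted: the health check is skipped
                return;
            }
        }
        task.passIntercept(); // nobody intercepted: run the checks
    }
}
```

New conditions can then be added as new interceptors without touching the check logic, which is the main payoff of the pattern here.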
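First question: what are the interceptors?
There are three concrete implementation classes.
Each performs its check at a different level (service, server node, instance). Broadly, they decide whether heartbeat checking applies to the instance; if it does not, the task is intercepted and never reaches the heartbeat-check code that follows.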
ServiceEnableBeatCheckInterceptor
Decides based on the service's metadata.
public class ServiceEnableBeatCheckInterceptor extends AbstractBeatCheckInterceptor {
    @Override
    public boolean intercept(InstanceBeatCheckTask object) {
        NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
        // fetch the metadata of the current service
        Optional<ServiceMetadata> metadata = metadataManager.getServiceMetadata(object.getService());
        // if the metadata exists and has enableClientBeat configured
        if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {
            // return the enableClientBeat value as-is
            return Boolean.parseBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT));
        }
        return false;
    }
}
InstanceBeatCheckResponsibleInterceptor
A server node does not process heartbeats for every client in the cluster; it only handles the clients it is responsible for.
public class InstanceBeatCheckResponsibleInterceptor extends AbstractBeatCheckInterceptor {
    @Override
    public boolean intercept(InstanceBeatCheckTask object) {
        // intercept when this server node is not the one responsible for the client
        return !ApplicationUtils.getBean(DistroMapper.class).responsible(object.getClient().getResponsibleId());
    }
}
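Here is a hedged sketch of how hash-based responsibility can work: every server node sorts the same member list, hashes the client's responsible id, and only the node at index hash % size handles that client. Nacos's DistroMapper is built around a similar idea, but ResponsibilityMapper, the sorting step and the exact hash function here are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: exactly one node in the member list owns each responsible id. */
final class ResponsibilityMapper {
    private final List<String> sortedMembers;
    private final String localAddress;

    ResponsibilityMapper(List<String> members, String localAddress) {
        this.sortedMembers = new ArrayList<>(members);
        Collections.sort(this.sortedMembers); // every node must agree on the same order
        this.localAddress = localAddress;
    }

    /** Non-negative hash, so the modulo below never yields a negative index. */
    static int distroHash(String responsibleId) {
        return Math.abs(responsibleId.hashCode() % Integer.MAX_VALUE);
    }

    String ownerOf(String responsibleId) {
        return sortedMembers.get(distroHash(responsibleId) % sortedMembers.size());
    }

    /** Counterpart of responsible(...): true only on the owning node. */
    boolean responsible(String responsibleId) {
        return localAddress.equals(ownerOf(responsibleId));
    }
}
```

Because all nodes compute the same answer from the same inputs, they split the heartbeat-check work without coordinating per client; the interceptor simply skips clients for which responsible(...) is false.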
InstanceEnableBeatCheckInterceptor
This one makes the decision at the instance level.
public class InstanceEnableBeatCheckInterceptor extends AbstractBeatCheckInterceptor {
    @Override
    public boolean intercept(InstanceBeatCheckTask object) {
        NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
        HealthCheckInstancePublishInfo instance = object.getInstancePublishInfo();
        // fetch the instance's metadata
        Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(object.getService(), instance.getMetadataId());
        // first look in the metadata
        if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {
            // the metadata has the key: use its value
            return ConvertUtils.toBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString());
        }
        // otherwise look in extendDatum
        if (instance.getExtendDatum().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {
            return ConvertUtils.toBoolean(instance.getExtendDatum().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString());
        }
        return false;
    }
}
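If none of the three interceptors above intercepts the task, heartbeat checking applies to this instance, so the actual check begins.
The check is performed by object.passIntercept();.
What is object.passIntercept()?
It is the last line of the doInterceptor method we just saw: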
@Override
public void doInterceptor(T object) {
    for (NacosNamingInterceptor<T> each : interceptors) {
        if (!each.isInterceptType(object.getClass())) {
            continue;
        }
        // does this interceptor intercept?
        if (each.intercept(object)) {
            object.afterIntercept();
            // intercepted: return right away
            return;
        }
    }
    // not intercepted by anyone
    object.passIntercept();
}
What is object?
It is the InstanceBeatCheckTask passed in earlier.
Let's look at InstanceBeatCheckTask next:
public class InstanceBeatCheckTask implements Interceptable {
    // all check items
    private static final List<InstanceBeatChecker> CHECKERS = new LinkedList<>();

    private final IpPortBasedClient client;
    private final Service service;
    private final HealthCheckInstancePublishInfo instancePublishInfo;

    static {
        // register the built-in check items
        CHECKERS.add(new UnhealthyInstanceChecker());
        CHECKERS.add(new ExpiredInstanceChecker());
        // add further checkers through SPI
        CHECKERS.addAll(NacosServiceLoader.load(InstanceBeatChecker.class));
    }

    @Override
    public void passIntercept() {
        // iterate over all check items
        for (InstanceBeatChecker each : CHECKERS) {
            // run the check
            each.doCheck(client, service, instancePublishInfo);
        }
    }
}
What are these check items?
Each one is described below.
UnhealthyInstanceChecker
The unhealthy-instance checker:
public class UnhealthyInstanceChecker implements InstanceBeatChecker {
    // run the check
    @Override
    public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        if (instance.isHealthy() && isUnhealthy(service, instance)) {
            // the instance has become unhealthy -> mark it as unhealthy
            changeHealthyStatus(client, service, instance);
        }
    }

    // decide whether the instance is unhealthy
    private boolean isUnhealthy(Service service, HealthCheckInstancePublishInfo instance) {
        // heartbeat timeout, 15 seconds by default; configurable
        long beatTimeout = getTimeout(service, instance);
        // true if more time than the timeout has passed since the last heartbeat, i.e. the instance is unhealthy
        return System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout;
    }

    // change the health status
    private void changeHealthyStatus(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        instance.setHealthy(false);
        // some code omitted
    }
}
ExpiredInstanceChecker
The expired-instance checker:
public class ExpiredInstanceChecker implements InstanceBeatChecker {
    @Override
    public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance();
        if (expireInstance && isExpireInstance(service, instance)) {
            // the instance has expired: remove it right away
            deleteIp(client, service, instance);
        }
    }

    private boolean isExpireInstance(Service service, HealthCheckInstancePublishInfo instance) {
        // deletion timeout, 30 seconds by default; configurable
        long deleteTimeout = getTimeout(service, instance);
        // true if more time than the timeout has passed since the last heartbeat: the instance has expired and must be removed
        return System.currentTimeMillis() - instance.getLastHeartBeatTime() > deleteTimeout;
    }

    /**
     * Remove the instance outright.
     */
    private void deleteIp(Client client, Service service, InstancePublishInfo instance) {
        client.removeServiceInstance(service);
        // the client deregisters the service
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(service, client.getClientId()));
        // instance metadata changed
        NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(service, instance.getMetadataId(), true));
        // trace event: instance deregistered because its heartbeat expired
        NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(), "",
                false, DeregisterInstanceReason.HEARTBEAT_EXPIRE, service.getNamespace(), service.getGroup(),
                service.getName(), instance.getIp(), instance.getPort()));
    }
}
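The two checkers apply two thresholds to the same quantity, the time since the last heartbeat. The sketch below folds them into a single classification so the interplay is easy to see. BeatTimeoutPolicy and BeatState are hypothetical; the 15 s and 30 s values come from the comments above, expireEnabled stands in for GlobalConfig.isExpireInstance(), and the strict comparisons match the > used in the source. In Nacos itself the two checkers run separately, one after the other.

```java
/** Hypothetical outcome of a heartbeat check. */
enum BeatState { HEALTHY, UNHEALTHY, EXPIRED }

/** Hypothetical sketch combining the unhealthy and expired thresholds. */
final class BeatTimeoutPolicy {
    private final long unhealthyTimeoutMillis; // e.g. 15 s: mark unhealthy
    private final long deleteTimeoutMillis;    // e.g. 30 s: remove the instance
    private final boolean expireEnabled;       // stand-in for GlobalConfig.isExpireInstance()

    BeatTimeoutPolicy(long unhealthyTimeoutMillis, long deleteTimeoutMillis, boolean expireEnabled) {
        if (deleteTimeoutMillis <= unhealthyTimeoutMillis) {
            throw new IllegalArgumentException("delete timeout must exceed the unhealthy timeout");
        }
        this.unhealthyTimeoutMillis = unhealthyTimeoutMillis;
        this.deleteTimeoutMillis = deleteTimeoutMillis;
        this.expireEnabled = expireEnabled;
    }

    BeatState evaluate(long lastHeartBeatMillis, long nowMillis) {
        long silence = nowMillis - lastHeartBeatMillis;
        if (expireEnabled && silence > deleteTimeoutMillis) {
            return BeatState.EXPIRED;   // what ExpiredInstanceChecker acts on
        }
        if (silence > unhealthyTimeoutMillis) {
            return BeatState.UNHEALTHY; // what UnhealthyInstanceChecker acts on
        }
        return BeatState.HEALTHY;
    }
}
```

With the defaults, an instance that stops sending heartbeats is first shown as unhealthy and, if it stays silent, is removed about 15 seconds later; with expiration disabled it stays registered but unhealthy.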
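3.2 Server-Side Health Checks for Persistent Instances
Persistent instances are checked by active probing: the server periodically sends requests to the instance to find out whether it is healthy.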
The entry class is HealthCheckTaskV2:
public class HealthCheckTaskV2 extends AbstractExecuteTask implements NacosHealthCheckTask {
    // some code omitted
    private final IpPortBasedClient client;

    @Override
    public void doHealthCheck() {
        // every service published by this client
        for (Service each : client.getAllPublishedService()) {
            // only if health checking is enabled for the service
            if (switchDomain.isHealthCheckEnabled(each.getGroupedServiceName())) {
                // the instance's registration info
                InstancePublishInfo instancePublishInfo = client.getInstancePublishInfo(each);
                // the cluster metadata
                ClusterMetadata metadata = getClusterMetadata(each, instancePublishInfo);
                // call HealthCheckProcessorV2Delegate.process()
                ApplicationUtils.getBean(HealthCheckProcessorV2Delegate.class).process(this, each, metadata);
            }
        }
    }

    @Override
    public void run() {
        doHealthCheck();
    }
}
It ends up calling HealthCheckProcessorV2Delegate.process.
Let's see how that is implemented.
HealthCheckProcessorV2Delegate is simply a delegating class:
public class HealthCheckProcessorV2Delegate implements HealthCheckProcessorV2 {
    // type -> health check implementation
    private final Map<String, HealthCheckProcessorV2> healthCheckProcessorMap = new HashMap<>();

    @Autowired
    public void addProcessor(Collection<HealthCheckProcessorV2> processors) {
        healthCheckProcessorMap.putAll(processors.stream().filter(processor -> processor.getType() != null)
                .collect(Collectors.toMap(HealthCheckProcessorV2::getType, processor -> processor)));
    }

    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
        // read the health check type from the metadata (HTTP, MySQL, TCP, None)
        String type = metadata.getHealthyCheckType();
        // find the concrete health check processor for that type
        HealthCheckProcessorV2 processor = healthCheckProcessorMap.get(type);
        if (processor == null) {
            // none found: fall back to the None health check
            processor = healthCheckProcessorMap.get(NoneHealthCheckProcessor.TYPE);
        }
        // run the health check
        processor.process(task, service, metadata);
    }
}
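The delegate is essentially a type-keyed registry with a fallback. A self-contained sketch of that pattern follows; HealthProcessor and ProcessorDelegate are hypothetical stand-ins (process returns a String only to make the behaviour observable), and this version insists on a NONE processor up front, whereas the original simply assumes one is registered.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for HealthCheckProcessorV2, reduced to what the delegate needs. */
interface HealthProcessor {
    String getType();
    String process(String target);
}

/** Sketch of the delegate: look up by type, fall back to the NONE processor. */
final class ProcessorDelegate {
    static final String NONE = "NONE";
    private final Map<String, HealthProcessor> byType = new HashMap<>();

    ProcessorDelegate(HealthProcessor... processors) {
        for (HealthProcessor p : processors) {
            if (p.getType() != null) { // same filter as addProcessor above
                byType.put(p.getType(), p);
            }
        }
        if (!byType.containsKey(NONE)) {
            throw new IllegalStateException("a NONE processor is required as the fallback");
        }
    }

    String process(String type, String target) {
        HealthProcessor p = byType.get(type);
        if (p == null) {
            p = byType.get(NONE); // unknown type: do not check at all
        }
        return p.process(target);
    }
}
```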
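There are several kinds of health check (NONE, TCP, MYSQL and HTTP), and more can be added through SPI.
Each is introduced in turn below.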
NoneHealthCheckProcessor
None means no health check is performed, so this class's process method is empty:
public class NoneHealthCheckProcessor implements HealthCheckProcessorV2 {
    public static final String TYPE = HealthCheckType.NONE.name();

    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
    }

    @Override
    public String getType() {
        return TYPE;
    }
}
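TcpHealthCheckProcessor
The TCP health check decides health over TCP. Under the hood it opens a non-blocking socket connection (a SocketChannel) to the instance and watches it with a Selector: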
public class TcpHealthCheckProcessor implements HealthCheckProcessorV2, Runnable {
    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
        // omitted
    }

    // some code omitted

    private class TaskProcessor implements Callable<Void> {
        // the beat field and the exception handling are omitted in this excerpt

        @Override
        public Void call() {
            // open the socket connection
            SocketChannel channel = null;
            HealthCheckInstancePublishInfo instance = beat.getInstance();
            BeatKey beatKey = keyMap.get(beat.toString());
            if (beatKey != null && beatKey.key.isValid()) {
                if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
                    instance.finishCheck();
                    return null;
                }
                beatKey.key.cancel();
                beatKey.key.channel().close();
            }
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            // only by setting this can we make the socket close event asynchronous
            channel.socket().setSoLinger(false, -1);
            channel.socket().setReuseAddress(true);
            channel.socket().setKeepAlive(true);
            channel.socket().setTcpNoDelay(true);
            ClusterMetadata cluster = beat.getMetadata();
            int port = cluster.isUseInstancePortForCheck() ? instance.getPort() : cluster.getHealthyCheckPort();
            channel.connect(new InetSocketAddress(instance.getIp(), port));
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
            key.attach(beat);
            keyMap.put(beat.toString(), new BeatKey(key));
            beat.setStartTime(System.currentTimeMillis());
            GlobalExecutor
                    .scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            return null;
        }
    }
}
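For comparison, here is a much simpler, blocking way to express the same idea: an instance counts as reachable if a TCP handshake to ip:port completes within a timeout. TcpProbe is a hypothetical helper, not Nacos code; the processor above instead uses a non-blocking SocketChannel plus a Selector so that one thread can probe many instances at once, which this sketch deliberately gives up for readability.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical blocking TCP probe: healthy if the connect completes within the timeout. */
final class TcpProbe {
    static boolean isReachable(String ip, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(ip, port), timeoutMillis);
            return true;  // the TCP handshake completed
        } catch (IOException e) {
            return false; // refused, unreachable or timed out
        }
    }
}
```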
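MysqlHealthCheckProcessor
Essentially it executes an SQL statement (taken from the configuration); if the whole process completes without an exception, the instance is healthy. One special case: if the statement is the read_only master check and it returns ON, the node is a replica and the check fails.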
public class MysqlHealthCheckProcessor implements HealthCheckProcessorV2 {
    public static final String TYPE = HealthCheckType.MYSQL.name();

    private final HealthCheckCommonV2 healthCheckCommon;
    private final SwitchDomain switchDomain;

    public static final int CONNECT_TIMEOUT_MS = 500;
    private static final String CHECK_MYSQL_MASTER_SQL = "show global variables where variable_name='read_only'";
    private static final String MYSQL_SLAVE_READONLY = "ON";
    private static final ConcurrentMap<String, Connection> CONNECTION_POOL = new ConcurrentHashMap<String, Connection>();

    public MysqlHealthCheckProcessor(HealthCheckCommonV2 healthCheckCommon, SwitchDomain switchDomain) {
        this.healthCheckCommon = healthCheckCommon;
        this.switchDomain = switchDomain;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
        // omitted
    }

    private class MysqlCheckTask implements Runnable {
        // fields (task, service, instance, metadata, startTime) and exception handling omitted

        @Override
        public void run() {
            Statement statement = null;
            ResultSet resultSet = null;
            String clusterName = instance.getCluster();
            String key =
                    service.getGroupedServiceName() + ":" + clusterName + ":" + instance.getIp() + ":" + instance
                            .getPort();
            Connection connection = CONNECTION_POOL.get(key);
            Mysql config = (Mysql) metadata.getHealthChecker();
            if (connection == null || connection.isClosed()) {
                String url = "jdbc:mysql://" + instance.getIp() + ":" + instance.getPort() + "?connectTimeout="
                        + CONNECT_TIMEOUT_MS + "&socketTimeout=" + CONNECT_TIMEOUT_MS + "&loginTimeout=" + 1;
                connection = DriverManager.getConnection(url, config.getUser(), config.getPwd());
                CONNECTION_POOL.put(key, connection);
            }
            statement = connection.createStatement();
            statement.setQueryTimeout(1);
            resultSet = statement.executeQuery(config.getCmd());
            int resultColumnIndex = 2;
            if (CHECK_MYSQL_MASTER_SQL.equals(config.getCmd())) {
                resultSet.next();
                if (MYSQL_SLAVE_READONLY.equals(resultSet.getString(resultColumnIndex))) {
                    throw new IllegalStateException("current node is slave!");
                }
            }
            healthCheckCommon.checkOk(task, service, "mysql:+ok");
            healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,
                    switchDomain.getMysqlHealthParams());
        }
    }
}
HttpHealthCheckProcessor
Essentially it sends an HTTP GET request; a 200 status code means healthy.
public class HttpHealthCheckProcessor implements HealthCheckProcessorV2 {
    public static final String TYPE = HealthCheckType.HTTP.name();

    private static final NacosAsyncRestTemplate ASYNC_REST_TEMPLATE = HttpClientManager
            .getProcessorNacosAsyncRestTemplate();

    private final HealthCheckCommonV2 healthCheckCommon;

    // switchDomain field, HTTP_PREFIX and other members omitted

    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
        HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient()
                .getInstancePublishInfo(service);
        if (null == instance) {
            return;
        }
        try {
            if (!instance.tryStartCheck()) {
                SRV_LOG.warn("http check started before last one finished, service: {} : {} : {}:{}",
                        service.getGroupedServiceName(), instance.getCluster(), instance.getIp(), instance.getPort());
                healthCheckCommon
                        .reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getHttpHealthParams());
                return;
            }
            Http healthChecker = (Http) metadata.getHealthChecker();
            int ckPort = metadata.isUseInstancePortForCheck() ? instance.getPort() : metadata.getHealthyCheckPort();
            URL host = new URL(HTTP_PREFIX + instance.getIp() + ":" + ckPort);
            URL target = new URL(host, healthChecker.getPath());
            Map<String, String> customHeaders = healthChecker.getCustomHeaders();
            Header header = Header.newInstance();
            header.addAll(customHeaders);
            // send the HTTP request
            ASYNC_REST_TEMPLATE.get(target.toString(), header, Query.EMPTY, String.class,
                    new HttpHealthCheckCallback(instance, task, service));
            MetricsMonitor.getHttpHealthCheckMonitor().incrementAndGet();
        } catch (Throwable e) {
            instance.setCheckRt(switchDomain.getHttpHealthParams().getMax());
            healthCheckCommon.checkFail(task, service, "http:error:" + e.getMessage());
            healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
                    switchDomain.getHttpHealthParams());
        }
    }

    @Override
    public String getType() {
        return TYPE;
    }

    private class HttpHealthCheckCallback implements Callback<String> {
        private final HealthCheckTaskV2 task;
        private final Service service;
        private final HealthCheckInstancePublishInfo instance;
        private long startTime = System.currentTimeMillis();

        public HttpHealthCheckCallback(HealthCheckInstancePublishInfo instance, HealthCheckTaskV2 task,
                Service service) {
            this.instance = instance;
            this.task = task;
            this.service = service;
        }

        @Override
        public void onReceive(RestResult<String> result) {
            instance.setCheckRt(System.currentTimeMillis() - startTime);
            int httpCode = result.getCode();
            if (HttpURLConnection.HTTP_OK == httpCode) {
                healthCheckCommon.checkOk(task, service, "http:" + httpCode);
                healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,
                        switchDomain.getHttpHealthParams());
            } else if (HttpURLConnection.HTTP_UNAVAILABLE == httpCode
                    || HttpURLConnection.HTTP_MOVED_TEMP == httpCode) {
                // server is busy, need verification later
                healthCheckCommon.checkFail(task, service, "http:" + httpCode);
                healthCheckCommon
                        .reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getHttpHealthParams());
            } else {
                //probably means the state files has been removed by administrator
                healthCheckCommon.checkFailNow(task, service, "http:" + httpCode);
                healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
                        switchDomain.getHttpHealthParams());
            }
        }

        @Override
        public void onError(Throwable throwable) {
            Throwable cause = throwable;
            instance.setCheckRt(System.currentTimeMillis() - startTime);
            int maxStackDepth = 50;
            for (int deepth = 0; deepth < maxStackDepth && cause != null; deepth++) {
                if (HttpUtils.isTimeoutException(cause)) {
                    healthCheckCommon.checkFail(task, service, "http:" + cause.getMessage());
                    healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task,
                            switchDomain.getHttpHealthParams());
                    return;
                }
                cause = cause.getCause();
            }
            if (throwable instanceof ConnectException) {
                healthCheckCommon.checkFailNow(task, service, "http:unable2connect:" + throwable.getMessage());
            } else {
                healthCheckCommon.checkFail(task, service, "http:error:" + throwable.getMessage());
            }
            healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
                    switchDomain.getHttpHealthParams());
        }

        @Override
        public void onCancel() {
        }
    }
}
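The three branches of onReceive boil down to a small status-code classification. The sketch below isolates just that decision; HttpCheckOutcome and HttpStatusClassifier are hypothetical names, while the HttpURLConnection constants are the same ones the callback uses. Note that only exactly 200 counts as healthy, so even 204 lands in the "fail now" branch.

```java
import java.net.HttpURLConnection;

/** Hypothetical outcomes mirroring the branches in HttpHealthCheckCallback.onReceive. */
enum HttpCheckOutcome { OK, SOFT_FAIL, HARD_FAIL }

final class HttpStatusClassifier {
    static HttpCheckOutcome classify(int httpCode) {
        if (httpCode == HttpURLConnection.HTTP_OK) {
            return HttpCheckOutcome.OK;        // checkOk
        }
        if (httpCode == HttpURLConnection.HTTP_UNAVAILABLE
                || httpCode == HttpURLConnection.HTTP_MOVED_TEMP) {
            return HttpCheckOutcome.SOFT_FAIL; // checkFail: busy, verify again later
        }
        return HttpCheckOutcome.HARD_FAIL;     // checkFailNow
    }
}
```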
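As you can see, the different kinds of health check differ quite a lot. So how is the result reported back to Nacos? By calling:
// health check result: healthy
healthCheckCommon.checkOk(task, service, "");
// health check result: unhealthy
healthCheckCommon.checkFail(task, service, "");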
Health check succeeded
On success, the main work is resetting the failure count and finishing the check:
public void checkOk(HealthCheckTaskV2 task, Service service, String msg) {
    HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient()
            .getInstancePublishInfo(service);
    if (instance == null) {
        // the instance no longer exists: nothing to do
        return;
    }
    try {
        if (!instance.isHealthy()) {
            // the instance was unhealthy: mark it healthy again
            // code omitted
        }
    } finally {
        // reset the failure count
        instance.resetFailCount();
        // finish this round of checking
        instance.finishCheck();
    }
}
Health check failed
public void checkFail(HealthCheckTaskV2 task, Service service, String msg) {
    HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient()
            .getInstancePublishInfo(service);
    if (instance == null) {
        return;
    }
    try {
        if (instance.isHealthy()) {
            // the instance was healthy: mark it unhealthy
            // code omitted
        }
    } finally {
        // reset the success count
        instance.resetOkCount();
        // finish this round of checking
        instance.finishCheck();
    }
}
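The status-change code is elided above, but the resetFailCount/resetOkCount calls suggest consecutive-result counters. The sketch below shows one plausible design under that assumption: the status flips only after `threshold` consecutive results in the opposite direction, each result resets the opposite counter, and checkFailNow flips immediately. HealthCounter and the threshold parameter are hypothetical and are not taken from the Nacos source.

```java
/** Hypothetical sketch of counter-based health status changes. */
final class HealthCounter {
    private final int threshold; // consecutive results needed before the status flips
    private boolean healthy = true;
    private int okCount;
    private int failCount;

    HealthCounter(int threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.threshold = threshold;
    }

    void checkOk() {
        failCount = 0; // like resetFailCount()
        if (!healthy && ++okCount >= threshold) {
            healthy = true;
            okCount = 0;
        }
    }

    void checkFail() {
        okCount = 0; // like resetOkCount()
        if (healthy && ++failCount >= threshold) {
            healthy = false;
            failCount = 0;
        }
    }

    /** Hard failure: mark unhealthy at once, like the checkFailNow branch. */
    void checkFailNow() {
        okCount = 0;
        failCount = 0;
        healthy = false;
    }

    boolean isHealthy() {
        return healthy;
    }
}
```

Requiring several consecutive results keeps a single slow or dropped probe from making an instance flap between healthy and unhealthy.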