Nacos 注册中心 - 健康检查机制源码

目录

1. 健康检查介绍

2. 客户端健康检查

2.1 临时实例的健康检查

2.2 永久实例的健康检查

3. 服务端健康检查

3.1 临时实例的健康检查

3.2 永久实例服务端健康检查


1. 健康检查介绍

当一个服务实例注册到 Nacos 中后,其他服务就可以从 Nacos 中查询出该服务实例信息,就可以调用使用了。

然而服务提供者如果此时挂掉了,此时其他服务拿到信息后就会调用不通,所以Nacos中的服务信息应该有一个更新机制(即删除掉挂掉的服务)

那么服务注册信息应该如何维护呢,那就是判断某个服务实例是否有问题,如果检测到服务实例出现问题了就将他剔除掉。

那么如何判断 服务实例 是否有问题呢?这就是健康检查要做的事情,即检查服务实例的健康状态。不健康则剔除下线。

下面看看客户端和服务端为了实现健康检查功能都各自做了哪些事情。

2. 客户端健康检查

2.1 临时实例的健康检查

从 Nacos 2.x 开始,临时实例的服务注册由原来的 HTTP 更换为了 GRPC 长连接方式。Nacos Client 和 Nacos Server 之间建立的 RPC 长连接,服务注册、服务取消注册等接口都是通过 GRPC 消息与服务端通信的。

GRPC 长连接是一直存在的,只要连接一直存在就代表Nacos Client 和 Nacos Server 之间的连接是通的,Nacos Client 则一直在线。如果Nacos Client 由于网络问题等其他问题挂掉了,那么这条长连接也会断开连接。

那么服务实例如何算健康呢,就是长连接一直存在没有断那就算健康的。

如果连接断掉了,那么该客户端上注册的全部服务实例都是不健康的了。

GRPC 长连接如果想一直保证连接状态,就需要定时发送心跳包,以确保连接处于活动的状态。否则一段时间不操作的话就会自动断开连接。

接下来看看 NacosClient 如何开启 RPC 长连接的

NacosClient 操作注册中心的 API 是通过 NamingService 进行的

 

在 NacosNamingService 的构造器中调用了 init 初始化方法:init 方法最后

进入最后一行代码:

再看最后 new NamingGrpcClientProxy 的源码:

public class NamingGrpcClientProxy {
    
 
    
    public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
            NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) {
        
        // 省略部分代码
        
        // 创建 RPC Client
        this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
        
        // 启动 RPC Client
        start(serverListFactory, serviceInfoHolder);
    }
    
    private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        rpcClient.serverListFactory(serverListFactory);
        rpcClient.registerConnectionListener(redoService);
        rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
        // 启动
        rpcClient.start();
       
    }

看看 RpcClient.start 做了什么

public abstract class RpcClient {
    
    protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();
    
    public final void start() {
        // 省略部分代码
        
        // connection event consumer.
        clientEventExecutor.submit(() -> {
            while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
                ConnectionEvent take = eventLinkedBlockingQueue.take();
                    if (take.isConnected()) {
                        notifyConnected();
                    } else if (take.isDisConnected()) {
                        notifyDisConnected();
                    }
            }
        });
        
        Connection connectToServer = null;
        
        // 状态设置为启动中
        rpcClientStatus.set(RpcClientStatus.STARTING);
        
        int startUpRetryTimes = rpcClientConfig.retryTimes();
        while (startUpRetryTimes > 0 && connectToServer == null) {
            startUpRetryTimes--;
            ServerInfo serverInfo = nextRpcServer();
              
            // 建立连接
            connectToServer = connectToServer(serverInfo);
        }
        
        this.currentConnection = connectToServer;
        
        // 状态设置为 运行中
        rpcClientStatus.set(RpcClientStatus.RUNNING);
    
        eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
        
        // 省略部分代码
    }
    
}

省略了一些代码,这次只关注核心的两个点

1. 找到下一个 RPC Server 建立连接

因为 Nacos 支持集群部署,此时的 RPC Server List 其实就是这些集群节点。也就是找到集群下一个节点建议连接,如果连接失败就走到下一轮循环再获取到下一个节点继续连接(再重试次数内循环)

2. eventLinkedBlockingQueue 队列中加入一项

eventLinkedBlockingQueue 队列里存的是 ConnectionEvent,ConnectionEvent 代表一个连接事件,事件有 已连接事件、断开连接事件。

public class ConnectionEvent {
        
    public static final int CONNECTED = 1;
        
    public static final int DISCONNECTED = 0;
        
    int eventType;
        
    public ConnectionEvent(int eventType) {
        this.eventType = eventType;
    }
        
    public boolean isConnected() {
        return eventType == CONNECTED;
    }
        
    public boolean isDisConnected() {
        return eventType == DISCONNECTED;
    }
}

可见上面的源码,连接建立成功后,就会往队列压入一个 已连接事件 CONNECTED

队列事件的消费者在哪里呢?

便是在 start 方法的最开头定义的,while 循环不断从队列中获取到数据然后根据事件类型,进行各自的通知。

    public final void start() {
        // 省略部分代码
        
        // connection event consumer.
        clientEventExecutor.submit(() -> {
            while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
                ConnectionEvent take = eventLinkedBlockingQueue.take();
                
                    if (take.isConnected()) {
                        // 通知连接
                        notifyConnected();
                    } else if (take.isDisConnected()) {
                        // 通知断开连接
                        notifyDisConnected();
                    }
            }
        });
    }

notifyConnected 和 notifyDisConnected 实现差不多,所以这里只看一个的实现源码。

protected List<ConnectionEventListener> connectionEventListeners = new ArrayList<>();
​
protected void notifyConnected() {
    // 省略部分源码
    if (connectionEventListeners.isEmpty()) {
        return;
    }
     
    // 循环全部监听器 一个个回调
    for (ConnectionEventListener connectionEventListener : connectionEventListeners) {                      connectionEventListener.onConnected();    
    }
}

connectionEventListeners 里的数据是什么时候加入的呢?

那就是在 NamingGrpcClientProxy.start 方法

 

public class NamingGrpcRedoService implements ConnectionEventListener {
    
    private volatile boolean connected = false;
    
    @Override
    public void onConnected() {
        // 建立连接,改变连接状态
        connected = true;
    }
    
    @Override
    public void onDisConnect() {
        connected = false;
        
        // 将 redoService 上的全部缓存数据一改
        synchronized (registeredInstances) {
            registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false));
        }
        synchronized (subscribes) {
            subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false));
        }
    }

当GRPC 长连接断开后就会进入 onDisConnect 事件回调中,这里改变了setRegistered 状态

上篇说过,redoService的作用 Nacos 注册中心 - 服务注册源码

此时会走入服务卸载流程。

2.2 永久实例的健康检查

永久实例客户端只负责提交一个请求即完成了全部操作。

健康检查工作由 服务端做。

3. 服务端健康检查

在收到客户端建立连接事件回调后,会调用 init 方法

public class IpPortBasedClient extends AbstractClient {
    
    public void init() {
        if (ephemeral) {
            beatCheckTask = new ClientBeatCheckTaskV2(this);
            HealthCheckReactor.scheduleCheck(beatCheckTask);
        } else {
            healthCheckTaskV2 = new HealthCheckTaskV2(this);
            HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
        }
    }   
}

如果当前是临时实例:使用 ClientBeatCheckTaskV2 处理健康检查

如果当前是永久实例:使用 HealthCheckTaskV2处理健康检查

然后将任务放到线程池中执行定时执行

3.1 临时实例的健康检查

看看 ClientBeatCheckTaskV2 如何实现:

public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCheckTask, NacosHealthCheckTask {
    
    // 省略部分代码
    
    private final IpPortBasedClient client;
    
    private final InstanceBeatCheckTaskInterceptorChain interceptorChain;
    
    // 执行健康检查
    @Override
    public void doHealthCheck() {
        
        // 拿到当前客户端上注册的全部服务
        Collection<Service> services = client.getAllPublishedService();
        
        for (Service each : services) {
            HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client
                        .getInstancePublishInfo(each);
            
            // 将全部服务用拦截器链一个个执行
            interceptorChain.doInterceptor(new InstanceBeatCheckTask(client, each, instance));
        } 
    }
    
    @Override
    public void run() {
        doHealthCheck();
    }
}

看看拦截器链如何实现

这里的拦截器链是一个典型的责任链模式

public abstract class AbstractNamingInterceptorChain<T extends Interceptable>
        implements NacosNamingInterceptorChain<T> {
    
    @Override
    public void doInterceptor(T object) {
        for (NacosNamingInterceptor<T> each : interceptors) {
            if (!each.isInterceptType(object.getClass())) {
                continue;
            }
            
            // 当前是责任节点,直接由该责任节点处理
            if (each.intercept(object)) {
                object.afterIntercept();
                
                // 不往后执行了
                return;
            }
        }
        
        // 如果没有责任节点执行,就调用 passIntercept
        object.passIntercept();
    }
}

首先第一个问题:拦截器都是些什么呢?

 

可见具体的有三个实现类

这三个类代表者三个地方的判断,判断是否开启了 健康心跳检查功能? 如果没开,那就被拦截了呀,就走不到后面的心跳检查代码了

ServiceEnableBeatCheckInterceptor

从 Service 的元数据上判断

public class ServiceEnableBeatCheckInterceptor extends AbstractBeatCheckInterceptor {
    
    @Override
    public boolean intercept(InstanceBeatCheckTask object) {
        NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
        // 获取当前 Service 的元数据
        Optional<ServiceMetadata> metadata = metadataManager.getServiceMetadata(object.getService());
        
        // 如果元数据存在,并且其数据 enableClientBeat 配置了
        if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {
            
            // 直接取 enableClientBeat 值
            return Boolean.parseBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT));
        }
        
        return false;
    }
    
 
}

InstanceBeatCheckResponsibleInterceptor

并不是一个客户端要负责集群中全部节点的心跳处理的,而是只负责自己注册的。

public class InstanceBeatCheckResponsibleInterceptor extends AbstractBeatCheckInterceptor {
    
    @Override
    public boolean intercept(InstanceBeatCheckTask object) {
        // 是否是当前责任节点
        return !ApplicationUtils.getBean(DistroMapper.class).responsible(object.getClient().getResponsibleId());
    }
    
}

InstanceEnableBeatCheckInterceptor

这个就是实例级别的健康检查判断

public class InstanceEnableBeatCheckInterceptor extends AbstractBeatCheckInterceptor {
    
    @Override
    public boolean intercept(InstanceBeatCheckTask object) {
        NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
        HealthCheckInstancePublishInfo instance = object.getInstancePublishInfo();
        
        // 获取到实例上的元数据
        Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(object.getService(), instance.getMetadataId());
        // 从元数据上取
        if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {
            // 元数据存在取该值
            return ConvertUtils.toBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString());
        }
        
        // 从 extendDatum 中取数据
        if (instance.getExtendDatum().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {
            return ConvertUtils.toBoolean(instance.getExtendDatum().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString());
        }
        return false;
    }
   
}

如果都没被上面三个拦截器拦截掉,那就代表 当前实例是 开启了 健康检查,所以后面就要开始进行 检查操作

检查操作由 object.passIntercept(); 做

object.passIntercept(); 是什么呢?

就是刚才的 开始拦截方法最后一行

@Override
public void doInterceptor(T object) {
    for (NacosNamingInterceptor<T> each : interceptors) {
        if (!each.isInterceptType(object.getClass())) {
            continue;
        }
        // 拦截器是否拦截?
        if (each.intercept(object)) {
            object.afterIntercept();
            // 拦截了,直接返回
            return;
        }
    }
    
    // 未被拦截到
    object.passIntercept();
}

object 是什么?

就是之前传过来的 InstanceBeatCheckTask

 

接下来看看 InstanceBeatCheckTask

public class InstanceBeatCheckTask implements Interceptable {
    
    // 全部检查项目
    private static final List<InstanceBeatChecker> CHECKERS = new LinkedList<>();
    
    private final IpPortBasedClient client;
    
    private final Service service;
    
    private final HealthCheckInstancePublishInfo instancePublishInfo;
    
    static {
        // 添加检查项目
        CHECKERS.add(new UnhealthyInstanceChecker());
        CHECKERS.add(new ExpiredInstanceChecker());
        // SPI 机制添加
        CHECKERS.addAll(NacosServiceLoader.load(InstanceBeatChecker.class));
    }
    
    @Override
    public void passIntercept() {
        // 遍历全部检查项目
        for (InstanceBeatChecker each : CHECKERS) {
            // 开始检查
            each.doCheck(client, service, instancePublishInfo);
        }
    }
}

全部检查项目 都是什么呢?

 

下面分别介绍

UnhealthyInstanceChecker

不健康实例检查器

public class UnhealthyInstanceChecker implements InstanceBeatChecker {
    
    // 开始做检查
    @Override
    public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        if (instance.isHealthy() && isUnhealthy(service, instance)) {
            // 当前实例不健康了 -> 改变健康状态为 不健康
            changeHealthyStatus(client, service, instance);
        }
    }
    
    // 判断实例是否健康
    private boolean isUnhealthy(Service service, HealthCheckInstancePublishInfo instance) {
        // 获取超时时间 默认 15 秒;可通过配置更改。
        long beatTimeout = getTimeout(service, instance);
        
        // 当前时间距离上一次发送心跳包时间  超过了 规定的超时时间  则返回 true,代表节点不健康了
        return System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout;
    }
    
​
    // 改变健康状态
    private void changeHealthyStatus(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        instance.setHealthy(false);
            
        // 省略部分代码
    }
    
}

ExpiredInstanceChecker

过期实例检查器

public class ExpiredInstanceChecker implements InstanceBeatChecker {
    
    @Override
    public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance();
        if (expireInstance && isExpireInstance(service, instance)) {
            // 如果实例过期了,则直接剔除实例
            deleteIp(client, service, instance);
        }
    }
    
    private boolean isExpireInstance(Service service, HealthCheckInstancePublishInfo instance) {
        // 获取超时时间 默认 30 秒;可通过配置更改。
        long deleteTimeout = getTimeout(service, instance);
        
        // 当前时间距离上一次发送心跳包时间  超过了 规定的超时时间  则返回 true,代表节点过期了,需要进行节点剔除操作
        return System.currentTimeMillis() - instance.getLastHeartBeatTime() > deleteTimeout;
    }
    
    
    /**
     * 服务直接剔除掉
     */
    private void deleteIp(Client client, Service service, InstancePublishInfo instance) {
        client.removeServiceInstance(service);
        
        // 客户端下线
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(service, client.getClientId()));
        // 元数据改变
        NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(service, instance.getMetadataId(), true));
        // 注销实例
        NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(), "",
                false, DeregisterInstanceReason.HEARTBEAT_EXPIRE, service.getNamespace(), service.getGroup(),
                service.getName(), instance.getIp(), instance.getPort()));
    }

3.2 永久实例服务端健康检查

永久实例的健康检查是服务端主动探测方式,服务端定时外部请求客户端来看是否健康。

public class IpPortBasedClient extends AbstractClient {
    
    public void init() {
        if (ephemeral) {
            beatCheckTask = new ClientBeatCheckTaskV2(this);
            HealthCheckReactor.scheduleCheck(beatCheckTask);
        } else {
            healthCheckTaskV2 = new HealthCheckTaskV2(this);
            HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
        }
    }   
}

入口类是 HealthCheckTaskV2

public class HealthCheckTaskV2 extends AbstractExecuteTask implements NacosHealthCheckTask {
 
    // 省略部分代码
    
    private final IpPortBasedClient client;
      
    @Override
    public void doHealthCheck() {
         
        // 获取到当前客户端上注册的全部节点
        for (Service each : client.getAllPublishedService()) {
            // 如果开启了健康检查
            if (switchDomain.isHealthCheckEnabled(each.getGroupedServiceName())) {
                
                // 拿到实例注册信息
                InstancePublishInfo instancePublishInfo = client.getInstancePublishInfo(each);
                // 拿到集群元数据
                ClusterMetadata metadata = getClusterMetadata(each, instancePublishInfo);
                
                // 调用 HealthCheckProcessorV2Delegate.process()
                ApplicationUtils.getBean(HealthCheckProcessorV2Delegate.class).process(this, each, metadata);
​
                }
            }
    }
​
    
    @Override
    public void run() {
        doHealthCheck();
    }
    
}

最终调用了 HealthCheckProcessorV2Delegate.process 方法

看看如何实现

HealthCheckProcessorV2Delegate 这个类就是一个委托类

public class HealthCheckProcessorV2Delegate implements HealthCheckProcessorV2 {
    
    // 类型,健康检查实现类
    private final Map<String, HealthCheckProcessorV2> healthCheckProcessorMap = new HashMap<>();
​
    
    @Autowired
    public void addProcessor(Collection<HealthCheckProcessorV2> processors) {
        healthCheckProcessorMap.putAll(processors.stream().filter(processor -> processor.getType() != null)
                .collect(Collectors.toMap(HealthCheckProcessorV2::getType, processor -> processor)));
    }
    
    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
        // 从元数据中获取到当前的健康检查类型 (HTTP、MySQL、TCP、None)
        String type = metadata.getHealthyCheckType();
        
        // 根据类型找到具体的 健康检查类
        HealthCheckProcessorV2 processor = healthCheckProcessorMap.get(type);
        if (processor == null) {
            // 找不到 就使用 None 健康检查
            processor = healthCheckProcessorMap.get(NoneHealthCheckProcessor.TYPE);
        }
        
        // 开始进行健康检查
        processor.process(task, service, metadata);
    }
}

健康检查有如下几类,还可通过 SPI 方式扩展

下面一个一个介绍

NoneHealthCheckProcessor

None 代表不做健康检查,所以这个类的Process 为空实现

public class NoneHealthCheckProcessor implements HealthCheckProcessorV2 {
    
    public static final String TYPE = HealthCheckType.NONE.name();
    
    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
    
    }
    
    @Override
    public String getType() {
        return TYPE;
    }
}

TcpHealthCheckProcessor

TCP 健康检查,用于通过 TCP 方式检查是否健康,本质上是通过建立 Socket 连接,发送 Socket 信息实现

public class TcpHealthCheckProcessor implements HealthCheckProcessorV2, Runnable {
    
    
    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
        // 省略
    }
  
    // 省略部分代码
    
    
    private class TaskProcessor implements Callable<Void> {
​
        @Override
        public Void call() {
         
            // 发送 Socket 请求
                SocketChannel channel = null;
           
                HealthCheckInstancePublishInfo instance = beat.getInstance();
                
                BeatKey beatKey = keyMap.get(beat.toString());
                if (beatKey != null && beatKey.key.isValid()) {
                    if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
                        instance.finishCheck();
                        return null;
                    }
                    
                    beatKey.key.cancel();
                    beatKey.key.channel().close();
                }
                
                channel = SocketChannel.open();
                channel.configureBlocking(false);
                // only by setting this can we make the socket close event asynchronous
                channel.socket().setSoLinger(false, -1);
                channel.socket().setReuseAddress(true);
                channel.socket().setKeepAlive(true);
                channel.socket().setTcpNoDelay(true);
                
                ClusterMetadata cluster = beat.getMetadata();
                int port = cluster.isUseInstancePortForCheck() ? instance.getPort() : cluster.getHealthyCheckPort();
                channel.connect(new InetSocketAddress(instance.getIp(), port));
                
                SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
                key.attach(beat);
                keyMap.put(beat.toString(), new BeatKey(key));
                
                beat.setStartTime(System.currentTimeMillis());
                
                GlobalExecutor
                        .scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
          
            
            return null;
        }
    }
}

MysqlHealthCheckProcessor

本质是 发送一个 sql。(sql从配置中获取),整个过程没报异常就算健康

public class MysqlHealthCheckProcessor implements HealthCheckProcessorV2 {
    
    public static final String TYPE = HealthCheckType.MYSQL.name();
    
    private final HealthCheckCommonV2 healthCheckCommon;
    
    private final SwitchDomain switchDomain;
    
    public static final int CONNECT_TIMEOUT_MS = 500;
    
    private static final String CHECK_MYSQL_MASTER_SQL = "show global variables where variable_name='read_only'";
    
    private static final String MYSQL_SLAVE_READONLY = "ON";
    
    private static final ConcurrentMap<String, Connection> CONNECTION_POOL = new ConcurrentHashMap<String, Connection>();
    
    public MysqlHealthCheckProcessor(HealthCheckCommonV2 healthCheckCommon, SwitchDomain switchDomain) {
        this.healthCheckCommon = healthCheckCommon;
        this.switchDomain = switchDomain;
    }
    
    @Override
    public String getType() {
        return TYPE;
    }
    
    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
      
        // 省略
    }
    
    private class MysqlCheckTask implements Runnable {
        
        @Override
        public void run() {
            
            Statement statement = null;
            ResultSet resultSet = null;
            
        
                String clusterName = instance.getCluster();
                String key =
                        service.getGroupedServiceName() + ":" + clusterName + ":" + instance.getIp() + ":" + instance
                                .getPort();
                Connection connection = CONNECTION_POOL.get(key);
                Mysql config = (Mysql) metadata.getHealthChecker();
                
                if (connection == null || connection.isClosed()) {
                    String url = "jdbc:mysql://" + instance.getIp() + ":" + instance.getPort() + "?connectTimeout="
                            + CONNECT_TIMEOUT_MS + "&socketTimeout=" + CONNECT_TIMEOUT_MS + "&loginTimeout=" + 1;
                    connection = DriverManager.getConnection(url, config.getUser(), config.getPwd());
                    CONNECTION_POOL.put(key, connection);
                }
                
                statement = connection.createStatement();
                statement.setQueryTimeout(1);
                
                resultSet = statement.executeQuery(config.getCmd());
                int resultColumnIndex = 2;
                
                if (CHECK_MYSQL_MASTER_SQL.equals(config.getCmd())) {
                    resultSet.next();
                    if (MYSQL_SLAVE_READONLY.equals(resultSet.getString(resultColumnIndex))) {
                        throw new IllegalStateException("current node is slave!");
                    }
                }
                
                healthCheckCommon.checkOk(task, service, "mysql:+ok");
                healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,
                        switchDomain.getMysqlHealthParams());
​
    }
}

HttpHealthCheckProcessor

本质是发送一个 HTTP 请求,返回状态码 200 就算健康

public class HttpHealthCheckProcessor implements HealthCheckProcessorV2 {
    
    public static final String TYPE = HealthCheckType.HTTP.name();
    
    private static final NacosAsyncRestTemplate ASYNC_REST_TEMPLATE = HttpClientManager
            .getProcessorNacosAsyncRestTemplate();
    
    private final HealthCheckCommonV2 healthCheckCommon;
​
    
    @Override
    public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
        HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient()
                .getInstancePublishInfo(service);
        if (null == instance) {
            return;
        }
        try {
         
            if (!instance.tryStartCheck()) {
                SRV_LOG.warn("http check started before last one finished, service: {} : {} : {}:{}",
                        service.getGroupedServiceName(), instance.getCluster(), instance.getIp(), instance.getPort());
                healthCheckCommon
                        .reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getHttpHealthParams());
                return;
            }
            
            Http healthChecker = (Http) metadata.getHealthChecker();
            int ckPort = metadata.isUseInstancePortForCheck() ? instance.getPort() : metadata.getHealthyCheckPort();
            URL host = new URL(HTTP_PREFIX + instance.getIp() + ":" + ckPort);
            URL target = new URL(host, healthChecker.getPath());
            Map<String, String> customHeaders = healthChecker.getCustomHeaders();
            Header header = Header.newInstance();
            header.addAll(customHeaders);
            // 发送 HTTP 请求
            ASYNC_REST_TEMPLATE.get(target.toString(), header, Query.EMPTY, String.class,
                    new HttpHealthCheckCallback(instance, task, service));
            MetricsMonitor.getHttpHealthCheckMonitor().incrementAndGet();
        } catch (Throwable e) {
            instance.setCheckRt(switchDomain.getHttpHealthParams().getMax());
            healthCheckCommon.checkFail(task, service, "http:error:" + e.getMessage());
            healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
                    switchDomain.getHttpHealthParams());
        }
    }
    
    @Override
    public String getType() {
        return TYPE;
    }
    
    private class HttpHealthCheckCallback implements Callback<String> {
        
        private final HealthCheckTaskV2 task;
        
        private final Service service;
        
        private final HealthCheckInstancePublishInfo instance;
        
        private long startTime = System.currentTimeMillis();
        
        public HttpHealthCheckCallback(HealthCheckInstancePublishInfo instance, HealthCheckTaskV2 task,
                Service service) {
            this.instance = instance;
            this.task = task;
            this.service = service;
        }
        
        @Override
        public void onReceive(RestResult<String> result) {
            instance.setCheckRt(System.currentTimeMillis() - startTime);
            int httpCode = result.getCode();
            if (HttpURLConnection.HTTP_OK == httpCode) {
                healthCheckCommon.checkOk(task, service, "http:" + httpCode);
                healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,
                        switchDomain.getHttpHealthParams());
            } else if (HttpURLConnection.HTTP_UNAVAILABLE == httpCode
                    || HttpURLConnection.HTTP_MOVED_TEMP == httpCode) {
                // server is busy, need verification later
                healthCheckCommon.checkFail(task, service, "http:" + httpCode);
                healthCheckCommon
                        .reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getHttpHealthParams());
            } else {
                //probably means the state files has been removed by administrator
                healthCheckCommon.checkFailNow(task, service, "http:" + httpCode);
                healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
                        switchDomain.getHttpHealthParams());
            }
        }
        
        @Override
        public void onError(Throwable throwable) {
            Throwable cause = throwable;
            instance.setCheckRt(System.currentTimeMillis() - startTime);
            int maxStackDepth = 50;
            for (int deepth = 0; deepth < maxStackDepth && cause != null; deepth++) {
                if (HttpUtils.isTimeoutException(cause)) {
                    healthCheckCommon.checkFail(task, service, "http:" + cause.getMessage());
                    healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task,
                            switchDomain.getHttpHealthParams());
                    return;
                }
                cause = cause.getCause();
            }
            
          
            if (throwable instanceof ConnectException) {
                healthCheckCommon.checkFailNow(task, service, "http:unable2connect:" + throwable.getMessage());
            } else {
                healthCheckCommon.checkFail(task, service, "http:error:" + throwable.getMessage());
            }
            healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
                    switchDomain.getHttpHealthParams());
        }
        
        @Override
        public void onCancel() {
        
        }
    }
}
​

可见,不同方式的健康检查差异还是挺大的,那么如果将检查结果告知 Nacos 呢,那就是调用

// 健康检查结果:健康
healthCheckCommon.checkOk(task, service, "");

// 健康检查结果:不健康
healthCheckCommon.checkFail(task, service, "");

健康检查成功

检查成功主要做的事情就是 重置失败次数、结束检查

public void checkOk(HealthCheckTaskV2 task, Service service, String msg) {
    try {
        HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient()
                .getInstancePublishInfo(service);
        if (instance == null) {
            // 实例不存在,不做处理
            return;
        }
     
            if (!instance.isHealthy()) {
                // 如果实例不健康的,将状态改为 健康
                // 代码省略
            }
        
        } finally {
            // 重置失败次数
            instance.resetFailCount();
            // 结束检查
            instance.finishCheck();
        }
}

健康检查失

public void checkFail(HealthCheckTaskV2 task, Service service, String msg) {
​
        HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient()
                .getInstancePublishInfo(service);
        if (instance == null) {
            return;
        }
        try {
            if (instance.isHealthy()) {
                // 如果实例是健康的,将状态改为 不健康
                // 代码省略
            }
            
        } finally {
            // 重置健康次数
            instance.resetOkCount();
            // 结束检查
            instance.finishCheck();
        }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/1269.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode234_234. 回文链表

LeetCode234_234. 回文链表 一、描述 给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为回文链表。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,2,1] 输出&#xff1a;true示例 2&…

Day920.结构化日志业务审计日志 -SpringBoot与K8s云原生微服务实践

结构化日志&业务审计日志 Hi&#xff0c;我是阿昌&#xff0c;今天学习记录的是关于结构化日志&业务审计日志的内容。 1、什么是结构化日志 结构化日志&#xff08;Structured Logging&#xff09;是一种将日志信息组织为结构化数据的技术。 传统的日志通常是一些文…

UE实现建筑分层抽屉展示效果

文章目录 1.实现目标2.实现过程2.1 基础设置2.2 核心函数3.参考资料1.实现目标 使用时间轴对建筑楼层的位置偏移进行控制,实现分层抽屉的动画展示效果。 2.实现过程 建筑抽屉的实现原理比较简单,即对Actor的位置进行偏移,计算并更新其世界位置即可。这里还是基于ArchVizExp…

Mybatis报BindingException:Invalid bound statement (not found)异常

一、前言 本文的mybatis是与springboot整合时出现的异常&#xff0c;若使用的不是基于springboot&#xff0c;解决思路也大体一样的。 二、从整合mybatis的三个步骤排查问题 但在这之前&#xff0c;我们先要知道整合mybatis的三个重要的工作&#xff0c;如此才能排查&#x…

SDG,ADAM,LookAhead,Lion等优化器的对比介绍

本文将介绍了最先进的深度学习优化方法&#xff0c;帮助神经网络训练得更快&#xff0c;表现得更好。有很多个不同形式的优化器&#xff0c;这里我们只找最基础、最常用、最有效和最新的来介绍。 优化器 首先&#xff0c;让我们定义优化。当我们训练我们的模型以使其表现更好…

MySQL中事务的相关问题

事务 一、事务的概述&#xff1a; 1、事务处理&#xff08;事务操作&#xff09;&#xff1a;保证所有事务都作为一个工作单元来执行&#xff0c;即使出现了故障&#xff0c;都不能改变这种执行方式。当在一个事务中执行多个操作时&#xff0c;要么所有的事务都被提交(commit…

[ROC-RK3568-PC] [Firefly-Android] 10min带你了解Camera的使用

&#x1f347; 博主主页&#xff1a; 【Systemcall小酒屋】&#x1f347; 博主追寻&#xff1a;热衷于用简单的案例讲述复杂的技术&#xff0c;“假传万卷书&#xff0c;真传一案例”&#xff0c;这是林群院士说过的一句话&#xff0c;另外“成就是最好的老师”&#xff0c;技术…

再也不想去字节跳动面试了,6年测开面试遭到这样打击.....

前几天我朋友跟我吐苦水&#xff0c;这波面试又把他打击到了&#xff0c;做了快6年软件测试员。。。为了进大厂&#xff0c;也花了很多时间和精力在面试准备上&#xff0c;也刷了很多题。但题刷多了之后有点怀疑人生&#xff0c;不知道刷的这些题在之后的工作中能不能用到&…

【Python/Opencv】图像权重加法函数:cv2.addWeighted()详解

【Python/Opencv】图像权重加法函数&#xff1a;cv2.addWeighted()详解 文章目录【Python/Opencv】图像权重加法函数&#xff1a;cv2.addWeighted()详解1. 介绍2. API3. 代码示例与效果3.1 代码3.2 效果4. 参考1. 介绍 在OpenCV图像加法cv2.add函数详解详细介绍了图像的加法运…

字符串匹配【BF、KMP算法】

文章目录:star:BF算法代码实现BF的改进思路:star:KMP算法&#x1f6a9;next数组&#x1f6a9;代码实现优化next数组最终代码⭐️BF算法 BF算法&#xff0c;即暴力(Brute Force)算法&#xff0c;是普通的模式匹配算法&#xff0c;BF算法的思想就是将主串S的第一个字符与模式串P…

三、Python 操作 MongoDB ----非 ODM

文章目录一、连接器的安装和配置二、新增文档三、查询文档四、更新文档五、删除文档一、连接器的安装和配置 pymongo&#xff1a; MongoDB 官方提供的 Python 工具包。官方文档&#xff1a; https://pymongo.readthedocs.io/en/stable/ pip安装&#xff0c;命令如下&#xff1…

JVM调优,调的是什么?目的是什么?

文章目录前言一、jvm是如何运行代码的&#xff1f;二、jvm的内存模型1 整体内存模型结构图2 堆中的年代区域划分3 对象在内存模型中是如何流转的?4 什么是FULL GC,STW? 为什么会发生FULL GC?5 要调优,首先要知道有哪些垃圾收集器及哪些算法6 调优不是盲目的,要有依据,几款内…

HttpRunner3.x(1)-框架介绍

HttpRunner 是一款面向 HTTP(S) 协议的通用测试框架&#xff0c;只需编写维护一份 YAML/JSON 脚本&#xff0c;即可实现自动化测试、性能测试、线上监控、持续集成等多种测试需求。主要特征继承的所有强大功能requests &#xff0c;只需以人工方式获得乐趣即可处理HTTP&#xf…

【每日反刍】——指针运算

&#x1f30f;博客主页&#xff1a;PH_modest的博客主页 &#x1f6a9;当前专栏&#xff1a;每日反刍 &#x1f48c;其他专栏&#xff1a; &#x1f534; 每日一题 &#x1f7e2; 读书笔记 &#x1f7e1; C语言跬步积累 &#x1f308;座右铭&#xff1a;广积粮&#xff0c;缓称…

【Java进阶篇】—— File类与IO流

一、File类的使用 1.1 概述 File 类以及本章中的各种流都定义在 java.io 包下 一个File对象代表硬盘或网络中可能存在的一个文件或文件夹&#xff08;文件目录&#xff09; File 能新建、删除、重命名 文件和目录&#xff0c;但 File不能访问文件内容本身。如果我们想要访问…

【LeetCode】二叉树基础练习 5 道题

第一题&#xff1a;单值二叉树 题目介绍&#xff1a; 如果二叉树每个节点都具有相同的值&#xff0c;那么该二叉树就是单值二叉树。 只有给定的树是单值二叉树时&#xff0c;才返回true&#xff1b;否则返回false。 //题目框架 bool isUnivalTree(struct TreeNode* root){ }…

【24】Verilog进阶 - 序列检测2

VL35 状态机-非重叠的序列检测 1 思路 状态机嘛,也是比较熟悉的朋友啦, 我就火速写出了STG。如下黑色所示: 2 初版代码 `timescale 1ns/1nsmodule sequence_test1(input wire clk ,input wire rst ,input wire data ,output reg flag ); //*************code**********…

系统架构:经典三层架构

引言 经典三层架构是分层架构中最原始最典型的分层模式&#xff0c;其他分层架构都是其变种或扩展&#xff0c;例如阿里的四层架构模式和DDD领域驱动模型。阿里的 四层架构模型在三层基础上增加了 Manager 层&#xff0c;从而形成变种四层模型&#xff1b;DDD架构则在顶层用户…

Canvas百战成神-圆(1)

Canvas百战成神-圆 初始化容器 <canvas id"canvas"></canvas>canvas{border: 1px solid black; }让页面占满屏幕 *{margin: 0;padding: 0; } html,body{width: 100%;height: 100%;overflow: hidden; } ::-webkit-scrollbar{display: none; }初始化画笔…

JavaEE--Thread 类的基本用法(不看你会后悔的嘿嘿)

Thread类是JVM用来管理线程的一个类,换句话说,每个线程都唯一对应着一个Thread对象. 因此,认识和掌握Thread类弥足重要. 本文将从 线程创建线程中断线程等待线程休眠获取线程实例 等方面来进行具体说明. 1)线程创建 方法1:通过创建Thread类的子类并重写run () 方法 class M…