文章目录
- 心跳机制与服务健康检查
- NacosClient端
- NacosServer端
- NacosServer端健康检查
- 服务发现
- NacosClient端
- NacosServer端
- AP集群
- 从源码启动集群
- 心跳设计原理
- 各节点状态同步
- 服务实例数据同步
- 服务实例状态变动同步
心跳机制与服务健康检查
官方文档:发送某个实例的心跳接口信息
心跳机制 在线流程图
健康检查在线流程图
健康检查:
NacosClient端
现在先回到NacosClient进行服务注册时的代码,通过调用NamingService.registerInstance(...)
方法进行服务注册
/**
 * Registers one microservice instance with the Nacos server.
 *
 * <p>The instance is validated first. For ephemeral instances a heartbeat descriptor is built and
 * handed to the BeatReactor, which keeps the instance alive with a self-rescheduling delayed task;
 * only then is the registration request itself sent.
 *
 * @param serviceName service the instance belongs to
 * @param groupName   group of the service
 * @param instance    instance data (ip, port, weight, health, enabled flag, ...)
 * @throws NacosException if validation or registration fails
 */
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    final String groupedName = NamingUtils.getGroupedName(serviceName, groupName);

    // Only ephemeral instances are kept alive by client heartbeats.
    if (instance.isEphemeral()) {
        beatReactor.addBeatInfo(groupedName, beatReactor.buildBeatInfo(groupedName, instance));
    }

    serverProxy.registerService(groupedName, groupName, instance);
}
/**
 * Builds the heartbeat descriptor for {@code instance}.
 *
 * <p>The descriptor snapshots the identifying and payload fields of the instance (service name,
 * cluster, ip, port, weight, metadata) plus the beat period taken from
 * {@code instance.getInstanceHeartBeatInterval()} (metadata key preserved.heart.beat.interval;
 * 5 s by default according to the original notes). It starts out as not scheduled.
 *
 * @param groupedServiceName grouped service name the beat is reported for
 * @param instance           instance to describe
 * @return a new BeatInfo
 */
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
    final BeatInfo info = new BeatInfo();
    // identity
    info.setServiceName(groupedServiceName);
    info.setCluster(instance.getClusterName());
    info.setIp(instance.getIp());
    info.setPort(instance.getPort());
    // payload forwarded with every full beat
    info.setWeight(instance.getWeight());
    info.setMetadata(instance.getMetadata());
    // scheduling
    info.setPeriod(instance.getInstanceHeartBeatInterval());
    info.setScheduled(false);
    return info;
}
/**
 * Starts sending heartbeats for {@code beatInfo}.
 *
 * <p>Any beat already registered for the same service/ip/port is removed from dom2Beat and
 * marked stopped, so its task ends on its next run. The new beat is stored, and its first
 * BeatTask is scheduled after one beat period; BeatTask reschedules itself after every run.
 *
 * @param serviceName grouped service name
 * @param beatInfo    heartbeat descriptor to activate
 */
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    final String beatKey = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    //fix #1733
    final BeatInfo previous = dom2Beat.remove(beatKey);
    if (previous != null) {
        previous.setStopped(true);
    }
    dom2Beat.put(beatKey, beatInfo);
    final long firstDelayMillis = beatInfo.getPeriod();
    executorService.schedule(new BeatTask(beatInfo), firstDelayMillis, TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
再来看BeatTask类中run()方法的具体实现:
/**
 * Sends one heartbeat for {@code beatInfo} and schedules the next BeatTask.
 *
 * <p>Part of the body is elided in this excerpt ("......"); the elided code may adjust
 * {@code nextTime} from the server response — not visible here.
 */
public void run() {
    // A replaced/removed beat is flagged stopped: end without rescheduling.
    if (beatInfo.isStopped()) {
        return;
    }
    // Delay before the next heartbeat; starts as the configured beat period.
    long nextTime = beatInfo.getPeriod();
    try {
        // Send one heartbeat to the server (PUT .../instance/beat, see sendBeat).
        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        ......
    } catch (NacosException ex) {
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
    }
    // Schedule the next heartbeat; a new BeatTask for the same beatInfo runs after nextTime ms.
    executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
/**
 * Sends a single heartbeat for {@code beatInfo} with HTTP PUT to {@code nacosUrlBase + "/instance/beat"}.
 *
 * @param beatInfo         instance the heartbeat is for
 * @param lightBeatEnabled when true the serialized beat ("beat" body field) is omitted and only the
 *                         identifying parameters (namespace, service, cluster, ip, port) are sent
 * @return the server response parsed into a JSON tree
 * @throws NacosException if the request fails
 */
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    final Map<String, String> query = new HashMap<>(8);
    final Map<String, String> body = new HashMap<>(2);
    if (!lightBeatEnabled) {
        body.put("beat", JacksonUtils.toJson(beatInfo));
    }
    query.put(CommonParams.NAMESPACE_ID, namespaceId);
    query.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    query.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    query.put("ip", beatInfo.getIp());
    query.put("port", String.valueOf(beatInfo.getPort()));
    final String url = UtilAndComs.nacosUrlBase + "/instance/beat";
    final String response = reqApi(url, query, body, HttpMethod.PUT);
    return JacksonUtils.toObj(response);
}
NacosServer端
接收客户端的心跳接口/nacos/v1/ns/instance/beat
,该接口找到instance所属的service后,再调用processClientBeat()
方法进行处理
/**
 * Heartbeat endpoint (PUT /beat) called periodically by Nacos clients.
 *
 * <p>The target instance is identified either by the JSON "beat" parameter or by the
 * clusterName/ip/port request parameters; the "beat" payload wins when present. If the instance
 * is not in the registry and a full beat was sent, it is re-registered from the beat payload
 * ("data compensation"). The beat is then handed to {@code service.processClientBeat}, which
 * refreshes the instance's last-beat time asynchronously.
 *
 * @param request HTTP request carrying namespaceId, serviceName and either "beat" or ip/port/clusterName
 * @return JSON with the beat interval the client should use, the light-beat switch and a code:
 *         OK, or RESOURCE_NOT_FOUND when the instance is unknown and no "beat" payload was sent
 * @throws NacosException if the service cannot be found after the lookup/re-registration
 */
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Default beat interval from the server switches; overridden below if the instance defines its own.
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    // "beat": JSON-serialized heartbeat payload; clients in light-beat mode do not send it.
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    // A full beat payload takes precedence over the plain request parameters.
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    // Look up the instance that sent this heartbeat.
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // Unknown instance (for example already removed by the health check).
    if (instance == null) {
        // Light beat: not enough data to re-register, report the resource as missing.
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        // Data compensation: rebuild the instance from the beat payload and register it again.
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        // NOTE(review): assigns getInstanceId() back to itself; a no-op unless the getter derives the id — confirm.
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    // Light beat: build a minimal RsInfo from the request parameters.
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // Refresh the instance's last-beat time; the work is scheduled on another thread.
    service.processClientBeat(clientBeat);
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    // A per-instance heartbeat interval in the metadata replaces the default put above.
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
修改instance的LastBeat时间
/**
 * Hands a client heartbeat to the naming health-check executor.
 *
 * <p>A ClientBeatProcessor bound to this service is scheduled with zero delay, so the actual
 * last-beat refresh (ClientBeatProcessor.run) happens on the executor thread, not the caller's.
 *
 * @param rsInfo heartbeat data (ip, port, cluster, ...)
 */
public void processClientBeat(final RsInfo rsInfo) {
    final ClientBeatProcessor processor = new ClientBeatProcessor();
    processor.setRsInfo(rsInfo);
    processor.setService(this);
    HealthCheckReactor.scheduleNow(processor);
}
/**
 * Submits {@code task} to the naming health-check executor for immediate (zero-delay) execution.
 *
 * @param task work to run
 * @return the future of the scheduled task
 */
public static ScheduledFuture<?> scheduleNow(Runnable task) {
    final long noDelayMillis = 0L;
    return GlobalExecutor.scheduleNamingHealth(task, noDelayMillis, TimeUnit.MILLISECONDS);
}
/**
 * Applies one client heartbeat to the in-memory registry. It runs on the naming health executor
 * (see processClientBeat / scheduleNow).
 *
 * <p>Finds the instance with the beat's ip and port in the beat's cluster and refreshes its
 * last-beat time. Unless the instance is "marked", an unhealthy instance is switched back to
 * healthy and the push service is told that the service changed.
 *
 * <p>If the service has no cluster with the beat's cluster name, the beat is logged and ignored.
 * This can happen, for example, when the beat arrives before an asynchronous re-registration has
 * taken effect. Previously the missing cluster caused a NullPointerException on the executor
 * thread.
 */
public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }
    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    // Resolve the cluster first, then search its instances.
    Cluster cluster = service.getClusterMap().get(clusterName);
    if (cluster == null) {
        Loggers.EVT_LOG.warn("[CLIENT-BEAT] cluster {} not found in service {}, beat ignored: {}", clusterName,
                service.getName(), rsInfo);
        return;
    }
    // Ephemeral instances of the cluster.
    List<Instance> instances = cluster.allIPs(true);
    // Match the sender by ip + port.
    for (Instance instance : instances) {
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // Record the time of this heartbeat.
            instance.setLastBeat(System.currentTimeMillis());
            // Manually marked instances keep their health state.
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    instance.setHealthy(true);
                    Loggers.EVT_LOG
                            .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                    cluster.getService().getName(), ip, port, cluster.getName(),
                                    UtilsAndCommons.LOCALHOST_SITE);
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
NacosServer端健康检查
健康检查在线流程图
健康检查任务是在服务注册的流程中创建的,接下来回忆一下服务注册的流程:
NacosClient端通过调用/v1/ns/instance接口发送post请求,往NacosServer端进行服务实例注册。
在这个过程中,如果该实例是本服务中第一个注册的实例,此时NacosServer中还没有对应的service,那么就会先去创建一个service,
并调用该服务的初始化方法service.init(),在其中启动健康检查定时任务:
/**
 * Initializes the service.
 *
 * <p>Starts the periodic client-heartbeat check (clientBeatCheckTask) for this service. It then
 * binds every cluster back to this service and initializes that cluster.
 */
public void init() {
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Cluster cluster : clusterMap.values()) {
        cluster.setService(this);
        cluster.init();
    }
}
/**
 * Registers the periodic client-heartbeat check for one service: first run after 5 s, then every 5 s.
 * At most one scheduled task is kept per {@code task.taskKey()}.
 *
 * <p>The previous version passed a freshly scheduled future to {@code putIfAbsent}. The schedule
 * call ran before the presence check, so a second task was started even when the key was already
 * registered. That duplicate kept running but was never stored in futureMap, so code working
 * through futureMap could not see or cancel it. Now a task is scheduled only when the key is
 * absent. If two callers race, the loser cancels its own task.
 *
 * @param task health-check task for one service
 */
public static void scheduleCheck(ClientBeatCheckTask task) {
    final String key = task.taskKey();
    if (futureMap.containsKey(key)) {
        return;
    }
    final ScheduledFuture<?> future = GlobalExecutor
            .scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS);
    if (futureMap.putIfAbsent(key, future) != null) {
        // Another thread registered the same key concurrently: keep its task, drop ours.
        future.cancel(false);
    }
}
所以接下来服务实例健康检查的详细代码就在clientBeatCheckTask.run()
方法中
/**
 * Heartbeat health check for the ephemeral instances of one service (scheduled periodically via
 * HealthCheckReactor.scheduleCheck).
 *
 * <p>Runs only on the cluster node responsible for the service and only while health checking is
 * enabled. It makes two passes over the instances; "marked" instances are skipped in both:
 * <ol>
 *   <li>an instance whose last beat is older than its heartbeat timeout (15 s by default per the
 *       original notes) is set unhealthy; the push service is notified and an
 *       InstanceHeartbeatTimeoutEvent is published;</li>
 *   <li>if instance expiry is enabled, an instance whose last beat is older than its delete timeout
 *       (30 s by default per the original notes) is removed via deleteIp.</li>
 * </ol>
 * Any exception is logged and not rethrown.
 */
public void run() {
    try {
        // Cluster mode: the service name is hashed modulo the healthy node count, so exactly one
        // node is responsible for health-checking this service; the others return here.
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        // Health checking switched off globally.
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        // All ephemeral instances of the service.
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) {
            // Heartbeat timeout exceeded?
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    // Only flip healthy -> unhealthy once; already-unhealthy instances need no update.
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        // Instance expiry disabled: never delete.
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            // Delete timeout exceeded?
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                // Deregister the instance; per the original notes this sends a DELETE request to this
                // server's own /v1/ns/instance endpoint.
                deleteIp(instance);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
服务发现
官方文档:获取全部实例 NamingService接口方法说明
官方文档:查询服务下的实例列表接口信息
服务发现 在线流程图
NacosClient端
首先是NacosClient这边的流程。NamingService是最主要的接口,其中有两类方法用来获取服务实例:
- getAllInstances(...):获取全部实例;
- selectInstances(...):根据条件获取过滤后的实例列表,可以只获取健康或不健康的服务实例。
下面方法主要就是调用getServiceInfo()
方法
/**
 * Returns all instances (healthy or not) of a service in the given clusters.
 *
 * @param serviceName service name
 * @param groupName   group of the service
 * @param clusters    cluster names, joined with "," for the lookup
 * @param subscribe   true: read through hostReactor.getServiceInfo, which serves the locally
 *                    cached ServiceInfo and keeps it refreshed; false: use
 *                    hostReactor.getServiceInfoDirectlyFromServer
 * @return the instance list, or a new empty list when nothing is found (never null)
 * @throws NacosException if the lookup fails
 */
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {
    final String groupedName = NamingUtils.getGroupedName(serviceName, groupName);
    final String clusterCsv = StringUtils.join(clusters, ",");
    final ServiceInfo serviceInfo = subscribe
            ? hostReactor.getServiceInfo(groupedName, clusterCsv)
            : hostReactor.getServiceInfoDirectlyFromServer(groupedName, clusterCsv);
    if (serviceInfo != null) {
        final List<Instance> hosts = serviceInfo.getHosts();
        if (!CollectionUtils.isEmpty(hosts)) {
            return hosts;
        }
    }
    return new ArrayList<Instance>();
}
下面方法中的核心代码是updateServiceNow(serviceName, clusters)。这里会先从本地缓存serviceInfoMap中查找ServiceInfo对象,如果没有找到,就向NacosServer发送请求查询该服务的实例列表,并将查询到的服务信息保存至本地缓存serviceInfoMap中;如果此时其他线程正在更新该服务,当前线程最多等待UPDATE_HOLD_INTERVAL毫秒。
/**
 * Returns the ServiceInfo for serviceName/clusters, fetching it from the server on first use.
 *
 * <ol>
 *   <li>In failover mode the failover reactor's copy is returned directly.</li>
 *   <li>On a cache miss, an empty placeholder is cached and the service is flagged as updating. The
 *       instance list is then fetched synchronously via updateServiceNow. The flag is now cleared
 *       in a {@code finally}, so a runtime failure cannot leave it set; a leftover flag would make
 *       every later call wait.</li>
 *   <li>If another thread is updating this service, wait up to UPDATE_HOLD_INTERVAL ms on the
 *       cached object. updateService calls notifyAll on it when finished. An interrupt during the
 *       wait is logged, and the thread's interrupt status is now restored instead of being
 *       cleared.</li>
 *   <li>Make sure a periodic UpdateTask exists for this key.</li>
 * </ol>
 *
 * @param serviceName grouped service name
 * @param clusters    comma-separated cluster names
 * @return the cached ServiceInfo (possibly still empty if the fetch failed)
 */
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // Look in the client-side cache serviceInfoMap first.
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    if (null == serviceObj) {
        // Cache miss: cache a placeholder so concurrent callers find it and wait on it.
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());
        try {
            // Query the server for this service's instance list and store it in serviceInfoMap.
            updateServiceNow(serviceName, clusters);
        } finally {
            updatingMap.remove(serviceName);
        }
    } else if (updatingMap.containsKey(serviceName)) {
        // Another thread is fetching this service right now: wait (bounded) until it notifies.
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER
                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    // Ensure a periodic UpdateTask keeps this entry fresh (its future is tracked in futureMap).
    scheduleUpdateIfAbsent(serviceName, clusters);
    return serviceInfoMap.get(serviceObj.getKey());
}
/**
 * Refreshes serviceName/clusters from the server immediately.
 * A NacosException is logged and swallowed; the caller continues with whatever is cached.
 */
private void updateServiceNow(String serviceName, String clusters) {
    try {
        updateService(serviceName, clusters);
    } catch (NacosException updateFailure) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, updateFailure);
    }
}
/**
 * Pulls the instance list of serviceName/clusters from the server. The response is stored in the
 * client's local cache serviceInfoMap via processServiceJson.
 *
 * <p>The ServiceInfo cached before the call serves as a monitor. When this method exits, on
 * success or failure, every thread waiting on it (see getServiceInfo) is woken with notifyAll.
 *
 * @throws NacosException if the server query fails
 */
public void updateService(String serviceName, String clusters) throws NacosException {
    final ServiceInfo previouslyCached = getServiceInfo0(serviceName, clusters);
    try {
        // GET .../instance/list; the UDP port tells the server where to push later changes.
        final int pushPort = pushReceiver.getUdpPort();
        final String payload = serverProxy.queryList(serviceName, clusters, pushPort, false);
        if (StringUtils.isNotEmpty(payload)) {
            processServiceJson(payload);
        }
    } finally {
        if (previouslyCached != null) {
            synchronized (previouslyCached) {
                previouslyCached.notifyAll();
            }
        }
    }
}
/**
 * Queries the server for the instances of a service (GET {@code nacosUrlBase + "/instance/list"}).
 *
 * @param serviceName grouped service name
 * @param clusters    comma-separated cluster names
 * @param udpPort     UDP port on which this client listens for server pushes about instance changes
 * @param healthyOnly whether only healthy instances are requested
 * @return raw JSON response body
 * @throws NacosException if the request fails
 */
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {
    final Map<String, String> query = new HashMap<>(8);
    query.put(CommonParams.NAMESPACE_ID, namespaceId);
    query.put(CommonParams.SERVICE_NAME, serviceName);
    query.put("clusters", clusters);
    query.put("healthyOnly", String.valueOf(healthyOnly));
    query.put("udpPort", String.valueOf(udpPort));
    query.put("clientIP", NetUtils.localIP());
    final String url = UtilAndComs.nacosUrlBase + "/instance/list";
    return reqApi(url, query, HttpMethod.GET);
}
接下来是延时任务的代码
/**
 * Ensures a periodic UpdateTask exists for serviceName/clusters.
 *
 * <p>The first check runs without a lock. The check is repeated while holding the futureMap
 * monitor, so concurrent callers create at most one task per key.
 */
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    final String key = ServiceInfo.getKey(serviceName, clusters);
    if (futureMap.get(key) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(key) == null) {
            // UpdateTask reschedules itself after every run.
            futureMap.put(key, addTask(new UpdateTask(serviceName, clusters)));
        }
    }
}
/**
 * One round of the periodic refresh of serviceName/clusters in the local cache serviceInfoMap.
 *
 * <ul>
 *   <li>No cached entry: pull it from the server.</li>
 *   <li>Cached entry not newer than what this task saw last time: pull it from the server again.</li>
 *   <li>Cached entry already updated by a server push: do not overwrite it, only call refreshOnly.</li>
 * </ul>
 * The task then reschedules itself. The next delay is the service's cacheMillis on success or
 * DEFAULT_DELAY otherwise. It is shifted left by failCount and capped at DEFAULT_DELAY * 60.
 *
 * <p>The task stops for good when the service is no longer subscribed and its future has been
 * removed from futureMap. A later subscription then creates a fresh task through
 * scheduleUpdateIfAbsent. Previously the {@code finally} block rescheduled the task even on this
 * path, so the "update task is stopped" branch never actually stopped it.
 */
public void run() {
    long delayTime = DEFAULT_DELAY;
    // Cleared only on the abort path; every other path, including errors, reschedules.
    boolean reschedule = true;
    try {
        ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        // Nothing cached yet: query the server and store the result as a new ServiceInfo.
        if (serviceObj == null) {
            updateService(serviceName, clusters);
            return;
        }
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            // Not refreshed since our last round: pull from the server.
            updateService(serviceName, clusters);
            serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        } else {
            // if serviceName already updated by push, we should not override it
            // since the push data may be different from pull through force push
            refreshOnly(serviceName, clusters);
        }
        lastRefTime = serviceObj.getLastRefTime();
        if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
                .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
            // abort the update task
            NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
            reschedule = false;
            return;
        }
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        delayTime = serviceObj.getCacheMillis();
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (reschedule) {
            // Back off with the failure count; never wait longer than DEFAULT_DELAY * 60.
            executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
        }
    }
}
NacosServer端
NacosServer端会去处理/v1/ns/instance/list请求,服务端的代码也很简单:
- 先获取请求参数,其中包括udp端口;
- 通过请求参数中的namespaceId和serviceName,从服务注册表serviceMap中获取到对应的Service;
- 再取出该service中所有的实例集合instances;
- 将实例按健康与非健康分开,存入Map<Boolean, List<Instance>> ipMap;如果健康实例的占比不高于保护阈值,则把所有实例都当作健康实例返回;
- 遍历ipMap,将结果封装成JSON对象返回给客户端。
具体源码如下
/**
 * GET /list: returns the instances of one service to a client.
 *
 * <p>Parses the request parameters and delegates to doSrvIpxt. A positive "udpPort" lets
 * doSrvIpxt register the caller for UDP pushes.
 *
 * @param request HTTP request; serviceName is required, everything else is optional
 * @return JSON description of the service and its instances
 * @throws Exception if serviceName is missing or malformed, or a numeric parameter cannot be parsed
 */
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    final String userAgent = WebUtils.getUserAgent(request);
    final String clusterCsv = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    final String callerIp = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    final int pushPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    final String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    final boolean checkOnly = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    final String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    final String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    final boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    return doSrvIpxt(namespaceId, serviceName, userAgent, clusterCsv, callerIp, pushPort, env, checkOnly, app,
            tenant, healthyOnly);
}
/**
 * Builds the instance-list response for one service (backing implementation of GET /list).
 *
 * <ol>
 *   <li>If the client gave a UDP port and its agent supports push, it is registered with the push
 *       service and the push cache duration is used; otherwise the default cache duration is used.</li>
 *   <li>Unknown service, or no instances left after cluster/selector filtering: an empty host list
 *       is returned.</li>
 *   <li>Instances are split into healthy/unhealthy. If the healthy ratio is at or below the
 *       service's protect threshold, all instances are returned as healthy.</li>
 *   <li>Enabled instances are serialized (only the healthy bucket when {@code healthyOnly}).</li>
 * </ol>
 *
 * <p>When {@code isCheck} is true only the protection diagnostics are returned
 * (reachProtectThreshold, protectThreshold, reachLocalSiteCallThreshold). Previously this branch
 * filled {@code result} and then returned a new empty node, discarding those fields.
 *
 * @throws Exception propagated from checkIfDisabled and the registry/selector calls
 */
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
        int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Look up the Service in the registry by namespaceId + serviceName.
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();

    // now try to enable the push
    try {
        if (udpPort > 0 && pushService.canEnablePush(agent)) {
            pushService
                    .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                            pushDataSource, tid, app);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        Loggers.SRV_LOG
                .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }

    // Unknown service: respond with an empty host list.
    if (service == null) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.put("name", serviceName);
        result.put("clusters", clusters);
        result.put("cacheMillis", cacheMillis);
        result.replace("hosts", JacksonUtils.createEmptyArrayNode());
        return result;
    }

    checkIfDisabled(service);

    // Instances of the requested (comma-separated) clusters.
    List<Instance> srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

    // filter ips using selector:
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }

    // Nothing left to serve: respond with an empty host list plus the service metadata.
    if (CollectionUtils.isEmpty(srvedIPs)) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.put("dom", isJavaClientSince100(clientInfo) ? serviceName : NamingUtils.getServiceName(serviceName));
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.set("hosts", JacksonUtils.createEmptyArrayNode());
        result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }

    // Split the instances into healthy (TRUE) and unhealthy (FALSE) buckets.
    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }

    if (isCheck) {
        result.put("reachProtectThreshold", false);
    }

    // Protect threshold: if the healthy ratio is at or below it, serve every instance as healthy.
    double threshold = service.getProtectThreshold();
    if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
        Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
        if (isCheck) {
            result.put("reachProtectThreshold", true);
        }
        ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
        ipMap.get(Boolean.FALSE).clear();
    }

    if (isCheck) {
        result.put("protectThreshold", service.getProtectThreshold());
        result.put("reachLocalSiteCallThreshold", false);
        // Return the diagnostics gathered above (previously a fresh empty node was returned here).
        return result;
    }

    // Evaluated once for all hosts and for the top-level "dom" field below.
    final boolean javaClientSince100 = isJavaClientSince100(clientInfo);
    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();
        // healthyOnly: skip the unhealthy bucket.
        if (healthyOnly && !entry.getKey()) {
            continue;
        }
        for (Instance instance : ips) {
            // remove disabled instance:
            if (!instance.isEnabled()) {
                continue;
            }
            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            // deprecated since nacos 1.0.0:
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            ipObj.put("serviceName", javaClientSince100 ? instance.getServiceName()
                    : NamingUtils.getServiceName(instance.getServiceName()));
            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);
        }
    }

    // Assemble the response.
    result.replace("hosts", hosts);
    result.put("dom", javaClientSince100 ? serviceName : NamingUtils.getServiceName(serviceName));
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}

/**
 * Whether the caller is a Java client of version 1.0.0 or later. Such clients receive service
 * names unchanged; other clients receive {@code NamingUtils.getServiceName(...)} of them.
 */
private static boolean isJavaClientSince100(ClientInfo clientInfo) {
    return clientInfo.type == ClientInfo.ClientType.JAVA
            && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0;
}
AP集群
从源码启动集群
第一步,运行sql脚本: distribution/conf 目录下的 nacos-mysql.sql 脚本
第二步:修改 console\src\main\resources 目录下的 application.properties 文件里的mysql配置
第三步,创建一个目录,用来存放某个nacos节点运行时的数据,同时在目录下创建一个conf目录,在该目录下创建一个cluster.conf文件,文件内容为各个nacos节点的ip和端口
然后将目录复制三份,并修改目录名
第四步,运行NacosServer,需要添加两个JVM参数-Dserver.port=8848 -Dnacos.home=D:\nacos-cluster\nacos-8848
,其他两个也要做相应的修改
结果
心跳设计原理
我们知道服务实例注册进NacosServer之后,它会定时地发送心跳给NacosServer。在NacosServer端中,每一个service都有一个心跳健康检查任务定时执行。
那么在集群模式下,有两种方案:
- 方案一,我在每一台Nacos节点上都有定时心跳检测任务
- 方案二,我只在一台Nacos节点上有定时心跳检测任务,如果有不健康的instance出现或者下线后,这台节点再同步数据给其他节点
Nacos集群采用的是方案二:一个service只允许由一台nacos节点进行健康检查。
服务实例健康检查的详细代码在clientBeatCheckTask.run()方法中,其中第一个if判断会对serviceName进行哈希运算,再对健康的nacos集群节点数取模,只有计算结果落在本节点上时才会继续执行检查。
/**
 * Heartbeat health check for the ephemeral instances of one service (scheduled periodically via
 * HealthCheckReactor.scheduleCheck).
 *
 * <p>Runs only on the cluster node responsible for the service and only while health checking is
 * enabled. It makes two passes over the instances; "marked" instances are skipped in both:
 * <ol>
 *   <li>an instance whose last beat is older than its heartbeat timeout (15 s by default per the
 *       original notes) is set unhealthy; the push service is notified and an
 *       InstanceHeartbeatTimeoutEvent is published;</li>
 *   <li>if instance expiry is enabled, an instance whose last beat is older than its delete timeout
 *       (30 s by default per the original notes) is removed via deleteIp.</li>
 * </ol>
 * Any exception is logged and not rethrown.
 */
public void run() {
    try {
        // Cluster mode: the service name is hashed modulo the healthy node count, so exactly one
        // node is responsible for health-checking this service; the others return here.
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        // Health checking switched off globally.
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        // All ephemeral instances of the service.
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) {
            // Heartbeat timeout exceeded?
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    // Only flip healthy -> unhealthy once; already-unhealthy instances need no update.
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        // Instance expiry disabled: never delete.
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            // Delete timeout exceeded?
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                // Deregister the instance; per the original notes this sends a DELETE request to this
                // server's own /v1/ns/instance endpoint.
                deleteIp(instance);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
/**
 * Decides whether this node is responsible for {@code serviceName} (e.g. for health-checking it).
 *
 * <ul>
 *   <li>Distro disabled or standalone mode: always responsible.</li>
 *   <li>Healthy server list empty (distro not ready yet): not responsible.</li>
 *   <li>Local address not in the healthy list: responsible.</li>
 *   <li>Otherwise: distroHash(serviceName) modulo the healthy server count must fall within the
 *       index range of the local address in that list.</li>
 * </ul>
 *
 * @param serviceName service to test
 * @return true if this node should handle the service
 */
public boolean responsible(String serviceName) {
    // Read the field once so every check below uses the same list.
    final List<String> healthyServers = healthyList;
    final boolean distroActive = switchDomain.isDistroEnabled() && !EnvUtil.getStandaloneMode();
    if (!distroActive) {
        return true;
    }
    if (CollectionUtils.isEmpty(healthyServers)) {
        // means distro config is not ready yet
        return false;
    }
    final int firstSlot = healthyServers.indexOf(EnvUtil.getLocalAddress());
    final int lastSlot = healthyServers.lastIndexOf(EnvUtil.getLocalAddress());
    if (firstSlot < 0 || lastSlot < 0) {
        return true;
    }
    final int target = distroHash(serviceName) % healthyServers.size();
    return firstSlot <= target && target <= lastSlot;
}
各节点状态同步
Nacos集群中,存在一个定时任务用来同步各个节点之间的状态,方便感知到其他节点是否已经挂了。
入口在ServerListManager
的内部类ServerStatusReporter.run()
方法中
// ServerListManager is a Spring bean (@Component); its @PostConstruct init() starts the periodic
// tasks that report this node's status to the other cluster nodes.
@Component("serverListManager")
public class ServerListManager extends MemberChangeListener {
    ...
    @PostConstruct
    public void init() {
        // Start ServerStatusReporter (delay 2000, presumably ms), which reports this node's status to
        // the other cluster nodes and reschedules itself; also register the ServerInfoUpdater task.
        GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
        GlobalExecutor.registerServerInfoUpdater(new ServerInfoUpdater());
    }
    ...
}
各节点状态同步详细代码如下所示
/**
 * Periodically sends this node's status line to the other cluster members that still use the
 * legacy status API (members advertising a "version", i.e. 1.3.0+, are skipped).
 * Reschedules itself in {@code finally} with switchDomain.getServerStatusSynchronizationPeriodMillis().
 */
private class ServerStatusReporter implements Runnable {
    @Override
    public void run() {
        try {
            // No server port configured: nothing to report.
            if (EnvUtil.getPort() <= 0) {
                return;
            }
            // Weight = half the available processors, at least 1.
            int weight = Runtime.getRuntime().availableProcessors() / 2;
            if (weight <= 0) {
                weight = 1;
            }
            long curTime = System.currentTimeMillis();
            // Status line: site#address#timestamp#weight
            String status = LOCALHOST_SITE + "#" + EnvUtil.getLocalAddress() + "#" + curTime + "#" + weight
                    + "\r\n";
            // All cluster members; per the original notes they come from conf/cluster.conf.
            List<Member> allServers = getServers();
            if (!contains(EnvUtil.getLocalAddress())) {
                Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}",
                        EnvUtil.getLocalAddress(), allServers);
                return;
            }
            // NOTE(review): only reports when the local address does not contain IPUtil.localHostIP()
            // (presumably the loopback address) — confirm.
            if (allServers.size() > 0 && !EnvUtil.getLocalAddress()
                    .contains(IPUtil.localHostIP())) {
                // Send the status to every other member.
                for (Member server : allServers) {
                    // Skip this node itself.
                    if (Objects.equals(server.getAddress(), EnvUtil.getLocalAddress())) {
                        continue;
                    }
                    // This metadata information exists from 1.3.0 onwards "version"
                    if (server.getExtendVal(MemberMetaDataConstants.VERSION) != null) {
                        Loggers.SRV_LOG
                                .debug("[SERVER-STATUS] target {} has extend val {} = {}, use new api report status",
                                        server.getAddress(), MemberMetaDataConstants.VERSION,
                                        server.getExtendVal(MemberMetaDataConstants.VERSION));
                        continue;
                    }
                    Message msg = new Message();
                    msg.setData(status);
                    // Send the status (heartbeat) message.
                    synchronizer.send(server.getAddress(), msg);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);
        } finally {
            // Schedule the next report.
            GlobalExecutor
                    .registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());
        }
    }
}
服务实例数据同步
服务中新增/移除了一个instance实例时,需要将该变更同步至其他节点。
新增和移除实例时,其实都调用了consistencyService.put(key, instances)方法,最终都会进入下面的代码中:
onPut(key, value)负责往阻塞队列中添加数据,而其后的distroProtocol.sync(...)这一行就是对nacos集群中各个节点进行实例数据同步。
/**
 * Stores the instance list {@code value} under {@code key} on this node and triggers its
 * replication to the other Nacos nodes.
 *
 * @param key   instance-list key
 * @param value record holding all instances of the service
 * @throws NacosException if the local write fails
 */
public void put(String key, Record value) throws NacosException {
    // Local write. Per the original notes, onPut wraps key/value into a Datum, stores it and queues
    // a change notification that a background thread processes later.
    onPut(key, value);
    // Replicate to the other cluster nodes; the last argument is half the task-dispatch period
    // (presumably the sync delay).
    final DistroKey distroKey = new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
    distroProtocol.sync(distroKey, DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
}
服务实例状态变动同步
服务中的某一个实例停止运行,过段时间后,该服务实例的状态就变为不健康。
这个变动同步至其他各个节点是通过ServiceManager
的内部类ServiceReporter.run()
来实现的
/**
 * Spring bean (@Component) managing services. Its @PostConstruct init() starts the background
 * tasks, including the periodic ServiceReporter that syncs service checksums to the other nodes.
 * It also subscribes this instance to service-metadata changes.
 */
@Component
public class ServiceManager implements RecordListener<Service> {
    ...
    @PostConstruct
    public void init() {
        // Start ServiceReporter after 60000 ms; it sends per-namespace service checksums to the other
        // cluster nodes and reschedules itself.
        GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
        // Background UpdatedServiceProcessor (presumably handles queued service updates — confirm).
        GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());
        // Optional periodic removal of empty services.
        if (emptyServiceAutoClean) {
            Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms",
                    cleanEmptyServiceDelay, cleanEmptyServicePeriod);
            // delay 60s, period 20s;
            // This task is not recommended to be performed frequently in order to avoid
            // the possibility that the service cache information may just be deleted
            // and then created due to the heartbeat mechanism
            GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay,
                    cleanEmptyServicePeriod);
        }
        try {
            Loggers.SRV_LOG.info("listen for service meta change");
            // Register this RecordListener for keys with the service-meta prefix.
            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
        } catch (NacosException e) {
            Loggers.SRV_LOG.error("listen for service meta change failed!");
        }
    }
    ....
}
每一个命名空间向其他各节点发送一次请求,发送的数据是当前命名空间下本节点负责的所有服务的校验和(checksum,由服务下所有实例信息计算得到),而不是完整的实例信息。
/**
 * Periodically sends each namespace's service checksums to every other cluster member.
 *
 * <p>Only the services this node is responsible for are included. Presumably the peers compare
 * these checksums with their own data to detect divergence (receiving side not shown). The
 * reporter reschedules itself in {@code finally} with
 * switchDomain.getServiceStatusSynchronizationPeriodMillis().
 */
private class ServiceReporter implements Runnable {
    @Override
    public void run() {
        try {
            Map<String, Set<String>> allServiceNames = getAllServiceNames();
            if (allServiceNames.size() <= 0) {
                //ignore
                return;
            }
            // One message per namespace.
            for (String namespaceId : allServiceNames.keySet()) {
                ServiceChecksum checksum = new ServiceChecksum(namespaceId);
                // Every service of this namespace.
                for (String serviceName : allServiceNames.get(namespaceId)) {
                    // Only report services this node is responsible for.
                    if (!distroMapper.responsible(serviceName)) {
                        continue;
                    }
                    Service service = getService(namespaceId, serviceName);
                    if (service == null || service.isEmpty()) {
                        continue;
                    }
                    // Per the original notes: concatenates the service's instance info and MD5-hashes it.
                    service.recalculateChecksum();
                    checksum.addItem(serviceName, service.getChecksum());
                }
                // checksum now maps serviceName -> checksum for the reported services of this namespace.
                Message msg = new Message();
                msg.setData(JacksonUtils.toJson(checksum));
                // All Nacos cluster members.
                Collection<Member> sameSiteServers = memberManager.allMembers();
                // No members: end this round (the finally block still reschedules).
                if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                    return;
                }
                for (Member server : sameSiteServers) {
                    // Skip this node itself.
                    if (server.getAddress().equals(NetUtils.localServer())) {
                        continue;
                    }
                    // Send the checksums.
                    synchronizer.send(server.getAddress(), msg);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
        } finally {
            GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),
                    TimeUnit.MILLISECONDS);
        }
    }
}