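Table of contents
- Preface
- 1. Client instance changes:
- 2. Detecting instance changes:
- 2.1 Notifying instance registration info:
- 2.1.1 Receiving the DistroDataRequest:
- 2.1.2 onReceive: handling the request:
- 2.1.3 processData: handling the request:
- 2.1.4 handlerClientSyncData: processing the data:
- 2.1.5 upgradeClient: comparing and updating data:
- 2.2 Full data pull on server node startup:
- 2.2.1 Creating the DistroProtocol object triggers the full pull:
- 2.2.2 Running the full-load task DistroLoadDataTask:
- 2.2.3 load: pulling instance info from other nodes:
- 2.2.4 loadAllDataSnapshotFromRemote: pull and process:
- 2.2.4.1 Fetching and processing the data:
- 2.2.4.2 Fetching the data:
- 2.3 Cluster node heartbeat check:
- Summary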
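Preface
Nodes in a Nacos cluster synchronize the instance information they hold with one another through the Distro protocol over gRPC. This article walks through three scenarios of server-side instance synchronization. Server version: 3.0.13.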
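1. Client instance changes:
When a client starts, it establishes a gRPC connection with the server and registers its instance information once startup completes. At that point the instance is stored on only one node of the cluster, so that node has to send the instance information to the other nodes.
Nacos runs as a cluster and supports horizontal scaling, so when a node is added to the cluster, the new node has to fetch the instance information held by the other nodes and store it locally.
Cluster nodes may be deployed in different environments and depend on the network to reach each other, so a node will inevitably lose contact with the cluster for short periods. Once the network recovers, the lagging node has to synchronize with the other nodes and overwrite its local instance information so that the instance data becomes consistent again.
This article covers the three scenarios above in turn.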
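2. Detecting instance changes:
2.1 Notifying instance registration info:
When a client registers with a node in the cluster, that node stores the registration and is then responsible for synchronizing the registered instance information to the other nodes in the cluster.
The article 源码篇–Nacos服务–中章(8):Nacos服务端感知客户端注册-2 described how the communication channel between cluster nodes is established. That channel is used here to synchronize client instance information to the other running nodes.
2.1.1 Receiving the DistroDataRequest:
DistroDataRequestHandler is the Distro protocol handler that processes the DistroDataRequest (instance change request) sent between cluster nodes.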
@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
try {
// Get the operation type
switch (request.getDataOperation()) {
case VERIFY:
return handleVerify(request.getDistroData(), meta);
case SNAPSHOT:
return handleSnapshot();
case ADD:
case CHANGE:
case DELETE:
// Instance change operations
return handleSyncData(request.getDistroData());
case QUERY:
return handleQueryData(request.getDistroData());
default:
return new DistroDataResponse();
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
DistroDataResponse result = new DistroDataResponse();
result.setErrorCode(ResponseCode.FAIL.getCode());
result.setMessage("handle distro request with exception");
return result;
}
}
private DistroDataResponse handleSyncData(DistroData distroData) {
DistroDataResponse result = new DistroDataResponse();
// Handle the request
if (!distroProtocol.onReceive(distroData)) {
result.setErrorCode(ResponseCode.FAIL.getCode());
result.setMessage("[DISTRO-FAILED] distro data handle failed");
}
return result;
}
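2.1.2 onReceive: handling the request:
The instance information of the clients registered on the sending node is parsed out of the DistroData argument and compared with the local state, and the instances registered on the local node are updated accordingly.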
/**
* Receive synced distro data, find processor to process.
*
* @param distroData Received data
* @return true if handle receive data successfully, otherwise false
*/
public boolean onReceive(DistroData distroData) {
Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
distroData.getDistroKey());
// Get the resource type: Nacos:Naming:v2:ClientData
String resourceType = distroData.getDistroKey().getResourceType();
// Look up the processor for this resource type
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
return false;
}
// Process the data
return dataProcessor.processData(distroData);
}
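The lookup in onReceive can be pictured as a small registry keyed by resource type. The following is a minimal sketch, not the Nacos implementation: ProcessorRegistrySketch, DataProcessor and register are hypothetical names, and only the resource type string Nacos:Naming:v2:ClientData comes from the code above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for the processor lookup; these are not the Nacos classes. */
class ProcessorRegistrySketch {

    /** Hypothetical processor interface: returns true when the payload was handled. */
    interface DataProcessor {
        boolean process(byte[] content);
    }

    private final Map<String, DataProcessor> processors = new ConcurrentHashMap<>();

    void register(String resourceType, DataProcessor processor) {
        processors.put(resourceType, processor);
    }

    /** Same shape as onReceive: an unknown resource type is reported as a failure, not thrown. */
    boolean onReceive(String resourceType, byte[] content) {
        DataProcessor processor = processors.get(resourceType);
        if (processor == null) {
            return false;
        }
        return processor.process(content);
    }
}
```

Returning false for an unknown type lets the caller turn the failure into an error response, which is what handleSyncData does with the result of onReceive.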
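2.1.3 processData: handling the request:
For ADD and CHANGE events, the payload is first deserialized back into the original data and then processed; for DELETE, the corresponding client is disconnected.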
@Override
public boolean processData(DistroData distroData) {
switch (distroData.getType()) {
case ADD:
case CHANGE:
// Deserialize the instance info registered on the node that sent the sync request
ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
.deserialize(distroData.getContent(), ClientSyncData.class);
// Process the instance info
handlerClientSyncData(clientSyncData);
return true;
case DELETE:
String deleteClientId = distroData.getDistroKey().getResourceKey();
Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
clientManager.clientDisconnected(deleteClientId);
return true;
default:
return false;
}
}
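Stripped of serialization and events, the dispatch in processData reduces to the shape below. This is a simplified sketch under stated assumptions: SyncDispatchSketch, Op and the clients map are hypothetical stand-ins, and the payload is a plain string instead of a deserialized ClientSyncData.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative dispatch on the operation type; not the Nacos classes. */
class SyncDispatchSketch {

    enum Op { ADD, CHANGE, DELETE, VERIFY }

    // clientId -> last synced payload (stand-in for the deserialized ClientSyncData)
    final Map<String, String> clients = new HashMap<>();

    boolean process(Op op, String clientId, String payload) {
        switch (op) {
            case ADD:
            case CHANGE:
                // ADD and CHANGE share one path: store the latest state for the client.
                clients.put(clientId, payload);
                return true;
            case DELETE:
                // DELETE drops the client and everything it had published.
                clients.remove(clientId);
                return true;
            default:
                // Any other operation is not handled by this processor.
                return false;
        }
    }
}
```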
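2.1.4 handlerClientSyncData: processing the data:
Create the Client object on the local node that corresponds to the client held by the sending node. Clients are distinguished by client ID, so later sync data for the same ID is applied to the same Client object.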
private void handlerClientSyncData(ClientSyncData clientSyncData) {
Loggers.DISTRO
.info("[Client-Add] Received distro client sync data {}, revision={}", clientSyncData.getClientId(),
clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L));
// Create the Client object that corresponds to the client on the sending node
clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
Client client = clientManager.getClient(clientSyncData.getClientId());
// Update the data
upgradeClient(client, clientSyncData);
}
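2.1.5 upgradeClient: comparing and updating data:
The client information sent over is saved first, then compared with the client information stored locally, and stale local service instances are removed.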
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
Set<Service> syncedService = new HashSet<>();
// process batch instance sync logic
processBatchInstanceDistroData(syncedService, client, clientSyncData);
List<String> namespaces = clientSyncData.getNamespaces();
List<String> groupNames = clientSyncData.getGroupNames();
List<String> serviceNames = clientSyncData.getServiceNames();
List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
for (int i = 0; i < namespaces.size(); i++) {
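// Iterate over the namespaces (the lists are index-aligned)
// Build the Service object
Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
// Get the Service singleton already held by this node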
Service singleton = ServiceManager.getInstance().getSingleton(service);
syncedService.add(singleton);
// Get the instance info
InstancePublishInfo instancePublishInfo = instances.get(i);
if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
// Compare with the locally cached instance
// If they differ, add the instance to this client's ConcurrentHashMap<Service, InstancePublishInfo> publishers
client.addServiceInstance(singleton, instancePublishInfo);
// Publish the instance registration events
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
NotifyCenter.publishEvent(
new MetadataEvent.InstanceMetadataEvent(singleton, instancePublishInfo.getMetadataId(), false));
}
}
for (Service each : client.getAllPublishedService()) {
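// Iterate over all services this client has published on the local node
if (!syncedService.contains(each)) {
// The synced data no longer contains this service, so remove the instance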
client.removeServiceInstance(each);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
}
}
client.setRevision(clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));
}
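The two loops in upgradeClient amount to a reconcile step: apply every synced entry that differs from the local one, then drop local entries the sender no longer reports. Below is a minimal sketch of that idea, assuming plain strings in place of Service and InstancePublishInfo and omitting event publishing and batch instances; ClientReconcileSketch and reconcile are hypothetical names.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative reconcile step; keys stand in for Service, values for InstancePublishInfo. */
class ClientReconcileSketch {

    /** Applies the synced state to the local map in place and returns the removed service keys. */
    static Set<String> reconcile(Map<String, String> local, Map<String, String> synced) {
        // Step 1: add or overwrite every entry whose instance info differs from the local copy.
        for (Map.Entry<String, String> entry : synced.entrySet()) {
            if (!entry.getValue().equals(local.get(entry.getKey()))) {
                local.put(entry.getKey(), entry.getValue());
            }
        }
        // Step 2: remove local entries that are absent from the synced data.
        Set<String> removed = new HashSet<>(local.keySet());
        removed.removeAll(synced.keySet());
        local.keySet().removeAll(removed);
        return removed;
    }
}
```

After the call the local map equals the synced map, which is the end state upgradeClient aims for on the publishers of one client.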
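2.2 Full data pull on server node startup:
When a node in the cluster starts, it pulls the full instance information from the other nodes, processes it, and saves it into its own instance registry. This work is triggered through startLoadTask() when the DistroProtocol object is constructed.
2.2.1 Creating the DistroProtocol object triggers the full pull: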
public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
DistroTaskEngineHolder distroTaskEngineHolder) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroTaskEngineHolder = distroTaskEngineHolder;
// Start the tasks
startDistroTask();
}
private void startDistroTask() {
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
// Periodic verify task, sent every 5s
startVerifyTask();
// Start the load task
startLoadTask();
}
private void startLoadTask() {
DistroCallback loadCallback = new DistroCallback() {
@Override
public void onSuccess() {
isInitialized = true;
}
@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
// Submitted only once here; the task resubmits itself if the load is incomplete
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
}
2.2.2 Running the full-load task DistroLoadDataTask:
public DistroLoadDataTask(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
DistroConfig distroConfig, DistroCallback loadCallback) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroConfig = distroConfig;
this.loadCallback = loadCallback;
loadCompletedMap = new HashMap<>(1);
}
@Override
public void run() {
try {
// Load
load();
if (!checkCompleted()) {
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
loadCallback.onSuccess();
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
}
} catch (Exception e) {
loadCallback.onFailed(e);
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
}
}
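The run method above follows a retry-until-complete pattern: each pass loads only the resource types that have not completed yet, and the task reschedules itself until all of them are done. Here is a minimal sketch of that pattern, assuming a hypothetical loader predicate in place of loadAllDataSnapshotFromRemote and a plain ScheduledExecutorService in place of GlobalExecutor.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Illustrative self-rescheduling load task; not the Nacos class. */
class LoadTaskSketch implements Runnable {

    private final List<String> types;
    // Hypothetical loader: returns true when a snapshot for the type was fetched and applied.
    private final Predicate<String> loader;
    private final ScheduledExecutorService executor;
    private final long retryDelayMillis;
    private final Runnable onSuccess;
    private final Map<String, Boolean> completed = new HashMap<>();

    LoadTaskSketch(List<String> types, Predicate<String> loader, ScheduledExecutorService executor,
            long retryDelayMillis, Runnable onSuccess) {
        this.types = types;
        this.loader = loader;
        this.executor = executor;
        this.retryDelayMillis = retryDelayMillis;
        this.onSuccess = onSuccess;
    }

    /** One pass: types that already completed are skipped. Returns true when every type is done. */
    boolean loadOnce() {
        for (String type : types) {
            if (!completed.getOrDefault(type, false)) {
                completed.put(type, loader.test(type));
            }
        }
        return !completed.containsValue(false);
    }

    @Override
    public void run() {
        if (loadOnce()) {
            onSuccess.run();
        } else {
            // Not complete yet: schedule the same task again after the retry delay.
            executor.schedule(this, retryDelayMillis, TimeUnit.MILLISECONDS);
        }
    }
}
```

Tracking completion per type means a retry never repeats work for a type whose snapshot has already been applied.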
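2.2.3 load: pulling instance info from other nodes:
Iterate over every node in the cluster except the local one; as soon as one node returns its registered instance information, the local instance information can be updated.
private void load() throws Exception {
// Wait until the member list contains nodes other than this one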
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
// Wait until at least one data storage type has been registered
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
// Load the snapshot data from a remote node
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
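2.2.4 loadAllDataSnapshotFromRemote: pull and process:
Send a DistroDataRequest with the operation type SNAPSHOT to a node in the cluster and obtain the registered instance information it returns; then deserialize it back into the original data and save it as this node's instance information.
2.2.4.1 Fetching and processing the data:
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
// Transport agent
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
// Data processor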
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
resourceType, transportAgent, dataProcessor);
return false;
}
for (Member each : memberManager.allMembersWithoutSelf()) {
// Iterate over the other nodes in the cluster
long startTime = System.currentTimeMillis();
try {
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
// Fetch the registered info from another node in the cluster
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
Loggers.DISTRO.info("[DISTRO-INIT] it took {} ms to load snapshot {} from {} and snapshot size is {}.",
System.currentTimeMillis() - startTime, resourceType, each.getAddress(),
getDistroDataLength(distroData));
// Process
boolean result = dataProcessor.processSnapshot(distroData);
Loggers.DISTRO
.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
result);
if (result) {
// Mark the initial data load as finished
distroComponentHolder.findDataStorage(resourceType).finishInitial();
return true;
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
}
}
return false;
}
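The loop above is a first-success failover: peers are tried in order, a peer that throws or whose snapshot cannot be applied is skipped, and the method returns as soon as one peer succeeds. Below is a minimal sketch of that control flow; fetch and apply are hypothetical stand-ins for getDatumSnapshot and processSnapshot, and peers are plain address strings.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Illustrative first-success failover across peers; not the Nacos class. */
class SnapshotFailoverSketch {

    /**
     * Tries each peer in order and stops at the first peer whose snapshot is fetched and applied.
     * fetch may throw (for example when the peer is unhealthy); such a peer is skipped.
     */
    static boolean loadFromAnyPeer(List<String> peers, Function<String, byte[]> fetch,
            Predicate<byte[]> apply) {
        for (String peer : peers) {
            try {
                byte[] snapshot = fetch.apply(peer);
                if (apply.test(snapshot)) {
                    return true;
                }
            } catch (RuntimeException e) {
                // A failing peer does not abort the load; the next peer is tried.
            }
        }
        // No peer produced a usable snapshot; the caller decides whether to retry later.
        return false;
    }
}
```

A false result here is what keeps the resource type marked as incomplete, so the load task schedules another attempt.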
2.2.4.2 Fetching the data:
(1) Sending a DistroDataRequest to fetch the data
@Override
public DistroData getDatumSnapshot(String targetServer) {
// Look up this node in the cluster member list
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
throw new DistroException(
String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
}
// Build the DistroDataRequest object
DistroDataRequest request = new DistroDataRequest();
request.setDataOperation(DataOperation.SNAPSHOT);
try {
// Send a regular request over gRPC
Response response = clusterRpcClientProxy
.sendRequest(member, request, DistroConfig.getInstance().getLoadDataTimeoutMillis());
if (checkResponse(response)) {
return ((DistroDataResponse) response).getDistroData();
} else {
throw new DistroException(
String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
targetServer, response.getErrorCode(), response.getMessage()));
}
} catch (NacosException e) {
throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
}
}
(2) The other cluster node receives the DistroDataRequest and handles the SNAPSHOT operation:
DistroDataRequestHandler#handle processes the request;
@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
try {
// Get the operation type
switch (request.getDataOperation()) {
case VERIFY:
return handleVerify(request.getDistroData(), meta);
case SNAPSHOT:
// Return the instance info registered on this node
return handleSnapshot();
case ADD:
case CHANGE:
case DELETE:
// Instance change operations
return handleSyncData(request.getDistroData());
case QUERY:
return handleQueryData(request.getDistroData());
default:
return new DistroDataResponse();
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
DistroDataResponse result = new DistroDataResponse();
result.setErrorCode(ResponseCode.FAIL.getCode());
result.setMessage("handle distro request with exception");
return result;
}
}
Getting the snapshot, step 1:
private DistroDataResponse handleSnapshot() {
DistroDataResponse result = new DistroDataResponse();
// Get the snapshot
DistroData distroData = distroProtocol.onSnapshot(DistroClientDataProcessor.TYPE);
result.setDistroData(distroData);
return result;
}
Getting the snapshot, step 2:
/**
* Query all datum snapshot.
*
* @param type datum type
* @return all datum snapshot
*/
public DistroData onSnapshot(String type) {
DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
if (null == distroDataStorage) {
Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
return new DistroData(new DistroKey("snapshot", type), new byte[0]);
}
return distroDataStorage.getDatumSnapshot();
}
Getting the snapshot, step 3:
@Override
public DistroData getDatumSnapshot() {
List<ClientSyncData> datum = new LinkedList<>();
// Iterate over the registered clients
for (String each : clientManager.allClientId()) {
Client client = clientManager.getClient(each);
if (null == client || !client.isEphemeral()) {
continue;
}
datum.add(client.generateSyncData());
}
ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();
snapshot.setClientSyncDataList(datum);
byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot);
return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
}
@Override
public ClientSyncData generateSyncData() {
List<String> namespaces = new LinkedList<>();
List<String> groupNames = new LinkedList<>();
List<String> serviceNames = new LinkedList<>();
List<String> batchNamespaces = new LinkedList<>();
List<String> batchGroupNames = new LinkedList<>();
List<String> batchServiceNames = new LinkedList<>();
List<InstancePublishInfo> instances = new LinkedList<>();
List<BatchInstancePublishInfo> batchInstancePublishInfos = new LinkedList<>();
BatchInstanceData batchInstanceData = new BatchInstanceData();
// Iterate over the instances this client has published on this server
for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
InstancePublishInfo instancePublishInfo = entry.getValue();
if (instancePublishInfo instanceof BatchInstancePublishInfo) {
BatchInstancePublishInfo batchInstance = (BatchInstancePublishInfo) instancePublishInfo;
batchInstancePublishInfos.add(batchInstance);
buildBatchInstanceData(batchInstanceData, batchNamespaces, batchGroupNames, batchServiceNames, entry);
batchInstanceData.setBatchInstancePublishInfos(batchInstancePublishInfos);
} else {
namespaces.add(entry.getKey().getNamespace());
groupNames.add(entry.getKey().getGroup());
serviceNames.add(entry.getKey().getName());
instances.add(entry.getValue());
}
}
// Return all instances this client has registered on the current server
ClientSyncData data = new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
data.getAttributes().addClientAttribute(REVISION, getRevision());
return data;
}
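generateSyncData flattens the publishers map into several lists that share an index, and upgradeClient reads entry i of each list back as one registration. The sketch below illustrates that index-aligned layout under stated assumptions: ParallelListsSketch is a hypothetical class, instances are plain strings, batch instances are left out, and the "/" separator in the rebuilt key is only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative index-aligned lists, as used for the non-batch part of the sync data. */
class ParallelListsSketch {

    final List<String> namespaces = new ArrayList<>();
    final List<String> groupNames = new ArrayList<>();
    final List<String> serviceNames = new ArrayList<>();
    final List<String> instances = new ArrayList<>();

    /** Sender side: one registration appends exactly one element to every list. */
    void add(String namespace, String group, String service, String instance) {
        namespaces.add(namespace);
        groupNames.add(group);
        serviceNames.add(service);
        instances.add(instance);
    }

    /** Receiver side: element i of every list describes the same registration. */
    Map<String, String> rebuild() {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < namespaces.size(); i++) {
            String key = namespaces.get(i) + "/" + groupNames.get(i) + "/" + serviceNames.get(i);
            result.put(key, instances.get(i));
        }
        return result;
    }
}
```

The layout only works if every list grows in lockstep, which is why the sender adds to all of them inside the same loop iteration.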
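2.3 Cluster node heartbeat check:
For reasons of length, this section is covered in 源码篇–Nacos服务–中章(8):Nacos服务端感知客户端实例变更(集群数据校验)-4.
Summary
This article covered how a registered instance is propagated to the other nodes of a Nacos cluster after a client registers, and how instance information is synchronized when a cluster node starts.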