Table of Contents
- Understanding CAP
- The BASE Theory
- Does Nacos Support CP or AP?
- Cluster Data Synchronization
- Source Code for Cluster Data Consistency
Understanding CAP
The core claim of the CAP theorem is that a distributed system cannot fully satisfy consistency, availability, and partition tolerance all at once. Concretely, when a network partition occurs, the system must choose between consistency and availability:
CA (Consistency + Availability):
The system guarantees consistency and availability under normal conditions, but cannot cope with a network partition. Once a partition occurs, it must give up either consistency or availability.
CP (Consistency + Partition tolerance):
During a network partition the system preserves consistency and partition tolerance, but may sacrifice some availability, i.e. some requests may get no response.
AP (Availability + Partition tolerance):
During a network partition the system preserves availability and partition tolerance, but may sacrifice consistency, i.e. different nodes may see inconsistent data.
The CAP theorem therefore says that of these three properties, a distributed system can satisfy at most two at the same time; satisfying all three simultaneously is impossible.
Choices in practice
Real-world systems trade off among the three CAP properties according to their concrete requirements.
For example:
Financial systems: usually favor consistency (CP), ensuring data is consistent and accurate even at the cost of some availability.
Social media platforms: usually favor availability and partition tolerance (AP), ensuring users can always access and publish content even if that briefly leads to inconsistent data.
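To make the trade-off concrete, here is a deliberately simplified Java sketch (purely illustrative, not Nacos code; all names are made up) of how a single replica might answer reads during a partition in CP versus AP style:
// Toy illustration (not Nacos code): how one replica answers reads in CP vs AP mode.
public class ToyReplica {

    enum Mode { CP, AP }

    private final Mode mode;
    private volatile boolean partitioned;  // true when a quorum of peers is unreachable
    private volatile String localValue;    // local, possibly stale, copy of the data

    ToyReplica(Mode mode) {
        this.mode = mode;
    }

    void onPartition(boolean partitioned) {
        this.partitioned = partitioned;
    }

    void write(String value) {
        // a real system would replicate here; the toy just updates the local copy
        this.localValue = value;
    }

    String read() {
        if (partitioned && mode == Mode.CP) {
            // CP: reject the request rather than risk serving stale data (gives up availability)
            throw new IllegalStateException("quorum unavailable, read rejected");
        }
        // AP: always answer, accepting that the value may be stale (gives up consistency)
        return localValue;
    }
}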
The BASE Theory
The BASE theory consists of three parts:
Basically Available: the system keeps providing service in the presence of failures, instead of becoming completely unusable.
Soft State: individual nodes are allowed to hold temporarily inconsistent data.
Eventually Consistent: although nodes may disagree for a while, their data must converge to a consistent state after some period of time.
BASE is essentially the compromise that falls out of CAP.
Does Nacos Support CP or AP?
Nacos currently supports both AP and CP.
Which one applies depends on the specific feature inside Nacos; contrary to what some articles claim, you cannot freely switch between them with a single configuration option.
Take service registration as an example: for ephemeral instances, Nacos favors availability, i.e. AP;
for persistent instances, Nacos favors data consistency, i.e. CP.
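As a quick illustration from the client side (a minimal sketch using the standard nacos-client API; the server address, IP, and service name are placeholders), the ephemeral flag on the instance is what selects between the two modes:
import com.alibaba.nacos.api.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;

public class RegisterDemo {
    public static void main(String[] args) throws Exception {
        NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");

        Instance instance = new Instance();
        instance.setIp("192.168.0.10");
        instance.setPort(8080);
        // true (the default): ephemeral instance, replicated with the AP-style Distro protocol;
        // false: persistent instance, replicated with the CP-style Raft protocol
        instance.setEphemeral(true);

        naming.registerInstance("demo-service", instance);
    }
}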
Cluster Data Synchronization
When Nacos runs in cluster mode and a Nacos server receives a service registration request, a ClientEvent.ClientChangedEvent is published, which triggers synchronization of the registered service information to the other Nacos server nodes in the cluster.
Let's walk through the code:
1. Start from the service registration endpoint.
Location: com.alibaba.nacos.naming.controllers.v2.InstanceControllerV2.java
Service registration:
@CanDistro
@PostMapping
@Secured(action = ActionTypes.WRITE)
public Result<String> register(InstanceForm instanceForm) throws NacosException {
// check param
instanceForm.validate();
checkWeight(instanceForm.getWeight());
// build instance
Instance instance = buildInstance(instanceForm);
instanceServiceV2.registerInstance(instanceForm.getNamespaceId(), buildCompositeServiceName(instanceForm), instance);
NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "",
false, instanceForm.getNamespaceId(), instanceForm.getGroupName(), instanceForm.getServiceName(),
instance.getIp(), instance.getPort()));
return Result.success("ok");
}
2. Add the instance:
@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // validate parameters
    NamingUtils.checkInstanceIsLegal(instance);
    // ephemeral or persistent instance?
    boolean ephemeral = instance.isEphemeral();
    String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
    createIpPortClientIfAbsent(clientId);
    Service service = getService(namespaceId, serviceName, ephemeral);
    clientOperationService.registerInstance(service, instance, clientId);
}
@Override
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
Service singleton = ServiceManager.getInstance().getSingleton(service);
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
InstancePublishInfo instanceInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
client.recalculateRevision();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
3. Publish the event and synchronize:
/**
* Request publisher publish event Publishers load lazily, calling publisher.
*
* @param eventType class Instances type of the event type.
* @param event event instance.
*/
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
final String topic = ClassUtils.getCanonicalName(eventType);
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
return publisher.publish(event);
}
if (event.isPluginEvent()) {
return true;
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
public boolean publish(Event event) {
checkIsStart();
boolean success = this.queue.offer(event);
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
receiveEvent(event);
return true;
}
return true;
}
void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
if (!hasSubscriber()) {
LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
return;
}
// Notification single event listener
for (Subscriber subscriber : subscribers) {
if (!subscriber.scopeMatches(event)) {
continue;
}
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}
// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
// Remove original judge part of codes.
notifySubscriber(subscriber, event);
}
}
Notifying the subscribers (this is ultimately how the cluster nodes get notified):
public void notifySubscriber(final Subscriber subscriber, final Event event) {
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
final Runnable job = () -> subscriber.onEvent(event);
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}
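So who subscribes to these events? The component that carries out the cluster synchronization registers itself with NotifyCenter. The following is a minimal sketch of that pattern (the subscriber class is hypothetical; the real DistroClientDataProcessor hooks in through Nacos's SmartSubscriber variant, but the registration idea is the same):
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;

public class DemoClientChangedSubscriber extends Subscriber<ClientEvent.ClientChangedEvent> {

    @Override
    public void onEvent(ClientEvent.ClientChangedEvent event) {
        // react to the client change, e.g. schedule a Distro sync to the other nodes
    }

    @Override
    public Class<? extends Event> subscribeType() {
        return ClientEvent.ClientChangedEvent.class;
    }

    public static void register() {
        NotifyCenter.registerSubscriber(new DemoClientChangedSubscriber());
    }
}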
Source Code for Cluster Data Consistency
As described earlier, when Nacos runs in cluster mode and a server receives a service registration request, a ClientEvent.ClientChangedEvent is published, which triggers synchronization of the registered service information to the other Nacos server nodes in the cluster.
The ClientEvent.ClientChangedEvent is actually handled in DistroClientDataProcessor#onEvent:
public void onEvent(Event event) {
    // Only meaningful in cluster mode; a standalone Nacos never synchronizes
    if (EnvUtil.getStandaloneMode()) {
        return;
    }
    if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
        syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
    } else {
        // sync the data to the other nodes
        syncToAllServer((ClientEvent) event);
    }
}
syncToAllServer() synchronizes the data to the other nodes in the cluster:
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    /**
     * Check that the client is non-null, that it is ephemeral,
     * and that this node is the one responsible for it.
     *
     * Ephemeral instances are synced via the Distro protocol;
     * persistent instances are synced via the Raft protocol.
     */
    if (isInvalidClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        // client disconnect event
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        // client changed event (add or update)
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}
Synchronization distinguishes between a responsible node and non-responsible nodes.
Responsible node (initiates the sync)
The node that receives the ClientChangedEvent is responsible for pushing the information to the other, non-responsible nodes. Only the responsible node initiates the sync; non-responsible nodes merely receive the sync events.
private boolean isInvalidClient(Client client) {
    // Ephemeral instances sync via Distro; persistent instances via Raft
    return null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client);
}
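What exactly makes a node "responsible" is hidden behind clientManager.isResponsibleClient(). For the gRPC long-connection clients of Nacos 2.x, the responsible node is simply the server that holds the client's connection; for IP/port-based clients, responsibility follows the classic distro mapping of hashing the client across the healthy member list. The following is a simplified sketch of that idea (all names are illustrative, loosely modeled on Nacos's DistroMapper, not the actual implementation):
import java.util.List;

public final class ResponsibilityDemo {

    /**
     * Distro-style ownership sketch: each clientId hashes to exactly one server
     * in the healthy member list, and only that server syncs the client.
     */
    static boolean isResponsible(String clientId, List<String> healthyMembers, String self) {
        // mask the sign bit so the index is always non-negative
        int index = (clientId.hashCode() & Integer.MAX_VALUE) % healthyMembers.size();
        return healthyMembers.get(index).equals(self);
    }
}
Either way, exactly one node in the cluster considers itself responsible for a given client, which prevents every node from broadcasting the same change.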
DistroProtocol
Distro is Alibaba's in-house protocol, created specifically for the service registry.
DistroProtocol loops over all the other Nacos nodes and submits an asynchronous task for each of them, delayed by 1s by default. As the code above shows, this path covers client disconnects as well as client additions and updates.
DELETE operations are handled by DistroSyncDeleteTask, and CHANGE operations by DistroSyncChangeTask; we will follow DistroSyncChangeTask here.
public void sync(DistroKey distroKey, DataOperation action) {
    // configured sync delay: 1s by default
    sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
}

// com.alibaba.nacos.core.distributed.distro.DistroProtocol#sync()
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    // iterate over every member except this node
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}
The core logic is in syncToTarget():
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
            targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
    // Add a DistroDelayTask to the delay-task engine; it will eventually be processed by DistroDelayTaskProcessor.process.
    // distroTaskEngineHolder.getDelayTaskExecuteEngine() returns a DistroDelayTaskExecuteEngine, which extends NacosDelayTaskExecuteEngine;
    // its constructor schedules a ProcessRunnable that keeps taking tasks out of the task map
    // (ConcurrentHashMap<Object, AbstractDelayTask> tasks) and executing them
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
    }
}
syncToTarget() builds a DistroDelayTask and hands it to the delay-task engine. If you know the Nacos service registration flow, this should look familiar: distroTaskEngineHolder.getDelayTaskExecuteEngine() returns a DistroDelayTaskExecuteEngine, which extends NacosDelayTaskExecuteEngine, and the latter's constructor schedules a ProcessRunnable that continuously takes tasks out of the task map (ConcurrentHashMap<Object, AbstractDelayTask> tasks) and executes them.
/**
 * Task map.
 * key: the service the task belongs to
 */
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;

protected final ReentrantLock lock = new ReentrantLock();

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    // initialize the task map
    tasks = new ConcurrentHashMap<>(initCapacity);
    // create the scheduled executor
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    // Start ProcessRunnable after an initial delay of processInterval (100ms by default),
    // then run it repeatedly with a fixed delay of processInterval between runs
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
public void addTask(Object key, AbstractDelayTask newTask) {
    // lock against concurrent access; the key identifies the service
    lock.lock();
    try {
        // ConcurrentHashMap<Object, AbstractDelayTask> tasks = new ConcurrentHashMap<>(initCapacity);
        // check whether a task for this key is already in the map
        AbstractDelayTask existTask = tasks.get(key);
        if (null != existTask) {
            // if so, merge the two tasks so they are processed together
            newTask.merge(existTask);
        }
        // put the task into the map, where it waits to be processed
        tasks.put(key, newTask);
    } finally {
        lock.unlock();
    }
}
/**
 * Task-processing runnable.
 */
private class ProcessRunnable implements Runnable {
@Override
public void run() {
try {
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
protected void processTasks() {
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        // remove the task from the map
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        // example taskKey: Service{namespace='public', group='DEFAULT_GROUP', name='discovery-provider', ephemeral=true, revision=0}
        // look up the processor
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            if (!processor.process(task)) {
                // if processing fails, re-add the task (i.e. retry)
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error ", e);
            retryFailedTask(taskKey, task);
        }
    }
}
processTasks() first looks up a processor for the key; if none is registered, the default processor is used. The DistroTaskEngineHolder constructor sets DistroDelayTaskProcessor as that default.
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
    DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
    // set the default processor
    delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
In other words, once syncToTarget() has been called, DistroDelayTaskProcessor ends up processing the task: a DELETE task produces a DistroSyncDeleteTask, while ADD/CHANGE tasks produce a DistroSyncChangeTask.
// DistroDelayTaskProcessor#process
public boolean process(NacosTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    switch (distroDelayTask.getAction()) {
        // deregistration publishes a DELETE operation
        case DELETE:
            // submit a DistroSyncDeleteTask, executed by DistroExecuteTaskExecuteEngine
            DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
            return true;
        case CHANGE:
        case ADD:
            DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
            // Submit the DistroSyncChangeTask to the immediate-execution engine;
            // DistroSyncChangeTask implements Runnable, so its run() method is what matters
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
            return true;
        default:
            return false;
    }
}
Let's take DistroSyncChangeTask as the example for the rest of the sync flow.
Note that no data has actually been synchronized yet: the work has been wrapped into yet another task, a DistroSyncChangeTask, and added to the Distro execute-task engine, which in practice means it lands on the blocking queue inside a TaskExecuteWorker:
public void addTask(Object tag, AbstractExecuteTask task) {
    // look up a processor for this tag
    NacosTaskProcessor processor = getProcessor(tag);
    if (null != processor) {
        // if one exists, let it process the task
        processor.process(task);
        return;
    }
    // otherwise fall back to a shared TaskExecuteWorker
    TaskExecuteWorker worker = getWorker(tag);
    worker.process(task);
}
/**
 * Blocking queue of Runnable tasks.
 */
private final BlockingQueue<Runnable> queue;

// TaskExecuteWorker#process
public boolean process(NacosTask task) {
    if (task instanceof AbstractExecuteTask) {
        // put the task onto the blocking queue
        putTask((Runnable) task);
    }
    return true;
}

private void putTask(Runnable task) {
    try {
        queue.put(task);
    } catch (InterruptedException ire) {
        log.error(ire.toString(), ire);
    }
}
When no processor is found for the tag, the shared TaskExecuteWorker takes over:
public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
    this.name = name + "_" + mod + "%" + total;
    // the blocking queue
    this.queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    this.closed = new AtomicBoolean(false);
    this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
    // the inner worker, which is in fact a thread
    realWorker = new InnerWorker(this.name);
    // start the worker
    realWorker.start();
}
The TaskExecuteWorker constructor starts an inner worker, a dedicated thread that keeps taking tasks off the blocking queue and running them:
private class InnerWorker extends Thread {

    InnerWorker(String name) {
        setDaemon(false);
        setName(name);
    }

    @Override
    public void run() {
        while (!closed.get()) {
            try {
                // take a task from the blocking queue (it was stored there by putTask() inside process())
                Runnable task = queue.take();
                long begin = System.currentTimeMillis();
                // run the task
                task.run();
                long duration = System.currentTimeMillis() - begin;
                if (duration > 1000L) {
                    log.warn("task {} takes {}ms", task, duration);
                }
            } catch (Throwable e) {
                log.error("[TASK-FAILED] " + e, e);
            }
        }
    }
}
task.run() invokes the DistroSyncChangeTask we enqueued earlier. It extends AbstractDistroExecuteTask and therefore indirectly implements Runnable; its run() method:
// AbstractDistroExecuteTask#run
public void run() {
String type = getDistroKey().getResourceType();
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
if (null == transportAgent) {
Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
return;
}
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
if (transportAgent.supportCallbackTransport()) {
doExecuteWithCallback(new DistroExecuteCallback());
} else {
executeDistroTask();
}
}
private void executeDistroTask() {
    try {
        boolean result = doExecute();
        if (!result) {
            // retry on failure
            handleFailedTask();
        }
        Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
    } catch (Exception e) {
        Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
        handleFailedTask();
    }
}

// implemented by the subclass, here DistroSyncChangeTask
protected abstract boolean doExecute();
Eventually DistroSyncChangeTask#doExecute() runs:
protected boolean doExecute() {
    String type = getDistroKey().getResourceType();
    // Get the DistroData from DistroClientDataProcessor; in fact this fetches the live Client from the ClientManager
    DistroData distroData = getDistroData(type);
    if (null == distroData) {
        Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
        return true;
    }
    // sync the data to the target server node
    return getDistroComponentHolder().findTransportAgent(type)
            .syncData(distroData, getDistroKey().getTargetServer());
}
Fetching the sync data: getDistroData
The sync payload is obtained as a DistroData from DistroClientDataProcessor, which really means fetching the live Client from the ClientManager.
private DistroData getDistroData(String type) {
    // effectively fetches the live Client from the ClientManager
    DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
    if (null != result) {
        result.setType(OPERATION);
    }
    return result;
}

// DistroClientDataProcessor#getDistroData
public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    // serialize the generated sync data into a byte array
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}
generateSyncData() is the key method here: it produces the sync payload with the Client's registration information, i.e. which namespaces, groups, services and instances the client has registered.
public ClientSyncData generateSyncData() {
List<String> namespaces = new LinkedList<>();
List<String> groupNames = new LinkedList<>();
List<String> serviceNames = new LinkedList<>();
List<String> batchNamespaces = new LinkedList<>();
List<String> batchGroupNames = new LinkedList<>();
List<String> batchServiceNames = new LinkedList<>();
List<InstancePublishInfo> instances = new LinkedList<>();
List<BatchInstancePublishInfo> batchInstancePublishInfos = new LinkedList<>();
BatchInstanceData batchInstanceData = new BatchInstanceData();
for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
InstancePublishInfo instancePublishInfo = entry.getValue();
if (instancePublishInfo instanceof BatchInstancePublishInfo) {
BatchInstancePublishInfo batchInstance = (BatchInstancePublishInfo) instancePublishInfo;
batchInstancePublishInfos.add(batchInstance);
buildBatchInstanceData(batchInstanceData, batchNamespaces, batchGroupNames, batchServiceNames, entry);
batchInstanceData.setBatchInstancePublishInfos(batchInstancePublishInfos);
} else {
namespaces.add(entry.getKey().getNamespace());
groupNames.add(entry.getKey().getGroup());
serviceNames.add(entry.getKey().getName());
instances.add(entry.getValue());
}
}
ClientSyncData data = new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
data.getAttributes().addClientAttribute(REVISION, getRevision());
return data;
}
Sending the sync data: syncData
The actual transmission is handled by DistroClientTransportAgent: it wraps the data in a DistroDataRequest, looks up the target Member, and calls sendRequest to carry out the sync:
public boolean syncData(DistroData data, String targetServer) {
    if (isNoExistTarget(targetServer)) {
        return true;
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    // look up the target node
    Member member = memberManager.find(targetServer);
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO
                .warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer,
                        data.getDistroKey());
        return false;
    }
    try {
        // Send the DistroDataRequest synchronously;
        // on the receiving side it is handled by com.alibaba.nacos.naming.remote.rpc.handler.DistroDataRequestHandler.handle
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! key: {}", data.getDistroKey(), e);
    }
    return false;
}
Now let's look at how a non-responsible node handles the incoming sync request.
Non-responsible node (receives the request)
Once the responsible node has sent the data over, the non-responsible node has to process the received Client data.
// DistroDataRequestHandler#handle
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
    try {
        switch (request.getDataOperation()) {
            case VERIFY:
                return handleVerify(request.getDistroData(), meta);
            case SNAPSHOT:
                return handleSnapshot();
            case ADD:
            case CHANGE:
            case DELETE:
                // mutation operations: update the registry data, then publish events
                return handleSyncData(request.getDistroData());
            case QUERY:
                return handleQueryData(request.getDistroData());
            default:
                return new DistroDataResponse();
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
        DistroDataResponse result = new DistroDataResponse();
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("handle distro request with exception");
        return result;
    }
}
private DistroDataResponse handleSyncData(DistroData distroData) {
    DistroDataResponse result = new DistroDataResponse();
    // delegate to DistroProtocol.onReceive
    if (!distroProtocol.onReceive(distroData)) {
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("[DISTRO-FAILED] distro data handle failed");
    }
    return result;
}
// DistroProtocol#onReceive
public boolean onReceive(DistroData distroData) {
    Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
            distroData.getDistroKey());
    String resourceType = distroData.getDistroKey().getResourceType();
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
        return false;
    }
    // hand the received data to the matching processor
    return dataProcessor.processData(distroData);
}
We mainly care about ADD/CHANGE here, i.e. the handleSyncData() path, which lands in the data processor's processData():
public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            // deserialize the sync payload into a ClientSyncData
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            // process the sync data
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}
The received payload is first deserialized into a ClientSyncData, which is then processed:
private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO
            .info("[Client-Add] Received distro client sync data {}, revision={}", clientSyncData.getClientId(),
                    clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L));
    // Sync the client connection; if no client exists for this id yet, create one (an IpPortBasedClient)
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    // fetch the Client
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // update the Client's data
    upgradeClient(client, clientSyncData);
}
It syncs the client connection, fetches the client object, updates the client's registry entries, and publishes the corresponding events:
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    Set<Service> syncedService = new HashSet<>();
    // process batch instance sync logic
    processBatchInstanceDistroData(syncedService, client, clientSyncData);
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            // add the registry entry and publish a ClientRegisterServiceEvent
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            // remove the stale registry entry and publish a ClientDeregisterServiceEvent
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
    client.setRevision(clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));
}