【Nacos源码分析01-服务注册与集群间数据是同步】

文章目录

  • 了解CAP
  • BASE理论
  • Nacos支持CP还是AP
  • 集群数据同步
  • 实现集群数据一致性源码

了解CAP

在这里插入图片描述
CAP理论的核心观点是,一个分布式系统无法同时完全满足一致性、可用性和分区容错性这三个特性。具体而言,当发生网络分区时,系统必须在一致性和可用性之间做出选择:
CA(一致性和可用性):
系统在正常情况下能够保证一致性和可用性,但无法应对网络分区故障。一旦发生网络分区,系统必须放弃一致性或可用性中的一个。
CP(一致性和分区容错性):
系统在网络分区情况下能够保证一致性和分区容错性,但可能会牺牲部分可用性,即某些请求可能不会得到响应。
AP(可用性和分区容错性):
系统在网络分区情况下能够保证可用性和分区容错性,但可能会牺牲一致性,即不同节点可能会看到不一致的数据。
所谓的CAP定理,就是指在一个分布式系统中,CAP这三个指标,最多同时只能满足其中的两个,不可能三个都同时满足
实际应用中的选择
在实际应用中,不同的系统设计会根据具体需求在CAP三者之间进行权衡。
例如:
金融系统:通常更重视一致性(CP),确保数据的一致性和准确性,即使这可能会牺牲部分可用性。
社交媒体平台:通常更重视可用性和分区容错性(AP),确保用户始终可以访问和发布内容,即使这可能会导致短时间内的数据不一致。

BASE理论

BASE理论主要是包括以下三点:
基本可用(Basically Available):系统出现故障还是能够对外提供服务,不至于直接无法用了
软状态(Soft State):允许各个节点的数据不一致
最终一致性,(Eventually Consistent):虽然允许各个节点的数据不一致,但是在一定时间之后,各个节点的数据最终需要一致的
BASE理论其实就是妥协之后的产物。

Nacos支持CP还是AP

Nacos其实目前是同时支持AP和CP的
具体使用AP还是CP得取决于Nacos内部的具体功能,并不是有的文章说的可以通过一个配置自由切换。
就以服务注册举例来说,对于临时实例来说,Nacos会优先保证可用性,也就是AP
对于永久实例,Nacos会优先保证数据的一致性,也就是CP

集群数据同步

在Nacos以集群模式运行时,当Nacos服务器收到服务注册请求后,发生了ClientEvent.ClientChangedEvent事件,就会触发将注册的服务信息同步给集群中的其他Nacos-server节点。
具体的代码分析:
1.先开始从服务注册接口开始:
地址:com.alibaba.nacos.naming.controllers.v2.InstanceControllerV2.java
服务注册

@CanDistro
    @PostMapping
    @Secured(action = ActionTypes.WRITE)
    public Result<String> register(InstanceForm instanceForm) throws NacosException {
        // check param
        instanceForm.validate();
        checkWeight(instanceForm.getWeight());
        // build instance
        Instance instance = buildInstance(instanceForm);
        instanceServiceV2.registerInstance(instanceForm.getNamespaceId(), buildCompositeServiceName(instanceForm), instance);
        NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "",
                false, instanceForm.getNamespaceId(), instanceForm.getGroupName(), instanceForm.getServiceName(),
                instance.getIp(), instance.getPort()));
        return Result.success("ok");
    }

2.添加实例

 @Override
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    //参数校验
        NamingUtils.checkInstanceIsLegal(instance);
        //判断是临时节点还是永久节点
        boolean ephemeral = instance.isEphemeral();
        String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
        createIpPortClientIfAbsent(clientId);
        Service service = getService(namespaceId, serviceName, ephemeral);
        clientOperationService.registerInstance(service, instance, clientId);
    }
 @Override
    public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
    
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        if (!singleton.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                    String.format("Current service %s is persistent service, can't register ephemeral instance.",
                            singleton.getGroupedServiceName()));
        }
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        client.addServiceInstance(singleton, instanceInfo);
        client.setLastUpdatedTime();
        client.recalculateRevision();
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }

3.添加并且同步

/**
     * Request publisher publish event Publishers load lazily, calling publisher.
     *
     * @param eventType class Instances type of the event type.
     * @param event     event instance.
     */
    private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
        if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
            return INSTANCE.sharePublisher.publish(event);
        }
        
        final String topic = ClassUtils.getCanonicalName(eventType);
        
        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        if (publisher != null) {
            return publisher.publish(event);
        }
        if (event.isPluginEvent()) {
            return true;
        }
        LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
        return false;
    }
  public boolean publish(Event event) {
        checkIsStart();
        boolean success = this.queue.offer(event);
        if (!success) {
            LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
            receiveEvent(event);
            return true;
        }
        return true;
    }
void receiveEvent(Event event) {
        final long currentEventSequence = event.sequence();
        
        if (!hasSubscriber()) {
            LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
            return;
        }
        
        // Notification single event listener
        for (Subscriber subscriber : subscribers) {
            if (!subscriber.scopeMatches(event)) {
                continue;
            }
            
            // Whether to ignore expiration events
            if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
                LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                        event.getClass());
                continue;
            }
            
            // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
            // Remove original judge part of codes.
            notifySubscriber(subscriber, event);
        }
    }

通知集群节点

 public void notifySubscriber(final Subscriber subscriber, final Event event) {
        
        LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
        
        final Runnable job = () -> subscriber.onEvent(event);
        final Executor executor = subscriber.executor();
        
        if (executor != null) {
            executor.execute(job);
        } else {
            try {
                job.run();
            } catch (Throwable e) {
                LOGGER.error("Event callback exception: ", e);
            }
        }
    }

实现集群数据一致性源码

进入DistroClientDataProcessor的实现类来执行onEvent

  @Override
    public void onEvent(Event event) {
        if (EnvUtil.getStandaloneMode()) {
            return;
        }
        if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
            syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
        } else {
            syncToAllServer((ClientEvent) event);
        }
    }

通过syncToAllServer()方法同步数据给集群中的其它节点:
临时实例,使用distro协议同步; 持久实例:使用raft协议同步

  
    private void syncToAllServer(ClientEvent event) {
     /**
     * 判断客户端是否为空,是否是临时实例,判断是否是负责节点
     * 
     * 临时实例,使用distro协议同步; 持久实例:使用raft协议同步
**/
        Client client = event.getClient();
        // Only ephemeral data sync by Distro, persist client should sync by raft.
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
        //客户端断开连接
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.DELETE);
        } else if (event instanceof ClientEvent.ClientChangedEvent) {
        //客户端变更 新增或者修改
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.CHANGE);
        }
    }

同步时,会涉及到一个负责节点和非负责节点。
负责节点(发起同步)
也就是收到客户端事件ClientChangedEvent后负责同步信息给其他非负责节点, 所以这里只能有负责节点来进行同步,非负责节点只能接收同步事件。
DistroProtocol
Distro是阿里巴巴的私有协议,distro协议是为了注册中心而创造出的协议。
DistroProtocol会循环所有其他nacos节点,提交一个异步任务,这个异步任务会延迟1s,其实这里我们就可以看到这里涉及到客户端的断开和客户端的新增和修改。
对于Delete操作,由DistroSyncDeleteTask处理,对于Change操作,由DistroSyncChangeTask处理,这里我们从DistroSyncChangeTask来看。

   public void sync(DistroKey distroKey, DataOperation action, long delay) {
        for (Member each : memberManager.allMembersWithoutSelf()) {
            syncToTarget(distroKey, action, each.getAddress(), delay);
        }
    }

核心逻辑在syncToTarget

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
            targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
    // 往延时任务引擎中加入DistroDelayTask任务,最终将会调用DistroDelayTaskProcessor.process方法
    // distroTaskEngineHolder.getDelayTaskExecuteEngine()返回的是DistroDelayTaskExecuteEngine,它继承自NacosDelayTaskExecuteEngine,
    // 其构造方法中初始化了一个定时任务ProcessRunnable,不断从阻塞队列中拿出任务出来执行(ConcurrentHashMap<Object, AbstractDelayTask> tasks)
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
    }
}

在syncToTarget()方法中,构建了一个DistroDelayTask任务,然后放到了延时任务引擎中执行,熟悉Nacos服务注册流程的小伙伴对这一块应该不陌生,这里通过distroTaskEngineHolder.getDelayTaskExecuteEngine()返回了一个DistroDelayTaskExecuteEngine执行引擎,它继承自NacosDelayTaskExecuteEngine,而在NacosDelayTaskExecuteEngine的构造方法中初始化了一个定时任务ProcessRunnable,不断从阻塞队列中拿出任务出来执行(ConcurrentHashMap<Object, AbstractDelayTask> tasks)。

/**
 * 任务队列
 * key:对应的服务
 */
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
 
protected final ReentrantLock lock = new ReentrantLock();
 
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    // 初始化任务队列
    tasks = new ConcurrentHashMap<>(initCapacity);
    // 创建定时任务的线程池
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    // 在指定的初始延迟时间(100毫秒)后开始执行任务,并按固定的时间间隔周期性(100毫秒)地执行任务。
    // 默认延时100毫秒执行ProcessRunnable,然后每隔100毫秒周期性执行ProcessRunnable
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
 
public void addTask(Object key, AbstractDelayTask newTask) {
    // 加锁防并发处理,key就是对应的服务
    lock.lock();
    try {
        // ConcurrentHashMap<Object, AbstractDelayTask> tasks = new ConcurrentHashMap<>(initCapacity);
        // 通过key判断是否已存在map中
        AbstractDelayTask existTask = tasks.get(key);
        if (null != existTask) {
            // 服务存在的话,则需要合并任务,其实就是合并多个任务,一起执行
            newTask.merge(existTask);
        }
        // 将任务放入到map中,等待处理
        tasks.put(key, newTask);
    } finally {
        lock.unlock();
    }
}
 
/**
 * 任务处理类
 */
private class ProcessRunnable implements Runnable {
    
    @Override
    public void run() {
        try {
            processTasks();
        } catch (Throwable e) {
            getEngineLog().error(e.toString(), e);
        }
    }
}
 
protected void processTasks() {
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        // 从队列中移除这个任务
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        // taskKey示例值: Service{namespace='public', group='DEFAULT_GROUP', name='discovery-provider', ephemeral=true, revision=0}
        // 找到处理类
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            if (!processor.process(task)) {
                // 处理失败的话,重新入队(即重试)
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error ", e);
            retryFailedTask(taskKey, task);
        }
    }
}

在processTasks()方法中,首先获取处理类,如果获取不到,则使用默认的处理类。在DistroTaskEngineHolder构造方法中,已经设置了默认处理类为DistroDelayTaskProcessor。

public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
    DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
    // 设置默认的处理器
    delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
// DistroDelayTaskProcessor#process
public boolean process(NacosTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    switch (distroDelayTask.getAction()) {
        // unregister注册的是DELETE事件
        case DELETE:
            // 添加了DistroSyncDeleteTask执行任务,由 DistroExecuteTaskExecuteEngine 执行
            DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
            return true;
        case CHANGE:
        case ADD:
            DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
            // 往立即执行的任务引擎中加入DistroSyncChangeTask任务,DistroSyncChangeTask实现了runnable接口,关注其run方法
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
            return true;
        default:
            return false;
    }
}

这里我们以DistroSyncChangeTask为例,来分析整体的同步流程。
可以看到,这里还未开始同步数据的流程,而是又封装了一个DistroSyncChangeTask任务,加入到了Distro任务引擎中,实际上是加入到了TaskExecuteWorker类内部的阻塞任务队列中,如下所示:

public void addTask(Object tag, AbstractExecuteTask task) {
    // 获取处理类
    NacosTaskProcessor processor = getProcessor(tag);
    if (null != processor) {
        // 不为空,就用对应的processor处理
        processor.process(task);
        return;
    }
    // 没有找到处理类的话, 就用公共的TaskExecuteWorker执行
    TaskExecuteWorker worker = getWorker(tag);
    worker.process(task);
}
 
 
/**
 * 阻塞队列, 类型为Runnable,说明存入的是一个线程
 */
private final BlockingQueue<Runnable> queue;
 
// TaskExecuteWorker#process
public boolean process(NacosTask task) {
    if (task instanceof AbstractExecuteTask) {
        // 添加任务到阻塞队列中
        putTask((Runnable) task);
    }
    return true;
}
 
private void putTask(Runnable task) {
    try {
        queue.put(task);
    } catch (InterruptedException ire) {
        log.error(ire.toString(), ire);
    }
}

这里我们没有找到处理类的话,,就用公共的TaskExecuteWorker执行:

public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
    this.name = name + "_" + mod + "%" + total;
    // 阻塞队列
    this.queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    this.closed = new AtomicBoolean(false);
    this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
    // 内部执行worker,实际上是一个线程
    realWorker = new InnerWorker(this.name);
    // 启动worker
    realWorker.start();
}

在TaskExecuteWorker的构造方法中,初始化了一个内部执行worker,实际上是一个线程,它不断地从阻塞队列中拿出任务,来执行:

private class InnerWorker extends Thread {
    
    InnerWorker(String name) {
        setDaemon(false);
        setName(name);
    }
    
    @Override
    public void run() {
        while (!closed.get()) {
            try {
                // 从阻塞队列获取任务,在process()方法中通过putTask()将任务存入到了阻塞队列中
                Runnable task = queue.take();
                long begin = System.currentTimeMillis();
                // 执行任务
                task.run();
                long duration = System.currentTimeMillis() - begin;
                if (duration > 1000L) {
                    log.warn("task {} takes {}ms", task, duration);
                }
            } catch (Throwable e) {
                log.error("[TASK-FAILED] " + e, e);
            }
        }
    }
}

调用task.run(),我们之前加入的是DistroSyncChangeTask,它继承自AbstractDistroExecuteTask,间接实现了runnable接口,查看其run方法:

// AbstractDistroExecuteTask#run
public void run() {
    String type = getDistroKey().getResourceType();
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
    if (null == transportAgent) {
        Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
        return;
    }
    Loggers.DISTRO.info("[DISTRO-START] {}", toString());
    if (transportAgent.supportCallbackTransport()) {
        doExecuteWithCallback(new DistroExecuteCallback());
    } else {
        executeDistroTask();
    }
}
 
private void executeDistroTask() {
    try {
        boolean result = doExecute();
        if (!result) {
            // 失败重试
            handleFailedTask();
        }
        Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
    } catch (Exception e) {
        Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
        handleFailedTask();
    }
}
 
// 由子类DistroSyncChangeTask去实现
protected abstract boolean doExecute();

最终将会执行DistroSyncChangeTask的doExecute()方法:

protected boolean doExecute() {
    String type = getDistroKey().getResourceType();
    // 从DistroClientDataProcessor获取DistroData, 其实是从ClientManager实时获取Client
    DistroData distroData = getDistroData(type);
    if (null == distroData) {
        Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
        return true;
    }
    // 将得到的数据同步给其他服务节点
    return getDistroComponentHolder().findTransportAgent(type)
            .syncData(distroData, getDistroKey().getTargetServer());
}

八、Nacos源码系列:Nacos集群数据同步

负责节点(发起同步)

DistroProtocol

获取同步数据getDistroData

执行同步数据syncData

非负责节点(接收请求)

在Nacos以集群模式运行时,当Nacos服务器收到服务注册请求后,发生了ClientEvent.ClientChangedEvent事件,就会触发将注册的服务信息同步给集群中的其他Nacos-server节点。

ClientEvent.ClientChangedEvent事件的真正处理类是在DistroClientDataProcessor#onEvent方法:



public void onEvent(Event event) {
    // 只有集群模式才有效,单机模式启动的Nacos,不会执行同步操作
    if (EnvUtil.getStandaloneMode()) {
        return;
    }
    if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
        syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
    } else {
        // 同步数据给其它节点
        syncToAllServer((ClientEvent) event);
    }
}

通过syncToAllServer()方法同步数据给集群中的其它节点:

private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    /**
     * 判断客户端是否为空,是否是临时实例,判断是否是负责节点
     * 
     * 临时实例,使用distro协议同步; 持久实例:使用raft协议同步
     */
    if (isInvalidClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        // 客户端断开连接事件
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        // 客户端变更事件(新增或修改)
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

同步时,会涉及到一个负责节点和非负责节点。

负责节点(发起同步)
也就是收到客户端事件ClientChangedEvent后负责同步信息给其他非负责节点, 所以这里只能有负责节点来进行同步,非负责节点只能接收同步事件。

private boolean isInvalidClient(Client client) {
    // 临时实例,使用distro协议同步; 持久实例:使用raft协议同步
    return null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client);
}

DistroProtocol
Distro是阿里巴巴的私有协议,distro协议是为了注册中心而创造出的协议。

DistroProtocol会循环所有其他nacos节点,提交一个异步任务,这个异步任务会延迟1s,其实这里我们就可以看到这里涉及到客户端的断开和客户端的新增和修改。

对于Delete操作,由DistroSyncDeleteTask处理,对于Change操作,由DistroSyncChangeTask处理,这里我们从DistroSyncChangeTask来看。

public void sync(DistroKey distroKey, DataOperation action) {
    // 配置同步延迟的时间:默认为1s
    sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
}
 
// com.alibaba.nacos.core.distributed.distro.DistroProtocol#sync()
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    // 遍历每个除自己以外的其它成员
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}
核心逻辑在syncToTarget()方法:
```java
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
            targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
    // 往延时任务引擎中加入DistroDelayTask任务,最终将会调用DistroDelayTaskProcessor.process方法
    // distroTaskEngineHolder.getDelayTaskExecuteEngine()返回的是DistroDelayTaskExecuteEngine,它继承自NacosDelayTaskExecuteEngine,
    // 其构造方法中初始化了一个定时任务ProcessRunnable,不断从阻塞队列中拿出任务出来执行(ConcurrentHashMap<Object, AbstractDelayTask> tasks)
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
    }
}

在syncToTarget()方法中,构建了一个DistroDelayTask任务,然后放到了延时任务引擎中执行,熟悉Nacos服务注册流程的小伙伴对这一块应该不陌生,这里通过distroTaskEngineHolder.getDelayTaskExecuteEngine()返回了一个DistroDelayTaskExecuteEngine执行引擎,它继承自NacosDelayTaskExecuteEngine,而在NacosDelayTaskExecuteEngine的构造方法中初始化了一个定时任务ProcessRunnable,不断从阻塞队列中拿出任务出来执行(ConcurrentHashMap<Object, AbstractDelayTask> tasks)。

/**
 * 任务队列
 * key:对应的服务
 */
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
 
protected final ReentrantLock lock = new ReentrantLock();
 
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    // 初始化任务队列
    tasks = new ConcurrentHashMap<>(initCapacity);
    // 创建定时任务的线程池
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    // 在指定的初始延迟时间(100毫秒)后开始执行任务,并按固定的时间间隔周期性(100毫秒)地执行任务。
    // 默认延时100毫秒执行ProcessRunnable,然后每隔100毫秒周期性执行ProcessRunnable
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
 
public void addTask(Object key, AbstractDelayTask newTask) {
    // 加锁防并发处理,key就是对应的服务
    lock.lock();
    try {
        // ConcurrentHashMap<Object, AbstractDelayTask> tasks = new ConcurrentHashMap<>(initCapacity);
        // 通过key判断是否已存在map中
        AbstractDelayTask existTask = tasks.get(key);
        if (null != existTask) {
            // 服务存在的话,则需要合并任务,其实就是合并多个任务,一起执行
            newTask.merge(existTask);
        }
        // 将任务放入到map中,等待处理
        tasks.put(key, newTask);
    } finally {
        lock.unlock();
    }
}
 
/**
 * 任务处理类
 */
private class ProcessRunnable implements Runnable {
    
    @Override
    public void run() {
        try {
            processTasks();
        } catch (Throwable e) {
            getEngineLog().error(e.toString(), e);
        }
    }
}
 
protected void processTasks() {
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        // 从队列中移除这个任务
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        // taskKey示例值: Service{namespace='public', group='DEFAULT_GROUP', name='discovery-provider', ephemeral=true, revision=0}
        // 找到处理类
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            if (!processor.process(task)) {
                // 处理失败的话,重新入队(即重试)
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error ", e);
            retryFailedTask(taskKey, task);
        }
    }
}

在processTasks()方法中,首先获取处理类,如果获取不到,则使用默认的处理类。在DistroTaskEngineHolder构造方法中,已经设置了默认处理类为DistroDelayTaskProcessor。

public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
// 设置默认的处理器
delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
也就是说,在调用syncToTarget()方法后,会触发任务DistroDelayTaskProcessor处理任务。 对于删除类型的任务,触发任务DistroSyncDeleteTask , 对于新增、修改类型的任务:
DistroSyncChangeTask。

// DistroDelayTaskProcessor#process
public boolean process(NacosTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    switch (distroDelayTask.getAction()) {
        // unregister注册的是DELETE事件
        case DELETE:
            // 添加了DistroSyncDeleteTask执行任务,由 DistroExecuteTaskExecuteEngine 执行
            DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
            return true;
        case CHANGE:
        case ADD:
            DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
            // 往立即执行的任务引擎中加入DistroSyncChangeTask任务,DistroSyncChangeTask实现了runnable接口,关注其run方法
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
            return true;
        default:
            return false;
    }
}

这里我们以DistroSyncChangeTask为例,来分析整体的同步流程。

可以看到,这里还未开始同步数据的流程,而是又封装了一个DistroSyncChangeTask任务,加入到了Distro任务引擎中,实际上是加入到了TaskExecuteWorker类内部的阻塞任务队列中,如下所示:

public void addTask(Object tag, AbstractExecuteTask task) {
    // 获取处理类
    NacosTaskProcessor processor = getProcessor(tag);
    if (null != processor) {
        // 不为空,就用对应的processor处理
        processor.process(task);
        return;
    }
    // 没有找到处理类的话, 就用公共的TaskExecuteWorker执行
    TaskExecuteWorker worker = getWorker(tag);
    worker.process(task);
}

 
/**
 * 阻塞队列, 类型为Runnable,说明存入的是一个线程
 */
private final BlockingQueue<Runnable> queue;
 
// TaskExecuteWorker#process
public boolean process(NacosTask task) {
    if (task instanceof AbstractExecuteTask) {
        // 添加任务到阻塞队列中
        putTask((Runnable) task);
    }
    return true;
}
 
private void putTask(Runnable task) {
    try {
        queue.put(task);
    } catch (InterruptedException ire) {
        log.error(ire.toString(), ire);
    }
}

这里我们没有找到处理类的话,,就用公共的TaskExecuteWorker执行:

public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
    this.name = name + "_" + mod + "%" + total;
    // 阻塞队列
    this.queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    this.closed = new AtomicBoolean(false);
    this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
    // 内部执行worker,实际上是一个线程
    realWorker = new InnerWorker(this.name);
    // 启动worker
    realWorker.start();
}TaskExecuteWorker的构造方法中,初始化了一个内部执行worker,实际上是一个线程,它不断地从阻塞队列中拿出任务,来执行:

private class InnerWorker extends Thread {
    
    InnerWorker(String name) {
        setDaemon(false);
        setName(name);
    }
    
    @Override
    public void run() {
        while (!closed.get()) {
            try {
                // 从阻塞队列获取任务,在process()方法中通过putTask()将任务存入到了阻塞队列中
                Runnable task = queue.take();
                long begin = System.currentTimeMillis();
                // 执行任务
                task.run();
                long duration = System.currentTimeMillis() - begin;
                if (duration > 1000L) {
                    log.warn("task {} takes {}ms", task, duration);
                }
            } catch (Throwable e) {
                log.error("[TASK-FAILED] " + e, e);
            }
        }
    }
}

调用task.run(),我们之前加入的是DistroSyncChangeTask,它继承自AbstractDistroExecuteTask,间接实现了runnable接口,查看其run方法:

// AbstractDistroExecuteTask#run
public void run() {
    String type = getDistroKey().getResourceType();
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
    if (null == transportAgent) {
        Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
        return;
    }
    Loggers.DISTRO.info("[DISTRO-START] {}", toString());
    if (transportAgent.supportCallbackTransport()) {
        doExecuteWithCallback(new DistroExecuteCallback());
    } else {
        executeDistroTask();
    }
}
 
private void executeDistroTask() {
    try {
        boolean result = doExecute();
        if (!result) {
            // 失败重试
            handleFailedTask();
        }
        Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
    } catch (Exception e) {
        Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
        handleFailedTask();
    }
}

// 由子类DistroSyncChangeTask去实现
protected abstract boolean doExecute();

最终将会执行DistroSyncChangeTask的doExecute()方法:

protected boolean doExecute() {
    String type = getDistroKey().getResourceType();
    // 从DistroClientDataProcessor获取DistroData, 其实是从ClientManager实时获取Client
    DistroData distroData = getDistroData(type);
    if (null == distroData) {
        Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
        return true;
    }
    // 将得到的数据同步给其他服务节点
    return getDistroComponentHolder().findTransportAgent(type)
            .syncData(distroData, getDistroKey().getTargetServer());
}

获取同步数据getDistroData
这里获取同步数据其实是从DistroClientDataProcessor获取DistroData, 其实是从ClientManager实时获取Client。

private DistroData getDistroData(String type) {
    // 其实是从ClientManager实时获取Client
    DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
    if (null != result) {
        result.setType(OPERATION);
    }
    return result;
}
 
// DistroClientDataProcessor#getDistroData
public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    // 把生成的同步数据放入到数组中
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}

可以看到generateSyncData 方法是关键获取服务的方法,该方法提供了同步数据,包含Client的注册信息,包括客户端注册了哪些namespace,哪些group,哪些service,哪些instance。

public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
 
    List<String> batchNamespaces = new LinkedList<>();
    List<String> batchGroupNames = new LinkedList<>();
    List<String> batchServiceNames = new LinkedList<>();
 
    List<InstancePublishInfo> instances = new LinkedList<>();
    List<BatchInstancePublishInfo> batchInstancePublishInfos = new LinkedList<>();
    BatchInstanceData  batchInstanceData = new BatchInstanceData();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        InstancePublishInfo instancePublishInfo = entry.getValue();
        if (instancePublishInfo instanceof BatchInstancePublishInfo) {
            BatchInstancePublishInfo batchInstance = (BatchInstancePublishInfo) instancePublishInfo;
            batchInstancePublishInfos.add(batchInstance);
            buildBatchInstanceData(batchInstanceData, batchNamespaces, batchGroupNames, batchServiceNames, entry);
            batchInstanceData.setBatchInstancePublishInfos(batchInstancePublishInfos);
        } else {
            namespaces.add(entry.getKey().getNamespace());
            groupNames.add(entry.getKey().getGroup());
            serviceNames.add(entry.getKey().getName());
            instances.add(entry.getValue());
        }
    }
    ClientSyncData data = new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
    data.getAttributes().addClientAttribute(REVISION, getRevision());
    return data;
}

执行同步数据syncData
这里的同步实际是由DistroClientTransportAgent来负责的,将数据封装成DistroDataRequest,然后获取目标节点Member,然后调用sendRequest异步方法执行同步:

public boolean syncData(DistroData data, String targetServer) {
    if (isNoExistTarget(targetServer)) {
        return true;
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    // 获取目标节点
    Member member = memberManager.find(targetServer);
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO
                .warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer,
                        data.getDistroKey());
        return false;
    }
    try {
        // 同步发送 DistroDataRequest 请求
        // 真正处理请求是在:com.alibaba.nacos.naming.remote.rpc.handler.DistroDataRequestHandler.handle方法
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! key: {}", data.getDistroKey(), e);
    }
    return false;
}

这时我们主要关注非负责节点收到同步请求后如何处理。
非负责节点(接收请求)
当负责节点将数据发送给非负责节点以后,将要处理发送过来的Client数据。

// DistroDataRequestHandler#handle
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
    try {
        switch (request.getDataOperation()) {
            case VERIFY:
                return handleVerify(request.getDistroData(), meta);
            case SNAPSHOT:
                return handleSnapshot();
            case ADD:
            case CHANGE:
            case DELETE:
                // 变更操作: 维护注册表数据,然后发布事件
                return handleSyncData(request.getDistroData());
            case QUERY:
                return handleQueryData(request.getDistroData());
            default:
                return new DistroDataResponse();
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
        DistroDataResponse result = new DistroDataResponse();
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("handle distro request with exception");
        return result;
    }
}
 
private DistroDataResponse handleSyncData(DistroData distroData) {
    DistroDataResponse result = new DistroDataResponse();
	// 调用DistroProtocol.onReceive方法
    if (!distroProtocol.onReceive(distroData)) {
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("[DISTRO-FAILED] distro data handle failed");
    }
    return result;
}
 
// DistroProtocol#onReceive
public boolean onReceive(DistroData distroData) {
    Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
            distroData.getDistroKey());
    String resourceType = distroData.getDistroKey().getResourceType();
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
        return false;
    }
    // 通过处理器处理接收到的数据
    return dataProcessor.processData(distroData);
}

这里我主要关注ADD/CHANGE,所以主要关注handleSyncData()方法。

public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            // 反序列化同步数据为ClientSyncData
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            // 处理同步数据
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}

首先反序列化接收到的同步数据,封装成ClientSyncData,然后处理同步数据:

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO
            .info("[Client-Add] Received distro client sync data {}, revision={}", clientSyncData.getClientId(),
                    clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L));
    // 同步客户端连接,生成client:不存在时创建client(IpPortBasedClient)
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    // 获取Client
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // 更新Client数据
    upgradeClient(client, clientSyncData);
}

同步客户端连接,然后获取到客户端对象,更新客户端的注册表信息,并发布一些事件:

private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    Set<Service> syncedService = new HashSet<>();
    // process batch instance sync logic
    processBatchInstanceDistroData(syncedService, client, clientSyncData);
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            // 添加注册表信息,并发布ClientRegisterServiceEvent
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            // 删除注册表信息,并发布ClientDeregisterServiceEvent
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
    client.setRevision(clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/666879.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JavaSE:SE知识整体总结

1、引言 历时一个多月的学习&#xff0c;已经掌握了JavaSE的知识&#xff0c;这篇博客就来做一下SE知识的总结~ 2、数据类型和变量 Java中的数据类型分为基本数据类型和引用数据类型。 2.1 基本数据类型 基本数据类型共有四类八种&#xff1a; 四类&#xff1a;整形、浮点…

​​​​​​​Beyond Compare 3密钥被撤销的解决办法

首先&#xff0c;BCompare3的链接如下 链接&#xff1a;https://pan.baidu.com/s/1vuSxY0cVQCt0-8CpFzUhvg 提取码&#xff1a;8888 --来自百度网盘超级会员V7的分享 1.问题现象 激活之后在使用过程中有时候会出现密钥被撤销的警告&#xff0c;而且该工具无法使用&#xff…

渡众机器人自动驾驶小车运行Autoware 实现港口物流运输

Autoware 是一个开源的自动驾驶软件堆栈&#xff0c;提供了丰富的功能和模块&#xff0c;用于实现自动驾驶车辆的感知、定位、规划和控制等功能。北京渡众机器人公司将多款自动驾驶小车在多场景运行Autoware &#xff0c;它可以实现以下功能&#xff1a; 1. 感知&#xff1a;利…

小数第n位【蓝桥杯】

小数第n位 模拟 思路&#xff1a;arr数组用来记录已经出现过的a&#xff0c;在循环时及时退出。易知题目的3位即a%a后的第n-1,n,n1位。该代码非常巧妙&#xff0c;num记录3位的输出状况。 #include<iostream> #include<map> using namespace std; typedef long l…

制造企业如何通过PLM系统实现BOM管理的飞跃

摘要 在当今快速变化的制造行业中&#xff0c;产品生命周期管理&#xff08;PLM&#xff09;系统的应用已成为企业提升效率、降低成本和增强竞争力的关键。本文将探讨PLM系统如何通过其先进的BOM&#xff08;物料清单&#xff09;管理功能&#xff0c;帮助制造企业在整个产品生…

uniApp子组件监听数据的变化的方法之一

props:{//用来接收外界传递过来的数据swiperList:{type:Array,default:[]}}, swiperList&#xff1a;是父组件传递过来的值 通过 watch 监听&#xff08;在父组件中也同样可以使用&#xff0c;跟VUE的监听数据变化同理&#xff09; watch:{//监听组件中的数据变化swiperList(ol…

(深度学习记录)第TR3周:Transformer 算法详解

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 文本的输入处理中&#xff0c;transformer会将输入文本序列的每个词转化为一个词向量&#xff0c;我们通常会选择一个合适的长度作为输入…

C. Turtle and an Incomplete Sequence

思路&#xff1a;首先如果都是-1的话&#xff0c;我们可以输出1 2循环&#xff0c;否则。 首先处理首尾的连续-1串&#xff0c;这个也很好处理&#xff0c;最后我们处理两个非-1之间的-1串。 对于左边的数a[l]和右边的数a[r]&#xff1a; 如果a[l] > a[r]&#xff0c;那么…

CompassArena 司南大模型测评--代码编写

测试角度 要说测试模型&#xff0c;对咱们程序员来说&#xff0c;那自然是写代码的能力强不强比较重要了。那么下面我们以 leetcode 中的一道表面上是困难题的题目来考考各家大模型&#xff0c;看看哪个才应该是咱们日常写程序的帮手。 部分模型回答 问题部分如下截图&#…

ArcGIS教程(05):计算服务区和创建 OD 成本矩阵

准备视图 启动【ArcMap】->双击打开【Exercise05.mxd】->启用【Network Analyst 扩展模块】。前面的文章已经讲过&#xff0c;这里不再赘述。 创建服务区分析图层 1、在 Network Analyst 工具栏上&#xff0c;单击 【Network Analyst】&#xff0c;然后单击【新建服务…

CAC2.0全生命周期防护,助力企业构建安全闭环

5月29日&#xff0c;CACTER邮件安全团队凭借多年的邮件安全防护经验&#xff0c;在“防御邮件威胁-企业如何筑起最后防线”直播分享会上展示了构建安全闭环的重要性&#xff0c;并深入介绍了全新CAC 2.0中的“威胁邮件提示”功能。 下滑查看更多直播精彩内容 构建安全闭环的必要…

Solidity学习-投票合约示例

以下的合约有一些复杂&#xff0c;但展示了很多Solidity的语言特性。它实现了一个投票合约。 当然&#xff0c;电子投票的主要问题是如何将投票权分配给正确的人员以及如何防止被操纵。 我们不会在这里解决所有的问题&#xff0c;但至少我们会展示如何进行委托投票&#xff0c;…

显示器与电脑如何分屏显示?

1.点击电脑屏幕右键--显示设置 2、然后找到屏幕---找到多显示器---选择扩展显示器

亚马逊云科技峰会盛大举行 | 光环新网携光环云数据以高性能智算服务助力企业创新发展

2024年5月29日&#xff0c;云计算行业的年度盛典”2024亚马逊云科技中国峰会”在上海世博中心再次盛大启幕。作为全球领先的云计算服务提供商&#xff0c;亚马逊云科技峰会聚焦前沿科技&#xff0c;与来自不同行业、不同科技领域的优秀企业和用户共同探索AI时代的云端创新发展。…

游戏逆向工具分析及解决方案

游戏逆向&#xff0c;是指通过各类工具对游戏进行反编译及源码分析&#xff0c;尝试分析游戏的实现逻辑的过程。这个过程需要使用解密、反编译、解压缩等技术&#xff0c;目的是还原或分析出游戏的代码逻辑及资源。 游戏逆向工具可以按照不同功能进行划分&#xff0c;如&#…

分布式任务队列系统 celery 原理及入门

基本 Celery 是一个简单、灵活且可靠的分布式任务队列系统&#xff0c;用于在后台执行异步任务处理大量消息。支持任务调度、任务分发和结果存储&#xff0c;并且可以与消息代理&#xff08;如 RabbitMQ、Redis 等&#xff09;一起工作&#xff0c;以实现任务的队列管理和执行…

对于vsc中的vue命令 vue.json

打开vsc 然后在左下角有一个设置 2.点击用户代码片段 3.输入 vue.json回车 将此代码粘贴 &#xff08;我的不一定都适合&#xff09; { "vue2 template": { "prefix": "v2", "body": [ "<template>", " <…

更新详情 | Flutter 3.22 与 Dart 3.4

作者 / Michael Thomsen 过去几个月&#xff0c;Dart & Flutter 部门可谓忙碌非凡&#xff0c;但我们很高兴地宣布&#xff0c;Flutter 3.22 和 Dart 3.4 已经在今年的 Google I/O 大会上精彩亮相&#xff01; Google I/Ohttps://io.google/2024/intl/zh/ 我们始终致力于提…

【调试笔记-20240530-Linux-在 OpenWRT-23.05 上为 nginx 配置 HTTPS 网站】

调试笔记-系列文章目录 调试笔记-20240530-Linux-在 OpenWRT-23.05 上为 nginx 配置 HTTPS 网站 文章目录 调试笔记-系列文章目录调试笔记-20240530-Linux-在 OpenWRT-23.05 上为 nginx 配置 HTTPS 网站 前言一、调试环境操作系统&#xff1a;OpenWrt 23.05.3调试环境调试目标…

CS61C | lecture2

# CS61C | lecture2 C 语言是一种编译语言。C 编译器将 C 程序映射到特定与体系结构的机器代码(实际上是一串 0 和 1)。 而 Java 会通过 JVM(Java 虚拟机) 将代码转换为独立于架构的字节码。 Python 则会直接解释代码。C 不会直接解释代码&#xff0c;而是将其编译成机器代码之…