十四、Nacos源码系列:Nacos配置发布原理

目录

一、简介

二、加密处理

三、发布配置

3.1、插入或更新配置信息

3.2、发布配置数据变动事件

3.2.1、目标节点是当前节点

3.2.2、目标节点非当前节点

四、总结


一、简介

一般情况下,我们是通过Nacos提供的Web控制台登录,然后通过界面新增配置信息。后续客户端只要配置了对应的NameSpace,Group,DataId就可以在客户端获取到对应的配置信息。既然这样,Nacos服务端肯定会存储在Web控制台配置的配置信息。

Web控制台发布配置的入口肯定也是一个controller接口:com.alibaba.nacos.config.server.controller.ConfigController#publishConfig。

@PostMapping
@TpsControl(pointName = "ConfigPublish")
@Secured(action = ActionTypes.WRITE, signType = SignType.CONFIG)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
        @RequestParam(value = "appName", required = false) String appName,
        @RequestParam(value = "src_user", required = false) String srcUser,
        @RequestParam(value = "config_tags", required = false) String configTags,
        @RequestParam(value = "desc", required = false) String desc,
        @RequestParam(value = "use", required = false) String use,
        @RequestParam(value = "effect", required = false) String effect,
        @RequestParam(value = "type", required = false) String type,
        @RequestParam(value = "schema", required = false) String schema,
        @RequestParam(required = false) String encryptedDataKey) throws NacosException {
    
    String encryptedDataKeyFinal = null;
    // 内容加密
    if (StringUtils.isNotBlank(encryptedDataKey)) {
        encryptedDataKeyFinal = encryptedDataKey;
    } else {
        // 使用到插件化的思想进行加密
        Pair<String, String> pair = EncryptionHandler.encryptHandler(dataId, content);
        content = pair.getSecond();
        encryptedDataKeyFinal = pair.getFirst();
    }

    // 参数检查
    ParamUtils.checkTenant(tenant);
    ParamUtils.checkParam(dataId, group, "datumId", content);
    ParamUtils.checkParam(tag);
    // 构造配置信息,包括namespaceId、groupId、dataId、配置内容、描述信息等
    ConfigForm configForm = new ConfigForm();
    configForm.setDataId(dataId);
    configForm.setGroup(group);
    configForm.setNamespaceId(tenant);
    configForm.setContent(content);
    configForm.setTag(tag);
    configForm.setAppName(appName);
    configForm.setSrcUser(srcUser);
    configForm.setConfigTags(configTags);
    configForm.setDesc(desc);
    configForm.setUse(use);
    configForm.setEffect(effect);
    configForm.setType(type);
    configForm.setSchema(schema);
    
    if (StringUtils.isBlank(srcUser)) {
        configForm.setSrcUser(RequestUtil.getSrcUserName(request));
    }
    if (!ConfigType.isValidType(type)) {
        configForm.setType(ConfigType.getDefaultType().getType());
    }
    // 构造请求对象
    ConfigRequestInfo configRequestInfo = new ConfigRequestInfo();
    configRequestInfo.setSrcIp(RequestUtil.getRemoteIp(request));
    configRequestInfo.setRequestIpApp(RequestUtil.getAppName(request));
    configRequestInfo.setBetaIps(request.getHeader("betaIps"));
    // 发布配置
    return configOperationService.publishConfig(configForm, configRequestInfo, encryptedDataKeyFinal);
}

上述的代码主要完成了五件事情:

  • 1、加密处理
  • 2、参数检查
  • 3、构造配置信息
  • 4、构造请求对象
  • 5、发布配置

下面我们分析一些重要代码。

二、加密处理

加密处理使用了插件化思想。我们分析下插件化的思想,看看是如何使用插件或者扩展来进行加解密的。

public static Pair<String, String> encryptHandler(String dataId, String content) {
    // 检查是否需要加密
    if (!checkCipher(dataId)) {
        return Pair.with("", content);
    }
    Optional<String> algorithmName = parseAlgorithmName(dataId);
    // 获取加密的处理类
    // EncryptionPluginManager.instance(): 返回单例实例
    Optional<EncryptionPluginService> optional = algorithmName
            .flatMap(EncryptionPluginManager.instance()::findEncryptionService);
    if (!optional.isPresent()) {
        LOGGER.warn("[EncryptionHandler] [encryptHandler] No encryption program with the corresponding name found");
        // 获取不到,还是走非加密型
        return Pair.with("", content);
    }
    EncryptionPluginService encryptionPluginService = optional.get();
    // 根据扩展的插件类,获取密钥
    String secretKey = encryptionPluginService.generateSecretKey();
    // 利用密钥加密
    String encryptContent = encryptionPluginService.encrypt(secretKey, content);
    return Pair.with(encryptionPluginService.encryptSecretKey(secretKey), encryptContent);
}

首先判断是否需要处理加密,如果需要的话,去插件里面获取对应的处理类,如果获取不到则打日志,然后使用非加密方式进行处理;获取到加密插件,利用插件获取秘钥,然后再加密。

我们来分析下如何获取加密处理类的:

Optional<EncryptionPluginService> optional = algorithmName
.flatMap(EncryptionPluginManager.instance()::findEncryptionService)

这个EncryptionPluginManager.instance()执行返回的是一个单例对象,看看它的构造方法:

private EncryptionPluginManager() {
    // 初始化: 根据自己写的扩展机制,获取EncryptionPluginService,然后再进行反射初始化。
    loadInitial();
}

private void loadInitial() {
    // 通过NacosServiceLoader扩展机制,获取EncryptionPluginService加密处理类的集合
    Collection<EncryptionPluginService> encryptionPluginServices = NacosServiceLoader.load(
            EncryptionPluginService.class);
    for (EncryptionPluginService encryptionPluginService : encryptionPluginServices) {
        if (StringUtils.isBlank(encryptionPluginService.algorithmName())) {
            LOGGER.warn("[EncryptionPluginManager] Load EncryptionPluginService({}) algorithmName(null/empty) fail."
                    + " Please Add algorithmName to resolve.", encryptionPluginService.getClass());
            continue;
        }
        // 放入集合
        ENCRYPTION_SPI_MAP.put(encryptionPluginService.algorithmName(), encryptionPluginService);
        LOGGER.info("[EncryptionPluginManager] Load EncryptionPluginService({}) algorithmName({}) successfully.",
                encryptionPluginService.getClass(), encryptionPluginService.algorithmName());
    }
}

因为是单例,所以获取单例的时候通过loadInitial()进行初始化,初始化的时候会根据自己写的扩展机制,获取EncryptionPluginService加密处理类集合,然后再进行反射初始化,并缓存起来。

重要的还是这种插件化的思想,它仅仅依赖于原生JDK的SPI机制,可以按需扩展和定制:

  • 1、提供给插件化的接口,由第三方去实现(自定义功能);
  • 2、在初始化的时候,Nacos去加载处理类;

三、发布配置

发布配置调用的是ConfigOperationService#publishConfig方法:

public Boolean publishConfig(ConfigForm configForm, ConfigRequestInfo configRequestInfo, String encryptedDataKey)
        throws NacosException {
    // 将配置高级信息转成Map键值对
    Map<String, Object> configAdvanceInfo = getConfigAdvanceInfo(configForm);
    // 检查参数
    ParamUtils.checkParam(configAdvanceInfo);
    
    if (AggrWhitelist.isAggrDataId(configForm.getDataId())) {
        LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", configRequestInfo.getSrcIp(),
                configForm.getDataId(), configForm.getGroup());
        throw new NacosApiException(HttpStatus.FORBIDDEN.value(), ErrorCode.INVALID_DATA_ID,
                "dataId:" + configForm.getDataId() + " is aggr");
    }

    // 构建ConfigInfo配置信息,发布配置最基本的五个参数: nameSpaceId、groupId、dataId、应用名称、配置内容
    ConfigInfo configInfo = new ConfigInfo(configForm.getDataId(), configForm.getGroup(),
            configForm.getNamespaceId(), configForm.getAppName(), configForm.getContent());
    
    configInfo.setType(configForm.getType());
    configInfo.setEncryptedDataKey(encryptedDataKey);
    ConfigOperateResult configOperateResult = null;
    
    String persistEvent = ConfigTraceService.PERSISTENCE_EVENT;

    // 判断是否是beta测试版本
    if (StringUtils.isBlank(configRequestInfo.getBetaIps())) {
        // 正常发布,大部分情况下,我们都没有指定tag
        if (StringUtils.isBlank(configForm.getTag())) {
            // 1、插入 or 更新配置信息
            // 这里分为内置数据库(EmbeddedConfigInfoPersistServiceImpl)和外置数据库(ExternalConfigInfoPersistServiceImpl)操作,通常我们都是使用MySQL进行持久化存储
            configOperateResult = configInfoPersistService.insertOrUpdate(configRequestInfo.getSrcIp(),
                    configForm.getSrcUser(), configInfo, configAdvanceInfo);

            // 2、发布配置数据变动事件
            ConfigChangePublisher.notifyConfigChange(
                    new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),
                            configForm.getNamespaceId(), configOperateResult.getLastModified()));
        } else {
            // 指定tag
            persistEvent = ConfigTraceService.PERSISTENCE_EVENT_TAG + "-" + configForm.getTag();
            configOperateResult = configInfoTagPersistService.insertOrUpdateTag(configInfo, configForm.getTag(),
                    configRequestInfo.getSrcIp(), configForm.getSrcUser());
            ConfigChangePublisher.notifyConfigChange(
                    new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),
                            configForm.getNamespaceId(), configForm.getTag(),
                            configOperateResult.getLastModified()));
        }
    } else {
        persistEvent = ConfigTraceService.PERSISTENCE_EVENT_BETA;
        
        // beta publish
        configOperateResult = configInfoBetaPersistService.insertOrUpdateBeta(configInfo,
                configRequestInfo.getBetaIps(), configRequestInfo.getSrcIp(), configForm.getSrcUser());
        ConfigChangePublisher.notifyConfigChange(
                new ConfigDataChangeEvent(true, configForm.getDataId(), configForm.getGroup(),
                        configForm.getNamespaceId(), configOperateResult.getLastModified()));
    }
    // 日志跟踪
    ConfigTraceService.logPersistenceEvent(configForm.getDataId(), configForm.getGroup(),
            configForm.getNamespaceId(), configRequestInfo.getRequestIpApp(), configOperateResult.getLastModified(),
            InetUtils.getSelfIP(), persistEvent, ConfigTraceService.PERSISTENCE_TYPE_PUB, configForm.getContent());
    
    return true;
}

首先组装好一些参数,我们需要重点关注的是构建ConfigInfo配置信息,发布配置最基本的五个参数: nameSpaceId、groupId、dataId、应用名称、配置内容。然后包含一些测试版本和tag的分支逻辑判断,我们关注最常用的正常发布流程。

通常情况下,我们发布配置,都不指定tag,其实就做了两件事:

  • 1、插入或更新配置信息
  • 2、发布配置数据变动事件

3.1、插入或更新配置信息

插入或更新配置信息,其实就是操作数据库,数据库操作分为了内置数据库和外置数据库,我们通常使用外置数据库MySQL来存储配置信息,也就是ExternalConfigInfoPersistServiceImpl,内置数据库对应的操作类是EmbeddedConfigInfoPersistServiceImpl。

我们这里主要分析外置数据库MySQL的方式:ExternalConfigInfoPersistServiceImpl#insertOrUpdate

public ConfigOperateResult insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo,
        Map<String, Object> configAdvanceInfo) {
    // 没有直接判断是新增还是更新,而且依赖数据库唯一性做检查,重复了(报主键冲突,说明已存在)就做更新。
    try {
        // 添加配置信息
        return addConfigInfo(srcIp, srcUser, configInfo, configAdvanceInfo);
    } catch (DuplicateKeyException ive) { // Unique constraint conflict
        // 如果报唯一约束冲突,则更新配置内容
        return updateConfigInfo(configInfo, srcIp, srcUser, configAdvanceInfo);
    }
}

从源码可以看到,这里没有直接判断是新增还是更新配置,而且依赖数据库唯一性做检查,重复了(报主键冲突,说明已存在)就做更新。

我们先看下新增配置addConfigInfo:

public ConfigOperateResult addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
        final Map<String, Object> configAdvanceInfo) {
    return tjt.execute(status -> {
        try {
            // jdbcTemplate操作,自动插入到数据库表(config_info)中,返回主键id
            long configId = addConfigInfoAtomic(-1, srcIp, srcUser, configInfo, configAdvanceInfo);
            String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
            // 新增tag管理
            addConfigTagsRelation(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),
                    configInfo.getTenant());
            Timestamp now = new Timestamp(System.currentTimeMillis());
            // 插入历史数据到表中(his_config_info)
            historyConfigInfoPersistService.insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, now, "I");
            ConfigInfoStateWrapper configInfoCurrent = this.findConfigInfoState(configInfo.getDataId(),
                    configInfo.getGroup(), configInfo.getTenant());
            if (configInfoCurrent == null) {
                return new ConfigOperateResult(false);
            }
            return new ConfigOperateResult(configInfoCurrent.getId(), configInfoCurrent.getLastModified());
            
        } catch (CannotGetJdbcConnectionException e) {
            LogUtil.FATAL_LOG.error("[db-error] " + e, e);
            throw e;
        }
    });
}

插入数据库的操作是在addConfigInfoAtomic()方法:

public long addConfigInfoAtomic(final long configId, final String srcIp, final String srcUser,
        final ConfigInfo configInfo, Map<String, Object> configAdvanceInfo) {
    // 取出配置信息
    final String appNameTmp =
            StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
    final String tenantTmp =
            StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
    
    final String desc = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("desc");
    final String use = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("use");
    final String effect = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("effect");
    final String type = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("type");
    final String schema = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("schema");
    final String encryptedDataKey =
            configInfo.getEncryptedDataKey() == null ? StringUtils.EMPTY : configInfo.getEncryptedDataKey();

    // 将配置内容进行MD5加密
    final String md5Tmp = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);
    
    KeyHolder keyHolder = new GeneratedKeyHolder();

    // 根据数据库表获取对应的mapper, 通过插件化的形式, 灵活应对使用不同数据库的场景
    ConfigInfoMapper configInfoMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
            TableConstant.CONFIG_INFO);
    // 将参数转换成对应数据库类型的sql语句,拼接insert into config_info values(....)插入语句
    final String sql = configInfoMapper.insert(
            Arrays.asList("data_id", "group_id", "tenant_id", "app_name", "content", "md5", "src_ip", "src_user",
                    "gmt_create", "gmt_modified", "c_desc", "c_use", "effect", "type", "c_schema",
                    "encrypted_data_key"));
    // 获取主键名称,默认值为id
    String[] returnGeneratedKeys = configInfoMapper.getPrimaryKeyGeneratedKeys();
    try {
        jt.update(new PreparedStatementCreator() {
            @Override
            public PreparedStatement createPreparedStatement(Connection connection) throws SQLException {
                Timestamp now = new Timestamp(System.currentTimeMillis());

                // 通过预编译的PreparedStatement,设置每个字段的值
                PreparedStatement ps = connection.prepareStatement(sql, returnGeneratedKeys);
                ps.setString(1, configInfo.getDataId());
                ps.setString(2, configInfo.getGroup());
                ps.setString(3, tenantTmp);
                ps.setString(4, appNameTmp);
                ps.setString(5, configInfo.getContent());
                ps.setString(6, md5Tmp);
                ps.setString(7, srcIp);
                ps.setString(8, srcUser);
                ps.setTimestamp(9, now);
                ps.setTimestamp(10, now);
                ps.setString(11, desc);
                ps.setString(12, use);
                ps.setString(13, effect);
                ps.setString(14, type);
                ps.setString(15, schema);
                ps.setString(16, encryptedDataKey);
                return ps;
            }
        }, keyHolder);
        Number nu = keyHolder.getKey();
        if (nu == null) {
            throw new IllegalArgumentException("insert config_info fail");
        }
        return nu.longValue();
    } catch (CannotGetJdbcConnectionException e) {
        LogUtil.FATAL_LOG.error("[db-error] " + e, e);
        throw e;
    }
}

首先取出配置信息,对配置的内容进行MD5加密,然后根据数据库表获取对应的mapper,这里还是通过插件化的形式,灵活应对使用不同数据库的场景。

获取到mapper之后,将参数转换成对应数据库类型的sql语句,拼接insert into config_info values(....)插入语句,最后通过JdbcTemplate执行sql,完成配置的插入。

我们再来分析下如何利用插件化思想完成对mapper的获取的:

ConfigInfoMapper configInfoMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
                TableConstant.CONFIG_INFO);

public <R extends Mapper> R findMapper(String dataSource, String tableName) {
    LOGGER.info("[MapperManager] findMapper dataSource: {}, tableName: {}", dataSource, tableName);
    if (StringUtils.isBlank(dataSource) || StringUtils.isBlank(tableName)) {
        throw new NacosRuntimeException(FIND_DATASOURCE_ERROR_CODE, "dataSource or tableName is null");
    }
    // 从SPI缓存中获取,这个是在MapperManager构造方法中初始化的
    Map<String, Mapper> tableMapper = MAPPER_SPI_MAP.get(dataSource);
    if (Objects.isNull(tableMapper)) {
        throw new NacosRuntimeException(FIND_DATASOURCE_ERROR_CODE,
                "[MapperManager] Failed to find the datasource,dataSource:" + dataSource);
    }
    // 根据表名称获取mapper
    Mapper mapper = tableMapper.get(tableName);
    if (Objects.isNull(mapper)) {
        throw new NacosRuntimeException(FIND_TABLE_ERROR_CODE,
                "[MapperManager] Failed to find the table ,tableName:" + tableName);
    }
    if (dataSourceLogEnable) {
        return (R) MapperProxy.createSingleProxy(mapper);
    }
    return (R) mapper;
}

首先从MAPPER_SPI_MAP缓存中获取,这个是在MapperManager构造方法中初始化的。然后根据表名称获取到对应的mapper。

这个MAPPER_SPI_MAP初始化也和之前EncryptionPluginService的一样,在单例的构造方法中加载: 

private MapperManager() {
    loadInitial();
}

public void loadInitial() {
    Collection<Mapper> mappers = NacosServiceLoader.load(Mapper.class);
    for (Mapper mapper : mappers) {
        Map<String, Mapper> mapperMap = MAPPER_SPI_MAP.computeIfAbsent(mapper.getDataSource(), (r) -> new HashMap<>(16));
        mapperMap.put(mapper.getTableName(), mapper);
        LOGGER.info("[MapperManager] Load Mapper({}) datasource({}) tableName({}) successfully.",
                mapper.getClass(), mapper.getDataSource(), mapper.getTableName());
    }
}

 我们也可以看到Nacos源码加载的Mapper插件:

获取到插件配置的具体mapper实现类后,在调用mapper.Mapper#insert()方法时,就可以根据插件的扩展,通过不同的实现类去处理了,就能解决不同数据库类型中sql存在差异的问题。

更新配置的大体流程跟新增一样,首先查出旧的配置信息,然后做一些判断,最后根据dataType和表名称获取对应的mapper,然后组装好sql,通过JdbcTemplate执行。

3.2、发布配置数据变动事件

ConfigChangePublisher.notifyConfigChange(
                new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),
                        configForm.getNamespaceId(), configOperateResult.getLastModified()));

public static void notifyConfigChange(ConfigDataChangeEvent event) {
    // 如果是内部存储并且Nacos非单机模式启动,就不处理了
    if (DatasourceConfiguration.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {
        return;
    }
    NotifyCenter.publishEvent(event);
}

可以看到,还是利用了Nacos的事件统一发布中心NotifyCenter类,我们直接查找ConfigDataChangeEvent的onEvent方法来查看处理逻辑。

public AsyncNotifyService(ServerMemberManager memberManager) {
    this.memberManager = memberManager;
    
    // 注册ConfigDataChangeEvent到NotifyCenter.
    NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
    
    // 注册一个订阅ConfigDataChangeEvent事件的处理类
    NotifyCenter.registerSubscriber(new Subscriber() {
        
        @Override
        public void onEvent(Event event) {
            // Generate ConfigDataChangeEvent concurrently
            if (event instanceof ConfigDataChangeEvent) {
                ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                MetricsMonitor.incrementConfigChangeCount(tenant, group, dataId);

                // 获取所有的Nacos服务节点(包括当前客户端)
                Collection<Member> ipList = memberManager.allMembers();
                
                // 创建一个队列,将相关配置的其他服务节点都存放进来
                Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>();
                
                for (Member member : ipList) {
                    // grpc report data change only
                    rpcQueue.add(
                            new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, evt.isBatch,
                                    member));
                }
                if (!rpcQueue.isEmpty()) {
                    // 通过线程池执行异步通知
                    // AsyncRpcTask实现了runnable接口,关注其run方法
                    ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
                }
                
            }
        }
        
        @Override
        public Class<? extends Event> subscribeType() {
            return ConfigDataChangeEvent.class;
        }
    });
}

在AsyncNotifyService构造方法中,将ConfigDataChangeEvent事件注册到NotifyCenter通知中心,然后还注册一个订阅ConfigDataChangeEvent事件的处理类。

AsyncNotifyService使用spring进行托管,在IOC容器启动的时候,就会创建这个bean对象,就会执行AsyncNotifyService构造方法。我们重点关注onEvent()具体的事件处理逻辑:

  • 1、获取所有的Nacos服务节点(包括当前客户端)
  • 2、创建一个队列,将相关配置的其他服务节点都存放进来
  • 3、通过线程池执行异步通知
// 获取所有的Nacos服务节点(包括当前客户端)
Collection<Member> ipList = memberManager.allMembers();

// 创建一个队列,将相关配置的其他服务节点都存放进来
Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>();

for (Member member : ipList) {
    // grpc report data change only
    rpcQueue.add(
            new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, evt.isBatch,
                    member));
}
if (!rpcQueue.isEmpty()) {
    // 通过线程池执行异步通知
    // AsyncRpcTask实现了runnable接口,关注其run方法
    ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
}

获取到服务列表后,通过线程池调用异步任务AsyncRpcTask,AsyncRpcTask实现了Runnable接口,看看run()的逻辑:

class AsyncRpcTask implements Runnable {
    
    private Queue<NotifySingleRpcTask> queue;
    
    public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) {
        // 构造方法放入rpcTask的队列
        this.queue = queue;
    }
    
    @Override
    public void run() {
        while (!queue.isEmpty()) {
            // 从队列中取出任务
            NotifySingleRpcTask task = queue.poll();
            // 构造配置变动集群同步请求
            ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
            syncRequest.setDataId(task.getDataId());
            syncRequest.setGroup(task.getGroup());
            syncRequest.setBeta(task.isBeta);
            syncRequest.setLastModified(task.getLastModified());
            syncRequest.setTag(task.tag);
            syncRequest.setBatch(task.isBatch);
            syncRequest.setTenant(task.getTenant());

            // 通知的目标节点
            Member member = task.member;
            // 如果是当前节点,直接调用dumpService执行dump操作
            if (memberManager.getSelf().equals(member)) {
                if (syncRequest.isBeta()) {
                    dumpService.dumpBeta(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                            syncRequest.getLastModified(), NetUtils.localIP());
                } else if (syncRequest.isBatch()) {
                    dumpService.dumpBatch(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                            syncRequest.getLastModified(), NetUtils.localIP());
                } else if (StringUtils.isNotBlank(syncRequest.getTag())) {
                    dumpService.dumpTag(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                            syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
                } else {
                    dumpService.dumpFormal(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                            syncRequest.getLastModified(), NetUtils.localIP());
                }
                continue;
            }
            String event = getNotifyEvent(task);
            
            if (memberManager.hasMember(member.getAddress())) {
                // 启动健康检查,有IP未被监控,直接放入通知队列,否则通知
                boolean unHealthNeedDelay = isUnHealthy(member.getAddress());
                if (unHealthNeedDelay) {
                    // 目标 IP 运行状况不健康,然后将其放入通知列表中
                    ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                            task.getLastModified(), InetUtils.getSelfIP(), event,
                            ConfigTraceService.NOTIFY_TYPE_UNHEALTH, 0, member.getAddress());

                    // 异步任务执行
                    // 可延迟的处理,因为是不健康的节点,不知道什么时候能恢复
                    asyncTaskExecute(task);
                } else {
                    // 发送grpc请求
                    try {
                        configClusterRpcClientProxy.syncConfigChange(member, syncRequest,
                                new AsyncRpcNotifyCallBack(task));
                    } catch (Exception e) {
                        MetricsMonitor.getConfigNotifyException().increment();
                        asyncTaskExecute(task);
                    }
                    
                }
            } else {
                //No nothing if  member has offline.
            }
            
        }
    }
}

只要队列不为空,就会从队列中取出NotifySingleRpcTask任务来执行,然后构造配置变动集群同步的请求对象,包括namespaceId、dataId、groupId、标签等,然后通知目标节点。

3.2.1、目标节点是当前节点

如果目标节点是当前节点,则会直接调用dumpService执行dump操作,其实就是更新本地内存和磁盘中的配置信息为最新的配置信息。 

public void dumpFormal(String dataId, String group, String tenant, long lastModified, String handleIp) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String taskKey = dataId + group + tenant;
    // 将DumpTask添加到TaskManager任务管理器,它将异步执行
    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, false, false, false, null, lastModified, handleIp));
    DUMP_LOG.info("[dump] add formal task. groupKey={}", groupKey);
}

上面是将task放入到了TaskManager中,那在哪里执行的呢?我们看下DumpService的构造方法:

public DumpService(ConfigInfoPersistService configInfoPersistService,
        NamespacePersistService namespacePersistService,
        HistoryConfigInfoPersistService historyConfigInfoPersistService,
        ConfigInfoAggrPersistService configInfoAggrPersistService,
        ConfigInfoBetaPersistService configInfoBetaPersistService,
        ConfigInfoTagPersistService configInfoTagPersistService, ServerMemberManager memberManager) {
    this.configInfoPersistService = configInfoPersistService;
    this.namespacePersistService = namespacePersistService;
    this.historyConfigInfoPersistService = historyConfigInfoPersistService;
    this.configInfoAggrPersistService = configInfoAggrPersistService;
    this.configInfoBetaPersistService = configInfoBetaPersistService;
    this.configInfoTagPersistService = configInfoTagPersistService;
    this.memberManager = memberManager;
    this.processor = new DumpProcessor(this);
    this.dumpAllProcessor = new DumpAllProcessor(this);
    this.dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
    this.dumpAllTagProcessor = new DumpAllTagProcessor(this);
    // 创建一个TaskManager
    this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
    // 设置默认的Processor处理(DumpProcessor)
    this.dumpTaskMgr.setDefaultTaskProcessor(processor);
    
    this.dumpAllTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpAllTaskManager");
    this.dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
    
    this.dumpAllTaskMgr.addProcessor(DumpAllTask.TASK_ID, dumpAllProcessor);
    this.dumpAllTaskMgr.addProcessor(DumpAllBetaTask.TASK_ID, dumpAllBetaProcessor);
    this.dumpAllTaskMgr.addProcessor(DumpAllTagTask.TASK_ID, dumpAllTagProcessor);
    
    DynamicDataSource.getInstance().getDataSource();
}

可以看到,创建了一个任务管理器TaskManager,并设置了默认的处理类DumpProcessor。

我们再看下TaskManager的构造方法:

public TaskManager(String name) {
    super(name, LOGGER, 100L);
    this.name = name;
}

TaskManager继承自NacosDelayTaskExecuteEngine延时任务执行引擎,所以实际上执行的是:

/**
 * 定时任务线程池,在构造方法中初始化
 */
private final ScheduledExecutorService processingExecutor;

/**
 * 任务队列
 * key:对应的服务
 */
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    // 初始化任务队列
    tasks = new ConcurrentHashMap<>(initCapacity);
    // 创建定时任务的线程池
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    // 在指定的初始延迟时间(100毫秒)后开始执行任务,并按固定的时间间隔周期性(100毫秒)地执行任务。
    // 默认延时100毫秒执行ProcessRunnable,然后每隔100毫秒周期性执行ProcessRunnable
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

熟悉nacos服务注册流程的小伙伴对这一块应该不陌生,服务注册也是大量使用到任务引擎。从上面的代码中,我们可以看到,NacosDelayTaskExecuteEngine内部包含一个阻塞队列,用来存放任务的,然后初始化了一个定时执行的线程池,每隔100毫秒周期性执行ProcessRunnable。ProcessRunnable的run方法中就是从阻塞队列中不单取出任务来执行,查看是否有对应的处理类,如果没有就用默认的处理类。

在本例中,实际上就是用的默认的处理类DumpProcessor。 我们查看DumpProcessor#process具体的处理方法:

public boolean process(NacosTask task) {
    DumpTask dumpTask = (DumpTask) task;
    String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
    String dataId = pair[0];
    String group = pair[1];
    String tenant = pair[2];
    long lastModified = dumpTask.getLastModified();
    String handleIp = dumpTask.getHandleIp();
    boolean isBeta = dumpTask.isBeta();
    String tag = dumpTask.getTag();

    // 构建ConfigDumpEventBuild
    ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
            .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
    
    if (isBeta) {
        // 如果发布测试版,则转储配置,更新测试版缓存
        ConfigInfo4Beta cf = configInfoBetaPersistService.findConfigInfo4Beta(dataId, group, tenant);
        
        build.remove(Objects.isNull(cf));
        build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
        build.content(Objects.isNull(cf) ? null : cf.getContent());
        build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey());
        
        return DumpConfigHandler.configDump(build.build());
    }
    if (StringUtils.isBlank(tag)) {
        // tag为空的情况,正常情况下都是走的这个分支

        // 查看配置信息
        ConfigInfo cf = configInfoPersistService.findConfigInfo(dataId, group, tenant);
        
        build.remove(Objects.isNull(cf));
        build.content(Objects.isNull(cf) ? null : cf.getContent());
        build.type(Objects.isNull(cf) ? null : cf.getType());
        build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey());
    } else {
        ConfigInfo4Tag cf = configInfoTagPersistService.findConfigInfo4Tag(dataId, group, tenant, tag);
        
        build.remove(Objects.isNull(cf));
        build.content(Objects.isNull(cf) ? null : cf.getContent());
        
    }
    // 构建出ConfigDumpEvent,然后触发dump配置
    return DumpConfigHandler.configDump(build.build());
}

上面的逻辑主要是构建出ConfigDumpEvent,然后触发dump配置,通过DumpConfigHandler处理。

public static boolean configDump(ConfigDumpEvent event) {
    final String dataId = event.getDataId();
    final String group = event.getGroup();
    final String namespaceId = event.getNamespaceId();
    final String content = event.getContent();
    final String type = event.getType();
    final long lastModified = event.getLastModifiedTs();
    //beta测试版
    if (event.isBeta()) {
        boolean result = false;
        if (event.isRemove()) {
            result = ConfigCacheService.removeBeta(dataId, group, namespaceId);
            if (result) {
                ConfigTraceService.logDumpBetaEvent(dataId, group, namespaceId, null, lastModified,
                        event.getHandleIp(), ConfigTraceService.DUMP_TYPE_REMOVE_OK,
                        System.currentTimeMillis() - lastModified, 0);
            }
            return result;
        } else {
            result = ConfigCacheService.dumpBeta(dataId, group, namespaceId, content, lastModified,
                    event.getBetaIps(), event.getEncryptedDataKey());
            if (result) {
                ConfigTraceService.logDumpBetaEvent(dataId, group, namespaceId, null, lastModified,
                        event.getHandleIp(), ConfigTraceService.DUMP_TYPE_OK,
                        System.currentTimeMillis() - lastModified, content.length());
            }
        }
        
        return result;
    }
    
    //tag不为空的处理
    if (StringUtils.isNotBlank(event.getTag())) {
        //
        boolean result;
        if (!event.isRemove()) {
            // 非删除配置事件
            result = ConfigCacheService.dumpTag(dataId, group, namespaceId, event.getTag(), content, lastModified,
                    event.getEncryptedDataKey());
            if (result) {
                ConfigTraceService.logDumpTagEvent(dataId, group, namespaceId, event.getTag(), null, lastModified,
                        event.getHandleIp(), ConfigTraceService.DUMP_TYPE_OK,
                        System.currentTimeMillis() - lastModified, content.length());
            }
        } else {
            // 删除配置事件,移除配置缓存
            result = ConfigCacheService.removeTag(dataId, group, namespaceId, event.getTag());
            if (result) {
                ConfigTraceService.logDumpTagEvent(dataId, group, namespaceId, event.getTag(), null, lastModified,
                        event.getHandleIp(), ConfigTraceService.DUMP_TYPE_REMOVE_OK,
                        System.currentTimeMillis() - lastModified, 0);
            }
        }
        return result;
    }
    
    // 内置的一些特殊配置
    if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
        AggrWhitelist.load(content);
    }
    
    if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
        ClientIpWhiteList.load(content);
    }
    
    if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
        SwitchService.load(content);
    }
    
    boolean result;
    if (!event.isRemove()) {
        // 非删除事件:配置缓存服务dump配置信息
        result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, event.getType(),
                event.getEncryptedDataKey());
        
        if (result) {
            // 记录日志
            ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                    ConfigTraceService.DUMP_TYPE_OK, System.currentTimeMillis() - lastModified, content.length());
        }
    } else {
        // 删除配置事件,移除配置缓存
        result = ConfigCacheService.remove(dataId, group, namespaceId);
        
        if (result) {
            // 记录日志
            ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                    ConfigTraceService.DUMP_TYPE_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
        }
    }
    return result;
    
}

因为ConfigDumpEvent分为了两类事件,一类是新增或更新的事件,另一类是删除的事件,对于这两种事件是不同的两种处理方式。

首先看下删除的逻辑:

public static boolean remove(String dataId, String group, String tenant) {
    final String groupKey = GroupKey2.getKey(dataId, group, tenant);
    // 获取写锁
    final int lockResult = tryWriteLock(groupKey);

    // 如果数据不存在了
    if (0 == lockResult) {
        DUMP_LOG.info("[remove-ok] {} not exist.", groupKey);
        return true;
    }

    // 获取写锁失败了
    if (lockResult < 0) {
        DUMP_LOG.warn("[remove-error] write lock failed. {}", groupKey);
        return false;
    }
    
    try {
        // 移除配置
        if (!PropertyUtil.isDirectRead()) {
            DUMP_LOG.info("[dump] remove  local disk cache,groupKey={} ", groupKey);
            ConfigDiskServiceFactory.getInstance().removeConfigInfo(dataId, group, tenant);
        }
        // 移除配置缓存
        CACHE.remove(groupKey);
        DUMP_LOG.info("[dump] remove  local jvm cache,groupKey={} ", groupKey);
        // 发布本地配置变动通知
        NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
        
        return true;
    } finally {
        // 释放写锁
        releaseWriteLock(groupKey);
    }
}

主要做了三件事情:

  • 1、获取写锁
  • 2、移除配置信息、移除配置缓存
  • 3、发布本地配置变动通知

再看下新增,修改的逻辑:

public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
        String type, String encryptedDataKey) {
    return dumpWithMd5(dataId, group, tenant, content, null, lastModifiedTs, type, encryptedDataKey);
}

public static boolean dumpWithMd5(String dataId, String group, String tenant, String content, String md5,
        long lastModifiedTs, String type, String encryptedDataKey) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    CacheItem ci = makeSure(groupKey, encryptedDataKey);
    ci.setType(type);
    // 获取写锁
    final int lockResult = tryWriteLock(groupKey);
    assert (lockResult != 0);

    // 获取锁失败
    if (lockResult < 0) {
        DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
        return false;
    }
    
    try {
        
        // 校验最后更新时间,如果这个事件滞后了则不处理了
        boolean lastModifiedOutDated = lastModifiedTs < ConfigCacheService.getLastModifiedTs(groupKey);
        // 小于缓存中的最后更新时间,说明滞后了,不处理
        if (lastModifiedOutDated) {
            DUMP_LOG.warn("[dump-ignore] timestamp is outdated,groupKey={}", groupKey);
            return true;
        }
        
        boolean newLastModified = lastModifiedTs > ConfigCacheService.getLastModifiedTs(groupKey);

        // 计算配置信息的md5值
        if (md5 == null) {
            md5 = MD5Utils.md5Hex(content, ENCODE);
        }
        
        //check md5 & update local disk cache.
        String localContentMd5 = ConfigCacheService.getContentMd5(groupKey);
        boolean md5Changed = !md5.equals(localContentMd5);
        // 如果配置内容发生变更,需要保存到磁盘
        if (md5Changed) {
            if (!PropertyUtil.isDirectRead()) {
                DUMP_LOG.info("[dump] md5 changed, save to disk cache ,groupKey={}, newMd5={},oldMd5={}", groupKey,
                        md5, localContentMd5);
                ConfigDiskServiceFactory.getInstance().saveToDisk(dataId, group, tenant, content);
            } else {
                //ignore to save disk cache in direct model
            }
        } else {
            DUMP_LOG.warn("[dump-ignore] ignore to save to disk cache. md5 consistent,groupKey={}, md5={}",
                    groupKey, md5);
        }
        
        //check  md5 and timestamp & update local jvm cache.
        if (md5Changed) {
            DUMP_LOG.info(
                    "[dump] md5 changed, update md5 and timestamp in jvm cache ,groupKey={}, newMd5={},oldMd5={},lastModifiedTs={}",
                    groupKey, md5, localContentMd5, lastModifiedTs);
            // 如果配置内容发生变更,需要更新MD5值,更新本地内存中的配置信息,并发布本地配置变更事件
            updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey);
        } else if (newLastModified) {
            DUMP_LOG.info(
                    "[dump] md5 consistent ,timestamp changed, update timestamp only in jvm cache ,groupKey={},lastModifiedTs={}",
                    groupKey, lastModifiedTs);
            // 设置缓存中配置最后变更时间
            updateTimeStamp(groupKey, lastModifiedTs, encryptedDataKey);
        } else {
            DUMP_LOG.warn(
                    "[dump-ignore] ignore to save to jvm cache. md5 consistent and no new timestamp changed.groupKey={}",
                    groupKey);
        }
        
        return true;
    } catch (IOException ioe) {
        DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe);
        if (ioe.getMessage() != null) {
            String errMsg = ioe.getMessage();
            if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN)
                    || errMsg.contains(DISK_QUATA_EN)) {
                // Protect from disk full.
                FATAL_LOG.error("Local Disk Full,Exit", ioe);
                System.exit(0);
            }
        }
        return false;
    } finally {
        // 释放写锁
        releaseWriteLock(groupKey);
    }
    
}

public static void updateMd5(String groupKey, String md5Utf8, long lastModifiedTs, String encryptedDataKey) {
    CacheItem cache = makeSure(groupKey, encryptedDataKey);
    if (cache.getConfigCache().getMd5Utf8() == null || !cache.getConfigCache().getMd5Utf8().equals(md5Utf8)) {
        cache.getConfigCache().setMd5Utf8(md5Utf8);
        cache.getConfigCache().setLastModifiedTs(lastModifiedTs);
        cache.getConfigCache().setEncryptedDataKey(encryptedDataKey);
        NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
    }
}

这里与删除逻辑不同的是,需要比较md5值,不一致可能会有个磁盘存储的处理。

3.2.2、目标节点非当前节点

如果目标节点是其它节点,还会区分是否健康实例,进行不同的处理。如果节点不健康,则会延迟处理同步的逻辑;如果节点健康,则会发送grpc请求,目标服务进行配置的同步。

在服务节点间的配置同步有两个主要的逻辑:

  • 1、节点不健康的情况,采用异步定时任务去执行,但是这个定时并不是严格意义的定时,因为他会有个延迟的过程,会随着失败次数的增加,延迟不断加大,不过当达到最大失败次数后,就不会再增加,以一个固定的时间去触发。最大时间间隔是500ms + 7 * 7 * 1000ms。
private void asyncTaskExecute(NotifySingleRpcTask task) {
    // 随着失败次数的增加,延迟不断加大,不过当达到最大失败次数后,就不会再增加,以一个固定的时间去触发。最大时间间隔是500ms + 7 * 7 * 1000ms
    int delay = getDelayTime(task);
    Queue<NotifySingleRpcTask> queue = new LinkedList<>();
    queue.add(task);
    AsyncRpcTask asyncTask = new AsyncRpcTask(queue);
    ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
}

private static int getDelayTime(NotifyTask task) {
    int failCount = task.getFailCount();
    // 最大时间间隔是500ms + 7 * 7 * 1000ms
    int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS;
    if (failCount <= MAX_COUNT) {
        task.setFailCount(failCount + 1);
    }
    return delay;
}
  • 2、节点不健康的情况,发送grpc同步请求
configClusterRpcClientProxy.syncConfigChange(member, syncRequest,
                                new AsyncRpcNotifyCallBack(task));

public void syncConfigChange(Member member, ConfigChangeClusterSyncRequest request, RequestCallBack callBack)
        throws NacosException {
    // 异步处理
    // grpc真正的处理类是在:com.alibaba.nacos.config.server.remote.ConfigChangeClusterSyncRequestHandler.handle
    clusterRpcClientProxy.asyncRequest(member, request, callBack);
}

具体处理ConfigChangeClusterSyncRequest请求是在ConfigChangeClusterSyncRequestHandler#handle方法:

public ConfigChangeClusterSyncResponse handle(ConfigChangeClusterSyncRequest configChangeSyncRequest,
        RequestMeta meta) throws NacosException {
    // 调用到其他节点,其他节点也是执行dump服务,然后通知和本机连接的客户端,通知他们进行配置更新。
    if (configChangeSyncRequest.isBeta()) {
        dumpService.dumpBeta(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
                configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp());
    } else if (configChangeSyncRequest.isBatch()) {
        dumpService.dumpBatch(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
                configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp());
    } else if (StringUtils.isNotBlank(configChangeSyncRequest.getTag())) {
        dumpService.dumpTag(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
                configChangeSyncRequest.getTenant(), configChangeSyncRequest.getTag(),
                configChangeSyncRequest.getLastModified(), meta.getClientIp());
    } else {
		// 本机的dump服务
        dumpService.dumpFormal(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
                configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp());
    }
    return new ConfigChangeClusterSyncResponse();
}

可以看到,调用到其他节点,其他节点也是执行dump服务,然后通知和本机连接的客户端,通知他们进行配置更新。

四、总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/453857.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

个人博客系列-后端项目-用户注册功能(7)

介绍 用户注册API的主要流程&#xff1a;1.前端用户提交用户名&#xff0c;密码 2. 序列化器校验用户名&#xff0c;密码是否合法。3.存入数据库。4.签发token 创建序列化器 from rest_framework import serializers from rest_framework_simplejwt.serializers import Toke…

图【数据结构】

文章目录 图的基本概念邻接矩阵邻接表图的遍历BFSDFS 图的基本概念 图是由顶点集合及顶点间的关系组成的一种数据结构 顶点和边&#xff1a;图中结点称为顶点 权值:边附带的数据信息 路径 &#xff1a; 简单路径 和 回路&#xff1a; 子图&#xff1a;设图G {V, E}和图G1…

计算机网络:关键性能指标与非性能特征解析

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

VR文化旅游虚拟现实介绍|虚拟现实元宇宙|VR设备购买

虚拟现实&#xff08;VR&#xff09;技术正在改变我们对文化旅游的认知和体验。通过VR技术&#xff0c;人们可以身临其境地探索世界各地的文化遗产和旅游景点&#xff0c;无需亲临现场也能感受到逼真的体验。以下是VR文化旅游虚拟现实的介绍&#xff1a; 身临其境的体验&#x…

c++之旅——第六弹

大家好啊&#xff0c;这里是c之旅第六弹&#xff0c;跟随我的步伐来开始这一篇的学习吧&#xff01; 如果有知识性错误&#xff0c;欢迎各位指正&#xff01;&#xff01;一起加油&#xff01;&#xff01; 创作不易&#xff0c;希望大家多多支持哦&#xff01; 一,静态成员&…

安装Mysql和Mycli插件

一、安装数据库 1.重定向生成配置文件 cat >/etc/yum.repos.d/mysql.repo <<EOF [mysql57-community] nameMySQL 5.7 Community Server baseurlhttp://repo.mysql.com/yum/mysql-5.7-community/el/7/x86_64/ enabled1 gpgcheck0 EOF 2.yum安装 yum -y install mysq…

eclipse导入项目出现中文乱码

eclipse导入java项目的时候有时会出现乱码问题&#xff0c;很苦恼&#xff0c;网上找了很多方法都没用&#xff0c;所以得自己记录一下。导入项目可参考链接 eclipse中导入java项目-CSDN博客 1、点击 Windows --> Pereferences 2、依次点击下图内流程 3、看到下面的就修改成…

基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的血细胞智能检测与计数(深度学习模型+UI界面代码+训练数据集)

摘要&#xff1a;开发血细胞智能检测与计数系统对于疾病的预防、诊断和治疗具有关键作用。本篇博客详细介绍了如何运用深度学习构建一个血细胞智能检测与计数系统&#xff0c;并提供了完整的实现代码。该系统基于强大的YOLOv8算法&#xff0c;并对比了YOLOv7、YOLOv6、YOLOv5&a…

弧形导轨的设计要求

制造业设备种类越来越多&#xff0c;非标自动化设备渐渐成了主力市场&#xff0c;其中弧形导轨线体作为非标自动化运输中的基石&#xff0c;承担了运输&#xff0c;定位&#xff0c;特殊工位组装&#xff0c;其设计要求也非常严格。 1、精度要求&#xff1a;弧形导轨需要具备高…

【C++】开源:iceoryx通信中间件配置与使用

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍iceoryx通信中间件配置与使用。 学其所用&#xff0c;用其所学。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜欢的朋友可以关注一下&#xff0c;下次更新…

Linux运维之管理工具篇

一、前言 因运维过程中&#xff0c;经常会借助于很多工具来实现我们的监控、备份、校验&#xff0c;安全测试&#xff0c;批量操作&#xff0c;可视化辅助&#xff0c;集中管理等&#xff0c;甚至AI相关&#xff0c;本文特对常用工具进行梳理记录&#xff0c;以备不时之需及后…

记OnlyOffice的两个大坑

开发版&#xff0c;容器部署&#xff0c;试用许可已安装。 word&#xff0c;ppt&#xff0c;excel均能正常浏览。 自带的下载菜单按钮能用。 但config里自定义的downloadAs方法却不一而足。 word能正常下载&#xff0c;excel和ppt都不行。 仔细比对调试了代码。发现app.js…

2024年3月份实时获取地图边界数据方法,省市区县街道多级联动【附实时geoJson数据下载】

首先&#xff0c;来看下效果图 在线体验地址&#xff1a;https://geojson.hxkj.vip&#xff0c;并提供实时geoJson数据文件下载 可下载的数据包含省级geojson行政边界数据、市级geojson行政边界数据、区/县级geojson行政边界数据、省市区县街道行政编码四级联动数据&#xff0…

Antd中s-table组件某字段进行排序

Antd中s-table组件某字段进行排序 提前说明&#xff0c;s-table组件包含分页等功能 <s-tableref"table":columns"columns":data"loadData"bordered:row-key"(record) > record.id"></s-table>而其中loadData为获取表数…

strcat函数

函数理解记忆&#xff1a;str表示是<string.g>中的函数&#xff0c;cat表示附加。意思是将一个字符串的内容附加到另一个字符串的末尾。 注意要点&#xff1a;既然要附加&#xff0c;附加的字符串和被附加的字符串都要有\0。否则不知道附加多少&#xff0c;不知附加在哪…

【Linux进程的知识点】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档文章目录 前言 操作系统的知识补充 我们来理解一个用户操作接口&#xff1a; 进程的理解 进程的基本概念 描述进程-PCB task_struct-PCB的一种 task_ struct内容分类 …

算法刷题Day8 | 344.反转字符串、541. 反转字符串II、卡码网:54.替换数字、151.翻转字符串里的单词、卡码网:55.右旋转字符串

目录 0 引言1 反转字符串1.1 我的解题1.2 2 反转字符串II2.1 解题 3 替换数字3.1 双指针法 4 翻转字符串里的单词4.1 我的解题思路4.2 另一种思路 5 右旋转字符串5.15.2 &#x1f64b;‍♂️ 作者&#xff1a;海码007&#x1f4dc; 专栏&#xff1a;算法专栏&#x1f4a5; 标题…

什么是序列化?为什么需要序列化?

1、典型回答 序列化(Serialization)序列化是将对象转换为可存储或传输的形式的过程(例如: 将对象转换为字节流) 反序列化(Deserialization) 是将序列化后的数据(例如: 二进制文件)转换回原始对象的过程。通过反序列化&#xff0c;可以从存储介质 (如磁盘、数据库) 或通过网络…

程序员们的“薪饭碗”鸿蒙迎来“薪”的就业岗位

随着科技行业的不断创新和发展&#xff0c;程序员们所面对的技术挑战和机遇也在不断增加。近年来&#xff0c;鸿蒙操作系统的崛起为程序员们提供了新的就业机会和发展前景。 鸿蒙系统作为一种全新的操作系统&#xff0c;在市场上占据一席之地。在当前就业市场中&#xff0c;鸿…

网络套接字-UDP服务器

一 预备知识 1 端口号和进程id 主机间的数据传输本质是两个进程在通信&#xff0c;就像是我们打开抖音刷视频&#xff0c;视频不是都保存在手机上的&#xff0c;而是服务器发送给你的&#xff0c;这里就是用到了网络。 那如何保证把数据给指定进程呢? 就是用端口号去标识主机中…