目录
一、简介
二、加密处理
三、发布配置
3.1、插入或更新配置信息
3.2、发布配置数据变动事件
3.2.1、目标节点是当前节点
3.2.2、目标节点非当前节点
四、总结
一、简介
一般情况下,我们是通过Nacos提供的Web控制台登录,然后通过界面新增配置信息。后续客户端只要配置了对应的NameSpace,Group,DataId就可以在客户端获取到对应的配置信息。既然这样,Nacos服务端肯定会存储在Web控制台配置的配置信息。
Web控制台发布配置的入口肯定也是一个controller接口:com.alibaba.nacos.config.server.controller.ConfigController#publishConfig。
@PostMapping
@TpsControl(pointName = "ConfigPublish")
@Secured(action = ActionTypes.WRITE, signType = SignType.CONFIG)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema,
@RequestParam(required = false) String encryptedDataKey) throws NacosException {
String encryptedDataKeyFinal = null;
// 内容加密
if (StringUtils.isNotBlank(encryptedDataKey)) {
encryptedDataKeyFinal = encryptedDataKey;
} else {
// 使用到插件化的思想进行加密
Pair<String, String> pair = EncryptionHandler.encryptHandler(dataId, content);
content = pair.getSecond();
encryptedDataKeyFinal = pair.getFirst();
}
// 参数检查
ParamUtils.checkTenant(tenant);
ParamUtils.checkParam(dataId, group, "datumId", content);
ParamUtils.checkParam(tag);
// 构造配置信息,包括namespaceId、groupId、dataId、配置内容、描述信息等
ConfigForm configForm = new ConfigForm();
configForm.setDataId(dataId);
configForm.setGroup(group);
configForm.setNamespaceId(tenant);
configForm.setContent(content);
configForm.setTag(tag);
configForm.setAppName(appName);
configForm.setSrcUser(srcUser);
configForm.setConfigTags(configTags);
configForm.setDesc(desc);
configForm.setUse(use);
configForm.setEffect(effect);
configForm.setType(type);
configForm.setSchema(schema);
if (StringUtils.isBlank(srcUser)) {
configForm.setSrcUser(RequestUtil.getSrcUserName(request));
}
if (!ConfigType.isValidType(type)) {
configForm.setType(ConfigType.getDefaultType().getType());
}
// 构造请求对象
ConfigRequestInfo configRequestInfo = new ConfigRequestInfo();
configRequestInfo.setSrcIp(RequestUtil.getRemoteIp(request));
configRequestInfo.setRequestIpApp(RequestUtil.getAppName(request));
configRequestInfo.setBetaIps(request.getHeader("betaIps"));
// 发布配置
return configOperationService.publishConfig(configForm, configRequestInfo, encryptedDataKeyFinal);
}
上述的代码主要完成了五件事情:
- 1、加密处理
- 2、参数检查
- 3、构造配置信息
- 4、构造请求对象
- 5、发布配置
下面我们分析一些重要代码。
二、加密处理
加密处理使用了插件化思想。我们分析下插件化的思想,看看是如何使用插件或者扩展来进行加解密的。
public static Pair<String, String> encryptHandler(String dataId, String content) {
// 检查是否需要加密
if (!checkCipher(dataId)) {
return Pair.with("", content);
}
Optional<String> algorithmName = parseAlgorithmName(dataId);
// 获取加密的处理类
// EncryptionPluginManager.instance(): 返回单例实例
Optional<EncryptionPluginService> optional = algorithmName
.flatMap(EncryptionPluginManager.instance()::findEncryptionService);
if (!optional.isPresent()) {
LOGGER.warn("[EncryptionHandler] [encryptHandler] No encryption program with the corresponding name found");
// 获取不到,还是走非加密型
return Pair.with("", content);
}
EncryptionPluginService encryptionPluginService = optional.get();
// 根据扩展的插件类,获取密钥
String secretKey = encryptionPluginService.generateSecretKey();
// 利用密钥加密
String encryptContent = encryptionPluginService.encrypt(secretKey, content);
return Pair.with(encryptionPluginService.encryptSecretKey(secretKey), encryptContent);
}
首先判断是否需要处理加密,如果需要的话,去插件里面获取对应的处理类,如果获取不到则打日志,然后使用非加密方式进行处理;获取到加密插件,利用插件获取秘钥,然后再加密。
我们来分析下如何获取加密处理类的:
Optional<EncryptionPluginService> optional = algorithmName
.flatMap(EncryptionPluginManager.instance()::findEncryptionService)
这个EncryptionPluginManager.instance()执行返回的是一个单例对象,看看它的构造方法:
private EncryptionPluginManager() {
// 初始化: 根据自己写的扩展机制,获取EncryptionPluginService,然后再进行反射初始化。
loadInitial();
}
private void loadInitial() {
// 通过NacosServiceLoader扩展机制,获取EncryptionPluginService加密处理类的集合
Collection<EncryptionPluginService> encryptionPluginServices = NacosServiceLoader.load(
EncryptionPluginService.class);
for (EncryptionPluginService encryptionPluginService : encryptionPluginServices) {
if (StringUtils.isBlank(encryptionPluginService.algorithmName())) {
LOGGER.warn("[EncryptionPluginManager] Load EncryptionPluginService({}) algorithmName(null/empty) fail."
+ " Please Add algorithmName to resolve.", encryptionPluginService.getClass());
continue;
}
// 放入集合
ENCRYPTION_SPI_MAP.put(encryptionPluginService.algorithmName(), encryptionPluginService);
LOGGER.info("[EncryptionPluginManager] Load EncryptionPluginService({}) algorithmName({}) successfully.",
encryptionPluginService.getClass(), encryptionPluginService.algorithmName());
}
}
因为是单例,所以获取单例的时候通过loadInitial()进行初始化,初始化的时候会根据自己写的扩展机制,获取EncryptionPluginService加密处理类集合,然后再进行反射初始化,并缓存起来。
重要的还是这种插件化的思想,它仅仅依赖于原生JDK的SPI机制,可以按需扩展和定制:
- 1、提供给插件化的接口,由第三方去实现(自定义功能);
- 2、在初始化的时候,Nacos去加载处理类;
三、发布配置
发布配置调用的是ConfigOperationService#publishConfig方法:
public Boolean publishConfig(ConfigForm configForm, ConfigRequestInfo configRequestInfo, String encryptedDataKey)
throws NacosException {
// 将配置高级信息转成Map键值对
Map<String, Object> configAdvanceInfo = getConfigAdvanceInfo(configForm);
// 检查参数
ParamUtils.checkParam(configAdvanceInfo);
if (AggrWhitelist.isAggrDataId(configForm.getDataId())) {
LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", configRequestInfo.getSrcIp(),
configForm.getDataId(), configForm.getGroup());
throw new NacosApiException(HttpStatus.FORBIDDEN.value(), ErrorCode.INVALID_DATA_ID,
"dataId:" + configForm.getDataId() + " is aggr");
}
// 构建ConfigInfo配置信息,发布配置最基本的五个参数: nameSpaceId、groupId、dataId、应用名称、配置内容
ConfigInfo configInfo = new ConfigInfo(configForm.getDataId(), configForm.getGroup(),
configForm.getNamespaceId(), configForm.getAppName(), configForm.getContent());
configInfo.setType(configForm.getType());
configInfo.setEncryptedDataKey(encryptedDataKey);
ConfigOperateResult configOperateResult = null;
String persistEvent = ConfigTraceService.PERSISTENCE_EVENT;
// 判断是否是beta测试版本
if (StringUtils.isBlank(configRequestInfo.getBetaIps())) {
// 正常发布,大部分情况下,我们都没有指定tag
if (StringUtils.isBlank(configForm.getTag())) {
// 1、插入 or 更新配置信息
// 这里分为内置数据库(EmbeddedConfigInfoPersistServiceImpl)和外置数据库(ExternalConfigInfoPersistServiceImpl)操作,通常我们都是使用MySQL进行持久化存储
configOperateResult = configInfoPersistService.insertOrUpdate(configRequestInfo.getSrcIp(),
configForm.getSrcUser(), configInfo, configAdvanceInfo);
// 2、发布配置数据变动事件
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),
configForm.getNamespaceId(), configOperateResult.getLastModified()));
} else {
// 指定tag
persistEvent = ConfigTraceService.PERSISTENCE_EVENT_TAG + "-" + configForm.getTag();
configOperateResult = configInfoTagPersistService.insertOrUpdateTag(configInfo, configForm.getTag(),
configRequestInfo.getSrcIp(), configForm.getSrcUser());
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),
configForm.getNamespaceId(), configForm.getTag(),
configOperateResult.getLastModified()));
}
} else {
persistEvent = ConfigTraceService.PERSISTENCE_EVENT_BETA;
// beta publish
configOperateResult = configInfoBetaPersistService.insertOrUpdateBeta(configInfo,
configRequestInfo.getBetaIps(), configRequestInfo.getSrcIp(), configForm.getSrcUser());
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(true, configForm.getDataId(), configForm.getGroup(),
configForm.getNamespaceId(), configOperateResult.getLastModified()));
}
// 日志跟踪
ConfigTraceService.logPersistenceEvent(configForm.getDataId(), configForm.getGroup(),
configForm.getNamespaceId(), configRequestInfo.getRequestIpApp(), configOperateResult.getLastModified(),
InetUtils.getSelfIP(), persistEvent, ConfigTraceService.PERSISTENCE_TYPE_PUB, configForm.getContent());
return true;
}
首先组装好一些参数,我们需要重点关注的是构建ConfigInfo配置信息,发布配置最基本的五个参数: nameSpaceId、groupId、dataId、应用名称、配置内容。然后包含一些测试版本和tag的分支逻辑判断,我们关注最常用的正常发布流程。
通常情况下,我们发布配置,都不指定tag,其实就做了两件事:
- 1、插入或更新配置信息
- 2、发布配置数据变动事件
3.1、插入或更新配置信息
插入或更新配置信息,其实就是操作数据库,数据库操作分为了内置数据库和外置数据库,我们通常使用外置数据库MySQL来存储配置信息,也就是ExternalConfigInfoPersistServiceImpl,内置数据库对应的操作类是EmbeddedConfigInfoPersistServiceImpl。
我们这里主要分析外置数据库MySQL的方式:ExternalConfigInfoPersistServiceImpl#insertOrUpdate
public ConfigOperateResult insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo,
Map<String, Object> configAdvanceInfo) {
// 没有直接判断是新增还是更新,而且依赖数据库唯一性做检查,重复了(报主键冲突,说明已存在)就做更新。
try {
// 添加配置信息
return addConfigInfo(srcIp, srcUser, configInfo, configAdvanceInfo);
} catch (DuplicateKeyException ive) { // Unique constraint conflict
// 如果报唯一约束冲突,则更新配置内容
return updateConfigInfo(configInfo, srcIp, srcUser, configAdvanceInfo);
}
}
从源码可以看到,这里没有直接判断是新增还是更新配置,而且依赖数据库唯一性做检查,重复了(报主键冲突,说明已存在)就做更新。
我们先看下新增配置addConfigInfo:
public ConfigOperateResult addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
final Map<String, Object> configAdvanceInfo) {
return tjt.execute(status -> {
try {
// jdbcTemplate操作,自动插入到数据库表(config_info)中,返回主键id
long configId = addConfigInfoAtomic(-1, srcIp, srcUser, configInfo, configAdvanceInfo);
String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
// 新增tag管理
addConfigTagsRelation(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),
configInfo.getTenant());
Timestamp now = new Timestamp(System.currentTimeMillis());
// 插入历史数据到表中(his_config_info)
historyConfigInfoPersistService.insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, now, "I");
ConfigInfoStateWrapper configInfoCurrent = this.findConfigInfoState(configInfo.getDataId(),
configInfo.getGroup(), configInfo.getTenant());
if (configInfoCurrent == null) {
return new ConfigOperateResult(false);
}
return new ConfigOperateResult(configInfoCurrent.getId(), configInfoCurrent.getLastModified());
} catch (CannotGetJdbcConnectionException e) {
LogUtil.FATAL_LOG.error("[db-error] " + e, e);
throw e;
}
});
}
插入数据库的操作是在addConfigInfoAtomic()方法:
public long addConfigInfoAtomic(final long configId, final String srcIp, final String srcUser,
final ConfigInfo configInfo, Map<String, Object> configAdvanceInfo) {
// 取出配置信息
final String appNameTmp =
StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
final String tenantTmp =
StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
final String desc = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("desc");
final String use = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("use");
final String effect = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("effect");
final String type = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("type");
final String schema = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("schema");
final String encryptedDataKey =
configInfo.getEncryptedDataKey() == null ? StringUtils.EMPTY : configInfo.getEncryptedDataKey();
// 将配置内容进行MD5加密
final String md5Tmp = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);
KeyHolder keyHolder = new GeneratedKeyHolder();
// 根据数据库表获取对应的mapper, 通过插件化的形式, 灵活应对使用不同数据库的场景
ConfigInfoMapper configInfoMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
TableConstant.CONFIG_INFO);
// 将参数转换成对应数据库类型的sql语句,拼接insert into config_info values(....)插入语句
final String sql = configInfoMapper.insert(
Arrays.asList("data_id", "group_id", "tenant_id", "app_name", "content", "md5", "src_ip", "src_user",
"gmt_create", "gmt_modified", "c_desc", "c_use", "effect", "type", "c_schema",
"encrypted_data_key"));
// 获取主键名称,默认值为id
String[] returnGeneratedKeys = configInfoMapper.getPrimaryKeyGeneratedKeys();
try {
jt.update(new PreparedStatementCreator() {
@Override
public PreparedStatement createPreparedStatement(Connection connection) throws SQLException {
Timestamp now = new Timestamp(System.currentTimeMillis());
// 通过预编译的PreparedStatement,设置每个字段的值
PreparedStatement ps = connection.prepareStatement(sql, returnGeneratedKeys);
ps.setString(1, configInfo.getDataId());
ps.setString(2, configInfo.getGroup());
ps.setString(3, tenantTmp);
ps.setString(4, appNameTmp);
ps.setString(5, configInfo.getContent());
ps.setString(6, md5Tmp);
ps.setString(7, srcIp);
ps.setString(8, srcUser);
ps.setTimestamp(9, now);
ps.setTimestamp(10, now);
ps.setString(11, desc);
ps.setString(12, use);
ps.setString(13, effect);
ps.setString(14, type);
ps.setString(15, schema);
ps.setString(16, encryptedDataKey);
return ps;
}
}, keyHolder);
Number nu = keyHolder.getKey();
if (nu == null) {
throw new IllegalArgumentException("insert config_info fail");
}
return nu.longValue();
} catch (CannotGetJdbcConnectionException e) {
LogUtil.FATAL_LOG.error("[db-error] " + e, e);
throw e;
}
}
首先取出配置信息,对配置的内容进行MD5加密,然后根据数据库表获取对应的mapper,这里还是通过插件化的形式,灵活应对使用不同数据库的场景。
获取到mapper之后,将参数转换成对应数据库类型的sql语句,拼接insert into config_info values(....)插入语句,最后通过JdbcTemplate执行sql,完成配置的插入。
我们再来分析下如何利用插件化思想完成对mapper的获取的:
ConfigInfoMapper configInfoMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
TableConstant.CONFIG_INFO);
public <R extends Mapper> R findMapper(String dataSource, String tableName) {
LOGGER.info("[MapperManager] findMapper dataSource: {}, tableName: {}", dataSource, tableName);
if (StringUtils.isBlank(dataSource) || StringUtils.isBlank(tableName)) {
throw new NacosRuntimeException(FIND_DATASOURCE_ERROR_CODE, "dataSource or tableName is null");
}
// 从SPI缓存中获取,这个是在MapperManager构造方法中初始化的
Map<String, Mapper> tableMapper = MAPPER_SPI_MAP.get(dataSource);
if (Objects.isNull(tableMapper)) {
throw new NacosRuntimeException(FIND_DATASOURCE_ERROR_CODE,
"[MapperManager] Failed to find the datasource,dataSource:" + dataSource);
}
// 根据表名称获取mapper
Mapper mapper = tableMapper.get(tableName);
if (Objects.isNull(mapper)) {
throw new NacosRuntimeException(FIND_TABLE_ERROR_CODE,
"[MapperManager] Failed to find the table ,tableName:" + tableName);
}
if (dataSourceLogEnable) {
return (R) MapperProxy.createSingleProxy(mapper);
}
return (R) mapper;
}
首先从MAPPER_SPI_MAP缓存中获取,这个是在MapperManager构造方法中初始化的。然后根据表名称获取到对应的mapper。
这个MAPPER_SPI_MAP初始化也和之前EncryptionPluginService的一样,在单例的构造方法中加载:
private MapperManager() {
loadInitial();
}
public void loadInitial() {
Collection<Mapper> mappers = NacosServiceLoader.load(Mapper.class);
for (Mapper mapper : mappers) {
Map<String, Mapper> mapperMap = MAPPER_SPI_MAP.computeIfAbsent(mapper.getDataSource(), (r) -> new HashMap<>(16));
mapperMap.put(mapper.getTableName(), mapper);
LOGGER.info("[MapperManager] Load Mapper({}) datasource({}) tableName({}) successfully.",
mapper.getClass(), mapper.getDataSource(), mapper.getTableName());
}
}
我们也可以看到Nacos源码加载的Mapper插件:
获取到插件配置的具体mapper实现类后,在调用mapper.Mapper#insert()方法时,就可以根据插件的扩展,通过不同的实现类去处理了,就能解决不同数据库类型中sql存在差异的问题。
更新配置的大体流程跟新增一样,首先查出旧的配置信息,然后做一些判断,最后根据dataType和表名称获取对应的mapper,然后组装好sql,通过JdbcTemplate执行。
3.2、发布配置数据变动事件
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),
configForm.getNamespaceId(), configOperateResult.getLastModified()));
public static void notifyConfigChange(ConfigDataChangeEvent event) {
// 如果是内部存储并且Nacos非单机模式启动,就不处理了
if (DatasourceConfiguration.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {
return;
}
NotifyCenter.publishEvent(event);
}
可以看到,还是利用了Nacos的事件统一发布中心NotifyCenter类,我们直接查找ConfigDataChangeEvent的onEvent方法来查看处理逻辑。
public AsyncNotifyService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
// 注册ConfigDataChangeEvent到NotifyCenter.
NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
// 注册一个订阅ConfigDataChangeEvent事件的处理类
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
// Generate ConfigDataChangeEvent concurrently
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
MetricsMonitor.incrementConfigChangeCount(tenant, group, dataId);
// 获取所有的Nacos服务节点(包括当前客户端)
Collection<Member> ipList = memberManager.allMembers();
// 创建一个队列,将相关配置的其他服务节点都存放进来
Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>();
for (Member member : ipList) {
// grpc report data change only
rpcQueue.add(
new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, evt.isBatch,
member));
}
if (!rpcQueue.isEmpty()) {
// 通过线程池执行异步通知
// AsyncRpcTask实现了runnable接口,关注其run方法
ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
}
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
}
在AsyncNotifyService构造方法中,将ConfigDataChangeEvent事件注册到NotifyCenter通知中心,然后还注册一个订阅ConfigDataChangeEvent事件的处理类。
AsyncNotifyService使用spring进行托管,在IOC容器启动的时候,就会创建这个bean对象,就会执行AsyncNotifyService构造方法。我们重点关注onEvent()具体的事件处理逻辑:
- 1、获取所有的Nacos服务节点(包括当前客户端)
- 2、创建一个队列,将相关配置的其他服务节点都存放进来
- 3、通过线程池执行异步通知
// 获取所有的Nacos服务节点(包括当前客户端)
Collection<Member> ipList = memberManager.allMembers();
// 创建一个队列,将相关配置的其他服务节点都存放进来
Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>();
for (Member member : ipList) {
// grpc report data change only
rpcQueue.add(
new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, evt.isBatch,
member));
}
if (!rpcQueue.isEmpty()) {
// 通过线程池执行异步通知
// AsyncRpcTask实现了runnable接口,关注其run方法
ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
}
获取到服务列表后,通过线程池调用异步任务AsyncRpcTask,AsyncRpcTask实现了Runnable接口,看看run()的逻辑:
class AsyncRpcTask implements Runnable {
private Queue<NotifySingleRpcTask> queue;
public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) {
// 构造方法放入rpcTask的队列
this.queue = queue;
}
@Override
public void run() {
while (!queue.isEmpty()) {
// 从队列中取出任务
NotifySingleRpcTask task = queue.poll();
// 构造配置变动集群同步请求
ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
syncRequest.setDataId(task.getDataId());
syncRequest.setGroup(task.getGroup());
syncRequest.setBeta(task.isBeta);
syncRequest.setLastModified(task.getLastModified());
syncRequest.setTag(task.tag);
syncRequest.setBatch(task.isBatch);
syncRequest.setTenant(task.getTenant());
// 通知的目标节点
Member member = task.member;
// 如果是当前节点,直接调用dumpService执行dump操作
if (memberManager.getSelf().equals(member)) {
if (syncRequest.isBeta()) {
dumpService.dumpBeta(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getLastModified(), NetUtils.localIP());
} else if (syncRequest.isBatch()) {
dumpService.dumpBatch(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getLastModified(), NetUtils.localIP());
} else if (StringUtils.isNotBlank(syncRequest.getTag())) {
dumpService.dumpTag(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
} else {
dumpService.dumpFormal(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getLastModified(), NetUtils.localIP());
}
continue;
}
String event = getNotifyEvent(task);
if (memberManager.hasMember(member.getAddress())) {
// 启动健康检查,有IP未被监控,直接放入通知队列,否则通知
boolean unHealthNeedDelay = isUnHealthy(member.getAddress());
if (unHealthNeedDelay) {
// 目标 IP 运行状况不健康,然后将其放入通知列表中
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(), InetUtils.getSelfIP(), event,
ConfigTraceService.NOTIFY_TYPE_UNHEALTH, 0, member.getAddress());
// 异步任务执行
// 可延迟的处理,因为是不健康的节点,不知道什么时候能恢复
asyncTaskExecute(task);
} else {
// 发送grpc请求
try {
configClusterRpcClientProxy.syncConfigChange(member, syncRequest,
new AsyncRpcNotifyCallBack(task));
} catch (Exception e) {
MetricsMonitor.getConfigNotifyException().increment();
asyncTaskExecute(task);
}
}
} else {
//No nothing if member has offline.
}
}
}
}
只要队列不为空,就会从队列中取出NotifySingleRpcTask任务来执行,然后构造配置变动集群同步的请求对象,包括namespaceId、dataId、groupId、标签等,然后通知目标节点。
3.2.1、目标节点是当前节点
如果目标节点是当前节点,则会直接调用dumpService执行dump操作,其实就是更新本地内存和磁盘中的配置信息为最新的配置信息。
public void dumpFormal(String dataId, String group, String tenant, long lastModified, String handleIp) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = dataId + group + tenant;
// 将DumpTask添加到TaskManager任务管理器,它将异步执行
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, false, false, false, null, lastModified, handleIp));
DUMP_LOG.info("[dump] add formal task. groupKey={}", groupKey);
}
上面是将task放入到了TaskManager中,那在哪里执行的呢?我们看下DumpService的构造方法:
public DumpService(ConfigInfoPersistService configInfoPersistService,
NamespacePersistService namespacePersistService,
HistoryConfigInfoPersistService historyConfigInfoPersistService,
ConfigInfoAggrPersistService configInfoAggrPersistService,
ConfigInfoBetaPersistService configInfoBetaPersistService,
ConfigInfoTagPersistService configInfoTagPersistService, ServerMemberManager memberManager) {
this.configInfoPersistService = configInfoPersistService;
this.namespacePersistService = namespacePersistService;
this.historyConfigInfoPersistService = historyConfigInfoPersistService;
this.configInfoAggrPersistService = configInfoAggrPersistService;
this.configInfoBetaPersistService = configInfoBetaPersistService;
this.configInfoTagPersistService = configInfoTagPersistService;
this.memberManager = memberManager;
this.processor = new DumpProcessor(this);
this.dumpAllProcessor = new DumpAllProcessor(this);
this.dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
this.dumpAllTagProcessor = new DumpAllTagProcessor(this);
// 创建一个TaskManager
this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
// 设置默认的Processor处理(DumpProcessor)
this.dumpTaskMgr.setDefaultTaskProcessor(processor);
this.dumpAllTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpAllTaskManager");
this.dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
this.dumpAllTaskMgr.addProcessor(DumpAllTask.TASK_ID, dumpAllProcessor);
this.dumpAllTaskMgr.addProcessor(DumpAllBetaTask.TASK_ID, dumpAllBetaProcessor);
this.dumpAllTaskMgr.addProcessor(DumpAllTagTask.TASK_ID, dumpAllTagProcessor);
DynamicDataSource.getInstance().getDataSource();
}
可以看到,创建了一个任务管理器TaskManager,并设置了默认的处理类DumpProcessor。
我们再看下TaskManager的构造方法:
public TaskManager(String name) {
super(name, LOGGER, 100L);
this.name = name;
}
TaskManager继承自NacosDelayTaskExecuteEngine延时任务执行引擎,所以实际上执行的是:
/**
* 定时任务线程池,在构造方法中初始化
*/
private final ScheduledExecutorService processingExecutor;
/**
* 任务队列
* key:对应的服务
*/
protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(logger);
// 初始化任务队列
tasks = new ConcurrentHashMap<>(initCapacity);
// 创建定时任务的线程池
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
// 在指定的初始延迟时间(100毫秒)后开始执行任务,并按固定的时间间隔周期性(100毫秒)地执行任务。
// 默认延时100毫秒执行ProcessRunnable,然后每隔100毫秒周期性执行ProcessRunnable
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
熟悉nacos服务注册流程的小伙伴对这一块应该不陌生,服务注册也是大量使用到任务引擎。从上面的代码中,我们可以看到,NacosDelayTaskExecuteEngine内部包含一个阻塞队列,用来存放任务的,然后初始化了一个定时执行的线程池,每隔100毫秒周期性执行ProcessRunnable。ProcessRunnable的run方法中就是从阻塞队列中不单取出任务来执行,查看是否有对应的处理类,如果没有就用默认的处理类。
在本例中,实际上就是用的默认的处理类DumpProcessor。 我们查看DumpProcessor#process具体的处理方法:
public boolean process(NacosTask task) {
DumpTask dumpTask = (DumpTask) task;
String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
String dataId = pair[0];
String group = pair[1];
String tenant = pair[2];
long lastModified = dumpTask.getLastModified();
String handleIp = dumpTask.getHandleIp();
boolean isBeta = dumpTask.isBeta();
String tag = dumpTask.getTag();
// 构建ConfigDumpEventBuild
ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
.group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
if (isBeta) {
// 如果发布测试版,则转储配置,更新测试版缓存
ConfigInfo4Beta cf = configInfoBetaPersistService.findConfigInfo4Beta(dataId, group, tenant);
build.remove(Objects.isNull(cf));
build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
build.content(Objects.isNull(cf) ? null : cf.getContent());
build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey());
return DumpConfigHandler.configDump(build.build());
}
if (StringUtils.isBlank(tag)) {
// tag为空的情况,正常情况下都是走的这个分支
// 查看配置信息
ConfigInfo cf = configInfoPersistService.findConfigInfo(dataId, group, tenant);
build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
build.type(Objects.isNull(cf) ? null : cf.getType());
build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey());
} else {
ConfigInfo4Tag cf = configInfoTagPersistService.findConfigInfo4Tag(dataId, group, tenant, tag);
build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
}
// 构建出ConfigDumpEvent,然后触发dump配置
return DumpConfigHandler.configDump(build.build());
}
上面的逻辑主要是构建出ConfigDumpEvent,然后触发dump配置,通过DumpConfigHandler处理。
public static boolean configDump(ConfigDumpEvent event) {
final String dataId = event.getDataId();
final String group = event.getGroup();
final String namespaceId = event.getNamespaceId();
final String content = event.getContent();
final String type = event.getType();
final long lastModified = event.getLastModifiedTs();
//beta测试版
if (event.isBeta()) {
boolean result = false;
if (event.isRemove()) {
result = ConfigCacheService.removeBeta(dataId, group, namespaceId);
if (result) {
ConfigTraceService.logDumpBetaEvent(dataId, group, namespaceId, null, lastModified,
event.getHandleIp(), ConfigTraceService.DUMP_TYPE_REMOVE_OK,
System.currentTimeMillis() - lastModified, 0);
}
return result;
} else {
result = ConfigCacheService.dumpBeta(dataId, group, namespaceId, content, lastModified,
event.getBetaIps(), event.getEncryptedDataKey());
if (result) {
ConfigTraceService.logDumpBetaEvent(dataId, group, namespaceId, null, lastModified,
event.getHandleIp(), ConfigTraceService.DUMP_TYPE_OK,
System.currentTimeMillis() - lastModified, content.length());
}
}
return result;
}
//tag不为空的处理
if (StringUtils.isNotBlank(event.getTag())) {
//
boolean result;
if (!event.isRemove()) {
// 非删除配置事件
result = ConfigCacheService.dumpTag(dataId, group, namespaceId, event.getTag(), content, lastModified,
event.getEncryptedDataKey());
if (result) {
ConfigTraceService.logDumpTagEvent(dataId, group, namespaceId, event.getTag(), null, lastModified,
event.getHandleIp(), ConfigTraceService.DUMP_TYPE_OK,
System.currentTimeMillis() - lastModified, content.length());
}
} else {
// 删除配置事件,移除配置缓存
result = ConfigCacheService.removeTag(dataId, group, namespaceId, event.getTag());
if (result) {
ConfigTraceService.logDumpTagEvent(dataId, group, namespaceId, event.getTag(), null, lastModified,
event.getHandleIp(), ConfigTraceService.DUMP_TYPE_REMOVE_OK,
System.currentTimeMillis() - lastModified, 0);
}
}
return result;
}
// 内置的一些特殊配置
if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(content);
}
if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
ClientIpWhiteList.load(content);
}
if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
SwitchService.load(content);
}
boolean result;
if (!event.isRemove()) {
// 非删除事件:配置缓存服务dump配置信息
result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, event.getType(),
event.getEncryptedDataKey());
if (result) {
// 记录日志
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
ConfigTraceService.DUMP_TYPE_OK, System.currentTimeMillis() - lastModified, content.length());
}
} else {
// 删除配置事件,移除配置缓存
result = ConfigCacheService.remove(dataId, group, namespaceId);
if (result) {
// 记录日志
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
ConfigTraceService.DUMP_TYPE_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
}
}
return result;
}
因为ConfigDumpEvent分为了两类事件,一类是新增或更新的事件,另一类是删除的事件,对于这两种事件是不同的两种处理方式。
首先看下删除的逻辑:
public static boolean remove(String dataId, String group, String tenant) {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);
// 获取写锁
final int lockResult = tryWriteLock(groupKey);
// 如果数据不存在了
if (0 == lockResult) {
DUMP_LOG.info("[remove-ok] {} not exist.", groupKey);
return true;
}
// 获取写锁失败了
if (lockResult < 0) {
DUMP_LOG.warn("[remove-error] write lock failed. {}", groupKey);
return false;
}
try {
// 移除配置
if (!PropertyUtil.isDirectRead()) {
DUMP_LOG.info("[dump] remove local disk cache,groupKey={} ", groupKey);
ConfigDiskServiceFactory.getInstance().removeConfigInfo(dataId, group, tenant);
}
// 移除配置缓存
CACHE.remove(groupKey);
DUMP_LOG.info("[dump] remove local jvm cache,groupKey={} ", groupKey);
// 发布本地配置变动通知
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
return true;
} finally {
// 释放写锁
releaseWriteLock(groupKey);
}
}
主要做了三件事情:
- 1、获取写锁
- 2、移除配置信息、移除配置缓存
- 3、发布本地配置变动通知
再看下新增,修改的逻辑:
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
String type, String encryptedDataKey) {
return dumpWithMd5(dataId, group, tenant, content, null, lastModifiedTs, type, encryptedDataKey);
}
public static boolean dumpWithMd5(String dataId, String group, String tenant, String content, String md5,
long lastModifiedTs, String type, String encryptedDataKey) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
CacheItem ci = makeSure(groupKey, encryptedDataKey);
ci.setType(type);
// 获取写锁
final int lockResult = tryWriteLock(groupKey);
assert (lockResult != 0);
// 获取锁失败
if (lockResult < 0) {
DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
return false;
}
try {
// 校验最后更新时间,如果这个事件滞后了则不处理了
boolean lastModifiedOutDated = lastModifiedTs < ConfigCacheService.getLastModifiedTs(groupKey);
// 小于缓存中的最后更新时间,说明滞后了,不处理
if (lastModifiedOutDated) {
DUMP_LOG.warn("[dump-ignore] timestamp is outdated,groupKey={}", groupKey);
return true;
}
boolean newLastModified = lastModifiedTs > ConfigCacheService.getLastModifiedTs(groupKey);
// 计算配置信息的md5值
if (md5 == null) {
md5 = MD5Utils.md5Hex(content, ENCODE);
}
//check md5 & update local disk cache.
String localContentMd5 = ConfigCacheService.getContentMd5(groupKey);
boolean md5Changed = !md5.equals(localContentMd5);
// 如果配置内容发生变更,需要保存到磁盘
if (md5Changed) {
if (!PropertyUtil.isDirectRead()) {
DUMP_LOG.info("[dump] md5 changed, save to disk cache ,groupKey={}, newMd5={},oldMd5={}", groupKey,
md5, localContentMd5);
ConfigDiskServiceFactory.getInstance().saveToDisk(dataId, group, tenant, content);
} else {
//ignore to save disk cache in direct model
}
} else {
DUMP_LOG.warn("[dump-ignore] ignore to save to disk cache. md5 consistent,groupKey={}, md5={}",
groupKey, md5);
}
//check md5 and timestamp & update local jvm cache.
if (md5Changed) {
DUMP_LOG.info(
"[dump] md5 changed, update md5 and timestamp in jvm cache ,groupKey={}, newMd5={},oldMd5={},lastModifiedTs={}",
groupKey, md5, localContentMd5, lastModifiedTs);
// 如果配置内容发生变更,需要更新MD5值,更新本地内存中的配置信息,并发布本地配置变更事件
updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey);
} else if (newLastModified) {
DUMP_LOG.info(
"[dump] md5 consistent ,timestamp changed, update timestamp only in jvm cache ,groupKey={},lastModifiedTs={}",
groupKey, lastModifiedTs);
// 设置缓存中配置最后变更时间
updateTimeStamp(groupKey, lastModifiedTs, encryptedDataKey);
} else {
DUMP_LOG.warn(
"[dump-ignore] ignore to save to jvm cache. md5 consistent and no new timestamp changed.groupKey={}",
groupKey);
}
return true;
} catch (IOException ioe) {
DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe);
if (ioe.getMessage() != null) {
String errMsg = ioe.getMessage();
if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN)
|| errMsg.contains(DISK_QUATA_EN)) {
// Protect from disk full.
FATAL_LOG.error("Local Disk Full,Exit", ioe);
System.exit(0);
}
}
return false;
} finally {
// 释放写锁
releaseWriteLock(groupKey);
}
}
public static void updateMd5(String groupKey, String md5Utf8, long lastModifiedTs, String encryptedDataKey) {
CacheItem cache = makeSure(groupKey, encryptedDataKey);
if (cache.getConfigCache().getMd5Utf8() == null || !cache.getConfigCache().getMd5Utf8().equals(md5Utf8)) {
cache.getConfigCache().setMd5Utf8(md5Utf8);
cache.getConfigCache().setLastModifiedTs(lastModifiedTs);
cache.getConfigCache().setEncryptedDataKey(encryptedDataKey);
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
}
}
这里与删除逻辑不同的是,需要比较md5值,不一致可能会有个磁盘存储的处理。
3.2.2、目标节点非当前节点
如果目标节点是其它节点,还会区分是否健康实例,进行不同的处理。如果节点不健康,则会延迟处理同步的逻辑;如果节点健康,则会发送grpc请求,目标服务进行配置的同步。
在服务节点间的配置同步有两个主要的逻辑:
- 1、节点不健康的情况,采用异步定时任务去执行,但是这个定时并不是严格意义的定时,因为他会有个延迟的过程,会随着失败次数的增加,延迟不断加大,不过当达到最大失败次数后,就不会再增加,以一个固定的时间去触发。最大时间间隔是500ms + 7 * 7 * 1000ms。
private void asyncTaskExecute(NotifySingleRpcTask task) {
// 随着失败次数的增加,延迟不断加大,不过当达到最大失败次数后,就不会再增加,以一个固定的时间去触发。最大时间间隔是500ms + 7 * 7 * 1000ms
int delay = getDelayTime(task);
Queue<NotifySingleRpcTask> queue = new LinkedList<>();
queue.add(task);
AsyncRpcTask asyncTask = new AsyncRpcTask(queue);
ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
}
private static int getDelayTime(NotifyTask task) {
int failCount = task.getFailCount();
// 最大时间间隔是500ms + 7 * 7 * 1000ms
int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS;
if (failCount <= MAX_COUNT) {
task.setFailCount(failCount + 1);
}
return delay;
}
- 2、节点不健康的情况,发送grpc同步请求
configClusterRpcClientProxy.syncConfigChange(member, syncRequest,
new AsyncRpcNotifyCallBack(task));
public void syncConfigChange(Member member, ConfigChangeClusterSyncRequest request, RequestCallBack callBack)
throws NacosException {
// 异步处理
// grpc真正的处理类是在:com.alibaba.nacos.config.server.remote.ConfigChangeClusterSyncRequestHandler.handle
clusterRpcClientProxy.asyncRequest(member, request, callBack);
}
具体处理ConfigChangeClusterSyncRequest请求是在ConfigChangeClusterSyncRequestHandler#handle方法:
public ConfigChangeClusterSyncResponse handle(ConfigChangeClusterSyncRequest configChangeSyncRequest,
RequestMeta meta) throws NacosException {
// 调用到其他节点,其他节点也是执行dump服务,然后通知和本机连接的客户端,通知他们进行配置更新。
if (configChangeSyncRequest.isBeta()) {
dumpService.dumpBeta(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp());
} else if (configChangeSyncRequest.isBatch()) {
dumpService.dumpBatch(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp());
} else if (StringUtils.isNotBlank(configChangeSyncRequest.getTag())) {
dumpService.dumpTag(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
configChangeSyncRequest.getTenant(), configChangeSyncRequest.getTag(),
configChangeSyncRequest.getLastModified(), meta.getClientIp());
} else {
// 本机的dump服务
dumpService.dumpFormal(configChangeSyncRequest.getDataId(), configChangeSyncRequest.getGroup(),
configChangeSyncRequest.getTenant(), configChangeSyncRequest.getLastModified(), meta.getClientIp());
}
return new ConfigChangeClusterSyncResponse();
}
可以看到,调用到其他节点,其他节点也是执行dump服务,然后通知和本机连接的客户端,通知他们进行配置更新。