全网第一篇把Nacos配置中心服务端讲明白的

入口

getServerConfig对应:ConfigQueryRequestHandler�
image.png
getBatchServiceConfig对应:ConfigChangeBatchListenResponse�
image.png
admin对应:ConfigController
image.png

我们重点就要2个,一个是服务端如何完成客户端获取配置请求,一个是服务端更新配置,客户端如何更新, 也就是说ConfigQueryReustHandler和ConfigController

ConfigQueryRequestHandler

image.png

ConfigQueryRequestHandler.getContext

private ConfigQueryResponse getContext(ConfigQueryRequest configQueryRequest, RequestMeta meta, boolean notify)
            throws UnsupportedEncodingException {
        // todo
        String dataId = configQueryRequest.getDataId();
        String group = configQueryRequest.getGroup();
        String tenant = configQueryRequest.getTenant();
        String clientIp = meta.getClientIp();
        String tag = configQueryRequest.getTag();
        ConfigQueryResponse response = new ConfigQueryResponse();
        
        final String groupKey = GroupKey2
                .getKey(configQueryRequest.getDataId(), configQueryRequest.getGroup(), configQueryRequest.getTenant());
        
        String autoTag = configQueryRequest.getHeader(com.alibaba.nacos.api.common.Constants.VIPSERVER_TAG);
        
        String requestIpApp = meta.getLabels().get(CLIENT_APPNAME_HEADER);
        
        int lockResult = tryConfigReadLock(groupKey);
        
        boolean isBeta = false;
        boolean isSli = false;
        if (lockResult > 0) {
            //FileInputStream fis = null;
            try {
                String md5 = Constants.NULL;
                long lastModified = 0L;
                // todo 从缓存中获取
                CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);
                if (cacheItem != null) {
                    if (cacheItem.isBeta()) {
                        if (cacheItem.getIps4Beta().contains(clientIp)) {
                            isBeta = true;
                        }
                    }
                    String configType = cacheItem.getType();
                    response.setContentType((null != configType) ? configType : "text");
                }
                File file = null;
                ConfigInfoBase configInfoBase = null;
                PrintWriter out = null;
                if (isBeta) {
                    md5 = cacheItem.getMd54Beta();
                    lastModified = cacheItem.getLastModifiedTs4Beta();
                    if (PropertyUtil.isDirectRead()) {
                        configInfoBase = persistService.findConfigInfo4Beta(dataId, group, tenant);
                    } else {
                        file = DiskUtil.targetBetaFile(dataId, group, tenant);
                    }
                    response.setBeta(true);
                } else {
                    if (StringUtils.isBlank(tag)) {
                        if (isUseTag(cacheItem, autoTag)) {
                            if (cacheItem != null) {
                                if (cacheItem.tagMd5 != null) {
                                    md5 = cacheItem.tagMd5.get(autoTag);
                                }
                                if (cacheItem.tagLastModifiedTs != null) {
                                    lastModified = cacheItem.tagLastModifiedTs.get(autoTag);
                                }
                            }
                            if (PropertyUtil.isDirectRead()) {
                                configInfoBase = persistService.findConfigInfo4Tag(dataId, group, tenant, autoTag);
                            } else {
                                file = DiskUtil.targetTagFile(dataId, group, tenant, autoTag);
                            }
                            response.setTag(URLEncoder.encode(autoTag, Constants.ENCODE));
                            
                        } else {
                            md5 = cacheItem.getMd5();
                            lastModified = cacheItem.getLastModifiedTs();
                            // todo 是否是数据读
                            if (PropertyUtil.isDirectRead()) {
                                configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
                            } else {
                                file = DiskUtil.targetFile(dataId, group, tenant);
                            }
                            if (configInfoBase == null && fileNotExist(file)) {
                                // FIXME CacheItem
                                // No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1.
                                ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
                                        ConfigTraceService.PULL_EVENT_NOTFOUND, -1, clientIp, false);
                                
                                // pullLog.info("[client-get] clientIp={}, {},
                                // no data",
                                // new Object[]{clientIp, groupKey});
                                
                                response.setErrorInfo(ConfigQueryResponse.CONFIG_NOT_FOUND, "config data not exist");
                                return response;
                            }
                        }
                    } else {
                        if (cacheItem != null) {
                            if (cacheItem.tagMd5 != null) {
                                md5 = cacheItem.tagMd5.get(tag);
                            }
                            if (cacheItem.tagLastModifiedTs != null) {
                                Long lm = cacheItem.tagLastModifiedTs.get(tag);
                                if (lm != null) {
                                    lastModified = lm;
                                }
                            }
                        }
                        if (PropertyUtil.isDirectRead()) {
                            configInfoBase = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
                        } else {
                            file = DiskUtil.targetTagFile(dataId, group, tenant, tag);
                        }
                        if (configInfoBase == null && fileNotExist(file)) {
                            // FIXME CacheItem
                            // No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1.
                            ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,
                                    ConfigTraceService.PULL_EVENT_NOTFOUND, -1, clientIp, false);
                            
                            // pullLog.info("[client-get] clientIp={}, {},
                            // no data",
                            // new Object[]{clientIp, groupKey});
                            
                            response.setErrorInfo(ConfigQueryResponse.CONFIG_NOT_FOUND, "config data not exist");
                            return response;
                            
                        }
                    }
                }
                
                response.setMd5(md5);
                
                if (PropertyUtil.isDirectRead()) {
                    response.setLastModified(lastModified);
                    response.setContent(configInfoBase.getContent());
                    response.setResultCode(ResponseCode.SUCCESS.getCode());
                    
                } else {
                    //read from file
                    String content = null;
                    try {
                        content = readFileContent(file);
                        response.setContent(content);
                        response.setLastModified(lastModified);
                        response.setResultCode(ResponseCode.SUCCESS.getCode());
                    } catch (IOException e) {
                        response.setErrorInfo(ResponseCode.FAIL.getCode(), e.getMessage());
                        return response;
                    }
                    
                }
                
                LogUtil.PULL_CHECK_LOG.warn("{}|{}|{}|{}", groupKey, clientIp, md5, TimeUtils.getCurrentTimeStr());
                
                final long delayed = System.currentTimeMillis() - lastModified;
                
                // TODO distinguish pull-get && push-get
                /*
                 Otherwise, delayed cannot be used as the basis of push delay directly,
                 because the delayed value of active get requests is very large.
                 */
                ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,
                        ConfigTraceService.PULL_EVENT_OK, notify ? delayed : -1, clientIp, notify);
                
            } finally {
                releaseConfigReadLock(groupKey);
            }
        } else if (lockResult == 0) {
            
            // FIXME CacheItem No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1.
            ConfigTraceService
                    .logPullEvent(dataId, group, tenant, requestIpApp, -1, ConfigTraceService.PULL_EVENT_NOTFOUND, -1,
                            clientIp, notify);
            response.setErrorInfo(ConfigQueryResponse.CONFIG_NOT_FOUND, "config data not exist");
            
        } else {
            PULL_LOG.info("[client-get] clientIp={}, {}, get data during dump", clientIp, groupKey);
            response.setErrorInfo(ConfigQueryResponse.CONFIG_QUERY_CONFLICT,
                    "requested file is being modified, please try later.");
        }
        return response;
    }

这一段代码很长,感觉写的也不是很好,很冗长,总结一下:

  1. 通过groupKey从ConfigCacheService.getContentCache(groupKey)获取cacheItem, 这里的cacheItem要等到pushConfig在看一下,具体用来做什么,只是做一些数据的暂存吗?
  2. 然后看了一下是否从数据读,如果是就从数据库读出配置,如果不是,就从文件中读取


PropertyUtil.isDirectRead

image.png

persistService.findConfigInfo

image.png

ConfigController

ConfigController.pushConfig

  @PostMapping
    @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
            @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
            @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
            @RequestParam(value = "appName", required = false) String appName,
            @RequestParam(value = "src_user", required = false) String srcUser,
            @RequestParam(value = "config_tags", required = false) String configTags,
            @RequestParam(value = "desc", required = false) String desc,
            @RequestParam(value = "use", required = false) String use,
            @RequestParam(value = "effect", required = false) String effect,
            @RequestParam(value = "type", required = false) String type,
            @RequestParam(value = "schema", required = false) String schema) throws NacosException {

        // todo 2.2以上这里有个插件思想学一下,基本上也是spi的思想
        final String srcIp = RequestUtil.getRemoteIp(request);
        final String requestIpApp = RequestUtil.getAppName(request);
        srcUser = RequestUtil.getSrcUserName(request);
        //check type
        if (!ConfigType.isValidType(type)) {
            type = ConfigType.getDefaultType().getType();
        }
        // check tenant
        //todo  参数检查
        ParamUtils.checkTenant(tenant);
        ParamUtils.checkParam(dataId, group, "datumId", content);
        ParamUtils.checkParam(tag);
        Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
        MapUtil.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);
        MapUtil.putIfValNoNull(configAdvanceInfo, "desc", desc);
        MapUtil.putIfValNoNull(configAdvanceInfo, "use", use);
        MapUtil.putIfValNoNull(configAdvanceInfo, "effect", effect);
        MapUtil.putIfValNoNull(configAdvanceInfo, "type", type);
        MapUtil.putIfValNoNull(configAdvanceInfo, "schema", schema);
        ParamUtils.checkParam(configAdvanceInfo);
        
        if (AggrWhitelist.isAggrDataId(dataId)) {
            LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", RequestUtil.getRemoteIp(request),
                    dataId, group);
            throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
        }
        
        final Timestamp time = TimeUtils.getCurrentTime();
        String betaIps = request.getHeader("betaIps");
        // todo // 构造配置信息
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        configInfo.setType(type);
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                // todo 由AsyncNotifyService处理
                ConfigChangePublisher
                        .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                // todo 由AsyncNotifyService处理
                ConfigChangePublisher.notifyConfigChange(
                        new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else {
            // beta publish
            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        ConfigTraceService
                .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),
                        ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
        return true;
    }

一共做了这么几件事:

  1. persistsService.insertOrUpdate
  2. 发送ConfigDataChangeEvent,这个事件最终由AsyncNotifyService执行到

AsyncNotifyService.onEvent

image.png
这里做了这么几件事:

  1. memberManager.allMembers():获取到所有的服务节点,我这里因为是单机启动的,所以就一台

image.png

  1. 看对应的服务是否支持长链接,如果支持就放到rpcQueue中,如果不支持,就放到httpQueue,基本到这里就知道后面要做什么,肯定又是什么线程要消费这里的queue
  2. 将rpcQueue放到AsyncRpcTask里面去,然后用一个异步线程池调用它

其中ConfigExecutor:
image.png
image.png
不往下追了,就是一个简单的定时调度线程池,反正最终也是调度到AsyncRpcTask的run方法,这个更重要

其中AsyncRpcTask:
image.png
每一个AsyncRpcTask里面有一个队列,这个队列里面存放的是NotifySingleRpcTask:就是rpc通知任务

AsyncRpcTask.run

image.png
image.png
这里主要做了几件事:

  1. 从queue中获取通知任务,如果任务目标是当前节点,就直接调用dumpService.dump
  2. 如果是其他服务节点的话,判断是否支持长链接,如果支持就发送rpc请求,如果不支持就发送http请求

DumpService.dump

image.png
比较简单,将task放到TaskManager中,然后TaskManager异步执行

TaskManager

首先DumpService里面有一个TaskManager,这个类是继承NacosDelayTaskExecuteEngine�,这个类之前是详细分析过的,就是延迟执行的,核心逻辑就是有一个线程不断的从一个map中获取任务,
image.png
如果获取到任务,就看这个taskKey有没有对应的处理器,如果没有设置,那就找一个默认的,
image.png
刚好DumpService为TaskManager设置了一个默认的Processor,位置在DumpService的构造方法上
image.png

也就说说TaskManager.processTasks -> NacosDelayTaskExecuteEngine.processTasks ->DumpProcessor�.process方法

DumpProcessor.process

image.png
会调用到DumpConfigHandler

DumpConfigHandler.configDump

image.png
最终又调用到ConfigCacheService.dump

ConfigCacheService.dump

image.png

  1. 获取写锁
  2. 创建CacheItem
  3. 非单点 非数据库存储,需要保存到磁盘
  4. updateMD5:发送本地配置变更事件, 这个事件意义非凡,得好好看看

image.png
这里比较了一下md5值,如果不一样,就发送LocalDataChangeEvent事件,我们来看看这个事件又是谁处理的?PpcConfigChangeNotifier.onEvent(LocalDataChangeEvent)

RpcConfigChangeNotifier.onEvent

image.png
image.png
这里的大致逻辑就是:

  1. 判断groupKey对应有没有监听,也就是需不需要回调,如果有,会向客户端发送消息
  2. 通过clientId拿到客户端连接,然后就是往客户端发送Rpc ConfigChangeNofifyChangeRequest

image.png

之前我们在客户端源码里面讲过,在ensureRpcClient =>initRpcClientHandler里面会注册一个客户端处理服务端->客户端的handler
image.png
当客户端接受到notifyListenConfig,会往队列丢一个信号,客户端会马上执行,从服务端拉取最新的配置

ConfigClusterRpcClientProxy�.syncConfigChange

上面从3.4到3.8都是讲了当配置更新完毕,本机如何处理,那如果是其他Server或者客户端又该怎么处理那?
入口就是这个
image.png
也是是RpcClient发送请求

通过搜索ConfigChangeClusterSyncRequest�这个请求 找到具体的处理类是ConfigChangeClusterSyncRequestHandler�

ConfigChangeClusterSyncRequestHandler.handle

其他server通过这个handler.handle方法最终也是调用dumpService方法
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/372251.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Springboot简单设计两级缓存

两级缓存相比单纯使用远程缓存&#xff0c;具有什么优势呢&#xff1f; 本地缓存基于本地环境的内存&#xff0c;访问速度非常快&#xff0c;对于一些变更频率低、实时性要求低的数据&#xff0c;可以放在本地缓存中&#xff0c;提升访问速度 使用本地缓存能够减少和Redis类的远…

你知道网页采集工具吗?

一、网页采集器的定义和作用 网页采集器是一种自动化工具,用于从互联网上获取信息并将其保存到本地或远程数据库中。其作用在于帮助用户快速、自动地收集并整理网络上的信息,提高工作效率并且节省时间成本。网页采集器通过模拟人工浏览网页的行为,访问并提取目标网页的数据…

L1-037 A除以B-java

输入样例1&#xff1a; -1 2输出样例1&#xff1a; -1/2-0.50输入样例2&#xff1a; 1 -3输出样例2&#xff1a; 1/(-3)-0.33输入样例3&#xff1a; 5 0输出样例3&#xff1a; 5/0Error java import java.util.*; class Main{public static void main(String[] args){Sc…

机器学习中常用的性能度量—— ROC 和 AUC

什么是泛化能力&#xff1f; 通常我们用泛化能力来评判一个模型的好坏&#xff0c;通俗的说&#xff0c;泛化能力是指一个机器学期算法对新样本&#xff08;即模型没有见过的样本&#xff09;的举一反三的能力&#xff0c;也就是学以致用的能力。 举个例子&#xff0c;高三的…

BUUCTF-Real-[ThinkPHP]IN SQL INJECTION

目录 漏洞描述 漏洞分析 漏洞复现 漏洞描述 漏洞发现时间&#xff1a; 2018-09-04 CVE 参考&#xff1a;CVE-2018-16385 最高严重级别&#xff1a;低风险 受影响的系统&#xff1a;ThinkPHP < 5.1.23 漏洞描述&#xff1a; ThinkPHP是一款快速、兼容、简单的轻量级国产P…

Stable Diffusion 模型下载:ReV Animated

文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八案例九案例十下载地址模型介绍 该模型能够创建 2.5D 类图像生成。此模型是检查点合并,这意味着它是其他模型的产物,以创建从原始模型派生的产品。 条目内容类型大模型

游戏视频录制软件推荐,打造专业电竞视频(3款)

随着游戏产业的快速发展&#xff0c;越来越多的玩家开始关注游戏视频录制软件。一款好的录制软件不仅可以帮助玩家记录游戏中的精彩瞬间&#xff0c;还可以让其与他人分享自己的游戏体验。接下来&#xff0c;我们将介绍三款热门的游戏视频录制软件&#xff0c;并对其进行详细的…

pwn学习笔记(2)

pwn学习笔记&#xff08;2&#xff09; 1.三种常见的寄存器&#xff1a; ​ ax寄存器&#xff1a;通用寄存器&#xff0c;可用于存放多种数据 ​ bp寄存器&#xff1a;存放的是栈帧的栈底地址 ​ sp寄存器&#xff1a;存放的是栈顶的地址 2.栈帧与栈工作的简介&#xff1a…

arping交叉编译

arping命令依赖libpcap和libnet&#xff0c;需要先交叉编译这两个库。 1.交叉编译libpcap 下载libpcap源文件&#xff0c;从github上克隆: git clone https://github.com/the-tcpdump-group/libpcap.git source交叉编译环境 # environment-setup是本机的交叉编译环境, 里面…

Centos7配置登录失败处理导致root被锁定处理办法

1、应用场景 root用户被系统锁定&#xff0c;无法登录系统。 2、问题现象 root锁定无法登录系统 3、原因 设置登录失败处理并对root用户生效&#xff0c;一直尝试错误的root密码或暴力破解root密码&#xff0c;导致无法自动解锁Linux的root账户 4、解决方案 1.将虚拟机开…

0 代码自动化测试:RF 框架实现企业级 UI 自动化测试

前言 现在大家去找工作&#xff0c;反馈回来的基本上自动化测试都是刚需&#xff01;没有自动化测试技能&#xff0c;纯手工测试基本没有什么市场。 但是很多人怕代码&#xff0c;觉得自动化测试就需要代码&#xff01;代码学习起来很难&#xff01; 当然代码学习不难&#xf…

重生奇迹MU如何挂机

1、重生奇迹MU觉醒哪里挂机经验多挂机收益最大化&#xff0c;在重生奇迹MU中玩家可以通过副本获得大量的经验和金币&#xff0c;甚至挂机也有不错的收益&#xff0c;对于玩家来说 2、卡利玛神庙、血色城堡、迷失之城、恶魔广场甚至是挂机自动刷怪&#xff0c;组队都会有经验加…

骑砍战团MOD开发(43)-顶点着色技术

一.顶点着色(vertex_color) 实际GPU渲染时有顶点着色和纹理着色两种方式,顶点着色消耗资源小,GPU将顶点颜色通过插值运算进行渲染.常用于同一物体的不同颜色渲染,如青苹果,红苹果,可以使用动态切换顶点颜色实现,而不通过设置纹理图片实现. Direct3D9中可声明灵活顶点格式 stru…

TorchVision的使用方法、更改默认路径

TorchVision的使用 1. 转换和增强图像 torchvision.transforms.v2 参数作用Resize将输入调整为给定大小RandomShortestSize随机调整输入的大小RandomResize随机调整输入的大小RandomCrop在随机位置裁剪输入RandomResizedCrop裁剪输入的随机部分并将其调整为给定大小RandomIoU…

为什么PCB地与金属机壳用阻容连接?

笔者电子信息专业硕士毕业&#xff0c;获得过多次电子设计大赛、大学生智能车、数学建模国奖&#xff0c;现就职于南京某半导体芯片公司&#xff0c;从事硬件研发&#xff0c;电路设计研究。对于学电子的小伙伴&#xff0c;深知入门的不易&#xff0c;特开次博客交流分享经验&a…

Java的值传递与“引用传递”辨析

目录 Java的值传递与“引用传递”辨析1. 传递方式概述2. 值传递示例3. “引用传递”示例4. 值传递与"引用传递"的实际应用5. 总结&#xff1a;java只有值传递 Java的值传递与“引用传递”辨析 欢迎来到本博客&#xff0c;今天我们将深入研究Java中是值传递还是引用传…

【文件增量备份系统】前端项目构建

文章目录 创建项目安装项目依赖引入element plus组件下载组件在main.js中使用组件测试 整合路由router下载组件创建路由管理器index.js使用路由App.vue上面使用 <router-view />测试 整合axios下载组件工具类axiosRequest.js工具类使用 创建项目 damwangrunqindeMBP dev…

蓝桥杯刷题day06——平均

1、题目描述 有一个长度为n 的数组&#xff08;n 是 10 的倍数&#xff09;&#xff0c;每个数ai都是区间 [0,9] 中的整数。 小明发现数组里每种数出现的次数不太平均&#xff0c;而更改第i 个数的代价为bi&#xff0c; 他想更改若干个数的值使得这10 种数出现的次数相等&…

论文阅读-一个用于云计算中自我优化的通用工作负载预测框架

论文标题&#xff1a;A Self-Optimized Generic Workload Prediction Framework for Cloud Computing 概述 准确地预测未来的工作负载&#xff0c;如作业到达率和用户请求率&#xff0c;对于云计算中的资源管理和弹性非常关键。然而&#xff0c;设计一个通用的工作负载预测器…

软考 系统分析师系列知识点之信息系统战略规划方法(1)

所属章节&#xff1a; 第7章. 企业信息化战略与实施 第4节. 信息系统战略规划方法 信息系统战略规划&#xff08;Information System Strategic Planning&#xff0c;ISSP&#xff09;是从企业战略出发&#xff0c;构建企业基本的信息架构&#xff0c;对企业内、外信息资源进行…