Nacos2.X 配置中心源码分析:客户端如何拉取配置、服务端配置发布客户端监听机制

文章目录

  • Nacos配置中心源码
    • 总流程图
    • NacosClient源码分析
      • 获取配置
      • 注册监听器
    • NacosServer源码分析
      • 配置dump
      • 配置发布

Nacos配置中心源码

总流程图

Nacos2.1.0源码分析在线流程图

在这里插入图片描述

源码的版本为2.1.0 ,并在配置了下面两个启动参数,一个表示单机启动,一个是指定nacos的工作目录,其中会存放各种运行文件方便查看

-Dnacos.standalone=true
-Dnacos.home=D:\nacos-cluster\nacos2.1.0standalone



NacosClient源码分析

在NacosClient端服务注册中心核心的接口是NamingService,而配置中心核心的接口是ConfigService

我们可以添加一个配置,然后查看这里的实例代码

在这里插入图片描述

/*
* Demo for Nacos
* pom.xml
    <dependency>
        <groupId>com.alibaba.nacos</groupId>
        <artifactId>nacos-client</artifactId>
        <version>${version}</version>
    </dependency>
*/
package com.alibaba.nacos.example;

import java.util.Properties;
import java.util.concurrent.Executor;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;

/**
 * Config service example
 *
 * @author Nacos
 *
 */
public class ConfigExample {

	public static void main(String[] args) throws NacosException, InterruptedException {
		String serverAddr = "localhost";
		String dataId = "nacos-config-demo.yaml";
		String group = "DEFAULT_GROUP";
		Properties properties = new Properties();
		properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        //获取配置服务
		ConfigService configService = NacosFactory.createConfigService(properties);
        //获取配置
		String content = configService.getConfig(dataId, group, 5000);
		System.out.println(content);
        //注册监听器
		configService.addListener(dataId, group, new Listener() {
			@Override
			public void receiveConfigInfo(String configInfo) {
				System.out.println("recieve:" + configInfo);
			}

			@Override
			public Executor getExecutor() {
				return null;
			}
		});

        //发布配置
		//boolean isPublishOk = configService.publishConfig(dataId, group, "content");
		//System.out.println(isPublishOk);
        //发送properties格式
        configService.publishConfig(dataId,group,"common.age=30", ConfigType.PROPERTIES.getType());

		Thread.sleep(3000);
		content = configService.getConfig(dataId, group, 5000);
		System.out.println(content);

/*		boolean isRemoveOk = configService.removeConfig(dataId, group);
		System.out.println(isRemoveOk);
		Thread.sleep(3000);

		content = configService.getConfig(dataId, group, 5000);
		System.out.println(content);
		Thread.sleep(300000);*/

	}
}



获取配置

总结:

获取配置的主要方法是 NacosConfigService 类的 getConfig 方法,通常情况下该方法直接从本地文件中取得配置的值,如果本地文件不存在或者内容为空,则再通过grpc从远端拉取配置,并保存到本地快照中。

NacosServer端的处理是从磁盘读取配置文件./nacosHome/data/config-data/DEFAULT_GROUP/dataId,然后将读取到的content返回

在这里插入图片描述



接下来的源码就是这里一块的流程,它是如何调用到NacosConfigService 类的 getConfig ()方法

public interface ConfigService {
    String getConfig(String dataId, String group, long timeoutMs) throws NacosException;
    ......
}

在这里插入图片描述


还是一样,从spring.factiries文件中起步

在这里插入图片描述



进入到NacosConfigBootstrapConfiguration自动配置类的,这其中会创建一个NacosPropertySourceLocatorbean对象

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(
    name = {"spring.cloud.nacos.config.enabled"},
    matchIfMissing = true
)
public class NacosConfigBootstrapConfiguration {
    public NacosConfigBootstrapConfiguration() {
    }

    @Bean
    @ConditionalOnMissingBean
    public NacosConfigProperties nacosConfigProperties() {
        return new NacosConfigProperties();
    }

    @Bean
    @ConditionalOnMissingBean
    public NacosConfigManager nacosConfigManager(NacosConfigProperties nacosConfigProperties) {
        return new NacosConfigManager(nacosConfigProperties);
    }

    // 核心bean
    @Bean
    public NacosPropertySourceLocator nacosPropertySourceLocator(NacosConfigManager nacosConfigManager) {
        return new NacosPropertySourceLocator(nacosConfigManager);
    }

    @Bean
    @ConditionalOnMissingBean(
        search = SearchStrategy.CURRENT
    )
    @ConditionalOnNonDefaultBehavior
    public ConfigurationPropertiesRebinder smartConfigurationPropertiesRebinder(ConfigurationPropertiesBeans beans) {
        return new SmartConfigurationPropertiesRebinder(beans);
    }
}




NacosPropertySourceLocator这个bean中,它的接口中的默认方法会调用locate()方法:

  • 加载共享配置文件,也就是shared-configs配置项指定的数组

  • 加载加载扩展的配置文件,也就是extension-configs配置项指定的数组

  • 加载和应用名相关的几个默认配置文件,比如order-service-dev.yml

  • 上面三个方法中都会各自调用到loadNacosDataIfPresent() --> loadNacosPropertySource(...) --> NacosPropertySourceBuilder.build()

public class NacosPropertySourceLocator implements PropertySourceLocator {
    
    public PropertySource<?> locate(Environment env) {
        this.nacosConfigProperties.setEnvironment(env);
        // 获取配置中心服务ConfigService
        ConfigService configService = this.nacosConfigManager.getConfigService();
        if (null == configService) {
            log.warn("no instance of config service found, can't load config from nacos");
            return null;
        } else {
            long timeout = (long)this.nacosConfigProperties.getTimeout();
            this.nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout);
            String name = this.nacosConfigProperties.getName();
            String dataIdPrefix = this.nacosConfigProperties.getPrefix();
            if (StringUtils.isEmpty(dataIdPrefix)) {
                dataIdPrefix = name;
            }

            if (StringUtils.isEmpty(dataIdPrefix)) {
                dataIdPrefix = env.getProperty("spring.application.name");
            }

            CompositePropertySource composite = new CompositePropertySource("NACOS");
            // 加载共享配置文件
            this.loadSharedConfiguration(composite);
            // 加载扩展的配置文件
            this.loadExtConfiguration(composite);
            // 加载当前应用配置文件
            // 在该方法中会进行下面三行的逻辑
            /*
            dataIdPrefix
            dataIdPrefix + "." + fileExtension
            dataIdPrefix + "-" + profile + "." + fileExtension
            */
            this.loadApplicationConfiguration(composite, dataIdPrefix, this.nacosConfigProperties, env);
            return composite;
        }
    }
    
    // 可以详细看一下,NacosClient启动时,是怎么根据微服务名去取配置文件的
    private void loadApplicationConfiguration() {
        String fileExtension = properties.getFileExtension();
        String nacosGroup = properties.getGroup();
        // 最先使用微服务名 调用下面的loadNacosDataIfPresent()方法
        this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup, fileExtension, true);
        // 接下来是使用微服务名+文件后缀名的方式 调用下面的loadNacosDataIfPresent()方法
      this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix + "." + fileExtension, nacosGroup, fileExtension, true);
        String[] var7 = environment.getActiveProfiles();
        int var8 = var7.length;

        for(int var9 = 0; var9 < var8; ++var9) {
            String profile = var7[var9];
            // 第三次使用 微服务名+profile + 文件后缀名 调用下面的loadNacosDataIfPresent()方法
            String dataId = dataIdPrefix + "-" + profile + "." + fileExtension;
            this.loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup, fileExtension, true);
        }

    }
    
    // 在上面三个加载共享、扩展、当前应用名方法中,最终都会调用到下面的loadNacosDataIfPresent(...) 方法中
    private void loadNacosDataIfPresent(...) {
        if (null != dataId && dataId.trim().length() >= 1) {
            if (null != group && group.trim().length() >= 1) {
                // 调用loadNacosPropertySource()方法
                NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group, fileExtension, isRefreshable);
                this.addFirstPropertySource(composite, propertySource, false);
            }
        }
    }
    
    // loadNacosDataIfPresent(...) ---> loadNacosPropertySource(...)
    private NacosPropertySource loadNacosPropertySource(...) {
        return NacosContextRefresher.getRefreshCount() != 0L && !isRefreshable ? 	
            			NacosPropertySourceRepository.getNacosPropertySource(dataId, group) : 
        				// 这里会进入到NacosPropertySourceBuilder类的build方法
        				this.nacosPropertySourceBuilder.build(dataId, group, fileExtension, isRefreshable);
    }
    
}




NacosPropertySourceBuilder类的代码调用流程:

  • 这里就会调用到核心接口configService接口实现类的getConfig()方法
public class NacosPropertySourceBuilder {
    ......

    NacosPropertySource build(String dataId, String group, String fileExtension, boolean isRefreshable) {
        // 这里会先调用loadNacosData()方法
        List<PropertySource<?>> propertySources = this.loadNacosData(dataId, group, fileExtension);
        NacosPropertySource nacosPropertySource = new NacosPropertySource(propertySources, group, dataId, new Date(), isRefreshable);
        NacosPropertySourceRepository.collectNacosPropertySource(nacosPropertySource);
        return nacosPropertySource;
    }
    
    // build(...) ---> loadNacosData(...)
    private List<PropertySource<?>> loadNacosData(String dataId, String group, String fileExtension) {
        String data = null;

        try {
            // 这里进入到了configService接口实现类的getConfig()方法
            data = this.configService.getConfig(dataId, group, this.timeout);
            if (StringUtils.isEmpty(data)) {
                log.warn("Ignore the empty nacos configuration and get it based on dataId[{}] & group[{}]", dataId, group);
                return Collections.emptyList();
            }

            if (log.isDebugEnabled()) {
                log.debug(String.format("Loading nacos data, dataId: '%s', group: '%s', data: %s", dataId, group, data));
            }

            return NacosDataParserHandler.getInstance().parseNacosData(dataId, data, fileExtension);
        } catch (NacosException var6) {
            log.error("get data from Nacos error,dataId:{} ", dataId, var6);
        } catch (Exception var7) {
            log.error("parse data from Nacos error,dataId:{},data:{}", new Object[]{dataId, data, var7});
        }

        return Collections.emptyList();
    }
    
}



核心方法,NacosClient向NacosServer发送请求,拉取配置的方法。

前面的调用栈如果不会,可以直接在下面getConfig()方法出打一个断点,然后从debug中看调用栈。方法具体的实现:

  • NacosClient端,这里首先会读取本地文件,本地是有一个缓存的
  • 如果本地缓存中没有我们需要的配置,那么就需要从NacosServer端拉取配置了
  • 发送请求,获取响应数据
  • 将数据在本地文件中缓存一份
// 只是方法调用
public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
    return getConfigInner(namespace, dataId, group, timeoutMs);
}


private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
    group = blank2defaultGroup(group);
    ParamUtils.checkKeyParam(dataId, group);
    ConfigResponse cr = new ConfigResponse();

    cr.setDataId(dataId);
    cr.setTenant(tenant);
    cr.setGroup(group);

    // use local config first
    // NacosClient端,这里首先会读取本地文件,本地是有一个缓存的
    String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);
    if (content != null) {
        LOGGER.warn(..);
        cr.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
            .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cr.setEncryptedDataKey(encryptedDataKey);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }

    // 如果本地缓存中没有我们需要的配置,那么就需要从NacosServer端拉取配置了
    try {
        // 从服务端获取配置
        ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);
        cr.setContent(response.getContent());
        cr.setEncryptedDataKey(response.getEncryptedDataKey());
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();

        return content;
    } catch (NacosException ioe) {
        if (NacosException.NO_RIGHT == ioe.getErrCode()) {
            throw ioe;
        }
        LOGGER.warn(..);
    }

    LOGGER.warn(..);
    // 再从本地文件缓存中找
    content = LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant);

    cr.setContent(content);
    String encryptedDataKey = LocalEncryptedDataKeyProcessor
        .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
    cr.setEncryptedDataKey(encryptedDataKey);
    configFilterChainManager.doFilter(null, cr);
    content = cr.getContent();
    return content;
}



//--------------------------
public ConfigResponse getServerConfig(String dataId, String group, String tenant, long readTimeout, boolean notify)
    throws NacosException {
    // 默认组名
    if (StringUtils.isBlank(group)) {
        group = Constants.DEFAULT_GROUP;
    }
    // 从服务端查询配置
    return this.agent.queryConfig(dataId, group, tenant, readTimeout, notify);
}

// 向NacosServer发送请求,拉取配置数据,并在本地文件中缓存一份
public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify)
    throws NacosException {
    ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
    request.putHeader(NOTIFY_HEADER, String.valueOf(notify));
    RpcClient rpcClient = getOneRunningClient();
    if (notify) {
        CacheData cacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
        if (cacheData != null) {
            rpcClient = ensureRpcClient(String.valueOf(cacheData.getTaskId()));
        }
    }
    // 发送请求,获取响应数据
    ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);

    ConfigResponse configResponse = new ConfigResponse();
    if (response.isSuccess()) {
        // 将数据在本地文件中缓存一份
        LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent());
        configResponse.setContent(response.getContent());
        String configType;
        if (StringUtils.isNotBlank(response.getContentType())) {
            configType = response.getContentType();
        } else {
            configType = ConfigType.TEXT.getType();
        }
        configResponse.setConfigType(configType);
        String encryptedDataKey = response.getEncryptedDataKey();
        LocalEncryptedDataKeyProcessor
            .saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, encryptedDataKey);
        configResponse.setEncryptedDataKey(encryptedDataKey);
        return configResponse;
    } else if (response.getErrorCode() == ConfigQueryResponse.CONFIG_NOT_FOUND) {
        LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, null);
        LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, null);
        return configResponse;
    } else if (response.getErrorCode() == ConfigQueryResponse.CONFIG_QUERY_CONFLICT) {
        ...

    }
}



注册监听器

结论:

  • 从spring.facotries文件中开始,其中一个bean会监听spring容器启动完成的事件
  • 然后它会为当前应用添加监听器:遍历每个dataId,添加监听器。
  • 当nacosServer端更改了配置,这里监听器中的方法就会运行,这里都会发布一个RefreshEvent事件
  • 处理RefreshEvent事件的方法中会
    • 刷新环境变量
    • 销毁@RefreshScope注解修改的bean实例




NacosServer端如果修改了配置,就会发布一个事件,而在NacosClient端这边就会有一个EventListener去监听该事件并进行相应的处理。

ConfigService接口中,有三个和监听器相关的方法

public interface ConfigService {

    String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener)
        throws NacosException;

    void addListener(String dataId, String group, Listener listener) throws NacosException;

    void removeListener(String dataId, String group, Listener listener);

}

在这里插入图片描述



接下来进入源码中,入口是NacosConfigAutoConfiguration自动配置的NacosContextRefresherbean 对象

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(
    name = {"spring.cloud.nacos.config.enabled"},
    matchIfMissing = true
)
public class NacosConfigAutoConfiguration {
	...

    @Bean
    public NacosContextRefresher nacosContextRefresher(NacosConfigManager nacosConfigManager, NacosRefreshHistory nacosRefreshHistory) {
        return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);
    }
    
	...
}

该类它监听了ApplicationReadyEvent事件,

  • 在spring容器启动完成后就会调用该类的onApplicationEvent()方法

  • 给当前应用注册nacos监听器

  • 为每个 dataId注册监听器

  • 当某个dataId发生了更改,这里都会发布一个RefreshEvent事件

public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent> , ApplicationContextAware {
 
    // 在spring容器启动完成后就会调用该类的onApplicationEvent()方法
     public void onApplicationEvent(ApplicationReadyEvent event) {
        if (this.ready.compareAndSet(false, true)) {
            // 给当前应用注册nacos监听器
            this.registerNacosListenersForApplications();
        }
    }
    
    
    
    // 给当前应用注册nacos监听器
    private void registerNacosListenersForApplications() {
        // 是否刷新配置,默认为true
        if (this.isRefreshEnabled()) {
            Iterator var1 = NacosPropertySourceRepository.getAll().iterator();
			// 遍历每个dataId
            while(var1.hasNext()) {
                NacosPropertySource propertySource = (NacosPropertySource)var1.next();
                if (propertySource.isRefreshable()) {
                    String dataId = propertySource.getDataId();
                    // 为每个 dataId注册监听器
                    this.registerNacosListener(propertySource.getGroup(), dataId);
                }
            }
        }
    }
    
    
    
    // 为每个 dataId注册监听器
    private void registerNacosListener(final String groupKey, final String dataKey) {
        String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
        // 定义一个监听器
        Listener listener = (Listener)this.listenerMap.computeIfAbsent(key, (lst) -> {
            return new AbstractSharedListener() {
                public void innerReceive(String dataId, String group, String configInfo) {
                    NacosContextRefresher.refreshCountIncrement();
                    // 配置的历史记录
                    NacosContextRefresher.this.nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                    // 发布一个RefreshEvent事件,会在处理该事件的位置真正进行刷新配置项
                    NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "..."));

                }
            };
        });

        try {
            
            // 调用configService接口的addListener()添加监听器
            this.configService.addListener(dataKey, groupKey, listener);
        } catch (NacosException var6) {
            log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), var6);
        }
    }
}



当NacosServer端某个配置文件改动后,就会回调上面监听器的innerReceive()方法,在该方法中就会发布RefreshEvent事件,处理该事件的是RefreshEventListener类中的onApplicationEvent()方法:

  • 直接调用refresh()方法
public class RefreshEventListener implements SmartApplicationListener {
	...

    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ApplicationReadyEvent) {
            this.handle((ApplicationReadyEvent)event);
        } else if (event instanceof RefreshEvent) {
            // 处理RefreshEvent事件,调用handler()方法
            this.handle((RefreshEvent)event);
        }

    }

    public void handle(ApplicationReadyEvent event) {
        this.ready.compareAndSet(false, true);
    }

    public void handle(RefreshEvent event) {
        if (this.ready.get()) {
            // 这里就会调用refresh()方法进行刷新
            Set<String> keys = this.refresh.refresh();
        }

    }
}




接下来就进入到了ContextRefresher类的refresh()方法:

  • 刷新环境变量
  • 销毁@RefreshScope注解修改的bean实例
public synchronized Set<String> refresh() {
    // 刷新环境变量
    Set<String> keys = this.refreshEnvironment();
    // 销毁@RefreshScope注解修改的bean实例
    this.scope.refreshAll();
    return keys;
}



NacosServer源码分析

配置dump

服务端启动时就会依赖 DumpService 的 init 方法,从数据库中 load 配置存储在本地磁盘上,并将一些重要的元信息例如 MD5 值缓存在内存中。服务端会根据心跳文件中保存的最后一次心跳时间,来判断到底是从数据库 dump 全量配置数据还是部分增量配置数据(如果机器上次心跳间隔是 6h 以内的话)。

全量 dump 当然先清空磁盘缓存,然后根据主键 ID 每次捞取一千条配置刷进磁盘和内存。增量 dump 就是捞取最近六小时的新增配置(包括更新的和删除的),先按照这批数据刷新一遍内存和文件,再根据内存里所有的数据全量去比对一遍数据库,如果有改变的再同步一次,相比于全量 dump 的话会减少一定的数据库 IO 和磁盘 IO 次数。



配置发布

在这里插入图片描述


结论:

  • 更改数据库中的数据,持久化信息到mysql

  • 触发一个ConfigDataChangeEvent事件。至此请求结束。

  • 接下来就处理上面的事件:

    • 遍历Nacos集群下的所有节点,包括自己

    • 生成一个http/rpc的任务对象去执行,这里就直接看rpc任务对象的处理

    • 判断是不是当前节点,如果是就调用dump()方法去处理

      • 将更改的数据保存至本地磁盘中

      • 生成md5,并通过一个key将md5存入cache中,再发布一个LocalDataChangeEvent事件,该事件存了key

        处理上方事件的方法中会开启一个任务,在任务的run()方法中会真正调用客户端发送grpc请求,发送一个ConfigChangeNotifyRequest请求对象

    • 如果不是当前节点就发送grpc请求为其他节点同步修改配置项



NacosClient端的处理

  • 接收到ConfigChangeNotifyRequest请求对象,然后就放入了一个阻塞队列中。
  • 客户端while死循环,队列中有任务了/每隔5s 从队列中获取任务/null,去执行配置监听器方法
  • 根据CacheData对象远程获取配置内容,进行md5的比较
  • 如果有变化就通知监听器去处理,这就回到了nacosClient端获取配置中的流程了



我们接下来分析,在NacosServer端修改了配置,点击发布配置,NacosClient怎么就能接收到是哪一个dataId修改了嘞

发布配置官方接口文档

这里实际上是调用的NacosServer的/nacos/v2/cs/config接口,处理该请求的是ConfigController.publishConfig()方法

在这一次请求中其实就是做了两件事:将更新写入数据库中,然后发布一个事件,将事件添加进队列中,此时请求就结束了。

在controller方法中有两行核心的方法

// 进入service层,核心方法
// 持久化配置信息到数据库
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);

// 触发ConfigDataChangeEvent事件,这是客户端能感知配置更新的根本原因
ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));




持久化配置信息到数据库就没必须继续看下去了,我们接下来看看notifyConfigChange()方法的实现:

  • 该方法就是单纯的一层一层方法调用
public class ConfigChangePublisher {

    public static void notifyConfigChange(ConfigDataChangeEvent event) {
        if (PropertyUtil.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {
            return;
        }
        // 该方法继续调用
        NotifyCenter.publishEvent(event);
    }
}

public static boolean publishEvent(final Event event) {
    try {
        // 该方法继续调用
        return publishEvent(event.getClass(), event);
    } catch (Throwable ex) {
        LOGGER.error("There was an exception to the message publishing : ", ex);
        return false;
    }
}

private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        return INSTANCE.sharePublisher.publish(event);
    }

    final String topic = ClassUtils.getCanonicalName(eventType);

    EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if (publisher != null) {
        // 该方法继续调用
        return publisher.publish(event);
    }
    return false;
}



这里就会进入到DefaultPublisher类的publish(event)方法中。该类非常重要,Nacos很多功能都用的这统一的一套事件发布与订阅。

public boolean publish(Event event) {
    checkIsStart();
    // 如果队列中写满了,那么就返回false,下面就直接处理了
    // 该类的run()方法中会死循环从队列中取任务执行
    boolean success = this.queue.offer(event);
    if (!success) {
        LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
        // 处理事件
        receiveEvent(event);
        return true;
    }
    return true;
}




此时,本次http请求就已经结束了,这里将事件放入队列中后就会有其他的订阅者来异步处理事件。

这样的设计也实现了发布任务与处理任务之间的解耦

此时队列中有了任务,在NacosServer中任务订阅者此时还需要做两件事:

  • 通知集群其他Nacos节点进行更新
  • 通知NacosClient端配置发生了更改
public void notifySubscriber(final Subscriber subscriber, final Event event) {
    
    LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);

    // 订阅者需要去处理事件
    // 主要做两件事 通知集群其他Nacos节点进行更新、通知NacosClient端配置发生了更改
    final Runnable job = () -> subscriber.onEvent(event);
    final Executor executor = subscriber.executor();
    
    if (executor != null) {
        executor.execute(job);
    } else {
        try {
            job.run();
        } catch (Throwable e) {
            LOGGER.error("Event callback exception: ", e);
        }
    }
}




这里会进入到AsyncNotifyService的构造方法中:

  • 遍历集群环境下的所有节点
  • 创建任务添加进http/grpc的队列中
  • 从http/grpc的队列中取任务执行
public AsyncNotifyService(ServerMemberManager memberManager) {
        ...
        
        // Register A Subscriber to subscribe ConfigDataChangeEvent.
        NotifyCenter.registerSubscriber(new Subscriber() {
            
            @Override
            public void onEvent(Event event) {
                // Generate ConfigDataChangeEvent concurrently
                if (event instanceof ConfigDataChangeEvent) {
                    ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                    long dumpTs = evt.lastModifiedTs;
                    String dataId = evt.dataId;
                    String group = evt.group;
                    String tenant = evt.tenant;
                    String tag = evt.tag;
                    // 获取nacos集群下的各个节点
                    Collection<Member> ipList = memberManager.allMembers();
                    
                    // In fact, any type of queue here can be
                    Queue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>();
                    Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
                    
                    for (Member member : ipList) {
                        // 使用http/rpc的方式通知各节点,具体的dataId被修改了
                        // 这里先添加进队列,下面的if中处理
                        if (!MemberUtil.isSupportedLongCon(member)) {
                            httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                                    evt.isBeta));
                        } else {
                            rpcQueue.add(
                                    new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
                        }
                    }
                    // 处理队列中的任务
                    if (!httpQueue.isEmpty()) {
                        ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
                    }
                    if (!rpcQueue.isEmpty()) {
                        // 直接看AsyncRpcTask类中的run()方法
                        ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
                    }
                    
                }
            }
            
            ......
        });
    }




我们这里是nacos2.X的版本,所以我这里就自己看AsyncRpcTask类的run()方法:

  • 调用dump()方法
  • 发送请求,同步其他节点数据变化
public void run() {
    while (!queue.isEmpty()) {
        NotifySingleRpcTask task = queue.poll();

        ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
        syncRequest.setDataId(task.getDataId());
        syncRequest.setGroup(task.getGroup());
        syncRequest.setBeta(task.isBeta);
        syncRequest.setLastModified(task.getLastModified());
        syncRequest.setTag(task.tag);
        syncRequest.setTenant(task.getTenant());
        Member member = task.member;
        // 判断member是不是当前节点
        if (memberManager.getSelf().equals(member)) {
            // 如果是当前节点就直接调用dump()方法
            // 这里又会经过服务注册那边当服务更改后为订阅者进行推送的流程中,这里最终是会到DumpProcessor.process()方法
            if (syncRequest.isBeta()) {
                dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                 syncRequest.getLastModified(), NetUtils.localIP(), true);
            } else {
                dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                 syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
            }
            continue;
        }

        // 其他节点
        if (memberManager.hasMember(member.getAddress())) {
            boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());
            if (unHealthNeedDelay) {
                ...
            } else {

                if (!MemberUtil.isSupportedLongCon(member)) {
                    asyncTaskExecute(
                        new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,
                                             task.getLastModified(), member.getAddress(), task.isBeta));
                } else {
                    try {
                        // 为nacos集群中的其他节点进行同步配置变化
                        configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
                    } catch (Exception e) {
                        MetricsMonitor.getConfigNotifyException().increment();
                        asyncTaskExecute(task);
                    }
                }

            }
        } else {
            //No nothig if  member has offline.
        }

    }
}



dump()这里又会经过服务注册那边当服务更改后为订阅者进行推送的流程中:先将task存入一个队列中 --> 去队列中的任务 --> 各自的任务处理类去处理。这里最终是会到DumpProcessor.process()方法:

  • 方法调用process() --> configDump() —>dump()
  • 将配置保存在磁盘文件中
  • 配置发生变化,更新md5
  • 发布LocalDataChangeEvent事件
    目的告诉NacosClient端,配置发生了改变。处理该事件的RpcConfigChangeNotifier.onEvent()
public boolean process(NacosTask task) {
    ...
        // 直接看这里最后的configDump()方法
        return DumpConfigHandler.configDump(build.build());
}



public static boolean configDump(ConfigDumpEvent event){
    ......
        if (StringUtils.isBlank(event.getTag())) {
            ......

                boolean result;
            if (!event.isRemove()) {
                // 核心方法,进入到这里
                // 写入磁盘
                result = ConfigCacheService
                    .dump(dataId, group, namespaceId, content, lastModified, type, encryptedDataKey);

                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                                                    ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                                                    content.length());
                }
            } else {
                ......
            }
            return result;
        } else {
            .......

        }

    public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
                               String type, String encryptedDataKey) {
        ......

        try {
            // 根据content配置内容生成一个md5。content中的内容有变化那么生成的md5也肯定是不一样的
            final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
            // 配置的最后一次更新时间
            if (lastModifiedTs < ConfigCacheService.getLastModifiedTs(groupKey)) {
                DUMP_LOG.warn(...);
                return true;
            }
            if (md5.equals(ConfigCacheService.getContentMd5(groupKey)) && DiskUtil.targetFile(dataId, group, tenant).exists()) {
                DUMP_LOG.warn(...);
            } else if (!PropertyUtil.isDirectRead()) {
                // 将配置保存在磁盘文件中
                DiskUtil.saveToDisk(dataId, group, tenant, content);
            }
            // 配置发生变化,更新md5
            // 继续跟入该方法
            updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey);
            return true;
        } catch (IOException ioe) {
            ......
            return false;
        } finally {
            releaseWriteLock(groupKey);
        }
    }
    
    
    public static void updateMd5(String groupKey, String md5, long lastModifiedTs, String encryptedDataKey) {
        // 根据groupKey,将md5数据保存在缓存中
        CacheItem cache = makeSure(groupKey, encryptedDataKey, false);
        if (cache.md5 == null || !cache.md5.equals(md5)) {
            cache.md5 = md5;
            cache.lastModifiedTs = lastModifiedTs;
            // 发布LocalDataChangeEvent事件,包含groupKey
            // 目的告诉NacosClient端,配置发生了改变。处理该事件的RpcConfigChangeNotifier.onEvent()
            NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
        }
    }



接下来看看RpcConfigChangeNotifier.onEvent()的方法逻辑:

  • 遍历各个客户端
  • 发送grpc请求
@Override
public void onEvent(LocalDataChangeEvent event) {
    ...
    configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);

}


public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,
                              List<String> betaIps, String tag) {

    // 其实就代表着Client集合
    Set<String> listeners = configChangeListenContext.getListeners(groupKey);
    if (CollectionUtils.isEmpty(listeners)) {
        return;
    }
    int notifyClientCount = 0;
    // 遍历各个客户端
    for (final String client : listeners) {
        Connection connection = connectionManager.getConnection(client);
        if (connection == null) {
            continue;
        }

        ConnectionMeta metaInfo = connection.getMetaInfo();
        //一些检查校验
        String clientIp = metaInfo.getClientIp();
        String clientTag = metaInfo.getTag();
        if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {
            continue;
        }
        if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {
            continue;
        }

        // 封装一个请求对象ConfigChangeNotifyRequest
        ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);

        // 创建rpc推送任务,在RpcPushTask.run()方法中推送客户端
        RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, metaInfo.getAppName());
        push(rpcPushRetryTask);
        notifyClientCount++;
    }
    
}



public void run() {
    tryTimes++;
    if (!tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp)) {
        push(this);
    } else {
        // 这里推送客户端,客户端再进行refresh操作
        rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {
            @Override
            public void onSuccess() {
                tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_SUCCESS, connectionId, clientIp);
            }

            @Override
            public void onFail(Throwable e) {
                tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_FAIL, connectionId, clientIp);
                Loggers.REMOTE_PUSH.warn("Push fail", e);
                push(RpcPushTask.this);
            }

        }, ConfigExecutor.getClientConfigNotifierServiceExecutor());

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/787353.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

源码编译安装 LAMP

源码编译安装 LAMP Apache 网站服务基础Apache 简介安装 httpd 服务器 httpd 服务器的基本配置Web 站点的部署过程httpd.conf 配置文件 构建虚拟 Web 主机基于域名的虚拟主机基于IP 地址、基于端口的虚拟主机 MySQL 的编译安装构建 PHP 运行环境安装PHP软件包设置 LAMP 组件环境…

数据挖掘——matplotlib

matplotlib概述 Mat指的是Matlab&#xff0c;plot指的是画图&#xff0c;lib即library&#xff0c;顾名思义&#xff0c;matplotlib是python专门用于开发2D图表的第三方库&#xff0c;使用之前需要下载该库&#xff0c;使用pip命令即可下载。 pip install matplotlib1、matpl…

Nuxt框架中内置组件详解及使用指南(四)

title: Nuxt框架中内置组件详解及使用指南&#xff08;四&#xff09; date: 2024/7/9 updated: 2024/7/9 author: cmdragon excerpt: 摘要&#xff1a;本文详细介绍了Nuxt 3框架中的两个内置组件&#xff1a;和的使用方法与示例。用于捕获并处理客户端错误&#xff0c;提供…

【漏洞复现】29网课交单平台 SQL注入

声明&#xff1a;本文档或演示材料仅用于教育和教学目的。如果任何个人或组织利用本文档中的信息进行非法活动&#xff0c;将与本文档的作者或发布者无关。 一、漏洞描述 29网课交单平台是一个在线学习平台&#xff0c;用于帮助学生完成网络课程的学习任务。这个平台提供了包括…

过滤器与拦截器区别、应用场景介绍

我们在进行 Web 应用开发时&#xff0c;时常需要对请求进行拦截或处理&#xff0c;故 Spring 为我们提供了过滤器和拦截器来应对这种情况。 那么两者之间有什么不同呢&#xff1f;本文将详细讲解两者的区别和对应的使用场景。 过滤器 过滤器是一种在 Java Web 应用中用于处理…

Celery,一个实时处理的 Python 分布式系统

大家好&#xff01;我是爱摸鱼的小鸿&#xff0c;关注我&#xff0c;收看每期的编程干货。 一个简单的库&#xff0c;也许能够开启我们的智慧之门&#xff0c; 一个普通的方法&#xff0c;也许能在危急时刻挽救我们于水深火热&#xff0c; 一个新颖的思维方式&#xff0c;也许能…

Start LoongArch64 Alpine Linux VM on x86_64

一、Build from source(build on x86_64) Obtain the latest libvirt, virt manager, and QEMU source code, compile and install them 1.1 Build libvirt from source sudo apt-get update sudo apt-get install augeas-tools bash-completion debhelper-compat dh-apparmo…

Python学习笔记33:进阶篇(二十二)pygame的使用之image模块

前言 基础模块的知识通过这么长时间的学习已经有所了解&#xff0c;更加深入的话需要通过完成各种项目&#xff0c;在这个过程中逐渐学习&#xff0c;成长。 我们的下一步目标是完成python crash course中的外星人入侵项目&#xff0c;这是一个2D游戏项目。在这之前&#xff…

Codeforces Round 954 (Div. 3) F. Non-academic Problem

思路&#xff1a;考虑缩点&#xff0c;因为是无向图&#xff0c;所以双连通分量缩完点后是一棵树&#xff0c;我们去枚举删除每一条树边的答案&#xff0c;然后取最小值即可。 #include <bits/stdc.h>using namespace std; const int N 3e5 5; typedef long long ll; …

Profibus转ModbusTCP网关模块实现Profibus_DP向ModbusTCP转换

Profibus和ModbusTCP是工业控制自动化常用的二种通信协议。Profibus是一种串口通信协议&#xff0c;它提供了迅速靠谱的数据传输和各种拓扑结构&#xff0c;如总线和星型构造。Profibus可以和感应器、执行器、PLC等各类设备进行通信。 ModbusTCP是一种基于TCP/IP协议的通信协议…

Clickhouse的联合索引

Clickhouse 有了单独的键索引&#xff0c;为什么还需要有联合索引呢&#xff1f;了解过mysql的兄弟们应该都知道这个事。 对sql比较熟悉的兄弟们估计看见这个联合索引心里大概有点数了&#xff0c;不过clickhouse的联合索引相比mysql的又有些不一样了&#xff0c;mysql 很遵循最…

Springboot各个版本维护时间

Springboot各个版本维护时间

【 正己化人】 把自己做好,能解决所有问题

阳明先生说&#xff1a;与朋友一起辩论学问&#xff0c;纵然有人言辞观点浅近粗疏&#xff0c;或者是炫耀才华、显扬自己&#xff0c;也都不过是毛病发作。只要去对症下药就好&#xff0c;千万不能怀有轻视别人的心理&#xff0c;因为那不是君子与人为善的心。 人会爱发脾气、…

微信服务里底部的不常用功能如何优化的数据分析思路

图片.png 昨天下午茶时光&#xff0c;和闺蜜偶然聊起&#xff0c;其实在微信服务底部&#xff0c;有很多被我们忽略遗忘&#xff0c;很少点过用过的功能服务&#xff0c;往往进入服务只为了收付款或进入钱包&#xff0c;用完就走了&#xff0c;很少拉到底部&#xff0c;看到和用…

Python函数 之 函数基础

print() 在控制台输出 input() 获取控制台输⼊的内容 type() 获取变量的数据类型 len() 获取容器的⻓度 (元素的个数) range() ⽣成⼀个序列[0, n) 以上都是我们学过的函数&#xff0c;函数可以实现⼀个特定的功能。我们将学习⾃⼰如何定义函数, 实现特定的功能。 1.函数是什么…

LiveNVR监控流媒体Onvif/RTSP用户手册-录像计划:批量配置、单个配置、录像保存(天)、配置时间段录像

TOC 1、录像计划 支持单个通道 或是 通道范围内配置支持快速滑选支持录像时间段配置 1.1、录像存储位置如何配置&#xff1f; 2、RTSP/HLS/FLV/RTMP拉流Onvif流媒体服务 支持 Windows Linux 及其它CPU架构&#xff08;国产、嵌入式…&#xff09;操作系统安装包下载 、 安装…

亚马逊跟卖采集选品,2小时自动检索3000条商品数据与...

自动查商标局2个小时2928条数据。 ERP采集3000条数据需要多久&#xff1f;10&#xff1a;34开始的&#xff0c;12&#xff1a;52分&#xff0c;应该是两个小时多。采集3000条数据&#xff0c;2928条&#xff0c;平均每个就是3秒左右。 可以看一下采集出来的数据&#xff0c;打…

【C++知识点总结全系列 (08)】:面向对象编程OOP

这里写目录标题 1、OOP概述(1)面向对象四大特征A.抽象B.封装C.继承D.多态 (2)构造函数A.What&#xff08;什么是构造函数&#xff09;B.Why&#xff08;构造函数的作用&#xff09;C. Which&#xff08;有哪些构造函数&#xff09; (3)析构函数A.What&#xff08;什么是析构函数…

Python基础知识——(003)

文章目录 P12——11. 保留字和标识符 1. 保留字 2. Python标识符的命名规则&#xff08;必须遵守&#xff09; 3. Python标识符的命名规范&#xff08;建议遵守&#xff09; P13——12. 变量与常量 变量的语法结构 变量命名应遵循以下几条规则 常量 P14——13. 数值类型…

数据结构作业/2024/7/9

2>实现双向循环链表的创建、判空、尾插、遍历、尾删、销毁 fun.c #include "head.h" //1.双向循环链表的创建 doubleloop_ptr create_list() …