Sentinel规则持久化Push模式两种实现方式

文章目录

  • sentinel持久化push推模式
    • 微服务端的实现
      • 具体实现
      • 源码分析
        • 读数据源
        • 写数据源的实现
      • 微服务端解析读数据源流程
    • 修改源码的实现
      • 官方demo
      • 修改源码实现
        • 配置类
        • flow
        • authority
        • degread
        • param
        • system
        • gateway
        • 修改源码
      • 测试
      • 补充


前置知识 pull模式


sentinel持久化push推模式

pull拉模式的缺点,以保存本地文件举例:

  • 定时任务是每隔3s执行一次,去判断规则持久化文件的最后修改时间。这里有一定时间的延迟,但如果时间设置的太短,有影响服务器的性能
  • 我们的微服务是集群部署的,其他服务实例可读取不到我这台服务器的本地文件



所以还有一种push推送模式。我们一般会引入第三方中间件来实现,以Nacos为例。我们修改了nacos中的配置,它就会将更新后的数据推送给微服务。



push模式有两种实现方式:

  • 在微服务端添加读数据源,为dataId添加监听器,当规则配置文件更改之后我就获取到更改后的规则内存并更新内存中的数据;再添加一个写数据源,每当dashboard中更新了规则,我除了更新内存中的数据之外,我通过ConfigService.publishConfig()方法还往Nacos端进行写入

  • 在dashboard源码中进行更改,在获取规则内容、更新规则内容的接口中,不要和微服务端进行交互,直接去和Nacos通信,通过ConfigService.publishConfig()ConfigService.getConfig()来实现。这种方式主要注意dashboard端的规则实体对象和微服务端的规则实体对象不一致问题,需要经过转换相关的操作。sentinel默认情况下就直接把规则实体转换为json字符串推送给Nacos,Nacos配置文件更改了,又推送给微服务,微服务这边再把json字符串转换为规则实体对象这一步就会发现,转换失败了,某些属性对应不上。进而就导致了dashboard端设置的规则在微服务这边未生效。



微服务端的实现

具体实现

引入读数据源的依赖

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>



配置文件中添加规则持久化的dataId

server:
  port: 8806

spring:
  application:
    name: mall-user-sentinel-rule-push  #微服务名称
    
  #配置nacos注册中心地址
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848

    sentinel:
      transport:
        # 添加sentinel的控制台地址
        dashboard: 127.0.0.1:8080
      datasource:
        # 名称自定义,可以随便定义字符串
        flow-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            # dataId取了微服务名字,后面再拼接字符串
            dataId: ${spring.application.name}-flow-rules
            # 我这里在Nacos配置中心,单独使用了一个组
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: flow
        degrade-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-degrade-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: degrade
        param-flow-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-param-flow-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: param-flow
        authority-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-authority-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: authority
        system-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-system-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: system



在Nacos配置中心中创建对应的配置文件

在这里插入图片描述



编写java类,定义写数据源

import com.alibaba.cloud.sentinel.SentinelProperties;
import com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 添加往Nacos的写数据源,只不过未使用InitFunc
 * 如果要使用就需要放开注解
 */
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(SentinelAutoConfiguration.class)
public class SentinelNacosDataSourceConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public SentinelNacosDataSourceHandler sentinelNacosDataSourceHandler(SentinelProperties sentinelProperties) {
        return new SentinelNacosDataSourceHandler(sentinelProperties);
    }
}
import com.alibaba.cloud.sentinel.SentinelProperties;
import com.alibaba.cloud.sentinel.datasource.RuleType;
import com.alibaba.cloud.sentinel.datasource.config.DataSourcePropertiesConfiguration;
import com.alibaba.cloud.sentinel.datasource.config.NacosDataSourceProperties;
import com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.SmartInitializingSingleton;

import java.util.List;

/**
 * sentinel 规则持久化到 nacos配置中心
 */
public class SentinelNacosDataSourceHandler implements SmartInitializingSingleton {

    private final SentinelProperties sentinelProperties;

    public SentinelNacosDataSourceHandler(SentinelProperties sentinelProperties) {
        this.sentinelProperties = sentinelProperties;
    }

    @Override
    public void afterSingletonsInstantiated() {
        // 遍历我们配置文件中指定的多个spring.cloud.sentinel.datasource的多个配置
        sentinelProperties.getDatasource().values().forEach(this::registryWriter);
    }

    private void registryWriter(DataSourcePropertiesConfiguration dataSourceProperties) {
        // 只获取application.yml文件中 nacos配置的数据源
        final NacosDataSourceProperties nacosDataSourceProperties = dataSourceProperties.getNacos();
        if (nacosDataSourceProperties == null) {
            return;
        }
        // 获取规则类型,然后根据各个类型创建相应的写数据源
        final RuleType ruleType = nacosDataSourceProperties.getRuleType();
        switch (ruleType) {
            case FLOW:
                WritableDataSource<List<FlowRule>> flowRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString);
                WritableDataSourceRegistry.registerFlowDataSource(flowRuleWriter);
                break;
            case DEGRADE:
                WritableDataSource<List<DegradeRule>> degradeRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString);
                WritableDataSourceRegistry.registerDegradeDataSource(degradeRuleWriter);
                break;
            case PARAM_FLOW:
                WritableDataSource<List<ParamFlowRule>> paramFlowRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString);
                ModifyParamFlowRulesCommandHandler.setWritableDataSource(paramFlowRuleWriter);
                break;
            case SYSTEM:
                WritableDataSource<List<SystemRule>> systemRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString);
                WritableDataSourceRegistry.registerSystemDataSource(systemRuleWriter);
                break;
            case AUTHORITY:
                WritableDataSource<List<AuthorityRule>> authRuleWriter = new NacosWritableDataSource<>(nacosDataSourceProperties, JSON::toJSONString);
                WritableDataSourceRegistry.registerAuthorityDataSource(authRuleWriter);
                break;
            default:
                break;
        }
    }
}
import com.alibaba.cloud.sentinel.datasource.config.NacosDataSourceProperties;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;

import java.util.Properties;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 将sentinel规则写入到nacos配置中心
 * @param <T>
 */
@Slf4j
public class NacosWritableDataSource<T> implements WritableDataSource<T> {

    private final Converter<T, String> configEncoder;
    private final NacosDataSourceProperties nacosDataSourceProperties;

    private final Lock lock = new ReentrantLock(true);
    private ConfigService configService = null;

    public NacosWritableDataSource(NacosDataSourceProperties nacosDataSourceProperties, Converter<T, String> configEncoder) {
        if (configEncoder == null) {
            throw new IllegalArgumentException("Config encoder cannot be null");
        }
        if (nacosDataSourceProperties == null) {
            throw new IllegalArgumentException("Config nacosDataSourceProperties cannot be null");
        }
        this.configEncoder = configEncoder;
        this.nacosDataSourceProperties = nacosDataSourceProperties;
        final Properties properties = buildProperties(nacosDataSourceProperties);
        try {
            // 也可以直接注入NacosDataSource,然后反射获取其configService属性
            this.configService = NacosFactory.createConfigService(properties);
        } catch (NacosException e) {
            log.error("create configService failed.", e);
        }
    }

    private Properties buildProperties(NacosDataSourceProperties nacosDataSourceProperties) {
        Properties properties = new Properties();
        if (!StringUtils.isEmpty(nacosDataSourceProperties.getServerAddr())) {
            properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDataSourceProperties.getServerAddr());
        } else {
            properties.setProperty(PropertyKeyConst.ACCESS_KEY, nacosDataSourceProperties.getAccessKey());
            properties.setProperty(PropertyKeyConst.SECRET_KEY, nacosDataSourceProperties.getSecretKey());
            properties.setProperty(PropertyKeyConst.ENDPOINT, nacosDataSourceProperties.getEndpoint());
        }
        if (!StringUtils.isEmpty(nacosDataSourceProperties.getNamespace())) {
            properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDataSourceProperties.getNamespace());
        }
        if (!StringUtils.isEmpty(nacosDataSourceProperties.getUsername())) {
            properties.setProperty(PropertyKeyConst.USERNAME, nacosDataSourceProperties.getUsername());
        }
        if (!StringUtils.isEmpty(nacosDataSourceProperties.getPassword())) {
            properties.setProperty(PropertyKeyConst.PASSWORD, nacosDataSourceProperties.getPassword());
        }
        return properties;
    }

    @Override
    public void write(T value) throws Exception {
        lock.lock();
        // todo handle cluster concurrent problem
        try {
            String convertResult = configEncoder.convert(value);
            if (configService == null) {
                log.error("configServer is null, can not continue.");
                return;
            }
            // 规则配置数据推送到nacos配置中心
            final boolean published = configService.publishConfig(nacosDataSourceProperties.getDataId(), nacosDataSourceProperties.getGroupId(), convertResult);
            if (!published) {
                log.error("sentinel {} publish to nacos failed.", nacosDataSourceProperties.getRuleType());
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void close() throws Exception {

    }
}



启动微服务进行测试。

dashboard中为某个接口定义一个流控规则

在这里插入图片描述


调用接口测试,发送三次请求

在这里插入图片描述



查看Nacos中的配置文件,就会发现也成功写入了

在这里插入图片描述



源码分析

读数据源

引入读数据源的依赖,我们来看看具体是怎么实现的

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>



实现思路:

  • 和文件的读数据源一样,继承了AbstractDataSource类,这样就不需要我们再去写一遍加载配置、更新内存中的配置

在源码中的这个扩展包下面,就有nacos读数据源的实现

在这里插入图片描述



我们先看看NacosDataSource类的父类的代码

  • 创建一个DynamicSentinelProperty对象,主要作用是更新内存中的规则配置
  • 加载配置、解析配置
public abstract class AbstractDataSource<S, T> implements ReadableDataSource<S, T> {

    protected final Converter<S, T> parser;
    protected final SentinelProperty<T> property;

    public AbstractDataSource(Converter<S, T> parser) {
        if (parser == null) {
            throw new IllegalArgumentException("parser can't be null");
        }
        // 子类传过来的解析器
        this.parser = parser;
        // 更新内存中的配置
        // 我们会经常看见 getProperty().updateValue(newValue); 这样的代码
        this.property = new DynamicSentinelProperty<T>();
    }

    @Override
    public T loadConfig() throws Exception {
        // 调用子类的readSource()方法,一般会得到一个String,
        // 在通过解析器Converter 并解析配置转换成对应的对象
        return loadConfig(readSource());
    }

    public T loadConfig(S conf) throws Exception {
        // 解析配置
        T value = parser.convert(conf);
        return value;
    }

    @Override
    public SentinelProperty<T> getProperty() {
        return property;
    }
}



读配置源的具体实现:

  • 通过Nacos的serverAddr构建一个Properties对象,该对象会用于初始化ConfigService接口的对象
  • 利用线程池中唯一一个线程,创建一个监听器,监听dataId,当配置中心的配置更改后就会调用微服务客户端,微服务客户端这边有一个while+阻塞队列实现的轮询机制,它调用监听器的方法,监听器里面会更新内存中的规则配置
  • 初始化configService对象,并通过configService.addListener(…)为指定的dataId添加监听器
  • 微服务刚启动会调用父类的loadConfig()方法,父类最终又会调用本类中的readSource()方法得到配置中心中的数据,并进行解析;再更新内存中的规则配置
public class NacosDataSource<T> extends AbstractDataSource<String, T> {

    private static final int DEFAULT_TIMEOUT = 3000;
	// 创建一个只有一个线程的线程池,用来执行dataId的监听器
    private final ExecutorService pool = new ThreadPoolExecutor(...);

    private final Listener configListener;
    private final String groupId;
    private final String dataId;
    private final Properties properties;

    private ConfigService configService = null;


    public NacosDataSource(final String serverAddr, final String groupId, final String dataId,Converter<String, T> parser) {
        this(NacosDataSource.buildProperties(serverAddr), groupId, dataId, parser);
    }


    public NacosDataSource(final Properties properties, final String groupId, final String dataId,Converter<String, T> parser) {
        super(parser);
        if (StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) {
            throw new IllegalArgumentException(...);
        }
        AssertUtil.notNull(properties, "Nacos properties must not be null, you could put some keys from PropertyKeyConst");
        this.groupId = groupId;
        this.dataId = dataId;
        this.properties = properties;
        // 创建一个监听器
        this.configListener = new Listener() {
            @Override
            public Executor getExecutor() {
                return pool;
            }

            @Override
            public void receiveConfigInfo(final String configInfo) {
                RecordLog.info(...);
                // 通过转换器进行转换
                T newValue = NacosDataSource.this.parser.convert(configInfo);
                // 调用父类的SentinelProperty对象,更新内存中的规则配置
                getProperty().updateValue(newValue);
            }
        };
        // 初始化configService对象,并通过configService.addListener(..)为指定的dataId添加监听器
        initNacosListener();
        // 微服务刚启动,会从Nacos配置中心加载一次配置
        loadInitialConfig();
    }

    private void loadInitialConfig() {
        try {
            // 调用父类的loadConfig()  父类最终又会调用本类中的readSource()方法得到配置中心中的数据,并进行解析
            T newValue = loadConfig();
            if (newValue == null) {
                RecordLog.warn("[NacosDataSource] WARN: initial config is null, you may have to check your data source");
            }
            // 调用父类的SentinelProperty对象,更新内存中的规则配置
            getProperty().updateValue(newValue);
        } catch (Exception ex) {
            RecordLog.warn("[NacosDataSource] Error when loading initial config", ex);
        }
    }

    private void initNacosListener() {
        try {
            // 初始化configService对象
            this.configService = NacosFactory.createConfigService(this.properties);
            // Add config listener.
            // 通过configService.addListener(..)为指定的dataId添加监听器
            configService.addListener(dataId, groupId, configListener);
        } catch (Exception e) {
            RecordLog.warn("[NacosDataSource] Error occurred when initializing Nacos data source", e);
            e.printStackTrace();
        }
    }

    @Override
    public String readSource() throws Exception {
        if (configService == null) {
            throw new IllegalStateException("Nacos config service has not been initialized or error occurred");
        }
        // 通过ConfigService接口中的getConfig()方法,从Nacos配置中心获取配置
        return configService.getConfig(dataId, groupId, DEFAULT_TIMEOUT);
    }

    @Override
    public void close() {
        if (configService != null) {
            configService.removeListener(dataId, groupId, configListener);
        }
        pool.shutdownNow();
    }

    private static Properties buildProperties(String serverAddr) {
        // 构建一个Properties对象,该对象会在初始化ConfigService时会用上
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverAddr);
        return properties;
    }
}



写数据源的实现

写数据源源码实现流程相对简单。我们知道dashboard更新配置后调用微服务端,微服务这边的ModifyRulesCommandHandler类会处理规则更改的请求。这里会有一个写数据源相关的操作

// 注意name = "setRules",这就是控制台请求服务端的url路径
@CommandMapping(name = "setRules", desc = "modify the rules, accept param: type={ruleType}&data={ruleJson}")
public class ModifyRulesCommandHandler implements CommandHandler<String> {

    public CommandResponse<String> handle(CommandRequest request) {

        //......

        // 处理流控规则
        if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
            List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
            FlowRuleManager.loadRules(flowRules);
            // 关键一步,这里会有一个写数据源的操作。默认情况下是没有WritableDataSource,我们可以在这里进行扩展
            if (!writeToDataSource(getFlowDataSource(), flowRules)) {
                result = WRITE_DS_FAILURE_MSG;
            }
            return CommandResponse.ofSuccess(result);

            // 处理权限规则
        } else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {
            ...

            // 处理熔断规则
        } else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {
            ...

            // 处理系统规则
        } else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {
            ...
        }
        return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
    }
}



所以我们要做的事情就是创建一个写数据源,并进行注册写数据源WritableDataSourceRegistry。我们先来看看源码中的Demo,通过读写文件的方式实现的读写数据源。

public void init() throws Exception {
    // 文件保存路径
    String flowRuleDir = System.getProperty("user.home") + File.separator + "sentinel" + File.separator + "rules";
    String flowRuleFile = "flowRule.json";
    String flowRulePath = flowRuleDir + File.separator + flowRuleFile;

    // 添加读数据源
    ReadableDataSource<String, List<FlowRule>> ds = new FileRefreshableDataSource<>(
        flowRulePath, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {})
    );
    FlowRuleManager.register2Property(ds.getProperty());

    // 添加写数据源
    WritableDataSource<List<FlowRule>> wds = new FileWritableDataSource<>(flowRulePath, this::encodeJson);
    WritableDataSourceRegistry.registerFlowDataSource(wds);
}



我在定义一个往Nacos的写数据源,一个简单的实现,具体项目中能用的请参考上面 <具体实现>一章 。这里只是用更少的代码来理解nacos的写数据源

import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.csp.sentinel.init.InitFunc;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry;
import com.alibaba.fastjson.JSON;

import java.util.List;


public class NacosDataSourceInitFunc implements InitFunc {

    @Override
    public void init() throws Exception {
        //流控规则
        WritableDataSource<List<FlowRule>> writableDataSource = new NacosWritableDataSource<>(
                "127.0.0.1:8848", "DEFAULT_GROUP", "mall-user-sentinel-rule-push-demo-flow", JSON::toJSONString);
        WritableDataSourceRegistry.registerFlowDataSource(writableDataSource);
    }
}
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.ConfigType;
import com.alibaba.nacos.api.exception.NacosException;

import java.util.Properties;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class NacosWritableDataSource<T> implements WritableDataSource<T> {

    private final String serverAddr;
    private final String groupId;
    private final String dataId;
    private final Properties properties;
    private ConfigService configService;
    private final Converter<T, String> configEncoder;
    private final Lock lock = new ReentrantLock(true);

    public NacosWritableDataSource(String serverAddr, String groupId, String dataId, Converter<T, String> configEncoder) {
        this.serverAddr = serverAddr;
        this.groupId = groupId;
        this.dataId = dataId;
        // 通过serverAddr构建一个properties对象
        this.properties = NacosWritableDataSource.buildProperties(serverAddr);
        this.configEncoder = configEncoder;
        initConfigService();
    }

    private void initConfigService() {
        try {
            // 通过properties对象初始化ConfigService
            this.configService = NacosFactory.createConfigService(properties);
        } catch (NacosException e) {
            e.printStackTrace();
        }
    }

    private static Properties buildProperties(String serverAddr) {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverAddr);
        return properties;
    }

    @Override
    public void write(T t) throws Exception {
        lock.lock();
        try {
            // 通过ConfigService往Nacos配置中心写入数据
            configService.publishConfig(dataId, groupId, this.configEncoder.convert(t), ConfigType.JSON.getType());
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void close() throws Exception {

    }

}



微服务端解析读数据源流程

我们引入了下面的依赖

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>

并在配置文件中指定了多个读数据源。这些数据源是如何创建的嘞?

server:
  port: 8806

spring:
  application:
    name: mall-user-sentinel-rule-push  #微服务名称
    
  #配置nacos注册中心地址
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848

    sentinel:
      transport:
        # 添加sentinel的控制台地址
        dashboard: 127.0.0.1:8080
      datasource:
        # 名称自定义,可以随便定义字符串
        # 每一个都是一个读数据源
        flow-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            # dataId取了微服务名字,后面再拼接字符串
            dataId: ${spring.application.name}-flow-rules
            # 我这里在Nacos配置中心,单独使用了一个组
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: flow
        # 读数据源
        degrade-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-degrade-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: degrade
        param-flow-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-param-flow-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: param-flow
        authority-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-authority-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: authority
        system-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-system-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: system



源码的入口是SentinelDataSourceHandler类,它实现了SmartInitializingSingleton接口,这是Spring中的接口,所有非懒加载单例bean创建完成之后会调用这个接口的实现类:

  • 在构造函数中依赖注入SentinelProperties对象,该对象中保存了我们配置文件中所有读数据源的配置
  • 遍历SentinelProperties对象中的读数据源,并为每一个读数据源生成一个beanName
  • 为每一个读数据源对象 + beanName 创建一个BeanDefinition
  • 将BeanDefinition添加进BeanFactory中
  • BeanFactory.getBean(beanName) 创建读数据源对象。该对象其实是FactoryBean类型的
  • 上方的getBean()方法最终会调用至NacosDataSourceFactoryBean.getObject()方法,在这里创建NacosDataSource对象。该对象就是上方引入maven依赖中的读数据源对象。
public class SentinelDataSourceHandler implements SmartInitializingSingleton {

    //......
    // SentinelProperties中保存着Map<String, DataSourcePropertiesConfiguration> datasource
    // 也就是我们上方yml文件中定义的多个数据源,我们自定义的名字就是String
    private final SentinelProperties sentinelProperties;

    // 构造方法中进行依赖注入 sentinelProperties对象
    public SentinelDataSourceHandler(DefaultListableBeanFactory beanFactory,SentinelProperties sentinelProperties,...) {
        //...
        this.sentinelProperties = sentinelProperties;
    }

    // 遍历Map<String, DataSourcePropertiesConfiguration>集合,最终取出我们的每一个配置的数据源
    @Override
    public void afterSingletonsInstantiated() {
        sentinelProperties.getDatasource().forEach((dataSourceName, dataSourceProperties) -> {
            try {
                List<String> validFields = dataSourceProperties.getValidField();
                // ...

                // AbstractDataSourceProperties就是我们在配置文件中具体的每一个配置对象的公共父类
                AbstractDataSourceProperties abstractDataSourceProperties = dataSourceProperties
                    .getValidDataSourceProperties();
                abstractDataSourceProperties.setEnv(env);
                abstractDataSourceProperties.preCheck(dataSourceName);
                // 把我们配置的每一个数据源,还有这里字符串凭借的一个beanName。调用下面的registerBean()方法
                // beanName为   flow-rules + "-sentinel-" + nacos + "-datasource"
                // flow-rules是我们在yml文件中自定义的名字,nacos就是下面的validFields.get(0)值
                registerBean(abstractDataSourceProperties, dataSourceName+ "-sentinel-" + validFields.get(0) + "-datasource");
            }
            catch (Exception e) {
                log.error(...);
            }
        });
    }


    private void registerBean(final AbstractDataSourceProperties dataSourceProperties,String dataSourceName) {
        // 对我们的数据源生成一个BeanDefinition
        BeanDefinitionBuilder builder = parseBeanDefinition(dataSourceProperties,dataSourceName);
        // 将BeanDefinition添加进BeanFactory中
        this.beanFactory.registerBeanDefinition(dataSourceName,builder.getBeanDefinition());
        // 通过beanFactory.getBean(dataSourceName)方法,创建bean对象
        // 我们配置文件中定义的每一个读数据源就变为了一个一个的bean
        // 注意,我们的读数据源它是一个FactoryBean,这里的getBean()方法最终会去到NacosDataSourceFactoryBean.getObject()
        AbstractDataSource newDataSource = (AbstractDataSource) this.beanFactory.getBean(dataSourceName);

        // 将读数据源添加进对应的规则管理器中
        dataSourceProperties.postRegister(newDataSource);
    }

public class NacosDataSourceFactoryBean implements FactoryBean<NacosDataSource> {

  //......

   @Override
   public NacosDataSource getObject() throws Exception {
       // 为properties对象赋值
      Properties properties = new Properties();
      if (!StringUtils.isEmpty(this.serverAddr)) {
         properties.setProperty(PropertyKeyConst.SERVER_ADDR, this.serverAddr);
      }
      else {
         properties.setProperty(PropertyKeyConst.ENDPOINT, this.endpoint);
      }

      if (!StringUtils.isEmpty(this.contextPath)) {
         properties.setProperty(PropertyKeyConst.CONTEXT_PATH, this.contextPath);
      }
      if (!StringUtils.isEmpty(this.accessKey)) {
         properties.setProperty(PropertyKeyConst.ACCESS_KEY, this.accessKey);
      }
      if (!StringUtils.isEmpty(this.secretKey)) {
         properties.setProperty(PropertyKeyConst.SECRET_KEY, this.secretKey);
      }
      if (!StringUtils.isEmpty(this.namespace)) {
         properties.setProperty(PropertyKeyConst.NAMESPACE, this.namespace);
      }
      if (!StringUtils.isEmpty(this.username)) {
         properties.setProperty(PropertyKeyConst.USERNAME, this.username);
      }
      if (!StringUtils.isEmpty(this.password)) {
         properties.setProperty(PropertyKeyConst.PASSWORD, this.password);
      }
       // 创建一个Nacos读数据源对象,这里也就是上方:<源码分析> —— <读数据源> 的那一个对象
      return new NacosDataSource(properties, groupId, dataId, converter);
   }
 
    // ......
}



修改源码的实现

我们需要在Sentinel源码中进行修改,将dashboard和微服务之间的通信,改为dashboard和nacos的通信。在通过Nacos配置中心的推送机制去更新微服务内存中的规则配置。

从 Sentinel 1.4.0 开始,Sentinel 控制台提供 DynamicRulePublisher DynamicRuleProvider 接口用于实现应用维度的规则推送和拉取:

  • DynamicRuleProvider: 拉取规则

  • DynamicRulePublisher: 推送规则

在dashboard工程下的com.alibaba.csp.sentinel.dashboard.rule包下创建nacos包,然后把各种场景的配置规则拉取和推送的实现类写到此包下

可以参考Sentinel Dashboard test包下的流控规则拉取和推送的实现逻辑

在这里插入图片描述



官方demo

我们看看官方的demo是如何实现的

首先创建一个NacosConfigUtil类,用来定义常量

public final class NacosConfigUtil {

    // 其实demo中也就用到了上面两个常量
    // 定义配置中心的分组名,这里需要和微服务端进行配对,不然dashboard推送一个分组,微服务结果从另一个分组去读取配置
    public static final String GROUP_ID = "SENTINEL_GROUP";
    
    // 定义配置文件dataId的一个后缀,一般命名就是 serviceName + 后缀。当然dataId也要和微服务那边读取配置保存一样
    // 避免你写一个dataId,微服务从另一个dataId去读
    public static final String FLOW_DATA_ID_POSTFIX = "-flow-rules";
    public static final String PARAM_FLOW_DATA_ID_POSTFIX = "-param-rules";
    public static final String CLUSTER_MAP_DATA_ID_POSTFIX = "-cluster-map";
    public static final String CLIENT_CONFIG_DATA_ID_POSTFIX = "-cc-config";
    public static final String SERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX = "-cs-transport-config";
    public static final String SERVER_FLOW_CONFIG_DATA_ID_POSTFIX = "-cs-flow-config";
    public static final String SERVER_NAMESPACE_SET_DATA_ID_POSTFIX = "-cs-namespace-set";

    private NacosConfigUtil() {}
}



创建一个NacosConfig配置类,这里就定义了流控规则相关的转换器

@Configuration
public class NacosConfig {

    // 流控规则相关 定义 List<FlowRuleEntity> 到 String的转换器
    @Bean
    public Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder() {
        return JSON::toJSONString;
    }

    // 流控规则相关  定义 String 到 List<FlowRuleEntity>的转换器
    @Bean
    public Converter<String, List<FlowRuleEntity>> flowRuleEntityDecoder() {
        return s -> JSON.parseArray(s, FlowRuleEntity.class);
    }

    // 根据一个Nacos的serverAddr,创建ConfigService对象。推送配置/拉取配置都是通过该对象来完成的
    @Bean
    public ConfigService nacosConfigService() throws Exception {
        return ConfigFactory.createConfigService("localhost");
    }
}



接下来我们来看看dashboard推送规则配置的实现代码,它实现了DynamicRulePublisher接口

@Component("flowRuleNacosPublisher")
public class FlowRuleNacosPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> {

    // 注入上面配置类的中定义的ConfigService和Converter转换器
    @Autowired
    private ConfigService configService;
    @Autowired
    private Converter<List<FlowRuleEntity>, String> converter;

    @Override
    public void publish(String app, List<FlowRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        // 调用Nacos的configService.publishConfig(..)方法 推送配置
        // dataId为 appName + 最上方的常量文件后缀-flow-rules  , 分组为最上方定义的常量SENTINEL_GROUP , 并对规则配置集合转换为json字符串
        configService.publishConfig(app + NacosConfigUtil.FLOW_DATA_ID_POSTFIX,
                                    NacosConfigUtil.GROUP_ID, converter.convert(rules));
    }
}



接下来我们来看看dashboard拉取规则配置的实现代码,它实现了DynamicRuleProvider接口

@Component("flowRuleNacosProvider")
public class FlowRuleNacosProvider implements DynamicRuleProvider<List<FlowRuleEntity>> {

    // 注入上面配置类的中定义的ConfigService和Converter转换器
    @Autowired
    private ConfigService configService;
    @Autowired
    private Converter<String, List<FlowRuleEntity>> converter;

    @Override
    public List<FlowRuleEntity> getRules(String appName) throws Exception {
        // 调用Nacos的configService.getConfig(dataId, group, timeoutMs)方法 拉取配置
        String rules = configService.getConfig(appName + NacosConfigUtil.FLOW_DATA_ID_POSTFIX,
            NacosConfigUtil.GROUP_ID, 3000);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        // 将json字符串转换为 List<FlowRuleEntity> 规则实体对象集合
        return converter.convert(rules);
    }
}



官方Demo这种方式功能上的确是实现了与Nacos通信,对Nacos配置中心进行读写。但存在一个小问题。那就是dashboard这边规则实体对象是FlowRuleEntity,但是微服务端规则实体对象是FlowRule。Nacos把配置推送给微服务端时,微服务端把json字符串转换为实体对象时可能就会出现不匹配的情况 —> 微服务规则实体对象没有相应的值 ----> 内存中的规则也就不完善 ----> 出现了dashboard端更新的规则微服务端未生效情况。

当然,流控规则都还好,如下图所示,这两个之间的实体对象成员属性基本上都能对应上

在这里插入图片描述



但热点规则这边的实体就不行了,他们之间的层级关系就不同了

public class ParamFlowRuleEntity extends AbstractRuleEntity<ParamFlowRule> {

    public ParamFlowRuleEntity() {
    }

    // ParamFlowRule为客户端的规则实体,但是这里将一整个实体对象变为了ParamFlowRuleEntity的其中一个属性
    // 所以这里转json之后的层级关系就发生了改变
    public ParamFlowRuleEntity(ParamFlowRule rule) {
        AssertUtil.notNull(rule, "Authority rule should not be null");
        // 父类中的属性
        this.rule = rule;
    }
    
    ...
}

// 父类
public abstract class AbstractRuleEntity<T extends AbstractRule> implements RuleEntity {

    protected Long id;

    protected String app;
    protected String ip;
    protected Integer port;

    // ParamFlowRule为客户端的规则实体,成为了ParamFlowRuleEntity实体的一个成员属性
    protected T rule;

    private Date gmtCreate;
    private Date gmtModified;
    ...
}



为了解决这种情况,那么就需要定义一个规范,存入Nacos配置中心的数据只能是微服务那边的规则实体对象,不能是dashboard这边的规则实体对象



修改源码实现

naocs配置中心保存的是微服务端的规则实体对象

各个规则都先在dashboard端将规则实体转换为微服务能用的规则实体在推送至Nacos配置中心

从Nacos配置中心获取配置后,都先将json字符串转换为dashboard端的规则实体对象



项目结构如下

在这里插入图片描述



配置类
import com.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.ApiDefinitionEntity;
import com.alibaba.csp.sentinel.dashboard.datasource.entity.gateway.GatewayFlowRuleEntity;
import com.alibaba.csp.sentinel.dashboard.datasource.entity.rule.RuleEntity;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;


public final class NacosConfigUtil {

    // 定义配置中心的分组名,这里需要和微服务端进行配对,不然dashboard推送一个分组,微服务结果从另一个分组去读取配置
    public static final String GROUP_ID = "SENTINEL_GROUP";
    
    // 定义配置文件dataId的一个后缀,一般命名就是 serviceName + 后缀。当然dataId也要和微服务那边读取配置保存一样
    // 避免你写一个dataId,微服务从另一个dataId去读
    public static final String FLOW_DATA_ID_POSTFIX = "-flow-rules";
    public static final String PARAM_FLOW_DATA_ID_POSTFIX = "-param-flow-rules";
    public static final String DEGRADE_DATA_ID_POSTFIX = "-degrade-rules";
    public static final String SYSTEM_DATA_ID_POSTFIX = "-system-rules";
    public static final String AUTHORITY_DATA_ID_POSTFIX = "-authority-rules";
    public static final String GATEWAY_FLOW_DATA_ID_POSTFIX = "-gateway-flow-rules";
    public static final String GATEWAY_API_DATA_ID_POSTFIX = "-gateway-api-rules";

    public static final String CLUSTER_MAP_DATA_ID_POSTFIX = "-cluster-map";

    /**
     * cc for `cluster-client`
     */
    public static final String CLIENT_CONFIG_DATA_ID_POSTFIX = "-cc-config";
    /**
     * cs for `cluster-server`
     */
    public static final String SERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX = "-cs-transport-config";
    public static final String SERVER_FLOW_CONFIG_DATA_ID_POSTFIX = "-cs-flow-config";
    public static final String SERVER_NAMESPACE_SET_DATA_ID_POSTFIX = "-cs-namespace-set";
    
    //超时时间
    public static final int READ_TIMEOUT = 3000;

    private NacosConfigUtil() {}
    
    
    /**
     * RuleEntity----->Rule
     * 控制台这边的规则实体都是RuleEntity类型的,这里就调用各个规则实体对象中的toRule()方法,转换为微服务端的规则实体对象
     * 例如 FlowRuleEntity#toRule ----> FlowRule          ParamFlowRuleEntity#toRule ----> ParamFlowRule
     * @param entities
     * @return
     */
    public static String convertToRule(List<? extends RuleEntity> entities){
        return JSON.toJSONString(
                entities.stream().map(r -> r.toRule())
                        .collect(Collectors.toList()));
    }
    
    /**
     * ApiDefinitionEntity----->ApiDefinition
     * @param entities
     * @return
     */
    public static String convertToApiDefinition(List<? extends ApiDefinitionEntity> entities){
        return JSON.toJSONString(
                entities.stream().map(r -> r.toApiDefinition())
                        .collect(Collectors.toList()));
    }
    
    /**
     * GatewayFlowRuleEntity----->GatewayFlowRule
     * @param entities
     * @return
     */
    public static String convertToGatewayFlowRule(List<? extends GatewayFlowRuleEntity> entities){
        return JSON.toJSONString(
                entities.stream().map(r -> r.toGatewayFlowRule())
                        .collect(Collectors.toList()));
    }



}



通过Nacos配置中心的地址,创建对应的ConfigService对象,并存入Spring容器中

@Configuration
public class NacosConfig {

    @Value("${sentinel.nacos.config.serverAddr}")
    private String serverAddr="localhost:8848";
    
    @Bean
    public ConfigService nacosConfigService() throws Exception {
        return ConfigFactory.createConfigService(serverAddr);
    } 
    
    
    /*
    
    对于Nacos开启了认证,那么就需要添加Naocs的用户名和密码了
    
    @Bean
    public ConfigService nacosConfigService() throws Exception {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        properties.put(PropertyKeyConst.USERNAME, "nacos");
        properties.put(PropertyKeyConst.PASSWORD, "nacos");

        return ConfigFactory.createConfigService(properties);
    }*/
}



flow

拉取配置,实现DynamicRuleProvider接口

@Component("flowRuleNacosProvider")
public class FlowRuleNacosProvider implements DynamicRuleProvider<List<FlowRuleEntity>> {

    // 注入我们上面创建的ConfigService对象
    @Autowired
    private ConfigService configService;

    @Override
    public List<FlowRuleEntity> getRules(String appName,String ip,Integer port) throws NacosException {
        // 从Nacos配置中心拉取配置
        String rules = configService.getConfig(appName + NacosConfigUtil.FLOW_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        
        // 解析json获取到 List<FlowRule>
        List<FlowRule> list = JSON.parseArray(rules, FlowRule.class);
        
        // 通过FlowRuleEntity.fromFlowRule(..) 方法实现 FlowRule------->FlowRuleEntity
        return list.stream().map(rule ->
                FlowRuleEntity.fromFlowRule(appName,ip,port,rule))
                .collect(Collectors.toList());
    }
}

/*

FlowRuleEntity.fromFlowRule(..) 方法如下所示,Sentinel的dashboard端的规则实体对象内其实都自己写了对应的fromFlowRule()方法

public static FlowRuleEntity fromFlowRule(String app, String ip, Integer port, FlowRule rule) {
    FlowRuleEntity entity = new FlowRuleEntity();
    entity.setApp(app);
    entity.setIp(ip);
    entity.setPort(port);
    entity.setLimitApp(rule.getLimitApp());
    entity.setResource(rule.getResource());
    entity.setGrade(rule.getGrade());
    entity.setCount(rule.getCount());
    entity.setStrategy(rule.getStrategy());
    entity.setRefResource(rule.getRefResource());
    entity.setControlBehavior(rule.getControlBehavior());
    entity.setWarmUpPeriodSec(rule.getWarmUpPeriodSec());
    entity.setMaxQueueingTimeMs(rule.getMaxQueueingTimeMs());
    entity.setClusterMode(rule.isClusterMode());
    entity.setClusterConfig(rule.getClusterConfig());
    return entity;
}


*/



推送配置,实现DynamicRulePublisher接口

@Component("flowRuleNacosPublisher")
public class FlowRuleNacosPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> {

    // 注入我们上面创建的ConfigService对象
    @Autowired
    private ConfigService configService;
    
    @Override
    public void publish(String app, List<FlowRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        //发布配置到Nacos配置中心,这里会调用我们工具类中编写的方法NacosConfigUtil.convertToRule(rules)
        configService.publishConfig(app + NacosConfigUtil.FLOW_DATA_ID_POSTFIX,
            NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules));
    }
}



authority
@Component("authorityRuleNacosProvider")
public class AuthorityRuleNacosProvider implements DynamicRuleProvider<List<AuthorityRuleEntity>> {
    @Autowired
    private ConfigService configService;

    @Override
    public List<AuthorityRuleEntity> getRules(String appName,String ip,Integer port) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.AUTHORITY_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        List<AuthorityRule> list = JSON.parseArray(rules, AuthorityRule.class);
        return list.stream().map(rule ->
                AuthorityRuleEntity.fromAuthorityRule(appName, ip, port, rule))
                .collect(Collectors.toList());
    }


}
@Component("authorityRuleNacosPublisher")
public class AuthorityRuleNacosPublisher implements DynamicRulePublisher<List<AuthorityRuleEntity>> {
    @Autowired
    private ConfigService configService;
    

    @Override
    public void publish(String app, List<AuthorityRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.AUTHORITY_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules));
    }
}



degread
@Component("degradeRuleNacosProvider")
public class DegradeRuleNacosProvider implements DynamicRuleProvider<List<DegradeRuleEntity>> {
    @Autowired
    private ConfigService configService;

    
    @Override
    public List<DegradeRuleEntity> getRules(String appName,String ip,Integer port) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.DEGRADE_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        List<DegradeRule> list = JSON.parseArray(rules, DegradeRule.class);
        return list.stream().map(rule ->
                DegradeRuleEntity.fromDegradeRule(appName, ip, port, rule))
                .collect(Collectors.toList());
    }
}
@Component("degradeRuleNacosPublisher")
public class DegradeRuleNacosPublisher implements DynamicRulePublisher<List<DegradeRuleEntity>> {
    @Autowired
    private ConfigService configService;
    

    @Override
    public void publish(String app, List<DegradeRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.DEGRADE_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules));
    }
}



param
@Component("paramFlowRuleNacosProvider")
public class ParamFlowRuleNacosProvider implements DynamicRuleProvider<List<ParamFlowRuleEntity>> {
    @Autowired
    private ConfigService configService;


    @Override
    public List<ParamFlowRuleEntity> getRules(String appName,String ip,Integer port) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.PARAM_FLOW_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        List<ParamFlowRule> list = JSON.parseArray(rules, ParamFlowRule.class);
        return list.stream().map(rule ->
                ParamFlowRuleEntity.fromParamFlowRule(appName, ip, port, rule))
                .collect(Collectors.toList());
    }
}
@Component("paramFlowRuleNacosPublisher")
public class ParamFlowRuleNacosPublisher implements DynamicRulePublisher<List<ParamFlowRuleEntity>> {
    @Autowired
    private ConfigService configService;
    
    @Override
    public void publish(String app, List<ParamFlowRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.PARAM_FLOW_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules));
    }
}



system
@Component("systemRuleNacosProvider")
public class SystemRuleNacosProvider implements DynamicRuleProvider<List<SystemRuleEntity>> {
    @Autowired
    private ConfigService configService;

    @Override
    public List<SystemRuleEntity> getRules(String appName,String ip,Integer port) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.SYSTEM_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        List<SystemRule> list = JSON.parseArray(rules, SystemRule.class);
        return list.stream().map(rule ->
                SystemRuleEntity.fromSystemRule(appName, ip, port, rule))
                .collect(Collectors.toList());
    }


}
@Component("systemRuleNacosPublisher")
public class SystemRuleNacosPublisher implements DynamicRulePublisher<List<SystemRuleEntity>> {
    @Autowired
    private ConfigService configService;


    @Override
    public void publish(String app, List<SystemRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.SYSTEM_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToRule(rules));
    }
}



gateway
public class ApiDefinition2 {
    private String apiName;
    private Set<ApiPathPredicateItem> predicateItems;
    
    public ApiDefinition2() {
    }
    
    public String getApiName() {
        return apiName;
    }
    
    public void setApiName(String apiName) {
        this.apiName = apiName;
    }
    
    public Set<ApiPathPredicateItem> getPredicateItems() {
        return predicateItems;
    }
    
    public void setPredicateItems(Set<ApiPathPredicateItem> predicateItems) {
        this.predicateItems = predicateItems;
    }
    
    @Override
    public String toString() {
        return "ApiDefinition2{" + "apiName='" + apiName + '\'' + ", predicateItems=" + predicateItems + '}';
    }
    
    
    public ApiDefinition toApiDefinition() {
        ApiDefinition apiDefinition = new ApiDefinition();
        apiDefinition.setApiName(apiName);
        
        Set<ApiPredicateItem> apiPredicateItems = new LinkedHashSet<>();
        apiDefinition.setPredicateItems(apiPredicateItems);
        
        if (predicateItems != null) {
            for (ApiPathPredicateItem predicateItem : predicateItems) {
                apiPredicateItems.add(predicateItem);
            }
        }
        
        return apiDefinition;
    }
    
}
@Component("gatewayApiRuleNacosProvider")
public class GatewayApiRuleNacosProvider implements DynamicRuleProvider<List<ApiDefinitionEntity>> {

    @Autowired
    private ConfigService configService;


    @Override
    public List<ApiDefinitionEntity> getRules(String appName,String ip,Integer port) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.GATEWAY_API_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        
        // 注意 ApiDefinition的属性Set<ApiPredicateItem> predicateItems中元素 是接口类型,JSON解析丢失数据
        // 重写实体类ApiDefinition2,再转换为ApiDefinition
        List<ApiDefinition2> list = JSON.parseArray(rules, ApiDefinition2.class);
        
        return list.stream().map(rule ->
                ApiDefinitionEntity.fromApiDefinition(appName, ip, port, rule.toApiDefinition()))
                .collect(Collectors.toList());
    }
    
    public static void main(String[] args) {
        String rules = "[{\"apiName\":\"/pms/productInfo/${id}\",\"predicateItems\":[{\"matchStrategy\":1,\"pattern\":\"/pms/productInfo/\"}]}]";
        
        
        List<ApiDefinition> list = JSON.parseArray(rules, ApiDefinition.class);
        System.out.println(list);
    
        List<ApiDefinition2> list2 = JSON.parseArray(rules, ApiDefinition2.class);
        System.out.println(list2);
    
        System.out.println(list2.get(0).toApiDefinition());
    
    }
    
}
@Component("gatewayApiRuleNacosPublisher")
public class GatewayApiRuleNacosPublisher implements DynamicRulePublisher<List<ApiDefinitionEntity>> {

    @Autowired
    private ConfigService configService;


    @Override
    public void publish(String app, List<ApiDefinitionEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.GATEWAY_API_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToApiDefinition(rules));
    }
}
@Component("gatewayFlowRuleNacosProvider")
public class GatewayFlowRuleNacosProvider implements DynamicRuleProvider<List<GatewayFlowRuleEntity>> {

    @Autowired
    private ConfigService configService;

    @Override
    public List<GatewayFlowRuleEntity> getRules(String appName,String ip,Integer port) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.GATEWAY_FLOW_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.READ_TIMEOUT);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        List<GatewayFlowRule> list = JSON.parseArray(rules, GatewayFlowRule.class);
        return list.stream().map(rule ->
                GatewayFlowRuleEntity.fromGatewayFlowRule(appName, ip, port, rule))
                .collect(Collectors.toList());
    }
}
@Component("gatewayFlowRuleNacosPublisher")
public class GatewayFlowRuleNacosPublisher implements DynamicRulePublisher<List<GatewayFlowRuleEntity>> {

    @Autowired
    private ConfigService configService;

    @Override
    public void publish(String app, List<GatewayFlowRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.GATEWAY_FLOW_DATA_ID_POSTFIX,
                NacosConfigUtil.GROUP_ID, NacosConfigUtil.convertToGatewayFlowRule(rules));
    }
}



修改源码

我们现在需要在controller层,将原本dashboard从微服务获取规则配置、dashboard更新规则后调用微服务,这一过程改为Nacos。

以流控规则举例,在FlowControllerV1层中注入我们写的类

/** 从远程配置中心拉取规则*/
@Autowired
@Qualifier("flowRuleNacosProvider")
private DynamicRuleProvider<List<FlowRuleEntity>> ruleProvider;

/** 推送规则到远程配置中心*/
@Autowired
@Qualifier("flowRuleNacosPublisher")
private DynamicRulePublisher<List<FlowRuleEntity>> rulePublisher;

原本dashboard从微服务获取规则配置改为通过flowRuleNacosProvider从Nacos拉取配置

@GetMapping("/rules")
@AuthAction(PrivilegeType.READ_RULE)
public Result<List<FlowRuleEntity>> apiQueryMachineRules(@RequestParam String app,
                                                         @RequestParam String ip,
                                                         @RequestParam Integer port) {

    if (StringUtil.isEmpty(app)) {
        return Result.ofFail(-1, "app can't be null or empty");
    }
    if (StringUtil.isEmpty(ip)) {
        return Result.ofFail(-1, "ip can't be null or empty");
    }
    if (port == null) {
        return Result.ofFail(-1, "port can't be null");
    }
    try {
        //从客户端内存获取规则配置
        //List<FlowRuleEntity> rules = sentinelApiClient.fetchFlowRuleOfMachine(app, ip, port);

        //从远程配置中心获取规则配置
        List<FlowRuleEntity> rules = ruleProvider.getRules(app,ip,port);
        if (rules != null && !rules.isEmpty()) {
            for (FlowRuleEntity entity : rules) {
                entity.setApp(app);
                if (entity.getClusterConfig() != null && entity.getClusterConfig().getFlowId() != null) {
                    entity.setId(entity.getClusterConfig().getFlowId());
                }
            }
        }

        rules = repository.saveAll(rules);
        return Result.ofSuccess(rules);
    } catch (Throwable throwable) {
        logger.error("Error when querying flow rules", throwable);
        return Result.ofThrowable(-1, throwable);
    }
}

规则更改后推送至Nacos

@PostMapping("/rule")
@AuthAction(PrivilegeType.WRITE_RULE)
public Result<FlowRuleEntity> apiAddFlowRule(@RequestBody FlowRuleEntity entity) {
    Result<FlowRuleEntity> checkResult = checkEntityInternal(entity);
    if (checkResult != null) {
        return checkResult;
    }
    entity.setId(null);
    Date date = new Date();
    entity.setGmtCreate(date);
    entity.setGmtModified(date);
    entity.setLimitApp(entity.getLimitApp().trim());
    entity.setResource(entity.getResource().trim());
    try {
        // 规则写入dashboard的内存中,会写入三个map中
        entity = repository.save(entity);
        //发布规则到客户端内存中
        //publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS);
        //发布规则到远程配置中心
        publishRules(entity.getApp());
        return Result.ofSuccess(entity);
    } catch (Throwable t) {
        Throwable e = t instanceof ExecutionException ? t.getCause() : t;
        logger.error("Failed to add new flow rule, app={}, ip={}", entity.getApp(), entity.getIp(), e);
        return Result.ofFail(-1, e.getMessage());
    }
}

/**
* 发布规则到远程配置中心
* @param app
* @throws Exception
*/
private void publishRules(/*@NonNull*/ String app) throws Exception {
    // 从三个Map中的其中一个获取规则实体集合
    List<FlowRuleEntity> rules = repository.findAllByApp(app);
    // 推送Nacos
    rulePublisher.publish(app, rules);
}



请添加图片描述



在配置文件中指定NacosConfig的地址,因为在最上方的配置类中使用到了该配置项

#接入nacos配置中心用于规则数据持久化
sentinel.nacos.config.serverAddr=localhost:8848



测试

微服务端引入Nacos的读数据源

还是需要它监听dataId的更改,并更新内存中的规则数据

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>

配置文件中添加相应的配置

server:
  port: 8806

spring:
  application:
    name: mall-user-sentinel-rule-push  #微服务名称
    
  #配置nacos注册中心地址
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848

    sentinel:
      transport:
        # 添加sentinel的控制台地址
        dashboard: 127.0.0.1:8080
      datasource:
        # 名称自定义,可以随便定义字符串
        flow-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            # dataId取了微服务名字,后面再拼接字符串
            # 注意需要和配置类中常量定义的一致
            dataId: ${spring.application.name}-flow-rules
            # 这里的组名需要和配置类中常量定义的一致
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: flow
        degrade-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-degrade-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: degrade
        param-flow-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-param-flow-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: param-flow
        authority-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-authority-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: authority
        system-rules:
          nacos:
            server-addr: 127.0.0.1:8848
            dataId: ${spring.application.name}-system-rules
            groupId: SENTINEL_GROUP
            username: nacos
            password: nacos
            data-type: json
            rule-type: system



在Sentinel中进行了两个限流规则的配置

在这里插入图片描述



Naocs的配置中心也有相应的更改

在这里插入图片描述



微服务中也会生效

在这里插入图片描述



补充

如果在工作中sentinel的持久化这一块已经被其他项目组的人完成了,但他们是直接把dashboard端的规则实体转json,存入了Nacos配置中心。进而导致了热点参数规则不生效,并且不允许我们修改源码。

当出现了上面这种情况,那我们应该怎么处理嘞?

解决方案:

自定义一个解析热点规则配置的解析器FlowParamJsonConverter,继承JsonConverter,重写convert方法。

利用BeanPostProcessor机制替换beanName为param-flow-rules-sentinel-nacos-datasourceconverter属性,注入FlowParamJsonConverter

@Configuration
public class ConverterConfig {
 
    @Bean("sentinel-json-param-flow-converter2")
    @Primary
    public JsonConverter jsonParamFlowConverter() {
        return new FlowParamJsonConverter(new ObjectMapper(), ParamFlowRule.class);
    }
}
 
@Component
public class FlowParamConverterBeanPostProcessor implements BeanPostProcessor {
 
    @Autowired
    private JsonConverter jsonParamFlowConverter;
 
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if (beanName.equals("param-flow-rules-sentinel-nacos-datasource")) {
            NacosDataSourceFactoryBean nacosDataSourceFactoryBean = (NacosDataSourceFactoryBean) bean;
            nacosDataSourceFactoryBean.setConverter(jsonParamFlowConverter);
            return bean;
        }
        return bean;
    }
}


public class FlowParamJsonConverter extends JsonConverter {
    Class ruleClass;
 
    public FlowParamJsonConverter(ObjectMapper objectMapper, Class ruleClass) {
        super(objectMapper, ruleClass);
        this.ruleClass = ruleClass;
    }
 
    @Override
    public Collection<Object> convert(String source) {
        List<Object> list = new ArrayList<>();
        JSONArray jsonArray = JSON.parseArray(source);
        for (int i = 0; i < jsonArray.size(); i++) {
            //解析rule属性
            JSONObject jsonObject = (JSONObject) jsonArray.getJSONObject(i).get("rule");
            Object object = JSON.toJavaObject(jsonObject, ruleClass);
            list.add(object);
        }
        return list;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/803266.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

liunx面试题目

如何看当前Linux系统有几颗物理CPU和每颗CPU的核数&#xff1f; 查看物理cup&#xff1a; cat /proc/cpuinfo|grep -c ‘physical id’ 查看每颗cup核数 cat /proc/cpuinfo|grep -c ‘processor’ 若希望自动实现软件包的更新&#xff0c;可以使用yum-cron并启动该服务 yum -y …

解决一下git clone失败的问题

1&#xff09;.不开梯子&#xff0c;我们用https克隆 git clone https://github.com 报错&#xff1a; Failed to connect to github.com port 443 after 2091 ms: Couldnt connect to server 解决办法&#xff1a; 开梯子&#xff0c;然后# 注意修改成自己的IP和端口号 gi…

[HDCTF2019]MFC

[HDCTF2019]MFC-CSDN博客 不会写 完全画瓢 我还以为win32什么系统逆向 原来是小瘪三! VM保护 下载xspy(看雪上有) 打开32位的 再打开 这个窗口 把这个放大镜托到这个大窗口(里面有个小窗口,不要托错了) 下面这个 onmeg 就她不正常,是什么0464 #include <stdio.h&g…

简易ELK搭建

ELK搭建 1. elasticsearch1.1 下载1.2 ES配置1.3 启动ES1.4 开启权限认证1.5 IK分词器配置&#xff08;非必须&#xff09; 2. kibana2.1 下载2.2 配置2.3 启动kibana 3. logstash3.1 下载3.2 配置3.3 启动logstash 4. springboot推送数据 ELK包括elasticsearch、logstash、kib…

自然语言处理(NLP)——法国工程师IMT联盟 期末考试题

1. 问题1 &#xff08;法语&#xff09;En langue arabe lcrasante majorit des mots sont forms par des combinaisons de racines et de schmes. Dans ce mcanisme... &#xff08;英语&#xff09;In Arabic language the vast majority&#xff08;十之八九&#xff09; of…

《昇思25天学习打卡营第23天|onereal》

第23天学习内容简介&#xff1a; ----------------------------------------------------------------------------- 本案例基于MindNLP和ChatGLM-6B实现一个聊天应用。 1 环境配置 配置网络线路 2 代码开发 下载权重大约需要10分钟 ------------------------------- 运…

UI设计工具选择指南:Sketch、XD、Figma、即时设计

在数字产品设计产业链中&#xff0c;UI设计师往往起着连接前后的作用。产品经理从一个“需求”开始&#xff0c;制定一个抽象的产品概念原型。UI设计师通过视觉呈现将抽象概念具体化&#xff0c;完成线框图交互逻辑视觉用户体验&#xff0c;最终输出高保真原型&#xff0c;并将…

基于Java的在线考试系统

你好呀&#xff0c;我是计算机学姐码农小野&#xff01;如果有相关需求&#xff0c;可以私信联系我。 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;Java MySQL B/S架构 SpringBoot框架 工具&#xff1a;Eclipse、MySQL环境配置工具 系统展示 首…

ArkUI状态管理

State装饰器 在声明式UI中&#xff0c;是以状态驱动试图更新 状态 (State) 指驱动视图更新的数据(被装饰器标记的变量) 试图(View) 基于UI描述渲染得到用户界面 说明 1.State装饰器标记的变量必须初始化&#xff0c;不能为空 2.State支持Object、classstring、number、b…

【LeetCode:试题 16.06. 最小差 + 双指针 + 防止整型溢出】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

Pythonselenium自动化测试实战项目详解

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 说明&#xff1a;本项目采用流程控制思想&#xff0c;未引用unittest&pytest等单元测试框架 …

SSE(Server Sent Event)实战(3)- Spring Web Flux 实现

上篇博客 SSE&#xff08;Server Sent Event&#xff09;实战&#xff08;2&#xff09;- Spring MVC 实现&#xff0c;我们用 Spring MVC 实现了简单的消息推送&#xff0c;并且留下了两个问题&#xff0c;这篇博客&#xff0c;我们用 Spring Web Flux 实现&#xff0c;并且看…

Unity动画系统(3)---融合树

6.1 动画系统基础2-6_哔哩哔哩_bilibili Animator类 using System.Collections; using System.Collections.Generic; using UnityEngine; public class EthanController : MonoBehaviour { private Animator ani; private void Awake() { ani GetComponen…

【ECharts】使用 ECharts 处理不同时间节点的数据系列展示

使用 ECharts 处理不同时间节点的数据系列展示 在数据可视化中&#xff0c;我们经常遇到这样的问题&#xff1a;不同数据系列的数据点在时间轴上并不对齐。这种情况下&#xff0c;如果直接在 ECharts 中展示&#xff0c;图表可能会出现混乱或不准确。本文将通过一个示例代码&a…

解决VSCode自动识别文件编码

在VScode 的 设置界面 输入 autoGuess 关键字 &#xff0c;勾选启用即可自动识别&#xff01;&#xff01;&#xff01;

【Python与GUI开发】事件处理与打包分发

文章目录 前言 一、高级事件处理 1.自定义事件 2.拖放操作 3.复杂控件的事件处理 二、打包和分发 Tkinter 应用 1.PyInstaller 2.cx_Freeze 3.spec 文件 4.分发注意事项 三、实战示例&#xff1a;文件浏览器 总结 前言 在前面的讨论中&#xff0c;我们深入理解了 T…

Qt MV架构-委托类

一、基本概念 与MVC模式不同&#xff0c;MV视图架构中没有包含一个完全分离的组件来处理与用户的交互。 一般地&#xff0c;视图用来将模型中的数据显示给用户&#xff0c;也用来处理用户的输入。为了获得更高的灵活性&#xff0c;交互可以由委托来执行。 这些组件提供了输入…

每日一 练,java

目录 题目分析代码 题目 选自牛客网 1.小美的平衡矩阵 小美拿到了一个&#x1d45b;∗&#x1d45b;的矩阵&#xff0c;其中每个元素是 0 或者 1。 小美认为一个矩形区域是完美的&#xff0c;当且仅当该区域内 0 的数量恰好等于 1 的数量。现在&#xff0c;小美希望你回答有多…

电瓶车检测AI算法:视频智能分析技术助力电瓶车规范与安全管理

随着电瓶车&#xff08;电动自行车&#xff09;的普及&#xff0c;其在城市交通中扮演着越来越重要的角色。然而&#xff0c;电瓶车的管理、安全监控以及维护等方面也面临着诸多挑战。近年来&#xff0c;人工智能&#xff08;AI&#xff09;技术的发展为解决这些问题提供了新的…

网络开局 与 Underlay网络自动化

由于出口和核心设备 部署在核心机房,地理位置集中,业务复杂,开局通常需要网络工程师进站调测。 因此核心层及核心以上的设备(包含核心层设备,旁挂独立AC设备和出口设备)推荐采用WEB网管开局方式或命令行开局方式。 核心以下的设备(包含汇聚层设备、接入层设备和AP)由于数量众…