Sentinel规则的推送有下面三种模式:
推送模式 | 说明 | 优点 | 缺点 |
---|---|---|---|
原始模式 | API将规则推送至客户端并直接更新到内存中 | 简单,无任何依赖 | 不保证一致性;规则保存在内存中,重启即消失。严重不建议用于生产环境 |
Pull模式 | 扩展写数据源(WritableDataSource),客户端主动向某个规则管理中心定期轮询拉取规则,这个规则中心可以是RDBMS、文件等 | 简单,无任何依赖;规则持久化 | 不保证一致性;实时性不保证,拉取过于频繁也可能会有性能问题。 |
Push模式 | 扩展读数据源(ReadableDataSource),规则中心统一推送,客户端通过注册监听器的方式时刻监听变化,比如使用Nacos、Zookeeper等配置中心。这种方式有更好的实时性和一致性保证。生产环境下一般采用push模式的数据源。 | 规则持久化;一致性;快速 | 引入第三方依赖 |
原始模式
如果不做任何修改,Dashboard的推送规则方式是通过API将规则推送至客户端并直接更新到内存中:
这种做法的好处是简单,无依赖;坏处是应用重启规则就会消失,仅用于简单测试,不能用于生产环境。
拉模式
pull模式的数据源(如本地文件、RDBMS等)一般是可写入的。使用时需要在客户端注册数据源:将对应的读数据源注册至对应的 RuleManager,将写数据源注册至transport的WritableDataSourceRegistry中。
首先Sentinel控制台通过API将规则推送至客户端并更新到内存中,接着注册的写数据源会将新的规则保存到本地的文件中。使用 pull模式的数据源时一般不需要对Sentinel控制台进行改造。这种实现方法好处是简单,坏处是无法保证监控数据的一致性。
具体使用方式如下:
引入依赖:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-extension</artifactId>
</dependency>
实现InitFunc接口,在init中处理DataSource初始化逻辑,并利用spi机制实现加载。
package com.morris.user.config;
import com.alibaba.csp.sentinel.datasource.FileRefreshableDataSource;
import com.alibaba.csp.sentinel.datasource.FileWritableDataSource;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.csp.sentinel.init.InitFunc;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.List;
public class FileDataSourceInit implements InitFunc {
private static final String RULE_FILE_PATH = System.getProperty("user.home") + File.separator;
private static final String FLOW_RULE_FILE_NAME = "FlowRule.json";
@Override
public void init() throws Exception {
//处理流控规则逻辑
dealFlowRules();
}
private void dealFlowRules() throws FileNotFoundException {
String ruleFilePath = RULE_FILE_PATH + FLOW_RULE_FILE_NAME;
//创建流控规则的可读数据源
FileRefreshableDataSource flowRuleRDS = new FileRefreshableDataSource(
ruleFilePath, source -> JSON.parseObject((String) source,
new TypeReference<List<FlowRule>>() {
})
);
// 将可读数据源注册至FlowRuleManager 这样当规则文件发生变化时,就会更新规则到内存
FlowRuleManager.register2Property(flowRuleRDS.getProperty());
WritableDataSource<List<FlowRule>> flowRuleWDS = new FileWritableDataSource<>(
ruleFilePath, this::encodeJson
);
// 将可写数据源注册至 transport 模块的 WritableDataSourceRegistry 中.
// 这样收到控制台推送的规则时,Sentinel 会先更新到内存,然后将规则写入到文件中.
WritableDataSourceRegistry.registerFlowDataSource(flowRuleWDS);
}
private <T> String encodeJson(T t) {
return JSON.toJSONString(t);
}
}
在META-INF/services目录下创建com.alibaba.csp.sentinel.init.InitFunc,内容如下:
com.morris.user.config.FileDataSourceInit
这样当在Dashboard中修改了配置后,Dashboard会调用客户端的接口修改客户端内存中的值,同时将配置写入文件FlowRule.json中,这样操作的话规则是实时生效的,如果是直接修改FlowRule.json的内容,这样需要等定时任务3秒后执行才能读到最新的规则。
推模式
生产环境下一般更常用的是push模式的数据源。对于push模式的数据源,如远程配置中心(ZooKeeper, Nacos, Apollo等等),推送的操作不应由Sentinel客户端进行,而应该经控制台统一进行管理,直接进行推送,数据源仅负责获取配置中心推送的配置并更新到本地。因此推送规则正确做法应该是配置中心控制台/Sentinel控制台 → 配置中心 → Sentinel数据源 → Sentinel,而不是经Sentinel数据源推送至配置中心。这样的流程就非常清晰了:
基于Nacos配置中心控制台实现推送
配置中心控制台 → 配置中心 → Sentinel数据源 → Sentinel
引入依赖:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
配置文件增加nacos的数据源:
spring:
application:
name: user-service
cloud:
sentinel:
transport:
dashboard: 127.0.0.1:8080
web-context-unify: false # 默认将调用链路收敛,需要打开才可以进行链路流控
datasource:
flow-ds:
nacos:
server-addr: 127.0.0.1:8848
dataId: ${spring.application.name}-flow
groupId: DEFAULT_GROUP
data-type: json
rule-type: flow
最后在Nacos控制台新建一个user-service-flow
的json配置,内容如下:
[
{
"resource": "/sentinel/chainB",
"controlBehavior": 0,
"count": 1,
"grade": 1,
"limitApp": "default",
"strategy": 0
}
]
这样直接在Nacos控制台修改规则就能实时生效了,缺点是直接在Sentinel Dashboard中修改规则配置,配置中心的配置不会发生变化。
思考:如何实现将通过sentinel控制台设置的规则直接持久化到nacos配置中心?
方法一:微服务增加基于Nacos的写数据源(WritableDataSource),发布配置到nacos配置中心。
//核心逻辑: 实现WritableDataSource#write方法,发布配置到nacos配置中心
@Override
public void write(T t) throws Exception {
lock.lock();
try {
configService.publishConfig(dataId, groupId, this.configEncoder.convert(t), ConfigType.JSON.getType());
} finally {
lock.unlock();
}
}
方法二:Sentinel Dashboard监听Nacos配置的变化,如发生变化就更新本地缓存。在Sentinel Dashboard端新增或修改规则配置在保存到内存的同时,直接发布配置到nacos配置中心;Sentinel Dashboard直接从nacos拉取所有的规则配置。Sentinel Dashboard和微服务不直接通信,而是通过nacos配置中心获取到配置的变更,也就是下面的基于Sentinel控制台实现推送。
AbstractDataSourceProperties
SentinelProperties内部提供了TreeMap类型的datasource属性用于配置数据源信息。
com.alibaba.cloud.sentinel.datasource.config.AbstractDataSourceProperties#postRegister
public void postRegister(AbstractDataSource dataSource) {
switch (this.getRuleType()) {
case FLOW:
FlowRuleManager.register2Property(dataSource.getProperty());
break;
case DEGRADE:
DegradeRuleManager.register2Property(dataSource.getProperty());
break;
case PARAM_FLOW:
ParamFlowRuleManager.register2Property(dataSource.getProperty());
break;
case SYSTEM:
SystemRuleManager.register2Property(dataSource.getProperty());
break;
case AUTHORITY:
AuthorityRuleManager.register2Property(dataSource.getProperty());
break;
case GW_FLOW:
GatewayRuleManager.register2Property(dataSource.getProperty());
break;
case GW_API_GROUP:
GatewayApiDefinitionManager.register2Property(dataSource.getProperty());
break;
default:
break;
}
}
NacosDataSource从Nacos读取配置
NacosDataSource主要负责与Nacos进行通信,实时获取Nacos的配置。
public NacosDataSource(final Properties properties, final String groupId, final String dataId,
Converter<String, T> parser) {
super(parser);
if (StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) {
throw new IllegalArgumentException(String.format("Bad argument: groupId=[%s], dataId=[%s]",
groupId, dataId));
}
AssertUtil.notNull(properties, "Nacos properties must not be null, you could put some keys from PropertyKeyConst");
this.groupId = groupId;
this.dataId = dataId;
this.properties = properties;
this.configListener = new Listener() {
@Override
public Executor getExecutor() {
return pool;
}
@Override
public void receiveConfigInfo(final String configInfo) {
// 配置发送变更
RecordLog.info("[NacosDataSource] New property value received for (properties: {}) (dataId: {}, groupId: {}): {}",
properties, dataId, groupId, configInfo);
T newValue = NacosDataSource.this.parser.convert(configInfo);
// Update the new value to the property.
getProperty().updateValue(newValue);
}
};
// 监听配置
initNacosListener();
// 第一次读取配置
loadInitialConfig();
}
private void loadInitialConfig() {
try {
T newValue = loadConfig();
if (newValue == null) {
RecordLog.warn("[NacosDataSource] WARN: initial config is null, you may have to check your data source");
}
getProperty().updateValue(newValue);
} catch (Exception ex) {
RecordLog.warn("[NacosDataSource] Error when loading initial config", ex);
}
}
private void initNacosListener() {
try {
this.configService = NacosFactory.createConfigService(this.properties);
// Add config listener.
configService.addListener(dataId, groupId, configListener);
} catch (Exception e) {
RecordLog.warn("[NacosDataSource] Error occurred when initializing Nacos data source", e);
e.printStackTrace();
}
}
SentinelDataSourceHandler注入NacosDataSource
SentinelAutoConfiguration中注入了SentinelDataSourceHandler。
SentinelDataSourceHandler负责遍历配置文件中配置的DataSource,然后注入到spring容器中。
com.alibaba.cloud.sentinel.custom.SentinelDataSourceHandler#afterSingletonsInstantiated
public void afterSingletonsInstantiated() {
this.sentinelProperties.getDatasource().forEach((dataSourceName, dataSourceProperties) -> {
try {
List<String> validFields = dataSourceProperties.getValidField();
if (validFields.size() != 1) {
log.error("[Sentinel Starter] DataSource " + dataSourceName + " multi datasource active and won't loaded: " + dataSourceProperties.getValidField());
return;
}
AbstractDataSourceProperties abstractDataSourceProperties = dataSourceProperties.getValidDataSourceProperties();
abstractDataSourceProperties.setEnv(this.env);
abstractDataSourceProperties.preCheck(dataSourceName);
this.registerBean(abstractDataSourceProperties, dataSourceName + "-sentinel-" + (String)validFields.get(0) + "-datasource");
} catch (Exception var5) {
log.error("[Sentinel Starter] DataSource " + dataSourceName + " build error: " + var5.getMessage(), var5);
}
});
}
基于Sentinel控制台实现推送
配置中心控制台 → 配置中心 → Sentinel数据源 → Sentinel
从Sentinel1.4.0开始,Sentinel控制台提供DynamicRulePublisher和DynamicRuleProvider接口用于实现应用维度的规则推送和拉取:
- DynamicRuleProvider: 拉取规则
- DynamicRulePublisher: 推送规则
可以参考Sentinel Dashboard test包下的流控规则拉取和推送的实现逻辑:
这里主要改造Dashboard端,客户端还是采用前面的配置。
引入nacos的依赖:
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.3.0</version>
</dependency>
NacosConfig负责注入一些最基本的配置:
package com.alibaba.csp.sentinel.dashboard.rule.nacos;
import com.alibaba.csp.sentinel.dashboard.datasource.entity.rule.FlowRuleEntity;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.config.ConfigFactory;
import com.alibaba.nacos.api.config.ConfigService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Configuration
public class NacosConfig {
public static final String GROUP_ID = "DEFAULT_GROUP";
public static final String FLOW_DATA_ID_POSTFIX = "-flow";
@Bean
public Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder() {
return JSON::toJSONString;
}
@Bean
public Converter<String, List<FlowRuleEntity>> flowRuleEntityDecoder() {
return s -> JSON.parseArray(s, FlowRuleEntity.class);
}
@Bean
public ConfigService nacosConfigService() throws Exception {
return ConfigFactory.createConfigService("localhost:8848");
}
}
FlowRuleNacosProvider负责从Nacos读取配置:
package com.alibaba.csp.sentinel.dashboard.rule.nacos;
import com.alibaba.csp.sentinel.dashboard.datasource.entity.rule.FlowRuleEntity;
import com.alibaba.csp.sentinel.dashboard.rule.DynamicRuleProvider;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.nacos.api.config.ConfigService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Component("flowRuleNacosProvider")
public class FlowRuleNacosProvider implements DynamicRuleProvider<List<FlowRuleEntity>> {
@Autowired
private ConfigService configService;
@Autowired
private Converter<String, List<FlowRuleEntity>> converter;
@Override
public List<FlowRuleEntity> getRules(String appName) throws Exception {
String rules = configService.getConfig(appName + NacosConfig.FLOW_DATA_ID_POSTFIX,
NacosConfig.GROUP_ID, 3000);
if (StringUtil.isEmpty(rules)) {
return new ArrayList<>();
}
return converter.convert(rules);
}
}
FlowRuleNacosPublisher负责将配置写入Nacos:
package com.alibaba.csp.sentinel.dashboard.rule.nacos;
import com.alibaba.csp.sentinel.dashboard.datasource.entity.rule.FlowRuleEntity;
import com.alibaba.csp.sentinel.dashboard.rule.DynamicRulePublisher;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.alibaba.nacos.api.config.ConfigService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component("flowRuleNacosPublisher")
public class FlowRuleNacosPublisher implements DynamicRulePublisher<List<FlowRuleEntity>> {
@Autowired
private ConfigService configService;
@Autowired
private Converter<List<FlowRuleEntity>, String> converter;
@Override
public void publish(String app, List<FlowRuleEntity> rules) throws Exception {
AssertUtil.notEmpty(app, "app name cannot be empty");
if (rules == null) {
return;
}
configService.publishConfig(app + NacosConfig.FLOW_DATA_ID_POSTFIX,
NacosConfig.GROUP_ID, converter.convert(rules));
}
}
上面都是新增的类,最后还需要在Dashboard查询和修改规则时进行修改,具体修改是在FlowControllerV1
@Autowired
private FlowRuleNacosProvider flowRuleNacosProvider;
@Autowired
private FlowRuleNacosPublisher flowRuleNacosPublisher;
@GetMapping("/rules")
@AuthAction(PrivilegeType.READ_RULE)
public Result<List<FlowRuleEntity>> apiQueryMachineRules(@RequestParam String app,
@RequestParam String ip,
@RequestParam Integer port) {
... ...
try {
//List<FlowRuleEntity> rules = sentinelApiClient.fetchFlowRuleOfMachine(app, ip, port);
List<FlowRuleEntity> rules = flowRuleNacosProvider.getRules(app);
rules = repository.saveAll(rules);
return Result.ofSuccess(rules);
} catch (Throwable throwable) {
logger.error("Error when querying flow rules", throwable);
return Result.ofThrowable(-1, throwable);
}
}
@PostMapping("/rule")
@AuthAction(PrivilegeType.WRITE_RULE)
public Result<FlowRuleEntity> apiAddFlowRule(@RequestBody FlowRuleEntity entity) {
Result<FlowRuleEntity> checkResult = checkEntityInternal(entity);
if (checkResult != null) {
return checkResult;
}
entity.setId(null);
Date date = new Date();
entity.setGmtCreate(date);
entity.setGmtModified(date);
entity.setLimitApp(entity.getLimitApp().trim());
entity.setResource(entity.getResource().trim());
try {
entity = repository.save(entity);
publishRules(entity.getApp());
// publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS);
return Result.ofSuccess(entity);
} catch (Throwable t) {
Throwable e = t instanceof ExecutionException ? t.getCause() : t;
logger.error("Failed to add new flow rule, app={}, ip={}", entity.getApp(), entity.getIp(), e);
return Result.ofFail(-1, e.getMessage());
}
}
private void publishRules(/*@NonNull*/ String app) throws Exception {
List<FlowRuleEntity> rules = repository.findAllByApp(app);
flowRuleNacosPublisher.publish(app, rules);
}
以流控规则测试,当在sentinel dashboard配置了流控规则,会在nacos配置中心生成对应的配置,这样客户端就能读取到这个流控规则配置了。
user-service-flow.json
[{"app":"user-service","clusterConfig":{"acquireRefuseStrategy":0,"clientOfflineTime":2000,"fallbackToLocalWhenFail":true,"resourceTimeout":2000,"resourceTimeoutStrategy":0,"sampleCount":10,"strategy":0,"thresholdType":0,"windowIntervalMs":1000},"clusterMode":false,"controlBehavior":0,"count":1.0,"gmtCreate":1686709073771,"gmtModified":1686709073771,"grade":1,"id":2,"ip":"127.0.0.1","limitApp":"default","port":8720,"resource":"chain","strategy":0}]