熔断降级
Slot 责任链上的最后一环:熔断降级 DegradeSlot,熔断降级作为保护系统的一种强大手段,可以根据慢调用、异常比例和异常数进行熔断,并自定义持续时间以实现系统保护
规则配置
规则类中属性解析
与控制面板对应
// 其中资源名称在 AbstractRule 里。
public class DegradeRule extends AbstractRule {
/*
熔断策略 (0: 平均RT,1: 异常比例,2: 异常计数)
*/
private int grade = RuleConstant.DEGRADE_GRADE_RT;
// 阈值计数, 含义取决于所选择的熔断策略
private double count;
// 熔断器断开后恢复时间(单位秒), 超时后, 熔断器转换成半开状态, 允许部分请求通过
private int timeWindow;
// 触发熔断最低请求数
private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;
// RT模式下慢请求比例的阈值
private double slowRatioThreshold = 1.0d;
// 间隔统计持续时间 (毫秒)
private int statIntervalMs = 1000;
}
-
grade
:熔断降级规则的类型,取值范围为- 慢调用比例:
RuleConstant.DEGRADE_GRADE_RT
=0, 默认值就是慢比例 - 异常比例:
RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO
=1 - 异常数:
RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT
=2
- 慢调用比例:
-
count
:熔断降级的阈值,具体含义取决于grade
字段的值- 慢调用比例:
count
表示慢调用比例阈值 - 异常比例:
count
表示异常比例阈值 - 异常数:
count
表示异常数阈值
- 慢调用比例:
-
timeWindow
: 熔断降级发生后的降级持续时间(单位:秒
),在这段时间内,对应的资源将被降级, 超时后, 熔断器转换成半开状态, 允许部分请求通过, 如果这部分请求还是不通过, 那么熔断器转换成开状态, 继续熔断, 如果通过, 那么熔断器转换成关状态 -
minRequestAmount
: 熔断降级统计周期内的最小请求总数。仅当周期内的请求总数达到此值时,才会根据grade
和count
进行熔断降级。- 默认值为
RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT
=5
- 默认值为
-
slowRatioThreshold
:慢调用比例阈值,仅当grade
为慢调用比例时生效。取值范围为 0 到 1 之间的小数,表示慢调用请求占总请求的比例,默认值为1
-
statIntervalMs
:熔断降级统计周期(单位:毫秒
)。在这个周期内,Sentinel 会对请求进行统计,以判断是否需要进行熔断降级。默认值为 1000 毫秒(1 秒)
熔断器
不同的策略底层使用的算法不一样, 我们可以通过if-else进行, 也可以通过switch进行, 但是都不够优雅, 更加优雅的做法是, 使用策略模式, sentinel底层就是采用策略模式实现的
什么是策略模式不在赘述, 见这个链接策略模式 | 菜鸟教程 (runoob.com)
CircuitBreaker
是一个熔断器接口, 用于实现 Sentinel 的熔断降级功能。它定义了一些关键方法和一个内部枚举类型 State
public interface CircuitBreaker {
DegradeRule getRule();
boolean tryPass(Context context);
State currentState();
void onRequestComplete(Context context);
enum State {
OPEN, // 开启
HALF_OPEN, // 半开
CLOSED // 关闭
}
}
DegradeRule getRule()
:获取当前熔断器所对应的熔断降级规则。boolean tryPass(Context context)
:尝试通过熔断器关闭状态(CLOSED)
: 则允许请求通过打开状态(OPEN)
: 则拒绝请求半开状态(HALF_OPEN)
: 则根据规则允许部分请求通过。
State currentState()
:获取当前熔断器的状态(OPEN, HALF_OPEN, CLOSED)void onRequestComplete(Context context)
:在请求完成后调用此方法,用于更新熔断器的统计数据。
内部枚举类型 State
:
OPEN
:表示熔断器处于打开
状态,此时会拒绝所有请求HALF_OPEN
:表示熔断器处于半开
状态,此时允许部分请求通过,以检测系统是否已经恢复正常CLOSED
:表示熔断器处于关闭
状态,此时允许所有请求通过
熔断器接口实现类图如下
AbstractCircuitBreaker完成一些功能的基础功能
public abstract class AbstractCircuitBreaker implements CircuitBreaker {}
具体的策略实现类会继承该抽象类完成一些独有的逻辑
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {}
public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {}
规程初始化
熔断降级规则的初始化也是通过监听器模式
来完成的。监听器就像是一个基础框架,所有的规则都是基于这套框架来实现的
规则的存储和转换
两个Map用户存储熔断策略
和熔断规则
public final class DegradeRuleManager {
// 熔断策略
private static volatile Map<String, List<CircuitBreaker>> circuitBreakers = new HashMap<>();
// 熔断规则
private static volatile Map<String, Set<DegradeRule>> ruleMap = new HashMap<>();
}
将调用者传入的 List<DegradeRule>
转换为上述两个 HashMap
private synchronized void reloadFrom(List<DegradeRule> list) {
// List<DegradeRule> 转 List<CircuitBreaker>
Map<String, List<CircuitBreaker>> cbs = buildCircuitBreakers(list);
// 将熔断器策略
Map<String, List<DegradeRule>> rules = buildCircuitBreakerRules(cbs);
circuitBreakers.updateRules(cbs);
ruleMap.updateRules(rules);
}
buildCircuitBreakers
这里使用了策略模式
, 并使用swtich进行分发
/*
List<DegradeRule> 转 List<CircuitBreaker>
*/
private Map<String, List<CircuitBreaker>> buildCircuitBreakers(List<DegradeRule> list) {
// cbMap用于存储CircuitBreaker
Map<String, List<CircuitBreaker>> cbMap = new HashMap<>(8);
// 非空判断
if (list == null || list.isEmpty()) {
return cbMap;
}
// 遍历列表
for (DegradeRule rule : list) {
// 非法校验
if (!isValidRule(rule)) {
RecordLog.warn("[DegradeRuleManager] Ignoring invalid rule when loading new rules: {}", rule);
continue;
}
// 如果规则的limitApp为空,则将其设置为默认值
if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
// 根据规则获取已存在的CircuitBreaker或创建新的CircuitBreaker
CircuitBreaker cb = getExistingSameCbOrNew(rule);
if (cb == null) {
RecordLog.warn("[DegradeRuleManager] Unknown circuit breaking strategy, ignoring: {}", rule);
continue;
}
// 使用规则的资源名作为键,将CircuitBreaker添加到cbMap对应的列表中,如果cbMap中不存在该键则先创建空列表
String resourceName = rule.getResource();
List<CircuitBreaker> cbList = cbMap.get(resourceName);
if (cbList == null) {
cbList = new ArrayList<>();
cbMap.put(resourceName, cbList);
}
cbList.add(cb);
}
return cbMap;
}
/*
获取与给定降级规则相同的现有断路器或创建新的断路器
*/
private static CircuitBreaker getExistingSameCbOrNew(/*@Valid*/ DegradeRule rule) {
// 根据给定的降级规则获取所有断路器的列表
List<CircuitBreaker> cbs = getCircuitBreakers(rule.getResource());
// 非空校验
if (cbs == null || cbs.isEmpty()) {
return newCircuitBreakerFrom(rule);
}
// 遍历熔断器
for (CircuitBreaker cb : cbs) {
// 果找到与给定降级规则相同的断路器,则重用该断路器并返回
if (rule.equals(cb.getRule())) {
// Reuse the circuit breaker if the rule remains unchanged.
return cb;
}
}
// 执行到这里说明, 没有找到相同的熔断器, 根据给定的规则创建新的熔断器, 并返回
return newCircuitBreakerFrom(rule);
}
/*
根据指定的降级规则创建一个熔断器
*/
private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) {
// 根据熔断器策略进行分发
switch (rule.getGrade()) {
// RT响应时间
case RuleConstant.DEGRADE_GRADE_RT:
return new ResponseTimeCircuitBreaker(rule);
// 异常比例, 异常数
case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
return new ExceptionCircuitBreaker(rule);
default:
return null;
}
}
核心流程
熔断验证
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(...) throws Throwable {
// 熔断验证逻辑
performChecking(...);
// 放行
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(...) {
}
}
可以看到核心熔断验证逻辑在performChecking(), 那么它做了什么事
- 获取熔断器
- 熔断相关的校验, 失败就抛出降级异常
/*
熔断检查
*/
void performChecking(Context context, ResourceWrapper r) throws BlockException {
// 先根据资源name获取熔断器
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
// 调用每个熔断器的 tryPass 方法进行验证
for (CircuitBreaker cb : circuitBreakers) {
// 验证失败则抛出异常进行熔断
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
可以看到真正判断是否触发熔断的是tryPass()
tryPass()做了什么事?
- 检查当前熔断器状态
- 关闭: 不需要熔断, 放行
- 打开: 继续往下执行
- 熔断器是打开状态, 判断当前系统时间大于等于下一次尝试恢复的时间
- 是: 将熔断器状态更改成半开启
- 否: 放行
/*
如果此次请求已经达到了熔断器恢复时间,并且将熔断器的状态从打开变为半开启(HALF_OPEN),则放行,反之拒绝
*/
@Override
public boolean tryPass(Context context) {
// 熔断器为关闭状态
if (currentState.get() == State.CLOSED) {
return true;
}
// 熔断器为开启状态
if (currentState.get() == State.OPEN) {
// 如果此次请求已经达到了熔断器恢复时间并且将熔断器的状态从打开变为半开启(HALF_OPEN),则放行,反之拒绝
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
retryTimeoutArrived()
// nextRetryTimestamp:下一次尝试恢复的时间
protected boolean retryTimeoutArrived() {
// 如果当前系统时间大于等于下一次尝试恢复的时间,也就是说已经到达了可以尝试恢复的时间,则返回 true,反之返回 false
return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
}
fromOpenToHalfOpen()
/*
尝试将熔断器的状态从打开(OPEN)更改为半开启(HALF_OPEN)。如果状态切换成功,返回 true 表示请求放行;否则返回 false 表示拒绝请求
*/
protected boolean fromOpenToHalfOpen(Context context) {
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
// 这里使用观察者模式, 通知观察者, 当前熔断器的状态从OPEN变成了HALF_OPEN
notifyObservers(State.OPEN, State.HALF_OPEN, null);
Entry entry = context.getCurEntry();
entry.whenTerminate(new BiConsumer<Context, Entry>() {
@Override
public void accept(Context context, Entry entry) {
// Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638
// Without the hook, the circuit breaker won't recover from half-open state in some circumstances
// when the request is actually blocked by upcoming rules (not only degrade rules).
if (entry.getBlockError() != null) {
// Fallback to OPEN due to detecting request is blocked
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
}
}
});
return true;
}
return false;
}
流程如下
熔断器开关时机
熔断开关时期应该是触发配置阈值时
, 但是数据何时采集?
entry()
为请求入口, 此时还没结束, 无法获取到异常数, RT相关信息, 而exit()
请求出口, 此时请求已经结束, 可以获取到RT, 异常数相关信息, 所以数据再exit()
中采集
代码如下
@Override
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
Entry curEntry = context.getCurEntry();
if (curEntry.getBlockError() != null) {
fireExit(context, r, count, args);
return;
}
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
fireExit(context, r, count, args);
return;
}
// 如果没报错,那就调用 onRequestComplete 方法来计数
if (curEntry.getBlockError() == null) {
// 放行该请求
// 这里使用for循环的原因: 因为一个资源的熔断器有多个, 完全可以对某个资源既按照慢调用比例进行熔断又按照异常数进行熔断
for (CircuitBreaker circuitBreaker : circuitBreakers) {
circuitBreaker.onRequestComplete(context);
}
}
fireExit(context, r, count, args);
}
onRequestComplete的作用计数, 当请求结束时,会根据配置的熔断策略(异常比例或异常数)来更新计数器。如果达到阈值,熔断器状态将从 CLOSED
变为 OPEN
,
具体实现看ExceptionCircuitBreaker和ResponseTimeCircuitBreaker, 下边分析
ExceptionCircuitBreaker
异常数: errorCount
异常数
异常比例: 额外使用totalCount
记录请求总数, 异常比例 = errorCount / totalCount
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
/*
请求数据统计
*/
@Override
public void onRequestComplete(Context context) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
// 获取当前值
SimpleErrorCounter counter = stat.currentWindow().value();
// 如果此次请求报错了,则将 errorCount + 1
if (error != null) {
counter.getErrorCount().add(1);
}
// 将 totalCount 总数 + 1,用于计算异常比例
counter.getTotalCount().add(1);
// 根据当前请求的异常数/异常比例与设定阈值的关系,调用handleStateChangeWhenThresholdExceeded(error)方法来执行相应的状态变更操作
handleStateChangeWhenThresholdExceeded(error);
}
/*
熔断器开关变化逻辑处理
*/
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
// 如果当前熔断器已经打开了,则直接返回。
if (currentState.get() == State.OPEN) {
return;
}
// 如果是半开启状态
if (currentState.get() == State.HALF_OPEN) {
// 如果本次请求没出现异常,则代表可以关闭熔断器了,因此 fromHalfOpenToClose 关闭熔断器
if (error == null) {
// 这里面会通知各个观察者
fromHalfOpenToClose();
} else {
// 如果本次请求还是异常,那就继续熔断,打开熔断器
// 这里面会通知各个观察者
fromHalfOpenToOpen(1.0d);
}
return;
}
// 执行到这里, 说明熔断器处于一个关的状态
List<SimpleErrorCounter> counters = stat.values();
// 异常数量
long errCount = 0;
// 请求总数
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
// 如果请求总数没超过最小请求数,那直接放行
if (totalCount < minRequestAmount) {
return;
}
// curCount表示为当前配置熔断触发阈值, 配置熔断策略不同, 含义也不同
double curCount = errCount;
// 熔断策略为异常比例
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
// 计算异常比例, 公式: 异常比例 = 异常数 / 总请求数
curCount = errCount * 1.0d / totalCount;
}
// 当错误率或者错误数大于阈值,则打开熔断器
if (curCount > threshold) {
// 这里面会通知各个观察者
transformToOpen(curCount);
}
}
}
异常数/异常比例的熔断降级了流程如下
ResponseTimeCircuitBreaker
这里主要统计的是慢比例调用
数据,
慢比例计算公式如下
响应时间RT = 请求结束时间 - 请求开始时间
public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {
@Override
public void onRequestComplete(Context context) {
// 获取滑动窗口统计的当前窗口内的慢请求计数器实例
SlowRequestCounter counter = slidingCounter.currentWindow().value();
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
long completeTime = entry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
// 计算请求响应时间(耗时),即完成时间减去创建时间
long rt = completeTime - entry.getCreateTimestamp();
// 判断响应时间是否超过最大允许响应时间,若超过则将慢请求计数加1
if (rt > maxAllowedRt) {
counter.slowCount.add(1);
}
// 不论请求是否为慢请求,都将总请求计数加1
counter.totalCount.add(1);
// 根据当前请求的响应时间与设定阈值的关系,调用handleStateChangeWhenThresholdExceeded(rt)方法来执行相应的状态变更操作
handleStateChangeWhenThresholdExceeded(rt);
}
}
降级
降级就是抛出异常, 抛出异常也是降级的一种手段,Slot 相当于过滤器链,过滤器阶段就给拦截了,就不会进入到主业务流程当中,也就不会去查询数据库等一系列业务逻辑。当然,你可以捕获这个异常做一些你想做的事情,这就是降级
总结
熔断器分类和原理
- 异常熔断器
- 负责异常数/异常比例
- 请求结束时统计异常数和请求总数, 判断是否达到阈值, 达到阈值更改熔断器状态
- RT熔断器:
- 负责的是响应时间
- 计算请求结束和请求开始的差值, 和阈值比较, 判断是否达到阈值, 达到阈值更改熔断器状态
熔断器大体流程
- 计数
- 对比阈值
- 熔断器验证
状态流转
OPEN
: 熔断器打开, 系统进入熔断状态HALF_OPEN
: 熔断器半开, 系统放行部分请求, 如果请求通过, 熔断器切回关闭状态, 如果请求出现异常, 熔断器切回打开, 继续熔断CLAOSE
: 熔断器关闭, 系统正常
如下图
HALF_OPEN
像是一个中间态
参考资料
通关 Sentinel 流量治理框架 - 编程界的小學生