资源数据采集
之前的NodeSelectorSlot
和ClusterBuilderSlot
已经完成了对资源调用树的构建, 现在则是要对资源进行收集, 核心点就是这些资源数据是如何统计
LogSlot
作用: 记录异常请求日志, 用于故障排查
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
throws Throwable {
try {
// 啥也没干, 直接调用下一个Slot
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
// 被流控或者熔断降级后直接打印log
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), e.getRule().getId(), count);
throw e;
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exception", e);
}
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
try {
// 啥也没干,直接调用下一个 Slot
fireExit(context, resourceWrapper, count, args);
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exit exception", e);
}
}
}
LogSlot只做了一件事, 当出现BlockException
异常时, 记录log日志(EagleEyeLogUtil.log
会将日志写到 sentinel-block.log
文件中)
StatisticSlot
初始StatisticSlot
如果要设计一个 StatisticSlot,首先需要明确其需要实现的功能,即收集各种指标数据,如请求总数、请求成功数、请求失败数、响应时间等。
目前先把核心结构先列出来, 后续填充其他功能
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 调用责任链下一个 Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
} catch (Throwable e) {
throw e;
}
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode)context.getCurNode();
// 调用责任链下一个 Slot
fireExit(context, resourceWrapper, count);
}
}
错误信息和异常数统计
fireEntry()
调用的是真正验证用于的Slot, 比如FlowSlot, DegradeSlot等, 如果后续验证不通过的话, 那么会抛出BlockException
, 那么此时就可以使用try-catch捕获, 捕获后记录异常错误信息以及异常数
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 调用下一个Slot, 如果验证不通过, 那就捕获异常
fireEntry(context, resourceWrapper, node, count, prioritized, args);
} catch (BlockException e) {
// 捕获 BlockException
throw e;
} catch (Throwable e) {
// .....
throw e;
}
}
QPS和线程数统计
QPS和线程数的统计应该在什么时候统计?
可以fireEntry()之后
进行统计, 调用fireEntry()
- 如果没有报
BlockException
, 则表示没有被流控
或熔断降级
- 将当前资源占用的线程数 + 1以及当前请求QPS + 1
- 如果报了
BlockException
, 则表示被拦截了, 即请求失败- 将请求拒绝的QPS + 1
对于总的QPS则可以通过公式计算 总QPS = 成功QPS + 失败QPS
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 规则验证
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 如果能走到这里,则将当前资源占用的线程数 + 1 以及当前资源请求成功的 QPS 数 + 1
node.increaseThreadNum();
node.addPassRequest();
} catch (BlockException e) { // 捕获 BlockException
// 如果规则验证失败,则将 BlockQps + 1
node.increaseBlockQps();
throw e;
} catch (Throwable e) {
// .....
throw e;
}
}
响应时间统计
entry()是入口方法
,相当于 AOP的before() 方法,那我们肯定会对应一个after() 方法,exit()是出口方法
, 也就说可以在exit()中记录响应时间
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
Node node = context.getCurNode();
if (context.getCurEntry().getBlockError() == null) {
// 获取系统当前时间
long completeStatTime = TimeUtil.currentTimeMillis();
context.getCurEntry().setCompleteTimestamp(completeStatTime);
// 得到响应时间,这个时间是哪里来的呢?是我们最初最开始为资源创建Entry对象时记录的。
long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();
// 记录响应时间等信息
recordCompleteFor(node, count, rt, error);
}
fireExit(context, resourceWrapper, count, args);
}
结束时间是在 StatisticsSlot 里的exit方法记录的,那开始时间是在哪记录的呢?在entry方法里记录可以吗?显然不妥,因为StatisticsSlot不是第一个Slot,不能作为请求的起始时间
,起始时间应该放到初始化Entry
资源管理对象,也就是只要资源诞生就意味着此次请求开始了,而且我们在设计 Entry 类的时候也将开始时间和结束时间两个字段设计进去了,因此我们开始时间我们可以直接通过 context.getCurEntry().getCreateTimestamp()
获取
流程图如下
DefaltNode, EntranceNode和ClusterNode的指标如何统计
- DefaltNode:用于统计某个 Context 下某个资源的指标信息,维度是 Context + 资源
- EntranceNode:用于统计某个 Context 下全部资源的指标信息,维度是 Context
- ClusterNode:用于统计某个资源在全部 Context 下的指标信息,维度是资源,与 Context 无关
收集指标信息也就是每次请求就记录一下, 问题就是在哪里出发记录的动作?
即下述三个问题
- 如何统计某个资源在某个Context下的指标?
- 如何统计某个Context下所有资源的指标?
- 如何统计某个资源在全部Context中的指标?
如何统计某个资源在某个Context下的指标?
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 数据统计
node.increaseThreadNum();
node.addPassRequest(count);
}
}
可以发现 increaseThreadNum()
和 addPassRequest()
方法都是node调用的,那node是什么呢?
node是DefaultNode类型
的方法参数,我们还知道 entry()
方法是通过上一个责任链:ClusterSlot调用的,也就是说node这个参数是前面Slot传过来的,其实,我们回溯回去,会发现这个node就是DefaultNode本身,并不是它的子类EntranceNode。因此,我们得出一个结论:StatisticSlot直接调用DefaultNode里的方法进行指标收集,我们又知道DefaultNode的维度是Context + 资源
public class DefaultNode extends StatisticNode {
// 和资源绑定
private ResourceWrapper id;
private ClusterNode clusterNode;
// 增加线程数
@Override
public void increaseThreadNum() {
super.increaseThreadNum();
this.clusterNode.increaseThreadNum();
}
// 增加请求成功数
@Override
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
}
DefaultNode核心源码
public class DefaultNode extends StatisticNode {
// 和资源绑定
private ResourceWrapper id;
private ClusterNode clusterNode;
// 增加线程数
@Override
public void increaseThreadNum() {
super.increaseThreadNum();
this.clusterNode.increaseThreadNum();
}
// 增加请求成功数
@Override
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
}
DefaultNode 的维度是 Context + 资源,DefaultNode源码里只看到了资源 ResourceWrapper,没有看到Context呢?在NodeSelectorSlot的entry()
方法里我们会初始化DefaultNode 且与Context进行绑定(Key-Value形式),核心代码
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
// Context#name与DefaultNode 进行绑定
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
public void entry(...) {
DefaultNode node = new DefaultNode(resourceWrapper, null);
map.put(context.getName(), node);
}
}
要想获取某个资源在某 Context 下的指标时
- 从map中获取DefaultNode
- 从DefaultNode获取资源Id
流程图如下
如何统计某个Context下所有资源的指标?
也就是不细分资源,直接统计Context
如何找到当前Context下的全部资源呢?
- 一个资源肯定对应一个DefaultNode
- EntranceNode相当于树干,它有很多树枝 DefaultNode 挂到其下面
public class EntranceNode extends DefaultNode {
// 树枝
private volatile Set<Node> childList = new HashSet<>();
}
有了这个 childList 事情就变得简单了,直接 for 循环遍历即可,获取到的是每个 DefaultNode,然后调用每个 DefaultNode 的统计方法进行求和即可,如下所示:
public class EntranceNode extends DefaultNode {
@Override
public int curThreadNum() {
int r = 0;
// 遍历 DefaultNode 子集
for (Node node : getChildList()) {
// += 操作求和
r += node.curThreadNum();
}
return r;
}
@Override
public double passQps() {
double r = 0;
for (Node node : getChildList()) {
r += node.passQps();
}
return r;
}
}
如何统计某个资源在全部Context中的指标?
我们知道 ClusterNode 是在 DefaultNode 下的,一个资源至少对应一个 DefaultNode 以及会对应唯一一个 ClusterNode (因为 ClusterNode 的维度是资源,所以不管资源在哪几个 Context 下,都只会对应唯一一个 ClusterNode)
上边的DefaultNode 的时候不管是 increaseThreadNum()
还是 addPassRequest()
都会调用一个方法叫:this.clusterNode.increaseXxx()
,其实这就是用于统计某个资源在所有 Context 下的指标信息的
public void increaseThreadNum() {
super.increaseThreadNum();
// clusterNode.xxx
this.clusterNode.increaseThreadNum();
}
public void addPassRequest(int count) {
super.addPassRequest(count);
// clusterNode.xxx
this.clusterNode.addPassRequest(count);
}
总结
StatisticSlot只负责指标统计, 调用相关的统计方法进行实现, Sentinel底层采用滑动窗口, 令牌桶, 漏桶三个算法
参考资料
通关 Sentinel 流量治理框架 - 编程界的小學生