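In this post we look at how Sentinel's system rules are used. Let's start with how the official documentation describes them.
I. Introduction to system rules
System protection rules control traffic at the application's entry point. They monitor application metrics on a single machine along several dimensions (load, CPU usage, average RT, inbound QPS, and concurrent thread count) so that the system runs as close to maximum throughput as possible while staying stable overall.
System protection rules apply to the application as a whole rather than to individual resources, and they take effect only for inbound traffic. Inbound traffic is traffic entering the application (EntryType.IN), such as requests received by a web service or a Dubbo provider.
System rules support the following modes: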
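- Load adaptive (only effective on Linux/Unix-like machines): the system's load1 is used as the heuristic metric for adaptive protection. Protection is triggered only when load1 exceeds the configured threshold and the current number of concurrent threads exceeds the estimated system capacity (the BBR phase). The capacity is estimated as maxQps * minRt. A common reference value for the threshold is CPU cores * 2.5.
- CPU usage (version 1.5.0+): protection is triggered when system CPU usage exceeds the threshold (range 0.0-1.0). This mode is fairly sensitive.
- Average RT: protection is triggered when the average RT of all inbound traffic on a single machine reaches the threshold, in milliseconds.
- Concurrent threads: protection is triggered when the number of concurrent threads serving inbound traffic on a single machine reaches the threshold.
- Inbound QPS: protection is triggered when the QPS of all inbound traffic on a single machine reaches the threshold.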
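In other words, system rules act on calls into the system as a whole. Unlike flow control or circuit breaking, you do not configure a separate rule for each resource.
The checks can also be made from several angles, such as the CPU usage, RT, and QPS dimensions listed above.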
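To make the load-adaptive condition more concrete, here is a minimal sketch of the capacity estimate described in the list above. The class and method names are hypothetical, and dividing by 1000 assumes RT is in milliseconds and QPS is in requests per second. It illustrates the idea only and is not Sentinel's own code.

```java
/** Hypothetical illustration of the load-adaptive check; not Sentinel's API. */
final class LoadAdaptiveSketch {

    /** Estimated number of requests the system can keep in flight: maxQps * minRt (ms) / 1000. */
    static double estimatedCapacity(double maxQps, double minRtMs) {
        return maxQps * minRtMs / 1000.0;
    }

    /**
     * Returns true when a request should be blocked: load1 is above the threshold
     * AND the current concurrency exceeds the estimated capacity.
     */
    static boolean shouldBlock(double load1, double loadThreshold,
                               int currentThreads, double maxQps, double minRtMs) {
        if (load1 <= loadThreshold) {
            return false; // load is within bounds, so no protection is needed
        }
        return currentThreads > estimatedCapacity(maxQps, minRtMs);
    }

    public static void main(String[] args) {
        // 500 QPS at a best-case RT of 20 ms keeps about 10 requests in flight.
        System.out.println(estimatedCapacity(500, 20));
        // High load, but concurrency is still below the estimated capacity.
        System.out.println(shouldBlock(4.2, 3.0, 8, 500, 20));
        // High load and concurrency above the estimated capacity.
        System.out.println(shouldBlock(4.2, 3.0, 12, 500, 20));
    }
}
```

The point of combining both conditions is that a high load1 alone does not reject traffic; requests are rejected only while the system is also holding more concurrent work than it has recently shown it can process.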
II. Demo
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.csp.sentinel.slots.system.SystemRuleManager;
import com.alibaba.csp.sentinel.util.TimeUtil;

public class SystemGuardDemo {
    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();
    private static volatile boolean stop = false;
    private static final int threadCount = 100;
    private static int seconds = 60 + 40;

    public static void main(String[] args) throws Exception {
        tick();
        initSystemRule();
        for (int i = 0; i < threadCount; i++) {
            Thread entryThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        Entry entry = null;
                        try {
                            // EntryType.IN marks the call as inbound traffic
                            entry = SphU.entry("methodA", EntryType.IN);
                            pass.incrementAndGet();
                            try {
                                TimeUnit.MILLISECONDS.sleep(20);
                            } catch (InterruptedException e) {
                                // ignore
                            }
                        } catch (BlockException e1) {
                            block.incrementAndGet();
                            try {
                                TimeUnit.MILLISECONDS.sleep(20);
                            } catch (InterruptedException e) {
                                // ignore
                            }
                        } catch (Exception e2) {
                            // biz exception
                        } finally {
                            total.incrementAndGet();
                            if (entry != null) {
                                entry.exit();
                            }
                        }
                    }
                }
            });
            entryThread.setName("working-thread");
            entryThread.start();
        }
    }

    private static void initSystemRule() {
        List<SystemRule> rules = new ArrayList<SystemRule>();
        SystemRule rule = new SystemRule();
        // max load is 3
        rule.setHighestSystemLoad(3.0);
        // max cpu usage is 90%
        rule.setHighestCpuUsage(0.9);
        // max avg rt of all request is 10 ms
        rule.setAvgRt(10);
        // max total qps is 10
        rule.setQps(10);
        // max parallel working thread is 10
        rule.setMaxThread(10);
        rules.add(rule);
        SystemRuleManager.loadRules(rules);
    }

    private static void tick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();
    }

    static class TimerTask implements Runnable {
        @Override
        public void run() {
            System.out.println("begin to statistic!!!");
            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;
            while (!stop) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // ignore
                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;
                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;
                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;
                System.out.println(seconds + ", " + TimeUtil.currentTimeMillis() + ", total:"
                    + oneSecondTotal + ", pass:"
                    + oneSecondPass + ", block:" + oneSecondBlock);
                if (seconds-- <= 0) {
                    stop = true;
                }
            }
            System.exit(0);
        }
    }
}

Sample output from one run:
100, 1649568871387, total:2594, pass:100, block:2500
99, 1649568872395, total:3206, pass:75, block:3225
98, 1649568873404, total:3283, pass:36, block:3164
97, 1649568874411, total:4417, pass:16, block:4484
96, 1649568875412, total:4901, pass:22, block:4879
95, 1649568876412, total:4996, pass:16, block:4980
94, 1649568877413, total:4903, pass:17, block:4886
93, 1649568879250, total:165, pass:16, block:75
92, 1649568884798, total:33, pass:5, block:10
91, 1649568887934, total:9, pass:0, block:17
90, 1649568894137, total:112, pass:22, block:93
89, 1649568895138, total:3202, pass:28, block:3255
88, 1649568896139, total:3200, pass:27, block:3173
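This is the official demo. The main thing to notice is SphU.entry("methodA", EntryType.IN): the resource is marked as inbound traffic, which is what makes the system rules apply to it. Let's look at how the rule is configured.
1. Rule configuration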
private static void initSystemRule() {
    List<SystemRule> rules = new ArrayList<SystemRule>();
    SystemRule rule = new SystemRule();
    // max load is 3
    rule.setHighestSystemLoad(3.0);
    // max cpu usage is 90%
    rule.setHighestCpuUsage(0.9);
    // max avg rt of all request is 10 ms
    rule.setAvgRt(10);
    // max total qps is 10
    rule.setQps(10);
    // max parallel working thread is 10
    rule.setMaxThread(10);
    rules.add(rule);
    SystemRuleManager.loadRules(rules);
}
public class SystemRule extends AbstractRule {
    /**
     * negative value means no threshold checking.
     */
    private double highestSystemLoad = -1;
    /**
     * cpu usage, between [0, 1]
     */
    private double highestCpuUsage = -1;
    private double qps = -1;
    private long avgRt = -1;
    private long maxThread = -1;
    // ...
}
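Here we can see that thresholds can be set along several dimensions. highestSystemLoad and highestCpuUsage relate to system resource usage (for example CPU usage), qps limits inbound traffic, and avgRt and maxThread cap the average response time and the number of concurrent threads. So how are these parameters enforced?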
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        SystemRuleManager.checkSystem(resourceWrapper, count);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    // ...
}
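On entry, the slot calls SystemRuleManager.checkSystem to decide whether the request may proceed, and only then passes it on with fireEntry.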
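The "check first, then hand over to the next slot" flow can be sketched as a tiny chain. This is a simplified, hypothetical model (SlotChainSketch, Slot, and BlockedException are made-up names, not Sentinel classes); it only shows that a failed check stops later slots from running.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified slot chain; the names are illustrative and not Sentinel's classes. */
final class SlotChainSketch {

    /** Thrown by a slot to reject the request. */
    static final class BlockedException extends Exception {
        BlockedException(String reason) {
            super(reason);
        }
    }

    /** A slot either throws to block the request or returns so the next slot can run. */
    interface Slot {
        void entry(String resource) throws BlockedException;
    }

    /** Runs the slots in order and returns the steps that were reached. */
    static List<String> run(String resource, boolean systemCheckFails) {
        List<String> reached = new ArrayList<>();
        Slot systemSlot = r -> {
            reached.add("system");
            if (systemCheckFails) {
                throw new BlockedException("system rule violated for " + r);
            }
        };
        Slot nextSlot = r -> reached.add("next");
        try {
            systemSlot.entry(resource);
            nextSlot.entry(resource); // only reached when the system check passed
        } catch (BlockedException e) {
            reached.add("blocked");
        }
        return reached;
    }

    public static void main(String[] args) {
        System.out.println(run("methodA", false));
        System.out.println(run("methodA", true));
    }
}
```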
2. Parameter checks
1) loadSystemConf
public final class SystemRuleManager {
    // ...
    public static void loadSystemConf(SystemRule rule) {
        boolean checkStatus = false;
        // Check if it's valid.
        if (rule.getHighestSystemLoad() >= 0) {
            highestSystemLoad = Math.min(highestSystemLoad, rule.getHighestSystemLoad());
            highestSystemLoadIsSet = true;
            checkStatus = true;
        }
        if (rule.getHighestCpuUsage() >= 0) {
            if (rule.getHighestCpuUsage() > 1) {
                RecordLog.warn(String.format("[SystemRuleManager] Ignoring invalid SystemRule: "
                    + "highestCpuUsage %.3f > 1", rule.getHighestCpuUsage()));
            } else {
                highestCpuUsage = Math.min(highestCpuUsage, rule.getHighestCpuUsage());
                highestCpuUsageIsSet = true;
                checkStatus = true;
            }
        }
        if (rule.getAvgRt() >= 0) {
            maxRt = Math.min(maxRt, rule.getAvgRt());
            maxRtIsSet = true;
            checkStatus = true;
        }
        if (rule.getMaxThread() >= 0) {
            maxThread = Math.min(maxThread, rule.getMaxThread());
            maxThreadIsSet = true;
            checkStatus = true;
        }
        if (rule.getQps() >= 0) {
            qps = Math.min(qps, rule.getQps());
            qpsIsSet = true;
            checkStatus = true;
        }
        checkSystemStatus.set(checkStatus);
    }
    // ...
}
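This is the initial setup. As we saw earlier, the rule's parameters default to -1, so only the values you explicitly set (qps, rt, and so on) are applied. Setting any of them also sets checkStatus to true, which turns rule checking on.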
public final class SystemRuleManager {
    private static volatile double highestSystemLoad = Double.MAX_VALUE;
    /**
     * cpu usage, between [0, 1]
     */
    private static volatile double highestCpuUsage = Double.MAX_VALUE;
    private static volatile double qps = Double.MAX_VALUE;
    private static volatile long maxRt = Long.MAX_VALUE;
    private static volatile long maxThread = Long.MAX_VALUE;
    // ...
}
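These fields default to the maximum value of their type. Loading a rule replaces the default with the configured value through Math.min (for example Math.min(maxRt, rule.getAvgRt())), since a configured value is expected to be smaller. It also follows that if loadSystemConf is called for more than one rule, the smallest value for each dimension is the one that remains.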
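The combination of a "-1 means not configured" default and Math.min can be sketched for a single dimension as follows. ThresholdMergeSketch is a hypothetical class written for this post, modelling only the QPS field; it is a sketch of the pattern, not the real SystemRuleManager.

```java
/** Hypothetical model of how loaded rules collapse into one effective threshold. */
final class ThresholdMergeSketch {
    private double qps = Double.MAX_VALUE; // "no limit" until a rule sets one
    private boolean checkEnabled = false;  // stays false while nothing is configured

    /** Applies one rule's QPS value; a negative value means "not configured" and is ignored. */
    void load(double ruleQps) {
        if (ruleQps >= 0) {
            qps = Math.min(qps, ruleQps); // the strictest value seen so far wins
            checkEnabled = true;
        }
    }

    double effectiveQps() {
        return qps;
    }

    boolean isCheckEnabled() {
        return checkEnabled;
    }

    public static void main(String[] args) {
        ThresholdMergeSketch merged = new ThresholdMergeSketch();
        merged.load(-1); // unconfigured rule: nothing changes
        System.out.println(merged.isCheckEnabled());
        merged.load(50);
        merged.load(20);
        merged.load(100);
        System.out.println(merged.effectiveQps()); // the smallest configured value
    }
}
```

Starting from the maximum value means a dimension that was never configured can never be exceeded, so its check passes without needing a separate "is set" branch.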
2) checkSystem
public final class SystemRuleManager {
    public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {
        if (resourceWrapper == null) {
            return;
        }
        // Ensure the checking switch is on.
        if (!checkSystemStatus.get()) {
            return;
        }
        // for inbound traffic only
        if (resourceWrapper.getEntryType() != EntryType.IN) {
            return;
        }
        // total qps
        double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.passQps();
        if (currentQps + count > qps) {
            throw new SystemBlockException(resourceWrapper.getName(), "qps");
        }
        // total thread
        int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
        if (currentThread > maxThread) {
            throw new SystemBlockException(resourceWrapper.getName(), "thread");
        }
        // average rt
        double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
        if (rt > maxRt) {
            throw new SystemBlockException(resourceWrapper.getName(), "rt");
        }
        // load. BBR algorithm.
        if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
            if (!checkBbr(currentThread)) {
                throw new SystemBlockException(resourceWrapper.getName(), "load");
            }
        }
        // cpu usage
        if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }
    // ...
}
This is the actual check. It first tests checkSystemStatus.get(), so the checks below run only if a rule has been set, and it then confirms that the traffic is inbound:
if (resourceWrapper.getEntryType() != EntryType.IN) {
    return;
}
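The rule checks come next and are evaluated in order; the first one that fails throws a SystemBlockException. The order is qps -> currentThread (concurrent thread count) -> rt (average response time) -> highestSystemLoad (system load) -> highestCpuUsage (CPU usage). The key part is how the current load and CPU usage are obtained. Those values are collected in the background by SystemStatusListener: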
public final class SystemRuleManager {
    // ...
    private static SystemStatusListener statusListener = null;
    private final static SystemPropertyListener listener = new SystemPropertyListener();
    private static SentinelProperty<List<SystemRule>> currentProperty = new DynamicSentinelProperty<List<SystemRule>>();
    private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
        new NamedThreadFactory("sentinel-system-status-record-task", true));

    static {
        checkSystemStatus.set(false);
        statusListener = new SystemStatusListener();
        scheduler.scheduleAtFixedRate(statusListener, 0, 1, TimeUnit.SECONDS);
        currentProperty.addListener(listener);
    }
    // ...
}
public class SystemStatusListener implements Runnable {
    // ...
}
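The fixed order of the checks in checkSystem means that only the first violated dimension is reported. A minimal sketch of that behaviour, using hypothetical names (CheckOrderSketch, Values) and plain numbers instead of Sentinel's nodes, might look like this. The load check is deliberately simplified and leaves out the BBR capacity test.

```java
import java.util.Optional;

/** Hypothetical model of the ordered checks in checkSystem; not Sentinel's API. */
final class CheckOrderSketch {

    /** One value per dimension; used both for current measurements and for limits. */
    static final class Values {
        final double qps;
        final long threads;
        final double rt;
        final double load;
        final double cpu;

        Values(double qps, long threads, double rt, double load, double cpu) {
            this.qps = qps;
            this.threads = threads;
            this.rt = rt;
            this.load = load;
            this.cpu = cpu;
        }
    }

    /** Returns the name of the first violated dimension, or empty if the request passes. */
    static Optional<String> firstViolation(Values current, Values limit, int count) {
        if (current.qps + count > limit.qps) {
            return Optional.of("qps");
        }
        if (current.threads > limit.threads) {
            return Optional.of("thread");
        }
        if (current.rt > limit.rt) {
            return Optional.of("rt");
        }
        // Simplification: the real load check also consults the BBR capacity estimate.
        if (current.load > limit.load) {
            return Optional.of("load");
        }
        if (current.cpu > limit.cpu) {
            return Optional.of("cpu");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Values limit = new Values(10, 10, 10, 3.0, 0.9);
        // Both the thread count and the RT exceed their limits; the thread check comes first.
        Values current = new Values(5, 12, 30, 1.0, 0.2);
        System.out.println(firstViolation(current, limit, 1));
        // Everything is within limits, so nothing is reported.
        System.out.println(firstViolation(new Values(5, 3, 4, 1.0, 0.2), limit, 1));
    }
}
```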
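If you're interested, read through the calculation logic in SystemStatusListener yourself. I won't analyze it in detail here (feeling lazy), so I'll just paste the source.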
@Override
public void run() {
    try {
        OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
        currentLoad = osBean.getSystemLoadAverage();
        /*
         * Java Doc copied from {@link OperatingSystemMXBean#getSystemCpuLoad()}:</br>
         * Returns the "recent cpu usage" for the whole system. This value is a double in the [0.0,1.0] interval.
         * A value of 0.0 means that all CPUs were idle during the recent period of time observed, while a value
         * of 1.0 means that all CPUs were actively running 100% of the time during the recent period being
         * observed. All values between 0.0 and 1.0 are possible depending of the activities going on in the
         * system. If the system recent cpu usage is not available, the method returns a negative value.
         */
        double systemCpuUsage = osBean.getSystemCpuLoad();
        // calculate process cpu usage to support application running in container environment
        RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class);
        long newProcessCpuTime = osBean.getProcessCpuTime();
        long newProcessUpTime = runtimeBean.getUptime();
        int cpuCores = osBean.getAvailableProcessors();
        long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS
            .toMillis(newProcessCpuTime - processCpuTime);
        long processUpTimeDiffInMs = newProcessUpTime - processUpTime;
        double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores;
        processCpuTime = newProcessCpuTime;
        processUpTime = newProcessUpTime;
        currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage);
        if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {
            writeSystemStatusLog();
        }
    } catch (Throwable e) {
        RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);
    }
}
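The CPU arithmetic in run() is easier to follow with the JMX calls stripped away. The sketch below uses a hypothetical class (CpuUsageSketch) that takes the sampled deltas as plain arguments. The guard against a zero-length window is an addition of this sketch and is not in the source shown above.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical, JMX-free version of the CPU usage arithmetic; names are illustrative. */
final class CpuUsageSketch {

    /**
     * Process CPU usage over one sampling window, normalized across all cores.
     * cpuTimeDiffNanos is the CPU time consumed by the process during the window,
     * upTimeDiffMs is the wall-clock length of the window.
     */
    static double processCpuUsage(long cpuTimeDiffNanos, long upTimeDiffMs, int cores) {
        if (upTimeDiffMs <= 0 || cores <= 0) {
            return 0.0; // guard added by this sketch to avoid dividing by zero
        }
        long cpuTimeDiffMs = TimeUnit.NANOSECONDS.toMillis(cpuTimeDiffNanos);
        return (double) cpuTimeDiffMs / upTimeDiffMs / cores;
    }

    /** The larger of the process-level and system-level readings is used for the check. */
    static double effectiveCpuUsage(double processUsage, double systemUsage) {
        return Math.max(processUsage, systemUsage);
    }

    public static void main(String[] args) {
        // 2 s of CPU time in a 1 s window on 4 cores: half of the total CPU capacity.
        double process = processCpuUsage(2_000_000_000L, 1000, 4);
        System.out.println(process);
        // A negative system reading means "not available", so the process value is used.
        System.out.println(effectiveCpuUsage(process, -1.0));
    }
}
```

Taking the maximum of the two readings matches the comment in the source about container environments: if the system-wide figure is unavailable or lower than what the process itself consumes, the process figure still drives the check.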