03. Basic Usage of Sentinel: System Rules

In this post we look at how to use Sentinel's system rules. Let's start with the official description.

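I. Introduction to system rules

System protection rules control traffic at the application's entry point. They monitor application metrics on a single machine along several dimensions (load, CPU usage, average RT, inbound QPS, and concurrent thread count) so that the system runs as close to its maximum throughput as possible while staying stable overall.

System protection rules apply to the application as a whole rather than to individual resources, and they only take effect for inbound traffic. Inbound traffic means traffic entering the application (EntryType.IN), for example requests received by a Web service or a Dubbo provider.

System rules support the following modes:

  • Load adaptive (only effective on Linux/Unix-like machines): the system's load1 is used as the heuristic metric for adaptive protection. Protection is triggered (the BBR phase) only when load1 exceeds the configured threshold and the current number of concurrent threads exceeds the estimated system capacity. The capacity is estimated as maxQps * minRt. A typical reference value for the threshold is CPU cores * 2.5.
  • CPU usage (version 1.5.0+): protection is triggered when the system CPU usage exceeds the threshold (range 0.0-1.0). This mode is fairly sensitive.
  • Average RT: protection is triggered when the average RT of all inbound traffic on a single machine reaches the threshold, in milliseconds.
  • Concurrent threads: protection is triggered when the number of concurrent threads of all inbound traffic on a single machine reaches the threshold.
  • Inbound QPS: protection is triggered when the QPS of all inbound traffic on a single machine reaches the threshold.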
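The load adaptive mode is the least obvious of the five, so here is a minimal sketch of the decision it describes. This is not Sentinel's code: the class and method names are hypothetical, and dividing by 1000 to convert a millisecond RT into seconds is my own assumption about units.

```java
/** Hypothetical sketch of the load adaptive decision; not Sentinel's actual class. */
class LoadAdaptiveSketch {

    /** Estimated capacity in concurrent requests: maxQps (req/s) * minRt (ms) / 1000. */
    static double estimatedCapacity(double maxQps, double minRtMs) {
        return maxQps * minRtMs / 1000.0;
    }

    /** Reference load threshold mentioned above: CPU cores * 2.5. */
    static double referenceLoadThreshold(int cpuCores) {
        return cpuCores * 2.5;
    }

    /**
     * Returns true when the request should be rejected: load1 is above the
     * threshold AND the current concurrency exceeds the estimated capacity.
     */
    static boolean shouldBlock(double load1, double loadThreshold,
                               int currentThreads, double maxQps, double minRtMs) {
        if (load1 <= loadThreshold) {
            return false; // load1 is only the trigger, not the limit itself
        }
        return currentThreads > estimatedCapacity(maxQps, minRtMs);
    }

    public static void main(String[] args) {
        // Best observed throughput 500 req/s at 20 ms minimum RT.
        System.out.println("capacity  = " + estimatedCapacity(500, 20));
        System.out.println("threshold = " + referenceLoadThreshold(4));
        System.out.println("block?    = " + shouldBlock(4.0, 3.0, 11, 500, 20));
    }
}
```

The point of the two-step condition is that a high load1 alone does not reject traffic; requests are only rejected while concurrency is also above what the system has shown it can handle.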

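In other words, a system rule applies to calls into the system as a whole. Unlike flow control or circuit breaking, you do not configure a separate rule for each resource.

The rule can also be evaluated from several angles, for example the CPU usage, RT, and QPS dimensions listed above.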

II. Demo

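import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.csp.sentinel.slots.system.SystemRuleManager;
import com.alibaba.csp.sentinel.util.TimeUtil;

public class SystemGuardDemo {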

    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();

    private static volatile boolean stop = false;
    private static final int threadCount = 100;

    private static int seconds = 60 + 40;

    public static void main(String[] args) throws Exception {

        tick();
        initSystemRule();

        for (int i = 0; i < threadCount; i++) {

            Thread entryThread = new Thread(new Runnable() {

                @Override
                public void run() {

                    while (true) {

                        Entry entry = null;
                        try {

                            entry = SphU.entry("methodA", EntryType.IN);
                            pass.incrementAndGet();
                            try {

                                TimeUnit.MILLISECONDS.sleep(20);
                            } catch (InterruptedException e) {

                                // ignore
                            }
                        } catch (BlockException e1) {

                            block.incrementAndGet();
                            try {

                                TimeUnit.MILLISECONDS.sleep(20);
                            } catch (InterruptedException e) {

                                // ignore
                            }
                        } catch (Exception e2) {

                            // biz exception
                        } finally {

                            total.incrementAndGet();
                            if (entry != null) {

                                entry.exit();
                            }
                        }
                    }
                }

            });
            entryThread.setName("working-thread");
            entryThread.start();
        }
    }

    private static void initSystemRule() {

        List<SystemRule> rules = new ArrayList<SystemRule>();
        SystemRule rule = new SystemRule();
        // max load is 3
        rule.setHighestSystemLoad(3.0);
        // max cpu usage is 90%
        rule.setHighestCpuUsage(0.9);
        // max avg rt of all request is 10 ms
        rule.setAvgRt(10);
        // max total qps is 10
        rule.setQps(10);
        // max parallel working thread is 10
        rule.setMaxThread(10);

        rules.add(rule);
        SystemRuleManager.loadRules(Collections.singletonList(rule));
    }

    private static void tick() {

        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();
    }

    static class TimerTask implements Runnable {

        @Override
        public void run() {

            System.out.println("begin to statistic!!!");
            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;
            while (!stop) {

                try {

                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {

                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(seconds + ", " + TimeUtil.currentTimeMillis() + ", total:"
                    + oneSecondTotal + ", pass:"
                    + oneSecondPass + ", block:" + oneSecondBlock);
                if (seconds-- <= 0) {

                    stop = true;
                }
            }
            System.exit(0);
        }
    }
}

Sample output:

100, 1649568871387, total:2594, pass:100, block:2500
99, 1649568872395, total:3206, pass:75, block:3225
98, 1649568873404, total:3283, pass:36, block:3164
97, 1649568874411, total:4417, pass:16, block:4484
96, 1649568875412, total:4901, pass:22, block:4879
95, 1649568876412, total:4996, pass:16, block:4980
94, 1649568877413, total:4903, pass:17, block:4886
93, 1649568879250, total:165, pass:16, block:75
92, 1649568884798, total:33, pass:5, block:10
91, 1649568887934, total:9, pass:0, block:17
90, 1649568894137, total:112, pass:22, block:93
89, 1649568895138, total:3202, pass:28, block:3255
88, 1649568896139, total:3200, pass:27, block:3173

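This is the official demo. The main thing to note above is SphU.entry("methodA", EntryType.IN): the entry is marked as inbound traffic, so it is subject to the system rule check. Now let's look at how the rule is configured.

1. Rule configuration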

private static void initSystemRule() {

    List<SystemRule> rules = new ArrayList<SystemRule>();
    SystemRule rule = new SystemRule();
    // max load is 3
    rule.setHighestSystemLoad(3.0);
    // max cpu usage is 90%
    rule.setHighestCpuUsage(0.9);
    // max avg rt of all request is 10 ms
    rule.setAvgRt(10);
    // max total qps is 10
    rule.setQps(10);
    // max parallel working thread is 10
    rule.setMaxThread(10);

    rules.add(rule);
    SystemRuleManager.loadRules(Collections.singletonList(rule));
}
public class SystemRule extends AbstractRule {

    /**
     * negative value means no threshold checking.
     */
    private double highestSystemLoad = -1;
    /**
     * cpu usage, between [0, 1]
     */
    private double highestCpuUsage = -1;
    private double qps = -1;
    private long avgRt = -1;
    private long maxThread = -1;

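Here we can see the thresholds that can be set along the different dimensions. highestSystemLoad and highestCpuUsage are tied to how heavily the machine is loaded (for example CPU utilization), while the others limit inbound QPS, average response time (avgRt), and the maximum number of concurrent threads. So how are these parameters actually enforced?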

public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {

        SystemRuleManager.checkSystem(resourceWrapper, count);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

On entry, the check is performed by SystemRuleManager.checkSystem.

2. Parameter control

1) loadSystemConf

public final class SystemRuleManager {

    ..........
    public static void loadSystemConf(SystemRule rule) {

        boolean checkStatus = false;
        // Check if it's valid.

        if (rule.getHighestSystemLoad() >= 0) {

            highestSystemLoad = Math.min(highestSystemLoad, rule.getHighestSystemLoad());
            highestSystemLoadIsSet = true;
            checkStatus = true;
        }

        if (rule.getHighestCpuUsage() >= 0) {

            if (rule.getHighestCpuUsage() > 1) {

                RecordLog.warn(String.format("[SystemRuleManager] Ignoring invalid SystemRule: "
                    + "highestCpuUsage %.3f > 1", rule.getHighestCpuUsage()));
            } else {

                highestCpuUsage = Math.min(highestCpuUsage, rule.getHighestCpuUsage());
                highestCpuUsageIsSet = true;
                checkStatus = true;
            }
        }

        if (rule.getAvgRt() >= 0) {

            maxRt = Math.min(maxRt, rule.getAvgRt());
            maxRtIsSet = true;
            checkStatus = true;
        }
        if (rule.getMaxThread() >= 0) {

            maxThread = Math.min(maxThread, rule.getMaxThread());
            maxThreadIsSet = true;
            checkStatus = true;
        }

        if (rule.getQps() >= 0) {

            qps = Math.min(qps, rule.getQps());
            qpsIsSet = true;
            checkStatus = true;
        }

        checkSystemStatus.set(checkStatus);

    }

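This is the initial setup. As we saw earlier, these parameters default to -1 in SystemRule, so whenever you explicitly set one of them (qps, rt, and so on), checkStatus is set to true and rule checking is switched on.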

public final class SystemRuleManager {

    private static volatile double highestSystemLoad = Double.MAX_VALUE;
    /**
     * cpu usage, between [0, 1]
     */
    private static volatile double highestCpuUsage = Double.MAX_VALUE;
    private static volatile double qps = Double.MAX_VALUE;
    private static volatile long maxRt = Long.MAX_VALUE;
    private static volatile long maxThread = Long.MAX_VALUE;

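In SystemRuleManager these fields default to the maximum value of their type, and a configured value replaces the default through Math.min (for example Math.min(maxRt, rule.getAvgRt())), since an explicitly configured threshold is expected to be smaller.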
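One consequence of the Math.min approach is that when several rules are loaded, the strictest value per dimension wins. Below is a small model of that behavior, limited to two of the five dimensions. ThresholdMergeSketch and its Rule class are hypothetical names made up for this illustration; only the Math.min and "negative means unset" logic is taken from the snippets above.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of how loadSystemConf merges thresholds; not Sentinel's class. */
class ThresholdMergeSketch {

    /** Stand-in for SystemRule with two dimensions; a negative value means "not set". */
    static final class Rule {
        final double qps;
        final long avgRt;

        Rule(double qps, long avgRt) {
            this.qps = qps;
            this.avgRt = avgRt;
        }
    }

    // Defaults are the type maximum, so any configured value is smaller.
    double qps = Double.MAX_VALUE;
    long maxRt = Long.MAX_VALUE;
    boolean checkStatus = false;

    /** Mirrors the shape of loadSystemConf for the two modeled dimensions. */
    void load(Rule rule) {
        boolean status = false;
        if (rule.qps >= 0) {
            qps = Math.min(qps, rule.qps);
            status = true;
        }
        if (rule.avgRt >= 0) {
            maxRt = Math.min(maxRt, rule.avgRt);
            status = true;
        }
        // As in the snippet above, the flag reflects the rule loaded last.
        checkStatus = status;
    }

    static ThresholdMergeSketch merge(List<Rule> rules) {
        ThresholdMergeSketch merged = new ThresholdMergeSketch();
        for (Rule rule : rules) {
            merged.load(rule);
        }
        return merged;
    }

    public static void main(String[] args) {
        ThresholdMergeSketch merged = merge(Arrays.asList(new Rule(20, -1), new Rule(10, 50)));
        System.out.println("qps=" + merged.qps + ", maxRt=" + merged.maxRt
            + ", checkStatus=" + merged.checkStatus);
    }
}
```

In the example, the first rule only limits QPS and the second limits both QPS and RT, so the merged result keeps the lower QPS limit and the only RT limit that was set.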

2) checkSystem

public final class SystemRuleManager {

    public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {

        if (resourceWrapper == null) {

            return;
        }
        // Ensure the checking switch is on.
        if (!checkSystemStatus.get()) {

            return;
        }

        // for inbound traffic only
        if (resourceWrapper.getEntryType() != EntryType.IN) {

            return;
        }

        // total qps
        double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.passQps();
        if (currentQps + count > qps) {

            throw new SystemBlockException(resourceWrapper.getName(), "qps");
        }

        // total thread
        int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
        if (currentThread > maxThread) {

            throw new SystemBlockException(resourceWrapper.getName(), "thread");
        }

        double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
        if (rt > maxRt) {

            throw new SystemBlockException(resourceWrapper.getName(), "rt");
        }

        // load. BBR algorithm.
        if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {

            if (!checkBbr(currentThread)) {

                throw new SystemBlockException(resourceWrapper.getName(), "load");
            }
        }

        // cpu usage
        if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {

            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }
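To make the order of the checks in checkSystem easier to see, here is a simplified model that returns the name of the first dimension that is violated instead of throwing. All names in it (CheckOrderSketch, Limits, Snapshot, estimatedCapacity) are hypothetical. It drops the IsSet flags by relying on the maximum-value defaults, and it replaces checkBbr with a plain comparison against an estimated capacity, which is an assumption on my part.

```java
import java.util.Optional;

/** Simplified, hypothetical model of the check order; not Sentinel's actual class. */
class CheckOrderSketch {

    /** Configured limits; the type maximum means "not set". */
    static final class Limits {
        double qps = Double.MAX_VALUE;
        long maxThread = Long.MAX_VALUE;
        long maxRt = Long.MAX_VALUE;
        double load = Double.MAX_VALUE;
        double cpu = Double.MAX_VALUE;
    }

    /** Current metrics of all inbound traffic (hypothetical holder). */
    static final class Snapshot {
        double passQps;
        int threads;
        double avgRt;
        double load1;
        double cpuUsage;
        double estimatedCapacity; // stands in for the BBR capacity estimate
    }

    /** Returns the first violated dimension in the order qps, thread, rt, load, cpu. */
    static Optional<String> firstViolation(Snapshot s, Limits l, int count) {
        if (s.passQps + count > l.qps) {
            return Optional.of("qps");
        }
        if (s.threads > l.maxThread) {
            return Optional.of("thread");
        }
        if (s.avgRt > l.maxRt) {
            return Optional.of("rt");
        }
        // Load only blocks when concurrency is also above the estimated capacity.
        if (s.load1 > l.load && s.threads > s.estimatedCapacity) {
            return Optional.of("load");
        }
        if (s.cpuUsage > l.cpu) {
            return Optional.of("cpu");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Limits limits = new Limits();
        limits.qps = 10;
        limits.maxThread = 10;

        Snapshot snapshot = new Snapshot();
        snapshot.passQps = 10;
        snapshot.threads = 50;

        // Both qps and thread are over the limit; qps is reported because it is checked first.
        System.out.println(firstViolation(snapshot, limits, 1).orElse("pass"));
    }
}
```

Because the checks short-circuit, the limit type reported for a blocked request is the first one violated in this order, even if later dimensions are also over their thresholds.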

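checkSystem is where the actual check happens. It first tests checkSystemStatus.get(), so the checks below run only if a rule has been configured. It also restricts the check to inbound traffic: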

if (resourceWrapper.getEntryType() != EntryType.IN) {

    return;
}

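After that come the rule checks themselves. They are evaluated in order, and the first one that fails throws a SystemBlockException. The order is qps -> currentThread (concurrent thread count) -> rt (average response time) -> highestSystemLoad (system load) -> highestCpuUsage (CPU usage). The key part is how the current load and CPU usage values compared against highestSystemLoad and highestCpuUsage are obtained; that is handled in the background by SystemStatusListener: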

public final class SystemRuleManager {

    ..........
    private static SystemStatusListener statusListener = null;
    private final static SystemPropertyListener listener = new SystemPropertyListener();
    private static SentinelProperty<List<SystemRule>> currentProperty = new DynamicSentinelProperty<List<SystemRule>>();
    private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
        new NamedThreadFactory("sentinel-system-status-record-task", true));

    static {

        checkSystemStatus.set(false);
        statusListener = new SystemStatusListener();
        scheduler.scheduleAtFixedRate(statusListener, 0, 1, TimeUnit.SECONDS);
        currentProperty.addListener(listener);
    }
public class SystemStatusListener implements Runnable {

If you are interested, take a look at the calculation logic in SystemStatusListener. I won't analyze it in detail here (too lazy), so I'll just paste its source.

@Override
public void run() {

    try {

        OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
        currentLoad = osBean.getSystemLoadAverage();

        /*
         * Java Doc copied from {@link OperatingSystemMXBean#getSystemCpuLoad()}:</br>
         * Returns the "recent cpu usage" for the whole system. This value is a double in the [0.0,1.0] interval.
         * A value of 0.0 means that all CPUs were idle during the recent period of time observed, while a value
         * of 1.0 means that all CPUs were actively running 100% of the time during the recent period being
         * observed. All values between 0.0 and 1.0 are possible depending of the activities going on in the
         * system. If the system recent cpu usage is not available, the method returns a negative value.
         */
        double systemCpuUsage = osBean.getSystemCpuLoad();

        // calculate process cpu usage to support application running in container environment
        RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class);
        long newProcessCpuTime = osBean.getProcessCpuTime();
        long newProcessUpTime = runtimeBean.getUptime();
        int cpuCores = osBean.getAvailableProcessors();
        long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS
                .toMillis(newProcessCpuTime - processCpuTime);
        long processUpTimeDiffInMs = newProcessUpTime - processUpTime;
        double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores;
        processCpuTime = newProcessCpuTime;
        processUpTime = newProcessUpTime;

        currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage);

        if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {

            writeSystemStatusLog();
        }
    } catch (Throwable e) {

        RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);
    }
}
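The process CPU part of run() is easy to try on its own. The sketch below applies the same formula (CPU time delta / uptime delta / cores) using only JDK classes. ProcessCpuSketch is a hypothetical name, the zero-interval guard is my addition, and it assumes a HotSpot-style JVM where com.sun.management.OperatingSystemMXBean is available.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.concurrent.TimeUnit;

import com.sun.management.OperatingSystemMXBean;

/** Hypothetical standalone version of the process CPU usage calculation. */
class ProcessCpuSketch {

    /**
     * CPU usage of this process in [0, 1] over one sampling interval:
     * CPU time consumed / wall-clock time elapsed / number of cores.
     */
    static double processCpuUsage(long cpuTimeDiffNanos, long upTimeDiffMs, int cpuCores) {
        if (upTimeDiffMs <= 0 || cpuCores <= 0) {
            return 0.0; // guard against an empty interval (not in the original code)
        }
        long cpuTimeDiffMs = TimeUnit.NANOSECONDS.toMillis(cpuTimeDiffNanos);
        return (double) cpuTimeDiffMs / upTimeDiffMs / cpuCores;
    }

    public static void main(String[] args) throws InterruptedException {
        OperatingSystemMXBean osBean =
            ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
        RuntimeMXBean runtimeBean = ManagementFactory.getRuntimeMXBean();

        // First sample.
        long cpuTime = osBean.getProcessCpuTime();
        long upTime = runtimeBean.getUptime();

        TimeUnit.SECONDS.sleep(1);

        // Second sample, one second later, like the scheduled listener.
        long newCpuTime = osBean.getProcessCpuTime();
        long newUpTime = runtimeBean.getUptime();

        double usage = processCpuUsage(newCpuTime - cpuTime, newUpTime - upTime,
            osBean.getAvailableProcessors());
        System.out.printf("process cpu usage: %.3f%n", usage);
    }
}
```

Dividing by the core count keeps the result in the same 0.0-1.0 range as the system-wide CPU load, which is what lets the listener take the larger of the two values with Math.max.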