01. Basic Usage of Sentinel: Flow Control and a Walkthrough of the Source Code Behind It

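I'll skip the general introduction to Sentinel: it is Alibaba's open-source library for flow control, circuit breaking, and degradation under high concurrency. Let's go straight to a usage example.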

First, add the dependency to the project:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-core</artifactId>
    <version>1.8.3</version>
</dependency>

I. Flow control

1. QPS mode

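import java.util.ArrayList;
import java.util.List;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

public class SimpleEntryMain {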

    private static String resourceName = "simpleEntry";

    public static void main(String[] args) throws InterruptedException {

        initFlowRule();
        for (int i = 0; i < 10; i++) {

            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
            Thread.sleep(50);
        }
    }

    private static void initFlowRule() {

        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource(resourceName);
        rule1.setCount(2);
        rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule1.setLimitApp("default");
        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }

    static class RunTask implements Runnable {

        @Override
        public void run() {

                Entry entry = null;
                try {

                    entry = SphU.entry(resourceName);
                    doSomeThing();
                } catch (BlockException e1) {

                    System.out.println("Block   ----------");
                } finally {

                    if (entry != null) {

                        entry.exit();
                    }
                }
        }
    }

    public static void doSomeThing(){

        System.out.println("++++++++++++++++++");
        try {

            Thread.sleep(3000);
        } catch (InterruptedException e) {

            // ignore
        }
    }
}

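This demo first installs the rule via initFlowRule(), then uses several threads to simulate concurrent calls to the resource. The guarded call is simulated with Thread.sleep(3000), i.e. it takes 3 seconds. Each thread runs the standard Sentinel usage pattern: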

Entry entry = null;
try {

    entry = SphU.entry(resourceName);
    doSomeThing();
} catch (BlockException e1) {

    System.out.println("Block   ----------");
} finally {

    if (entry != null) {

        entry.exit();
    }
}

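This is the standard try-catch-finally pattern. SphU.entry(resourceName) looks up the rules configured for the resource and checks them: if the checks pass, no exception is thrown; otherwise a BlockException is thrown. Here we call the API directly. In Spring Boot, the integration presumably uses dynamic proxies or AOP to decouple doSomeThing() from the SphU.entry(resourceName) call, so that after adding an annotation we can focus on the logic of doSomeThing() alone.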
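To make the decoupling idea concrete, here is a minimal sketch using only the JDK. It is not how Sentinel or its Spring integration is implemented: `Gate` and `call` are hypothetical names invented for this example, and the gate is a plain in-flight counter standing in for the rule check. The point is only that the acquire / run / release pattern can live in one helper while the business logic is passed in.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class GuardedCallSketch {

    /** Hypothetical stand-in for a flow-control check; not a Sentinel type. */
    static class Gate {
        private final int limit;
        private final AtomicInteger inFlight = new AtomicInteger();

        Gate(int limit) {
            this.limit = limit;
        }

        boolean tryEnter() {
            // Reserve a slot, then roll back if the limit was exceeded.
            if (inFlight.incrementAndGet() > limit) {
                inFlight.decrementAndGet();
                return false;
            }
            return true;
        }

        void exit() {
            inFlight.decrementAndGet();
        }
    }

    /** Runs body if the gate admits the call, otherwise returns the fallback value. */
    static <T> T call(Gate gate, Supplier<T> body, Supplier<T> fallback) {
        if (!gate.tryEnter()) {
            return fallback.get();
        }
        try {
            return body.get();
        } finally {
            gate.exit(); // always release, like entry.exit() in the finally block
        }
    }

    public static void main(String[] args) {
        Gate gate = new Gate(1);
        // The nested call arrives while the outer call still holds the only slot.
        String nested = call(gate, () -> call(gate, () -> "inner", () -> "blocked"), () -> "outer blocked");
        String afterwards = call(gate, () -> "ok", () -> "blocked");
        System.out.println(nested);     // blocked
        System.out.println(afterwards); // ok
    }
}
```

The caller only supplies the business logic and a fallback; the guard itself is written once.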

2. About the demo

The code above limits traffic by QPS. First we set up the rule:

private static void initFlowRule() {
    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource(resourceName);
    rule1.setCount(2);
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule1.setLimitApp("default");
    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}

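We mainly set four parameters:

1. resource

Identifies which resource the rule applies to. It corresponds to SphU.entry(resourceName): when you try to acquire an entry, resourceName is used to find the rules you configured earlier.

2. count

The threshold of the rule. In QPS mode, for example, a count of 2 means that at most two requests per second are allowed through.

3. grade

Which kind of threshold the rule uses: QPS, or the number of concurrently running threads.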

/**
 * The threshold type of flow control (0: thread count, 1: QPS).
 */
private int grade = RuleConstant.FLOW_GRADE_QPS;

public final class RuleConstant {

    public static final int FLOW_GRADE_THREAD = 0;
    public static final int FLOW_GRADE_QPS = 1;

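4. limitApp

Which calling application the rule applies to; presumably it exists so that a rule can be restricted to a particular app. For practice or for a single application you don't need to set it explicitly, because new FlowRule() already sets it: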

public FlowRule() {

    super();
    setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}

public static final String LIMIT_APP_DEFAULT = "default";

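3. Why there is a Thread.sleep(50) between thread starts

Note that we deliberately added Thread.sleep(50) between starting the 10 threads. Without it you may not see any blocking at all. For example, without the sleep: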

++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++

Nothing is blocked. With the sleep:

++++++++++++++++++
++++++++++++++++++
Block   ----------
Block   ----------
Block   ----------
Block   ----------
Block   ----------
Block   ----------
Block   ----------
Block   ----------

Even with Thread.sleep(50) the lines may be printed in a different order, depending on how the CPU schedules the threads.

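The interval between thread starts is needed because of how Sentinel computes its statistics. First, the check whether a request may pass and the corresponding +1 on the counter are not one atomic step. Sentinel runs as a chain: besides flow-control rules there are circuit-breaking and degradation rules, other rules, and the statistics themselves. Each of these is wrapped in a ProcessorSlot, and the slots are linked one after another through their next field.

1. The check and the +1 are each thread-safe, but not atomic together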

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {

    private AbstractLinkedProcessorSlot<?> next = null;

    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {

        if (next != null) {

            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }

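For example, flow control is handled by FlowSlot and circuit breaking/degradation by DegradeSlot, while the counts used to decide whether a request may pass are maintained by StatisticSlot: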

public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {

        try {

            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);
            ..........
    }

public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {

        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
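The ordering in these two excerpts is easy to miss, so here is a minimal JDK-only sketch of such a chain. The classes `Slot`, `StatSlot` and `CheckSlot` are simplified stand-ins invented for this example, not the Sentinel classes. It records the order in which things happen: the statistics slot forwards the call first, the check runs further down the chain, and only after the check returns is the pass counted.

```java
import java.util.ArrayList;
import java.util.List;

class SlotChainSketch {

    /** Simplified stand-in for a linked processor slot; not the Sentinel class. */
    abstract static class Slot {
        Slot next;

        abstract void entry(List<String> trace);

        void fireEntry(List<String> trace) {
            if (next != null) {
                next.entry(trace);
            }
        }
    }

    /** Plays the role of the statistics slot: forwards first, counts afterwards. */
    static class StatSlot extends Slot {
        @Override
        void entry(List<String> trace) {
            fireEntry(trace);                // run the checks further down the chain
            trace.add("statistic: pass +1"); // count only after the checks returned
        }
    }

    /** Plays the role of the flow slot: checks, then forwards. */
    static class CheckSlot extends Slot {
        @Override
        void entry(List<String> trace) {
            trace.add("flow: check");
            fireEntry(trace);
        }
    }

    static List<String> run() {
        Slot stat = new StatSlot();
        stat.next = new CheckSlot();
        List<String> trace = new ArrayList<>();
        stat.entry(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [flow: check, statistic: pass +1]
    }
}
```

Because the count is written only after the whole downstream check has finished, there is a gap between "check" and "+1" that other threads can slip into.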

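As shown, the check happens in FlowSlot while the node counters are updated in StatisticSlot, and there is no synchronization between the two components.

The counters themselves are LongAdder instances, so each individual increment is thread-safe and visible to other threads: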

public class MetricBucket {

    private final LongAdder[] counters;

// In StatisticNode:
private LongAdder curThreadNum = new LongAdder();

So under concurrency, several threads can all run the check before any of them has performed its +1, and all of them pass.
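This check-then-count gap can be reproduced without Sentinel. The following is a minimal sketch, not Sentinel code: the class and method names are made up for this example, and a CyclicBarrier is used to force the worst case in which every thread finishes its check before any thread increments the counter.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.LongAdder;

class CheckThenActRace {

    /** Returns how many threads were admitted under a limit that is checked without locking. */
    static int run(int threads, int limit) {
        LongAdder passed = new LongAdder();
        // Forces every thread to finish its check before any thread counts itself.
        CyclicBarrier allChecked = new CyclicBarrier(threads);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                boolean canPass = passed.sum() + 1 <= limit; // step 1: check
                try {
                    allChecked.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    return;
                }
                if (canPass) {
                    passed.increment();                      // step 2: count
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return (int) passed.sum();
    }

    public static void main(String[] args) {
        // Limit is 2, but all 10 threads saw a count of 0 when they checked.
        System.out.println(run(10, 2)); // 10
    }
}
```

The LongAdder never loses an increment; the limit is still exceeded because the decision was made on a stale sum. That matches the ten "+" lines printed when the demo starts all threads without a pause.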

We can change how the threads are started:

public static void main(String[] args) throws InterruptedException {

//        initFlowQpsRule();
        initFlowRule();
        for (int i = 0; i < 5; i++) {

            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
        }
        Thread.sleep(100);
        for (int i = 0; i < 5; i++) {

            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
        }
    }

We split the 10 threads into two groups and sleep 100 ms in between:

++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
Block   ----------
Block   ----------
Block   ----------
Block   ----------
Block   ----------

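The first five were not split 2+3 (two passed, three blocked); all five passed, while the second five were all rate limited. This grouping, together with the fact that Sentinel uses a sliding window, points to another factor: how the window is divided into segments.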

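4. The default sliding-window logic

Sentinel counts calls with a sliding window: each time entry is called to acquire a resource, the call is counted in the window. For QPS the window spans 1000 ms, which is further divided into smaller segments (buckets), and the check is made against their sum.

With a QPS rule, the second is split into two buckets of 500 ms each by default. When a check happens in B1 (500-1000), the values of B0 and B1 are added together. When we reach B2 (1000-1500), Sentinel can tell from the current time that B0 has expired; B0 is discarded and a new bucket B2 is created, so the check uses B1 + B2.

*     B0       B1      B2
* ||_______|_______|_______||___
* 0       500     1000    1500   timestamp
*      ^
*      time=300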

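Here we assume the program starts at time 0. (In practice the bucket boundaries are aligned to the wall clock rather than to program start, so results can vary between runs.) Now we change the thread start intervals again and reduce the threads to 1+1+2, so that the output stays short enough to analyze.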

public static void main(String[] args) throws InterruptedException {

        initFlowRule();
        for (int i = 0; i < 1; i++) {

            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
        }
        Thread.sleep(500);
        for (int i = 0; i < 1; i++) {

            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
        }
        Thread.sleep(500);
        for (int i = 0; i < 2; i++) {

            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
            Thread.sleep(50);
        }
    }

Output:

++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
Block   ----------

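This demo illustrates how Sentinel divides the sliding window into buckets. By default 1 s is split into two buckets of 500 ms each. One request passes in B0 and one in B1. A check made in B2 (1000-1500) covers the 500-1500 range, i.e. B1 + B2, which already contains the request from B1, so in 1000-1500 only one more request passes and the other is blocked.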
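The arithmetic can be replayed with a small JDK-only sketch. This is a simplification written for this article, not Sentinel's LeapArray: it is single-threaded, takes timestamps as arguments instead of reading a clock, and the class and method names are invented. It keeps two 500 ms buckets in a ring, recycles the slot for the current time, and sums the buckets that still fall inside the 1000 ms interval.

```java
import java.util.Arrays;

class TwoBucketWindowSketch {
    private final int sampleCount;
    private final int intervalMs;
    private final int windowLengthMs;
    private final long[] windowStart; // start time of the bucket held in each slot, -1 if empty
    private final long[] pass;        // pass count of the bucket held in each slot

    TwoBucketWindowSketch(int sampleCount, int intervalMs) {
        this.sampleCount = sampleCount;
        this.intervalMs = intervalMs;
        this.windowLengthMs = intervalMs / sampleCount;
        this.windowStart = new long[sampleCount];
        this.pass = new long[sampleCount];
        Arrays.fill(windowStart, -1L);
    }

    /** Checks one request arriving at time "now" (ms) against the limit and counts it if admitted. */
    boolean tryPass(long now, int limit) {
        // Locate the slot for "now"; recycle it if it still holds an older bucket.
        int idx = (int) ((now / windowLengthMs) % sampleCount);
        long start = now - now % windowLengthMs;
        if (windowStart[idx] != start) {
            windowStart[idx] = start;
            pass[idx] = 0;
        }
        // Sum the buckets that still fall inside the interval.
        long sum = 0;
        for (int i = 0; i < sampleCount; i++) {
            if (windowStart[i] >= 0 && now - windowStart[i] < intervalMs) {
                sum += pass[i];
            }
        }
        if (sum + 1 > limit) {
            return false;
        }
        pass[idx]++;
        return true;
    }

    /** Replays a list of arrival times against a fresh 2 x 500 ms window. */
    static boolean[] replay(long[] arrivals, int limit) {
        TwoBucketWindowSketch window = new TwoBucketWindowSketch(2, 1000);
        boolean[] results = new boolean[arrivals.length];
        for (int i = 0; i < arrivals.length; i++) {
            results[i] = window.tryPass(arrivals[i], limit);
        }
        return results;
    }

    public static void main(String[] args) {
        // Arrival times of the 1+1+2 demo, assuming the program starts at time 0.
        System.out.println(Arrays.toString(replay(new long[] {0, 500, 1000, 1050}, 2)));
        // [true, true, true, false]
    }
}
```

At 1000 ms the slot that held B0 is reused for B2, so the sum is B1 + B2 = 1 and the third request passes; at 1050 ms the sum is already 2 and the fourth is blocked.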

public class StatisticNode implements Node {

    /**
     * Holds statistics of the recent {@code INTERVAL} milliseconds. The {@code INTERVAL} is divided into time spans
     * by given {@code sampleCount}.
     */
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

// In SampleCountProperty:
public static volatile int SAMPLE_COUNT = 2;

public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {

        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

public LeapArray(int sampleCount, int intervalInMs) {

    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

    this.windowLengthInMs = intervalInMs / sampleCount;
    this.intervalInMs = intervalInMs;
    this.intervalInSecond = intervalInMs / 1000.0;
    this.sampleCount = sampleCount;

    this.array = new AtomicReferenceArray<>(sampleCount);
}

For example, when StatisticNode computes how many requests have passed, it sums the pass counts of the buckets in ArrayMetric:

@Override
public double passQps() {

    return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}

public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {

        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
        .........
    @Override
    public long pass() {

        data.currentWindow();
        long pass = 0;
        List<MetricBucket> list = data.values();

        for (MetricBucket window : list) {

            pass += window.pass();
        }
        return pass;
    }

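Here data.currentWindow() works out which bucket the current time falls into and creates it if it does not exist yet; have a look at that logic yourself if you are interested. data.values() then returns the buckets that are still valid at the current time and are used for the count.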

public MetricBucket() {

    MetricEvent[] events = MetricEvent.values();
    this.counters = new LongAdder[events.length];
    for (MetricEvent event : events) {

        counters[event.ordinal()] = new LongAdder();
    }
    initMinRt();
}

public enum MetricEvent {

    /**
     * Normal pass.
     */
    PASS,
    /**
     * Normal block.
     */
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,

    /**
     * Passed in future quota (pre-occupied, since 1.5.0).
     */
    OCCUPIED_PASS
}

This records passes, blocks, exceptions, and so on. These values then feed the decision whether the current request may pass:

public class DefaultController implements TrafficShapingController {

    private static final int DEFAULT_AVG_USED_TOKENS = 0;

    private double count;
    private int grade;

    public DefaultController(double count, int grade) {

        this.count = count;
        this.grade = grade;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {

        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {

        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {

            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {

                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {

                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

private int avgUsedTokens(Node node) {

    if (node == null) {

        return DEFAULT_AVG_USED_TOKENS;
    }
    return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}

If the request cannot pass, a FlowException is thrown, which is a subclass of BlockException.

public class FlowRuleChecker {

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {

        if (ruleProvider == null || resource == null) {

            return;
        }
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {

            for (FlowRule rule : rules) {

                if (!canPassCheck(rule, context, node, count, prioritized)) {

                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

As for BlockException, it also has subclasses for authorization, degradation, and so on.

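5. Flow control behaviors

Sentinel has three flow control behaviors: reject directly, Warm Up (the allowed rate climbs gradually to the configured threshold), and uniform-rate queueing. They are selected through the control behavior set with setControlBehavior. The default used so far is direct rejection: the full threshold applies from the start, and anything above it is rejected immediately.

1. Warm Up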

private static void initFlowRule() {

    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource(KEY);
    rule1.setCount(20);
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP);
    rule1.setWarmUpPeriodSec(3);
    //rule1.setWarmUpPeriodSec(10);
    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}

public class WarmUpFlowDemo {

    private static final String KEY = "abc";

    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();

    private static volatile boolean stop = false;

    private static final int threadCount = 100;
    private static int seconds = 60 + 40;

    public static void main(String[] args) throws Exception {

        initFlowRule();
        // trigger Sentinel internal init
        Entry entry = null;
        try {

            entry = SphU.entry(KEY);
        } catch (Exception e) {

        } finally {

            if (entry != null) {

                entry.exit();
            }
        }

        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();

        //first make the system run on a very low condition
        for (int i = 0; i < 3; i++) {

            Thread t = new Thread(new WarmUpTask());
            t.setName("sentinel-warmup-task");
            t.start();
        }
        Thread.sleep(2000);
        for (int i = 0; i < threadCount; i++) {

            Thread t = new Thread(new WarmUpTask());
            t.setName("sentinel-run-task");
            t.start();
        }
    }

    private static void initFlowRule() {

        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource(KEY);
        rule1.setCount(20);
        rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule1.setLimitApp("default");
        rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP);
        rule1.setWarmUpPeriodSec(3);
        //rule1.setWarmUpPeriodSec(10);
        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }

    static class WarmUpTask implements Runnable {

        @Override
        public void run() {

            while (!stop) {

                Entry entry = null;
                try {

                    entry = SphU.entry(KEY);
                    // token acquired, means pass
                    pass.addAndGet(1);
                } catch (BlockException e1) {

                    block.incrementAndGet();
                } catch (Exception e2) {

                    // biz exception
                } finally {

                    total.incrementAndGet();
                    if (entry != null) {

                        entry.exit();
                    }
                }
                Random random2 = new Random();
                try {

                    TimeUnit.MILLISECONDS.sleep(random2.nextInt(2000));
                } catch (InterruptedException e) {

                    // ignore
                }
            }
        }
    }

    static class TimerTask implements Runnable {

        @Override
        public void run() {

            long start = System.currentTimeMillis();
            System.out.println("begin to statistic!!!");
            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;
            while (!stop) {

                try {

                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {

                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
                        + ", pass:" + oneSecondPass
                        + ", block:" + oneSecondBlock);
                if (seconds-- <= 0) {

                    stop = true;
                }
            }
        }
    }
}

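This is the official demo with a few things removed to make it easier to read. The TimerTask thread prints, once per second, the total number of requests in that second plus how many passed and how many were blocked. Each WarmUpTask keeps acquiring the resource in a while loop. As for the rule, the threshold is 20 and the warm-up period is set to 3 s (rule1.setWarmUpPeriodSec(3)); the smaller this value, the shorter the warm-up. At start-up only 3 threads acquire the resource, and then 100 more threads are started at once. With rule1.setWarmUpPeriodSec(10) the output is: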

begin to statistic!!!
1649141717082, total:6, pass:5, block:1
1649141718082, total:2, pass:2, block:0
1649141719083, total:2, pass:2, block:0
1649141720083, total:3, pass:3, block:0
1649141721083, total:23, pass:7, block:16
1649141722083, total:148, pass:6, block:142
1649141723084, total:109, pass:7, block:102
1649141724085, total:90, pass:7, block:83
1649141725085, total:105, pass:8, block:97
1649141726085, total:107, pass:8, block:99
1649141727085, total:99, pass:9, block:90
1649141728086, total:102, pass:9, block:93
1649141729086, total:108, pass:11, block:97
1649141730086, total:110, pass:12, block:98
1649141732086, total:103, pass:16, block:87
1649141733088, total:99, pass:19, block:80

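At the beginning there are few threads and almost nothing is blocked. Once many threads are running, most requests are blocked, but pass climbs gradually from 7 to about 20. With rule1.setWarmUpPeriodSec(3):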

begin to statistic!!!
1649141873716, total:3, pass:3, block:0
1649141874716, total:3, pass:3, block:0
1649141875717, total:3, pass:3, block:0
1649141876717, total:4, pass:4, block:0
1649141877718, total:7, pass:7, block:0
1649141878718, total:167, pass:8, block:159
1649141879719, total:104, pass:11, block:93
1649141880719, total:105, pass:20, block:85
1649141881718, total:110, pass:20, block:90
1649141882720, total:106, pass:20, block:86

Here pass reaches 20 much more quickly.
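To get a feel for the shape of these numbers, here is a deliberately simplified sketch. It is not the computation Sentinel performs (its warm-up controller is token-bucket based); it just ramps the allowed QPS linearly from count / coldFactor up to count over the warm-up period. The cold factor of 3 is an assumption, chosen because the observed starting value of about 7 is consistent with 20 / 3; the class and method names are invented.

```java
class WarmUpRampSketch {

    /**
     * Allowed QPS after elapsedSec seconds of sustained load, using a linear ramp.
     * Simplified model for illustration only; not Sentinel's token-bucket algorithm.
     */
    static double threshold(double count, int coldFactor, double warmUpPeriodSec, double elapsedSec) {
        double cold = count / coldFactor; // starting threshold while the system is "cold"
        if (elapsedSec <= 0) {
            return cold;
        }
        if (elapsedSec >= warmUpPeriodSec) {
            return count;                 // fully warmed up
        }
        return cold + (count - cold) * (elapsedSec / warmUpPeriodSec);
    }

    public static void main(String[] args) {
        // count = 20 and coldFactor = 3 (assumed), comparing a 3 s and a 10 s warm-up period.
        for (int second = 0; second <= 10; second++) {
            System.out.printf("t=%2ds  period=3s: %5.2f  period=10s: %5.2f%n",
                    second, threshold(20, 3, 3, second), threshold(20, 3, 10, second));
        }
    }
}
```

Under this model both runs start near 6.67 and end at 20; only the time it takes to get there differs, which is the qualitative difference between the two logs above.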

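2. Uniform-rate queueing

This mode is mainly used for bursty, intermittent traffic, for example message queues. Imagine a large number of requests arriving within one second while the following seconds are idle: we want the system to work through those requests gradually during the idle period rather than reject the excess in the first second.

Note: uniform-rate queueing currently does not support QPS > 1000.

Uniform-rate queueing (RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) strictly controls the interval between passing requests, i.e. requests pass at a uniform rate. It corresponds to the leaky bucket algorithm.

In other words, this mode does not reject an excess request outright; internally it makes the request wait by sleeping. By contrast, the default DefaultController compares the count and returns false directly: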

public class DefaultController implements TrafficShapingController {

    private static final int DEFAULT_AVG_USED_TOKENS = 0;

    private double count;
    private int grade;

    public DefaultController(double count, int grade) {
        this.count = count;
        this.grade = grade;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

Uniform-rate queueing, on the other hand, is handled like this:

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {

    // Pass when acquire count is less or equal than 0.
    if (acquireCount <= 0) {

        return true;
    }
    // Reject when count is less or equal than 0.
    // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
    if (count <= 0) {

        return false;
    }

    long currentTime = TimeUtil.currentTimeMillis();
    // Calculate the interval between every two requests.
    long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

    // Expected pass time of this request.
    long expectedTime = costTime + latestPassedTime.get();

    if (expectedTime <= currentTime) {

        // Contention may exist here, but it's okay.
        latestPassedTime.set(currentTime);
        return true;
    } else {

        // Calculate the time to wait.
        long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
        if (waitTime > maxQueueingTimeMs) {

            return false;
        } else {

            long oldTime = latestPassedTime.addAndGet(costTime);
            try {

                waitTime = oldTime - TimeUtil.currentTimeMillis();
                if (waitTime > maxQueueingTimeMs) {

                    latestPassedTime.addAndGet(-costTime);
                    return false;
                }
                // in race condition waitTime may <= 0
                if (waitTime > 0) {

                    Thread.sleep(waitTime);
                }
                return true;
            } catch (InterruptedException e) {

            }
        }
    }
    return false;
}

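Here if (count <= 0) is only a guard at the very top; the true/false results further down are not directly tied to the count.

During execution the method computes a wait time and sleeps for that long; once the sleep is over it simply returns true and lets the request proceed. There is one additional rule: you can set a maximum queueing time, e.g. rule1.setMaxQueueingTimeMs(20 * 1000), and if the wait would exceed it (waitTime > maxQueueingTimeMs) the method returns false and the request is blocked. For example, the rule: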

private static void initPaceFlowRule() {

    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource(KEY);
    rule1.setCount(count);
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule1.setLimitApp("default");
    /*
     * CONTROL_BEHAVIOR_RATE_LIMITER means requests more than threshold will be queueing in the queue,
     * until the queueing time is more than {@link FlowRule#maxQueueingTimeMs}, the requests will be rejected.
     */
    rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
    rule1.setMaxQueueingTimeMs(20 * 1000);  // wait at most 20 s

    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}
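The pacing arithmetic of the canPass method shown earlier can be tried out with a small sketch. It follows the same idea (a fixed cost per request, an expected pass time, and a cap on the wait), but it is a single-threaded simplification with an explicit timestamp argument and no sleeping, so it only reports how long each request would have to wait. The class name and the -1 return convention are invented for this example.

```java
import java.util.Arrays;

class PacerSketch {
    private final double count;           // allowed requests per second
    private final long maxQueueingTimeMs; // longest wait a request may be given
    private long latestPassedTime = -1;   // time at which the last admitted request passes

    PacerSketch(double count, long maxQueueingTimeMs) {
        this.count = count;
        this.maxQueueingTimeMs = maxQueueingTimeMs;
    }

    /** Returns the wait in ms for a request arriving at "now", or -1 if it is rejected. */
    long acquire(long now) {
        long costTime = Math.round(1.0 / count * 1000); // interval between two requests
        long expectedTime = latestPassedTime + costTime;
        if (expectedTime <= now) {
            latestPassedTime = now;                     // no backlog: pass immediately
            return 0;
        }
        long waitTime = expectedTime - now;
        if (waitTime > maxQueueingTimeMs) {
            return -1;                                  // queue is too long: reject
        }
        latestPassedTime = expectedTime;                // book the next free time slot
        return waitTime;
    }

    public static void main(String[] args) {
        // 10 QPS means one request every 100 ms; at most 250 ms of queueing is allowed.
        PacerSketch pacer = new PacerSketch(10, 250);
        long[] waits = new long[5];
        for (int i = 0; i < waits.length; i++) {
            waits[i] = pacer.acquire(1000);             // five requests arrive at the same instant
        }
        System.out.println(Arrays.toString(waits));     // [0, 100, 200, -1, -1]
    }
}
```

The burst is spread out at 100 ms intervals, and only the requests that would have to wait longer than the configured maximum are rejected.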

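6. Flow control by thread count

The settings are much the same as for QPS, so once you understand QPS this mode is easy to follow; what is counted here is the number of running threads. Here is a demo: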

public class FlowThreadDemo {

    private static int seconds = 60 + 40;
    private static volatile int methodBRunningTime = 2000;

    private CountDownLatch countDownLatch = new CountDownLatch(10);

    public static void main(String[] args) throws Exception {

        initFlowRule();
        for (int i = 0; i < 4; i++) {

            Thread entryThread = new Thread(new Runnable() {

                @SneakyThrows
                @Override
                public void run() {

                    while (true) {

                        Entry methodA = null;
                        try {

                            methodA = SphU.entry("methodA");
                            System.out.println(Thread.currentThread().getName() + "--------pass---"+System.currentTimeMillis());
                            Thread.sleep(2000);
                        } catch (BlockException e1) {

                            System.out.println(Thread.currentThread().getName() + "--------block");
                        } catch (Exception e2) {

                            // biz exception
                        } finally {

                            if (methodA != null) {

                                methodA.exit();
                            }
                        }
                    }
                }
            });
            entryThread.setName("working thread1");
            entryThread.start();
        }
        Thread.sleep(1500);
        for (int i = 0; i < 4; i++) {

            Thread entryThread = new Thread(new Runnable() {

                @SneakyThrows
                @Override
                public void run() {

                    while (true) {

                        Entry methodA = null;
                        try {

                            TimeUnit.MILLISECONDS.sleep(50);
                            methodA = SphU.entry("methodA");
                            System.out.println(Thread.currentThread().getName() + "--------pass---"+System.currentTimeMillis());
                            Thread.sleep(2000);
                        } catch (BlockException e1) {

                            System.out.println(Thread.currentThread().getName() + "--------block");
                        } catch (Exception e2) {

                            // biz exception
                        } finally {

                            if (methodA != null) {

                                methodA.exit();
                            }
                        }
                    }
                }
            });
            entryThread.setName("working thread2");
            entryThread.start();
        }
    }

    private static void initFlowRule() {

        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource("methodA");
        // limit the number of concurrent threads for 'methodA' to 4
        rule1.setCount(4);
        rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
        rule1.setLimitApp("default");

        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }

}

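This demo behaves differently from the QPS one. We set the number of threads allowed to run concurrently to 4 (rule1.setCount(4)), and sleep 1.5 s between starting the first group of 4 threads and the second group of 4. With a QPS rule, the second group would have been allowed to run: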

working thread1--------pass---1649148661104
working thread1--------pass---1649148661104
working thread1--------pass---1649148661104
working thread1--------pass---1649148661104
working thread2--------block
working thread2--------block
working thread2--------block
working thread2--------block
working thread2--------block
working thread2--------block
working thread2--------block

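But as we can see, thread2 never gets a chance to run. That is because each call is simulated to take 2 s (Thread.sleep(2000)), so the first group has not released its slots yet. Release happens through methodA.exit(), which decrements the current thread count by 1 (the increment works as in the QPS case mentioned earlier, in StatisticSlot's entry method). The decrement is also handled in StatisticSlot: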

@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {

    Node node = context.getCurNode();

    if (context.getCurEntry().getBlockError() == null) {

        // Calculate response time (use completeStatTime as the time of completion).
        long completeStatTime = TimeUtil.currentTimeMillis();
        context.getCurEntry().setCompleteTimestamp(completeStatTime);
        long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();

        Throwable error = context.getCurEntry().getError();

        // Record response time and success count.
        recordCompleteFor(node, count, rt, error);
        recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
        if (resourceWrapper.getEntryType() == EntryType.IN) {

            recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
        }
    }
        ...........
    fireExit(context, resourceWrapper, count);
}

private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {

    if (node == null) {

        return;
    }
    node.addRtAndSuccess(rt, batchCount);
    node.decreaseThreadNum();

    if (error != null && !(error instanceof BlockException)) {

        node.increaseExceptionQps(batchCount);
    }
}

@Override
public void decreaseThreadNum() {

    super.decreaseThreadNum();
    this.clusterNode.decreaseThreadNum();
}
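The enter/exit bookkeeping described above can be modelled in a few lines. This is a sketch written for this article, not Sentinel code: the class is invented, and it keeps the same non-atomic check-then-increment shape as the QPS case, so it is only exercised from a single thread here.

```java
import java.util.concurrent.atomic.LongAdder;

class ThreadCountGateSketch {
    private final int limit;
    private final LongAdder curThreadNum = new LongAdder();

    ThreadCountGateSketch(int limit) {
        this.limit = limit;
    }

    /** Admits the caller if fewer than "limit" calls are currently in progress. */
    boolean tryEnter() {
        if (curThreadNum.sum() + 1 > limit) {
            return false;         // all slots are held by calls that have not exited yet
        }
        curThreadNum.increment(); // counted on entry
        return true;
    }

    /** Releases one slot; the counterpart of the decrement done on exit. */
    void exit() {
        curThreadNum.decrement();
    }

    public static void main(String[] args) {
        ThreadCountGateSketch gate = new ThreadCountGateSketch(4);
        for (int i = 0; i < 4; i++) {
            System.out.println("first group " + i + ": " + gate.tryEnter()); // true
        }
        // The first group is still "running", so elapsed time alone does not help.
        System.out.println("second group: " + gate.tryEnter());              // false
        gate.exit();
        System.out.println("after one exit: " + gate.tryEnter());            // true
    }
}
```

Unlike the QPS window, nothing expires with time here: a slot becomes free only when a running call exits, which is why the second group stays blocked while the first group is still sleeping.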