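I won't spend much time on a general introduction to Sentinel. It is Alibaba's open-source library for flow control, circuit breaking and degradation, and similar protections for high-concurrency systems. Let's go straight to some usage examples.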
First, add the dependency to the project:
```xml
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-core</artifactId>
    <version>1.8.3</version>
</dependency>
```
I. Flow control
1. QPS-based limiting
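```java
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

import java.util.ArrayList;
import java.util.List;

public class SimpleEntryMain {

    private static String resourceName = "simpleEntry";

    public static void main(String[] args) throws InterruptedException {
        initFlowRule();
        // Start 10 threads, 50 ms apart, each calling the protected resource once.
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
            Thread.sleep(50);
        }
    }

    // Allow at most 2 requests per second (QPS) on resourceName.
    private static void initFlowRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource(resourceName);
        rule1.setCount(2);
        rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule1.setLimitApp("default");
        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }

    static class RunTask implements Runnable {
        @Override
        public void run() {
            Entry entry = null;
            try {
                // Throws BlockException if the rule rejects this call.
                entry = SphU.entry(resourceName);
                doSomeThing();
            } catch (BlockException e1) {
                System.out.println("Block ----------");
            } finally {
                // Always release the entry that was acquired.
                if (entry != null) {
                    entry.exit();
                }
            }
        }
    }

    // Simulated business logic that takes 3 seconds.
    public static void doSomeThing() {
        System.out.println("++++++++++++++++++");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            // ignore
        }
    }
}
```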
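The demo first configures the rule in initFlowRule(), then uses multiple threads to simulate concurrent calls. Each call simulates its work with Thread.sleep(3000), i.e. 3 seconds. Each thread runs the standard Sentinel usage pattern: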
```java
Entry entry = null;
try {
    entry = SphU.entry(resourceName);
    doSomeThing();
} catch (BlockException e1) {
    System.out.println("Block ----------");
} finally {
    if (entry != null) {
        entry.exit();
    }
}
```
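This is the standard try-catch-finally pattern. The call to SphU.entry(resourceName) checks the request against the configured rules. If the request is allowed, it returns normally. Otherwise it throws a BlockException. Here we call the API directly. In a Spring Boot application, the integration typically uses a dynamic proxy or AOP to decouple the business logic (doSomeThing) from the SphU.entry(resourceName) call. After adding an annotation (for example @SentinelResource), we can focus solely on the logic of doSomeThing().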
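To make the decoupling idea concrete, here is a minimal, self-contained sketch using the JDK's java.lang.reflect.Proxy. It is not Sentinel's Spring integration. GuardedProxy, Service, BlockedException and the naive permit counter are hypothetical stand-ins for the annotated bean, SphU.entry() and BlockException, assumed only for illustration. The point is that the business implementation contains no limiting code at all.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

public class GuardedProxy {

    // Hypothetical business interface: implementations contain only business logic.
    public interface Service {
        String doSomeThing();
    }

    // Thrown when the guard rejects a call (a stand-in for Sentinel's BlockException).
    public static class BlockedException extends RuntimeException {
        public BlockedException(String resource) {
            super("blocked: " + resource);
        }
    }

    // Wraps `target` so every call goes through a guard first. The guard is a naive permit
    // counter that admits at most `limit` calls in total; it is not Sentinel's rule checking.
    public static <T> T guard(T target, Class<T> iface, String resource, int limit) {
        AtomicInteger used = new AtomicInteger();
        InvocationHandler handler = (proxy, method, args) -> {
            // "entry": decide before the business method runs
            if (used.incrementAndGet() > limit) {
                throw new BlockedException(resource);
            }
            // A real integration would also release the entry ("exit") in a finally block.
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the business exception unchanged
            }
        };
        return iface.cast(Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler));
    }

    public static void main(String[] args) {
        Service svc = guard(() -> "done", Service.class, "simpleEntry", 2);
        for (int i = 0; i < 3; i++) {
            try {
                System.out.println(svc.doSomeThing()); // "done" for the first two calls
            } catch (BlockedException e) {
                System.out.println(e.getMessage());    // "blocked: simpleEntry" on the third call
            }
        }
    }
}
```

The caller only sees Service. Swapping the counter for a real rule check would not touch the business implementation, which is the benefit the proxy/AOP approach provides.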
2. About the demo
The demo above limits traffic by QPS. First, we configure the rule:
```java
private static void initFlowRule() {
    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource(resourceName);
    rule1.setCount(2);
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule1.setLimitApp("default");
    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}
```
Here we set four parameters:
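1. resource

The resource this rule protects. It corresponds to the name passed to SphU.entry(resourceName). When you call it to check whether a request may pass, Sentinel uses resourceName to find the rules you configured for that resource.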
2. count

The threshold of the rule. With QPS-based limiting, count = 2 means that at most two requests may pass per second.
3. grade

The type of threshold: QPS, or the number of concurrently running threads.
```java
// FlowRule
/**
 * The threshold type of flow control (0: thread count, 1: QPS).
 */
private int grade = RuleConstant.FLOW_GRADE_QPS;

// RuleConstant
public final class RuleConstant {

    public static final int FLOW_GRADE_THREAD = 0;
    public static final int FLOW_GRADE_QPS = 1;
    // ...
}
```
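4. limitApp

The caller (origin) that the rule applies to. The value "default" means the rule applies regardless of which application the call comes from. For practice or a single application you don't need to set it explicitly, because new FlowRule() already sets it: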
```java
// FlowRule
public FlowRule() {
    super();
    setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}

// RuleConstant
public static final String LIMIT_APP_DEFAULT = "default";
```
3. Why there is a Thread.sleep(50) between thread starts
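One thing to note: when starting the 10 threads, we deliberately added Thread.sleep(50) between them. Without it, you may not see any blocking at all. For example, without the sleep the output is: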
```text
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
```
Nothing is blocked. With the sleep added:
```text
++++++++++++++++++
++++++++++++++++++
Block ----------
Block ----------
Block ----------
Block ----------
Block ----------
Block ----------
Block ----------
Block ----------
```
Even with Thread.sleep(50), the lines may be printed in a different order, depending on how the CPU schedules the threads.
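We need the gap between thread starts because of how Sentinel does its accounting. The first reason is that Sentinel's check of whether a request may pass and the corresponding +1 to the statistics are not a single atomic operation. Sentinel executes as a chain. Besides flow-control rules, it also handles circuit breaking/degradation and other rules, plus statistics. Each of these is implemented as a ProcessorSlot, and the slots are linked one after another through a next reference.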
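A toy version of such a chain helps show why the order matters. In the minimal sketch below, a statistics slot hands over to the next slot first and counts only after it returns, while a limit slot checks the threshold before continuing. This mirrors the StatisticSlot/FlowSlot split discussed in this section. The class names (Slot, StatSlot, LimitSlot, Node) are hypothetical and single-threaded; they are not Sentinel's classes.

```java
import java.util.ArrayList;
import java.util.List;

public class SlotChainSketch {

    // Stand-in for BlockException.
    public static class BlockedException extends Exception {
        public BlockedException() {
            super("blocked");
        }
    }

    // Per-resource statistics, plus a log of the steps that ran (for illustration only).
    public static class Node {
        int passed;
        final List<String> log = new ArrayList<>();
    }

    // Each slot does its own work and hands over to `next`, like a linked processor slot.
    abstract static class Slot {
        Slot next;

        abstract void entry(Node node) throws BlockedException;

        void fireEntry(Node node) throws BlockedException {
            if (next != null) {
                next.entry(node);
            }
        }
    }

    // Lets the rest of the chain run first, and counts only if nothing downstream threw.
    static class StatSlot extends Slot {
        @Override
        void entry(Node node) throws BlockedException {
            fireEntry(node);
            node.passed++;
            node.log.add("count");
        }
    }

    // Checks the threshold, then continues down the chain.
    static class LimitSlot extends Slot {
        private final int limit;

        LimitSlot(int limit) {
            this.limit = limit;
        }

        @Override
        void entry(Node node) throws BlockedException {
            node.log.add("check");
            if (node.passed + 1 > limit) {
                throw new BlockedException();
            }
            fireEntry(node);
        }
    }

    // Sends `calls` sequential requests through StatSlot -> LimitSlot and returns the step log.
    public static List<String> simulate(int limit, int calls) {
        Node node = new Node();
        Slot head = new StatSlot();
        head.next = new LimitSlot(limit);
        for (int i = 0; i < calls; i++) {
            try {
                head.entry(node);
            } catch (BlockedException e) {
                node.log.add("block");
            }
        }
        return node.log;
    }

    public static void main(String[] args) {
        System.out.println(simulate(2, 3)); // [check, count, check, count, check, block]
    }
}
```

Because "check" and "count" are two separate steps in two different slots, another thread can run its own "check" in between. That gap is the one discussed next.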
1. The check and the +1 are visible to other threads, but not atomic
```java
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {

    private AbstractLinkedProcessorSlot<?> next = null;

    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }
    // ...
}
```
For example, flow control is handled by FlowSlot and circuit breaking by DegradeSlot. The statistics used to decide whether requests may pass are maintained by StatisticSlot:
```java
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);
            // ...
        }
        // ...
    }
}

public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    // ...
}
```
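So the check happens in FlowSlot, while the node's counters are updated in StatisticSlot only after fireEntry() returns. No lock spans the two steps. The counters themselves are LongAdder instances, which make each increment thread-safe and visible to other threads, but they do not turn "check, then increment" into one atomic step.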
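```java
// MetricBucket: one LongAdder per event type
public class MetricBucket {

    private final LongAdder[] counters;
    // ...
}

// StatisticNode: the current thread count is also a LongAdder
private LongAdder curThreadNum = new LongAdder();
```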
So under concurrency, several threads can run the check at the same moment, before any of them has performed its +1, and all of them pass.
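The effect can be reproduced deterministically without Sentinel. The sketch below is a hypothetical illustration, not Sentinel code. It uses a CyclicBarrier to force the worst case, in which every thread finishes its check before any thread increments, and shows that all five threads get through a limit of 2. For contrast, it then makes the check and the increment one compare-and-set step. That alternative technique is shown only to illustrate atomicity; it is not what Sentinel does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

public class CheckThenActSketch {
    static final int LIMIT = 2;

    // Check first, count later. Every thread reads the counter, then waits at the barrier
    // until all threads have checked, and only then increments: the worst-case interleaving.
    public static int racyAdmitted(int threads) {
        LongAdder passed = new LongAdder();
        AtomicInteger admitted = new AtomicInteger();
        CyclicBarrier allChecked = new CyclicBarrier(threads);
        runAll(threads, () -> {
            boolean ok = passed.sum() + 1 <= LIMIT;   // the check (FlowSlot's role)
            try {
                allChecked.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
            if (ok) {                                 // the later +1 (StatisticSlot's role)
                passed.increment();
                admitted.incrementAndGet();
            }
        });
        return admitted.get();
    }

    // Check and increment as one compare-and-set: a thread is admitted only if the counter
    // still holds the value it checked; otherwise it re-reads and re-checks.
    public static int casAdmitted(int threads) {
        AtomicInteger passed = new AtomicInteger();
        runAll(threads, () -> {
            while (true) {
                int cur = passed.get();
                if (cur + 1 > LIMIT) {
                    return;                           // over the limit: rejected
                }
                if (passed.compareAndSet(cur, cur + 1)) {
                    return;                           // admitted
                }
            }
        });
        return passed.get();
    }

    // Starts `threads` threads running `task` and waits for all of them to finish.
    private static void runAll(int threads, Runnable task) {
        List<Thread> started = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(task);
            t.start();
            started.add(t);
        }
        try {
            for (Thread t : started) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("check-then-increment admitted: " + racyAdmitted(5)); // 5
        System.out.println("compare-and-set admitted: " + casAdmitted(5));       // 2
    }
}
```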
Let's change how the threads are started:
```java
public static void main(String[] args) throws InterruptedException {
    // initFlowQpsRule();
    initFlowRule();
    // First group: 5 threads started back to back
    for (int i = 0; i < 5; i++) {
        Thread t = new Thread(new RunTask());
        t.setName("simulate-traffic-Task");
        t.start();
    }
    Thread.sleep(100);
    // Second group: 5 more threads, 100 ms later
    for (int i = 0; i < 5; i++) {
        Thread t = new Thread(new RunTask());
        t.setName("simulate-traffic-Task");
        t.start();
    }
}
```
We split the 10 threads into two groups of 5 and sleep 100 ms between the groups:
```text
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
Block ----------
Block ----------
Block ----------
Block ----------
Block ----------
```
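The first group of 5 did not split into 2 + 3 (two passed, three blocked). All five passed, while all five threads in the second group were limited. This grouping, combined with the fact that Sentinel counts with a sliding window, points to a second factor: how the window is divided into buckets.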
4. The default sliding-window logic
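Sentinel counts calls with a sliding window: every time entry() acquires the resource, the call is recorded in the window. For QPS, the window spans 1000 ms. Internally this 1000 ms is divided into smaller buckets, whose counts are then summed for the check.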
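For a QPS rule, one second is split into two buckets by default, 500 ms (0.5 s) each. When a check happens in bucket B1 (500-1000 ms), the counts of B0 and B1 are added together. Once time reaches the next bucket (1000-1500 ms), Sentinel can tell from the current time that B0 has expired. It resets B0's slot and reuses it for a new bucket, B2, and the check then uses B1 + B2.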
```text
     B0      B1      B2
||_______|_______|_______||___
 0      500     1000    1500   timestamp
      ^
   time=300
```
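The bucket arithmetic behind the diagram can be written out as follows. This assumes the defaults of sampleCount = 2 and a 1000 ms interval. The bucket length matches the windowLengthInMs = intervalInMs / sampleCount computation in the LeapArray constructor quoted below. The slot index and bucket start formulas are my reading of how LeapArray maps a timestamp to a slot, stated here as an assumption.

$$
\begin{aligned}
L &= \frac{\text{intervalInMs}}{\text{sampleCount}} = \frac{1000}{2} = 500\ \text{ms} \\
\text{idx}(t) &= \left\lfloor \frac{t}{L} \right\rfloor \bmod 2, \qquad \text{start}(t) = t - (t \bmod L)
\end{aligned}
$$

$$
\begin{array}{c|c|c|l}
t\ (\text{ms}) & \text{idx}(t) & \text{start}(t) & \text{buckets summed} \\ \hline
700 & 1 & 500 & B_0 + B_1 \\
1200 & 0 & 1000 & B_1 + B_2 \quad (B_2 \text{ reuses } B_0\text{'s slot})
\end{array}
$$

At t = 1200 the slot with index 0 still holds B0 (start 0, not 1000), so it is reset before being reused.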
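Here we assume the program starts at time 0. In reality, bucket boundaries fall on multiples of 500 ms of the system clock, so the first bucket may already be partly over when the program starts. Now change the thread start intervals again, this time starting 1 + 1 + 2 threads. The thread count is kept small so the output stays short enough to analyze: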
```java
public static void main(String[] args) throws InterruptedException {
    initFlowRule();
    // t ~ 0 ms: 1 thread (bucket B0)
    for (int i = 0; i < 1; i++) {
        Thread t = new Thread(new RunTask());
        t.setName("simulate-traffic-Task");
        t.start();
    }
    Thread.sleep(500);
    // t ~ 500 ms: 1 thread (bucket B1)
    for (int i = 0; i < 1; i++) {
        Thread t = new Thread(new RunTask());
        t.setName("simulate-traffic-Task");
        t.start();
    }
    Thread.sleep(500);
    // t ~ 1000 ms: 2 threads, 50 ms apart (bucket B2)
    for (int i = 0; i < 2; i++) {
        Thread t = new Thread(new RunTask());
        t.setName("simulate-traffic-Task");
        t.start();
        Thread.sleep(50);
    }
}
```
```text
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
Block ----------
```
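This demo illustrates how Sentinel divides the sliding window into buckets. By default, 1 s is split into two 500 ms buckets. One request passes in B0, and another passes in B1. A request arriving in B2 (1000-1500 ms) is checked against B1 + B2, which covers 500-1500 ms and already contains B1's request. So of the two requests arriving in B2, only one passes and the other is blocked.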
```java
// StatisticNode
public class StatisticNode implements Node {

    /**
     * Holds statistics of the recent {@code INTERVAL} milliseconds. The {@code INTERVAL} is divided into time spans
     * by given {@code sampleCount}.
     */
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);
    // ...
}

// SampleCountProperty: two buckets by default
public static volatile int SAMPLE_COUNT = 2;

// ArrayMetric
public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
    // ...
}

// LeapArray constructor: each bucket is intervalInMs / sampleCount long
public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

    this.windowLengthInMs = intervalInMs / sampleCount;
    this.intervalInMs = intervalInMs;
    this.intervalInSecond = intervalInMs / 1000.0;
    this.sampleCount = sampleCount;

    this.array = new AtomicReferenceArray<>(sampleCount);
}
```
For example, when StatisticNode computes how many requests passed, it adds up the pass counts of the buckets inside ArrayMetric:
```java
// StatisticNode
@Override
public double passQps() {
    return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}

// ArrayMetric
public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    // ...

    @Override
    public long pass() {
        data.currentWindow();
        long pass = 0;
        List<MetricBucket> list = data.values();

        for (MetricBucket window : list) {
            pass += window.pass();
        }
        return pass;
    }
    // ...
}
```
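data.currentWindow() locates the bucket for the current time. It creates the bucket if it does not exist yet, and resets the slot if it still holds an expired bucket. The details are worth reading in the source if you are interested. data.values() then returns the buckets that are still valid at the current time, and their counts are summed.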
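The currentWindow()/values() idea can be reproduced in a few lines. The sketch below is simplified and single-threaded, with timestamps passed in explicitly. SlidingWindowSketch and its methods are hypothetical names. Unlike LeapArray, it uses plain arrays with no concurrency control. It replays the earlier 1 + 1 + 2 schedule, assuming t = 0 falls on a bucket boundary.

```java
import java.util.Arrays;

public class SlidingWindowSketch {
    private final long windowLengthMs;
    private final long intervalMs;
    private final long[] starts;  // start time of the bucket held by each slot; -1 = unused
    private final long[] counts;  // passed requests recorded in each slot

    public SlidingWindowSketch(int sampleCount, int intervalMs) {
        if (sampleCount <= 0 || intervalMs <= 0 || intervalMs % sampleCount != 0) {
            throw new IllegalArgumentException("time span needs to be evenly divided");
        }
        this.windowLengthMs = intervalMs / sampleCount;
        this.intervalMs = intervalMs;
        this.starts = new long[sampleCount];
        this.counts = new long[sampleCount];
        Arrays.fill(starts, -1L);
    }

    // Like currentWindow(): find the slot for `now`; if it still holds an older bucket,
    // reset it so it can be reused for the current one.
    private int currentSlot(long now) {
        int idx = (int) ((now / windowLengthMs) % starts.length);
        long windowStart = now - now % windowLengthMs;
        if (starts[idx] != windowStart) {
            starts[idx] = windowStart;
            counts[idx] = 0;
        }
        return idx;
    }

    // Like pass(): sum the buckets that still lie within the last intervalMs milliseconds.
    public long passed(long now) {
        currentSlot(now);
        long sum = 0;
        for (int i = 0; i < starts.length; i++) {
            if (starts[i] >= 0 && now - starts[i] < intervalMs) {
                sum += counts[i];
            }
        }
        return sum;
    }

    // Single-threaded check-then-count: pass if one more request stays within `limit`.
    public boolean tryPass(long now, long limit) {
        if (passed(now) + 1 > limit) {
            return false;
        }
        counts[currentSlot(now)]++;
        return true;
    }

    public static void main(String[] args) {
        SlidingWindowSketch window = new SlidingWindowSketch(2, 1000);
        long[] arrivals = {0, 500, 1000, 1050}; // the 1 + 1 + 2 schedule
        for (long t : arrivals) {
            System.out.println(t + " ms -> " + (window.tryPass(t, 2) ? "pass" : "block"));
        }
        // 0 ms -> pass, 500 ms -> pass, 1000 ms -> pass, 1050 ms -> block
    }
}
```

The request at 1050 ms is blocked because the sum covers B1 (from 500 ms) plus B2 (from 1000 ms), reproducing the "three passes, one block" output above.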
```java
// MetricBucket: one LongAdder per event type
public MetricBucket() {
    MetricEvent[] events = MetricEvent.values();
    this.counters = new LongAdder[events.length];
    for (MetricEvent event : events) {
        counters[event.ordinal()] = new LongAdder();
    }
    initMinRt();
}

public enum MetricEvent {

    /**
     * Normal pass.
     */
    PASS,
    /**
     * Normal block.
     */
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,

    /**
     * Passed in future quota (pre-occupied, since 1.5.0).
     */
    OCCUPIED_PASS
}
```
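Each bucket thus records events such as passed, blocked, exceptions, successes and response time. These statistics drive the decision of whether the current request may pass: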
```java
public class DefaultController implements TrafficShapingController {

    private static final int DEFAULT_AVG_USED_TOKENS = 0;

    private double count;
    private int grade;

    public DefaultController(double count, int grade) {
        this.count = count;
        this.grade = grade;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
    }
    // ...
}
```
If the request cannot pass, a FlowException is thrown. FlowException is a subclass of BlockException.
```java
public class FlowRuleChecker {

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }
    // ...
}
```
BlockException also has other subclasses, for example for authority (access control) rules and degradation rules.
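Stripped of the prioritized/occupy branch, the pass-or-block decision in DefaultController.canPass comes down to one comparison. Below is a standalone sketch of that core. NodeStats is a hypothetical snapshot standing in for Sentinel's Node, and the prioritized path is deliberately left out.

```java
public class DefaultCheckSketch {
    // Same values as RuleConstant.FLOW_GRADE_THREAD / FLOW_GRADE_QPS.
    public static final int GRADE_THREAD = 0;
    public static final int GRADE_QPS = 1;

    // Hypothetical snapshot of what a Node would report.
    public static final class NodeStats {
        final int curThreadNum;
        final double passQps;

        public NodeStats(int curThreadNum, double passQps) {
            this.curThreadNum = curThreadNum;
            this.passQps = passQps;
        }
    }

    // Mirrors avgUsedTokens(): thread count for the thread grade, truncated pass QPS otherwise.
    static int usedTokens(NodeStats node, int grade) {
        if (node == null) {
            return 0;
        }
        return grade == GRADE_THREAD ? node.curThreadNum : (int) node.passQps;
    }

    // Pass only if the tokens already used plus this request stay within the threshold.
    public static boolean canPass(NodeStats node, int acquireCount, double count, int grade) {
        return usedTokens(node, grade) + acquireCount <= count;
    }

    public static void main(String[] args) {
        NodeStats stats = new NodeStats(4, 1.5);
        System.out.println(canPass(stats, 1, 2, GRADE_QPS));    // true:  (int) 1.5 = 1, and 1 + 1 <= 2
        System.out.println(canPass(stats, 1, 4, GRADE_THREAD)); // false: 4 + 1 > 4
    }
}
```

The (int) cast truncates the measured QPS, just as avgUsedTokens() does in the source quoted above.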
5. Flow-control behaviors
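Sentinel offers three flow-control behaviors: direct rejection, Warm Up (the threshold is reached gradually), and uniform queueing. They are selected with setControlBehavior (RuleConstant.CONTROL_BEHAVIOR_DEFAULT, CONTROL_BEHAVIOR_WARM_UP and CONTROL_BEHAVIOR_RATE_LIMITER). The examples so far used the default, direct rejection: the full threshold applies immediately, and any request beyond it is rejected right away.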
1. Warm Up
```java
private static void initFlowRule() {
    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource(KEY);
    rule1.setCount(20);
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP);
    rule1.setWarmUpPeriodSec(3);
    //rule1.setWarmUpPeriodSec(10);
    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}
```
```java
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.util.TimeUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WarmUpFlowDemo {

    private static final String KEY = "abc";

    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();

    private static volatile boolean stop = false;

    private static final int threadCount = 100;
    private static int seconds = 60 + 40;

    public static void main(String[] args) throws Exception {
        initFlowRule();
        // trigger Sentinel internal init
        Entry entry = null;
        try {
            entry = SphU.entry(KEY);
        } catch (Exception e) {
            // ignore
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }

        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();

        // first make the system run on a very low condition
        for (int i = 0; i < 3; i++) {
            Thread t = new Thread(new WarmUpTask());
            t.setName("sentinel-warmup-task");
            t.start();
        }
        Thread.sleep(2000);

        // then suddenly add 100 more threads
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(new WarmUpTask());
            t.setName("sentinel-run-task");
            t.start();
        }
    }

    private static void initFlowRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource(KEY);
        rule1.setCount(20);
        rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule1.setLimitApp("default");
        rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP);
        rule1.setWarmUpPeriodSec(3);
        //rule1.setWarmUpPeriodSec(10);
        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }

    // Keeps acquiring the resource, pausing a random 0-2 s between attempts.
    static class WarmUpTask implements Runnable {
        @Override
        public void run() {
            Random random2 = new Random();
            while (!stop) {
                Entry entry = null;
                try {
                    entry = SphU.entry(KEY);
                    // token acquired, means pass
                    pass.addAndGet(1);
                } catch (BlockException e1) {
                    block.incrementAndGet();
                } catch (Exception e2) {
                    // biz exception
                } finally {
                    total.incrementAndGet();
                    if (entry != null) {
                        entry.exit();
                    }
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(random2.nextInt(2000));
                } catch (InterruptedException e) {
                    // ignore
                }
            }
        }
    }

    // Prints, once per second, how many requests were made, passed and blocked in that second.
    static class TimerTask implements Runnable {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("begin to statistic!!!");

            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;
            while (!stop) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // ignore
                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
                    + ", pass:" + oneSecondPass
                    + ", block:" + oneSecondBlock);

                if (seconds-- <= 0) {
                    stop = true;
                }
            }
        }
    }
}
```
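This is the official demo, trimmed a little to make it easier to follow. The TimerTask thread prints, every second, the total number of requests in that second, how many passed and how many were blocked. Each WarmUpTask keeps acquiring the resource in a while loop. In the rule, the threshold is 20 and the warm-up period is 3 s (rule1.setWarmUpPeriodSec(3)); a smaller value means a shorter warm-up. The demo starts with only 3 threads acquiring the resource, then suddenly adds 100 more. First, here is the output with rule1.setWarmUpPeriodSec(10):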
```text
begin to statistic!!!
1649141717082, total:6, pass:5, block:1
1649141718082, total:2, pass:2, block:0
1649141719083, total:2, pass:2, block:0
1649141720083, total:3, pass:3, block:0
1649141721083, total:23, pass:7, block:16
1649141722083, total:148, pass:6, block:142
1649141723084, total:109, pass:7, block:102
1649141724085, total:90, pass:7, block:83
1649141725085, total:105, pass:8, block:97
1649141726085, total:107, pass:8, block:99
1649141727085, total:99, pass:9, block:90
1649141728086, total:102, pass:9, block:93
1649141729086, total:108, pass:11, block:97
1649141730086, total:110, pass:12, block:98
1649141732086, total:103, pass:16, block:87
1649141733088, total:99, pass:19, block:80
```
At first, with only a few threads, almost nothing is blocked. Once the thread count jumps, many requests are blocked, while pass rises gradually from about 7 toward 20. Now with rule1.setWarmUpPeriodSec(3):
```text
begin to statistic!!!
1649141873716, total:3, pass:3, block:0
1649141874716, total:3, pass:3, block:0
1649141875717, total:3, pass:3, block:0
1649141876717, total:4, pass:4, block:0
1649141877718, total:7, pass:7, block:0
1649141878718, total:167, pass:8, block:159
1649141879719, total:104, pass:11, block:93
1649141880719, total:105, pass:20, block:85
1649141881718, total:110, pass:20, block:90
1649141882720, total:106, pass:20, block:86
```
This time pass reaches 20 much sooner.
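The starting values fit Sentinel's documented default cold factor of 3. Warm-up begins at roughly count / 3 = 20 / 3 ≈ 6.7 QPS, close to the pass values of about 6 to 8 seen once load arrives, and then climbs to 20 over the warm-up period. The sketch below only approximates this with a straight-line ramp. As I understand it, Sentinel's WarmUpController derives the rate from a token-bucket model based on Guava's warm-up rate limiter, so the real curve is not linear. LinearWarmUpSketch and thresholdAt() are hypothetical names, and the linear formula is an assumption made for illustration.

```java
import java.util.Locale;

public class LinearWarmUpSketch {

    // Threshold after `elapsedSec` seconds of sustained load: starts at count / coldFactor and
    // climbs in a straight line to count over `warmUpPeriodSec` (a simplification).
    public static double thresholdAt(double count, int coldFactor, double warmUpPeriodSec, double elapsedSec) {
        double start = count / coldFactor;
        if (elapsedSec >= warmUpPeriodSec) {
            return count;
        }
        double progress = Math.max(0, elapsedSec) / warmUpPeriodSec;
        return start + (count - start) * progress;
    }

    public static void main(String[] args) {
        // Compare the two warm-up periods used in the demo, second by second.
        for (int period : new int[]{10, 3}) {
            StringBuilder line = new StringBuilder("warmUpPeriodSec=" + period + ":");
            for (int s = 0; s <= 10; s++) {
                line.append(String.format(Locale.ROOT, " %.1f", thresholdAt(20, 3, period, s)));
            }
            System.out.println(line);
        }
    }
}
```

With a 3 s period the ramp reaches 20 after three seconds, while with 10 s it takes ten. This matches the qualitative difference between the two runs above, though not their exact numbers.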
2. Uniform queueing
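This mode is mainly intended for intermittent bursts of traffic, such as message queues. Imagine that a large number of requests arrive in one second, and the next few seconds are idle. We want the system to work through those requests gradually during the idle period, rather than rejecting the excess outright in the first second.

Note: uniform queueing currently does not support scenarios with QPS > 1000.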
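Uniform queueing (RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) strictly controls the interval between passing requests, letting them through at an even rate. It corresponds to the leaky bucket algorithm.

So instead of rejecting a request as soon as the threshold is exceeded, this mode makes the request wait by sleeping. It only rejects a request whose wait would exceed the maximum queueing time. By contrast, the default DefaultController shown earlier simply returns false once the count is exceeded.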
Uniform queueing, by contrast, is handled like this (in RateLimiterController):
```java
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // Pass when acquire count is less or equal than 0.
    if (acquireCount <= 0) {
        return true;
    }
    // Reject when count is less or equal than 0.
    // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
    if (count <= 0) {
        return false;
    }

    long currentTime = TimeUtil.currentTimeMillis();
    // Calculate the interval between every two requests.
    long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

    // Expected pass time of this request.
    long expectedTime = costTime + latestPassedTime.get();

    if (expectedTime <= currentTime) {
        // Contention may exist here, but it's okay.
        latestPassedTime.set(currentTime);
        return true;
    } else {
        // Calculate the time to wait.
        long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
        if (waitTime > maxQueueingTimeMs) {
            return false;
        } else {
            long oldTime = latestPassedTime.addAndGet(costTime);
            try {
                waitTime = oldTime - TimeUtil.currentTimeMillis();
                if (waitTime > maxQueueingTimeMs) {
                    latestPassedTime.addAndGet(-costTime);
                    return false;
                }
                // in race condition waitTime may <= 0
                if (waitTime > 0) {
                    Thread.sleep(waitTime);
                }
                return true;
            } catch (InterruptedException e) {
            }
        }
    }
    return false;
}
```
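Note that count appears only in the early if (count <= 0) guard and in the costTime calculation. The later true/false results depend on timing, not on comparing a counter against count.

At run time, the method computes how long the request must wait, sleeps for that long, and then returns true so the request proceeds. There is one extra safeguard: the maximum queueing time, set with rule1.setMaxQueueingTimeMs(20 * 1000). If waitTime > maxQueueingTimeMs, the method returns false and the request is blocked. For example, with this rule: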
```java
private static void initPaceFlowRule() {
    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource(KEY);
    // count: the QPS threshold, a field of the demo class (its definition is not shown here)
    rule1.setCount(count);
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule1.setLimitApp("default");
    /*
     * CONTROL_BEHAVIOR_RATE_LIMITER means requests more than threshold will be queueing in the queue,
     * until the queueing time is more than {@link FlowRule#maxQueueingTimeMs}, the requests will be rejected.
     */
    rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
    rule1.setMaxQueueingTimeMs(20 * 1000); // i.e. wait at most 20 s

    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}
```
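The queueing arithmetic of the canPass() code above can be replayed without threads or sleeping. The sketch below is single-threaded. It assumes timestamps are passed in explicitly and that the method returns the wait time instead of sleeping. PacingSketch and acquire() are hypothetical names. The AtomicLong handling and the rollback on contention in the real controller are omitted.

```java
public class PacingSketch {
    private final double count;            // permitted requests per second
    private final long maxQueueingTimeMs;  // longest time a request may be queued for
    private long latestPassedTime = -1;    // when the most recently admitted request is due to pass

    public PacingSketch(double count, long maxQueueingTimeMs) {
        this.count = count;
        this.maxQueueingTimeMs = maxQueueingTimeMs;
    }

    // Returns the milliseconds the caller should wait before proceeding (0 = immediately),
    // or -1 if the request is rejected because it would wait longer than maxQueueingTimeMs.
    public long acquire(long now, int acquireCount) {
        if (acquireCount <= 0) {
            return 0;
        }
        if (count <= 0) {
            return -1;
        }
        long costTime = Math.round(1.0 * acquireCount / count * 1000); // spacing between requests
        long expectedTime = costTime + latestPassedTime;
        if (expectedTime <= now) {
            latestPassedTime = now;        // nothing queued ahead: pass right away
            return 0;
        }
        long waitTime = expectedTime - now;
        if (waitTime > maxQueueingTimeMs) {
            return -1;                     // queue too long: reject
        }
        latestPassedTime += costTime;      // reserve the next slot in the queue
        return waitTime;
    }

    public static void main(String[] args) {
        PacingSketch limiter = new PacingSketch(10, 250); // 10 QPS -> one request every 100 ms
        for (int i = 0; i < 4; i++) {
            System.out.println("t=1000 request " + i + " -> " + limiter.acquire(1000, 1)); // 0, 100, 200, -1
        }
        System.out.println("t=1300 -> " + limiter.acquire(1300, 1)); // 0
    }
}
```

Four requests arriving together are spread 100 ms apart. The fourth would have to wait 300 ms, which exceeds the 250 ms queueing limit, so it is rejected instead.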
6. Flow control by thread count
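The configuration is not very different from QPS mode, so once you understand QPS this mode is easy to follow. The difference is that it counts running threads instead of requests per second. Here is a demo: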
```java
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class FlowThreadDemo {

    public static void main(String[] args) throws Exception {
        initFlowRule();
        // Group 1: 4 threads that enter immediately and hold the resource for 2 s each time.
        for (int i = 0; i < 4; i++) {
            Thread entryThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        Entry methodA = null;
                        try {
                            methodA = SphU.entry("methodA");
                            System.out.println(Thread.currentThread().getName() + "--------pass---" + System.currentTimeMillis());
                            Thread.sleep(2000);
                        } catch (BlockException e1) {
                            System.out.println(Thread.currentThread().getName() + "--------block");
                        } catch (Exception e2) {
                            // biz exception
                        } finally {
                            if (methodA != null) {
                                methodA.exit();
                            }
                        }
                    }
                }
            });
            entryThread.setName("working thread1");
            entryThread.start();
        }
        Thread.sleep(1500);
        // Group 2: 4 more threads, started 1.5 s later, each pausing 50 ms before trying to enter.
        for (int i = 0; i < 4; i++) {
            Thread entryThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        Entry methodA = null;
                        try {
                            TimeUnit.MILLISECONDS.sleep(50);
                            methodA = SphU.entry("methodA");
                            System.out.println(Thread.currentThread().getName() + "--------pass---" + System.currentTimeMillis());
                            Thread.sleep(2000);
                        } catch (BlockException e1) {
                            System.out.println(Thread.currentThread().getName() + "--------block");
                        } catch (Exception e2) {
                            // biz exception
                        } finally {
                            if (methodA != null) {
                                methodA.exit();
                            }
                        }
                    }
                }
            });
            entryThread.setName("working thread2");
            entryThread.start();
        }
    }

    private static void initFlowRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource("methodA");
        // set limit concurrent thread for 'methodA' to 4
        rule1.setCount(4);
        rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
        rule1.setLimitApp("default");
        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }
}
```
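This demo behaves differently from the QPS ones. We allow at most 4 concurrently running threads (rule1.setCount(4)), and we sleep 1.5 s between starting the first group of 4 threads and the second group of 4. Under a QPS rule, the second group would be able to pass by then. The output: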
```text
working thread1--------pass---1649148661104
working thread1--------pass---1649148661104
working thread1--------pass---1649148661104
working thread1--------pass---1649148661104
working thread2--------block
working thread2--------block
working thread2--------block
working thread2--------block
working thread2--------block
working thread2--------block
working thread2--------block
```
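As the output shows, thread2 never gets a chance to run. The simulated work takes 2 s (Thread.sleep(2000)), so the first group has not yet released its slots. A slot is released by calling methodA.exit(), which decrements the current thread count. The increment happens in StatisticSlot's entry() method, the same as discussed for QPS. The decrement is also handled in StatisticSlot: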
```java
// StatisticSlot
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    Node node = context.getCurNode();

    if (context.getCurEntry().getBlockError() == null) {
        // Calculate response time (use completeStatTime as the time of completion).
        long completeStatTime = TimeUtil.currentTimeMillis();
        context.getCurEntry().setCompleteTimestamp(completeStatTime);
        long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();

        Throwable error = context.getCurEntry().getError();

        // Record response time and success count.
        recordCompleteFor(node, count, rt, error);
        recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
        }
    }

    // ...

    fireExit(context, resourceWrapper, count);
}

private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {
    if (node == null) {
        return;
    }
    node.addRtAndSuccess(rt, batchCount);
    node.decreaseThreadNum();

    if (error != null && !(error instanceof BlockException)) {
        node.increaseExceptionQps(batchCount);
    }
}

// DefaultNode: also decrements the resource-wide cluster node
@Override
public void decreaseThreadNum() {
    super.decreaseThreadNum();
    this.clusterNode.decreaseThreadNum();
}
```
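In short, thread-count limiting comes down to an increment on entry and a decrement on exit. The sketch below is deterministic and single-threaded. It uses hypothetical names (ThreadCountSketch, tryEnter, exit) and a plain counter instead of Sentinel's node statistics, and it replays the FlowThreadDemo situation. As in Sentinel, the check and the increment are separate steps, so this sketch is not atomic under concurrency, as discussed in section 3.

```java
public class ThreadCountSketch {
    private final int maxThreads;
    private int curThreadNum; // requests that have entered but not yet exited

    public ThreadCountSketch(int maxThreads) {
        this.maxThreads = maxThreads;
    }

    // Same comparison DefaultController makes for the thread grade: reject if one more would exceed the limit.
    public boolean tryEnter() {
        if (curThreadNum + 1 > maxThreads) {
            return false;
        }
        curThreadNum++; // the increment StatisticSlot performs on entry
        return true;
    }

    // Counterpart of Entry.exit(): frees a slot so another caller can get in.
    public void exit() {
        curThreadNum--; // the decrement recordCompleteFor() performs on exit
    }

    public int current() {
        return curThreadNum;
    }

    public static void main(String[] args) {
        ThreadCountSketch limiter = new ThreadCountSketch(4);
        for (int i = 1; i <= 5; i++) {
            System.out.println("request " + i + ": " + (limiter.tryEnter() ? "pass" : "block")); // 4 x pass, then block
        }
        limiter.exit(); // one of the first four finishes its 2 s of work
        System.out.println("after one exit: " + (limiter.tryEnter() ? "pass" : "block")); // pass
    }
}
```

Unlike a QPS window, nothing here expires with time. A slot frees up only when exit() runs, which is why the second group in the demo keeps getting blocked while the first group holds all four slots.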