这一篇我们主要来分析下Sentinel
的熔断降级,这个相关的官方文档。
一、demo使用
public class DegradeFlowDemo {
private static final String resourceKey = "degradeResource";
private static int index = 0;
public static void main(String[] args) throws InterruptedException {
initRule();
long startTime = System.currentTimeMillis();
for (int i = 0; i < 12; i++) {
new Thread(() -> {
Entry entry = null;
try {
entry = SphU.entry(resourceKey);
doSomething();
} catch (BlockException blockException) {
System.out.println("---------Blocked");
} catch (Exception e) {
System.out.println("---------Exception");
Tracer.trace(e);
} finally {
if (Objects.nonNull(entry)) {
entry.exit();
}
}
}).start();
Thread.sleep(50);
}
System.out.println("End cost time - " + (System.currentTimeMillis() - startTime));
}
private static void initRule(){
List<DegradeRule> degradeRuleList = new ArrayList<>();
DegradeRule degradeRule = new DegradeRule(resourceKey);
degradeRule.setTimeWindow(500);
degradeRule.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT);
degradeRule.setCount(3);
degradeRule.setStatIntervalMs(1000);
degradeRule.setMinRequestAmount(10);
degradeRuleList.add(degradeRule);
DegradeRuleManager.loadRules(degradeRuleList);
}
private static void doSomething(){
if (index++%2 == 0) {
throw new NullPointerException();
}
System.out.println("----------doSomething");
}
}
这个demo我们就不像上篇那样详细了,我们看下运行结果:
----------doSomething
---------Exception
---------Exception
----------doSomething
---------Exception
----------doSomething
---------Exception
----------doSomething
---------Exception
----------doSomething
---------Blocked
---------Blocked
End cost time - 762
Process finished with exit code 0
这个demo就是我们开启12个线程运行,然后将熔断规则设置为异常错误数,也就是有多少错误请求的时候就返回,同时我们设置了最小的请求数,也就是在StatIntervalMs
时间短内必须要达到MinRequestAmount
请求的数量,才会熔断,不然即使有count
个异常树也不会熔断。例如demo
前10个其的异常数量是超过了3个,但并没有熔断,而是从第11个开始熔断的。
二、demo分析
1、参数
二、demo分析
1、参数
1)、setTimeWindow
这个主要是用来设置从熔断到下次放开一个探测请求的时间,也就是半开放状态。
public interface CircuitBreaker {
.........
/**
* Circuit breaker state.
*/
enum State {
OPEN,
HALF_OPEN,
CLOSED
}
}
这个是熔断器的3种状态,OPEN
表示目前熔断器是打开的,所有请求都不能通过,HALF_OPEN
表示半开状态,也就是会放开请求去运行看这个请求是否正常,如果正常交可以设置为CLOSED
,也就是关闭熔断器,让请求能通过,如果半开状态的请求不能正常访问,就将状态从HALF_OPEN
变为OPEN
,然后需要等待下一个窗口时间-TimeWindow
。
2)、setGrade
这个就是设置对应的熔断策略的,其主要有3中,
public static final int DEGRADE_GRADE_EXCEPTION_COUNT = 2;
public static final int DEGRADE_DEFAULT_SLOW_REQUEST_AMOUNT = 5;
public static final int DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT = 5;
- 慢调用比例 (SLOW_REQUEST_RATIO):选择以慢调用比例作为阈值,需要设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求响应时间小于设置的慢调用 RT 则结束熔断,若大于设置的慢调用 RT 则会再次被熔断。
- 异常比例 (ERROR_RATIO):当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。异常比率的阈值范围是 [0.0, 1.0],代表 0% - 100%。
- 异常数 (ERROR_COUNT):当单位统计时长内的异常数目超过阈值之后会自动进行熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。
这里我们直接贴官方的说明,这个介绍的还是
3)、degradeRule.setStatIntervalMs
然后degradeRule.setStatIntervalMs(1000)
设置的就是统计的窗口时间这个时间段多长,例如QPS
默认的就是1000ms
,也就是这些数据的统计是统计哪个时间段,也就是我们上篇梳理的LeapArray
public ExceptionCircuitBreaker(DegradeRule rule) {
this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));
}
我们上面的就是ExceptionCircuitBreaker
就是对应的DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT
策略对应的熔断器逻辑处理类。可以看到其的SimpleErrorCounterLeapArray
传入的是1,也就是在getStatIntervalMs
事件单位内就用一个桶来计算统计,我们上篇介绍的QPS
其默认的是2
、1000
,也就是将1000
分为两个桶来计算。
4)、setMinRequestAmount
最小请求数,在上面demo介绍有说明其的作用了。
三、逻辑源码分析。
1、DegradeSlot
与熔断降级相关的类是DegradeSlot
。
1)、entry方法
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
可以看到其是先通过performChecking
判断是否能通过,能通过再fireEntry
继续调用链路的执行。
void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
其判断就是获取规则的CircuitBreaker
,也就是我们前面介绍的3种策略:
public ResponseTimeCircuitBreaker(DegradeRule rule) {
this(rule, new SlowRequestLeapArray(1, rule.getStatIntervalMs()));
}
我们可以看到ResponseTimeCircuitBreaker
也是一个桶来计算。
public final class DegradeRuleManager {
...........
private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) {
switch (rule.getGrade()) {
case RuleConstant.DEGRADE_GRADE_RT:
return new ResponseTimeCircuitBreaker(rule);
case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
return new ExceptionCircuitBreaker(rule);
default:
return null;
}
}
我们看到DEGRADE_GRADE_EXCEPTION_RATIO
、DEGRADE_GRADE_EXCEPTION_COUNT
这两种都是用的同一个类。
也就是DegradeSlot
通过CircuitBreaker
去判断是否能通过,不能通过的话,就抛出DegradeException
(其是BlockException
的子类),
2)、exit方法
@Override
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
Entry curEntry = context.getCurEntry();
if (curEntry.getBlockError() != null) {
fireExit(context, r, count, args);
return;
}
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
fireExit(context, r, count, args);
return;
}
if (curEntry.getBlockError() == null) {
// passed request
for (CircuitBreaker circuitBreaker : circuitBreakers) {
circuitBreaker.onRequestComplete(context);
}
}
fireExit(context, r, count, args);
}
这个就是离开的,如果curEntry.getBlockError() == null
,也就是并没有阻塞熔断,其用CircuitBreaker
再来处理计算逻辑。
这里对应的官网的一句话:
注意异常降级仅针对业务异常,对 Sentinel 限流降级本身的异常(BlockException)不生效。为了统计异常比例或异常数,需要通过 Tracer.trace(ex) 记录业务异常
也就是BlockException
并不算一个异常,其计算的是你的业务异常。
3)、ExceptionCircuitBreaker
1)、exit相关的逻辑调用
我们以ExceptionCircuitBreaker
来说明下CircuitBreaker
一般的逻辑处理。剩下直接是上面的exit
方法里面的CircuitBreaker
调用:
@Override
public void onRequestComplete(Context context) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
counter.getErrorCount().add(1);
}
counter.getTotalCount().add(1);
handleStateChangeWhenThresholdExceeded(error);
}
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
if (currentState.get() == State.OPEN) {
return;
}
if (currentState.get() == State.HALF_OPEN) {
// In detecting request
if (error == null) {
fromHalfOpenToClose();
} else {
fromHalfOpenToOpen(1.0d);
}
return;
}
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
// Use errorRatio
curCount = errCount * 1.0d / totalCount;
}
if (curCount > threshold) {
transformToOpen(curCount);
}
}
exit
的逻辑主要是这两个方法,一个是对调用次数,异常次数计数。handleStateChangeWhenThresholdExceeded
就计算熔断器的状态,也就是通过计算对应的值,是熔断器状态在OPEN
、CLOSE
、HALF_OPEN
切换。例如:
if (currentState.get() == State.HALF_OPEN) {
// In detecting request
if (error == null) {
fromHalfOpenToClose();
} else {
fromHalfOpenToOpen(1.0d);
}
return;
}
如果当前是HALF_OPEN
,当这个请求没有error
,就将状态由HALF_OPEN
变为CLOSE
,不然就将HALF_OPEN
继续变为OPEN
,不允许通过请求。还有:
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
// Use errorRatio
curCount = errCount * 1.0d / totalCount;
}
if (curCount > threshold) {
transformToOpen(curCount);
}
也就是要达到的最小请求数,才会去下面计算处理熔断器的打开,例如curCount > threshold
,请求数已经超过了我们的阈值,才下面去进行状态判断。
protected boolean fromHalfOpenToClose() {
if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
resetStat();
notifyObservers(State.HALF_OPEN, State.CLOSED, null);
return true;
}
return false;
}
protected boolean fromHalfOpenToOpen(double snapshotValue) {
if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
updateNextRetryTimestamp();
notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue);
return true;
}
return false;
}
当然这里面还有取代的观察者模式相关逻辑,就不分析了。
四、规则设置的参数
这些参数的设置,我们再直接贴官网的吧。
Field | 说明 | 默认值 |
---|---|---|
resource | 资源名,即规则的作用对象 | |
grade | 熔断策略,支持慢调用比例/异常比例/异常数策略 | 慢调用比例 |
count | 慢调用比例模式下为慢调用临界 RT(超出该值计为慢调用);异常比例/异常数模式下为对应的阈值 | |
timeWindow | 熔断时长,单位为 s | |
minRequestAmount | 熔断触发的最小请求数,请求数小于该值时即使异常比率超出阈值也不会熔断(1.7.0 引入) | 5 |
statIntervalMs | 统计时长(单位为 ms),如 60*1000 代表分钟级(1.8.0 引入) | 1000 ms |
slowRatioThreshold | 慢调用比例阈值,仅慢调用比例模式有效(1.8.0 引入) |
由于我们上面的demo是异常数量处理,所以是没有设置slowRatioThreshold
,也就是慢遍历阈值的。