一、热点参数限流介绍
这一篇我们主要来介绍下热点参数限流,这个介绍我们看下官方的描述:
何为热点?热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制。比如:
商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制
用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制
热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效。
Sentinel 利用 LRU 策略统计最近最常访问的热点参数,结合令牌桶算法来进行参数级别的流控。热点参数限流支持集群模式。
这里我们简单理解下,就是对资源上面的参数进行流量控制,例如我们前面基于QPS
进行限流时使用的获取Entry
方法:
SphU.entry(KEY);
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
可以看到我们只需要传入资源名称就是可以,但我们的热点参数就多了一个概念,就是参数,其的获取是资源
上面对应的参数
:
SphU.entry(Resource_Key, EntryType.IN, 1, ParamArray[arrayIndex]);
public static Entry entry(String name, EntryType trafficType, int batchCount, Object... args)
throws BlockException {
return Env.sph.entry(name, trafficType, batchCount, args);
}
我们主要是两个传入资源以及对应的参数,上面其他的两个参数:EntryType
表示类型是入口流量还是出口流量,另一个batchCount
表示本次获取的资源数量,例如如果我们设置的count
是4,如果batchCount
是2,获取后count
就还只有2。
这个简单理解SphU.entry(Resource_Key, EntryType.IN, 1, ParamArray[arrayIndex])
就是:资源就是获取到流量后需要调用的方法,而参数就是这个方法需要的入参。
二、demo介绍
我们看下规则配置。
1、参数
热点参数规则(ParamFlowRule
)类似于流量控制规则(FlowRule
):
属性 | 说明 | 默认值 |
---|---|---|
resource | 资源名,必填 | |
count | 限流阈值,必填 | |
grade | 限流模式 | QPS 模式 |
durationInSec | 统计窗口时间长度(单位为秒),1.6.0 版本开始支持 | 1s |
controlBehavior | 流控效果(支持快速失败和匀速排队模式),1.6.0 版本开始支持 | 快速失败 |
maxQueueingTimeMs | 最大排队等待时长(仅在匀速排队模式生效),1.6.0 版本开始支持 | 0ms |
paramIdx | 热点参数的索引,必填,对应 SphU.entry(xxx, args) 中的参数索引位置 |
|
paramFlowItemList | 参数例外项,可以针对指定的参数值单独设置限流阈值,不受前面 count 阈值的限制。仅支持基本类型和字符串类型 |
|
clusterMode | 是否是集群参数流控规则 | false |
clusterConfig | 集群流控相关配置 |
public static void initRule(){
ParamFlowRule paramFlowRule = new ParamFlowRule();
paramFlowRule.setResource(Resource_Key);
paramFlowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
paramFlowRule.setCount(3);
//允许的最大突发请求
// paramFlowRule.setBurstCount(10);
paramFlowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
//统计的周期
paramFlowRule.setDurationInSec(1);
paramFlowRule.setParamIdx(0);
List<ParamFlowRule> paramFlowRules = new ArrayList<>();
paramFlowRules.add(paramFlowRule);
ParamFlowRuleManager.loadRules(paramFlowRules);
}
这里我们可以看到其的参数设置与流量控制是差不多的。但这里还多了一个paramFlowRule.setParamIdx(0)
,这个就是参数的位置,你这条规则是控制方法上面的哪个参数,然后paramFlowRule.setBurstCount(10)
是应对突发流量的(使用的是令牌桶算法,允许突发流量获取到令牌,这个就是设置最大能允许获取到的令牌树),例如如果你一般允许的Count
是3,但如果流量突然增加到16,这个时候能允许超出限制给出10个令牌,但第16就获取不到。
这里我们可以理解资源为一个方法,其有一个参数:
public void doSomething(String name)
我们规则控制的就是第一个参数。
2、运行结果解释
然后我们来看下运行方法:
public class ParamFlowMain {
public static String Resource_Key = "paramResource";
//public static String[] ParamArray = new String[]{"A","B","C"};
public static String ItemParamKey = ParamArray[1];
public static void main(String[] args) {
initRule();
for (int i = 0; i < 12; i++) {
Entry entry = null;
int arrayIndex = i%3;
try {
entry = SphU.entry(Resource_Key, EntryType.IN, 1, ParamArray[arrayIndex]);
System.out.println("param " + ParamArray[arrayIndex] + " ---------doSomething");
} catch (BlockException blockException) {
System.out.println("param " + ParamArray[arrayIndex] + " ---------Blocked");
} finally {
if (Objects.nonNull(entry)) {
entry.exit();
}
}
}
}
public static void initRule(){
............
}
}
这里我们的一些方法,ParamArray
数组可以理解为传给doSomething(String name)
方法的入参name
的3个不同的值。
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------Blocked
param B ---------Blocked
param C ---------Blocked
我们可以看到3个参数每种只能通过3
次。同时这里又引申到另一个问题,我如果想设置参数的某个值为另外的count
呢,这个时候我们需要增加另一部分逻辑:
public static void initRule(){
ParamFlowRule paramFlowRule = new ParamFlowRule();
paramFlowRule.setResource(Resource_Key);
paramFlowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
paramFlowRule.setCount(3);
//允许的最大突发请求
// paramFlowRule.setBurstCount(10);
paramFlowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
//统计的周期
paramFlowRule.setDurationInSec(1);
paramFlowRule.setParamIdx(0);
//例外的参数配置
ParamFlowItem paramFlowItem = new ParamFlowItem();
paramFlowItem.setCount(4);
//参数类型
paramFlowItem.setClassType(String.class.getTypeName());
//具体的参数值
paramFlowItem.setObject(ItemParamKey);
List<ParamFlowItem> paramFlowItemList = new ArrayList<>();
paramFlowItemList.add(paramFlowItem);
paramFlowRule.setParamFlowItemList(paramFlowItemList);
List<ParamFlowRule> paramFlowRules = new ArrayList<>();
paramFlowRules.add(paramFlowRule);
ParamFlowRuleManager.loadRules(paramFlowRules);
}
也就是在ParamFlowRule
增加ParamFlowItem
,用它来控制特殊的值的控制。这里我们将ParamArray[1]
也就是B
控制的流量为4:
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------Blocked
param B ---------doSomething
param C ---------Blocked
我们可以看到这次param B
第4次是能通过的。
三、源码简单梳理
下面我们来简单梳理下源码里面的处理:
1、ParamFlowSlot
处理热点参数的Slot
是ParamFlowSlot
:
public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
其首先是判断当前资源有没有设置对应的规则,如果没有就直接放行运行fireEntry
,而不会checkFlow
。
public static boolean hasRules(String resourceName) {
List<ParamFlowRule> rules = PARAM_FLOW_RULES.get(resourceName);
return rules != null && !rules.isEmpty();
}
这个目前就有我们设置的ParamFlowRule
,然后就会check
:
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
if (args == null) {
return;
}
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
for (ParamFlowRule rule : rules) {
applyRealParamIdx(rule, args.length);
// Initialize the parameter metrics.
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
主要就是对ParamFlowRule
的判断,不通过就抛出ParamFlowException
(BlockException
的子类)
2、ParamFlowChecker
public final class ParamFlowChecker {
public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
Object... args) {
if (args == null) {
return true;
}
int paramIdx = rule.getParamIdx();
if (args.length <= paramIdx) {
return true;
}
// Get parameter value.
Object value = args[paramIdx];
..........
return passLocalCheck(resourceWrapper, rule, count, value);
}
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
} else {
return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
}
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
if (exclusionItems.contains(value)) {
int itemThreshold = rule.getParsedHotItems().get(value);
return ++threadCount <= itemThreshold;
}
long threshold = (long)rule.getCount();
return ++threadCount <= threshold;
}
return true;
}
再之后这里的判断就会看你是哪种控制行为,进行不通的判断,我们当前是走的passDefaultLocalCheck
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
if (tokenCounters == null || timeCounters == null) {
return true;
}
// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
long maxCount = tokenCount + rule.getBurstCount();
if (acquireCount > maxCount) {
return false;
}
while (true) {
......
}
}
这里我们注意两个计算tokenCounters
、timeCounters
,tokenCounters
记录是参数当前还能获取到的令牌树,timeCounters
是记录时间段的开始时间。然后可以看到long maxCount = tokenCount + rule.getBurstCount();
,最大许可数是使用到了BurstCount
。
然后下面就是具体的计算统计。这里我们简单梳理其中一个情况吧:
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}
long passTime = currentTime - lastAddTokenTime.get();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
if (passTime > rule.getDurationInSec() * 1000) {
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {
long restQps = oldQps.get();
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
if (newQps < 0) {
return false;
}
if (oldQps.compareAndSet(restQps, newQps)) {
lastAddTokenTime.set(currentTime);
return true;
}
Thread.yield();
}
} else {
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;
}
} else {
return false;
}
}
Thread.yield();
}
}
这个首先就是通过timeCounters.putIfAbsent(value, new AtomicLong(currentTime))
(注意这里是putIfAbsent
),来获取当前统计的时间段的开始时间,再之后是if (lastAddTokenTime == null)
,如果当前时间段还没有计算就初始化令牌计算。
if (lastAddTokenTime == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}
然后再通过long passTime = currentTime - lastAddTokenTime.get()
来计算已经过去了多长时间。
然后再通过passTime > rule.getDurationInSec() * 1000
判断这个时间是否还在这个窗口,如果还在就是oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)
也就是用原来的窗口处理:
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;
}
} else {
return false;
}
}
Thread.yield();
如果已经过了这个时间窗口:
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {
long restQps = oldQps.get();
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
if (newQps < 0) {
return false;
}
if (oldQps.compareAndSet(restQps, newQps)) {
lastAddTokenTime.set(currentTime);
return true;
}
Thread.yield();
}
这里主要会计算关系原来的令牌计算。
首先是toAddCount
:也就是在过去的这段时间按count
计算long tokenCount = (long)rule.getCount();
(不是与rule.getBurstCount()
叠加)已经产生了多少个令牌。
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
再进行新的可用的token
计算,由于我们有允许的最大的maxCount
限制,所以当toAddCount + restQps
> maxCount
,就还是使用maxCount
计算。如果没有的话,就使用toAddCount
计算。