04、Sentinel的基本使用-热点参数限流

一、热点参数限流介绍

这一篇我们主要来介绍下热点参数限流,这个介绍我们看下官方的描述:

何为热点?热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制。比如:

商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制
用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制
热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效。

Sentinel 利用 LRU 策略统计最近最常访问的热点参数,结合令牌桶算法来进行参数级别的流控。热点参数限流支持集群模式。

这里我们简单理解下,就是对资源上面的参数进行流量控制,例如我们前面基于QPS进行限流时使用的获取Entry方法:

SphU.entry(KEY);
public static Entry entry(String name) throws BlockException {

    return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}

可以看到我们只需要传入资源名称就是可以,但我们的热点参数就多了一个概念,就是参数,其的获取是资源上面对应的参数

SphU.entry(Resource_Key, EntryType.IN, 1, ParamArray[arrayIndex]);
public static Entry entry(String name, EntryType trafficType, int batchCount, Object... args)
    throws BlockException {

    return Env.sph.entry(name, trafficType, batchCount, args);
}

我们主要是两个传入资源以及对应的参数,上面其他的两个参数:EntryType表示类型是入口流量还是出口流量,另一个batchCount表示本次获取的资源数量,例如如果我们设置的count是4,如果batchCount是2,获取后count就还只有2。

这个简单理解SphU.entry(Resource_Key, EntryType.IN, 1, ParamArray[arrayIndex])就是:资源就是获取到流量后需要调用的方法,而参数就是这个方法需要的入参。

二、demo介绍

我们看下规则配置。

1、参数

热点参数规则(ParamFlowRule)类似于流量控制规则(FlowRule):

属性 说明 默认值
resource 资源名,必填
count 限流阈值,必填
grade 限流模式 QPS 模式
durationInSec 统计窗口时间长度(单位为秒),1.6.0 版本开始支持 1s
controlBehavior 流控效果(支持快速失败和匀速排队模式),1.6.0 版本开始支持 快速失败
maxQueueingTimeMs 最大排队等待时长(仅在匀速排队模式生效),1.6.0 版本开始支持 0ms
paramIdx 热点参数的索引,必填,对应 SphU.entry(xxx, args) 中的参数索引位置
paramFlowItemList 参数例外项,可以针对指定的参数值单独设置限流阈值,不受前面 count 阈值的限制。仅支持基本类型和字符串类型
clusterMode 是否是集群参数流控规则 false
clusterConfig 集群流控相关配置
public static void initRule(){

        ParamFlowRule paramFlowRule = new ParamFlowRule();
        paramFlowRule.setResource(Resource_Key);
        paramFlowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        paramFlowRule.setCount(3);
        //允许的最大突发请求
//        paramFlowRule.setBurstCount(10);
        paramFlowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
        //统计的周期
        paramFlowRule.setDurationInSec(1);
        paramFlowRule.setParamIdx(0);
        List<ParamFlowRule> paramFlowRules = new ArrayList<>();
        paramFlowRules.add(paramFlowRule);
        ParamFlowRuleManager.loadRules(paramFlowRules);
    }

这里我们可以看到其的参数设置与流量控制是差不多的。但这里还多了一个paramFlowRule.setParamIdx(0),这个就是参数的位置,你这条规则是控制方法上面的哪个参数,然后paramFlowRule.setBurstCount(10)是应对突发流量的(使用的是令牌桶算法,允许突发流量获取到令牌,这个就是设置最大能允许获取到的令牌树),例如如果你一般允许的Count是3,但如果流量突然增加到16,这个时候能允许超出限制给出10个令牌,但第16就获取不到。

这里我们可以理解资源为一个方法,其有一个参数:

public void doSomething(String name)

我们规则控制的就是第一个参数。

2、运行结果解释

然后我们来看下运行方法:

public class ParamFlowMain {

    public static String Resource_Key = "paramResource";

    //public static String[] ParamArray = new String[]{"A","B","C"};

    public static String ItemParamKey = ParamArray[1];

    public static void main(String[] args) {

        initRule();
        for (int i = 0; i < 12; i++) {

            Entry entry = null;
            int arrayIndex = i%3;
            try {

                entry = SphU.entry(Resource_Key, EntryType.IN, 1, ParamArray[arrayIndex]);
                System.out.println("param " + ParamArray[arrayIndex] + " ---------doSomething");
            } catch (BlockException blockException) {

                System.out.println("param " + ParamArray[arrayIndex] + " ---------Blocked");
            } finally {

                if (Objects.nonNull(entry)) {

                    entry.exit();
                }
            }
        }
    }

    public static void initRule(){

       ............
    }

}

这里我们的一些方法,ParamArray数组可以理解为传给doSomething(String name)方法的入参name的3个不同的值。

param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------Blocked
param B ---------Blocked
param C ---------Blocked

我们可以看到3个参数每种只能通过3次。同时这里又引申到另一个问题,我如果想设置参数的某个值为另外的count呢,这个时候我们需要增加另一部分逻辑:

public static void initRule(){

        ParamFlowRule paramFlowRule = new ParamFlowRule();
        paramFlowRule.setResource(Resource_Key);
        paramFlowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        paramFlowRule.setCount(3);
        //允许的最大突发请求
//        paramFlowRule.setBurstCount(10);
        paramFlowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
        //统计的周期
        paramFlowRule.setDurationInSec(1);

        paramFlowRule.setParamIdx(0);

        //例外的参数配置
        ParamFlowItem paramFlowItem = new ParamFlowItem();
        paramFlowItem.setCount(4);
        //参数类型
        paramFlowItem.setClassType(String.class.getTypeName());
        //具体的参数值
        paramFlowItem.setObject(ItemParamKey);
        List<ParamFlowItem> paramFlowItemList = new ArrayList<>();
        paramFlowItemList.add(paramFlowItem);
        paramFlowRule.setParamFlowItemList(paramFlowItemList);

        List<ParamFlowRule> paramFlowRules = new ArrayList<>();
        paramFlowRules.add(paramFlowRule);
        ParamFlowRuleManager.loadRules(paramFlowRules);
    }

也就是在ParamFlowRule增加ParamFlowItem,用它来控制特殊的值的控制。这里我们将ParamArray[1]也就是B控制的流量为4:

param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------Blocked
param B ---------doSomething
param C ---------Blocked

我们可以看到这次param B第4次是能通过的。

三、源码简单梳理

下面我们来简单梳理下源码里面的处理:

1、ParamFlowSlot

处理热点参数的SlotParamFlowSlot

public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {

        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {

            fireEntry(context, resourceWrapper, node, count, prioritized, args);
            return;
        }

        checkFlow(resourceWrapper, count, args);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

其首先是判断当前资源有没有设置对应的规则,如果没有就直接放行运行fireEntry,而不会checkFlow

public static boolean hasRules(String resourceName) {

    List<ParamFlowRule> rules = PARAM_FLOW_RULES.get(resourceName);
    return rules != null && !rules.isEmpty();
}

&nbsp;

这个目前就有我们设置的ParamFlowRule,然后就会check

void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {

    if (args == null) {

        return;
    }
    if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {

        return;
    }
    List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());

    for (ParamFlowRule rule : rules) {

        applyRealParamIdx(rule, args.length);

        // Initialize the parameter metrics.
        ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);

        if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {

            String triggeredParam = "";
            if (args.length > rule.getParamIdx()) {

                Object value = args[rule.getParamIdx()];
                triggeredParam = String.valueOf(value);
            }
            throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
        }
    }
}

主要就是对ParamFlowRule的判断,不通过就抛出ParamFlowException(BlockException的子类)

2、ParamFlowChecker

public final class ParamFlowChecker {

    public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
                             Object... args) {

        if (args == null) {

            return true;
        }
        int paramIdx = rule.getParamIdx();
        if (args.length <= paramIdx) {

            return true;
        }

        // Get parameter value.
        Object value = args[paramIdx];
        ..........

        return passLocalCheck(resourceWrapper, rule, count, value);
    }
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
                                    Object value) {

    if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {

        if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {

            return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
        } else {

            return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
        }
    } else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {

        Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
        long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
        if (exclusionItems.contains(value)) {

            int itemThreshold = rule.getParsedHotItems().get(value);
            return ++threadCount <= itemThreshold;
        }
        long threshold = (long)rule.getCount();
        return ++threadCount <= threshold;
    }

    return true;
}

再之后这里的判断就会看你是哪种控制行为,进行不通的判断,我们当前是走的passDefaultLocalCheck

static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
                                     Object value) {

    ParameterMetric metric = getParameterMetric(resourceWrapper);
    CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
    CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);

    if (tokenCounters == null || timeCounters == null) {

        return true;
    }

    // Calculate max token count (threshold)
    Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
    long tokenCount = (long)rule.getCount();
    if (exclusionItems.contains(value)) {

        tokenCount = rule.getParsedHotItems().get(value);
    }

    if (tokenCount == 0) {

        return false;
    }

    long maxCount = tokenCount + rule.getBurstCount();
    if (acquireCount > maxCount) {

        return false;
    }

    while (true) {

        ......  
    }
}

&nbsp;

这里我们注意两个计算tokenCounterstimeCounterstokenCounters记录是参数当前还能获取到的令牌树,timeCounters是记录时间段的开始时间。然后可以看到long maxCount = tokenCount + rule.getBurstCount();,最大许可数是使用到了BurstCount

然后下面就是具体的计算统计。这里我们简单梳理其中一个情况吧:

while (true) {

    long currentTime = TimeUtil.currentTimeMillis();

    AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
    if (lastAddTokenTime == null) {

                // Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
        tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
        return true;
      }
    long passTime = currentTime - lastAddTokenTime.get();
    // A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
    if (passTime > rule.getDurationInSec() * 1000) {

        AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
        if (oldQps == null) {

            // Might not be accurate here.
            lastAddTokenTime.set(currentTime);
            return true;
        } else {

            long restQps = oldQps.get();
            long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
            long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
                : (restQps + toAddCount - acquireCount);

            if (newQps < 0) {

                return false;
            }
            if (oldQps.compareAndSet(restQps, newQps)) {

                lastAddTokenTime.set(currentTime);
                return true;
            }
            Thread.yield();
        }
    } else {

        AtomicLong oldQps = tokenCounters.get(value);
        if (oldQps != null) {

            long oldQpsValue = oldQps.get();
            if (oldQpsValue - acquireCount >= 0) {

                if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {

                    return true;
                }
            } else {

                return false;
            }
        }
        Thread.yield();
    }
}

这个首先就是通过timeCounters.putIfAbsent(value, new AtomicLong(currentTime))(注意这里是putIfAbsent),来获取当前统计的时间段的开始时间,再之后是if (lastAddTokenTime == null),如果当前时间段还没有计算就初始化令牌计算。

if (lastAddTokenTime == null) {

    // Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
    tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
    return true;
}

然后再通过long passTime = currentTime - lastAddTokenTime.get()来计算已经过去了多长时间。

然后再通过passTime > rule.getDurationInSec() * 1000判断这个时间是否还在这个窗口,如果还在就是oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)也就是用原来的窗口处理:

AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {

    long oldQpsValue = oldQps.get();
    if (oldQpsValue - acquireCount >= 0) {

        if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {

            return true;
        }
    } else {

        return false;
    }
}
Thread.yield();

如果已经过了这个时间窗口:

AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {

    // Might not be accurate here.
    lastAddTokenTime.set(currentTime);
    return true;
} else {

    long restQps = oldQps.get();
    long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
    long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
        : (restQps + toAddCount - acquireCount);

    if (newQps < 0) {

        return false;
    }
    if (oldQps.compareAndSet(restQps, newQps)) {

        lastAddTokenTime.set(currentTime);
        return true;
    }
    Thread.yield();
}

这里主要会计算关系原来的令牌计算。

首先是toAddCount:也就是在过去的这段时间按count计算long tokenCount = (long)rule.getCount();(不是与rule.getBurstCount()叠加)已经产生了多少个令牌。

long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);

再进行新的可用的token计算,由于我们有允许的最大的maxCount限制,所以当toAddCount + restQps > maxCount,就还是使用maxCount计算。如果没有的话,就使用toAddCount计算。