07、Ribbion负载均衡策略源码分析

前言

在之前Ribbon 负载均衡算法流程的文档中,我们分析到在获取服务时,Ribbion 会使用负载均衡规则类去计算出当前应该调用哪个服务。
 
Spring Cloud Open Feign系列【6】Ribbion配置类源码分析中,我们可以看到当前注入的规则是ZoneAvoidanceRule,使用的是轮询算法。
 

IRule 接口

IRule接口代表负载均衡算法策略,该类源码如下:

public interface IRule{

    // 根据特定的算法中从服务列表中选取一个要访问的在线服务
    // 为null时,代表当前服务不可用
    public Server choose(Object key);
    // 设置负载均衡器
    public void setLoadBalancer(ILoadBalancer lb);
    // 设置负载均衡器
    public ILoadBalancer getLoadBalancer();    
}

IRule接口有以下几个实现类:
 

RandomRule(随机)

随机算法调用了ThreadLocalRandom,统计服务副本个数,然后在这个范围中,返回一个随机数下标,再通过下标,返回一个可用的服务。

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
    public Server choose(ILoadBalancer lb, Object key) {

        if (lb == null) {

            return null;
        }
        Server server = null;
        // 循环
        while (server == null) {

            // 当前线程被中断,直接返回null
            if (Thread.interrupted()) {

                return null;
            }
            // 获取所有有效的服务实例列表
            List<Server> upList = lb.getReachableServers();
            // 获取所有服务的实例列表
            List<Server> allList = lb.getAllServers();
            // 服务列表个数
            int serverCount = allList.size();
            if (serverCount == 0) {

                // 没有获取到服务列表则返回 null,结束循环
                return null;
            }
            // ThreadLocalRandom.current().nextInt(serverCount)
            // 使用线程安全ThreadLocalRandom,获取一个服务个数范围内的随机数
            int index = chooseRandomInt(serverCount);
            // 在可用服务列表中,获取随机数下标的服务
            server = upList.get(index);

            if (server == null) {

                /*
                 * 如果有服务列表,又没获取到,可能存在于服务瞬态改变,则让出当前线程资源,然后再继续循环获取
                 */
                Thread.yield();
                continue;
            }
            // 获取到服务,并且在线,则直接返回
            if (server.isAlive()) {

                return (server);
            }
            // 有服务,但是又没有获取到可用,则会让出资源,再循环去获取一次
            server = null;
            Thread.yield();
        }

        return server;

    }

    protected int chooseRandomInt(int serverCount) {

        return ThreadLocalRandom.current().nextInt(serverCount);
    }

RoundRobinRule(轮询)

轮询会依次执行,每个执行一次(默认)。

算法公式:

// 当前请求总数 + 1, 在用服务总数取模,得到下标
 int next = (current + 1) % modulo;

比如,当前有两个副本,第一次请求时,(0+1)%2 得到下标为1,存储上次请求的坐标为1,并获取下标为1的服务。第二次时,则得到下标为0,第三次时,又得到下标1,依次类推,每个都会循环执行。

    public Server choose(ILoadBalancer lb, Object key) {

        if (lb == null) {

            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        // 计数器,初始为0
        int count = 0;
        // 当 server 为null, 循环次数小于10次,则会进入循环
        while (server == null && count++ < 10) {

            // 所以及可用服务集合
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            // 在线服务数
            int upCount = reachableServers.size();
            // 所有服务数
            int serverCount = allServers.size();
            // 没有可用服务,直接返回null  结束循环
            if ((upCount == 0) || (serverCount == 0)) {

                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }
            // 计算下标
            int nextServerIndex = incrementAndGetModulo(serverCount);
            // 根据下标获取服务
            server = allServers.get(nextServerIndex);
            // 没有获取到,继续循环
            if (server == null) {

                /* Transient. */
                Thread.yield();
                continue;
            }
            // 省略....
        return server;
    }
        // 轮询算法
        private int incrementAndGetModulo(int modulo) {

        for (;;) {

            // 获取当前客户端已经请求的总数, AtomicInteger 类型,
            int current = nextServerCyclicCounter.get();
            // 当前请求总数 + 1, 在用服务总数取模,得到下标
            int next = (current + 1) % modulo;
            // 
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }

WeightedResponseTimeRule(响应时间权重轮询)

WeightedResponseTimeRule继承自RoundRobinRule,是对轮询算法进行的扩展,加入了权重计算。

可以为每个服务器分配动态权重,然后然后加权循环,权重高的则会优先执行。

该类在初始化(注入)的时候,会调用一个初始化方法,进行定时任务创建。

    void initialize(ILoadBalancer lb) {

        if (serverWeightTimer != null) {

            serverWeightTimer.cancel();
        }
        // 创建一个定时任务计划,NFLoadBalancer-serverWeightTimer-order-service
        serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
                + name, true);
        // 执行定时任务 DynamicServerWeightTask任务,0秒后执行任务run 方法,定时执行间隔时间,默认30秒
        serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
                serverWeightTaskTimerInterval);
        // do a initial run
        ServerWeight sw = new ServerWeight();
        // 计算权重
        sw.maintainWeights();
        // 钩子 jvm关闭时,通知定时任务
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

            public void run() {

                logger
                        .info("Stopping NFLoadBalancer-serverWeightTimer-"
                                + name);
                serverWeightTimer.cancel();
            }
        }));
    }

定时任务,默认30S执行一次,调用的是ServerWeight.maintainWeights()方法。

    class DynamicServerWeightTask extends TimerTask {

        public void run() {

            ServerWeight serverWeight = new ServerWeight();
            try {

                serverWeight.maintainWeights();
            } catch (Exception e) {

                logger.error("Error running DynamicServerWeightTask for {}", name, e);
            }
        }
    }

maintainWeights方法核心代码如下

                // 负载均衡策略服务器状态总控
                AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                LoadBalancerStats stats = nlb.getLoadBalancerStats();
                if (stats == null) {

                    // no statistics, nothing to do
                    return;
                }
                double totalResponseTime = 0;
                // 循环所有服务列表,将他们的所有平均响应时间加一起
                for (Server server : nlb.getAllServers()) {

                    // this will automatically load the stats if not in cache
                    ServerStats ss = stats.getSingleServerStat(server);
                    totalResponseTime += ss.getResponseTimeAvg();
                }
                Double weightSoFar = 0.0;
                // 循环所有服务列表,设置权重,
                // 用总的响应时间减去当前服务的平均响应时间,服务响应时间越大,那么其权重就会越小,
                // 比如 公共响应时间为 10 ,A服务为 2 ,B 服务为8 ,则A服务的权重为8.00 ,B为2.00
                List<Double> finalWeights = new ArrayList<Double>();
                for (Server server : nlb.getAllServers()) {

                    ServerStats ss = stats.getSingleServerStat(server);
                    double weight = totalResponseTime - ss.getResponseTimeAvg();
                    weightSoFar += weight;
                    finalWeights.add(weightSoFar);   
                }
                setWeights(finalWeights);

在choose 方法中,会根据权重去计算。

    @Override
    public Server choose(ILoadBalancer lb, Object key) {

        // 
        while (server == null) {

            // 获取服务的权重集合
            List<Double> currentWeights = accumulatedWeights;
            if (Thread.interrupted()) {

                return null;
            }
            // 获取服务列表
            List<Server> allList = lb.getAllServers();
            // 选出权重最大的坐标 
            double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); 
            // No server has been hit yet and total weight is not initialized
            // fallback to use round robin
            if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {

                server =  super.choose(getLoadBalancer(), key);
                if(server == null) {

                    return server;
                }
            } else {

                // 返回一个大于或等于 0.0 且小于 1.0 的随机浮点数* 最大权重,
                double randomWeight = random.nextDouble() * maxTotalWeight;
                // pick the server index based on the randomIndex
                int n = 0;
                // 循环权重列表,如果当前随机数,则就选择这个
                for (Double d : currentWeights) {

                    if (d >= randomWeight) {

                        serverIndex = n;
                        break;
                    } else {

                        n++;
                    }
                }
                // 省略....
        return server;
    }

总结

定时任务,去计算权重,平均响应时间越大,权重越小。在选择时,生成一个随机数,然后循环服务列表,权重大于这个随机数,则会被选择。

RetryRule(重试)

RetryRule 实际调用的也是轮询算法,只是在没有获取到服务时,会进行循环重试操作,超过设置的时间限制,则会退出,默认为500MS。

    public Server choose(ILoadBalancer lb, Object key) {

        // 当前请求时间
        long requestTime = System.currentTimeMillis();
        // 限制时间,默认当前请求时间+ 500MS
        long deadline = requestTime + maxRetryMillis;
        Server answer = null;
        // 调用轮询算法获取一个可用服务
        answer = subRule.choose(key);
        // 如果服务不信在、不是健康的,并且没有超过限制时间则进去以下方法
        if (((answer == null) || (!answer.isAlive()))
                && (System.currentTimeMillis() < deadline)) {

            // 开启定时任务,多少秒后打断线程
            InterruptTask task = new InterruptTask(deadline
                    - System.currentTimeMillis());
            // 重新选择
            while (!Thread.interrupted()) {

                answer = subRule.choose(key);

                if (((answer == null) || (!answer.isAlive()))
                        && (System.currentTimeMillis() < deadline)) {

                    /* pause and retry hoping it's transient */
                    Thread.yield();
                } else {

                    break;
                }
            }

            task.cancel();
        }

        if ((answer == null) || (!answer.isAlive())) {

            return null;
        } else {

            return answer;
        }
    }

BestAvailableRule(最小并发)

BestAvailableRule继承了ClientConfigEnabledRoundRobinRule,会遍历所有的服务提供者,选择并发量最小的那个服务。

这个算法,可以将请求转发到压力最小的服务器,但是如果副本数太多,每次都要循环计算出最小并发,还是比较好资源的。

    @Override
    public Server choose(Object key) {

        if (loadBalancerStats == null) {

            return super.choose(key);
        }
        //  获取服务列表,
        List<Server> serverList = getLoadBalancer().getAllServers();
        // 
        int minimalConcurrentConnections = Integer.MAX_VALUE;
        // 当前请求时间
        long currentTime = System.currentTimeMillis();
        Server chosen = null;
        // 遍历服务
        for (Server server: serverList) {

            ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
            // 判断断路器是否跳闸,如果没有跳闸,继续往下走
            if (!serverStats.isCircuitBreakerTripped(currentTime)) {

                int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                if (concurrentConnections < minimalConcurrentConnections) {

                    minimalConcurrentConnections = concurrentConnections;
                    chosen = server;
                }
            }
        }
        // 如果没有找到并发量最小的服务节点,则使用父类的策略
        if (chosen == null) {

            return super.choose(key);
        } else {

            return chosen;
        }
    }

AvailabilityFilteringRule(可用断言过滤器)

AvailabilityFilteringRule 继承自PredicateBasedRulePredicateBasedRule和以上的规则有点区别,它将服务器筛选逻辑委托给{@link AbstractServerPredicate(断言器)}实例,过滤后,将以循环方式从过滤列表返回服务器。

AvailabilityFilteringRule 采用过滤器来进行服务选择,过滤掉那些一直连接失败的且被标记为circuit tripped的后端 Server,并过滤掉那些高并发的后端 Server 或者使用一个 AvailabilityPredicate 来包含过滤 Server 的逻辑。其实就是检查 Status 里记录的各个 Server 的运行状态。

    // 线性轮询先选一个,然后看是否符合条件,符合条件直接选择
    @Override
    public Server choose(Object key) {

    // 计数器
        int count = 0;
        // 轮询选择一个 
        Server server = roundRobinRule.choose(key);
        while (count++ <= 10) {

            // 断言判断,符合则返回,不符合则重新选择
            if (predicate.apply(new PredicateKey(server))) {

                return server;
            }
            server = roundRobinRule.choose(key);
        }
        // 超过10次还选不到,就使用父类策略
        return super.choose(key);
    }

ZoneAvoidanceRule (可用性其性能断言)

使用ZoneAvoidancePredicateAvailabilityPredicate来判断是否选择某个 Server,前一个判断判定一个 Zone 的运行性能是否可用,剔除不可用的 Zone Server,AvailabilityPredicate 用于过滤掉连接数过多的 Server。

该规则会先过滤,然后使用轮询算法,选出可用的服务,默认使用的就是这个,也推荐使用这个。

负载均衡策略案例

使用自带的策略

添加策略配置的方法十分简单,只需要配置Bean 就可以了,这个策略是全局的,作用于所有客户端。

    @Bean
    public RandomRule getRule() {

        return new RandomRule();
    }

可以在配置文件中,为某个单独的客户端配置策略,如果配置了全局,单独配置的则不会生效,需要注意。

order-service:
  ribbon:
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.WeightedResponseTimeRule

自定义策略

实现IRule或者继承 AbstractLoadBalancerRule实现其中的chose方法即可。