前言
在之前Ribbon 负载均衡算法流程的文档中,我们分析到在获取服务时,Ribbion 会使用负载均衡规则类去计算出当前应该调用哪个服务。
在Spring Cloud Open Feign系列【6】Ribbion配置类源码分析中,我们可以看到当前注入的规则是ZoneAvoidanceRule
,使用的是轮询算法。
IRule 接口
IRule接口代表负载均衡算法策略,该类源码如下:
public interface IRule{
// 根据特定的算法中从服务列表中选取一个要访问的在线服务
// 为null时,代表当前服务不可用
public Server choose(Object key);
// 设置负载均衡器
public void setLoadBalancer(ILoadBalancer lb);
// 设置负载均衡器
public ILoadBalancer getLoadBalancer();
}
IRule接口有以下几个实现类:
RandomRule(随机)
随机算法调用了ThreadLocalRandom
,统计服务副本个数,然后在这个范围中,返回一个随机数下标,再通过下标,返回一个可用的服务。
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
// 循环
while (server == null) {
// 当前线程被中断,直接返回null
if (Thread.interrupted()) {
return null;
}
// 获取所有有效的服务实例列表
List<Server> upList = lb.getReachableServers();
// 获取所有服务的实例列表
List<Server> allList = lb.getAllServers();
// 服务列表个数
int serverCount = allList.size();
if (serverCount == 0) {
// 没有获取到服务列表则返回 null,结束循环
return null;
}
// ThreadLocalRandom.current().nextInt(serverCount)
// 使用线程安全ThreadLocalRandom,获取一个服务个数范围内的随机数
int index = chooseRandomInt(serverCount);
// 在可用服务列表中,获取随机数下标的服务
server = upList.get(index);
if (server == null) {
/*
* 如果有服务列表,又没获取到,可能存在于服务瞬态改变,则让出当前线程资源,然后再继续循环获取
*/
Thread.yield();
continue;
}
// 获取到服务,并且在线,则直接返回
if (server.isAlive()) {
return (server);
}
// 有服务,但是又没有获取到可用,则会让出资源,再循环去获取一次
server = null;
Thread.yield();
}
return server;
}
protected int chooseRandomInt(int serverCount) {
return ThreadLocalRandom.current().nextInt(serverCount);
}
RoundRobinRule(轮询)
轮询会依次执行,每个执行一次(默认)。
算法公式:
// 当前请求总数 + 1, 在用服务总数取模,得到下标
int next = (current + 1) % modulo;
比如,当前有两个副本,第一次请求时,(0+1)%2 得到下标为1,存储上次请求的坐标为1,并获取下标为1的服务。第二次时,则得到下标为0,第三次时,又得到下标1,依次类推,每个都会循环执行。
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
// 计数器,初始为0
int count = 0;
// 当 server 为null, 循环次数小于10次,则会进入循环
while (server == null && count++ < 10) {
// 所以及可用服务集合
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
// 在线服务数
int upCount = reachableServers.size();
// 所有服务数
int serverCount = allServers.size();
// 没有可用服务,直接返回null 结束循环
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
// 计算下标
int nextServerIndex = incrementAndGetModulo(serverCount);
// 根据下标获取服务
server = allServers.get(nextServerIndex);
// 没有获取到,继续循环
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
// 省略....
return server;
}
// 轮询算法
private int incrementAndGetModulo(int modulo) {
for (;;) {
// 获取当前客户端已经请求的总数, AtomicInteger 类型,
int current = nextServerCyclicCounter.get();
// 当前请求总数 + 1, 在用服务总数取模,得到下标
int next = (current + 1) % modulo;
//
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
WeightedResponseTimeRule(响应时间权重轮询)
WeightedResponseTimeRule
继承自RoundRobinRule,是对轮询算法进行的扩展,加入了权重计算。
可以为每个服务器分配动态权重,然后然后加权循环,权重高的则会优先执行。
该类在初始化(注入)的时候,会调用一个初始化方法,进行定时任务创建。
void initialize(ILoadBalancer lb) {
if (serverWeightTimer != null) {
serverWeightTimer.cancel();
}
// 创建一个定时任务计划,NFLoadBalancer-serverWeightTimer-order-service
serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
+ name, true);
// 执行定时任务 DynamicServerWeightTask任务,0秒后执行任务run 方法,定时执行间隔时间,默认30秒
serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
serverWeightTaskTimerInterval);
// do a initial run
ServerWeight sw = new ServerWeight();
// 计算权重
sw.maintainWeights();
// 钩子 jvm关闭时,通知定时任务
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
logger
.info("Stopping NFLoadBalancer-serverWeightTimer-"
+ name);
serverWeightTimer.cancel();
}
}));
}
定时任务,默认30S执行一次,调用的是ServerWeight.maintainWeights()方法。
class DynamicServerWeightTask extends TimerTask {
public void run() {
ServerWeight serverWeight = new ServerWeight();
try {
serverWeight.maintainWeights();
} catch (Exception e) {
logger.error("Error running DynamicServerWeightTask for {}", name, e);
}
}
}
maintainWeights
方法核心代码如下
// 负载均衡策略服务器状态总控
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
double totalResponseTime = 0;
// 循环所有服务列表,将他们的所有平均响应时间加一起
for (Server server : nlb.getAllServers()) {
// this will automatically load the stats if not in cache
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
Double weightSoFar = 0.0;
// 循环所有服务列表,设置权重,
// 用总的响应时间减去当前服务的平均响应时间,服务响应时间越大,那么其权重就会越小,
// 比如 公共响应时间为 10 ,A服务为 2 ,B 服务为8 ,则A服务的权重为8.00 ,B为2.00
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
在choose 方法中,会根据权重去计算。
@Override
public Server choose(ILoadBalancer lb, Object key) {
//
while (server == null) {
// 获取服务的权重集合
List<Double> currentWeights = accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
// 获取服务列表
List<Server> allList = lb.getAllServers();
// 选出权重最大的坐标
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// No server has been hit yet and total weight is not initialized
// fallback to use round robin
if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// 返回一个大于或等于 0.0 且小于 1.0 的随机浮点数* 最大权重,
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
int n = 0;
// 循环权重列表,如果当前随机数,则就选择这个
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}
// 省略....
return server;
}
总结:
定时任务,去计算权重,平均响应时间越大,权重越小。在选择时,生成一个随机数,然后循环服务列表,权重大于这个随机数,则会被选择。
RetryRule(重试)
RetryRule
实际调用的也是轮询算法,只是在没有获取到服务时,会进行循环重试操作,超过设置的时间限制,则会退出,默认为500MS。
public Server choose(ILoadBalancer lb, Object key) {
// 当前请求时间
long requestTime = System.currentTimeMillis();
// 限制时间,默认当前请求时间+ 500MS
long deadline = requestTime + maxRetryMillis;
Server answer = null;
// 调用轮询算法获取一个可用服务
answer = subRule.choose(key);
// 如果服务不信在、不是健康的,并且没有超过限制时间则进去以下方法
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
// 开启定时任务,多少秒后打断线程
InterruptTask task = new InterruptTask(deadline
- System.currentTimeMillis());
// 重新选择
while (!Thread.interrupted()) {
answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
/* pause and retry hoping it's transient */
Thread.yield();
} else {
break;
}
}
task.cancel();
}
if ((answer == null) || (!answer.isAlive())) {
return null;
} else {
return answer;
}
}
BestAvailableRule(最小并发)
BestAvailableRule
继承了ClientConfigEnabledRoundRobinRule
,会遍历所有的服务提供者,选择并发量最小的那个服务。
这个算法,可以将请求转发到压力最小的服务器,但是如果副本数太多,每次都要循环计算出最小并发,还是比较好资源的。
@Override
public Server choose(Object key) {
if (loadBalancerStats == null) {
return super.choose(key);
}
// 获取服务列表,
List<Server> serverList = getLoadBalancer().getAllServers();
//
int minimalConcurrentConnections = Integer.MAX_VALUE;
// 当前请求时间
long currentTime = System.currentTimeMillis();
Server chosen = null;
// 遍历服务
for (Server server: serverList) {
ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
// 判断断路器是否跳闸,如果没有跳闸,继续往下走
if (!serverStats.isCircuitBreakerTripped(currentTime)) {
int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
if (concurrentConnections < minimalConcurrentConnections) {
minimalConcurrentConnections = concurrentConnections;
chosen = server;
}
}
}
// 如果没有找到并发量最小的服务节点,则使用父类的策略
if (chosen == null) {
return super.choose(key);
} else {
return chosen;
}
}
AvailabilityFilteringRule(可用断言过滤器)
AvailabilityFilteringRule
继承自PredicateBasedRule
,PredicateBasedRule
和以上的规则有点区别,它将服务器筛选逻辑委托给{@link AbstractServerPredicate(断言器)}实例,过滤后,将以循环方式从过滤列表返回服务器。
AvailabilityFilteringRule
采用过滤器来进行服务选择,过滤掉那些一直连接失败的且被标记为circuit tripped
的后端 Server,并过滤掉那些高并发的后端 Server 或者使用一个 AvailabilityPredicate 来包含过滤 Server 的逻辑。其实就是检查 Status 里记录的各个 Server 的运行状态。
// 线性轮询先选一个,然后看是否符合条件,符合条件直接选择
@Override
public Server choose(Object key) {
// 计数器
int count = 0;
// 轮询选择一个
Server server = roundRobinRule.choose(key);
while (count++ <= 10) {
// 断言判断,符合则返回,不符合则重新选择
if (predicate.apply(new PredicateKey(server))) {
return server;
}
server = roundRobinRule.choose(key);
}
// 超过10次还选不到,就使用父类策略
return super.choose(key);
}
ZoneAvoidanceRule (可用性其性能断言)
使用ZoneAvoidancePredicate
和 AvailabilityPredicate
来判断是否选择某个 Server,前一个判断判定一个 Zone 的运行性能是否可用,剔除不可用的 Zone Server,AvailabilityPredicate 用于过滤掉连接数过多的 Server。
该规则会先过滤,然后使用轮询算法,选出可用的服务,默认使用的就是这个,也推荐使用这个。
负载均衡策略案例
使用自带的策略
添加策略配置的方法十分简单,只需要配置Bean 就可以了,这个策略是全局的,作用于所有客户端。
@Bean
public RandomRule getRule() {
return new RandomRule();
}
可以在配置文件中,为某个单独的客户端配置策略,如果配置了全局,单独配置的则不会生效,需要注意。
order-service:
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.WeightedResponseTimeRule
自定义策略
实现IRule或者继承 AbstractLoadBalancerRule实现其中的chose方法即可。