2023-08-16
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/372

在前面的章节中,我已经详细介绍了 dubbo-cluster 模块中的 Directory 和 Router 两个核心接口及实现。本章我继续按照下图的顺序介绍 LoadBalance 的相关内容。

202308162143038661.png

LoadBalance(负载均衡)的职责是将网络请求或者其他形式的负载“均摊”到不同的服务节点上,从而避免服务集群中部分节点压力过大、资源紧张,而另一部分节点比较空闲的情况。

通过合理的负载均衡算法,我们希望可以让每个服务节点获取到适合自己处理能力的负载, 实现处理能力和流量的合理分配 。Dubbo 需要对 Consumer 的调用请求进行分配,避免少数 Provider 节点负载过大,而剩余的其他 Provider 节点处于空闲的状态。因为当 Provider 负载过大时,就会导致一部分请求超时、丢失等一系列问题发生,造成线上故障。

Dubbo 提供了 5 种负载均衡实现,分别是:

  • 基于 Hash 一致性的 ConsistentHashLoadBalance;
  • 基于权重随机算法的 RandomLoadBalance;
  • 基于最少活跃调用数算法的 LeastActiveLoadBalance;
  • 基于加权轮询算法的 RoundRobinLoadBalance;
  • 基于最短响应时间的 ShortestResponseLoadBalance 。

常用的负载均衡可分为 软负载均衡硬件负载均衡 ,具体可以参见我写的《透彻理解分布式系统》专栏中的《分布式理论之高性能:负载均衡》这篇文章。本章介绍的主要是负载均衡算法。

一、LoadBalance接口

上述 Dubbo 提供的负载均衡实现,都是 LoadBalance 接口的实现类,如下图所示:

202308162143045752.png

LoadBalance 是一个 SPI 扩展接口,默认使用的扩展实现是 RandomLoadBalance,其定义如下:

    // LoadBalance.java
    
    @SPI(RandomLoadBalance.NAME)
    public interface LoadBalance {
    
        @Adaptive("loadbalance")
        <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
    }

上述的 @Adaptive 注解参数为 loadbalance,即动态生成的适配器会按照 URL 中的 loadbalance 参数值选择扩展实现类。LoadBalance 接口中 select() 方法的核心功能是根据传入的 URL 和 Invocation,以及自身的负载均衡算法,从 Invoker 集合中选择一个 Invoker 返回。

1.1 AbstractLoadBalance

AbstractLoadBalance 抽象类并没有真正实现 select() 方法,只是对 Invoker 集合为空或是只包含一个 Invoker 对象的特殊情况进行了处理。另外,AbstractLoadBalance 还提供了一个 getWeight() 方法,该方法用于计算 Provider 权重,具体实现如下:

    // AbstractLoadBalance.java
    
    public abstract class AbstractLoadBalance implements LoadBalance {
        /**
         * 预热降权
         */
        static int calculateWarmupWeight(int uptime, int warmup, int weight) {
            // 计算权重,随着服务运行时间uptime增大,权重ww的值会慢慢接近配置值weight
            int ww = (int) ( uptime / ((float) warmup / weight));
            return ww < 1 ? 1 : (Math.min(ww, weight));
        }
    
        @Override
        public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            // Invoker集合为空,直接返回null
            if (CollectionUtils.isEmpty(invokers)) {
                return null;
            }
            // Invoker集合只包含一个Invoker,则直接返回该Invoker对象
            if (invokers.size() == 1) {
                return invokers.get(0);
            }
            // doSelect是个抽象方法,留给子类具体实现
            return doSelect(invokers, url, invocation);
        }
    
        protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
    
        int getWeight(Invoker<?> invoker, Invocation invocation) {
            int weight;
            URL url = invoker.getUrl();
            // 如果是RegistryService接口的话,直接获取权重即可
            if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
                weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
            } else {
                weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
                if (weight > 0) {
                    // 获取服务提供者的启动时间戳
                    long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
                    if (timestamp > 0L) {
                        // 计算Provider运行时长
                        long uptime = System.currentTimeMillis() - timestamp;
                        if (uptime < 0) {
                            return 1;
                        }
                        // 计算Provider预热时长
                        int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
                        // 如果Provider运行时间小于预热时间,则该Provider节点可能还在预热阶段,需要重新计算服务权重
                        if (uptime > 0 && uptime < warmup) {
                            weight = calculateWarmupWeight((int)uptime, warmup, weight);
                        }
                    }
                }
            }
            return Math.max(weight, 0);
        }
    }

上述的calculateWarmupWeight() 方法的目的是:对还在预热状态的 Provider 节点进行降权,避免 Provider 一启动就有大量请求涌进来。 服务预热 是一种常见的优化手段,这是由 JVM 本身的一些特性决定的,例如,JIT 等方面的优化,我们一般会在服务启动之后,让其在小流量状态下运行一段时间,然后再逐步放大流量。

了解了 LoadBalance 接口的定义以及 AbstractLoadBalance 提供的公共能力之后,下面我开始逐个介绍 LoadBalance 接口的具体实现。

二、ConsistentHashLoadBalance

ConsistentHashLoadBalance 底层是使用 一致性Hash算法 实现负载均衡。关于该算法,我在《透彻理解分布式系统》专栏中的《分布式理论之可扩展:一致性Hash》这篇文章详细阐述过。

2.1 基本思想

一致性 Hash 负载均衡可以让参数相同的请求每次都路由到相同的服务节点上 ,这种负载均衡策略可以在某些 Provider 节点下线的时候,让这些节点上的流量平摊到其他 Provider 上,不会引起流量的剧烈波动。

下面我通过一个示例,再来介绍下一致性 Hash 算法的原理:

假设现在有 1、2、3 三个 Provider 节点对外提供服务,有 100 个请求同时到达,如果想让请求尽可能均匀地分布到这三个 Provider 节点上,我们可能想到的最简单的方法就是 Hash 取模,即 hash(请求参数) % 3。如果参与 Hash 计算的是请求的全部参数,那么参数相同的请求将会落到同一个 Provider 节点上。不过此时如果突然有一个 Provider 节点出现宕机的情况,那我们就需要对 2 取模,即请求会重新分配到相应的 Provider 之上。在极端情况下,甚至会出现所有请求的处理节点都发生了变化,这就会造成比较大的波动。

为了避免因一个 Provider 节点宕机,而导致大量请求的处理节点发生变化的情况,我们可以考虑使用一致性 Hash 算法。 一致性 Hash 算法的原理也是取模算法,与 Hash 取模的不同之处在于:Hash 取模是对 Provider 节点数量取模,而一致性 Hash 算法是对 2^32 取模。

一致性 Hash 算法需要同时对 Provider 地址以及请求参数进行取模:

    hash(Provider地址) % 2^32
    hash(请求参数) % 2^32

Provider 地址和请求经过对 2^32 取模得到的结果值,都会落到一个 Hash 环上,如下图所示:

202308162143051783.png

我们按顺时针的方向,依次将请求分发到对应的 Provider。这样,当某台 Provider 节点宕机或增加新的 Provider 节点时,只会影响这个 Provider 节点对应的请求。

在理想情况下,一致性 Hash 算法会将这三个 Provider 节点均匀地分布到 Hash 环上,请求也可以均匀地分发给这三个 Provider 节点。但在实际情况中,这三个 Provider 节点地址取模之后的值,可能差距不大,这样会导致大量的请求落到一个 Provider 节点上,如下图所示:

202308162143059174.png

这就出现了数据倾斜的问题。 所谓数据倾斜是指由于节点不够分散,导致大量请求落到了同一个节点上,而其他节点只会接收到少量请求的情况

为了解决一致性 Hash 算法中出现的数据倾斜问题,演化出了 Hash槽 的概念。Hash 槽解决数据倾斜的思路是:既然问题是由 Provider 节点在 Hash 环上分布不均匀造成的,那么可以虚拟出 n 组 P1、P2、P3 的 Provider 节点 ,让多组 Provider 节点相对均匀地分布在 Hash 环上。如下图所示,相同阴影的节点均为同一个 Provider 节点,比如 P1-1、P1-2……P1-99 表示的都是 P1 这个 Provider 节点。引入 Provider 虚拟节点之后,让 Provider 在圆环上分散开来,以避免数据倾斜问题。

202308162143064515.png

2.2 算法实现

了解了一致性 Hash 算法的基本原理之后,我们再来看下Dubbo中的 ConsistentHashLoadBalance 一致性 Hash 算法的具体实现。

ConsistentHashLoadBalance.doSelect() 方法,会根据 ServiceKey 和调用的方法名称选择一个 ConsistentHashSelector 对象, 核心算法都委托给 ConsistentHashSelector 对象完成。

    // ConsistentHashLoadBalance.java
    
    public class ConsistentHashLoadBalance extends AbstractLoadBalance {
    
        public static final String NAME = "consistenthash";
        public static final String HASH_NODES = "hash.nodes";
        public static final String HASH_ARGUMENTS = "hash.arguments";
    
        private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
    
        @Override
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            // 获取调用的方法名称
            String methodName = RpcUtils.getMethodName(invocation);
    
            // 将ServiceKey和方法名称拼接起来,构成一个key
            String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
    
            // 用于感知invokers列表变化的hash值
            int invokersHashCode = invokers.hashCode();
    
            // 根据key获取对应的ConsistentHashSelector对象
            ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
            // 未查找到ConsistentHashSelector对象,则进行创建
            if (selector == null || selector.identityHashCode != invokersHashCode) {
                selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
                selector = (ConsistentHashSelector<T>) selectors.get(key);
            }
            // 通过ConsistentHashSelector对象选择一个Invoker对象
            return selector.select(invocation);
        }
    }

下面我们来看 ConsistentHashSelector,它在创建时会在构造函数中完成Hash环的构建,Hash环是一个TreeMap结构,TreeMap的特点就是可以按照Key值有序遍历:

    // ConsistentHashSelector.java
    
    private static final class ConsistentHashSelector<T> {
    
        // 用于记录虚拟 Invoker 对象的 Hash 环,Key是Hash值,Value是Provider
        private final TreeMap<Long, Invoker<T>> virtualInvokers;
    
        // 虚拟 Invoker 个数
        private final int replicaNumber;
    
        // Invoker 集合的 HashCode 值
        private final int identityHashCode;
    
        // 需要参与 Hash 计算的参数索引。例如,argumentIndex = [0, 1, 2] 时,表示调用的目标方法的前三个参数要参与 Hash 计算
        private final int[] argumentIndex;
    
        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            // 初始化virtualInvokers字段,也就是虚拟Hash槽
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
    
            // 该hashCode值用来判断Provider列表是否发生了变化
            this.identityHashCode = identityHashCode;
    
            URL url = invokers.get(0).getUrl();
    
            // 从hash.nodes参数中获取虚拟节点的个数,默认160
            this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
    
            // 获取参与Hash计算的参数下标值,默认对第一个参数进行Hash运算
            String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
    
            // 遍历每一个Provider,构造虚拟槽
            for (Invoker<T> invoker : invokers) {
                String address = invoker.getUrl().getAddress();
    
                // 每一个Provider构造 40 * 4 = 160 个虚拟槽
                for (int i = 0; i < replicaNumber / 4; i++) {
                    // 对address + i进行md5运算,得到一个长度为16的字节数组
                    byte[] digest = md5(address + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }
        //...
    }

请求会通过 ConsistentHashSelector.select() 方法选择合适的 Invoker 对象,其中会先对请求参数进行 md5 以及 Hash 运算,得到一个 Hash 值,然后再通过这个 Hash 值到 TreeMap 中查找目标 Invoker。具体实现如下:

    // ConsistentHashSelector.java
    
    public Invoker<T> select(Invocation invocation) {
    
        // 将参与一致性Hash的参数拼接到一起
        String key = toKey(invocation.getArguments());
    
        // 计算key的Hash值
        byte[] digest = md5(key);
    
        // 匹配Invoker对象
        return selectForKey(hash(digest, 0));
    }
    
    private Invoker<T> selectForKey(long hash) {
    
        // 从TreeMap中查找第一个节点值大于或等于传入Hash值的Invoker对象
        Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
    
        // 如果Hash值大于Hash环中的所有Invoker,则回到Hash环的开头,返回第一个Invoker对象
        if (entry == null) {
            entry = virtualInvokers.firstEntry();
        }
    
        return entry.getValue();
    }

三、RandomLoadBalance

RandomLoadBalance 使用的负载均衡算法是 加权随机算法 。RandomLoadBalance 也是 Dubbo 默认使用的 LoadBalance 实现。

3.1 基本思想

这里我通过一个示例来说明加权随机算法的核心思想。假设我们有三个 Provider 节点 A、B、C,它们对应的权重分别为 5、2、3,权重总和为 10。现在把这些权重值放到一维坐标轴上,[0, 5) 区间属于节点 A,[5, 7) 区间属于节点 B,[7, 10) 区间属于节点 C,如下图所示:

202308162143070746.png

通过随机数生成器在 [0, 10) 这个范围内生成一个随机数,然后计算这个随机数会落到哪个区间中。例如,随机生成 4,就会落到 Provider A 对应的区间中,此时 RandomLoadBalance 就会返回 Provider A 这个节点。

3.2 算法实现

接下来我们再来看 RandomLoadBalance 中 doSelect() 方法的实现,其核心逻辑分为三个关键点:

  • 计算每个 Invoker 对应的权重值以及总权重值;
  • 当各个 Invoker 权重值不相等时,计算随机数应该落在哪个 Invoker 区间中,返回对应的 Invoker 对象;
  • 当各个 Invoker 权重值相同时,随机返回一个 Invoker 即可。
    // RandomLoadBalance.java
    
    public class RandomLoadBalance extends AbstractLoadBalance {
    
        public static final String NAME = "random";
    
        @Override
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            int length = invokers.size();
            boolean sameWeight = true;
    
            // 保存每个Invoker对象的权重
            int[] weights = new int[length];
    
            // 计算第一个Invoker对象的权重
            int firstWeight = getWeight(invokers.get(0), invocation);
            weights[0] = firstWeight;
    
            int totalWeight = firstWeight;
            for (int i = 1; i < length; i++) {
                // 计算每个Invoker对象的权重
                int weight = getWeight(invokers.get(i), invocation);
                weights[i] = weight;
                // 总权重
                totalWeight += weight;
                // 检测每个Provider的权重是否相同
                if (sameWeight && weight != firstWeight) {
                    sameWeight = false;
                }
            }
    
            // 如果存在不一样权重的Invoker
            if (totalWeight > 0 && !sameWeight) {
                // 获取一个随机数,[0, totalWeight) 区间
                int offset = ThreadLocalRandom.current().nextInt(totalWeight);
                // 循环让offset数减去Invoker的权重值,当offset小于0时,返回相应的Invoker
                for (int i = 0; i < length; i++) {
                    offset -= weights[i];
                    if (offset < 0) {
                        return invokers.get(i);
                    }
                }
            }
            // 随机返回一个Invoker
            return invokers.get(ThreadLocalRandom.current().nextInt(length));
        }
    }

四、LeastActiveLoadBalance

4.1 基本思想

LeastActiveLoadBalance 使用的是 最小活跃数负载均衡算法 。它认为当前活跃请求数越小的 Provider 节点,剩余的处理能力越多,处理请求的效率也就越高,那么该 Provider 在单位时间内就可以处理更多的请求,所以我们应该优先将请求分配给该 Provider 节点。

4.2 算法实现

LeastActiveLoadBalance 需要配合 ActiveLimitFilter 使用,ActiveLimitFilter 会记录每个接口方法的活跃请求数,在 LeastActiveLoadBalance 进行负载均衡时,只会从活跃请求数最少的 Invoker 集合里挑选 Invoker。

在 LeastActiveLoadBalance 的实现中,首先会选出所有活跃请求数最小的 Invoker 对象,之后的逻辑与 RandomLoadBalance 完全一样,即按照这些 Invoker 对象的权重挑选最终的 Invoker 对象。下面是 LeastActiveLoadBalance.doSelect() 方法的具体实现:

    // LeastActiveLoadBalance.java
    
    public class LeastActiveLoadBalance extends AbstractLoadBalance {
    
        public static final String NAME = "leastactive";
    
        @Override
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            int length = invokers.size();
    
            // 记录最小的活跃请求数
            int leastActive = -1;
    
            // 记录具有相同最小活跃请求数的Invoker个数
            int leastCount = 0;
    
            // 记录活跃请求数最小的Invoker在invokers数组中的下标位置 
            int[] leastIndexes = new int[length];
    
            // 记录活跃请求数最小的Invoker集合中,每个Invoker的权重值
            int[] weights = new int[length];
    
            // 记录活跃请求数最小的Invoker集合中,所有Invoker的权重值之和
            int totalWeight = 0;
    
            // 记录活跃请求数最小的Invoker集合中,第一个Invoker的权重值
            int firstWeight = 0;
    
            // 活跃请求数最小的集合中,所有Invoker的权重值是否相同
            boolean sameWeight = true;
    
            // 遍历所有Invoker,获取活跃请求数最小的Invoker集合
            for (int i = 0; i < length; i++) {
                Invoker<T> invoker = invokers.get(i);
                // 获取该Invoker的活跃请求数
                int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
                // 获取该Invoker的权重
                int afterWarmup = getWeight(invoker, invocation);
    
                weights[i] = afterWarmup;
    
                // 当前Invoker是第一个活跃请求数最小的Invoker
                if (leastActive == -1 || active < leastActive) {
                    // 重新记录最小的活跃请求数
                    leastActive = active;
    
                    // 重新记录活跃请求数最小的Invoker集合个数
                    leastCount = 1;
    
                    // 重新记录Invoker
                    leastIndexes[0] = i;
    
                    // 重新记录总权重值
                    totalWeight = afterWarmup;
    
                    // 该Invoker作为第一个Invoker,记录其权重值
                    firstWeight = afterWarmup;
    
                    // 重新记录是否权重值相等
                    sameWeight = true;
                } 
                // 当前Invoker属于活跃请求数最小的Invoker集合
                else if (active == leastActive) {
                    // 记录该Invoker的下标
                    leastIndexes[leastCount++] = i;
                    // 更新总权重
                    totalWeight += afterWarmup;
                    // 更新权重值是否相等
                    if (sameWeight && afterWarmup != firstWeight) {
                        sameWeight = false;
                    }
                }
            }
            // 如果只有一个活跃请求数最小的Invoker对象,直接返回即可
            if (leastCount == 1) {
                return invokers.get(leastIndexes[0]);
            }
    
            // 按照RandomLoadBalance的逻辑,从活跃请求数最小的Invoker集合中,随机选择一个Invoker对象返回
            if (!sameWeight && totalWeight > 0) {
                int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
                for (int i = 0; i < leastCount; i++) {
                    int leastIndex = leastIndexes[i];
                    offsetWeight -= weights[leastIndex];
                    if (offsetWeight < 0) {
                        return invokers.get(leastIndex);
                    }
                }
            }
            return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
        }
    }

五、RoundRobinLoadBalance

RoundRobinLoadBalance 实现的是 加权轮询负载均衡算法

轮询指的是将请求轮流分配给每个 Provider。例如,有 A、B、C 三个 Provider 节点,按照普通轮询的方式,我们会将第一个请求分配给 Provider A,将第二个请求分配给 Provider B,第三个请求分配给 Provider C,第四个请求再次分配给 Provider A……如此循环往复。

5.1 基本思想

轮询是一种无状态负载均衡算法,实现简单,适用于集群中所有 Provider 节点性能相近的场景。 但现实情况中就很难保证这一点了,因为很容易出现集群中性能最好和最差的 Provider 节点处理同样流量的情况,这就可能导致性能差的 Provider 节点各方面资源非常紧张,甚至无法及时响应了,但是性能好的 Provider 节点的各方面资源使用还较为空闲。这时我们可以通过加权轮询的方式,降低分配到性能较差的 Provider 节点的流量。

加权之后,分配给每个 Provider 节点的流量比会接近或等于它们的权重比。例如,Provider 节点 A、B、C 权重比为 5:1:1,那么在 7 次请求中,节点 A 将收到 5 次请求,节点 B 会收到 1 次请求,节点 C 则会收到 1 次请求。

在 Dubbo 2.6.4 版本及之前,RoundRobinLoadBalance 的实现存在一些问题,例如,选择 Invoker 的性能问题、负载均衡时不够平滑等。在 Dubbo 2.6.5 版本之后,这些问题都得到了修复,所以这里我只介绍最新的 RoundRobinLoadBalance 实现。

5.2 算法实现

每个 Provider 节点有两个权重:

  1. 一个权重是配置的 weight,该值在负载均衡的过程中不会变化;
  2. 另一个权重是 currentWeight,该值会在负载均衡的过程中动态调整,初始值为 0。

当有新的请求进来时,RoundRobinLoadBalance 会遍历 Invoker 列表,并用对应的 currentWeight 加上其配置的权重。遍历完成后,再找到最大的 currentWeight,将其减去权重总和,然后返回相应的 Invoker 对象。

下面我通过一个示例说明 RoundRobinLoadBalance 的执行流程,这里我们依旧假设 A、B、C 三个节点的权重比例为 5:1:1。

202308162143076027.png

具体的执行流程如下:

  1. 处理第一个请求,currentWeight 数组中的权重与配置的 weight 相加,即从 [0, 0, 0] 变为 [5, 1, 1]。接下来,从中选择权重最大的 Invoker 作为结果,即节点 A。最后,将节点 A 的 currentWeight 值减去 totalWeight 值,最终得到 currentWeight 数组为 [-2, 1, 1];
  2. 处理第二个请求,currentWeight 数组中的权重与配置的 weight 相加,即从 [-2, 1, 1] 变为 [3, 2, 2]。接下来,从中选择权重最大的 Invoker 作为结果,即节点 A。最后,将节点 A 的 currentWeight 值减去 totalWeight 值,最终得到 currentWeight 数组为 [-4, 2, 2];
  3. 处理第三个请求,currentWeight 数组中的权重与配置的 weight 相加,即从 [-4, 2, 2] 变为 [1, 3, 3]。接下来,从中选择权重最大的 Invoker 作为结果,即节点 B。最后,将节点 B 的 currentWeight 值减去 totalWeight 值,最终得到 currentWeight 数组为 [1, -4, 3];
  4. 处理第四个请求,currentWeight 数组中的权重与配置的 weight 相加,即从 [1, -4, 3] 变为 [6, -3, 4]。接下来,从中选择权重最大的 Invoker 作为结果,即节点 A。最后,将节点 A 的 currentWeight 值减去 totalWeight 值,最终得到 currentWeight 数组为 [-1, -3, 4];
  5. 处理第五个请求,currentWeight 数组中的权重与配置的 weight 相加,即从 [-1, -3, 4] 变为 [4, -2, 5]。接下来,从中选择权重最大的 Invoker 作为结果,即节点 C。最后,将节点 C 的 currentWeight 值减去 totalWeight 值,最终得到 currentWeight 数组为 [4, -2, -2];
  6. 处理第六个请求,currentWeight 数组中的权重与配置的 weight 相加,即从 [4, -2, -2] 变为 [9, -1, -1]。接下来,从中选择权重最大的 Invoker 作为结果,即节点 A。最后,将节点 A 的 currentWeight 值减去 totalWeight 值,最终得到 currentWeight 数组为 [2, -1, -1];
  7. 处理第七个请求,currentWeight 数组中的权重与配置的 weight 相加,即从 [2, -1, -1] 变为 [7, 0, 0]。接下来,从中选择权重最大的 Invoker 作为结果,即节点 A。最后,将节点 A 的 currentWeight 值减去 totalWeight 值,最终得到 currentWeight 数组为 [0, 0, 0]。

到此为止,一个轮询的周期就结束了。

在 Dubbo 2.6.4 版本中,上面示例的一次轮询结果是 [A, A, A, A, A, B, C],也就是说前 5 个请求会全部都落到 A 这个节点上。这将会使节点 A 在短时间内接收大量的请求,压力陡增,而节点 B 和节点 C 此时没有收到任何请求,处于完全空闲的状态,这种“瞬间分配不平衡”的情况也就是前面提到的“不平滑问题”。

在 RoundRobinLoadBalance 中,每个 Invoker 对象都有一个对应的 WeightedRoundRobin 对象,用来记录配置的权重(weight 字段)以及随每次负载均衡算法执行变化的 current 权重(current 字段):

    // RoundRobinLoadBalance.WeightedRoundRobin.java
    
    protected static class WeightedRoundRobin {
        private int weight;
        private AtomicLong current = new AtomicLong(0);
        private long lastUpdate;
    
        public int getWeight() {
            return weight;
        }
    
        public void setWeight(int weight) {
            this.weight = weight;
            current.set(0);
        }
    
        public long increaseCurrent() {
            return current.addAndGet(weight);
        }
    
        public void sel(int total) {
            current.addAndGet(-1 * total);
        }
    
        public long getLastUpdate() {
            return lastUpdate;
        }
    
        public void setLastUpdate(long lastUpdate) {
            this.lastUpdate = lastUpdate;
        }
    }

我们再来看 RoundRobinLoadBalance.doSelect() 方法的具体实现:

    // RoundRobinLoadBalance.java
    
    public class RoundRobinLoadBalance extends AbstractLoadBalance {
        public static final String NAME = "roundrobin";
    
        private static final int RECYCLE_PERIOD = 60000;
    
        private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
    
        protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
            String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
            Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
            if (map != null) {
                return map.keySet();
            }
            return null;
        }
    
        @Override
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    
            // 获取整个Invoker列表对应的WeightedRoundRobin映射表,如果为空,则创建一个新的WeightedRoundRobin映射表
            ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
            int totalWeight = 0;
            long maxCurrent = Long.MIN_VALUE;
            // 获取当前时间
            long now = System.currentTimeMillis();
            Invoker<T> selectedInvoker = null;
            WeightedRoundRobin selectedWRR = null;
            for (Invoker<T> invoker : invokers) {
                String identifyString = invoker.getUrl().toIdentityString();
                int weight = getWeight(invoker, invocation);
                // 检测当前Invoker是否有相应的WeightedRoundRobin对象,没有则进行创建
                WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
                    WeightedRoundRobin wrr = new WeightedRoundRobin();
                    wrr.setWeight(weight);
                    return wrr;
                });
    
                // 检测Invoker权重是否发生了变化,若发生变化,则更新WeightedRoundRobin的weight字段
                if (weight != weightedRoundRobin.getWeight()) {
                    weightedRoundRobin.setWeight(weight);
                }
                // 让currentWeight加上配置的Weight
                long cur = weightedRoundRobin.increaseCurrent();
                //  设置lastUpdate字段
                weightedRoundRobin.setLastUpdate(now);
                // 寻找具有最大currentWeight的Invoker,以及Invoker对应的WeightedRoundRobin
                if (cur > maxCurrent) {
                    maxCurrent = cur;
                    selectedInvoker = invoker;
                    selectedWRR = weightedRoundRobin;
                }
                // 计算权重总和
                totalWeight += weight;
            }
            if (invokers.size() != map.size()) {
                map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
            }
            if (selectedInvoker != null) {
                // 用currentWeight减去totalWeight
                selectedWRR.sel(totalWeight);
                // 返回选中的Invoker对象
                return selectedInvoker;
            }
            return invokers.get(0);
        }
    }

六、ShortestResponseLoadBalance

6.1 基本思想

ShortestResponseLoadBalance 是 Dubbo 2.7 版本之后新增加的一个 LoadBalance 实现类 。它实现了 最短响应时间的负载均衡算法 ,也就是从多个 Provider 节点中选出调用成功的且响应时间最短的 Provider 节点,不过满足该条件的 Provider 节点可能有多个,所以还要再使用随机算法进行一次选择,得到最终要调用的 Provider 节点。

6.2 算法实现

了解了 ShortestResponseLoadBalance 的基本思想之后,我们直接来看 ShortestResponseLoadBalance.doSelect() 方法的核心实现:

    // ShortestResponseLoadBalance.java
    
    public class ShortestResponseLoadBalance extends AbstractLoadBalance {
    
        public static final String NAME = "shortestresponse";
    
        @Override
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            // 记录Invoker集合的数量
            int length = invokers.size();
    
            // 用于记录所有Invoker集合中最短响应时间
            long shortestResponse = Long.MAX_VALUE;
    
            // 具有相同最短响应时间的Invoker个数
            int shortestCount = 0;
    
            // 存放所有最短响应时间的Invoker的下标
            int[] shortestIndexes = new int[length];
    
            // 存储每个Invoker的权重
            int[] weights = new int[length];
    
            // 存储权重总和
            int totalWeight = 0;
    
            // 记录第一个Invoker对象的权重
            int firstWeight = 0;
    
            // 最短响应时间Invoker集合中的Invoker权重是否相同
            boolean sameWeight = true;
    
            // Filter out all the shortest response invokers
            for (int i = 0; i < length; i++) {
                Invoker<T> invoker = invokers.get(i);
                RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
    
                // 获取调用成功的平均时间:调用成功的请求数总数对应的总耗时 / 调用成功的请求数总数
                long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed();
    
                // 获取的是该Provider当前的活跃请求数,也就是当前正在处理的请求数
                int active = rpcStatus.getActive();
    
                // 计算处理一个新请求的预估值,也就是如果当前请求发给这个Provider,大概耗时多久处理完成
                long estimateResponse = succeededAverageElapsed * active;
    
                // 计算该Invoker的权重(主要是处理预热)
                int afterWarmup = getWeight(invoker, invocation);
                weights[i] = afterWarmup;
    
                // 第一次找到Invoker集合中最短响应耗时的Invoker对象,记录其相关信息
                if (estimateResponse < shortestResponse) {
                    shortestResponse = estimateResponse;
                    shortestCount = 1;
                    shortestIndexes[0] = i;
                    totalWeight = afterWarmup;
                    firstWeight = afterWarmup;
                    sameWeight = true;
                } 
                // 出现多个耗时最短的Invoker对象
                else if (estimateResponse == shortestResponse) {
                    shortestIndexes[shortestCount++] = i;
                    totalWeight += afterWarmup;
                    if (sameWeight && i > 0 && afterWarmup != firstWeight) {
                        sameWeight = false;
                    }
                }
            }
            if (shortestCount == 1) {
                return invokers.get(shortestIndexes[0]);
            }
    
            // 如果耗时最短的所有Invoker对象的权重不相同,则通过加权随机负载均衡的方式选择一个Invoker返回
            if (!sameWeight && totalWeight > 0) {
                int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
                for (int i = 0; i < shortestCount; i++) {
                    int shortestIndex = shortestIndexes[i];
                    offsetWeight -= weights[shortestIndex];
                    if (offsetWeight < 0) {
                        return invokers.get(shortestIndex);
                    }
                }
            }
            // 如果耗时最短的所有Invoker对象的权重相同,则随机返回一个
            return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);
        }
    }

七、总结

本章,我重点介绍了 Dubbo Cluster 层中的负载均衡相关的内容。首先我们介绍了 LoadBalance 接口的定义以及 AbstractLoadBalance 抽象类提供的公共能力。然后,我还对LoadBalance接口的各类负载均衡算法的实现进行了深入分析。

阅读全文