2023-08-16  阅读(4)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/372

从本章开始,我将对 Dubbo 架构中的 Cluster 层进行分析。

在生产环境中,为了保证服务的高可用、高性能以及容错能力,我们通常会在多个服务器上运行相同的服务端程序,然后以 集群 的形式对外提供服务。根据各项性能指标的要求不同,各个服务端集群中服务实例的个数也不尽相同,从几个实例到几百个实例不等。

针对这种服务集群的情况,客户端程序需要解决一些问题,比如:

  • 客户端程序是否要感知每个服务端地址?
  • 客户端程序的一次请求,到底调用哪个服务端程序呢?
  • 请求失败之后是重试,还是抛出异常?如果是重试,是再次请求该服务实例,还是尝试请求其它服务实例?
  • 服务端集群如何做到负载均衡,负载均衡的标准是什么呢?

为了解决上述问题, Dubbo 独立出了一个实现集群功能的模块—— dubbo-cluster

202308162142431011.png

Dubbo Cluster 层隶属于 RPC 层,它在整个Dubbo 架构图中的位置如下:

202308162142436192.png

一、Cluster 架构

dubbo-cluster 模块的主要功能是将多个 Provider 聚合成一个 Provider 供 Consumer 调用,其中涉及集群的容错处理、路由规则的处理以及负载均衡。下图展示了 dubbo-cluster 模块中的核心组件:

202308162142450753.png

从上图可以看出,dubbo-cluster 主要包括以下四个核心接口:

  • Cluster 接口 :集群容错接口,主要是在某些 Provider 节点发生故障时,让 Consumer 的调用请求能够发送到正常的 Provider 节点,从而保证服务集群的高可用;
  • Directory 接口 :表示服务目录,是多个 Invoker 的集合,是后续路由规则、负载均衡策略以及集群容错的基础;
  • Router 接口 :表示路由器,请求经过 Router 时会按照用户指定的规则匹配出符合条件的 Provider;
  • LoadBalance 接口 :负载均衡接口,Consumer 会按照指定的负载均衡策略,从 Provider 集合中选出一个最合适的 Provider 节点来处理请求。

Cluster 层的核心流程是:

  1. 当调用进入 Cluster 时,Cluster 会创建一个 AbstractClusterInvoker 对象;
  2. 这个 AbstractClusterInvoker 首先从 Directory 中获取当前 Invoker 集合;
  3. 然后,按照 Router 集合进行路由,得到符合条件的 Invoker 集合;
  4. 最后,按照 LoadBalance 指定的负载均衡策略,得到最终要调用的 Invoker 对象。

了解了 dubbo-cluster 模块的核心架构和基础组件之后,我来按照上面架构图的顺序介绍每个接口的定义及相关实现。

二、Directory 接口

Directory 接口表示的是一个集合,该集合由多个 Invoker 构成,后续的路由处理、负载均衡、集群容错等一系列操作都是在 Directory 基础上实现的。

    // Directory.java
    
    public interface Directory<T> extends Node {
        /**
         * 获取服务接口的类型
         */
        Class<T> getInterface();
    
        /**
         * 根据传入的Invocation请求,过滤自身维护的Invoker集合,返回符合条件的Invoker集合
         */
        List<Invoker<T>> list(Invocation invocation) throws RpcException;
    
        /**
         * 返回当前Directory对象维护的全部Invoker对象
         */
        List<Invoker<T>> getAllInvokers();
        /**
         * 返回Consumer端的Url
         */
        URL getConsumerUrl();
    }

2.1 AbstractDirectory

AbstractDirectory 是 Directory 接口的抽象实现,其中除了维护 Consumer 端的 URL 信息,还维护了一个 RouterChain 对象,用于记录当前使用的 Router 对象集合。

AbstractDirectory 对 list() 方法的实现也比较简单,就是直接委托给了 doList() 方法,doList() 是个抽象方法,由 AbstractDirectory 的子类实现。

    // AbstractDirectory.java
    
    public abstract class AbstractDirectory<T> implements Directory<T> {
        private final URL url;
    
        private volatile boolean destroyed = false;
    
        private volatile URL consumerUrl;
    
        protected RouterChain<T> routerChain;
    
        public AbstractDirectory(URL url, RouterChain<T> routerChain) {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
    
            this.url = url.removeParameter(REFER_KEY).removeParameter(MONITOR_KEY);
            this.consumerUrl = url.addParameters(StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)))
                    .removeParameter(MONITOR_KEY);
            setRouterChain(routerChain);
        }
    
        @Override
        public List<Invoker<T>> list(Invocation invocation) throws RpcException {
            if (destroyed) {
                throw new RpcException("Directory already destroyed .url: " + getUrl());
            }
            return doList(invocation);
        }
    
        @Override
        public URL getUrl() {
            return url;
        }
    
        protected void addRouters(List<Router> routers) {
            routers = routers == null ? Collections.emptyList() : routers;
            routerChain.addRouters(routers);
        }
    
        protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
    }

Directory 接口有 RegistryDirectoryStaticDirectory 两个具体实现,如下图所示:

202308162142457944.png

2.2 StaticDirectory

StaticDirectory 实现中维护的 Invoker 集合是静态的,在 StaticDirectory 对象创建完成之后,不会再发生变化。

StaticDirectory 的实现比较简单:

  • 在构造方法中,StaticDirectory 会接收一个 Invoker 集合,并赋值到自身的 invokers 字段中,作为底层的 Invoker 集合;
  • 在 doList() 方法中,StaticDirectory 会使用 RouterChain 中的 Router 从 invokers 集合中过滤出符合路由规则的 Invoker 对象集合。
    // StaticDirectory.java
    
    public class StaticDirectory<T> extends AbstractDirectory<T> {
    
        private final List<Invoker<T>> invokers;
    
        public StaticDirectory(URL url, List<Invoker<T>> invokers, RouterChain<T> routerChain) {
            super(url == null && CollectionUtils.isNotEmpty(invokers) ? invokers.get(0).getUrl() : url, routerChain);
            if (CollectionUtils.isEmpty(invokers)) {
                throw new IllegalArgumentException("invokers == null");
            }
            this.invokers = invokers;
        }
    
        @Override
        public Class<T> getInterface() {
            return invokers.get(0).getInterface();
        }
    
        @Override
        public List<Invoker<T>> getAllInvokers() {
            return invokers;
        }
    
        public void buildRouterChain() {
            // 根据 URL 构造一个包含内置 Router 的 RouterChain 对象
            RouterChain<T> routerChain = RouterChain.buildChain(getUrl());
            // 将invokers与RouterChain关联
            routerChain.setInvokers(invokers);
            // 设置routerChain字段
            this.setRouterChain(routerChain);
        }
    
        @Override
        protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
            List<Invoker<T>> finalInvokers = invokers;
            if (routerChain != null) {
                try {
                    // 通过RouterChain过滤出符合条件的Invoker集合
                    finalInvokers = routerChain.route(getConsumerUrl(), invocation);
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
            return finalInvokers == null ? Collections.emptyList() : finalInvokers;
        }
    }

2.3 RegistryDirectory

RegistryDirectory 中维护的 Invoker 集合会随着注册中心中维护的注册信息动态发生变化,这就依赖了 ZooKeeper 等注册中心的推送能力。

RegistryDirectory 实现了 NotifyListener 接口,当注册中心中的服务信息发生变化时,RegistryDirectory 会收到变更通知,然后动态增删底层 Invoker 集合。

核心字段

我们先来看一下 RegistryDirectory 中的核心字段,在 RegistryDirectory 的构造方法中,会根据传入的注册中心 URL 初始化核心字段:

    // RegistryDirectory.java
    
    public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
        // 集群策略适配器,通过 Dubbo SPI 方式动态创建
        private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
    
        // 路由工厂适配器,通过 Dubbo SPI 方式动态创建
        private static final RouterFactory ROUTER_FACTORY = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();
    
        // 服务对应的 ServiceKey,默认由 {interface}:[group]:[version] 三部分构成
        private final String serviceKey; 
    
        // 服务接口类型,例如org.apache.dubbo.demo.DemoService
        private final Class<T> serviceType; 
    
        // Consumer URL 中 refer 参数解析后得到的全部 KV
        private final Map<String, String> queryMap;
    
        // 只保留 Consumer 属性的 URL,也就是由 queryMap 集合重新生成的 URL
        private final URL directoryUrl; 
    
        // 是否引用多个服务组
        private final boolean multiGroup;
    
        // 使用的 Protocol 实现
        private Protocol protocol; 
    
        // 使用的注册中心实现
        private Registry registry; 
    
        private volatile boolean forbidden = false;
        private boolean shouldRegister;
        private boolean shouldSimplified;
    
        private volatile URL overrideDirectoryUrl;
        private volatile URL registeredConsumerUrl;
    
        // 动态更新的配置信息
        private volatile List<Configurator> configurators; 
    
        // Provider URL 与对应 Invoker 之间的映射,该集合会与 invokers 字段同时动态更新
        private volatile Map<String, Invoker<T>> urlInvokerMap; 
    
        // 动态更新的 Invoker 集合
        private volatile List<Invoker<T>> invokers;
    
        // 当前缓存的所有 Provider 的 URL,该集合会与 invokers 字段同时动态更新
        private volatile Set<URL> cachedInvokerUrls;
    
        private static final ConsumerConfigurationListener CONSUMER_CONFIGURATION_LISTENER = new ConsumerConfigurationListener();
        private ReferenceConfigurationListener serviceConfigurationListener;
    
        public RegistryDirectory(Class<T> serviceType, URL url) {
            // 传入的url参数是注册中心的URL
            super(url); 
            shouldRegister = !ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true);
            shouldSimplified = url.getParameter(SIMPLIFIED_KEY, false);
            this.serviceType = serviceType;
            this.serviceKey = url.getServiceKey();
    
            // 解析refer参数值,得到其中Consumer的属性信息
            this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    
            // 将queryMap中的KV作为参数,重新构造URL,其中的protocol和path部分不变
            this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url);
            String group = directoryUrl.getParameter(GROUP_KEY, "");
            this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));
        }
        //...
    }

subscribe方法

在完成初始化之后,我们来看 subscribe() 方法,该方法会在 Consumer 进行订阅的时候被调用,内部会调用 Registry.subscribe() 完成订阅操作,同时还会将当前 RegistryDirectory 对象作为 NotifyListener 监听器添加到 Registry 中,具体实现如下:

    // RegistryDirectory.java
    
    public void subscribe(URL url) {
    
        setConsumerUrl(url);
    
        // 将当前RegistryDirectory对象作为ConfigurationListener记录到CONSUMER_CONFIGURATION_LISTENER中
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
    
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
    
        // 完成订阅操作,注册中心的相关操作在前文已经介绍过了,这里不再重复
        registry.subscribe(url, this);
    }

notify方法

在 RegistryDirectory.notify() 方法中,首先会按照 category 对发生变化的 URL 进行分类,分成 configurators、routers、providers 三类,并分别对不同类型的 URL 进行处理:

  • 将 configurators 类型的 URL 转化为 Configurator,保存到 configurators 字段中;
  • 将 router 类型的 URL 转化为 Router,并通过 routerChain.addRouters() 方法添加 routerChain 中保存;
  • 将 provider 类型的 URL 转化为 Invoker 对象,并记录到 invokers 集合和 urlInvokerMap 集合中。

notify() 方法的具体实现如下:

    // RegistryDirectory.java
    
    public synchronized void notify(List<URL> urls) {
    
        // 按照category进行分类,分成configurators、routers、providers三类
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(this::judgeCategory));
    
        // 获取configurators类型的URL,并转换成Configurator对象
        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
    
        // 获取routers类型的URL,并转成Router对象,添加到RouterChain中
        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        toRouters(routerURLs).ifPresent(this::addRouters);
    
        // 获取providers类型的URL,调用refreshOverrideAndInvoker()方法进行处理
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    
        //... 在Dubbo3.0中会触发AddressListener监听器,但是现在AddressListener接口还没有实现,所以省略这段代码
    
        refreshOverrideAndInvoker(providerURLs);
    }

doList方法

该方法是 AbstractDirectory 留给其子类实现的一个方法,也是通过 Directory 接口获取 Invoker 集合的核心所在,具体实现如下:

    // RegistryDirectory.java
    
    public List<Invoker<T>> doList(Invocation invocation) {
    
        // 检测forbidden字段,当该字段在refreshInvoker()过程中设置为true时,表示无Provider可用,直接抛出异常
        if (forbidden) { 
            throw new RpcException("...");
        }
    
        if (multiGroup) {
            // multiGroup为true时的特殊处理,在refreshInvoker()方法中针对multiGroup为true的场景,已经使用Router进行了筛选,所以这里直接返回接口
            return this.invokers == null ? Collections.emptyList() : this.invokers;
        }
    
        List<Invoker<T>> invokers = null;
    
        // 通过RouterChain.route()方法筛选Invoker集合,最终得到符合路由条件的Invoker集合
        invokers = routerChain.route(getConsumerUrl(), invocation);
        return invokers == null ? Collections.emptyList() : invokers;
    }

三、总结

本章,我首先介绍了 dubbo-cluster 模块的整体架构,简单说明了 Cluster、Directory、Router、LoadBalance 四个核心接口的功能。接下来我深入介绍了 Directory 接口的定义以及 StaticDirectory、RegistryDirectory 两个类的核心实现。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文