2023-07-29
原文作者:说好不能打脸 原文地址:https://yinwj.blog.csdn.net/article/details/49453303

5-2、protocol:RPC调用层

如果您直接看阿里提供的DUBBO框架的技术文档,您可以看到几个关键字和几张工作图。不过您肯定是无法通过那几张图搞清楚DUBBO框架的每一层是如何进行衔接,如何进行脱耦的。在这几篇文章中,我会主要从设计思路上,从DUBBO所采用的设计模式层面上,尽可能的向读者表达DUBBO框架的设计思路。

在DUBBO框架中,有一个自己的RPC协议,名字叫做dubbo(小写的)。不止如此,DUBBO框架的RPC层还支持多种RPC协议的使用( hessian、thrift、RMI),还支持webservice协议和纯http协议。那么DUBBO框架是如何确认“哪个接口使用哪一种协议”这个问题的呢?要说清楚这个问题,就需要从DUBBO框架RPC层的模块设计入手:

202307292152056101.png

就像前文提到的那样:关键模块的实现类的实例化都是通过DUBBO框架中的SPI机制完成的。例如Proxy层中的具体代理、RPC层的具体协议等等。我会通过代码跟踪的方式,详细介绍DUBBO框架RPC层的实例化过程。(DUBBO框架中,SPI的具体实现在com.alibaba.dubbo.common.extension包中,最主要的实现类是ExtensionLoader)

当DUBBO框架启动时,ExtensionLoader会加载所有的扩展点设置(见com.alibaba.dubbo.common.extension.SPI注解符)。ExtensionLoader会通过loadExtensionClasses()私有方法找到“com.alibaba.dubbo.rpc.Protocol”接口所匹配的配置文件:

    // 此方法已经getExtensionClasses方法同步过(这句注释不是我加的)。
    private Map<String, Class<?>> loadExtensionClasses() {
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if(defaultAnnotation != null) {
            String value = defaultAnnotation.value();
            if(value != null && (value = value.trim()).length() > 0) {
                String[] names = NAME_SEPARATOR.split(value);
                if(names.length > 1) {
                    throw new IllegalStateException("more than 1 default extension name on extension " + type.getName() + ": " + Arrays.toString(names));
                }
                if(names.length == 1) cachedDefaultName = names[0];
            }
        }
    
        Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
        // yinwenjie加的注释:加载接口类对应的配置文件
        loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
        loadFile(extensionClasses, DUBBO_DIRECTORY);
        loadFile(extensionClasses, SERVICES_DIRECTORY);
        return extensionClasses;
    }

然后通过“ExtensionLoader.createExtension(String name)”私有方法生成相应的实例。注意下图中外部传入的’name’变量,是我在< dubbo:protocol >标签中填写的name=”thrift”。以下是这部分源代码:

    @SuppressWarnings("unchecked")
    private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (wrapperClasses != null && wrapperClasses.size() > 0) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
    
        }
    }

调用效果如下:

202307292152086932.png

在DUBBO框架中,您通过config配置文件所配置的“具体实现”,基本上都是通过这种方式进行加载的。通过由SPI支持的生成方式,和各种protocol协议定义的实现,使得DUBBO框架具备了在RPC层支持多种协议的功能特性。

6、config:配置模块和配置层

config模块负责管理DUBBO框架中的所有配置信息,在DUBBO框架启动/工作中,为各个模块提供配置点信息。在这个小节中,我们不会分析config模块怎么使用的(如果您想了解DUBBO框架的配置规范,可以参考DUBBO框架的用户手册),这个小节我们主要对config模块的设计结构、启动方式进行分析:

202307292152107993.png

上图中,没有特别说明的ApplicationConfig、ModuleConfig、ProtocolConfig等类分别对应了

这些配置标签对应的config类中,AnnotationBean类、ReferenceBean类和ServiceBean类提供了配置标签和Spring框架的结合机制。接下来我们以ServiceBean为例,为大家讲解< dubbo:service >标签中的配置信息加载过程。

首先我们需要清楚的是:和< dubbo:service >标签相关的标签还包括了:< dubbo:provider/ >、< dubbo:application/ > 、< dubbo:module/ >、< dubbo:registry/ >、< dubbo:monitor/ >和
< dubbo:protocol/ >标签。这些辅助(依赖)标签在您定义< dubbo:service >的时候,有的是必须定义的有的是可以选择性定义的。

也就是说在进行< dubbo:service >标签初始化时,还需要对可能存在的以上这些辅助标签一起进行初始化。然后作为< dubbo:service >标签的关联信息,一同写入到spring容器中进行保存 。在ServiceConfig类及其父类AbstractServiceConfig、AbstractInterfaceConfig中,定义了这些关联属性,代码片段如下:

    public abstract class AbstractInterfaceConfig extends AbstractMethodConfig {
        .......
        // 服务监控
        protected MonitorConfig        monitor;
        // 代理类型
        protected String               proxy;
        // 集群方式
        protected String               cluster; 
         // 应用信息
        protected ApplicationConfig    application;
        // 模块信息
        protected ModuleConfig         module;
        // 注册中心
        protected List<RegistryConfig> registries;
        // 允许的协议
        protected List<ProtocolConfig> protocols;
        .......
    }

当Spring框架的配置文件被加载完成时,ServiceBean所实现的接口定义InitializingBean中的方法afterPropertiesSet()会被激活运行。以下是代码片段:

    // 试图寻找<dubbo:provider/>标签的定义
    if (getProvider() == null) {
       Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null  : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
        if (providerConfigMap != null && providerConfigMap.size() > 0) {
            Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null  : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
    
            // .......(省略了一些处理代码)
            // 如果处理成功则作为ServiceBean的属性进行关联
            if (providerConfig != null) {
                 setProvider(providerConfig);
             }
        }
    }
    
    //试图寻找<dubbo:application/>标签的定义
    if (getApplication() == null
         && (getProvider() == null || getProvider().getApplication() == null)) {
        Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
    
        // .......(省略了一些处理代码)
        // 如果处理成功则作为serviceBean的属性进行关联
        if (applicationConfig != null) {
            setApplication(applicationConfig);
        } 
    }
    
    // ..... 接着处理其它可能存在标签

在DUBBO框架中,最重要的两个标签是< dubbo:service/ >标签和< dubbo:reference/ >标签。这两个标签都是和Spring框架进行了依赖的,通过对< dubbo:service/ >标签的初始化过程也可以知道< dubbo:reference/ >标签的初始化过程了。

7、registry:注册中心模块

首先要说明的是,DUBBO框架的注册中心模块是用来统筹DUBBO服务端(服务提供者)和客户端(服务消费者)的各种状态、事件。以便DUBBO框架的各端能够知晓当前整个分布式环境的全局,新的服务能够注册进来,并且可以被消费端发现;

DUBBO框架的注册中心有多种实现,既有大家熟悉的Zookeeper实现,还有Multicast网络广播的实现、Redis缓存服务的实现和本地实现,下图是Registry模块的主要类图——您会发现是一个典型的工厂方法模式(原图比较大,被缩小了。请点击新的页签进行查看)。

202307292152184894.png

7-1、DUBBO内部的URL

在介绍DUBBO中Registry工作过程之前,首先需要介绍DUBBO框架中“com.alibaba.dubbo.common.URL”这个类。在DUBBO框架中,各模块(各层)间传递消息的主要描述方式,是通过DUBBO框架中的这个类完成的。下面是这个类的主要代码片段:

    public final class URL implements Serializable {
    
        private static final long serialVersionUID = -1985165475234910535L;
        private final String protocol;
        private final String username;
        private final String password;
        private final String host;
        private final int port;
        private final String path;
        private final Map<String, String> parameters;
    
        // ==== cache ====
        /*
         * yinwenjie加的注释:
         * 请注意这部分代码。其中带有volatile关键字。
         * 意味着当多线程环境下每个线程拿到的都是这些属性的最新值。
         * 
         * 另外,这些属性都还带有transient关键字。
         * 意味着当URL对象被序列化时,这些属性不会被序列化
         */
        private volatile transient Map<String, Number> numbers;
        private volatile transient Map<String, URL> urls;
        private volatile transient String ip;
        private volatile transient String full;
        private volatile transient String identity;
        private volatile transient String parameter;
        private volatile transient String string;
    
        protected URL() {
            this.protocol = null;
            this.username = null;
            this.password = null;
            this.host = null;
            this.port = 0;
            this.path = null;
            this.parameters = null;
        }
    
        public URL(String protocol, String host, int port) {
            this(protocol, null, null, host, port, null, (Map<String, String>) null);
        }
    
        public URL(String protocol, String host, int port, String[] pairs) { 
            // 变长参数...与下面的path参数冲突,改为数组
            this(protocol, null, null, host, port, null, CollectionUtils.toStringMap(pairs));
        }
    
        public URL(String protocol, String host, int port, Map<String, String> parameters) {
            this(protocol, null, null, host, port, null, parameters);
        }
    
        //.........(省略了一部分构造函数)
    
        public URL(String protocol, String username, String password, String host, int port, String path, Map<String, String> parameters) {
            if ((username == null || username.length() == 0) 
                    && password != null && password.length() > 0) {
                throw new IllegalArgumentException("Invalid url, password without username!");
            }
            this.protocol = protocol;
            this.username = username;
            this.password = password;
            this.host = host;
            this.port = (port < 0 ? 0 : port);
            this.path = path;
            // trim the beginning "/"
            while(path != null && path.startsWith("/")) {
                path = path.substring(1);
            }
            if (parameters == null) {
                parameters = new HashMap<String, String>();
            } else {
                parameters = new HashMap<String, String>(parameters);
            }
            this.parameters = Collections.unmodifiableMap(parameters);
        }
    
        // ........还有很多代码
    }

那么DUBBO框架中的URL信息是怎么描述的呢?我们举几个实际的例子进行说明:

    zookeeper://10.5.1.246:2181/com.alibaba.dubbo.registry.RegistryService?application=ws-demo&dubbo=2.4.10&interface=com.alibaba.dubbo.registry.RegistryService&pid=1144&timestamp=1451030327372
    
    dubbo://10.5.1.121:20880/yinwenjie.test.dubboService.iface.MyService?anyhost=true&application=ws-demo&dubbo=2.4.10&interface=yinwenjie.test.dubboService.iface.MyService&methods=doMyTest&pid=1144&side=provider&timestamp=1451030346703

实例中的第一个URL描述了DUBBO框架注册中心层的协议描述。表示注册层使用的协议是zookeeper,访问地址为10.5.1.246,访问端口为2181。访问路径为com.alibaba.dubbo.registry.RegistryService。剩下的是参数信息,存储在URL类中的parameters属性。

实例中的第二个URL描述,是DUBBO框架Protocol层的协议描述。表示Protocol层使用的具体协议是“dubbo”(这个是DUBBO框架中自带的一种RPC协议);访问地址为:10.5.1.121(我的主机提供的一个可访问IP);访问端口为20880;访问路径为yinwenjie.test.dubboService.iface.MyService。剩下的参数信息,存储在URL类中的parameters属性。其中比较重要的参数包括了RPC协议的版本号,timestamp时间戳、methods接口方法信息等。

虽然我查询了尽可能多的DUBBO框架开发的公开文档,但是我并没有找到DUBBO框架的作者对“为什么会采用这种信息描述形式”的原因说明。可能DUBBO的作者们并没有进行这部分的文档编辑(可能性较低),也有可能DUBBO的作者们将这部分描述作为了内部文档,并没有开放;但是根据我肤浅的理解,作者应该有出于这样的考虑:

我们观察分布式系统的工作环境就可以发现, DUBBO框架中的每一个服务生产者/服务消费者节点的版本很有可能是不一样的 。如果不同版本中同样的协议描述存在差异(这种差异可能和微小,就是增加或者减少了一个属性),就会导致DUBBO框架中协议信息描述对象的修改。但如果采用URL格式进行描述就不会出现这样的问题,特别是将URL的属性放入到Map中;另外采用URL形式进行协议描述也增加了DUBBO框架开发调试阶段的信息可读性。

7-2、Registry工作过程

202307292152249135.png

接下来,我们看看Registry注册中心的一个具体实现ZookeeperRegistry的工作过程。首先我们看看Registry注册中心最顶层的接口RegistryService有一些什么样的定义(这个接口是注册服务层的基本接口):

    package com.alibaba.dubbo.registry;
    
    import java.util.List;
    
    import com.alibaba.dubbo.common.URL;
    
    /**
     * RegistryService. (SPI, Prototype, ThreadSafe)
     * 
     * @see com.alibaba.dubbo.registry.Registry
     * @see com.alibaba.dubbo.registry.RegistryFactory#getRegistry(URL)
     * @author william.liangf
     */
    public interface RegistryService {
    
        /**
         * 注册数据,比如:提供者地址,消费者地址,路由规则,覆盖规则,等数据。
         * 
         * 注册需处理契约:<br>
         * 1. 当URL设置了check=false时,注册失败后不报错,在后台定时重试,否则抛出异常。<br>
         * 2. 当URL设置了dynamic=false参数,则需持久存储,否则,当注册者出现断电等情况异常退出时,需自动删除。<br>
         * 3. 当URL设置了category=routers时,表示分类存储,缺省类别为providers,可按分类部分通知数据。<br>
         * 4. 当注册中心重启,网络抖动,不能丢失数据,包括断线自动删除数据。<br>
         * 5. 允许URI相同但参数不同的URL并存,不能覆盖。<br>
         * 
         * @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
         */
        void register(URL url);
    
        /**
         * 取消注册.
         * 
         * 取消注册需处理契约:<br>
         * 1. 如果是dynamic=false的持久存储数据,找不到注册数据,则抛IllegalStateException,否则忽略。<br>
         * 2. 按全URL匹配取消注册。<br>
         * 
         * @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
         */
        void unregister(URL url);
    
        /**
         * 订阅符合条件的已注册数据,当有注册数据变更时自动推送.
         * 
         * 订阅需处理契约:<br>
         * 1. 当URL设置了check=false时,订阅失败后不报错,在后台定时重试。<br>
         * 2. 当URL设置了category=routers,只通知指定分类的数据,多个分类用逗号分隔,并允许星号通配,表示订阅所有分类数据。<br>
         * 3. 允许以interface,group,version,classifier作为条件查询,如:interface=com.alibaba.foo.BarService&version=1.0.0<br>
         * 4. 并且查询条件允许星号通配,订阅所有接口的所有分组的所有版本,或:interface=*&group=*&version=*&classifier=*<br>
         * 5. 当注册中心重启,网络抖动,需自动恢复订阅请求。<br>
         * 6. 允许URI相同但参数不同的URL并存,不能覆盖。<br>
         * 7. 必须阻塞订阅过程,等第一次通知完后再返回。<br>
         * 
         * @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
         * @param listener 变更事件监听器,不允许为空
         */
        void subscribe(URL url, NotifyListener listener);
    
        /**
         * 取消订阅.
         * 
         * 取消订阅需处理契约:<br>
         * 1. 如果没有订阅,直接忽略。<br>
         * 2. 按全URL匹配取消订阅。<br>
         * 
         * @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
         * @param listener 变更事件监听器,不允许为空
         */
        void unsubscribe(URL url, NotifyListener listener);
    
        /**
         * 查询符合条件的已注册数据,与订阅的推模式相对应,这里为拉模式,只返回一次结果。
         * 
         * @see com.alibaba.dubbo.registry.NotifyListener#notify(List)
         * @param url 查询条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
         * @return 已注册信息列表,可能为空,含义同{@link com.alibaba.dubbo.registry.NotifyListener#notify(List<URL>)}的参数。
         */
        List<URL> lookup(URL url);
    }

从上面给出的类结构图来看,无论您使用的是Redis作为注册中心的媒介,还是使用Zookeeper作为注册中心的媒介,还是DUBBO框架所支持的任意一种注册中心媒介,他们都实现了RegistryService接口。也就是说,这些注册中心至少有四个基本功能。注册、取消注册、订阅、取消订阅。

那么针对Zookeeper的实现,这些功能是怎么实现的呢?请看以下部分AbstractRegistry类的代码:

    public abstract class AbstractRegistry implements Registry {
        // 。。。。。 省略了部分代码
    
        // 存放已被注册的URL信息
        private final Set<URL> registered = new ConcurrentHashSet<URL>();
    
        // 存放已被订阅的监听映射信息,从subscribed的接口来看,一个URL可以有多个订阅的监听
        private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    
        // 存放在某个事件中,已完成事件通知的订阅信息
        private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
    
        // ........ 又省略很多代码。重点来看那四个方法的实现:
        // 实现register方法
        public void register(URL url) {
            ........
            registered.add(url);
        }
    
        // 实现unregister方法
        public void unregister(URL url) {
            ........
            registered.remove(url);
        }
    
        // 实现subscribe方法
        public void subscribe(URL url, NotifyListener listener) {
    
            Set<NotifyListener> listeners = subscribed.get(url);
            if (listeners == null) {
                subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
                listeners = subscribed.get(url);
            }
            listeners.add(listener);
        }
    
        // 实现unsubscribe方法
        public void unsubscribe(URL url, NotifyListener listener) {
            Set<NotifyListener> listeners = subscribed.get(url);
            if (listeners != null) {
                listeners.remove(listener);
            }
        }
    }

上面的代码很简单,就是将调用者给定的URL信息和监听信息,按照既定的本地存储格式进行存入。代码到这里,注册过程离完成还早着呢!首先考虑一个问题,根据特定的某种注册中心实现介质,是不是还有一些特有的注册动作要执行呢?答案肯定是的。

例如这里基于zookeeper的注册中心实现,肯定 还需要通知远程连接的zookeeper,将注册URL信息或者监听信息进行存入;如果操作出现错误,还要将之前已经写入结构的注册URL信息或者监听信息进行删除或者记录没有成功的操作(以便随后自动重试)

这个过程不是由AbstractRegistry类完成的,而是由其子类FailbackRegistry、ZookeeperRegistry类完成的;我们先来看看FailbackRegistry这个类中的关键:

    public abstract class FailbackRegistry extends AbstractRegistry {
            // 。。。。。。yinwenjie的注释:省略了很多代码
        public void register(URL url) {
            // 执行AbstractRegistry中的register方法
            super.register(url);
            // 清除可能存在的URL错误记录
            failedRegistered.remove(url);
            failedUnregistered.remove(url);
    
            try {
                // 向服务器端发送注册请求(等一下见ZookeeperRegistry的错误方法)
                doRegister(url);
            } catch (Exception e) {
                Throwable t = e;
                // 如果开启了启动时检测,则直接抛出异常
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true)
                        && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if(skipFailback) {
                        t = t.getCause();
                    }
    
                    // 抛出异常
                    throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
                } else {
                    // 因为后续要重试,所以只需要显示错误日志。不用抛出异常
                    logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
    
                // 将失败的注册请求记录到失败列表,定时重试
                failedRegistered.add(url);
            }
        }
    
        //。。。。。。。又忽略了很多代码
    
        // ==== 模板方法,FailbackRegistry子类必须实现这些方法 ====
    
        protected abstract void doRegister(URL url);
    
        protected abstract void doUnregister(URL url);
    
        protected abstract void doSubscribe(URL url, NotifyListener listener);
    
        protected abstract void doUnsubscribe(URL url, NotifyListener listener);
    }

很明显FailbackRegistry并不是用来进行zookeeper相关注册或者监听映射过程的,而是 起到一个承上启下的作用,隔离处理过程中的公共过程和特有过程 。这是一个很好的处理思路,各位读者阅读别人的代码就是为了学习这些处理思路。

那么我们再来看ZookeeperRegistry中,针对zookeeper的特定处理过程(这个就简单了,按照已经确定的zookeeper目录结构,进行写操作即可):

    public class ZookeeperRegistry extends FailbackRegistry {
        // 。。。。。省略了很多代码
    
        // com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient接口
        // 在DUBBO框架中,这个接口一共有两个实现
        private final ZookeeperClient zkClient;
    
        // 。。。。。又省略了很多代码
        protected void doRegister(URL url) {
            try {
                // 这里进行zookeeper相关的特有操作
                zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
            } catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    
        // 。。。。。省略了很多代码
    }

8、下文介绍

好吧,我承认从介绍DUBBO框架开始的几篇文章,我引用了开源软件中大量的代码片段。这些代码片段主要还是为读者介绍设计思路和开源软件作者的coding技巧。各位读者一定要有耐心,静心阅读这些代码和我的注释说明(这是一种重要的修禅方式)。

另外,在书写这两篇DUBBO框架的代码分析文章中,我发现文章内容越来越偏离我这个专栏的中心思想。所以在完成DUBBO框架Registry的代码分析后,我决定暂停这部分的写作(后面的专栏中,我还会抽时间将分析继续)以便让文章回归专栏的中心思想。再说,我也不希望在RPC和服务治理的知识体系上耽误太多的时间。

从下一篇文章开始,我将进入这个专栏下一部分的写作,也就是系统间通讯的另一个知识体系ESB技术。严格来说,RPC与服务治理 和 ESB企业服务总线他们都是SOA思想的一个实现,但是两个知识体系的使用环境、工作原理都是不一样的(当然承载他们的基础知识都已一样的,所以学好基础非常重要)。敬请各位读者期待后续的专题文章。

最后感谢各位读者对我的关注,上一篇专题文章《架构设计:系统间通信(17)——服务治理与Dubbo 中篇(分析)》居然在10天时间内突破了1万的阅读量,网络上的转发数量更是不计其数。

阅读全文