2023-08-16
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/372

前面章节,我已经对 Dubbo 架构中的 Remoting 层进行了完整介绍,包括 Dubbo 底层的网络模型以及线程模型。从本章开始,我就开始介绍 Dubbo Remoting 上面的一层—— RPC 层。RPC 层对应dubbo-rpc模块,也同样包含很多子层,整个RPC层在Dubbo框架中的位置如下图:

202308162142104971.png

本章,我主要针对RPC层的核心接口进行讲解,这些接口大多定义在dubbo-rpc-api中:

202308162142132642.png

一、dubbo-rpc模块划分

下图展示了 dubbo-rpc 模块的结构,其中每个子模块对应一种RPC协议:

202308162142148463.png

1.1 顶层抽象

上述的dubbo-rpc-api模块,是 Dubbo 对 Rpc 层的顶层抽象,里面定义了很多核心接口,其它 dubbo-rpc-* 子模块都是依赖第三方协议框架实现了 dubbo-rpc-api 模块中的核心接口的。

dubbo-rpc-api 模块中的各个包的功能如下:

  • filter 包 :包含了很多过滤器,当服务消费方进行服务引用时,会通过这些过滤器进行过滤;
  • listener 包 :在服务发布/引用的过程中,我们可以添加一些 Listener 来监听相应的事件;
  • protocol 包 :一些实现了 Protocol 接口以及 Invoker 接口的抽象类位于该包中,它们为 Protocol 接口及 Invoker 接口的具体实现提供了一些公共逻辑;
  • proxy 包 :提供了创建代理的能力,支持 JDK 动态代理以及 Javassist 字节码两种方式;
  • support 包 :包括了 RpcUtils 工具类、Mock 相关的 Protocol 实现以及 Invoker 实现;
  • 其它接口 :没有在上述 package 中的接口和类,是更为核心的抽象接口。

二、核心接口

在 Dubbo RPC 层中涉及的核心接口有 Invoker、Invocation、Protocol、Result、Exporter、ProtocolServer、Filter 等,这些接口分别抽象了 Dubbo RPC 层的不同概念,看似相互独立,但又相互协同,一起构建出了 Dubbo RPC 层的骨架。本节,我就来逐一介绍这些核心接口的含义。

2.1 Invoker

Invoker 接口是整个Dubbo框架中最核心的接口之一,渗透在整个 Dubbo 代码实现里。Dubbo 中的很多设计思想都会向 Invoker 这个概念靠拢:

    // Invoker.java
    
    public interface Invoker<T> extends Node {
    
        /**
         * 服务接口
         */
        Class<T> getInterface();
    
        /**
         * 进行一次调用,也有人称之为一次"会话"
         */
        Result invoke(Invocation invocation) throws RpcException;
    
    }

服务消费方发起RPC远程调用时,底层的代理对象就是通过Invoker完成网络调用的。

2.2 Invocation

Invocation 接口是 Invoker.invoke() 方法的入参,它抽象了一次 RPC 调用的目标服务和方法信息、相关参数信息、具体的参数值以及一些附加信息,具体定义如下:

    // Invocation.java
    
    public interface Invocation {
    
        // 目标服务的唯一标识
        String getTargetServiceUniqueName();
    
        /**
         * 目标服务的方法名
         */
        String getMethodName();
    
        /**
         * 目标服务名称
         */
        String getServiceName();
    
        /**
         * 方法的参数类型
         */
        Class<?>[] getParameterTypes();
    
        /**
         * 方法的参数签名
         */
        default String[] getCompatibleParamSignatures() {
            return Stream.of(getParameterTypes())
                    .map(Class::getName)
                    .toArray(String[]::new);
        }
    
        /**
         * 方法的参数值
         */
        Object[] getArguments();
    
        /**
         * 此次调用关联的Invoker对象
         */
        Invoker<?> getInvoker();
    
        /**
         * Invoker对象可以设置一些KV属性,这些属性并不会传递给Provider
         */
        Object put(Object key, Object value);
        Object get(Object key);
        Map<Object, Object> getAttributes();
    
        /**
         * Invocation可以携带一个KV作为附加信息,一并传递给Provider(注意与attribute的区分)
         */
        String getAttachment(String key);
    
        Object getObjectAttachment(String key);
    
        String getAttachment(String key, String defaultValue);
    
        Object getObjectAttachment(String key, Object defaultValue);
    
        Map<String, String> getAttachments();
    
        Map<String, Object> getObjectAttachments();
    
        void setAttachment(String key, String value);
    
        void setAttachment(String key, Object value);
    
        void setObjectAttachment(String key, Object value);
    
        void setAttachmentIfAbsent(String key, String value);
    
        void setAttachmentIfAbsent(String key, Object value);
    
        void setObjectAttachmentIfAbsent(String key, Object value);
    }

2.3 Result

Result 接口是 Invoker.invoke() 方法的返回值,抽象了一次调用的返回值,其中包含了被调用方的返回值(或是异常)以及附加信息,我们也可以添加回调方法,在 RPC 调用方法结束时会触发这些回调。Result 接口的具体定义如下:

    // Result.java
    
    public interface Result extends Serializable {
    
        /**
         * 获取此次调用的返回值
         */
        Object getValue();
    
        /**
         * 设置此次调用的返回值
         */
        void setValue(Object value);
    
        /**
         * 如果此次调用发生异常,则可以通过下面三个方法获取
         */
        Throwable getException();
        void setException(Throwable t);
        boolean hasException();
    
        /**
         * recreate()方法是一个复合操作,如果此次调用发生异常,则直接抛出异常,如果没有异常,则返回结果
         */
        Object recreate() throws Throwable;
    
        /**
         * 添加一个回调,当RPC调用完成时,会触发添加的回调
         */
        Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn);
        <U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn);
    
        /**
         * 阻塞线程,等待此次RPC调用完成(或是超时)
         */
        Result get() throws InterruptedException, ExecutionException;
        Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    
        /**
         *  Result中同样可以携带附加信息
         */
        Map<String, String> getAttachments();
    
        Map<String, Object> getObjectAttachments();
    
        void addAttachments(Map<String, String> map);
    
        void addObjectAttachments(Map<String, Object> map);
    
        void setAttachments(Map<String, String> map);
    
        void setObjectAttachments(Map<String, Object> map);
    
        String getAttachment(String key);
    
        Object getObjectAttachment(String key);
    
        String getAttachment(String key, String defaultValue);
    
        Object getObjectAttachment(String key, Object defaultValue);
    
        void setAttachment(String key, String value);
    
        void setAttachment(String key, Object value);
    
        void setObjectAttachment(String key, Object value);
    }

2.4 Exporter

Exporter接口用于暴露 Invoker 的实现。比如,在Dubbo Provider服务提供方中,我们定义的业务服务接口的实现会被包装成一个 AbstractProxyInvoker 对象,然后由 Exporter 暴露出去。Exporter 会让 Provider 能够根据请求的各种信息,找到对应的 Invoker:

    // Exporter.java
    
    public interface Exporter<T> {
    
        /**
         * 获取底层封装的Invoker对象
         */
        Invoker<T> getInvoker();
    
        /**
         * 取消发布底层的Invoker对象
         */
        void unexport();
    }

2.5 ExporterListener

ExporterListener监听器,用于监听服务发布事件,以及取消发布事件,其定义如下:

    // ExporterListener.java
    
    @SPI
    public interface ExporterListener {
    
        /**
         * 当有服务发布的时候,会触发该方法
         */
        void exported(Exporter<?> exporter) throws RpcException;
    
        /**
         * 当有服务取消发布的时候,会触发该方法
         */
        void unexported(Exporter<?> exporter);
    }

虽然 ExporterListener 是个SPI扩展接口,但是 Dubbo 并没有提供什么有用的扩展实现,我们需要自己提供具体实现,监听感兴趣的事情。

2.6 InvokerListener

InvokerListener 监听器,用于监听 Consumer 引用服务时触发的事件,定义如下:

    // InvokerListener.java
    
    @SPI
    public interface InvokerListener {
    
        /**
         * 当服务引用的时候,会触发该方法
         */
        void referred(Invoker<?> invoker) throws RpcException;
    
        /**
         * 当销毁引用的服务时,会触发该方法
         */
        void destroyed(Invoker<?> invoker);
    }

2.7 Protocol

Protocol 接口 ,也是整个 Dubbo Protocol 层的核心接口之一,其中定义了 export()refer() 两个核心方法,具体定义如下:

    // Protocol.java
    
    @SPI("dubbo")
    public interface Protocol {
    
        /**
         * 默认端口
         */
        int getDefaultPort();
    
        /**
         * 将一个Invoker暴露出去,export()方法实现需要是幂等的,即同一个服务暴露多次和暴露一次的效果是相同的
         */
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    
        /**
         * 引用一个Invoker,refer()方法会根据参数返回一个Invoker对象,
         * Consumer端可以通过这个Invoker请求Provider端的服务
         */
        @Adaptive
        <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    
        /**
         * 销毁export()方法以及refer()方法使用到的Invoker对象,释放当前Protocol对象底层占用的资源
         */
        void destroy();
    
        /**
         * 返回当前Protocol底层的全部ProtocolServer
         */
        default List<ProtocolServer> getServers() {
            return Collections.emptyList();
        }
    }

在 Protocol 接口的实现中:

  • export() 方法并不是简单地将 Invoker 对象包装成 Exporter 对象返回,其中还涉及代理对象的创建、底层 Server 的启动等操作;
  • refer() 方法除了根据传入的 type 类型以及 URL 参数查询 Invoker 之外,还涉及相关 Client 的创建等操作。

2.8 ProxyFactory

Dubbo 在 Protocol 层专门定义了一个 ProxyFactory 接口 ,作为创建代理对象的工厂。ProxyFactory 接口是一个 SPI 扩展接口,其中定义了 getProxy() 方法为 Invoker 创建代理对象,还定义了 getInvoker() 方法将代理对象反向封装成 Invoker 对象:

    // ProxyFactory.java
    
    @SPI("javassist")
    public interface ProxyFactory {
    
        /**
         * 为传入的Invoker对象创建代理对象
         */
        @Adaptive({PROXY_KEY})
        <T> T getProxy(Invoker<T> invoker) throws RpcException;
    
        /**
         * 为传入的Invoker对象创建代理对象
         */
        @Adaptive({PROXY_KEY})
        <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
    
        /**
         * 将传入的代理对象封装成Invoker对象,可以暂时理解为getProxy()的逆操作
         */
        @Adaptive({PROXY_KEY})
        <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
    }

ProxyFactory 的默认实现是JavassistProxyFactory,使用 javassist 来创建代理对象,当然,Dubbo 还提供了其他方式来创建代码,例如 JDK 动态代理。

2.9 ProtocolServer

ProtocolServer 接口,是对 RemotingServer 的一层简单封装,其实现也都非常简单,这里就不再展开:

    // ProtocolServer.java
    
    public interface ProtocolServer {
    
        default RemotingServer getRemotingServer() {
            return null;
        }
    
        default void setRemotingServers(RemotingServer server) {
        }
    
        String getAddress();
    
        void setAddress(String address);
    
        default URL getUrl() {
            return null;
        }
    
        default void reset(URL url) {
        }
    
        void close();
    }

2.10 Filter

Filter 接口,是用来拦截 Dubbo 请求的。在 Dubbo 的 Filter 接口中,定义了一个 invoke() 方法将请求传递给后续的 Invoker 进行处理,本质就是一种 责任链模式 。Filter 接口的具体定义如下:

    // Filter.java
    
    @SPI
    public interface Filter {
        // 将请求传给后续的Invoker进行处理
        Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
    
        interface Listener {
            // 用于监听响应
            void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
            // 用于监听异常
            void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
        }
    }

Filter 也是一个 SPI 扩展接口,Dubbo 提供了丰富的 Filter 实现来进行功能扩展,当然我们也可以提供自己的 Filter 实现来扩展 Dubbo 的功能。

三、总结

本章,我对 Dubbo RPC 层的模块划分,以及 dubbo-rpc-api 模块的结构和其中各个包提供的基本功能进行了讲解,Dubbo RPC 层中的核心接口都定义在dubbo-rpc-api模块中,包括 Invoker、Invocation、Protocol、Result、ProxyFactory、ProtocolServer 等核心接口,以及 ExporterListener、Filter 等扩展类的接口。后续章节,我将对这些接口的核心实现类的功能和源码逐个分析讲解。

阅读全文