前面章节,我已经对 Dubbo 架构中的 Remoting 层进行了完整介绍,包括 Dubbo 底层的网络模型以及线程模型。从本章开始,我就开始介绍 Dubbo Remoting 上面的一层—— RPC 层。RPC 层对应dubbo-rpc
模块,也同样包含很多子层,整个RPC层在Dubbo框架中的位置如下图:
本章,我主要针对RPC层的核心接口进行讲解,这些接口大多定义在dubbo-rpc-api
中:
一、dubbo-rpc模块划分
下图展示了 dubbo-rpc
模块的结构,其中每个子模块对应一种RPC协议:
1.1 顶层抽象
上述的dubbo-rpc-api
模块,是 Dubbo 对 Rpc 层的顶层抽象,里面定义了很多核心接口,其它 dubbo-rpc-* 子模块都是依赖第三方协议框架实现了 dubbo-rpc-api
模块中的核心接口的。
dubbo-rpc-api 模块中的各个包的功能如下:
- filter 包 :包含了很多过滤器,当服务消费方进行服务引用时,会通过这些过滤器进行过滤;
- listener 包 :在服务发布/引用的过程中,我们可以添加一些 Listener 来监听相应的事件;
- protocol 包 :一些实现了 Protocol 接口以及 Invoker 接口的抽象类位于该包中,它们为 Protocol 接口及 Invoker 接口的具体实现提供了一些公共逻辑;
- proxy 包 :提供了创建代理的能力,支持 JDK 动态代理以及 Javassist 字节码两种方式;
- support 包 :包括了 RpcUtils 工具类、Mock 相关的 Protocol 实现以及 Invoker 实现;
- 其它接口 :没有在上述 package 中的接口和类,是更为核心的抽象接口。
二、核心接口
在 Dubbo RPC 层中涉及的核心接口有 Invoker、Invocation、Protocol、Result、Exporter、ProtocolServer、Filter 等,这些接口分别抽象了 Dubbo RPC 层的不同概念,看似相互独立,但又相互协同,一起构建出了 Dubbo RPC 层的骨架。本节,我就来逐一介绍这些核心接口的含义。
2.1 Invoker
Invoker 接口是整个Dubbo框架中最核心的接口之一,渗透在整个 Dubbo 代码实现里。Dubbo 中的很多设计思想都会向 Invoker 这个概念靠拢:
// Invoker.java
public interface Invoker<T> extends Node {
/**
* 服务接口
*/
Class<T> getInterface();
/**
* 进行一次调用,也有人称之为一次"会话"
*/
Result invoke(Invocation invocation) throws RpcException;
}
服务消费方发起RPC远程调用时,底层的代理对象就是通过Invoker完成网络调用的。
2.2 Invocation
Invocation 接口是 Invoker.invoke()
方法的入参,它抽象了一次 RPC 调用的目标服务和方法信息、相关参数信息、具体的参数值以及一些附加信息,具体定义如下:
// Invocation.java
public interface Invocation {
// 目标服务的唯一标识
String getTargetServiceUniqueName();
/**
* 目标服务的方法名
*/
String getMethodName();
/**
* 目标服务名称
*/
String getServiceName();
/**
* 方法的参数类型
*/
Class<?>[] getParameterTypes();
/**
* 方法的参数签名
*/
default String[] getCompatibleParamSignatures() {
return Stream.of(getParameterTypes())
.map(Class::getName)
.toArray(String[]::new);
}
/**
* 方法的参数值
*/
Object[] getArguments();
/**
* 此次调用关联的Invoker对象
*/
Invoker<?> getInvoker();
/**
* Invoker对象可以设置一些KV属性,这些属性并不会传递给Provider
*/
Object put(Object key, Object value);
Object get(Object key);
Map<Object, Object> getAttributes();
/**
* Invocation可以携带一个KV作为附加信息,一并传递给Provider(注意与attribute的区分)
*/
String getAttachment(String key);
Object getObjectAttachment(String key);
String getAttachment(String key, String defaultValue);
Object getObjectAttachment(String key, Object defaultValue);
Map<String, String> getAttachments();
Map<String, Object> getObjectAttachments();
void setAttachment(String key, String value);
void setAttachment(String key, Object value);
void setObjectAttachment(String key, Object value);
void setAttachmentIfAbsent(String key, String value);
void setAttachmentIfAbsent(String key, Object value);
void setObjectAttachmentIfAbsent(String key, Object value);
}
2.3 Result
Result 接口是 Invoker.invoke() 方法的返回值,抽象了一次调用的返回值,其中包含了被调用方的返回值(或是异常)以及附加信息,我们也可以添加回调方法,在 RPC 调用方法结束时会触发这些回调。Result 接口的具体定义如下:
// Result.java
public interface Result extends Serializable {
/**
* 获取此次调用的返回值
*/
Object getValue();
/**
* 设置此次调用的返回值
*/
void setValue(Object value);
/**
* 如果此次调用发生异常,则可以通过下面三个方法获取
*/
Throwable getException();
void setException(Throwable t);
boolean hasException();
/**
* recreate()方法是一个复合操作,如果此次调用发生异常,则直接抛出异常,如果没有异常,则返回结果
*/
Object recreate() throws Throwable;
/**
* 添加一个回调,当RPC调用完成时,会触发添加的回调
*/
Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn);
<U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn);
/**
* 阻塞线程,等待此次RPC调用完成(或是超时)
*/
Result get() throws InterruptedException, ExecutionException;
Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
/**
* Result中同样可以携带附加信息
*/
Map<String, String> getAttachments();
Map<String, Object> getObjectAttachments();
void addAttachments(Map<String, String> map);
void addObjectAttachments(Map<String, Object> map);
void setAttachments(Map<String, String> map);
void setObjectAttachments(Map<String, Object> map);
String getAttachment(String key);
Object getObjectAttachment(String key);
String getAttachment(String key, String defaultValue);
Object getObjectAttachment(String key, Object defaultValue);
void setAttachment(String key, String value);
void setAttachment(String key, Object value);
void setObjectAttachment(String key, Object value);
}
2.4 Exporter
Exporter接口用于暴露 Invoker 的实现。比如,在Dubbo Provider服务提供方中,我们定义的业务服务接口的实现会被包装成一个 AbstractProxyInvoker 对象,然后由 Exporter 暴露出去。Exporter 会让 Provider 能够根据请求的各种信息,找到对应的 Invoker:
// Exporter.java
public interface Exporter<T> {
/**
* 获取底层封装的Invoker对象
*/
Invoker<T> getInvoker();
/**
* 取消发布底层的Invoker对象
*/
void unexport();
}
2.5 ExporterListener
ExporterListener监听器,用于监听服务发布事件,以及取消发布事件,其定义如下:
// ExporterListener.java
@SPI
public interface ExporterListener {
/**
* 当有服务发布的时候,会触发该方法
*/
void exported(Exporter<?> exporter) throws RpcException;
/**
* 当有服务取消发布的时候,会触发该方法
*/
void unexported(Exporter<?> exporter);
}
虽然 ExporterListener 是个SPI扩展接口,但是 Dubbo 并没有提供什么有用的扩展实现,我们需要自己提供具体实现,监听感兴趣的事情。
2.6 InvokerListener
InvokerListener 监听器,用于监听 Consumer 引用服务时触发的事件,定义如下:
// InvokerListener.java
@SPI
public interface InvokerListener {
/**
* 当服务引用的时候,会触发该方法
*/
void referred(Invoker<?> invoker) throws RpcException;
/**
* 当销毁引用的服务时,会触发该方法
*/
void destroyed(Invoker<?> invoker);
}
2.7 Protocol
Protocol 接口 ,也是整个 Dubbo Protocol 层的核心接口之一,其中定义了 export()
和 refer()
两个核心方法,具体定义如下:
// Protocol.java
@SPI("dubbo")
public interface Protocol {
/**
* 默认端口
*/
int getDefaultPort();
/**
* 将一个Invoker暴露出去,export()方法实现需要是幂等的,即同一个服务暴露多次和暴露一次的效果是相同的
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* 引用一个Invoker,refer()方法会根据参数返回一个Invoker对象,
* Consumer端可以通过这个Invoker请求Provider端的服务
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* 销毁export()方法以及refer()方法使用到的Invoker对象,释放当前Protocol对象底层占用的资源
*/
void destroy();
/**
* 返回当前Protocol底层的全部ProtocolServer
*/
default List<ProtocolServer> getServers() {
return Collections.emptyList();
}
}
在 Protocol 接口的实现中:
- export() 方法并不是简单地将 Invoker 对象包装成 Exporter 对象返回,其中还涉及代理对象的创建、底层 Server 的启动等操作;
- refer() 方法除了根据传入的 type 类型以及 URL 参数查询 Invoker 之外,还涉及相关 Client 的创建等操作。
2.8 ProxyFactory
Dubbo 在 Protocol 层专门定义了一个 ProxyFactory 接口 ,作为创建代理对象的工厂。ProxyFactory 接口是一个 SPI 扩展接口,其中定义了 getProxy()
方法为 Invoker 创建代理对象,还定义了 getInvoker()
方法将代理对象反向封装成 Invoker 对象:
// ProxyFactory.java
@SPI("javassist")
public interface ProxyFactory {
/**
* 为传入的Invoker对象创建代理对象
*/
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
/**
* 为传入的Invoker对象创建代理对象
*/
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
/**
* 将传入的代理对象封装成Invoker对象,可以暂时理解为getProxy()的逆操作
*/
@Adaptive({PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
ProxyFactory 的默认实现是JavassistProxyFactory
,使用 javassist 来创建代理对象,当然,Dubbo 还提供了其他方式来创建代码,例如 JDK 动态代理。
2.9 ProtocolServer
ProtocolServer 接口,是对 RemotingServer 的一层简单封装,其实现也都非常简单,这里就不再展开:
// ProtocolServer.java
public interface ProtocolServer {
default RemotingServer getRemotingServer() {
return null;
}
default void setRemotingServers(RemotingServer server) {
}
String getAddress();
void setAddress(String address);
default URL getUrl() {
return null;
}
default void reset(URL url) {
}
void close();
}
2.10 Filter
Filter 接口,是用来拦截 Dubbo 请求的。在 Dubbo 的 Filter 接口中,定义了一个 invoke() 方法将请求传递给后续的 Invoker 进行处理,本质就是一种 责任链模式 。Filter 接口的具体定义如下:
// Filter.java
@SPI
public interface Filter {
// 将请求传给后续的Invoker进行处理
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
interface Listener {
// 用于监听响应
void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
// 用于监听异常
void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}
}
Filter 也是一个 SPI 扩展接口,Dubbo 提供了丰富的 Filter 实现来进行功能扩展,当然我们也可以提供自己的 Filter 实现来扩展 Dubbo 的功能。
三、总结
本章,我对 Dubbo RPC 层的模块划分,以及 dubbo-rpc-api
模块的结构和其中各个包提供的基本功能进行了讲解,Dubbo RPC 层中的核心接口都定义在dubbo-rpc-api
模块中,包括 Invoker、Invocation、Protocol、Result、ProxyFactory、ProtocolServer 等核心接口,以及 ExporterListener、Filter 等扩展类的接口。后续章节,我将对这些接口的核心实现类的功能和源码逐个分析讲解。