Dubbo 为了更好地达到 OCP 原则(即“对扩展开放,对修改封闭”的原则),采用了“ 微内核+插件 ”的架构。微内核架构中,内核通常采用 Factory、IoC、OSGi 等方式管理插件生命周期, Dubbo 采用 SPI 机制来加载插件 ,Dubbo SPI 参考 JDK 原生的 SPI 机制,进行了性能优化以及功能增强。
本章,我将分为三个部分循序渐进讲解Dubbo SPI机制:
- Dubbo SPI机制中大量运用了“统一配置模型”,这是一种基于URL的接口契约,首先,我对Dubbo的URL统一配置模型进行讲解;
- Dubbo SPI机制参考了JDK SPI机制,并对它进行了增强,接着我对JDK SPI机制进行讲解;
- 最后,我将讲解Dubbo SPI机制的原理和运用。
一、数据总线URL
Dubbo 采用了 URL 形式的"统一配置模型",有时也叫配置总线,Dubbo 将自己的任意组件的实现抽象为一个 URL,使用 URL 来统一描述了所有对象和配置信息,并贯穿在整个 Dubbo 框架之中。下面是 Dubbo 中一个典型 URL 的示例:
dubbo://172.17.32.91:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=32508&release=&side=provider×tamp=1593253404714dubbo://172.17.32.91:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=32508&release=&side=provider×tamp=1593253404714
上述URL可以抽象成以下格式:
protocol://username:password@host:port/path?key=value&key=value
一共分为五部分:
- protocol :dubbo 协议;
- username/password :没有用户名和密码;
- host/port :172.17.32.91:20880;
- path :org.apache.dubbo.demo.DemoService;
- parameters :参数键值对,即?后面的参数。
1.1 Dubbo URL
Dubbo 中的 URL,之所以被称为”配置总线”,或者“统一配置模型”,是因为 URL 在 Dubbo 中被当作是“ 公共的契约 ”。一个优秀的开源框架都有一套灵活清晰的扩展契约,这样不仅第三方可以按照这个契约进行扩展,其自身内核也可以按照这个契约进行搭建:
- 使用 URL 进行上下文信息传递,可以形成一个统一的规范,使代码更加易读、易懂;
- 使用 URL 作为方法入参,所表达的含义比单个参数更丰富,当代码需要扩展时,可以将新的参数以 Key/Value 的形式追加到 URL 之中,而不需要改变入参或是返回值的结构。
1.2 运用示例
我们来看几个在Dubbo中应用URL这种契约规范的示例。
服务注册
服务提供者 Provider 在启动时,会将自身暴露的服务注册到 ZooKeeper 上,那么具体是注册哪些信息呢?
我们来看 ZookeeperRegistry.doRegister()
方法,其入参就是org.apache.dubbo.common.URL
:
// ZookeeperRegistry.java
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
//...
}
}
URL 中包含了 Provider 的地址/端口、暴露的接口(org.apache.dubbo.demo.DemoService)等信息, toUrlPath() 方法会根据传入的 URL 参数确定在 ZooKeeper 上创建的节点路径,还会通过 URL 中的 dynamic 参数值确定创建的 ZNode 是临时节点还是持久节点。
服务订阅
服务消费者 Consumer 在启动时,会向注册中心发起订阅操作,并监听自己关注的 Provider。那 Consumer 是如何告诉注册中心自己关注哪些 Provider 呢?
我们来看 ZookeeperRegistry.doSubscribe()
方法,其入参就是org.apache.dubbo.common.URL
:
// ZookeeperRegistry.java
public void doSubscribe(final URL url, final NotifyListener listener) {
//...
}
Consumer 的订阅URL一般是以下格式:
consumer://...?application=dubbo-demo-api-consumer&category=providers,configurators,routers&interface=org.apache.dubbo.demo.DemoService...
其中 protocol
为 consumer ,表示是 Consumer 的订阅协议,其中的 category 参数表示要订阅的分类,这里要订阅 providers、configurators 以及 routers 三个分类;interface 参数表示订阅的服务接口,这里订阅的是暴露 org.apache.dubbo.demo.DemoService 实现的 Provider。
通过 URL 中的上述参数,ZookeeperRegistry 会将其整理成一个 ZooKeeper 路径,然后调用 zkClient 在其上添加监听。
二、JDK SPI
SPI(Service Provider Interface)主要是被 框架开发人员 使用的一种技术。例如,使用 Java 语言访问数据库时我们会使用到 java.sql.Driver 接口,不同数据库产品底层的协议不同,提供的 java.sql.Driver 实现也不同,在开发 java.sql.Driver 接口时,开发人员并不清楚用户最终会使用哪个数据库,在这种情况下就可以使用 Java SPI 机制在实际运行过程中,为 java.sql.Driver 接口寻找具体的实现。
当服务提供者提供了一种接口的实现后,需要在 classpath 的 META-INF/services/
目录里创建一个以服务接口命名的文件,该文件记录了该 jar 包提供的服务接口的具体实现类。当某个应用引入了该 jar 包且需要使用该服务时,JDK SPI 机制就可以通过查找这个 jar 包的 META-INF/services/
中的配置文件来获得具体的实现类名,进行实现类的加载和实例化,最终使用该实现类完成业务功能。
2.1 使用示例
下面我通过一个简单的示例,演示 JDK SPI 的基本使用方式。
假设我们需要一个 Log 接口,用于日志打印:
package com.tpvlog.log;
public interface Log {
void log(String info);
}
接下来提供两个实现—— Logback 和 Log4j,分别代表两个不同日志框架的实现:
package com.tpvlog.log.impl;
public class Logback implements Log {
@Override
public void log(String info) {
System.out.println("Logback:" + info);
}
}
package com.tpvlog.log.impl;
public class Log4j implements Log {
@Override
public void log(String info) {
System.out.println("Log4j:" + info);
}
}
接着,在项目的 resources/META-INF/services
目录下添加一个名为 com.tpvlog.log.Log
的文件,这是 JDK SPI 需要读取的配置文件,具体内容如下:
com.tpvlog.log.impl.Log4j
com.tpvlog.log.impl.Logback
我们可以将上述工程打成一个jar包,在其它工程项目中的引用,其它工程项目可以像下面这样使用:
public class Main {
public static void main(String[] args) {
ServiceLoader<Log> serviceLoader = ServiceLoader.load(Log.class);
Iterator<Log> iterator = serviceLoader.iterator();
while (iterator.hasNext()) {
Log log = iterator.next();
log.log("JDK SPI");
}
}
}
输出如下:
Log4j:JDK SPI
Logback:JDK SPI
2.2 实现原理
ServiceLoader是JDK SPI机制的核心类,它会加载/META-INF/services
路径中的配置文件,创建全部接口的实现类对象。ServiceLoader.load()
方法,首先会获取当前使用的 ClassLoader,然后调用 reload() 方法:
// ServiceLoader.java
public static <S> ServiceLoader<S> load(Class<S> service) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
return ServiceLoader.load(service, cl);
}
public static <S> ServiceLoader<S> load(Class<S> service, ClassLoader loader) {
return new ServiceLoader<>(service, loader);
}
private ServiceLoader(Class<S> svc, ClassLoader cl) {
service = Objects.requireNonNull(svc, "Service interface cannot be null");
loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
reload();
}
public void reload() {
// 清空缓存
providers.clear();
lookupIterator = new LazyIterator(service, loader);
}
前面示例中使用的迭代器,其底层就是LazyIterator,在迭代过程中,最终调用了hasNextService法,该方法负责查找 META-INF/services
目录下的 SPI 配置文件,并进行遍历:
// LazyIterator.java
private static final String PREFIX = "META-INF/services/";
private boolean hasNextService() {
if (nextName != null) {
return true;
}
if (configs == null) {
try {
// 文件名,即示例中的META-INF/services/com.tpvlog.log.Log)
String fullName = PREFIX + service.getName();
// 加载配置文件
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
configs = loader.getResources(fullName);
} catch (IOException x) {
fail(service, "Error locating configuration files", x);
}
}
while ((pending == null) || !pending.hasNext()) {
if (!configs.hasMoreElements()) {
return false;
}
// 解析配置文件
pending = parse(service, configs.nextElement());
}
nextName = pending.next();
return true;
}
在上述方法中完成 SPI 配置文件的解析后,再来看 LazyIterator.nextService() 方法,该方法 负责实例化 hasNextService() 方法读取到的实现类 ,并将对象放到 providers 集合中缓存,核心实现如下所示:
// LazyIterator.java
private S nextService() {
if (!hasNextService())
throw new NoSuchElementException();
// 加载指定类
String cn = nextName;
nextName = null;
Class<?> c = null;
try {
c = Class.forName(cn, false, loader);
} catch (ClassNotFoundException x) {
fail(service,
"Provider " + cn + " not found");
}
if (!service.isAssignableFrom(c)) {
fail(service,
"Provider " + cn + " not a subtype");
}
try {
// 创建对象
S p = service.cast(c.newInstance());
// 缓存对象,Key为类名
providers.put(cn, p);
return p;
} catch (Throwable x) {
fail(service,
"Provider " + cn + " could not be instantiated",
x);
}
throw new Error();
}
2.3 应用场景
JDK SPI 在很多场景都有应用,最典型的就是JDBC:使用 Java 语言访问数据库时,我们会使用到 java.sql.Driver
接口,不同数据库产品底层的协议不同,针对 java.sql.Driver
的实现也不同。
这里我以 MySQL 提供的 JDBC 实现包为例进行分析。我们可以在 MySQL 的 JDBC 驱动包 mysql-connector-java-*.jar
中的 META-INF/services
目录下,发现一个 名为java.sql.Driver
的 文件,里面只有一行内容:
com.mysql.cj.jdbc.Driver
代表了MySQL针对java.sql.Driver
接口的实现类是com.mysql.cj.jdbc.Driver
。当我们使用JDBC连接MySQL数据库时会用到JDK提供了DriverManager类:
Connection conn = DriverManager.getConnection(url, username, pwd);
DriverManager类在加载时,有这么一段代码,实际上就是通过 JDK SPI 扫描 classpath 下 java.sql.Driver
接口实现类并加载到ClassLoader中:
// DriverManager.java
private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();
static {
loadInitialDrivers();
println("JDBC DriverManager initialized");
}
private static void loadInitialDrivers() {
String drivers;
try {
drivers = AccessController.doPrivileged(new PrivilegedAction<String>() {
public String run() {
return System.getProperty("jdbc.drivers");
}
});
} catch (Exception ex) {
drivers = null;
}
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
// 使用 JDK SPI机制加载所有 java.sql.Driver实现类
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class);
Iterator<Driver> driversIterator = loadedDrivers.iterator();
try{
while(driversIterator.hasNext()) {
driversIterator.next();
}
} catch(Throwable t) {
// Do nothing
}
return null;
}
});
println("DriverManager.initialize: jdbc.drivers = " + drivers);
if (drivers == null || drivers.equals("")) {
return;
}
String[] driversList = drivers.split(":");
println("number of Drivers:" + driversList.length);
// 加载类到ClassLoader
for (String aDriver : driversList) {
try {
println("DriverManager.Initialize: loading " + aDriver);
Class.forName(aDriver, true, ClassLoader.getSystemClassLoader());
} catch (Exception ex) {
println("DriverManager.Initialize: load failed: " + ex);
}
}
}
在 MySQL 提供的 com.mysql.cj.jdbc.Driver
实现类中,同样有一段 static 静态代码块,这段代码会创建一个 com.mysql.cj.jdbc.Driver
对象并注册到 DriverManager.registeredDrivers 集合中:
static {
java.sql.DriverManager.registerDriver(new Driver());
}
在 DriverManager.getConnection()
方法中,DriverManager 从该 registeredDrivers
集合中获取对应的 Driver 对象创建 Connection,核心实现如下所示:
// DriverManager.java
private static Connection getConnection(String url, java.util.Properties info, Class<?> caller)
throws SQLException {
//...
for(DriverInfo aDriver : registeredDrivers) {
Connection con = aDriver.driver.connect(url, info);
return con;
}
}
我们思考一下,为什么JDBC要这样设计?其实本质就是一种契约式编程的思想,JDK提供了一套标准的数据库操作接口规范,比如DriverManager、Driver等等,然后利用了 桥接模式 完成各个接口组件之间的组装,具体的接口实现留给数据库厂商们去做。而JDBC在加载驱动时,利用了SPI机制,以达到动态灵活的目的。
三、Dubbo SPI
Dubbo 并没有直接使用 JDK SPI 机制,而是借鉴其思想,实现了自身的一套 SPI 机制。在开始介绍 Dubbo SPI 实现之前,我先来统一几个概念。
- 扩展点 :通过 SPI 机制查找并加载实现的接口,也叫“扩展点”,比如前文示例中的
com.tpvlog.log.Log
接口、com.mysql.cj.jdbc.Driver
接口,都是扩展点; - 扩展点实现 :实现了扩展点的实现类,比如前文示例中的
com.tpvlog.log.impl.Log4j
; - 扩展点Wrapper类 :一种特殊的扩展点实现,有且只有一个构造函数,构造函数的参数类型扩展点,比如
Log4j (Log log)
; - 自适应扩展点实现类 :一种特殊的扩展点实现,用@Adaptive注解标注,最多只能有一个这样的实现类,作为默认的扩展点实现;
- 扩展点对象: 扩展点实现类的对象;
- 扩展点名称 :相当于扩展点实现类的一个唯一标识。
Dubbo SPI的核心框架实现,都位于dubbo-common
模块中:
3.1 特点
我们来看看Dubbo SPI具有哪些特点。
-
Dubbo SPI 可以解决类加载过程中的资源浪费问题:
JDK SPI 在查找扩展点实现类的过程中,需要遍历 SPI 配置文件中定义的所有实现类,该过程中会将这些实现类全部实例化或加载到ClassLoader中。如果 SPI 配置文件中定义了多个实现类,而我们只需要使用其中一个实现类时,就会生成不必要的对象。
例如,org.apache.dubbo.rpc.Protocol 接口有 InjvmProtocol、DubboProtocol、RmiProtocol、HttpProtocol、HessianProtocol、ThriftProtocol 等多个实现,如果使用 JDK SPI,就会加载全部实现类,导致资源的浪费。 -
Dubbo SPI 对 SPI 配置文件进行了扩展和修改,按照 SPI 配置文件的用途,将其分成了三类目录:
META-INF/dubbo/internal/
目录:该目录用于存放 Dubbo 内部使用的 SPI 配置文件;META-INF/dubbo/
目录:该目录用于存放用户自定义 SPI 配置文件;META-INF/services/
目录:该目录下的 SPI 配置文件用来兼容 JDK SPI ;
-
Dubbo 将 SPI 配置文件改成了 KV 格式,例如:
dubbo = org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
其中 key 被称为扩展点名称(ExtensionName),当我们在为一个接口查找具体实现时,可以指定扩展点名称来选择相应的扩展实现。例如,这里指定扩展点名称为 dubbo,Dubbo SPI 就知道我们要使用:`org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol` 这个扩展点实现类,只实例化这一个对象即可,无须实例化 SPI 配置文件中的其他扩展点对象。
使用 KV 格式的另一个好处是:更容易定位问题。假设我们使用的一个扩展点实现类所在的 jar 包没有引入到项目中,那么 Dubbo SPI 在抛出异常的时候,会携带该扩展点名称信息,而不是简单地提示扩展点实现类无法加载,提高了排查问题的效率。
- Dubbo SPI增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接 setter 注入其它扩展点。
3.2 LoadingStrategy
前面说过,Dubbo支持三种目录的SPI文件的加载,这个过程是通过JDK SPI 方式实现的。ExtensionLoader中有一个静态字段strategies
,是 LoadingStrategy[]类型 , 代表加载策略。
LoadingStrategy 接口有三个实现类,它们是通过JDK SPI加载的,优先级是DubboInternalLoadingStrategy > DubboLoadingStrategy > ServicesLoadingStrateg
:
// ExtensionLoader.java
public class ExtensionLoader<T> {
private static volatile LoadingStrategy[] strategies = loadLoadingStrategies();
private static LoadingStrategy[] loadLoadingStrategies() {
return stream(load(LoadingStrategy.class).spliterator(), false)
.sorted()
.toArray(LoadingStrategy[]::new);
}
}
来看load方法,其实就是调用了ServiceLoader.load()
,所以,本质是通过JDK SPI完成LoadingStrategy扩展实现类的加载的:
// ServiceLoader.java
public static <S> ServiceLoader<S> load(Class<S> service) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
return ServiceLoader.load(service, cl);
}
3.3 @SPI注解
在 Dubbo 中,当某个接口被 @SPI
注解修饰时,就表示该接口是 扩展点 ,@SPI
注解的 value 值指定了默认的扩展点名称:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface SPI {
/**
* 扩展点名称
*/
String value() default "";
}
比如org.apache.dubbo.rpc.Protocol
就是一个扩展点:
package org.apache.dubbo.rpc;
@SPI("dubbo")
public interface Protocol {
//...
}
通过 Dubbo SPI 加载 org.apache.dubbo.rpc.Protocol
接口的实现类时,默认会加载META-INF/dubbo/internal/
目录下文件org.apache.dubbo.rpc.Protocol
中的Key为dubbo
的类:
3.4 ExtensionLoader
被@SPI
注解的扩展点,对应的实现类是如何加载进来的呢?我们知道 JDK SPI 通过 java.util.ServiceLoader 完成的,那么Dubbo呢?
事实上,Dubbo也有一个类似的类—— ExtensionLoader ,ExtensionLoader类位于dubbo-common
模块中,负责加载@SPI
注解的扩展点实现。Dubbo SPI 的核心逻辑几乎都封装在 ExtensionLoader 之中。
比如,我们可以像下面这样使用ExtensionLoader来加载名为“dubbo”的Protocol扩展点实现:
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");
ExtensionLoader加载的扩展点必须有@SPI注解,并且一个扩展点只会加载一次,通过ExtensionLoader.getExtensionLoader(Class<T> type)
可以获取扩展点对应的ExtensionLoader对象, 一个扩展点对应一个ExtensionLoader对象 。
核心字段
我们先来看ExtensionLoader的核心字段:
// ExtensionLoader.java
public class ExtensionLoader<T> {
// 扩展点名称分隔符——逗号
private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");
// 缓存了全部 ExtensionLoader 实例,Key为扩展点,Value为加载该扩展点实现的ExtensionLoader实例
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>(64);
// 缓存了全部扩展点实现类与扩展对象的映射关系,即Key为扩展点实现类,Value为扩展对象
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>(64);
// 当前ExtensionLoader实例负责加载的扩展点
private final Class<?> type;
// 当前ExtensionLoader实例负责加载的扩展点的对应扩展名
private String cachedDefaultName;
// 扩展点工厂
private final ExtensionFactory objectFactory;
// 缓存了该ExtensionLoader加载的扩展点实现与扩展名之间的映射关系
private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<>();
// 缓存了该ExtensionLoader加载的扩展名与扩展点实现的映射关系
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();
// 缓存了该ExtensionLoader加载的扩展名与扩展对象之间的映射关系
private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
private final Map<String, Object> cachedActivates = new ConcurrentHashMap<>();
// 扩展点的自适应扩展对象,即有@Adaptive注解的实现类的对象
private final Holder<Object> cachedAdaptiveInstance = new Holder<>();
// 扩展点的自适应扩展实现类,即有@Adaptive注解的实现类
private volatile Class<?> cachedAdaptiveClass = null;
private volatile Throwable createAdaptiveInstanceError;
// 扩展点的Wapper实现类集合
private Set<Class<?>> cachedWrapperClasses;
private Map<String, IllegalStateException> exceptions = new ConcurrentHashMap<>();
// SPI加载策略,默认三种,分别从internal/dubbo、dubbo、service目录加载SPI配置文件
private static volatile LoadingStrategy[] strategies = loadLoadingStrategies();
}
创建ExtensionLoader对象
ExtensionLoader没有提供public
的构造方法,但是提供了一个getExtensionLoader
静态方法,该方法会根据传入的扩展点创建一个对应的ExtensionLoader
实例:
// ExtensionLoader.java
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null) {
throw new IllegalArgumentException("Extension type == null");
}
// 扩展点必须是接口
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
}
// 扩展点必须有@SPI注解
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type (" + type +
") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
}
// 先根据扩展点从缓存查找对应ExtensionLoader实例
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
// 缓存中不存在则新建ExtensionLoader实例
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
// 缓存起来
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
在构造ExtensionLoader对象时,有一个判断:
- 如果扩展点是
ExtensionFactory
类型,则objectFactory字段赋null; - 如果扩展点不是
ExtensionFactory
类型,则创建ExtensionFactory扩展点的自适应实现对象,并赋给objectFactory。
// ExtensionLoader.java
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
ExtensionFactory是什么?我会在后面小节详细讲解,这里,你可以把它理解成一个创建扩展对象的工厂,ExtensionLoader在创建扩展对象时,本质都是通过内部的ExtensionFactory完成的。
获取扩展点对象
ExtensionLoader提供了一组方法用于获取扩展点对象:
- getDefaultExtension():获取默认的扩展点对象,也就是根据@SPI的value值确认;
- getExtension(String name):根据扩展名,获取扩展点对象;
- getActivateExtension(URL url, String[] values, String group):根据URL参数和扩展点名称,获取扩展对象列表;
- getAdaptiveExtension():获取当前扩展点的自适应实现对象,也就是获取@Adaptive注解的实现类的对象;
- getSupportedExtensionInstances:获取当前扩展点的所有扩展对象集合。
getExtension
getExtension方法可以根据传入的扩展名,查找扩展对象:
// ExtensionLoader.java
private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
// 扩展名为“true”,则获取默认的扩展点对象
if ("true".equals(name)) {
return getDefaultExtension();
}
// 优先从缓存中查找
final Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
// 不存在则创建扩展对象并缓存
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
private Holder<Object> getOrCreateHolder(String name) {
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<>());
holder = cachedInstances.get(name);
}
return holder;
}
我们重点来看当缓存中不存在时,是如何创建扩展对象的,即createExtension
方法:
- 首先,根据扩展名从缓存中获取扩展实现类。如果缓存未初始化,则扫描三个 SPI 目录查找对应的扩展实现类并加载,将扩展名和扩展实现类的映射关系记录到
cachedClasses
缓存中; - 接着,根据扩展实现类从
EXTENSION_INSTANCES
缓存中查找扩展对象,如果缓存中不存在,则通过反射创建扩展对象; - 然后,调用injectExtension方法对扩展对象进行自动装配,即属性的自动注入,它的内部使用了
ExtensionFactory
去创建扩展对象; - 接着,对扩展对象进行包装;
- 最后,如果扩展实现类实现了 Lifecycle 接口,则在 initExtension() 方法中对扩展对象进行初始化。
// ExtensionLoader.java
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>(64);
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();
private T createExtension(String name) {
// 1.加载扩展点实现类,并根据扩展名获取对应的实现类
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
// 2.获取扩展对象,不存在则创建实例对象并缓存
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 通过反射创建扩展对象
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 3.扩展对象的自动装配(set方法设置属性)
injectExtension(instance);
// 4.遍历扩展点的Wapper实现类,注入扩展对象
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
// 注入扩展对象
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
// 5.初始化扩展对象
initExtension(instance);
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}
我们一步步来看上述的步骤,首先是 加载该扩展点的所有扩展实现类 :
private Map<String, Class<?>> getExtensionClasses() {
// 优先从缓存查找扩展点实现类,classes的Key是扩展名,Value是扩展实现类
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 缓存中不存在,则加载扩展实现类
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
上述代码内部调用了loadExtensionClasses方法,就是从SPI文件加载所有扩展实现类,当loadExtensionClasses方法执行完成后,以下几个字段就会被附上值:
cachedAdaptiveClass
: 当前扩展点的自适应扩展实现类,即@Adaptive注解的那个扩展实现类;cachedWrapperClasses
: 当前扩展点的所有Wrapper扩展实现类的集合;cachedActivates
: 当前扩展点的自动激活扩展实现类的集合;cachedNames
: 扩展点的默认扩展名,即@SPI的value值。
private Map<String, Class<?>> loadExtensionClasses() {
// 1.缓存@SPI注解上的默认扩展名
cacheDefaultExtensionName();
Map<String, Class<?>> extensionClasses = new HashMap<>();
// 2.从配置文件中加载扩展实现类,默认就是三个目录:services、dubbo、dubbo/internal
for (LoadingStrategy strategy : strategies) {
loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
}
return extensionClasses;
}
private void cacheDefaultExtensionName() {
// 获取SPI注解
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation == null) {
return;
}
// 默认扩展名
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) {
throw new IllegalStateException("More than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if (names.length == 1) {
// 缓存默认扩展名
cachedDefaultName = names[0];
}
}
}
重点看loadDirectory
是如何从META-INF目录中加载扩展实现类的,本质就是遍历指定目录下的所有SPI文件,然后挨个加载:
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type,
boolean extensionLoaderClassLoaderFirst, boolean overridden, String... excludedPackages) {
// 配置文件名称
String fileName = dir + type;
try {
// 获取该路径下所有文件
Enumeration<java.net.URL> urls = null;
ClassLoader classLoader = findClassLoader();
if (extensionLoaderClassLoaderFirst) {
ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader();
if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) {
urls = extensionLoaderClassLoader.getResources(fileName);
}
}
if (urls == null || !urls.hasMoreElements()) {
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
}
// 遍历这些文件并进行处理
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL resourceURL = urls.nextElement();
// 读取文件内容
loadResource(extensionClasses, classLoader, resourceURL, overridden, excludedPackages);
}
}
} catch (Throwable t) {
logger.error("Exception occurred when loading extension class (interface: " +
type + ", description file: " + fileName + ").", t);
}
}
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader,
java.net.URL resourceURL, boolean overridden, String... excludedPackages) {
try {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
final int ci = line.indexOf('#');
if (ci >= 0) {
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim(); // 扩展名称
line = line.substring(i + 1).trim();// 扩展实现类
}
if (line.length() > 0 && !isExcluded(line, excludedPackages)) {
// 加载扩展实现类
loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name, overridden);
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
}
}
} catch (Throwable t) {
logger.error("Exception occurred when loading extension class (interface: " +
type + ", class file: " + resourceURL + ") in " + resourceURL, t);
}
}
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name, boolean overridden) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error occurred when loading extension class (interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + " is not subtype of interface.");
}
// 扩展实现类有@Adaptive注解
if (clazz.isAnnotationPresent(Adaptive.class)) {
cacheAdaptiveClass(clazz, overridden);
}
// 扩展实现类是一个Wapper类
else if (isWrapperClass(clazz)) {
// 缓存到集合中
cacheWrapperClass(clazz);
} else {
clazz.getConstructor();
if (StringUtils.isEmpty(name)) {
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}
String[] names = NAME_SEPARATOR.split(name);
if (ArrayUtils.isNotEmpty(names)) {
// 对于具有@Activate注解的扩展实现类,进行缓存,key为扩展名,value为该扩展实现类
cacheActivateClass(clazz, names[0]);
for (String n : names) {
cacheName(clazz, n);
saveInExtensionClass(extensionClasses, clazz, n, overridden);
}
}
}
}
注意,上述的isWrapperClass
方法,用来判断扩展实现类是否是一个Wrapper类。所谓Wrapper类,就是构造函数只有一个参数且为扩展点类型,比如Logback (Log log)
。Wrapper 类会记录到 cachedWrapperClasses
(Set >类型)这个字段中进行缓存。
getDefaultExtension
getDefaultExtension方法,用于获取默认扩展名对应的扩展对象:
// ExtensionLoader.java
public T getDefaultExtension() {
// 加载该扩展点的所有扩展实现类
getExtensionClasses();
// 默认扩展名校验
if (StringUtils.isBlank(cachedDefaultName) || "true".equals(cachedDefaultName)) {
return null;
}
// 根据扩展名,查找扩展对象并返回
return getExtension(cachedDefaultName);
}
getActivateExtension
getActivateExtension方法,用于获取当前扩展点的所有可自动激活的扩展对象:
// ExtensionLoader.java
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> activateExtensions = new ArrayList<>();
// values为扩展名列表
List<String> names = values == null ? new ArrayList<>(0) : asList(values);
// 如果不含"-default",则加载所有values指定的Activates扩展实现
if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {
getExtensionClasses();
for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Object activate = entry.getValue();
String[] activateGroup, activateValue;
if (activate instanceof Activate) {
activateGroup = ((Activate) activate).group();
activateValue = ((Activate) activate).value();
} else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
} else {
continue;
}
// 排除指定的扩展
if (isMatchGroup(group, activateGroup) // 判断group是否匹配
&& !names.contains(name) // 判断名称是否匹配
&& !names.contains(REMOVE_VALUE_PREFIX + name) // 判断名称
&& isActive(activateValue, url)) {
activateExtensions.add(getExtension(name));
}
}
// 排序
activateExtensions.sort(ActivateComparator.COMPARATOR);
}
// 对names指定的扩展进行专门的处理
List<T> loadedExtensions = new ArrayList<>();
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
if (!name.startsWith(REMOVE_VALUE_PREFIX)
&& !names.contains(REMOVE_VALUE_PREFIX + name)) {
if (DEFAULT_KEY.equals(name)) {
if (!loadedExtensions.isEmpty()) {
activateExtensions.addAll(0, loadedExtensions);
loadedExtensions.clear();
}
} else {
loadedExtensions.add(getExtension(name));
}
}
}
if (!loadedExtensions.isEmpty()) {
activateExtensions.addAll(loadedExtensions);
}
return activateExtensions;
}
getAdaptiveExtension
getAdaptiveExtension方法,用于获取当前扩展点的自适应扩展对象:
// ExtensionLoader.java
public T getAdaptiveExtension() {
// 先从缓存查询
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
// 缓存中不存在
if (instance == null) {
try {
// 创建自适应扩展对象
instance = createAdaptiveExtension();
// 进行缓存
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}
return (T) instance;
}
这里关键看下createAdaptiveExtension方法,即自适应扩展对象是如何创建的:
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
private Class<?> getAdaptiveExtensionClass() {
// 加载扩展点的所有实现类
getExtensionClasses();
// 如果扩展点具有@Adaptive注解的实现类,那么直接返回,因为上一步加载时,已经给cachedAdaptiveClass字段赋值了
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
// 如果扩展点没有@Adaptive注解的实现类,则创建一个
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
getExtensionClasses方法执行后,如果扩展点具有@Adaptive注解的实现类,那么cachedAdaptiveClass
字段会赋值自适应扩展类,为空则说明当前扩展点压根没有自适应扩展实现,这个时候就需要通过createAdaptiveExtensionClass方法动态生成一个自适应实现类:
private Class<?> createAdaptiveExtensionClass() {
// 动态生成一段代码
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
// 获取ClassLoader
ClassLoader classLoader = findClassLoader();
// 通过SPI机制,获取Compiler的自适应扩展对象
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
// 编译出动态类
return compiler.compile(code, classLoader);
}
动态编译出的自适应实现类长什么样呢?以Protocol这个扩展点为例,会生成一个名为Protocol$Adpative
的类:
public class Protocol$Adpative implements Protocol {
public int getDefaultPort() {
throw new UnsupportedOperationException(
"method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (invoker == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (invoker.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = invoker.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url("
+ url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader
.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(invoker);
}
public <T> Invoker<T> refer(Class<T> type, URL url1) throws RpcException {
if (url1 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = url1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url("
+ url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader
.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(type, url1);
}
public void destroy() {
throw new UnsupportedOperationException(
"method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
}
上述Protocol$Adpative
类,本质是一个代理类,它的export方法内部通过解析URL来选择到底采用哪一种Protocol,如果没有参数值,则默认采用"dubbo"扩展名的实现。
getSupportedExtensionInstances
getSupportedExtensionInstances方法,用于获取当前扩展点的所有扩展对象:
public Set<T> getSupportedExtensionInstances() {
List<T> instances = new LinkedList<>();
// 当前扩展点的所有扩展实现类
Set<String> supportedExtensions = getSupportedExtensions();
if (CollectionUtils.isNotEmpty(supportedExtensions)) {
for (String name : supportedExtensions) {
// 获取对应的扩展对象
instances.add(getExtension(name));
}
}
// 按优先级排序
sort(instances, Prioritized.COMPARATOR);
return new LinkedHashSet<>(instances);
}
public Set<String> getSupportedExtensions() {
// 加载所有扩展实现类
Map<String, Class<?>> clazzes = getExtensionClasses();
return Collections.unmodifiableSet(new TreeSet<>(clazzes.keySet()));
}
四、自适应扩展
什么是自适应扩展? 在实际应用场景中,一个扩展点往往会有多种实现类,因为Dubbo是基于URL驱动,所以在运行时,通过传入URL中的某些参数来动态控制具体实现,这便是Dubbo的自适应扩展特性。
那么对应的,就有一个自适应扩展对象。你可以把它理解成扩展点的默认实现对象,通过ExtensionLoader.getAdaptiveExtension()
方法获取的就是扩展点的默认实现。要实现自适应扩展,有两种方式:
- 在扩展实现类上注解
@Adaptive
:在整个Dubbo框架中,只有少数几个地方使用在类级别上,如AdaptiveExtensionFactory和AdaptiveCompiler,其余都标注在方法上; - 在扩展实现类上的方法上注解
@Adaptive
:在Dubbo初始化扩展点时会自动生成和编译一个动态的Adaptive类。
需要注意的是, 每个扩展点最多只能有一个自适应扩展实现类 。
4.1 @Adaptive注解
我们来看下@Adaptive注解,从声明可以看出,它既可以注解在类上,也可以注解在方法上:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
/**
* URL中的参数Key
*/
String[] value() default {};
}
如果一个扩展点没有自适应的扩展实现类,就会动态生成一个,具体是通过ExtensionLoader.getAdaptiveExtension
方法,上一节已经分析过,我就不赘述了。这里我们主要看下动态生成的自适应扩展实现类的规则:
package <扩展点所在包>;
public class <扩展点>$Adpative implements <扩展点> {
public <有@Adaptive注解的接口方法>(<方法参数>) {
if(是否有URL类型方法参数?) 使用该URL参数
else if(是否有方法类型上有URL属性) 使用该URL属性
# <else 在加载扩展点生成自适应扩展点类时抛异常,即加载扩展点失败!>
if(获取的URL == null) {
throw new IllegalArgumentException("url == null");
}
根据@Adaptive注解上声明的Key的顺序,从URL获致Value,作为实际扩展点名称。
如URL没有Value,则使用缺省扩展点实现。如没有扩展点, throw new IllegalStateException("Fail to get extension");
在扩展点实现调用该方法,并返回结果。
}
public <没有@Adaptive注解的接口方法>(<方法参数>) {
throw new UnsupportedOperationException("is not adaptive method!");
}
}
再举两个例子来看下,首先是@Adpative注解在方法上:
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
默认会生成这样的一个自适应扩展实现类:
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension =
(org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension =
(org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
接着是@Adpative注解在类上,以AdaptiveCompiler为例,它作为Compiler扩展点的实现类,被@Adaptive在类级别修饰:
@Adaptive
public class AdaptiveCompiler implements Compiler {
private static volatile String DEFAULT_COMPILER;
public static void setDefaultCompiler(String compiler) {
DEFAULT_COMPILER = compiler;
}
@Override
public Class<?> compile(String code, ClassLoader classLoader) {
Compiler compiler;
ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class);
String name = DEFAULT_COMPILER; // copy reference
if (name != null && name.length() > 0) {
compiler = loader.getExtension(name);
} else {
compiler = loader.getDefaultExtension();
}
return compiler.compile(code, classLoader);
}
}
4.2 ExtensionFactory
ExtensionFactory是一个扩展点工厂类,用于创建扩展点对象:
@SPI
public interface ExtensionFactory {
/**
* 获取扩展点对象.
*
* @param type 扩展点
* @param name 扩展点名称
* @return object 扩展点对象
*/
<T> T getExtension(Class<T> type, String name);
}
可以看到,ExtensionFactory本身就有@SPI
注解,也就是说它自己就是一个扩展点,Dubbo启动后会根据 JDK SPI 机制去目录META-INF/dubbo/internal
加载对应的扩展点实现:
ExtensionFactory接口有三个实现类:
SpiExtensionFactory
SpiExtensionFactory,用于从Dubbo容器中获取指定 SPI 扩展点的默认实现类对象:
// SpiExtensionFactory.java
public class SpiExtensionFactory implements ExtensionFactory {
@Override
public <T> T getExtension(Class<T> type, String name) {
// 要求扩展点type必须是接口,并且有SPI注解
if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
// 获取一个ExtensionLoader实例
ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
// loader.getSupportedExtensions()返回扩展点名称的集合
if (!loader.getSupportedExtensions().isEmpty()) {
// 获取自适应扩展点对象
return loader.getAdaptiveExtension();
}
}
return null;
}
}
从 SpiExtensionFactory 的实现可以看出,扩展点名称参数name
在整个过程没有被使用,返回的结果是该扩展点的 自适应对象 。
SpringExtensionFactory
SpringExtensionFactory,用于从Spring容器中获取指定 SPI 扩展点对象,也就是说,这个类其实是一个和Spring框架做黏合的类:
// SpringExtensionFactory.java
public class SpringExtensionFactory implements ExtensionFactory {
// Spring上下文对象集合
private static final Set<ApplicationContext> CONTEXTS = new ConcurrentHashSet<ApplicationContext>();
//...
@Override
public <T> T getExtension(Class<T> type, String name) {
// 要求扩展点type必须是接口,并且有SPI注解
if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
return null;
}
// 遍历Spring上下文对象,找到第一个扩展点实例
for (ApplicationContext context : CONTEXTS) {
T bean = BeanFactoryUtils.getOptionalBean(context, name, type);
if (bean != null) {
return bean;
}
}
return null;
}
}
从SpringExtensionFactory的实现可以看出,扩展点对象由Spring容器管理,获取扩展点对象时就遍历Spring上下文,根据扩展点类型type
和扩展点名称name
查找,找到第一个就返回。
AdaptiveExtensionFactory
AdaptiveExtensionFactory 不实现任何具体的功能,是用来适配 SpiExtensionFactory 和 SpringExtensionFactory 的。AdaptiveExtensionFactory 会根据运行时的一些状态来选择使用 ExtensionFactory 的哪个实现:
@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {
// 缓存SpiExtensionFactory、SpringExtensionFactory实例
private final List<ExtensionFactory> factories;
public AdaptiveExtensionFactory() {
// ExtensionFactory本身有@SPI注解,是一个扩展点,拿到它对应的ExtensionLoader
ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
// 遍历ExtensionFactory的所有扩展点实现
// loader.getSupportedExtensions()会进行排序,默认第一个是SpiExtensionFactory
for (String name : loader.getSupportedExtensions()) {
// 将扩展点实现对象添加到缓存
list.add(loader.getExtension(name));
}
factories = Collections.unmodifiableList(list);
}
@Override
public <T> T getExtension(Class<T> type, String name) {
// 遍历ExtensionFactory实例,默认第一个是SpiExtensionFactory
for (ExtensionFactory factory : factories) {
// 获取扩展点对象,这样优先从Dubbo容器中获取扩展点对象,获取不到再从Spring容器获取
T extension = factory.getExtension(type, name);
if (extension != null) {
return extension;
}
}
return null;
}
}
从AdaptiveExtensionFactory的实现可以看出,它在创建时,会首先把当前应用支持的所有ExtensionFactory的实例缓存起来(默认第一个是SpiExtensionFactory),后续通过该工厂获取扩展点对象时,就从这些缓存的工厂中去查找。
另外,AdaptiveExtensionFactory拥有一个@Adaptive
注解,说明它就是ExtensionFactory扩展点对应的自适应扩展实现(每个扩展点最多只能有一个自适应实现,如果所有实现都没有被@Adaptive注解,那么Dubbo会动态生成一个自适应实现类),所有对ExtensionFactory调用的地方,实际上调用的都是AdpativeExtensionFactory。
五、自动激活扩展
在Dubbo中,还有一个和 SPI 机制息息相关的注解——@Activate
。@Activate称为 自动激活扩展点 注解,主要使用在具有多个扩展实现、需要同时根据不同条件被激活的场景中。
以 Dubbo 中的 Filter 为例,org.apache.dubbo.rpc.Filter
接口有非常多的扩展实现类,在一个场景中可能需要某几个 Filter 扩展实现类协同工作,而另一个场景中可能需要另外几个实现类一起工作。这样,就需要一套配置来指定当前场景中哪些 Filter 实现是可用的,这就是 @Activate 注解要做的事情。
5.1 @Activate注解
@Activate注解既可以作用于类上,也可以作用于方法上:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
/**
* 修饰的扩展实现类只有在URL中的分组匹配时才激活,比如是在Provider端被激活还是在Consumer端被激活
*/
String[] group() default {};
/**
* 修饰的扩展实现类只在URL参数中出现指定的key时才会被激活
*/
String[] value() default {};
/**
* 用来确定扩展实现类的排序
*/
int order() default 0;
}
5.2 激活流程
我们来看@Activate注解的扩展实现类是如何被加载的,ExtensionLoader获取扩展对象时,会从SPI配置目录下加载所有扩展实现类,并将具有@Activate注解的扩展实现类缓存到cachedActivates
字段,key为扩展名称,value为该扩展实现类。
当调用ExtensionLoader.getActivateExtension
方法时,会完成扩展实现类的自动激活流程,getActivateExtension() 方法的参数如下:
- url:URL配置信息;
- values:配置中指定的扩展名称;
- group:Provider 或 Consumer。
getActivateExtension的核心逻辑如下:
-
首先,获取默认激活的扩展集合。默认激活的扩展实现类有几个条件:
- 在 cachedActivates 集合中存在;
- @Activate 注解指定的 group 属性与当前 group 匹配;
- 扩展名没有出现在 values 中(即未在配置中明确指定,也未在配置中明确指定删除);
- URL 中出现了 @Activate 注解中指定的 Key。
-
然后,按照 @Activate 注解中的 order 属性对默认激活的扩展集合进行排序;
-
最后,按序添加自定义扩展实现类的对象。
// ExtensionLoader.java
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> activateExtensions = new ArrayList<>();
// values配置就是扩展名
List<String> names = values == null ? new ArrayList<>(0) : asList(values);
if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {
// 加载扩展实现类
getExtensionClasses();
for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
String name = entry.getKey(); // 扩展名
Object activate = entry.getValue(); // @Activate注解
String[] activateGroup, activateValue;
if (activate instanceof Activate) {
// @Activate注解中的配置
activateGroup = ((Activate) activate).group();
activateValue = ((Activate) activate).value();
} else {
continue;
}
if (isMatchGroup(group, activateGroup) // 匹配group
&& !names.contains(name) // 没有出现在values配置中的,即为默认激活的扩展实现
&& !names.contains(REMOVE_VALUE_PREFIX + name) // 通过"-"明确指定不激活该扩展实现
&& isActive(activateValue, url)) { // 检测URL中是否出现了指定的Key
activateExtensions.add(getExtension(name)); // 加载扩展实现的实例对象,这些都是激活的
}
}
// 排序
activateExtensions.sort(ActivateComparator.COMPARATOR);
}
List<T> loadedExtensions = new ArrayList<>();
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
// 通过"-"开头的配置明确指定不激活的扩展实现,直接就忽略了
if (!name.startsWith(REMOVE_VALUE_PREFIX)
&& !names.contains(REMOVE_VALUE_PREFIX + name)) {
if (DEFAULT_KEY.equals(name)) {
if (!loadedExtensions.isEmpty()) {
// 按照顺序,将自定义的扩展添加到默认扩展集合前面
activateExtensions.addAll(0, loadedExtensions);
loadedExtensions.clear();
}
} else {
loadedExtensions.add(getExtension(name));
}
}
}
// 按照顺序,将自定义的扩展添加到默认扩展集合后面
if (!loadedExtensions.isEmpty()) {
activateExtensions.addAll(loadedExtensions);
}
return activateExtensions;
}
最后,举个例子来说明上述处激活流程,便于大家理解。假设 cachedActivates
集合缓存的扩展实现如下表所示:
扩展名 | @Activate中的group | @Activate中的order |
---|---|---|
demoFilter1 | Provider | 6 |
demoFilter2 | Provider | 5 |
demoFilter3 | Provider | 4 |
demoFilter4 | Provider | 3 |
demoFilter5 | Consumer | 2 |
demoFilter6 | Provider | 1 |
如果在 Provider 端调用 ExtensionLoader.getActivateExtension()
方法时,传入的 values 配置为 "demoFilter3、-demoFilter2、default、demoFilter1",那么根据getActivateExtension的逻辑:
- 得到默认激活的扩展实现集合中有 [ demoFilter4, demoFilter6 ];
- 排序后为 [ demoFilter6, demoFilter4 ];
- 按序添加自定义扩展实例之后得到 [ demoFilter3, demoFilter6, demoFilter4, demoFilter1 ]。
六、总结
本章,我对 Dubbo SPI 机制进行了深入讲解,并类比 JDK SPI 进行了分析。Dubbo SPI 机制相对于原生的 JDK SPI 更易于使用,性能和资源消耗上也更优秀。Dubbo SPI 依赖三个核心注解:@SPI、@Adaptive、@Activate,它们是 Dubbo 框架实现扩展机制的核心,理解SPI机制是我们后续阅读Dubbo源码的基础。
最后,我对本章的核心知识点进行一个总结:
- 每一个扩展点对应一个ExtensionLoader对象;
- 一个扩展实现类,最多只有一个实例对象;
- 一个扩展点实现有多个扩展名称;
- @Adaptive注解要么作用在扩展实现类上,要么作用在扩展实现类的方法上,如果@Adaptive作为在方法上,那么Dubbo会生成一个代理类,该类的非@Adaptive方法,默认抛出异常;
- 每个扩展点最多只能有一个@Adaptive注解的扩展实现类;
- 每个扩展点可以有多个可自动激活的扩展实现类(使用@Activate注解),@Activate就是用来在运行时确认要加载哪些扩展实现类。