从本章开始,我将对 Dubbo 架构中的 Cluster 层进行分析。
在生产环境中,为了保证服务的高可用、高性能以及容错能力,我们通常会在多个服务器上运行相同的服务端程序,然后以 集群 的形式对外提供服务。根据各项性能指标的要求不同,各个服务端集群中服务实例的个数也不尽相同,从几个实例到几百个实例不等。
针对这种服务集群的情况,客户端程序需要解决一些问题,比如:
- 客户端程序是否要感知每个服务端地址?
- 客户端程序的一次请求,到底调用哪个服务端程序呢?
- 请求失败之后是重试,还是抛出异常?如果是重试,是再次请求该服务实例,还是尝试请求其它服务实例?
- 服务端集群如何做到负载均衡,负载均衡的标准是什么呢?
为了解决上述问题, Dubbo 独立出了一个实现集群功能的模块—— dubbo-cluster :
Dubbo Cluster 层隶属于 RPC 层,它在整个Dubbo 架构图中的位置如下:
一、Cluster 架构
dubbo-cluster
模块的主要功能是将多个 Provider 聚合成一个 Provider 供 Consumer 调用,其中涉及集群的容错处理、路由规则的处理以及负载均衡。下图展示了 dubbo-cluster 模块中的核心组件:
从上图可以看出,dubbo-cluster
主要包括以下四个核心接口:
- Cluster 接口 :集群容错接口,主要是在某些 Provider 节点发生故障时,让 Consumer 的调用请求能够发送到正常的 Provider 节点,从而保证服务集群的高可用;
- Directory 接口 :表示服务目录,是多个 Invoker 的集合,是后续路由规则、负载均衡策略以及集群容错的基础;
- Router 接口 :表示路由器,请求经过 Router 时会按照用户指定的规则匹配出符合条件的 Provider;
- LoadBalance 接口 :负载均衡接口,Consumer 会按照指定的负载均衡策略,从 Provider 集合中选出一个最合适的 Provider 节点来处理请求。
Cluster 层的核心流程是:
- 当调用进入 Cluster 时,Cluster 会创建一个 AbstractClusterInvoker 对象;
- 这个 AbstractClusterInvoker 首先从 Directory 中获取当前 Invoker 集合;
- 然后,按照 Router 集合进行路由,得到符合条件的 Invoker 集合;
- 最后,按照 LoadBalance 指定的负载均衡策略,得到最终要调用的 Invoker 对象。
了解了 dubbo-cluster 模块的核心架构和基础组件之后,我来按照上面架构图的顺序介绍每个接口的定义及相关实现。
二、Directory 接口
Directory 接口表示的是一个集合,该集合由多个 Invoker 构成,后续的路由处理、负载均衡、集群容错等一系列操作都是在 Directory 基础上实现的。
// Directory.java
public interface Directory<T> extends Node {
/**
* 获取服务接口的类型
*/
Class<T> getInterface();
/**
* 根据传入的Invocation请求,过滤自身维护的Invoker集合,返回符合条件的Invoker集合
*/
List<Invoker<T>> list(Invocation invocation) throws RpcException;
/**
* 返回当前Directory对象维护的全部Invoker对象
*/
List<Invoker<T>> getAllInvokers();
/**
* 返回Consumer端的Url
*/
URL getConsumerUrl();
}
2.1 AbstractDirectory
AbstractDirectory 是 Directory 接口的抽象实现,其中除了维护 Consumer 端的 URL 信息,还维护了一个 RouterChain 对象,用于记录当前使用的 Router 对象集合。
AbstractDirectory 对 list() 方法的实现也比较简单,就是直接委托给了 doList() 方法,doList() 是个抽象方法,由 AbstractDirectory 的子类实现。
// AbstractDirectory.java
public abstract class AbstractDirectory<T> implements Directory<T> {
private final URL url;
private volatile boolean destroyed = false;
private volatile URL consumerUrl;
protected RouterChain<T> routerChain;
public AbstractDirectory(URL url, RouterChain<T> routerChain) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
this.url = url.removeParameter(REFER_KEY).removeParameter(MONITOR_KEY);
this.consumerUrl = url.addParameters(StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)))
.removeParameter(MONITOR_KEY);
setRouterChain(routerChain);
}
@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
return doList(invocation);
}
@Override
public URL getUrl() {
return url;
}
protected void addRouters(List<Router> routers) {
routers = routers == null ? Collections.emptyList() : routers;
routerChain.addRouters(routers);
}
protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
}
Directory 接口有 RegistryDirectory 和 StaticDirectory 两个具体实现,如下图所示:
2.2 StaticDirectory
StaticDirectory 实现中维护的 Invoker 集合是静态的,在 StaticDirectory 对象创建完成之后,不会再发生变化。
StaticDirectory 的实现比较简单:
- 在构造方法中,StaticDirectory 会接收一个 Invoker 集合,并赋值到自身的
invokers
字段中,作为底层的 Invoker 集合; - 在 doList() 方法中,StaticDirectory 会使用 RouterChain 中的 Router 从
invokers
集合中过滤出符合路由规则的 Invoker 对象集合。
// StaticDirectory.java
public class StaticDirectory<T> extends AbstractDirectory<T> {
private final List<Invoker<T>> invokers;
public StaticDirectory(URL url, List<Invoker<T>> invokers, RouterChain<T> routerChain) {
super(url == null && CollectionUtils.isNotEmpty(invokers) ? invokers.get(0).getUrl() : url, routerChain);
if (CollectionUtils.isEmpty(invokers)) {
throw new IllegalArgumentException("invokers == null");
}
this.invokers = invokers;
}
@Override
public Class<T> getInterface() {
return invokers.get(0).getInterface();
}
@Override
public List<Invoker<T>> getAllInvokers() {
return invokers;
}
public void buildRouterChain() {
// 根据 URL 构造一个包含内置 Router 的 RouterChain 对象
RouterChain<T> routerChain = RouterChain.buildChain(getUrl());
// 将invokers与RouterChain关联
routerChain.setInvokers(invokers);
// 设置routerChain字段
this.setRouterChain(routerChain);
}
@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
List<Invoker<T>> finalInvokers = invokers;
if (routerChain != null) {
try {
// 通过RouterChain过滤出符合条件的Invoker集合
finalInvokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
return finalInvokers == null ? Collections.emptyList() : finalInvokers;
}
}
2.3 RegistryDirectory
RegistryDirectory 中维护的 Invoker 集合会随着注册中心中维护的注册信息动态发生变化,这就依赖了 ZooKeeper 等注册中心的推送能力。
RegistryDirectory 实现了 NotifyListener
接口,当注册中心中的服务信息发生变化时,RegistryDirectory 会收到变更通知,然后动态增删底层 Invoker 集合。
核心字段
我们先来看一下 RegistryDirectory 中的核心字段,在 RegistryDirectory 的构造方法中,会根据传入的注册中心 URL 初始化核心字段:
// RegistryDirectory.java
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
// 集群策略适配器,通过 Dubbo SPI 方式动态创建
private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
// 路由工厂适配器,通过 Dubbo SPI 方式动态创建
private static final RouterFactory ROUTER_FACTORY = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();
// 服务对应的 ServiceKey,默认由 {interface}:[group]:[version] 三部分构成
private final String serviceKey;
// 服务接口类型,例如org.apache.dubbo.demo.DemoService
private final Class<T> serviceType;
// Consumer URL 中 refer 参数解析后得到的全部 KV
private final Map<String, String> queryMap;
// 只保留 Consumer 属性的 URL,也就是由 queryMap 集合重新生成的 URL
private final URL directoryUrl;
// 是否引用多个服务组
private final boolean multiGroup;
// 使用的 Protocol 实现
private Protocol protocol;
// 使用的注册中心实现
private Registry registry;
private volatile boolean forbidden = false;
private boolean shouldRegister;
private boolean shouldSimplified;
private volatile URL overrideDirectoryUrl;
private volatile URL registeredConsumerUrl;
// 动态更新的配置信息
private volatile List<Configurator> configurators;
// Provider URL 与对应 Invoker 之间的映射,该集合会与 invokers 字段同时动态更新
private volatile Map<String, Invoker<T>> urlInvokerMap;
// 动态更新的 Invoker 集合
private volatile List<Invoker<T>> invokers;
// 当前缓存的所有 Provider 的 URL,该集合会与 invokers 字段同时动态更新
private volatile Set<URL> cachedInvokerUrls;
private static final ConsumerConfigurationListener CONSUMER_CONFIGURATION_LISTENER = new ConsumerConfigurationListener();
private ReferenceConfigurationListener serviceConfigurationListener;
public RegistryDirectory(Class<T> serviceType, URL url) {
// 传入的url参数是注册中心的URL
super(url);
shouldRegister = !ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true);
shouldSimplified = url.getParameter(SIMPLIFIED_KEY, false);
this.serviceType = serviceType;
this.serviceKey = url.getServiceKey();
// 解析refer参数值,得到其中Consumer的属性信息
this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
// 将queryMap中的KV作为参数,重新构造URL,其中的protocol和path部分不变
this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url);
String group = directoryUrl.getParameter(GROUP_KEY, "");
this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));
}
//...
}
subscribe方法
在完成初始化之后,我们来看 subscribe()
方法,该方法会在 Consumer 进行订阅的时候被调用,内部会调用 Registry.subscribe()
完成订阅操作,同时还会将当前 RegistryDirectory 对象作为 NotifyListener 监听器添加到 Registry 中,具体实现如下:
// RegistryDirectory.java
public void subscribe(URL url) {
setConsumerUrl(url);
// 将当前RegistryDirectory对象作为ConfigurationListener记录到CONSUMER_CONFIGURATION_LISTENER中
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
// 完成订阅操作,注册中心的相关操作在前文已经介绍过了,这里不再重复
registry.subscribe(url, this);
}
notify方法
在 RegistryDirectory.notify() 方法中,首先会按照 category 对发生变化的 URL 进行分类,分成 configurators、routers、providers 三类,并分别对不同类型的 URL 进行处理:
- 将 configurators 类型的 URL 转化为 Configurator,保存到 configurators 字段中;
- 将 router 类型的 URL 转化为 Router,并通过 routerChain.addRouters() 方法添加 routerChain 中保存;
- 将 provider 类型的 URL 转化为 Invoker 对象,并记录到 invokers 集合和 urlInvokerMap 集合中。
notify() 方法的具体实现如下:
// RegistryDirectory.java
public synchronized void notify(List<URL> urls) {
// 按照category进行分类,分成configurators、routers、providers三类
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));
// 获取configurators类型的URL,并转换成Configurator对象
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
// 获取routers类型的URL,并转成Router对象,添加到RouterChain中
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// 获取providers类型的URL,调用refreshOverrideAndInvoker()方法进行处理
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
//... 在Dubbo3.0中会触发AddressListener监听器,但是现在AddressListener接口还没有实现,所以省略这段代码
refreshOverrideAndInvoker(providerURLs);
}
doList方法
该方法是 AbstractDirectory 留给其子类实现的一个方法,也是通过 Directory 接口获取 Invoker 集合的核心所在,具体实现如下:
// RegistryDirectory.java
public List<Invoker<T>> doList(Invocation invocation) {
// 检测forbidden字段,当该字段在refreshInvoker()过程中设置为true时,表示无Provider可用,直接抛出异常
if (forbidden) {
throw new RpcException("...");
}
if (multiGroup) {
// multiGroup为true时的特殊处理,在refreshInvoker()方法中针对multiGroup为true的场景,已经使用Router进行了筛选,所以这里直接返回接口
return this.invokers == null ? Collections.emptyList() : this.invokers;
}
List<Invoker<T>> invokers = null;
// 通过RouterChain.route()方法筛选Invoker集合,最终得到符合路由条件的Invoker集合
invokers = routerChain.route(getConsumerUrl(), invocation);
return invokers == null ? Collections.emptyList() : invokers;
}
三、总结
本章,我首先介绍了 dubbo-cluster
模块的整体架构,简单说明了 Cluster、Directory、Router、LoadBalance 四个核心接口的功能。接下来我深入介绍了 Directory 接口的定义以及 StaticDirectory、RegistryDirectory 两个类的核心实现。