上一章,我讲解了dubbo-registry
注册中心模块的核心接口,并对AbstractRegistry的公共方法进行了分析,由于AbstractRegistry实现了注册信息的本地缓存,所以即使Consumer节点与注册中心断开了连接,还是可以通过调用 AbstractRegistry.getCacheUrls()
方法获取本地缓存,从而得到最近注册的 Provider URL。
这其实是 AbstractRegistry 提供的容错机制,保证了服务的可靠性。我们回顾一下Registry的类继承图,可以看到Registry的子类都继承了 FailbackRegistry :
FailbackRegistry 提供了 失败重试 的能力,本质也是一种保证服务可靠性的手段。它覆盖了 AbstractRegistry 中 register()/unregister()
、subscribe()/unsubscribe()
以及 notify()
这五个核心方法,并基于时间轮,实现了失败重试的能力。
本章,我就来对FailbackRegistry的核心功能和源码进行分析。
一、FailbackRegistry
首先,我们来看 FailbackRegistry 的核心字段有哪些?
1.1 核心字段
FailbackRegistry的核心字段如下:
// FailbackRegistry.java
public abstract class FailbackRegistry extends AbstractRegistry {
// 注册失败的 URL 集合,其中 Key 是注册失败的 URL,Value 是对应的重试任务
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
// 取消注册失败的 URL 集合,其中 Key 是取消注册失败的 URL,Value 是对应的重试任务
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
// 订阅失败 URL 集合,其中 Key 是订阅失败的 URL + Listener 集合,Value 是相应的重试任务
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
// 取消订阅失败的 URL 集合,其中 Key 是取消订阅失败的 URL + Listener 集合,Value 是相应的重试任务
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
// 通知失败的 URL 集合,其中 Key 是通知失败的 URL + Listener 集合,Value 是相应的重试任务
private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();
// 重试时间间隔(毫秒)
private final int retryPeriod;
// 时间轮,用于执行失败重试操作
private final HashedWheelTimer retryTimer;
public FailbackRegistry(URL url) {
super(url);
this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);
retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}
//...
}
在 上述 FailbackRegistry 的构造方法中,首先会调用父类 AbstractRegistry 的构造方法完成本地缓存相关的初始化操作,然后从传入的 URL 参数中获取重试时间间隔(retry.period
参数)来初始化 retryPeriod 字段,最后初始化时间轮。
1.2 重试注册
FailbackRegistry 对 register()/unregister()
方法的实现都非常类似,所以这里我只介绍其中 register() 方法的具体实现流程:
-
根据URL中的 accepts 参数指定的匹配模式,决定是否接受注册该URL;
-
调用父类
AbstractRegistry.register()
方法,将 Provider URL 写入父类的集合缓存中; -
将该 Provider URL 从
failedRegistered
集合和failedUnregistered
集合中删除,并停止相关的重试任务; -
调用 doRegister() 方法执行注册,该方法由子类实现;
-
如果 doRegister() 方法抛出异常,则根据 URL 参数及异常类型进行分类处理,满足以下任一条件则直接抛出异常,否则创建重试任务并添加到 failedRegistered 集合中:
- 如果满足 待注册 URL 的 check 参数为 true(默认) 且 待注册 URL 不是 consumer 协议 且 当前节点自身的 URL 的 check 参数也为 true
- 抛出的异常为
SkipFailbackWrapperException
。
// FailbackRegistry.java
public void register(URL url) {
// 1.根据URL中的 accepts 参数指定的匹配模式,决定是否接受注册该URL
if (!acceptable(url)) {
logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
return;
}
// 2.调用父类方法,将 URL写入集合缓存中;
super.register(url);
// 3.从集合中删除,并停止相关的重试任务
removeFailedRegistered(url);
removeFailedUnregistered(url);
// 4.执行注册
try {
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 5.1 检测URL check参数,决定是否直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// 5.2 如果不抛出异常,则创建失败重试任务,并添加到failedRegistered集合中
addFailedRegistered(url);
}
}
可以看到,FailbackRegistry.register()
本质是委托子类完成实际的注册操作,自己则负责根据条件决定是否进行重试。
我们继续看上述代码中的addFailedRegistered
方法,该方会创建失败重试任务:
// FailbackRegistry.java
private void addFailedRegistered(URL url) {
// 已经存在同一个失败重试任务,则无须创建,直接返回
FailedRegisteredTask oldOne = failedRegistered.get(url);
if (oldOne != null) {
return;
}
// 创建失败重试任务
FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
oldOne = failedRegistered.putIfAbsent(url, newTask);
if (oldOne == null) {
// 将任务提交到时间轮,等待retryPeriod毫秒后执行
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
}
}
1.3 重试订阅
FailbackRegistry 对 subscribe()/unSubscribe()
方法的实现都非常类似,所以这里我只介绍其中 subscribe() 方法的具体实现流程:
- 调用父类
AbstractRegistry.subscribe()
方法,将URL 写入父类的集合缓存中; - 将该 URL 从
failedSubscribed
集合、failedUnsubscribed
集合、failedNotified
集合中删除,并停止相关的重试任务; - 调用 doSubscribe() 方法执行订阅,该方法由子类实现;
- 如果 doSubscribe() 方法抛出异常,则优先从父类缓存中获取订阅数据,并调用 notify() 方法。如果没有缓存相应的订阅数据,才会检查 check 参数决定是否继续抛出异常。
// FailbackRegistry.java
@Override
public void subscribe(URL url, NotifyListener listener) {
// 1.调用父类方法,将 URL写入集合缓存中;
super.subscribe(url, listener);
// 2.从集合中删除,并停止相关的重试任务
removeFailedSubscribed(url, listener);
// 3.执行订阅
try {
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
// 4.抛出异常,则优先从父类缓存中获取URL
List<URL> urls = getCacheUrls(url);
// 5.1 缓存不为空
if (CollectionUtils.isNotEmpty(urls)) {
// 调用notify方法
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
}
// 5.2 缓存为空
else {
// 根据 URL 参数及异常类型进行分类处理
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// 添加重试任务
addFailedSubscribed(url, listener);
}
}
1.4 重试通知
上述流程中,当订阅异常并且父类缓存不为空时,会调用notify方法完成回调通知的逻辑,这部分逻辑由父类AbstractRegistry实现:
// FailbackRegistry.java
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
// 回调通知
doNotify(url, listener, urls);
} catch (Exception t) {
// 失败,则创建相应的 FailedNotifiedTask 重试任务
addFailedNotified(url, listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
// 调用父类AbstractRegistry的notify方法
super.notify(url, listener, urls);
}
父类AbstractRegistry的notify方法,我在上一章已经分析过了,其核心逻辑之一就是回调 NotifyListener:
// AbstractRegistry.java
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
//...
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// 回调 NotifyListener
listener.notify(categoryList);
saveProperties(url);
}
}
二、重试任务
FailbackRegistry在添加重试任务时(addFailedRegistered/addFailedSubscribed等方法),实际就是创建了AbstractRetryTask的各个子类对象,然后由时间轮调用执行,如下图所示:
2.1 AbstractRetryTask
AbstractRetryTask 中维护了当前任务关联的 URL、当前重试的次数等信息,其 run() 方法会根据重试 URL 中指定的重试次数(retry.times
参数,默认值为 3)、任务是否被取消、以及时间轮状态,决定此次任务的 doRetry()
方法是否正常执行,doRetry是一个抽象方法,由子类实现:
// AbstractRetryTask.java
public void run(Timeout timeout) throws Exception {
// 检测定时任务状态和时间轮状态
if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) {
return;
}
// 检查重试次数
if (times > retryTimes) {
logger.warn("Final failed to execute task " + taskName + ", url: " + url + ", retry " + retryTimes + " times.");
return;
}
// 执行重试,抽象方法,子类负责实现
try {
doRetry(url, registry, timeout);
} catch (Throwable t) {
logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t);
// 异常则重新添加定时任务,等待重试
reput(timeout, retryPeriod);
}
}
如果子类任务的 doRetry() 方法执行出现异常,AbstractRetryTask 会通过 reput() 方法将当前任务重新放入时间轮中,并递增当前任务的执行次数。
下面我以重试注册的子任务类FailedRegisteredTask为例,讲解处理流程。
2.2 FailedRegisteredTask
我们来看子类FailedRegisteredTask的重试逻辑实现:
// FailedRegisteredTask.java
public final class FailedRegisteredTask extends AbstractRetryTask {
private static final String NAME = "retry register";
public FailedRegisteredTask(URL url, FailbackRegistry registry) {
super(url, registry, NAME);
}
@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
// 重新执行注册
registry.doRegister(url);
// 注册成功则删除重试任务
registry.removeFailedRegisteredTask(url);
}
}
三、总结
本章,我对FailbackRegistry这个具有重试功能的节点进行了深入分析,它主要是在 AbstractRegistry 的基础上,提供了重试机制,底层依赖了时间轮来完成重试任务的调度。FailbackRegistry是所有Registry节点的父类,这样各个节点就可以根据自身需求实现重试功能。