2023-08-16
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/372

上一章,我讲解了dubbo-registry注册中心模块的核心接口,并对AbstractRegistry的公共方法进行了分析,由于AbstractRegistry实现了注册信息的本地缓存,所以即使Consumer节点与注册中心断开了连接,还是可以通过调用 AbstractRegistry.getCacheUrls() 方法获取本地缓存,从而得到最近注册的 Provider URL。

这其实是 AbstractRegistry 提供的容错机制,保证了服务的可靠性。我们回顾一下Registry的类继承图,可以看到Registry的子类都继承了 FailbackRegistry

202308162140412881.png

FailbackRegistry 提供了 失败重试 的能力,本质也是一种保证服务可靠性的手段。它覆盖了 AbstractRegistry 中 register()/unregister()subscribe()/unsubscribe() 以及 notify() 这五个核心方法,并基于时间轮,实现了失败重试的能力。

本章,我就来对FailbackRegistry的核心功能和源码进行分析。

一、FailbackRegistry

首先,我们来看 FailbackRegistry 的核心字段有哪些?

1.1 核心字段

FailbackRegistry的核心字段如下:

    // FailbackRegistry.java
    
    public abstract class FailbackRegistry extends AbstractRegistry {
    
        // 注册失败的 URL 集合,其中 Key 是注册失败的 URL,Value 是对应的重试任务
        private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
    
        // 取消注册失败的 URL 集合,其中 Key 是取消注册失败的 URL,Value 是对应的重试任务
        private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
    
        // 订阅失败 URL 集合,其中 Key 是订阅失败的 URL + Listener 集合,Value 是相应的重试任务
        private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
    
        // 取消订阅失败的 URL 集合,其中 Key 是取消订阅失败的 URL + Listener 集合,Value 是相应的重试任务
        private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
    
        // 通知失败的 URL 集合,其中 Key 是通知失败的 URL + Listener 集合,Value 是相应的重试任务
        private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();
    
        // 重试时间间隔(毫秒)
        private final int retryPeriod;
    
        // 时间轮,用于执行失败重试操作
        private final HashedWheelTimer retryTimer;
    
        public FailbackRegistry(URL url) {
            super(url);
            this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);
            retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
        }
        //...
    }

在 上述 FailbackRegistry 的构造方法中,首先会调用父类 AbstractRegistry 的构造方法完成本地缓存相关的初始化操作,然后从传入的 URL 参数中获取重试时间间隔(retry.period 参数)来初始化 retryPeriod 字段,最后初始化时间轮。

1.2 重试注册

FailbackRegistry 对 register()/unregister() 方法的实现都非常类似,所以这里我只介绍其中 register() 方法的具体实现流程:

  1. 根据URL中的 accepts 参数指定的匹配模式,决定是否接受注册该URL;

  2. 调用父类 AbstractRegistry.register() 方法,将 Provider URL 写入父类的集合缓存中;

  3. 将该 Provider URL 从 failedRegistered 集合和 failedUnregistered 集合中删除,并停止相关的重试任务;

  4. 调用 doRegister() 方法执行注册,该方法由子类实现;

  5. 如果 doRegister() 方法抛出异常,则根据 URL 参数及异常类型进行分类处理,满足以下任一条件则直接抛出异常,否则创建重试任务并添加到 failedRegistered 集合中:

    • 如果满足 待注册 URL 的 check 参数为 true(默认)待注册 URL 不是 consumer 协议当前节点自身的 URL 的 check 参数也为 true
    • 抛出的异常为 SkipFailbackWrapperException
    // FailbackRegistry.java
    
    public void register(URL url) {
        // 1.根据URL中的 accepts 参数指定的匹配模式,决定是否接受注册该URL
        if (!acceptable(url)) {
            logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
            return;
        }
        // 2.调用父类方法,将 URL写入集合缓存中;
        super.register(url);
        // 3.从集合中删除,并停止相关的重试任务
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
    
        // 4.执行注册
        try {
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;
            // 5.1 检测URL check参数,决定是否直接抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
            // 5.2 如果不抛出异常,则创建失败重试任务,并添加到failedRegistered集合中
            addFailedRegistered(url);
        }
    }

可以看到,FailbackRegistry.register()本质是委托子类完成实际的注册操作,自己则负责根据条件决定是否进行重试。

我们继续看上述代码中的addFailedRegistered方法,该方会创建失败重试任务:

    // FailbackRegistry.java
    
    private void addFailedRegistered(URL url) {
        // 已经存在同一个失败重试任务,则无须创建,直接返回
        FailedRegisteredTask oldOne = failedRegistered.get(url);
        if (oldOne != null) {
            return;
        }
        // 创建失败重试任务
        FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
        oldOne = failedRegistered.putIfAbsent(url, newTask);
        if (oldOne == null) {
            // 将任务提交到时间轮,等待retryPeriod毫秒后执行
            retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
        }
    }

1.3 重试订阅

FailbackRegistry 对 subscribe()/unSubscribe() 方法的实现都非常类似,所以这里我只介绍其中 subscribe() 方法的具体实现流程:

  1. 调用父类 AbstractRegistry.subscribe() 方法,将URL 写入父类的集合缓存中;
  2. 将该 URL 从 failedSubscribed 集合、 failedUnsubscribed 集合、failedNotified集合中删除,并停止相关的重试任务;
  3. 调用 doSubscribe() 方法执行订阅,该方法由子类实现;
  4. 如果 doSubscribe() 方法抛出异常,则优先从父类缓存中获取订阅数据,并调用 notify() 方法。如果没有缓存相应的订阅数据,才会检查 check 参数决定是否继续抛出异常。
    // FailbackRegistry.java
    
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        // 1.调用父类方法,将 URL写入集合缓存中;
        super.subscribe(url, listener);
    
        // 2.从集合中删除,并停止相关的重试任务
        removeFailedSubscribed(url, listener);
    
        // 3.执行订阅
        try {
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;
    
            // 4.抛出异常,则优先从父类缓存中获取URL
            List<URL> urls = getCacheUrls(url);
            // 5.1 缓存不为空
            if (CollectionUtils.isNotEmpty(urls)) {
                // 调用notify方法
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } 
            // 5.2 缓存为空
            else {
                // 根据 URL 参数及异常类型进行分类处理
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }
    
            // 添加重试任务
            addFailedSubscribed(url, listener);
        }
    }

1.4 重试通知

上述流程中,当订阅异常并且父类缓存不为空时,会调用notify方法完成回调通知的逻辑,这部分逻辑由父类AbstractRegistry实现:

    // FailbackRegistry.java
    
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        try {
            // 回调通知
            doNotify(url, listener, urls);
        } catch (Exception t) {
            // 失败,则创建相应的 FailedNotifiedTask 重试任务
            addFailedNotified(url, listener, urls);
            logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
    }
    
    protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
        // 调用父类AbstractRegistry的notify方法
        super.notify(url, listener, urls);
    }

父类AbstractRegistry的notify方法,我在上一章已经分析过了,其核心逻辑之一就是回调 NotifyListener:

    // AbstractRegistry.java
    
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        //...
        Map<String, List<URL>> result = new HashMap<>();
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            // 回调 NotifyListener
            listener.notify(categoryList);
            saveProperties(url);
        }
    }

二、重试任务

FailbackRegistry在添加重试任务时(addFailedRegistered/addFailedSubscribed等方法),实际就是创建了AbstractRetryTask的各个子类对象,然后由时间轮调用执行,如下图所示:

202308162140431982.png

2.1 AbstractRetryTask

AbstractRetryTask 中维护了当前任务关联的 URL、当前重试的次数等信息,其 run() 方法会根据重试 URL 中指定的重试次数(retry.times 参数,默认值为 3)、任务是否被取消、以及时间轮状态,决定此次任务的 doRetry() 方法是否正常执行,doRetry是一个抽象方法,由子类实现:

    // AbstractRetryTask.java
    
    public void run(Timeout timeout) throws Exception {
        // 检测定时任务状态和时间轮状态
        if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) {
            return;
        }
    
        // 检查重试次数
        if (times > retryTimes) {
            logger.warn("Final failed to execute task " + taskName + ", url: " + url + ", retry " + retryTimes + " times.");
            return;
        }
    
        // 执行重试,抽象方法,子类负责实现
        try {
            doRetry(url, registry, timeout);
        } catch (Throwable t) {
            logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t);
            // 异常则重新添加定时任务,等待重试
            reput(timeout, retryPeriod);
        }
    }

如果子类任务的 doRetry() 方法执行出现异常,AbstractRetryTask 会通过 reput() 方法将当前任务重新放入时间轮中,并递增当前任务的执行次数。

下面我以重试注册的子任务类FailedRegisteredTask为例,讲解处理流程。

2.2 FailedRegisteredTask

我们来看子类FailedRegisteredTask的重试逻辑实现:

    // FailedRegisteredTask.java
    
    public final class FailedRegisteredTask extends AbstractRetryTask {
    
        private static final String NAME = "retry register";
    
        public FailedRegisteredTask(URL url, FailbackRegistry registry) {
            super(url, registry, NAME);
        }
    
        @Override
        protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
            // 重新执行注册
            registry.doRegister(url);
            // 注册成功则删除重试任务
            registry.removeFailedRegisteredTask(url);
        }
    }

三、总结

本章,我对FailbackRegistry这个具有重试功能的节点进行了深入分析,它主要是在 AbstractRegistry 的基础上,提供了重试机制,底层依赖了时间轮来完成重试任务的调度。FailbackRegistry是所有Registry节点的父类,这样各个节点就可以根据自身需求实现重试功能。

阅读全文