2023-08-16  阅读(2)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/372

Dubbo支持多种不同的注册中心实现,从源码的dubbo-registry模块的层级目录就可以看出来:

202308162140475541.png

Dubbo 官方推荐使用 ZooKeeper 作为注册中心,所以本章我就来讲解Dubbo 对 ZooKeeper 的集成,并对相关组件的源码进行分析。

一、Dubbo目录结构

ZooKeeper 是一款用于分布式协调的开源中间件。它是一个树型的目录结构,支持变更推送。下面是 Dubbo 官方文档中的一张图,展示了 Dubbo 在 Zookeeper 中的节点层级结构:

202308162140485682.png

上图的解释如下:

  • 名为 dubbo 的节点是 Dubbo 在 Zookeeper 中的根节点;
  • Service 层的节点名称是服务接口的全限定名,比如org.apache.dubbo.demo.DemoService等;
  • Type 层的节点是 URL 的分类,一共有四种分类:providers(服务提供者列表)、consumers(服务消费者列表)、routes(路由规则列表)和 configurations(配置规则列表);
  • 不同的 Type 节点,对应 URL 层的不同 URL:Provider URL 、Consumer URL 、Routes URL 和 Configurations URL。

二、ZookeeperRegistryFactory

我在透彻理解Apache Dubbo(四)——dubbo-registry模块:核心接口中,对RegistryFactory接口,及其抽象公共实现AbstractRegistryFactory进行过分析,AbstractRegistryFactory 仅仅提供了缓存 Registry 对象等公共能力,并未真正实现 Registry 的创建,具体的创建逻辑是由子类完成的。

每一个RegistryFactory子类,只会负责创建对应类型的Registry:

    // RegistryFactory.java
    
    @SPI("dubbo")
    public interface RegistryFactory {
        /**
         * 根据URL获取对应的Registry对象
         */
        @Adaptive({"protocol"})
        Registry getRegistry(URL url);
    }

dubbo-registry-zookeeper 模块中的 SPI 配置文件中,指定了RegistryFactory 的实现类—— ZookeeperRegistryFactory:

202308162140495343.png

2.1 构造

我们先来看ZookeeperRegistryFactory的构造,非常简单,只有一个无参构造函数。需要注意的是,ZookeeperRegistryFactory 提供了一个 setZookeeperTransporter() 方法,Dubbo 会通过 SPI 机制完成ZookeeperTransporter扩展对象的自动装载:

    // ZookeeperRegistryFactory.java
    
    public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
    
        private ZookeeperTransporter zookeeperTransporter;
    
        public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
            this.zookeeperTransporter = zookeeperTransporter;
        }
    
        @Override
        public Registry createRegistry(URL url) {
            return new ZookeeperRegistry(url, zookeeperTransporter);
        }
    }

ZookeeperRegistryFactory 继承了 AbstractRegistryFactory,覆写了 createRegistry() 方法:创建 ZookeeperRegistry 实例,后续将由该实例完成与 Zookeeper 注册中心的交互。

2.2 ZookeeperRegistry

ZookeeperRegistry就是一个具有与Zookeeper注册中心进行通信、服务注册/订阅/通知等能力的Registry节点。在它的构造方法中,会通过 ZookeeperTransporter 创建 ZookeeperClient 实例并连接到 Zookeeper 集群,同时还会添加一个连接状态监听器。该监听器主要关注RECONNECTED状态和NEW_SESSION_CREATED状态,在当前 Dubbo 节点与 Zookeeper 的连接恢复或是 Session 恢复时,会重新进行注册/订阅,防止数据丢失:

    // ZookeeperRegistry.java
    
    public class ZookeeperRegistry extends FailbackRegistry {
        // Zookeeper中的dubbo根目录
        private final static String DEFAULT_ROOT = "dubbo";
    
        private final String root;
    
        private final Set<String> anyServices = new ConcurrentHashSet<>();
    
        private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();
    
        // Zookeeper客户端,位于dubbo-remoting模块
        private final ZookeeperClient zkClient;
    
        public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
            super(url);
            if (url.isAnyHost()) {
                throw new IllegalStateException("registry address == null");
            }
            String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
            if (!group.startsWith(PATH_SEPARATOR)) {
                group = PATH_SEPARATOR + group;
            }
            this.root = group;
            // 连接Zookeeper集群
            zkClient = zookeeperTransporter.connect(url);
            // 添加一个连接状态监听器
            zkClient.addStateListener((state) -> {
                // RECONNECTED重新连接状态
                if (state == StateListener.RECONNECTED) {
                    ZookeeperRegistry.this.fetchLatestAddresses();
                } 
                // NEW_SESSION_CREATED会话新建状态
                else if (state == StateListener.NEW_SESSION_CREATED) {
                    try {
                        ZookeeperRegistry.this.recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                } else if (state == StateListener.SESSION_LOST) {
                } else if (state == StateListener.SUSPENDED) {
                } else if (state == StateListener.CONNECTED) {
                }
            });
        }
        //...
    }

ZookeeperRegistry的 doRegister() 方法和 doUnregister() 方法的实现都是通过 ZookeeperClient 找到合适的路径,然后创建(或删除)相应的 ZNode 节点。doRegister() 方法注册 Provider URL 的时,会根据 dynamic 参数决定创建临时 ZNode 节点(默认)还是持久 ZNode 节点,这样当 Provider 与 Zookeeper 会话关闭时,可以快速将变更推送到 Consumer 端。

ZookeeperRegistry的 doSubscribe() 方法的核心是通过 ZookeeperClient 在指定的 path 上添加 ChildListener 监听器,当订阅的节点发现变化时,会通过 ChildListener 监听器触发 notify() 方法,在 notify() 方法中会触发传入的 NotifyListener 监听器。

2.3 ZookeeperTransporter

我们再来看自动注入的ZookeeperTransporter是个什么东西?ZookeeperTransporter 位于dubbo-remoting模块的dubbo-remoting-zookeeper子模块中。我在前面章节讲过dubbo-remoting模块负责远程通信,其中的子模块依赖各种开源组件实现远程通信。

dubbo-remoting-zookeeper 就是在 Apache Curator 的基础上封装了一套 Zookeeper 客户端,将与 Zookeeper 的交互融合到 Dubbo 的体系之中。dubbo-remoting-zookeeper 模块中有两个核心接口: ZookeeperTransporter 接口和 ZookeeperClient 接口:

  • ZookeeperTransporter:负责创建 ZookeeperClient 对象;
  • ZookeeperClient:负责与Zookeeper注册中心通信。

也就是说,ZookeeperRegistry的服务注册/订阅/通知等能力,底层是依赖ZookeeperTransporter和ZookeeperClient来实现的:

    // ZookeeperTransporter.java
    
    @SPI("curator")
    public interface ZookeeperTransporter {
        // 与URL中指定的Zookeeper集群建立连接,并返回一个ZookeeperClient实例
        @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
        ZookeeperClient connect(URL url);
    }

ZookeeperTransporter 接口被 @SPI 注解修饰,成为一个扩展点,默认选择扩展名为 “curator” 的实现,其中的 connect() 方法用于创建 ZookeeperClient 实例,该方法被 @Adaptive 注解修饰,所以可以通过 URL 参数中的 clienttransporter 参数覆盖 @SPI 注解指定的默认扩展对象。

CuratorZookeeperTransporter

我们来看CuratorZookeeperTransporter,它继承自AbstractZookeeperTransporter:

    // CuratorZookeeperTransporter.java
    
    public class CuratorZookeeperTransporter extends AbstractZookeeperTransporter {
        @Override
        public ZookeeperClient createZookeeperClient(URL url) {
            return new CuratorZookeeperClient(url);
        }
    }

AbstractZookeeperTransporter

AbstractZookeeperTransporter 的核心功能如下:

  • 缓存 ZookeeperClient 实例;

  • 在某个 Zookeeper 节点无法连接时,切换到备用 Zookeeper 地址:

    • 在配置 Zookeeper 地址时,我们可以配置多个 Zookeeper 节点地址,这样即使一个Zookeeper 节点宕机后,Dubbo 也可以主动切换到其它 Zookeeper 节点。例如,我们可以提供如下的 URL 配置:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:8989,127.0.0.1:9999
    // AbstractZookeeperTransporter.java
    
    public abstract class AbstractZookeeperTransporter implements ZookeeperTransporter {
    
        // 缓存,Key 为 Zookeeper 节点地址,Value 是相应的 ZookeeperClient 实例
        private final Map<String, ZookeeperClient> zookeeperClientMap = new ConcurrentHashMap<>();
    
        @Override
        public ZookeeperClient connect(URL url) {
            ZookeeperClient zookeeperClient;
    
            // 从URL中解析备份Zookeeper节点地址
            List<String> addressList = getURLBackupAddress(url);
    
            // 优先从缓存获取zookeeperClient实例
            if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null 
                && zookeeperClient.isConnected()) {
                logger.info("find valid zookeeper client from the cache for address: " + url);
                return zookeeperClient;
            }
    
            synchronized (zookeeperClientMap) {
                if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null 
                    && zookeeperClient.isConnected()) {
                    logger.info("find valid zookeeper client from the cache for address: " + url);
                    return zookeeperClient;
                }
    
                // 不存在则创建并缓存,createZookeeperClient是一个抽象方法,由子类实现
                zookeeperClient = createZookeeperClient(url);
                logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
                writeToClientMap(addressList, zookeeperClient);
            }
            return zookeeperClient;
        }
    }

上述 AbstractZookeeperTransporter 的 connect() 方法主要逻辑如下:

  1. 首先解析 URL 中配置的Zookeeper节点地址;

  2. 然后从 ZookeeperClientMap 缓存(Key 为 Zookeeper 节点地址,Value 是相应的 ZookeeperClient 实例)中查找一个可用 ZookeeperClient 实例:

    • 如果查找成功,则复用 ZookeeperClient 实例;
    • 如果查找失败,则创建一个新的 ZookeeperClient 实例返回并更新 ZookeeperClientMap 缓存。

2.4 ZookeeperClient

ZookeeperClient 接口是 Dubbo 封装的 Zookeeper 客户端,该接口定义了大量的方法,都是用来与 Zookeeper 进行交互的:

    // ZookeeperClient.java
    
    public interface ZookeeperClient {
    
        // 创建 ZNode 节点
        void create(String path, boolean ephemeral);
    
        // 创建临时 ZNode 节点
        void create(String path, String content, boolean ephemeral);
    
        // 删除节点
        void delete(String path);
    
        // 获取指定节点的子节点集合
        List<String> getChildren(String path);
    
        // 关闭当前 ZookeeperClient 实例
        void close();
    
        // 获取某个节点存储的内容
        String getContent(String path);
    
        // 添加子节点监听器,监听某个ZNode节点下的子节点变化
        List<String> addChildListener(String path, ChildListener listener);
    
        // 添加数据监听器,监听某个节点存储的数据变化
        void addDataListener(String path, DataListener listener);
    
        // 添加状态监听器,监听Dubbo与Zookeeper集群的连接状态
        // SESSION_LOST、CONNECTED、RECONNECTED、SUSPENDED、NEW_SESSION_CREATED
        void addStateListener(StateListener listener);
    
        //...
    }

202308162140501614.png

AbstractZookeeperClient

AbstractZookeeperClient 是 ZookeeperClient 接口的抽象实现,主要提供了如下几项能力:

  • 缓存当前 ZookeeperClient 实例创建的持久 ZNode 节点;
  • 管理当前 ZookeeperClient 实例添加的各类监听器;
  • 管理当前 ZookeeperClient 的运行状态。
    // AbstractZookeeperClient.java
    
    public abstract class AbstractZookeeperClient<TargetDataListener, TargetChildListener> implements ZookeeperClient {
        // 连接超时时长,默认5秒
        protected int DEFAULT_CONNECTION_TIMEOUT_MS = 5 * 1000;
        // 客户端会话超时时长,默认60秒
        protected int DEFAULT_SESSION_TIMEOUT_MS = 60 * 1000;
    
        // 目标URL
        private final URL url;
    
        // 状态监听器集合
        private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();
    
        // 字节点监听器集合
        private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>();
    
        // 数据监听器集合
        private final ConcurrentMap<String, ConcurrentMap<DataListener, TargetDataListener>> listeners = new ConcurrentHashMap<String, ConcurrentMap<DataListener, TargetDataListener>>();
    
        // 缓存了当前 ZookeeperClient 创建的持久ZNod节点路径
        // 在创建ZNode节点之前,会先查这个缓存,而不是与Zookeeper集群交互来判断持久ZNode节点是否存在
        // 从而减少了一次与Zookeeper集群通讯的网络开销
        private final Set<String>  persistentExistNodePath = new ConcurrentHashSet<>();
    
        //...
    }

AbstractZookeeperClient 内部维护了 stateListenerslisteners 以及 childListeners 三个集合,分别管理三种类型的监听器,虽然监听内容不同,但是它们的管理方式是类似的,所以这里我只分析 listeners 集合的操作:

    // AbstractZookeeperClient.java
    
    public void addDataListener(String path, DataListener listener) {
        this.addDataListener(path, listener, null);
    }
    
    public void addDataListener(String path, DataListener listener, Executor executor) {
        // 获取指定path上的DataListener集合
        ConcurrentMap<DataListener, TargetDataListener> dataListenerMap = 
            listeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
    
        // 查询该DataListener关联的TargetDataListener
        TargetDataListener targetListener = dataListenerMap.computeIfAbsent(listener,
             k -> createTargetDataListener(path, k));
    
        // 通过TargetDataListener在指定的path上添加监听
        addTargetDataListener(path, targetListener, executor);
    }
    
    protected abstract TargetDataListener createTargetDataListener(String path, DataListener listener);
    
    protected abstract void addTargetDataListener(String path, TargetDataListener listener, Executor executor);

上述的 createTargetDataListener() 方法和 addTargetDataListener() 方法都是抽象方法,由 AbstractZookeeperClient 的子类实现。

TargetDataListener 是 AbstractZookeeperClient 中标记的一个泛型,为什么 AbstractZookeeperClient 要使用泛型定义?

因为不同的 ZookeeperClient 实现可能依赖不同的 Zookeeper 客户端组件,不同 Zookeeper 客户端组件的监听器实现也有所不同,而整个 dubbo-remoting-zookeeper 模块对外暴露的监听器是统一的,就是上面介绍的那三种。因此,这时就需要一层转换进行解耦,这层解耦就是通过 TargetDataListener 完成的。

Dubbo 2.7.x 版本中只支持Apache Curator,老版本 Dubbo 2.6.5 中,ZookeeperClient 使用了 ZkClient 。

CuratorZookeeperClient

CuratorZookeeperClient 在构造时会初始化 Curator 客户端并阻塞等待连接成功,CuratorZookeeperClient 与 Zookeeper 交互的全部操作,都是围绕着 Apache Curator 客户端展开的:

    // CuratorZookeeperClient.java
    
    public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZookeeperClient.CuratorWatcherImpl, CuratorZookeeperClient.CuratorWatcherImpl> {
    
        public CuratorZookeeperClient(URL url) {
            super(url);
            try {
                // 解析连接超时参数
                int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
                // 解析绘画超时参数
                int sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS);
    
                // 使用Curator建立与Zookeeper集群的连接
                CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                        .connectString(url.getBackupAddress())
                        .retryPolicy(new RetryNTimes(1, 1000))
                        .connectionTimeoutMs(timeout)
                        .sessionTimeoutMs(sessionExpireMs);
                String authority = url.getAuthority();
                if (authority != null && authority.length() > 0) {
                    builder = builder.authorization("digest", authority.getBytes());
                }
                client = builder.build();
                // 添加连接状态的监听
                client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
                client.start();
                // 阻塞等待直到建立连接
                boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
                if (!connected) {
                    throw new IllegalStateException("zookeeper not connected");
                }
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
        //...
    }

CuratorWatcherImpl 是 CuratorZookeeperClient 实现 AbstractZookeeperClient 时指定的泛型类,它实现了 TreeCacheListener 接口,可以添加到 TreeCache 上监听自身节点以及子节点的变化。

CuratorZookeeperClient.childEvent() 方法的实现中我们可以看到,当 TreeCache 关注的树型结构发生变化时,会将触发事件的路径、节点内容以及事件类型传递给关联的 DataListener 实例进行回调:

    // CuratorZookeeperClient.CuratorWatcherImpl.java
    
    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
        if (dataListener != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("listen the zookeeper changed. The changed data:" + event.getData());
            }
            TreeCacheEvent.Type type = event.getType();
            EventType eventType = null;
            String content = null;
            String path = null;
            switch (type) {
                case NODE_ADDED:
                    eventType = EventType.NodeCreated;
                    path = event.getData().getPath();
                    content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
                    break;
                case NODE_UPDATED:
                    eventType = EventType.NodeDataChanged;
                    path = event.getData().getPath();
                    content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
                    break;
                case NODE_REMOVED:
                    path = event.getData().getPath();
                    eventType = EventType.NodeDeleted;
                    break;
                case INITIALIZED:
                    eventType = EventType.INITIALIZED;
                    break;
                case CONNECTION_LOST:
                    eventType = EventType.CONNECTION_LOST;
                    break;
                case CONNECTION_RECONNECTED:
                    eventType = EventType.CONNECTION_RECONNECTED;
                    break;
                case CONNECTION_SUSPENDED:
                    eventType = EventType.CONNECTION_SUSPENDED;
                    break;
    
            }
            // 回调DataListener,传递触发事件的path、节点内容以及事件类型
            dataListener.dataChanged(path, content, eventType);
        }
    }

CuratorZookeeperClient.addTargetDataListener() 方法中,我们可以看到 TreeCache 的创建、启动逻辑以及添加 CuratorWatcherImpl 监听的逻辑:

    // CuratorZookeeperClient.java
    
    protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener, Executor executor) {
        try {
            // 创建TreeCache
            TreeCache treeCache = TreeCache.newBuilder(client, path).setCacheData(false).build();
            // 缓存TreeCache
            treeCacheMap.putIfAbsent(path, treeCache);
    
            // 添加监听
            if (executor == null) {
                treeCache.getListenable().addListener(treeCacheListener);
            } else {
                treeCache.getListenable().addListener(treeCacheListener, executor);
            }
            //启动
            treeCache.start();
        } catch (Exception e) {
            throw new IllegalStateException("Add treeCache listener for path:" + path, e);
        }
    }

dubbo-remoting-zookeeper 模块的核心实现就是上述介绍的CuratorZookeeperClient、CuratorZookeeperTransporter,该模块是 Dubbo 与 Zookeeper 交互的基础。

三、总结

本章,我对 Dubbo 接入 Zookeeper 注册中心的核心实现进行了讲解,与 Zookeeper 注册中心的通信事实上是通过dubbo-remoting模块中的 ZookeeperTransporter 和 ZookeeperClient 完成的。

ZookeeperClient 本质就是一个Zookeeper客户端,Dubbo在2.7.x中,只提供了基于Apache Curator的ZookeeperClient 实现,而ZookeeperRegistry底层就是依赖ZookeeperClient 完成与注册中心之间的通信,从而实现实现 Registry 的相关功能。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文