Dubbo支持多种不同的注册中心实现,从源码的dubbo-registry
模块的层级目录就可以看出来:
Dubbo 官方推荐使用 ZooKeeper 作为注册中心,所以本章我就来讲解Dubbo 对 ZooKeeper 的集成,并对相关组件的源码进行分析。
一、Dubbo目录结构
ZooKeeper 是一款用于分布式协调的开源中间件。它是一个树型的目录结构,支持变更推送。下面是 Dubbo 官方文档中的一张图,展示了 Dubbo 在 Zookeeper 中的节点层级结构:
上图的解释如下:
- 名为
dubbo
的节点是 Dubbo 在 Zookeeper 中的根节点; - Service 层的节点名称是服务接口的全限定名,比如
org.apache.dubbo.demo.DemoService
等; - Type 层的节点是 URL 的分类,一共有四种分类:providers(服务提供者列表)、consumers(服务消费者列表)、routes(路由规则列表)和 configurations(配置规则列表);
- 不同的 Type 节点,对应 URL 层的不同 URL:Provider URL 、Consumer URL 、Routes URL 和 Configurations URL。
二、ZookeeperRegistryFactory
我在透彻理解Apache Dubbo(四)——dubbo-registry模块:核心接口中,对RegistryFactory
接口,及其抽象公共实现AbstractRegistryFactory
进行过分析,AbstractRegistryFactory 仅仅提供了缓存 Registry 对象等公共能力,并未真正实现 Registry 的创建,具体的创建逻辑是由子类完成的。
每一个RegistryFactory子类,只会负责创建对应类型的Registry:
// RegistryFactory.java
@SPI("dubbo")
public interface RegistryFactory {
/**
* 根据URL获取对应的Registry对象
*/
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
在 dubbo-registry-zookeeper
模块中的 SPI 配置文件中,指定了RegistryFactory 的实现类—— ZookeeperRegistryFactory:
2.1 构造
我们先来看ZookeeperRegistryFactory的构造,非常简单,只有一个无参构造函数。需要注意的是,ZookeeperRegistryFactory 提供了一个 setZookeeperTransporter()
方法,Dubbo 会通过 SPI 机制完成ZookeeperTransporter
扩展对象的自动装载:
// ZookeeperRegistryFactory.java
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
ZookeeperRegistryFactory 继承了 AbstractRegistryFactory,覆写了 createRegistry()
方法:创建 ZookeeperRegistry
实例,后续将由该实例完成与 Zookeeper 注册中心的交互。
2.2 ZookeeperRegistry
ZookeeperRegistry就是一个具有与Zookeeper注册中心进行通信、服务注册/订阅/通知等能力的Registry节点。在它的构造方法中,会通过 ZookeeperTransporter 创建 ZookeeperClient 实例并连接到 Zookeeper 集群,同时还会添加一个连接状态监听器。该监听器主要关注RECONNECTED
状态和NEW_SESSION_CREATED
状态,在当前 Dubbo 节点与 Zookeeper 的连接恢复或是 Session 恢复时,会重新进行注册/订阅,防止数据丢失:
// ZookeeperRegistry.java
public class ZookeeperRegistry extends FailbackRegistry {
// Zookeeper中的dubbo根目录
private final static String DEFAULT_ROOT = "dubbo";
private final String root;
private final Set<String> anyServices = new ConcurrentHashSet<>();
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();
// Zookeeper客户端,位于dubbo-remoting模块
private final ZookeeperClient zkClient;
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
// 连接Zookeeper集群
zkClient = zookeeperTransporter.connect(url);
// 添加一个连接状态监听器
zkClient.addStateListener((state) -> {
// RECONNECTED重新连接状态
if (state == StateListener.RECONNECTED) {
ZookeeperRegistry.this.fetchLatestAddresses();
}
// NEW_SESSION_CREATED会话新建状态
else if (state == StateListener.NEW_SESSION_CREATED) {
try {
ZookeeperRegistry.this.recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else if (state == StateListener.SESSION_LOST) {
} else if (state == StateListener.SUSPENDED) {
} else if (state == StateListener.CONNECTED) {
}
});
}
//...
}
ZookeeperRegistry的 doRegister()
方法和 doUnregister()
方法的实现都是通过 ZookeeperClient 找到合适的路径,然后创建(或删除)相应的 ZNode 节点。doRegister()
方法注册 Provider URL 的时,会根据 dynamic
参数决定创建临时 ZNode 节点(默认)还是持久 ZNode 节点,这样当 Provider 与 Zookeeper 会话关闭时,可以快速将变更推送到 Consumer 端。
ZookeeperRegistry的 doSubscribe()
方法的核心是通过 ZookeeperClient 在指定的 path 上添加 ChildListener 监听器,当订阅的节点发现变化时,会通过 ChildListener 监听器触发 notify()
方法,在 notify() 方法中会触发传入的 NotifyListener 监听器。
2.3 ZookeeperTransporter
我们再来看自动注入的ZookeeperTransporter是个什么东西?ZookeeperTransporter 位于dubbo-remoting
模块的dubbo-remoting-zookeeper
子模块中。我在前面章节讲过dubbo-remoting
模块负责远程通信,其中的子模块依赖各种开源组件实现远程通信。
dubbo-remoting-zookeeper
就是在 Apache Curator 的基础上封装了一套 Zookeeper 客户端,将与 Zookeeper 的交互融合到 Dubbo 的体系之中。dubbo-remoting-zookeeper
模块中有两个核心接口: ZookeeperTransporter 接口和 ZookeeperClient 接口:
- ZookeeperTransporter:负责创建 ZookeeperClient 对象;
- ZookeeperClient:负责与Zookeeper注册中心通信。
也就是说,ZookeeperRegistry的服务注册/订阅/通知等能力,底层是依赖ZookeeperTransporter和ZookeeperClient来实现的:
// ZookeeperTransporter.java
@SPI("curator")
public interface ZookeeperTransporter {
// 与URL中指定的Zookeeper集群建立连接,并返回一个ZookeeperClient实例
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);
}
ZookeeperTransporter 接口被 @SPI 注解修饰,成为一个扩展点,默认选择扩展名为 “curator” 的实现,其中的 connect() 方法用于创建 ZookeeperClient 实例,该方法被 @Adaptive 注解修饰,所以可以通过 URL 参数中的 client
或 transporter
参数覆盖 @SPI 注解指定的默认扩展对象。
CuratorZookeeperTransporter
我们来看CuratorZookeeperTransporter
,它继承自AbstractZookeeperTransporter:
// CuratorZookeeperTransporter.java
public class CuratorZookeeperTransporter extends AbstractZookeeperTransporter {
@Override
public ZookeeperClient createZookeeperClient(URL url) {
return new CuratorZookeeperClient(url);
}
}
AbstractZookeeperTransporter
AbstractZookeeperTransporter 的核心功能如下:
-
缓存 ZookeeperClient 实例;
-
在某个 Zookeeper 节点无法连接时,切换到备用 Zookeeper 地址:
- 在配置 Zookeeper 地址时,我们可以配置多个 Zookeeper 节点地址,这样即使一个Zookeeper 节点宕机后,Dubbo 也可以主动切换到其它 Zookeeper 节点。例如,我们可以提供如下的 URL 配置:
zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?backup=127.0.0.1:8989,127.0.0.1:9999
。
- 在配置 Zookeeper 地址时,我们可以配置多个 Zookeeper 节点地址,这样即使一个Zookeeper 节点宕机后,Dubbo 也可以主动切换到其它 Zookeeper 节点。例如,我们可以提供如下的 URL 配置:
// AbstractZookeeperTransporter.java
public abstract class AbstractZookeeperTransporter implements ZookeeperTransporter {
// 缓存,Key 为 Zookeeper 节点地址,Value 是相应的 ZookeeperClient 实例
private final Map<String, ZookeeperClient> zookeeperClientMap = new ConcurrentHashMap<>();
@Override
public ZookeeperClient connect(URL url) {
ZookeeperClient zookeeperClient;
// 从URL中解析备份Zookeeper节点地址
List<String> addressList = getURLBackupAddress(url);
// 优先从缓存获取zookeeperClient实例
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null
&& zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
synchronized (zookeeperClientMap) {
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null
&& zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
// 不存在则创建并缓存,createZookeeperClient是一个抽象方法,由子类实现
zookeeperClient = createZookeeperClient(url);
logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
writeToClientMap(addressList, zookeeperClient);
}
return zookeeperClient;
}
}
上述 AbstractZookeeperTransporter 的 connect() 方法主要逻辑如下:
-
首先解析 URL 中配置的Zookeeper节点地址;
-
然后从 ZookeeperClientMap 缓存(Key 为 Zookeeper 节点地址,Value 是相应的 ZookeeperClient 实例)中查找一个可用 ZookeeperClient 实例:
- 如果查找成功,则复用 ZookeeperClient 实例;
- 如果查找失败,则创建一个新的 ZookeeperClient 实例返回并更新 ZookeeperClientMap 缓存。
2.4 ZookeeperClient
ZookeeperClient 接口是 Dubbo 封装的 Zookeeper 客户端,该接口定义了大量的方法,都是用来与 Zookeeper 进行交互的:
// ZookeeperClient.java
public interface ZookeeperClient {
// 创建 ZNode 节点
void create(String path, boolean ephemeral);
// 创建临时 ZNode 节点
void create(String path, String content, boolean ephemeral);
// 删除节点
void delete(String path);
// 获取指定节点的子节点集合
List<String> getChildren(String path);
// 关闭当前 ZookeeperClient 实例
void close();
// 获取某个节点存储的内容
String getContent(String path);
// 添加子节点监听器,监听某个ZNode节点下的子节点变化
List<String> addChildListener(String path, ChildListener listener);
// 添加数据监听器,监听某个节点存储的数据变化
void addDataListener(String path, DataListener listener);
// 添加状态监听器,监听Dubbo与Zookeeper集群的连接状态
// SESSION_LOST、CONNECTED、RECONNECTED、SUSPENDED、NEW_SESSION_CREATED
void addStateListener(StateListener listener);
//...
}
AbstractZookeeperClient
AbstractZookeeperClient 是 ZookeeperClient 接口的抽象实现,主要提供了如下几项能力:
- 缓存当前 ZookeeperClient 实例创建的持久 ZNode 节点;
- 管理当前 ZookeeperClient 实例添加的各类监听器;
- 管理当前 ZookeeperClient 的运行状态。
// AbstractZookeeperClient.java
public abstract class AbstractZookeeperClient<TargetDataListener, TargetChildListener> implements ZookeeperClient {
// 连接超时时长,默认5秒
protected int DEFAULT_CONNECTION_TIMEOUT_MS = 5 * 1000;
// 客户端会话超时时长,默认60秒
protected int DEFAULT_SESSION_TIMEOUT_MS = 60 * 1000;
// 目标URL
private final URL url;
// 状态监听器集合
private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();
// 字节点监听器集合
private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>();
// 数据监听器集合
private final ConcurrentMap<String, ConcurrentMap<DataListener, TargetDataListener>> listeners = new ConcurrentHashMap<String, ConcurrentMap<DataListener, TargetDataListener>>();
// 缓存了当前 ZookeeperClient 创建的持久ZNod节点路径
// 在创建ZNode节点之前,会先查这个缓存,而不是与Zookeeper集群交互来判断持久ZNode节点是否存在
// 从而减少了一次与Zookeeper集群通讯的网络开销
private final Set<String> persistentExistNodePath = new ConcurrentHashSet<>();
//...
}
AbstractZookeeperClient 内部维护了 stateListeners
、listeners
以及 childListeners
三个集合,分别管理三种类型的监听器,虽然监听内容不同,但是它们的管理方式是类似的,所以这里我只分析 listeners 集合的操作:
// AbstractZookeeperClient.java
public void addDataListener(String path, DataListener listener) {
this.addDataListener(path, listener, null);
}
public void addDataListener(String path, DataListener listener, Executor executor) {
// 获取指定path上的DataListener集合
ConcurrentMap<DataListener, TargetDataListener> dataListenerMap =
listeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
// 查询该DataListener关联的TargetDataListener
TargetDataListener targetListener = dataListenerMap.computeIfAbsent(listener,
k -> createTargetDataListener(path, k));
// 通过TargetDataListener在指定的path上添加监听
addTargetDataListener(path, targetListener, executor);
}
protected abstract TargetDataListener createTargetDataListener(String path, DataListener listener);
protected abstract void addTargetDataListener(String path, TargetDataListener listener, Executor executor);
上述的 createTargetDataListener()
方法和 addTargetDataListener()
方法都是抽象方法,由 AbstractZookeeperClient 的子类实现。
TargetDataListener 是 AbstractZookeeperClient 中标记的一个泛型,为什么 AbstractZookeeperClient 要使用泛型定义?
因为不同的 ZookeeperClient 实现可能依赖不同的 Zookeeper 客户端组件,不同 Zookeeper 客户端组件的监听器实现也有所不同,而整个 dubbo-remoting-zookeeper
模块对外暴露的监听器是统一的,就是上面介绍的那三种。因此,这时就需要一层转换进行解耦,这层解耦就是通过 TargetDataListener 完成的。
Dubbo 2.7.x 版本中只支持Apache Curator,老版本 Dubbo 2.6.5 中,ZookeeperClient 使用了 ZkClient 。
CuratorZookeeperClient
CuratorZookeeperClient 在构造时会初始化 Curator 客户端并阻塞等待连接成功,CuratorZookeeperClient 与 Zookeeper 交互的全部操作,都是围绕着 Apache Curator 客户端展开的:
// CuratorZookeeperClient.java
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorZookeeperClient.CuratorWatcherImpl, CuratorZookeeperClient.CuratorWatcherImpl> {
public CuratorZookeeperClient(URL url) {
super(url);
try {
// 解析连接超时参数
int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
// 解析绘画超时参数
int sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS);
// 使用Curator建立与Zookeeper集群的连接
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(timeout)
.sessionTimeoutMs(sessionExpireMs);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
client = builder.build();
// 添加连接状态的监听
client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
client.start();
// 阻塞等待直到建立连接
boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
if (!connected) {
throw new IllegalStateException("zookeeper not connected");
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
//...
}
CuratorWatcherImpl 是 CuratorZookeeperClient 实现 AbstractZookeeperClient 时指定的泛型类,它实现了 TreeCacheListener 接口,可以添加到 TreeCache 上监听自身节点以及子节点的变化。
在 CuratorZookeeperClient.childEvent()
方法的实现中我们可以看到,当 TreeCache 关注的树型结构发生变化时,会将触发事件的路径、节点内容以及事件类型传递给关联的 DataListener 实例进行回调:
// CuratorZookeeperClient.CuratorWatcherImpl.java
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
if (dataListener != null) {
if (logger.isDebugEnabled()) {
logger.debug("listen the zookeeper changed. The changed data:" + event.getData());
}
TreeCacheEvent.Type type = event.getType();
EventType eventType = null;
String content = null;
String path = null;
switch (type) {
case NODE_ADDED:
eventType = EventType.NodeCreated;
path = event.getData().getPath();
content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
break;
case NODE_UPDATED:
eventType = EventType.NodeDataChanged;
path = event.getData().getPath();
content = event.getData().getData() == null ? "" : new String(event.getData().getData(), CHARSET);
break;
case NODE_REMOVED:
path = event.getData().getPath();
eventType = EventType.NodeDeleted;
break;
case INITIALIZED:
eventType = EventType.INITIALIZED;
break;
case CONNECTION_LOST:
eventType = EventType.CONNECTION_LOST;
break;
case CONNECTION_RECONNECTED:
eventType = EventType.CONNECTION_RECONNECTED;
break;
case CONNECTION_SUSPENDED:
eventType = EventType.CONNECTION_SUSPENDED;
break;
}
// 回调DataListener,传递触发事件的path、节点内容以及事件类型
dataListener.dataChanged(path, content, eventType);
}
}
在 CuratorZookeeperClient.addTargetDataListener()
方法中,我们可以看到 TreeCache 的创建、启动逻辑以及添加 CuratorWatcherImpl 监听的逻辑:
// CuratorZookeeperClient.java
protected void addTargetDataListener(String path, CuratorZookeeperClient.CuratorWatcherImpl treeCacheListener, Executor executor) {
try {
// 创建TreeCache
TreeCache treeCache = TreeCache.newBuilder(client, path).setCacheData(false).build();
// 缓存TreeCache
treeCacheMap.putIfAbsent(path, treeCache);
// 添加监听
if (executor == null) {
treeCache.getListenable().addListener(treeCacheListener);
} else {
treeCache.getListenable().addListener(treeCacheListener, executor);
}
//启动
treeCache.start();
} catch (Exception e) {
throw new IllegalStateException("Add treeCache listener for path:" + path, e);
}
}
dubbo-remoting-zookeeper
模块的核心实现就是上述介绍的CuratorZookeeperClient、CuratorZookeeperTransporter,该模块是 Dubbo 与 Zookeeper 交互的基础。
三、总结
本章,我对 Dubbo 接入 Zookeeper 注册中心的核心实现进行了讲解,与 Zookeeper 注册中心的通信事实上是通过dubbo-remoting
模块中的 ZookeeperTransporter 和 ZookeeperClient 完成的。
ZookeeperClient 本质就是一个Zookeeper客户端,Dubbo在2.7.x
中,只提供了基于Apache Curator的ZookeeperClient 实现,而ZookeeperRegistry底层就是依赖ZookeeperClient 完成与注册中心之间的通信,从而实现实现 Registry 的相关功能。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。