自动同步配置的实现
NacosContextRefresher
会监听应用程序上下文完成的事件,在事件中做配置监听器的注册:
只注册一次:
registerNacosListenersForApplications给所有配置设置监听器
从注册表里获取所有的NacosPropertySource
属性源,也就是尝试过获取配置的配置源,有3
个。如果是支持刷新的,就注册监听器。
private void registerNacosListenersForApplications() {
if (isRefreshEnabled()) {
for (NacosPropertySource propertySource : NacosPropertySourceRepository
.getAll()) {
if (!propertySource.isRefreshable()) {
continue;
}
String dataId = propertySource.getDataId();
registerNacosListener(propertySource.getGroup(), dataId);
}
}
}
比如这三个,虽然可能只有一个是有内容的,但是源码里会去尝试3
个,都会封装成属性源放进来。
registerNacosListener注册监听器到配置服务
其实就是创建一个监听器,放入注册服务,如果有刷新了,就会调用innerReceive
方法,进行刷新历史的添加和刷新事件的通知,也就是你自己可以接受到这个事件后做点扩展。
private void registerNacosListener(final String groupKey, final String dataKey) {
//生成一个key,用逗号连接dataid和组
String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
//放进listenerMap中,如果不存在的话就创建一个AbstractSharedListener并返回
Listener listener = listenerMap.computeIfAbsent(key,
lst -> new AbstractSharedListener() {
@Override
public void innerReceive(String dataId, String group,
String configInfo) {
//刷新个数
refreshCountIncrement();
//添加刷新历史
nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
//广播刷新事件
applicationContext.publishEvent(
new RefreshEvent(this, null, "Refresh Nacos config"));
...
}
});
try {
//把监听器到添加到配置服里务
configService.addListener(dataKey, groupKey, listener);
}
catch (NacosException e) {
...
}
}
获取的key
:
NacosConfigService的addListener配置服务添加监听器
其实是封装成集合添加到ClientWorker
中。
@Override
public void addListener(String dataId, String group, Listener listener) throws NacosException {
worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}
ClientWorker的addTenantListeners
会获取一个CacheData
,然后把监听器添加进去,其中dataId, group, tenant
作为参数传进去,后面会用做key
。
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
group = null2defaultGroup(group);
String tenant = agent.getTenant();
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
for (Listener listener : listeners) {
cache.addListener(listener);
}
}
addCacheDataIfAbsent(服务器挂起的原因猜想)
里面做的就是先去缓存里获取,如果没有就创建一个CacheData
,里面封装了监听器集合,过滤器,还有配置信息。最后重新创建一个HashMap
,然后放进去,再把HashMap
放入cacheMap
里。这里有个cache.setInitializing(true);
表示是第一次初始化的,所以请求不会被服务器挂起,如果不是第一次,那就会被挂起,为什么要这样设计,我猜是因为这样既可以避免无效的空轮询,也可以进行动态配置更新,因为在服务器挂起的那段时间里,大致是10
秒钟,客户端设置超时是45
秒,如果有配置修改的话,就可以立即响应,把修改后的配置取过来。具体服务器是怎么实现的,后面我们分析到了就知道啦。
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
new HashMap<String, CacheData>());
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
CacheData cache = getCache(dataId, group, tenant);//从缓存中获取
if (null != cache) {
return cache;//存在直接返回
}
String key = GroupKey.getKeyTenant(dataId, group, tenant);//拼接成key
synchronized (cacheMap) {//
CacheData cacheFromMap = getCache(dataId, group, tenant);
if (null != cacheFromMap) {//双重检测,有缓存了
cache = cacheFromMap;
// reset so that server not hang this check
cache.setInitializing(true);//不让服务器挂起请求,因为有正在始化呢,这里涉及到请求头Long-Pulling-Timeout-No-Hangup,如果有在初始化的就会有这个请求头,服务器看到估计就会立即处理,不会挂起,在轮询任务的处理中会看到。这里我猜是为了让CacheData快速重新获取一次,好让新的监听器起作用,否则服务器会挂起,就要等好久响应了
} else {//没缓存就创建
cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
// fix issue # 1317
if (enableRemoteSyncConfig) {//是否需要同步一次
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
cache.setContent(ct[0]);
}
}
//创建一个新的
Map<String, CacheData> copy = new HashMap<String, CacheData>(cacheMap.get());
copy.put(key, cache);//放入cache
cacheMap.set(copy);//设置回去
}
LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
//统计信息
MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
return cache;
}
CacheData的addListener添加监听器
根据传入的类型,调用不同的ManagerListenerWrap
构造函数,添加到监听器集合里。监听器集合是CopyOnWriteArrayList
,这个可以在写的时候提高性能,写的时候是复制一份去改的,原来的数据也能读,但是是旧的值,不过没关系,一般只修改一个元素,不影响到其他元素,其他元素照样可以读,旧的更新的是一样的数据。
public void addListener(Listener listener) {
if (null == listener) {
throw new IllegalArgumentException("listener is null");
}
ManagerListenerWrap wrap = (listener instanceof AbstractConfigChangeListener) ?//是配置改变监听器还要传入内容
new ManagerListenerWrap(listener, md5, content) : new ManagerListenerWrap(listener, md5);
if (listeners.addIfAbsent(wrap)) {
LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
listeners.size());
}
}
下篇继续,看看监听器怎么被调用到的。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵