获取更新的配置文件
前面的checkUpdateDataIds
只是获取有更新的配置文件名,不是配置文件内容,所以后面还要去判断哪些有更新,再去获取内容。这里会立即返回内容的,不会挂去。
//轮询有配置改变的,然后去获取内容
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {//有更新的就获取一次配置内容,超时时间3秒
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);//设置配置内容
if (null != ct[1]) {
cache.setType(ct[1]);//设置配置类型
}
...
} catch (NacosException ioe) {
...
}
}
通知监听器
最后就检查配置是否有变化,有的话要进行通知,然后把初始化状态改了,最后继续调度当前任务。
for (CacheData cacheData : cacheDatas) {//不是初始化中的或者初始化集合里存在的
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();//检查是否有变化,有变化就通知
cacheData.setInitializing(false);//请求过了后就设置为不在初始化中,这样就会被挂起,如果服务器配置有更新,就会立即返回,这样就可以实现动态配置更新,又不会太多的空轮询消耗
}
}
inInitializingCacheList.clear();
executorService.execute(this);
CacheData的checkListenerMd5
这里就是遍历所有的监听器,然后看配置内容是否有变化,有的话就进行通知。
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {//有改变的话就通知
safeNotifyListener(dataId, group, content, type, md5, wrap);
}
}
}
safeNotifyListener通知
创建了一个任务,封装好信息,调用监听器的receiveConfigInfo
方法接受数据处理。然后修改内容和MD5
。这里他设置了一下类加载器,包装和监听器的类加载器一样,可能跟SPI
反射调用相关。
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
final String md5, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;
Runnable job = new Runnable() {
@Override
public void run() {
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
if (listener instanceof AbstractSharedListener) {
AbstractSharedListener adapter = (AbstractSharedListener) listener;
adapter.fillContext(dataId, group);
LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
}
// 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。
Thread.currentThread().setContextClassLoader(appClassLoader);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent();
listener.receiveConfigInfo(contentTmp);
// compare lastContent and content
if (listener instanceof AbstractConfigChangeListener) {
Map data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, content, type);
ConfigChangeEvent event = new ConfigChangeEvent(data);
((AbstractConfigChangeListener)listener).receiveConfigChange(event);
listenerWrap.lastContent = content;
}
listenerWrap.lastCallMd5 = md5;
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
listener);
} catch (NacosException de) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
dataId, group, md5, listener, de.getErrCode(), de.getErrMsg());
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
md5, listener, t.getCause());
} finally {
Thread.currentThread().setContextClassLoader(myClassLoader);
}
}
};
final long startNotify = System.currentTimeMillis();
try {
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
job.run();//一般是直接运行的
}
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group,
md5, listener, t.getCause());
}
final long finishNotify = System.currentTimeMillis();
LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
name, (finishNotify - startNotify), dataId, group, md5, listener);
}
监听器处理
其实就是这块,添加记录,通知刷新事件,这里RefreshEventListener
会被通知,里面会进行刷新。
具体怎么刷下篇说吧。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵