服务获取和刷新的流程图
NacosNamingService的selectInstances
前面说了,负载均衡器初始化的时候会最后调用NacosNamingService
的selectInstances
方法,最终到这里,因为是订阅标记是true
,所以直接走hostReactor.getServiceInfo
,内部如果发现服务有更新会进行监听器的通知,所以叫做订阅:
HostReactor的getServiceInfo
先判断容灾模式,如果已经出故障了,直接从本地缓存读取。否则就尝试获取服务信息,如果没有就创建一个,然后立即去注册中心获取服务,这里用了一个updatingMap
来表示再更新的服务,然后进行更新,里面涉及更新后的结果和老的做比对,如果有改变,会对Notifier
任务进行改变通知,其实就是改变了阻塞队列changedServices
,Notifier
检查到他有改变,就会进行处理,去通知监听器有NamingEvent
事件,不过貌似目前还没有监听这个事件的监听器。完了最后再判断是否这个服务已开启UpdateTask
任务更新,没有的话就开启一个UpdateTask
任务进行更新,这里才是真正更新服务,对已有的服务进行更新,而负载均衡的PollingServerListUpdater
任务,更多的是获取服务,真正的服务更新还是UpdateTask
任务做的。
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {//已经到容灾模式
return failoverReactor.getService(key);
}
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {//不存在就创建
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
//updatingMap表示在更新了
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
//正在更新实例
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
HostReactor的scheduleUpdateIfAbsent
双重检测,添加一个UpdateTask
任务。这个任务具体干什么,我在前面的文章
里有,就不多说了。
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
如果有新的服务获取到,或者有服务改变了,Notifier
任务会感知到,然后去通知监听器,不过没监听器监听NamingEvent
事件:
PushReceiver接受推送
一旦注册中心有服务改变了,就会进行推送,如果发现是dom类型的,表示有更新,所以就调用hostReactor.processServiceJSON
,内部会进行更新,通知等操作,具体跟前面讲过的updateServiceNow
的结果处理一样:
然后根据类型进行相应的应答UDP
报文就好了:
因为你请求刷新的时候会把UDP
端口传过去:
好了,服务刷新是如何做到的,是什么时候刷新的现在基本已经了解了吧,除了自己去更新服务信息,服务端还会UDP
推送过来。 流程图也有了,可以自己debug
一下加深理解,当然还有细节只能自己看啦,面面俱到我也做不到。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。