服务发现任务图
NamingProxy的refreshSrvIfNeed刷新服务地址
这个主要是刷新服务器地址用的,但是是根据域名去请求的,一般不设置域名这里基本都是返回了。
private void refreshSrvIfNeed() {
try {
//有了服务了直接返回
if (!CollectionUtils.isEmpty(serverList)) {
NAMING_LOGGER.debug("server list provided by user: " + serverList);
return;
}
//间隔太短不行,30秒
if (System.currentTimeMillis() - lastSrvRefTime < vipSrvRefInterMillis) {
return;
}
List<String> list = getServerListFromEndpoint();
if (CollectionUtils.isEmpty(list)) {
throw new Exception("Can not acquire Nacos list");
}
if (!CollectionUtils.isEqualCollection(list, serversFromEndpoint)) {
NAMING_LOGGER.info("[SERVER-LIST] server list is updated: " + list);
}
serversFromEndpoint = list;
lastSrvRefTime = System.currentTimeMillis();
} catch (Throwable e) {
NAMING_LOGGER.warn("failed to update server list", e);
}
}
EventDispatcher的Notifier
其实就是监听服务改变,然后进行通知,changedServices
是个BlockingQueue
阻塞队列,有改变的才会获取。
@Override
public void run() {
while (true) {
ServiceInfo serviceInfo = null;
try {
serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
} catch (Exception ignore) {
}
if (serviceInfo == null) {
continue;
}
try {
List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
if (!CollectionUtils.isEmpty(listeners)) {
for (EventListener listener : listeners) {
List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] notify error for service: "
+ serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
}
}
}
HostReactor的UpdateTask
这个是给指定的服务做更新,根据服务端发来的缓存事件,一般是10
秒,请求更新一次。
@Override
public void run() {
try {
//获取服务名相关的服务信息
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
//不存在就直接获取
if (serviceObj == null) {
//直接更新
updateServiceNow(serviceName, clusters);
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}
//还没更新过,要更新后比较
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateServiceNow(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {//已经更新过了,就直接把本地的推送上去,不比较差异
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
lastRefTime = serviceObj.getLastRefTime();
if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
!futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task:
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
}
}
updateServiceNow更新并比较差异
获取老的,然后请求新的,然后解析json
后比较,有差异就要通知。
public void updateServiceNow(String serviceName, String clusters) {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
if (StringUtils.isNotEmpty(result)) {
processServiceJSON(result);
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
NamingProxy的queryList
设置参数,调用请求服务实例列表uri
为/nacos/v1/ns/instance/list
,返回json
结果。
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}
HostReactor的processServiceJSON
解析json
为服务信息对象,然后比较新老服务信息,有改变的话就进行通知和保存到本地。
这样基本的任务都讲完了,接下去开始讲点细节,什么时候会进行服务获取和更新,其实一开始只是初始化,只有注册中心地址和服务名,没有任何服务实例的,只有当有请求来的时候,底层用都了ribbon做负载均衡,在创建负载均衡器的时候会进行服务列表请求更新,具体下篇说吧。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。