服务端处理查询实例列表
我们看查询服务实例列表在哪里,在ZoneAwareLoadBalancer
的updateListOfServers
更新服务中。
这里会传UDP
端口,就是前篇说的服务器推送服务实例改变的时候会用到。
InstanceController的list查询实例列表
主要就是获取了一些信息,这里要注意udpPort
,推送会用到。最后用doSrvIPXT
处理,这个什么方法,好像没什么可读性啊。
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public JSONObject list(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
//UDP端口号
Integer udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
return doSrvIPXT(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
}
doSrvIPXT返回实例
很长的方法,而且有些逻辑很奇怪,大致就是根据客户端agent
,如果有udp
端口的话就会添加到udp
推送服务里去。然后获取集群相关的所有实例,进行客户端IP
过滤,最后再健康状态过滤,如果小于保护阈值,还要开启保护模式,也就是把不健康的也加入到健康的集合里,最后封装数据响应。
public JSONObject doSrvIPXT(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
int udpPort,
String env, boolean isCheck, String app, String tid, boolean healthyOnly)
throws Exception {
ClientInfo clientInfo = new ClientInfo(agent);//创建客户端信息
JSONObject result = new JSONObject();
Service service = serviceManager.getService(namespaceId, serviceName);//获取服务
if (service == null) {//没有服务
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
result.put("name", serviceName);
result.put("clusters", clusters);
result.put("hosts", new JSONArray());
return result;
}
checkIfDisabled(service);//检查服务是否可用
//缓存事件3秒
long cacheMillis = switchDomain.getDefaultCacheMillis();
// now try to enable the push
try {//有UDP端口的话且里面判断客户端agent类型,好几种语言版本JAVA PYTHON GO C
if (udpPort > 0 && pushService.canEnablePush(agent)) {
//添加要推送的客户端信息
pushService.addClient(namespaceId, serviceName,
clusters,
agent,
new InetSocketAddress(clientIP, udpPort),
pushDataSource,
tid,
app);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
}
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
cacheMillis = switchDomain.getDefaultCacheMillis();
}
List<Instance> srvedIPs;
//获取集群相关的服务实例
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
// 客户端IP过滤
// filter ips using selector:
if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
srvedIPs = service.getSelector().select(clientIP, srvedIPs);
}
if (CollectionUtils.isEmpty(srvedIPs)) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
if (clientInfo.type == ClientInfo.ClientType.JAVA &&
clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("hosts", new JSONArray());
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.put("metadata", service.getMetadata());
return result;
}
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
//区分健康状况的实例
for (Instance ip : srvedIPs) {
ipMap.get(ip.isHealthy()).add(ip);
}
if (isCheck) {
result.put("reachProtectThreshold", false);
}
//保护模式阈值,默认是,不开启
double threshold = service.getProtectThreshold();
//健康的小于阈值的话就会开启保护模式,会把不健康的也实例也下发
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
if (isCheck) {
result.put("reachProtectThreshold", true);
}
ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
ipMap.get(Boolean.FALSE).clear();
}
if (isCheck) {
result.put("protectThreshold", service.getProtectThreshold());
result.put("reachLocalSiteCallThreshold", false);
return new JSONObject();//返回空对象,很奇怪?是不是写错了
}
JSONArray hosts = new JSONArray();
//封装实例
for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
List<Instance> ips = entry.getValue();
if (healthyOnly && !entry.getKey()) {
continue;
}
for (Instance instance : ips) {
// remove disabled instance:
if (!instance.isEnabled()) {
continue;
}
JSONObject ipObj = new JSONObject();
ipObj.put("ip", instance.getIp());
ipObj.put("port", instance.getPort());
// deprecated since nacos 1.0.0:
ipObj.put("valid", entry.getKey());
ipObj.put("healthy", entry.getKey());
ipObj.put("marked", instance.isMarked());
ipObj.put("instanceId", instance.getInstanceId());
ipObj.put("metadata", instance.getMetadata());
ipObj.put("enabled", instance.isEnabled());
ipObj.put("weight", instance.getWeight());
ipObj.put("clusterName", instance.getClusterName());
if (clientInfo.type == ClientInfo.ClientType.JAVA &&
clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
ipObj.put("serviceName", instance.getServiceName());
} else {
ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
}
ipObj.put("ephemeral", instance.isEphemeral());
hosts.add(ipObj);
}
}
result.put("hosts", hosts);
if (clientInfo.type == ClientInfo.ClientType.JAVA &&
clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.put("metadata", service.getMetadata());
return result;
}
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。