2023-09-15  阅读(2)
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105841056

服务端处理查询实例列表

我们看查询服务实例列表在哪里,在ZoneAwareLoadBalancerupdateListOfServers更新服务中。

202309152318429811.png
这里会传UDP端口,就是前篇说的服务器推送服务实例改变的时候会用到。

202309152318438582.png

InstanceController的list查询实例列表

主要就是获取了一些信息,这里要注意udpPort,推送会用到。最后用doSrvIPXT处理,这个什么方法,好像没什么可读性啊。

      @GetMapping("/list")
        @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
        public JSONObject list(HttpServletRequest request) throws Exception {
    
            String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
                Constants.DEFAULT_NAMESPACE_ID);
    
            String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
            String agent = WebUtils.getUserAgent(request);
            String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
            String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
            //UDP端口号
            Integer udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
            String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
            boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    
            String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    
            String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    
            boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    
            return doSrvIPXT(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                healthyOnly);
        }

doSrvIPXT返回实例

很长的方法,而且有些逻辑很奇怪,大致就是根据客户端agent,如果有udp端口的话就会添加到udp推送服务里去。然后获取集群相关的所有实例,进行客户端IP过滤,最后再健康状态过滤,如果小于保护阈值,还要开启保护模式,也就是把不健康的也加入到健康的集合里,最后封装数据响应。

     public JSONObject doSrvIPXT(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
                                    int udpPort,
                                    String env, boolean isCheck, String app, String tid, boolean healthyOnly)
            throws Exception {
    
            ClientInfo clientInfo = new ClientInfo(agent);//创建客户端信息
            JSONObject result = new JSONObject();
            Service service = serviceManager.getService(namespaceId, serviceName);//获取服务
    
            if (service == null) {//没有服务
                if (Loggers.SRV_LOG.isDebugEnabled()) {
                    Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
                }
                result.put("name", serviceName);
                result.put("clusters", clusters);
                result.put("hosts", new JSONArray());
                return result;
            }
    
            checkIfDisabled(service);//检查服务是否可用
            //缓存事件3秒
            long cacheMillis = switchDomain.getDefaultCacheMillis();
    
            // now try to enable the push
            try {//有UDP端口的话且里面判断客户端agent类型,好几种语言版本JAVA PYTHON GO C
                if (udpPort > 0 && pushService.canEnablePush(agent)) {
                    //添加要推送的客户端信息
                    pushService.addClient(namespaceId, serviceName,
                        clusters,
                        agent,
                        new InetSocketAddress(clientIP, udpPort),
                        pushDataSource,
                        tid,
                        app);
                    cacheMillis = switchDomain.getPushCacheMillis(serviceName);
                }
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
                cacheMillis = switchDomain.getDefaultCacheMillis();
            }
    
            List<Instance> srvedIPs;
            //获取集群相关的服务实例
            srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
            // 客户端IP过滤
            // filter ips using selector:
            if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
                srvedIPs = service.getSelector().select(clientIP, srvedIPs);
            }
    
            if (CollectionUtils.isEmpty(srvedIPs)) {
    
                if (Loggers.SRV_LOG.isDebugEnabled()) {
                    Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
                }
    
                if (clientInfo.type == ClientInfo.ClientType.JAVA &&
                    clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                    result.put("dom", serviceName);
                } else {
                    result.put("dom", NamingUtils.getServiceName(serviceName));
                }
    
                result.put("hosts", new JSONArray());
                result.put("name", serviceName);
                result.put("cacheMillis", cacheMillis);
                result.put("lastRefTime", System.currentTimeMillis());
                result.put("checksum", service.getChecksum());
                result.put("useSpecifiedURL", false);
                result.put("clusters", clusters);
                result.put("env", env);
                result.put("metadata", service.getMetadata());
                return result;
            }
    
            Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
            ipMap.put(Boolean.TRUE, new ArrayList<>());
            ipMap.put(Boolean.FALSE, new ArrayList<>());
            //区分健康状况的实例
            for (Instance ip : srvedIPs) {
                ipMap.get(ip.isHealthy()).add(ip);
            }
    
            if (isCheck) {
                result.put("reachProtectThreshold", false);
            }
            //保护模式阈值,默认是,不开启
            double threshold = service.getProtectThreshold();
            //健康的小于阈值的话就会开启保护模式,会把不健康的也实例也下发
            if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
    
                Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
                if (isCheck) {
                    result.put("reachProtectThreshold", true);
                }
    
                ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
                ipMap.get(Boolean.FALSE).clear();
            }
    
            if (isCheck) {
                result.put("protectThreshold", service.getProtectThreshold());
                result.put("reachLocalSiteCallThreshold", false);
    
                return new JSONObject();//返回空对象,很奇怪?是不是写错了
            }
    
            JSONArray hosts = new JSONArray();
            //封装实例
            for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
                List<Instance> ips = entry.getValue();
    
                if (healthyOnly && !entry.getKey()) {
                    continue;
                }
    
                for (Instance instance : ips) {
    
                    // remove disabled instance:
                    if (!instance.isEnabled()) {
                        continue;
                    }
    
                    JSONObject ipObj = new JSONObject();
    
                    ipObj.put("ip", instance.getIp());
                    ipObj.put("port", instance.getPort());
                    // deprecated since nacos 1.0.0:
                    ipObj.put("valid", entry.getKey());
                    ipObj.put("healthy", entry.getKey());
                    ipObj.put("marked", instance.isMarked());
                    ipObj.put("instanceId", instance.getInstanceId());
                    ipObj.put("metadata", instance.getMetadata());
                    ipObj.put("enabled", instance.isEnabled());
                    ipObj.put("weight", instance.getWeight());
                    ipObj.put("clusterName", instance.getClusterName());
                    if (clientInfo.type == ClientInfo.ClientType.JAVA &&
                        clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                        ipObj.put("serviceName", instance.getServiceName());
                    } else {
                        ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
                    }
    
                    ipObj.put("ephemeral", instance.isEphemeral());
                    hosts.add(ipObj);
    
                }
            }
    
            result.put("hosts", hosts);
            if (clientInfo.type == ClientInfo.ClientType.JAVA &&
                clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                result.put("dom", serviceName);
            } else {
                result.put("dom", NamingUtils.getServiceName(serviceName));
            }
            result.put("name", serviceName);
            result.put("cacheMillis", cacheMillis);
            result.put("lastRefTime", System.currentTimeMillis());
            result.put("checksum", service.getChecksum());
            result.put("useSpecifiedURL", false);
            result.put("clusters", clusters);
            result.put("env", env);
            result.put("metadata", service.getMetadata());
            return result;
        }

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文