2023-09-15
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105841056

服务端处理查询实例列表

我们看查询服务实例列表在哪里,在ZoneAwareLoadBalancerupdateListOfServers更新服务中。

202309152318429811.png
这里会传UDP端口,就是前篇说的服务器推送服务实例改变的时候会用到。

202309152318438582.png

InstanceController的list查询实例列表

主要就是获取了一些信息,这里要注意udpPort,推送会用到。最后用doSrvIPXT处理,这个什么方法,好像没什么可读性啊。

      @GetMapping("/list")
        @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
        public JSONObject list(HttpServletRequest request) throws Exception {
    
            String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
                Constants.DEFAULT_NAMESPACE_ID);
    
            String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
            String agent = WebUtils.getUserAgent(request);
            String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
            String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
            //UDP端口号
            Integer udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
            String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
            boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    
            String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    
            String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    
            boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    
            return doSrvIPXT(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                healthyOnly);
        }

doSrvIPXT返回实例

很长的方法,而且有些逻辑很奇怪,大致就是根据客户端agent,如果有udp端口的话就会添加到udp推送服务里去。然后获取集群相关的所有实例,进行客户端IP过滤,最后再健康状态过滤,如果小于保护阈值,还要开启保护模式,也就是把不健康的也加入到健康的集合里,最后封装数据响应。

     public JSONObject doSrvIPXT(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
                                    int udpPort,
                                    String env, boolean isCheck, String app, String tid, boolean healthyOnly)
            throws Exception {
    
            ClientInfo clientInfo = new ClientInfo(agent);//创建客户端信息
            JSONObject result = new JSONObject();
            Service service = serviceManager.getService(namespaceId, serviceName);//获取服务
    
            if (service == null) {//没有服务
                if (Loggers.SRV_LOG.isDebugEnabled()) {
                    Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
                }
                result.put("name", serviceName);
                result.put("clusters", clusters);
                result.put("hosts", new JSONArray());
                return result;
            }
    
            checkIfDisabled(service);//检查服务是否可用
            //缓存事件3秒
            long cacheMillis = switchDomain.getDefaultCacheMillis();
    
            // now try to enable the push
            try {//有UDP端口的话且里面判断客户端agent类型,好几种语言版本JAVA PYTHON GO C
                if (udpPort > 0 && pushService.canEnablePush(agent)) {
                    //添加要推送的客户端信息
                    pushService.addClient(namespaceId, serviceName,
                        clusters,
                        agent,
                        new InetSocketAddress(clientIP, udpPort),
                        pushDataSource,
                        tid,
                        app);
                    cacheMillis = switchDomain.getPushCacheMillis(serviceName);
                }
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
                cacheMillis = switchDomain.getDefaultCacheMillis();
            }
    
            List<Instance> srvedIPs;
            //获取集群相关的服务实例
            srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
            // 客户端IP过滤
            // filter ips using selector:
            if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
                srvedIPs = service.getSelector().select(clientIP, srvedIPs);
            }
    
            if (CollectionUtils.isEmpty(srvedIPs)) {
    
                if (Loggers.SRV_LOG.isDebugEnabled()) {
                    Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
                }
    
                if (clientInfo.type == ClientInfo.ClientType.JAVA &&
                    clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                    result.put("dom", serviceName);
                } else {
                    result.put("dom", NamingUtils.getServiceName(serviceName));
                }
    
                result.put("hosts", new JSONArray());
                result.put("name", serviceName);
                result.put("cacheMillis", cacheMillis);
                result.put("lastRefTime", System.currentTimeMillis());
                result.put("checksum", service.getChecksum());
                result.put("useSpecifiedURL", false);
                result.put("clusters", clusters);
                result.put("env", env);
                result.put("metadata", service.getMetadata());
                return result;
            }
    
            Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
            ipMap.put(Boolean.TRUE, new ArrayList<>());
            ipMap.put(Boolean.FALSE, new ArrayList<>());
            //区分健康状况的实例
            for (Instance ip : srvedIPs) {
                ipMap.get(ip.isHealthy()).add(ip);
            }
    
            if (isCheck) {
                result.put("reachProtectThreshold", false);
            }
            //保护模式阈值,默认是,不开启
            double threshold = service.getProtectThreshold();
            //健康的小于阈值的话就会开启保护模式,会把不健康的也实例也下发
            if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
    
                Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
                if (isCheck) {
                    result.put("reachProtectThreshold", true);
                }
    
                ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
                ipMap.get(Boolean.FALSE).clear();
            }
    
            if (isCheck) {
                result.put("protectThreshold", service.getProtectThreshold());
                result.put("reachLocalSiteCallThreshold", false);
    
                return new JSONObject();//返回空对象,很奇怪?是不是写错了
            }
    
            JSONArray hosts = new JSONArray();
            //封装实例
            for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
                List<Instance> ips = entry.getValue();
    
                if (healthyOnly && !entry.getKey()) {
                    continue;
                }
    
                for (Instance instance : ips) {
    
                    // remove disabled instance:
                    if (!instance.isEnabled()) {
                        continue;
                    }
    
                    JSONObject ipObj = new JSONObject();
    
                    ipObj.put("ip", instance.getIp());
                    ipObj.put("port", instance.getPort());
                    // deprecated since nacos 1.0.0:
                    ipObj.put("valid", entry.getKey());
                    ipObj.put("healthy", entry.getKey());
                    ipObj.put("marked", instance.isMarked());
                    ipObj.put("instanceId", instance.getInstanceId());
                    ipObj.put("metadata", instance.getMetadata());
                    ipObj.put("enabled", instance.isEnabled());
                    ipObj.put("weight", instance.getWeight());
                    ipObj.put("clusterName", instance.getClusterName());
                    if (clientInfo.type == ClientInfo.ClientType.JAVA &&
                        clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                        ipObj.put("serviceName", instance.getServiceName());
                    } else {
                        ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
                    }
    
                    ipObj.put("ephemeral", instance.isEphemeral());
                    hosts.add(ipObj);
    
                }
            }
    
            result.put("hosts", hosts);
            if (clientInfo.type == ClientInfo.ClientType.JAVA &&
                clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                result.put("dom", serviceName);
            } else {
                result.put("dom", NamingUtils.getServiceName(serviceName));
            }
            result.put("name", serviceName);
            result.put("cacheMillis", cacheMillis);
            result.put("lastRefTime", System.currentTimeMillis());
            result.put("checksum", service.getChecksum());
            result.put("useSpecifiedURL", false);
            result.put("clusters", clusters);
            result.put("env", env);
            result.put("metadata", service.getMetadata());
            return result;
        }

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文