2023-09-15
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105838354

回顾客户端实例心跳

在服务实例注册之前,如果是临时的服务实例,会先开启心跳任务,不过心跳任务5秒后会运行,第一次心跳的时候会带上心跳内容,也就是服务实例的信息,避免实例不存在又要重新注册一次:

202309152318315191.png

心跳处理基本流程

202309152318319432.png

202309152318323863.png
调度心跳任务。

202309152318337764.png
发送心跳:

202309152318343395.png
uri/nacos/v1/ns/instance/beat

202309152318348946.png
如果返回找不到服务就会进行注册,所以你debug的时候可能会发现已经有注册的了,其实就是心跳去注册的。

202309152318358467.png

服务端处理心跳

202309152318364258.png

InstanceController的beat

这里会接受客户端的心跳,如果是有beat信息的话,说明是第一次,会带有服务实例信息,因为如果成功了服务端会下发不要带beat信息的参数,这样客户端第二次就不会带beat信息了。如果发现没有该服务,又没带beat信息,说明这个服务可能出了问题被移除过了,直接返回没找到。如果没有服务,但是发现有beat信息,那就从beat中获取服务实例信息,进行注册。然后创建一个RsInfo,跟心跳实例刷新相关,开启一次ClientBeatProcessor处理任务,处理一次服务实例刷新。最后返回成功,并带上心跳时间和客户端是否要进行轻量级心跳发送标志。

     	@CanDistro
        @PutMapping("/beat")
        @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
        public JSONObject beat(HttpServletRequest request) throws Exception {
    
            JSONObject result = new JSONObject();
            //设置要求的心跳间隔
            result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
            String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
            String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
                Constants.DEFAULT_NAMESPACE_ID);
            String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME,
                UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
            int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
            String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    
            RsInfo clientBeat = null;
            if (StringUtils.isNotBlank(beat)) {//如果有心跳内容,也就不是轻量级心跳,转换为RsInfo
                clientBeat = JSON.parseObject(beat, RsInfo.class);
            }
    
            if (clientBeat != null) {
                if (StringUtils.isNotBlank(clientBeat.getCluster())) {
                    clusterName = clientBeat.getCluster();//获取集群名
                } else {
                    // fix #2533
                    clientBeat.setCluster(clusterName);
                }
                ip = clientBeat.getIp();
                port = clientBeat.getPort();
            }
    
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
            }
            //获取相关服务实例
            Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
            //实例不存在
            if (instance == null) {
                if (clientBeat == null) {//如果心跳内容也没有就返回找不到
                    result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
                    return result;
                }//否则根据心跳内容创建一个实例
                instance = new Instance();
                instance.setPort(clientBeat.getPort());
                instance.setIp(clientBeat.getIp());
                instance.setWeight(clientBeat.getWeight());
                instance.setMetadata(clientBeat.getMetadata());
                instance.setClusterName(clusterName);
                instance.setServiceName(serviceName);
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(clientBeat.isEphemeral());
                //注册实例
                serviceManager.registerInstance(namespaceId, serviceName, instance);
            }
    
            Service service = serviceManager.getService(namespaceId, serviceName);
    
            if (service == null) {
                throw new NacosException(NacosException.SERVER_ERROR,
                    "service not found: " + serviceName + "@" + namespaceId);
            }
            if (clientBeat == null) {//不存在的话,要创建一个进行处理
                clientBeat = new RsInfo();
                clientBeat.setIp(ip);
                clientBeat.setPort(port);
                clientBeat.setCluster(clusterName);
            }//开启一次性心跳检查任务
            service.processClientBeat(clientBeat);
            //成功返回,
            result.put(CommonParams.CODE, NamingResponseCode.OK);
            result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());//5秒间隔
            result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());//告诉客户端不需要带上心跳信息了,变成轻量级心跳了
            return result;
        }

Service的processClientBeat处理一次心跳

创建一个临时心跳处理器,然后调度处理一次。

    public void processClientBeat(final RsInfo rsInfo) {
            ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
            clientBeatProcessor.setService(this);
            clientBeatProcessor.setRsInfo(rsInfo);
            HealthCheckReactor.scheduleNow(clientBeatProcessor);
        }
    
        public static ScheduledFuture<?> scheduleNow(Runnable task) {
            return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);
        }

下篇继续。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文