回顾客户端实例心跳
在服务实例注册之前,如果是临时的服务实例,会先开启心跳任务,不过心跳任务5
秒后会运行,第一次心跳的时候会带上心跳内容,也就是服务实例的信息,避免实例不存在又要重新注册一次:
心跳处理基本流程
调度心跳任务。
发送心跳:
uri
是/nacos/v1/ns/instance/beat
:
如果返回找不到服务就会进行注册,所以你debug
的时候可能会发现已经有注册的了,其实就是心跳去注册的。
服务端处理心跳
InstanceController的beat
这里会接受客户端的心跳,如果是有beat
信息的话,说明是第一次,会带有服务实例信息,因为如果成功了服务端会下发不要带beat
信息的参数,这样客户端第二次就不会带beat
信息了。如果发现没有该服务,又没带beat
信息,说明这个服务可能出了问题被移除过了,直接返回没找到。如果没有服务,但是发现有beat
信息,那就从beat
中获取服务实例信息,进行注册。然后创建一个RsInfo
,跟心跳实例刷新相关,开启一次ClientBeatProcessor
处理任务,处理一次服务实例刷新。最后返回成功,并带上心跳时间和客户端是否要进行轻量级心跳发送标志。
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public JSONObject beat(HttpServletRequest request) throws Exception {
JSONObject result = new JSONObject();
//设置要求的心跳间隔
result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
Constants.DEFAULT_NAMESPACE_ID);
String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME,
UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
if (StringUtils.isNotBlank(beat)) {//如果有心跳内容,也就不是轻量级心跳,转换为RsInfo
clientBeat = JSON.parseObject(beat, RsInfo.class);
}
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();//获取集群名
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
}
//获取相关服务实例
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
//实例不存在
if (instance == null) {
if (clientBeat == null) {//如果心跳内容也没有就返回找不到
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}//否则根据心跳内容创建一个实例
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
//注册实例
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
if (clientBeat == null) {//不存在的话,要创建一个进行处理
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}//开启一次性心跳检查任务
service.processClientBeat(clientBeat);
//成功返回,
result.put(CommonParams.CODE, NamingResponseCode.OK);
result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());//5秒间隔
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());//告诉客户端不需要带上心跳信息了,变成轻量级心跳了
return result;
}
Service的processClientBeat处理一次心跳
创建一个临时心跳处理器,然后调度处理一次。
public void processClientBeat(final RsInfo rsInfo) {
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
public static ScheduledFuture<?> scheduleNow(Runnable task) {
return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);
}
下篇继续。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。