2023-08-07
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/239

本章,我将讲解Eureka的心跳(renew)机制。通过前面章节,我们已经知道 Eureka-Client 向 Eureka-Server 注册应用实例成功后,会获得租约 ( Lease )。所以,Eureka-Client 向 Eureka-Server 发送心跳,其实就是进行 续租 (Lease Renewal),避免租约过期,这样才能让Eureka-Server知道自己还活着。

默认情况下,租约有效期为 90 秒,心跳(续租)频率为 30 秒。两者比例为 1 : 3 ,也就是说,在网络异常等情况下,有三次重试机会。

202308072151557061.png

一、Eureka-Client发起续租

Eureka-Client发起心跳(续租)的整体流程如下:

  1. DiscoveryClient初始化时,会去调度一堆定时任务,其中有一个就是HeartbeatThread——心跳任务;
  2. 心跳任务,默认每隔30秒执行一次发送心跳的逻辑,底层是调用了EurekaHttpClient#sendHeartbeat()
  3. 心跳请求是一个HTTP/PUT请求,URL类似http://localhost:8080/v2/apps/ServiceA/i-000000-1。

1.1 初始化定时任务

Eureka-Client 在构造和初始化时,会创建一个 心跳(续租) 线程——HeartbeatThread,每隔一段时间发送一次心跳:

    /**
    * DiscoveryClient.java
    */
    
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                   Provider<BackupRegistry> backupRegistryProvider) {
        // ...
    
        // 定时任务线程池,用于定时触发心跳( 续租 )
        scheduler = Executors.newScheduledThreadPool(2,
                   new ThreadFactoryBuilder()
                           .setNameFormat("DiscoveryClient-%d")
                           .setDaemon(true)
                           .build());
    
        // 心跳任务执行线程池
        heartbeatExecutor = new ThreadPoolExecutor(
                  1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                  new SynchronousQueue<Runnable>(),
                  new ThreadFactoryBuilder()
                          .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                          .setDaemon(true)
                          .build()
        ); 
    
        // ...
    
        // 初始化定时任务
        initScheduledTasks();
    }

initScheduledTasks,会开始进行心跳任务的调度,默认每隔30秒调度一次:

    private void initScheduledTasks() {
        // 向 Eureka-Server 发送心跳(续租)执行器
        if (clientConfig.shouldRegisterWithEureka()) {
            // 心跳间隔,默认30s
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            // 超时重试因子
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
    
            // 开始调度
            scheduler.schedule(
                new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()    // HeartbeatThread线程
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);
    
            //...
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

1.2 HeartbeatThread

上述com.netflix.discovery.DiscoveryClient.HeartbeatThread,是一个包含心跳请求逻辑的Runnable任务,实现执行 Eureka-Client 向 Eureka-Server 发起 续租 ( renew )请求。实现代码如下:

    /**
    * DiscoveryClient.java
    */
    
    // 最后成功向 Eureka-Server 心跳时间戳
    private volatile long lastSuccessfulHeartbeatTimestamp = -1;
    
    private class HeartbeatThread implements Runnable {
    
       public void run() {
           if (renew()) {
               // 心跳发送成功后,更新时间戳
               lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
           }
       }
    }

调用 #renew 方法,执行续租逻辑:

    boolean renew() {
       EurekaHttpResponse<InstanceInfo> httpResponse;
       try {
           // 1.利用EurekaTransport通信组件发送HTTP/PUT请求
           httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
           logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    
           // 2.404表示 Eureka-Server 不存在租约,则重新发起注册
           if (httpResponse.getStatusCode() == 404) {
               REREGISTER_COUNTER.increment();
               logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
               long timestamp = instanceInfo.setIsDirtyWithTime();
               // 新发起注册
               boolean success = register();
               if (success) {
                   instanceInfo.unsetIsDirty(timestamp);
               }
               return success;
           }
           return httpResponse.getStatusCode() == 200;
       } catch (Throwable e) {
           logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
           return false;
       }
    }

1.3 EurekaHttpClient

renew方法内部其实调用了 AbstractJerseyEurekaHttpClient#sendHeartBeat(...) 方法,发起 续租请求 ,实现代码如下:

    /**
    * AbstractJerseyEurekaHttpClient.java
    */
    
    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
        // URL,例如http://localhost:8080/v2/apps/ServiceA/i-000000-1
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        try {
            // 发送http/put请求
            WebResource webResource = jerseyClient.resource(serviceUrl)
                .path(urlPath)
                .queryParam("status", info.getStatus().toString())
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
            if (overriddenStatus != null) {
                webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            response = requestBuilder.put(ClientResponse.class);    //PUT Request
            EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
            if (response.hasEntity()) {
                eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
            }
            return eurekaResponseBuilder.build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

二、Eureka-Server接受续租

Eureka-Server接受心跳(续租)的整体流程如下:

  1. 负责接受心跳请求的Controller是InstanceResource#renewLease(),相当于Jersey的一个MVC框架;
  2. 调用AbstractInstanceRegistry的renew()方法,完成应用实例的续租;
  3. 实际续租的处理流程是:根据应用名称和实例ID,从内部的Map中获取一个Lease ,更新一下lastUpdateTimestamp时间戳。

2.1 接受续租请求

com.netflix.eureka.resources.InstanceResource,是处理单个应用实例的请求的 Resource ( Controller ):

    /**
    * InstanceResource.java
    */
    
    @PUT
    public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
    
        // 1.执行续租
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    
        // 2.续租失败,返回 404 响应。当 Eureka-Client 收到 404 响应后,会重新发起注册
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // 比较请求的lastDirtyTimestamp和Server端的InstanceInfo的lastDirtyTimestamp的差异
        // 需要配置 eureka.syncWhenTimestampDiffers = true ( 默认开启 )。
        Response response = null;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            // 比较 lastDirtyTimestamp 的差异
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
        return response;
    }

2.2 执行续租

实际续租逻辑的执行是在Server端的注册表对象的AbstractInstanceRegistry#renew(...) 方法,本质就是根据应用名称和实例ID,在一个内部的Map中找到租约对象,并更新它的时间戳:

    /**
    * AbstractInstanceRegistry.java
    */
    
    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
    
        // key是实例id,value是Lease对象
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        // 1.如果没找到租约,直接返回失败
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } 
        // 2.找到了旧的租约
        else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // 获取应用实例的最终状态
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
    
                // 若应用实例的最终状态为UNKNOWN,则返回失败
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                                + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
    
                // 若应用实例的状态与最终状态不相等,则以最终状态覆盖掉应用实例的状态
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                        instanceInfo.getStatus().name(),
                        instanceInfo.getOverriddenStatus().name(),
                        instanceInfo.getId()
                    };
                    logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                        + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
    
            // 增加续租计数,Netflix Servo监控用
            renewsLastMin.increment();
            // 设置 租约最后更新时间戳
            leaseToRenew.renew();
            return true;
        }
    }
    /**
    * Lease.java
    */
    public void renew() {
        // duration是租约有效期,默认90秒
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }

lastUpdateTimestamp就是最新一次接受到心跳的时间戳,其实这里不需要加这个租约有效期duration,是一个Eureka自身的一个Bug,后面我讲解服务剔除时会讲到。

三、总结

总结,本章我讲解了Eureka的心跳续租机制,本质就是Eureka-Client通过定时调度任务,每隔一段时间发送一次HTTP/PUT请求到Eureka-Server端。Eureka-Server端接受到请求后,在自己的注册表中找到应用实例对应的租约,然后更新租约的时间戳。

阅读全文