2023-08-07
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/239

Eureka-Client完成自身的注册之后,接着我们就要来看Eureka-Client拉取注册表的逻辑了。Eureka-Client拉取注册表分为两种模式: 全量拉取增量拉取

默认配置下,Eureka-Client 启动时,首先执行一次 全量拉取 注册表信息,然后在 本地缓存 ,之后每 30 秒 增量拉取 ,并跟本地缓存进行合并。

本章,我将讲解 全量拉取 注册表的逻辑,整体流程我用下面这张时序图表示。本章,读者应当重点掌握Eureka-Server端的 注册表多级缓存机制

202308072151453961.png

一、Eureka-Client拉取流程

1.1 DiscoveryClient初始化

DiscoveryClient在初始化时,会自动执行注册表的 全量拉取

    /**
    * DiscoveryClient.java
    */
    
    // 注册表本地缓存
    private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
    
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {                    //... 
    
        // 初始化注册表在本地的缓存
        localRegionApps.set(new Applications());
    
        //...   
    
        // 从 Eureka-Server 拉取注册表
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }
    
         //...     
    }

可以看到,保存在Eureka-Client本地的其实是一个Applications对象。

1.2 Applications

Applications是什么东西?其实就是 注册的应用集合 ,它就是注册表在Eureka-Client侧的保存形式:

202308072151467432.png

    public class Applications {
        //...
    
        // 应用集合信息 hashcode,该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致
        private String appsHashCode;
    
        // 应用信息队列
        private final AbstractQueue<Application> applications;
    
        // 应用映射,key为应用名称
        private final Map<String, Application> appNameApplicationMap;
    
        public Applications() {
            this(null, -1L, Collections.emptyList());
        }
    
        public void addApplication(Application app) {
            appNameApplicationMap.put(app.getName().toUpperCase(Locale.ROOT), app);
            addInstancesToVIPMaps(app, this.virtualHostNameAppMap, this.secureVirtualHostNameAppMap);
            applications.add(app);
        }
    
        public List<Application> getRegisteredApplications() {
            return new ArrayList<Application>(this.applications);
        }
    
        public Application getRegisteredApplications(String appName) {
            return appNameApplicationMap.get(appName.toUpperCase(Locale.ROOT));
        }
    
        /**
         * Gets the list of <em>instances</em> associated to a virtual host name.
         */
        public List<InstanceInfo> getInstancesByVirtualHostName(String virtualHostName) {
            return Optional.ofNullable(this.virtualHostNameAppMap.get(virtualHostName.toUpperCase(Locale.ROOT)))
                .map(VipIndexSupport::getVipList)
                .map(AtomicReference::get)
                .orElseGet(Collections::emptyList); 
        }
    
        public String getReconcileHashCode() {
            TreeMap<String, AtomicInteger> instanceCountMap = new TreeMap<String, AtomicInteger>();
            populateInstanceCountMap(instanceCountMap);
            return getReconcileHashCode(instanceCountMap);
        }
    
        /**
         * Populates the provided instance count map.  The instance count map is used as part of the general
         * app list synchronization mechanism.
         */
        public void populateInstanceCountMap(TreeMap<String, AtomicInteger> instanceCountMap) {
            for (Application app : this.getRegisteredApplications()) {
                for (InstanceInfo info : app.getInstancesAsIsFromEureka()) {
                    AtomicInteger instanceCount = instanceCountMap.get(info.getStatus().name());
                    if (instanceCount == null) {
                        instanceCount = new AtomicInteger(0);
                        instanceCountMap.put(info.getStatus().name(), instanceCount);
                    }
                    instanceCount.incrementAndGet();
                }
            }
        }
    
        public static String getReconcileHashCode(TreeMap<String, AtomicInteger> instanceCountMap) {
            String reconcileHashCode = "";
            for (Map.Entry<String, AtomicInteger> mapEntry : instanceCountMap.entrySet()) {
                reconcileHashCode = reconcileHashCode + mapEntry.getKey()
                    + STATUS_DELIMITER + mapEntry.getValue().get()
                    + STATUS_DELIMITER;
            }
            return reconcileHashCode;
        }
        //...
    }

1.3 fetchRegistry

接着, 调用 DiscoveryClient#fetchRegistry(false) 方法,从 Eureka-Server 获取注册信息,因为applications.getRegisteredApplications().size() == 0,所以这里肯定是走 全量拉取 ,实现代码如下:

    /**
    * DiscoveryClient.java
    */
    
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
        try {
            // 1.获取本地缓存的注册表
            Applications applications = getApplications();
    
            if (clientConfig.shouldDisableDelta()    // 禁用增量拉取注册表
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) 
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                            (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
    
                // 2.执行全量拉取并缓存
                getAndStoreFullRegistry();
            } else {
                // 3.执行增量拉取
                getAndUpdateDelta(applications);
            }
    
            // 4.设置应用集合hashcode
            applications.setAppsHashCode(applications.getReconcileHashCode());
            // 打印本地缓存的注册应用实例数量
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
    
        // ...
    
        // 5.以拉取到的注册表为准,更新本地缓存中当前应用实例的状态(只更新缓存)
        updateInstanceRemoteStatus();
    
        return true;
    }

上述updateInstanceRemoteStatus方法,其实就是设置lastRemoteInstanceStatus这个属性值:

    private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
    
    private synchronized void updateInstanceRemoteStatus() {
        // currentRemoteInstanceStatus用来暂存Server端该实例的状态
        InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
        if (instanceInfo.getAppName() != null) {
            Application app = getApplication(instanceInfo.getAppName());
            if (app != null) {
                InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
                if (remoteInstanceInfo != null) {
                    currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
                }
            }
        }
        if (currentRemoteInstanceStatus == null) {
            currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
        }
    
        // 如果状态发生了变化
        if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
            // 触发 StatusChangeEvent 事件, 目前 Eureka 未提供默认的该事件监听器
            onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
            // 更新lastRemoteInstanceStatus
            lastRemoteInstanceStatus = currentRemoteInstanceStatus;
        }
    }

1.4 getAndStoreFullRegistry

最后,我们来看下真正的全量拉取注册表的逻辑——DiscoveryClient#getAndStoreFullRegistry(false)

    private void getAndStoreFullRegistry() throws Throwable {
        // 拉取次数
        long currentUpdateGeneration = fetchRegistryGeneration.get();
    
        logger.info("Getting all instance registry info from the eureka server");
    
        // 1.利用网络通信组件EurekaTransport拉取注册表
        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());
    
        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            // 2.更新本地缓存,filterAndShuffle用来打乱顺序
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }

上述操作在底层利用jersey client,发送HTTP/GET请求(比如http://localhost:8080/v2/apps),调用eureka server的getApplications restful接口,获取全量注册表,缓存在自己的本地。

    /**
    * AbstractJerseyEurekaHttpClient.java
    */
    
    public EurekaHttpResponse<Applications> getApplications(String... regions) {
        return getApplicationsInternal("apps/", regions);
    }
    
    private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
       ClientResponse response = null;
       String regionsParamValue = null;
       try {
           WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
           if (regions != null && regions.length > 0) {
               regionsParamValue = StringUtil.join(regions);
               webResource = webResource.queryParam("regions", regionsParamValue);
           }
           Builder requestBuilder = webResource.getRequestBuilder();
           addExtraHeaders(requestBuilder);
           response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON
    
           // 这里的Applications就是注册表
           Applications applications = null;
           if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
               applications = response.getEntity(Applications.class);
           }
           return anEurekaHttpResponse(response.getStatus(), Applications.class)
                   .headers(headersOf(response))
                   .entity(applications)
                   .build();
       } finally {
           if (logger.isDebugEnabled()) {
               logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                       serviceUrl, urlPath,
                       regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                       response == null ? "N/A" : response.getStatus()
               );
           }
           if (response != null) {
               response.close();
           }
       }
    }

二、Eureka-Server处理流程

接着,我们来看下Eureka-Server接受并处理全量拉取的流程。Eureka-Client发送的拉取注册表的HTTP/GET请求是类似这样的:http://localhost:8080/v2/apps/。

2.1 整体流程

在Eureka-Server端,由ApplicationsResource#getContainers()负责处理注册表拉取请求,可以看到注册表是从一个ResponseCache对象中获取的,所以整个Server端的处理流程核心就是ResponseCache:

    /**
    * ApplicationsResource.java
    */
    
    private final ResponseCache responseCache;
    
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {
    
        // ...忽略Region相关
    
        // Eureka-Server 启动完成,但是未处于就绪( Ready )状态,不接受全量拉取注册表的请求
        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        }
    
        // 1.设置响应报文格式
        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }
    
        // 2.创建响应缓存(ResponseCache) 的键
        Key cacheKey = new Key(Key.EntityType.Application,
                               ResponseCacheImpl.ALL_APPS,
                               keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
                              );
    
        // 2.从响应缓存(ResponseCache)读取全量注册信息
        Response response;
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
    
            // 关键是responseCache.getGZIP(cacheKey)
            response = Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
        } else {
            response = Response.ok(responseCache.get(cacheKey))
                .build();
        }
        return response;
    }

下面我们就来看看ResponseCache这个二级缓存的基本代码。

2.2 ResponseCache

com.netflix.eureka.registry.ResponseCache,响应缓存 接口 ,接口代码如下 :

    public interface ResponseCache {
        // 获取缓存
        String get(Key key);
    
        // 获取压缩后的缓存
        byte[] getGZIP(Key key);
    
        // 过期缓存
        void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);
    
        AtomicLong getVersionDelta();
    
        AtomicLong getVersionDeltaWithRegions();
    }

其中, com.netflix.eureka.registry.Key,缓存键的实现代码如下:

    public class Key {
        public enum KeyType {
            JSON, XML
        }
        public enum EntityType {
            Application, VIP, SVIP
        }
        private final String entityName;
        private final String[] regions;
    
        // 请求参数类型
        private final KeyType requestType;   
        // 请求 API 版本号
        private final Version requestVersion;
        // hashKey
        private final String hashKey;
        // 实体类型
        private final EntityType entityType;
        private final EurekaAccept eurekaAccept;
    
        //...
    }

三、多级缓存读取机制

ResponseCache接口的实现类是ResponseCacheImpl,它的核心是一套 多级缓存机制 ,ResponseCacheImpl将缓存拆分成两层 :

  • 只读缓存 ( readOnlyCacheMap );
  • 固定过期 + 固定大小可读写缓存 ( readWriteCacheMap )。

默认配置下,当Eureka-Client发送请求过来读取全量注册表时, 缓存读取策略 如下:

202308072151475973.png

3.1 ResponseCacheImpl

Eureka-Server响应Client的多级缓存,实际上内部是调用了ResponseCacheImpl#get(...) 。可以看到,只读缓存readOnlyCacheMap本质就是个ConcurrentHashMap,而可读写缓存是一个LoadingCache封装对象:

    /**
    * ResponseCacheImpl.java
    */
    
    // 原始注册表
    private final AbstractInstanceRegistry registry;
    // 只读注册表
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
    // 可读写注册表
    private final LoadingCache<Key, Value> readWriteCacheMap;
    
    public String get(final Key key) {
        // 是否开启只读缓存:通过参数`eureka.shouldUseReadOnlyResponseCache`配置,默认为true
        return get(key, shouldUseReadOnlyResponseCache);
    }
    
    String get(final Key key, boolean useReadOnlyCache) {
        // Vaule就是实际缓存的值
        Value payload = getValue(key, useReadOnlyCache);
        if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
            return null;
        } else {
            return payload.getPayload();
        }
    }
    
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            // 1.若开启只读缓存,则优先从readOnlyCacheMap读
            if (useReadOnlyCache) {
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    // 读不到则从readWriteCacheMap读
                    payload = readWriteCacheMap.get(key);
                    // 重新更新readOnlyCacheMap
                    readOnlyCacheMap.put(key, payload);
                }
            } 
            // 2.未开启只读缓存,直接从readWriteCacheMap读
            else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key :" + key, t);
        }
        return payload;
    }
    
    
    public class Value {
        private final String payload;
        private byte[] gzipped;
    
        public Value(String payload) {
            this.payload = payload;
            if (!EMPTY_PAYLOAD.equals(payload)) {
                // ...
                try {
                    // ...省略GZIP压缩代码
                    gzipped = bos.toByteArray();
                } catch (IOException e) {
                    gzipped = null;
                } finally {
                    if (tracer != null) {
                        tracer.stop();
                    }
                }
            } else {
                gzipped = null;
            }
        }
    
        //...省略get方法
    }

ResponseCacheImpl#getGzip(...) 方法类似,只是对结果进行了压缩,这里就不赘述了。

3.2 可读写缓存LoadingCache

我们关键来看下readWriteCacheMap可读写缓存,也就是LoadingCache,它是随着ResponseCacheImpl一起构造的:

    /**
    * ResponseCacheImpl.java
    */
    
    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        //...
    
        this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(1000)    //最大容量1000
            .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
            .removalListener(new RemovalListener<Key, Value>() {
                @Override
                public void onRemoval(RemovalNotification<Key, Value> notification) {
                    Key removedKey = notification.getKey();
                    if (removedKey.hasRegions()) {
                        Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                        regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                    }
                }
            })
            .build(new CacheLoader<Key, Value>() {
                @Override
                public Value load(Key key) throws Exception {
                    if (key.hasRegions()) {
                        Key cloneWithNoRegions = key.cloneWithoutRegions();
                        regionSpecificKeys.put(cloneWithNoRegions, key);
                    }
                    // 这里根据Key生成Value
                    Value value = generatePayload(key);
                    return value;
                }
            });
        //...
    }

可以看到,readWriteCacheMap是基于Google GuavaCacheBuilder构建的,它的最大容量是1000。它保存的Value是通过方法generatePayload生成的,最终就是通过从registry这个原始注册表(PeerAwareInstanceRegistryImpl)中获取Applications,然后编码转化放到Value里:

    /**
    * ResponseCacheImpl.java
    */
    
    private Value generatePayload(Key key) {
        //...
        try {
            String payload;
            switch (key.getEntityType()) {
                case Application:
                    boolean isRemoteRegionRequested = key.hasRegions();
    
                    if (ALL_APPS.equals(key.getName())) {    //全量拉取
                        //...
                        //registry.getApplications()获取的就是全量的注册应用信息
                        payload = getPayLoad(key, registry.getApplications());
                    } else if (ALL_APPS_DELTA.equals(key.getName())) {    //增量拉取
                        //...
                        payload = getPayLoad(key, registry.getApplicationDeltas());
                    } else {
                        //..
                        payload = getPayLoad(key, registry.getApplication(key.getName()));
                    }
                    break;
                //...
            }
            return new Value(payload);
        } finally {
            //...
        }
    }
    
    // 将Applications转换成缓存值
    private String getPayLoad(Key key, Applications apps) {
        EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
        String result;
        try {
            // 将Applications对象编码
            result = encoderWrapper.encode(apps);
        } catch (Exception e) {
            logger.error("Failed to encode the payload for all apps", e);
            return "";
        }
        //...
        return result;
    }

四、多级缓存过期机制

Eureka-Server的ResponseCache存在 过期机制 ,具体的过期策略一共有三种: 主动过期定时过期被动过期

202308072151488884.png

4.1 主动过期

当Eureka-Server端发生实例注册(Register)下线(Cancel)、*剔除(Eviction)*时,会立即主动过期 readWriteCacheMap(默认延迟30s后再重新刷入) 。

举个例子,现在有一个应用A(ServiceA),部署了一个新的应用实例(Instance010)。接着发起服务注册,注册成功后,Eureka-Server就必须刷新readWriteCacheMap这个缓存。它具体会调用ResponseCache.invalidate(),将之前缓存好的ALL_APPS这个Key对应的缓存过期掉。

我们之前讲解服务注册的Eureka-Server端处理流程时,注册完成后,AbstractInstanceRegistry.invalidateCache会执行,内部其实是调用了ResponseCache的invalidate方法:

    private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
        // invalidate cache
        responseCache.invalidate(appName, vipAddress, secureVipAddress);
    }
    /**
    * ResponseCacheImpl.java
    */
    
    public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
        // 遍历所有类型的缓存键,逐个过期
        for (Key.KeyType type : Key.KeyType.values()) {
            for (Version v : Version.values()) {
                invalidate(
                    new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
                    new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
                    new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
                    new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
                    new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
                    new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
                );
                if (null != vipAddress) {
                    invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
                }
                if (null != secureVipAddress) {
                    invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
                }
            }
        }
    }
    
    public void invalidate(Key... keys) {
        for (Key key : keys) {
            logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                         key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
    
            // 最终调用了readWriteCacheMap(LoadingCache)的invalidate方法,底层基于Guava过期Key
            readWriteCacheMap.invalidate(key);
            //...
        }
    }

4.2 定时过期

readWriteCacheMap在构建的时候,指定了一个自动过期的时间(默认值180秒),所以你往readWriteCacheMap中放入一个数据,180秒后就会自动过期掉:

    this.readWriteCacheMap =
        CacheBuilder.newBuilder().initialCapacity(1000)
                // 配置eureka.responseCacheAutoExpirationInSeconds,设置写入过期时长。默认值:180 秒
                .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                .removalListener(new RemovalListener<Key, Value>() {
                    @Override
                    public void onRemoval(RemovalNotification<Key, Value> notification) {
                        Key removedKey = notification.getKey();
                        if (removedKey.hasRegions()) {
                            Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                            regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                        }
                    }
                })
                .build(new CacheLoader<Key, Value>() {
                    @Override
                    public Value load(Key key) throws Exception {
                        if (key.hasRegions()) {
                            Key cloneWithNoRegions = key.cloneWithoutRegions();
                            regionSpecificKeys.put(cloneWithNoRegions, key);
                        }
                        Value value = generatePayload(key);
                        return value;
                    }
                });

4.3 被动过期

ResponseCacheImpl内部有个定时任务,默认每隔30秒对readOnlyCacheMapreadWriteCacheMap中的数据进行一次比对,如果数据不一致,那么就将readWriteCacheMap中的数据覆盖到readOnlyCacheMap中去。

    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {   
        //...
    
        //配置 eureka.responseCacheUpdateIntervalMs,设置任务执行频率,默认值:30秒
        long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
    
        if (shouldUseReadOnlyResponseCache) {
            // 开启一个定时调度任务
            timer.schedule(getCacheUpdateTask(),
                           new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                                    + responseCacheUpdateIntervalMs),
                           responseCacheUpdateIntervalMs);
        }
        //...
    }
    // ResponseCacheImpl.java
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                // 循环readOnlyCacheMap的缓存键
                for (Key key : readOnlyCacheMap.keySet()) {
                    //...
                    try {
                        CurrentRequestVersion.set(key.getVersion());
    
                        // 判断一、二级缓存至是否一致
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        if (cacheValue != currentCacheValue) {
                            // 不一致则覆盖只读缓存
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache", th);
                    }
                }
            }
        };
    }

五、总结

本章,我讲解了Eureka的全量拉取注册表流程,以及Eureka-Server端的注册表多级缓存机制,核心是缓存注册表的多级读取与三种过期策略,读者可以结合源码加深理解。

下一章,我将讲解Eureka-Client增量拉取注册表流程,以及该过程中核心的一致性Hash比对原理。

阅读全文