2023-08-07
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/239

既然Eureka-Server支持集群部署,那么在启动过程中必然涉及对集群的一些操作,所以Eureka抽象出一个PeerEurekaNodes的概念,这就代表了整个Eureka-Server集群节点集合。在Eureka-Server的启动过程中,会创建一个PeerEurekaNodes对象:

    PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
        registry,
        eurekaServerConfig,
        eurekaClient.getEurekaClientConfig(),
        serverCodecs,
        applicationInfoManager
    );

本章,我就来讲解PeerEurekaNodes到底是个什么东西?

一、PeerEurekaNodes

PeerEurekaNodes代表了整个Eureka-Server集群,它的构造函数没什么特殊的,就是对各种属性进行赋值。PeerEurekaNodes内部保存了一个PeerEurekaNode列表,表示该集群中的所有Eureka-Server节点:

    public class PeerEurekaNodes {
    
        // 应用实例注册表
        protected final PeerAwareInstanceRegistry registry;
    
        // Eureka-Server 配置
        protected final EurekaServerConfig serverConfig;
    
        // Eureka-Client 配置
        protected final EurekaClientConfig clientConfig;
    
        // Eureka-Server 编解码
        protected final ServerCodecs serverCodecs;
    
        // 应用实例信息管理器
        private final ApplicationInfoManager applicationInfoManager;
    
        // Eureka-Server 集群节点数组
        private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
    
        // Eureka-Server 服务地址数组
        private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
    
        // 定时调度线程池
        private ScheduledExecutorService taskExecutor;
    
        @Inject
        public PeerEurekaNodes(
                PeerAwareInstanceRegistry registry,
                EurekaServerConfig serverConfig,
                EurekaClientConfig clientConfig,
                ServerCodecs serverCodecs,
                ApplicationInfoManager applicationInfoManager) {
            this.registry = registry;
            this.serverConfig = serverConfig;
            this.clientConfig = clientConfig;
            this.serverCodecs = serverCodecs;
            this.applicationInfoManager = applicationInfoManager;
        }
    }

1.1 start启动

PeerEurekaNodes包含了一个start方法,整体的逻辑很简单:

    public void start() {
        // 1.创建一个线程池,只有一个工作线程
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
        );
        try {
            // 2.初始化集群节点信息
            updatePeerEurekaNodes(resolvePeerUrls());
            // 3.创建一个任务,用于定时更新集群节点信息,默认10 分钟
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }
    
                }
            };
            // 4.提交任务
            taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        // 打印集群节点信息
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  " + node.getServiceUrl());
        }
    }

1.2 更新集群节点信息

我们来看下上面的resolvePeerUrls() 方法,这个方法返回一个 Eureka-Server 集群服务地址的数组:

    protected List<String> resolvePeerUrls() {
        // 1.根据当前应用实例信息,解析出整个Eureka-Server集群的服务地址
        // 涉及EndPoint和解析器,后续章节会讲解
        InstanceInfo myInfo = applicationInfoManager.getInfo();
        String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
        List<String> replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
    
        // 2.移除当前应用实例(自己),避免向自己同步
        int idx = 0;
        while (idx < replicaUrls.size()) {
            if (isThisMyUrl(replicaUrls.get(idx))) {
                replicaUrls.remove(idx);
            } else {
                idx++;
            }
        }
        return replicaUrls;
    }

然后,我们看下updatePeerEurekaNodes这个更新集群节点信息的方法,它主要做两件事情:

  • 添加新增的集群节点
  • 关闭删除的集群节点
    protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
        if (newPeerUrls.isEmpty()) {
            logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
            return;
        }
    
        // 1.计算删除的集群节点地址
        Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
        toShutdown.removeAll(newPeerUrls);
    
        // 2.计算新增的集群节点地址
        Set<String> toAdd = new HashSet<>(newPeerUrls);
        toAdd.removeAll(peerEurekaNodeUrls);
    
        if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
            return;
        }
    
        // 3.关闭删除的集群节点
        List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
        if (!toShutdown.isEmpty()) {
            logger.info("Removing no longer available peer nodes {}", toShutdown);
            int i = 0;
            while (i < newNodeList.size()) {
                PeerEurekaNode eurekaNode = newNodeList.get(i);
                if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                    newNodeList.remove(i);
                    eurekaNode.shutDown();
                } else {
                    i++;
                }
            }
        }
    
        // 4.添加新增的集群节点
        if (!toAdd.isEmpty()) {
            logger.info("Adding new peer nodes {}", toAdd);
            for (String peerUrl : toAdd) {
                newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }
    
        // 5.重新给属性赋值
        this.peerEurekaNodes = newNodeList;
        this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }
    
    // PeerEurekaNodes#createPeerEurekaNode
    protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
        // 根据服务地址,创建一个集群节点
    
        HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
        String targetHost = hostFromUrl(peerEurekaNodeUrl);
        if (targetHost == null) {
            targetHost = "host";
        }
        return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
    }

二、总结

本章,我简单介绍了PeerEurekaNodes,它代表的其实就是整个Eureka-Server集群。PeerEurekaNodes在构造成功后,可以调用start方法解析集群中其它节点(创建一些PeerEurekaNode对象),然后启动一个定时任务更新PeerEurekaNode信息。

阅读全文