既然Eureka-Server支持集群部署,那么在启动过程中必然涉及对集群的一些操作,所以Eureka抽象出一个PeerEurekaNodes
的概念,这就代表了整个Eureka-Server集群节点集合。在Eureka-Server的启动过程中,会创建一个PeerEurekaNodes对象:
PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
registry,
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
applicationInfoManager
);
本章,我就来讲解PeerEurekaNodes
到底是个什么东西?
一、PeerEurekaNodes
PeerEurekaNodes代表了整个Eureka-Server集群,它的构造函数没什么特殊的,就是对各种属性进行赋值。PeerEurekaNodes内部保存了一个PeerEurekaNode
列表,表示该集群中的所有Eureka-Server节点:
public class PeerEurekaNodes {
// 应用实例注册表
protected final PeerAwareInstanceRegistry registry;
// Eureka-Server 配置
protected final EurekaServerConfig serverConfig;
// Eureka-Client 配置
protected final EurekaClientConfig clientConfig;
// Eureka-Server 编解码
protected final ServerCodecs serverCodecs;
// 应用实例信息管理器
private final ApplicationInfoManager applicationInfoManager;
// Eureka-Server 集群节点数组
private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
// Eureka-Server 服务地址数组
private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
// 定时调度线程池
private ScheduledExecutorService taskExecutor;
@Inject
public PeerEurekaNodes(
PeerAwareInstanceRegistry registry,
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
ApplicationInfoManager applicationInfoManager) {
this.registry = registry;
this.serverConfig = serverConfig;
this.clientConfig = clientConfig;
this.serverCodecs = serverCodecs;
this.applicationInfoManager = applicationInfoManager;
}
}
1.1 start启动
PeerEurekaNodes包含了一个start
方法,整体的逻辑很简单:
public void start() {
// 1.创建一个线程池,只有一个工作线程
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 2.初始化集群节点信息
updatePeerEurekaNodes(resolvePeerUrls());
// 3.创建一个任务,用于定时更新集群节点信息,默认10 分钟
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
// 4.提交任务
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
// 打印集群节点信息
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: " + node.getServiceUrl());
}
}
1.2 更新集群节点信息
我们来看下上面的resolvePeerUrls()
方法,这个方法返回一个 Eureka-Server 集群服务地址的数组:
protected List<String> resolvePeerUrls() {
// 1.根据当前应用实例信息,解析出整个Eureka-Server集群的服务地址
// 涉及EndPoint和解析器,后续章节会讲解
InstanceInfo myInfo = applicationInfoManager.getInfo();
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
List<String> replicaUrls = EndpointUtils.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
// 2.移除当前应用实例(自己),避免向自己同步
int idx = 0;
while (idx < replicaUrls.size()) {
if (isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
} else {
idx++;
}
}
return replicaUrls;
}
然后,我们看下updatePeerEurekaNodes
这个更新集群节点信息的方法,它主要做两件事情:
- 添加新增的集群节点
- 关闭删除的集群节点
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
// 1.计算删除的集群节点地址
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
// 2.计算新增的集群节点地址
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
// 3.关闭删除的集群节点
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// 4.添加新增的集群节点
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
// 5.重新给属性赋值
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
// PeerEurekaNodes#createPeerEurekaNode
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
// 根据服务地址,创建一个集群节点
HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
String targetHost = hostFromUrl(peerEurekaNodeUrl);
if (targetHost == null) {
targetHost = "host";
}
return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}
二、总结
本章,我简单介绍了PeerEurekaNodes,它代表的其实就是整个Eureka-Server集群。PeerEurekaNodes在构造成功后,可以调用start方法解析集群中其它节点(创建一些PeerEurekaNode对象),然后启动一个定时任务更新PeerEurekaNode信息。