Service模型大致结构
DistroConsistencyServiceImpl的put临时实例集合一致性服务
添加集合,然后
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
taskDispatcher.addTask(key);
}
onPut添加临时实例集合
如果是临时的就添加一个Datum
,将实例集合放入。如果有监听器监听的,立即通知,否则就返回,怎么通知的后面说。
//临时的实例
public void onPut(String key, Record value) {
//如果是临时的服务实例集合
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();//创建临时数据
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
dataStore.put(key, datum);//放进一个map里
}
if (!listeners.containsKey(key)) {//没有监听器就返回
return;
}
//有监听立即通知服务有改变
notifier.addTask(key, ApplyAction.CHANGE);
}
RaftConsistencyServiceImpl的put永久实例集合一致性服务
和raft
选举算法有关。
@Override
public void put(String key, Record value) throws NacosException {
try {
raftCore.signalPublish(key, value);
} catch (Exception e) {
Loggers.RAFT.error("Raft put failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
}
}
signalPublish
这个其实涉及到raft
选举的协议,如果本服务不是leader
就要交给leader
去处理,就发一个http
请求给leader
,leader
接受到之后还是会到他的signalPublish
里处理。如果是leader
的话就进行服务实例改变通知,通知本地的监听器,并且要同步到其他结点,使用过半机制,刚好CountDownLatch
可以用,只要有过半响应成功就算同步成功,具体的细节后面会讲。
public void signalPublish(String key, Record value) throws Exception {
if (!isLeader()) {//不是leader
JSONObject params = new JSONObject();
params.put("key", key);
params.put("value", value);
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
//交给leader去做/v1/ns/raft/datum
raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
return;
}
try {//是leader
OPERATE_LOCK.lock();
long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}
JSONObject json = new JSONObject();
json.put("datum", datum);
json.put("source", peers.local());
//发布数据改变通知
onPublish(datum, peers.local());
final String content = JSON.toJSONString(json);
//只要过半的结点数
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
for (final String server : peers.allServersIncludeMyself()) {//遍历所有结点
if (isLeader(server)) {//自己算一次
latch.countDown();
continue;
}
final String url = buildURL(server, API_ON_PUB);///v1/ns/raft/datum/commit
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, response.getStatusCode());
return 1;
}
latch.countDown();//异步完成
return 0;
}
@Override
public STATE onContentWriteCompleted() {
return STATE.CONTINUE;
}
});
}
//等待半数完成
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
} finally {
OPERATE_LOCK.unlock();
}
}
TaskDispatcher的addTask
负载均衡的获取一个任务执行器,添加一个新任务,这个就是来做数据同步的,具体做什么后面一起说。
public void addTask(String key) {
taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
}
TaskScheduler的addTask
放到一个阻塞队列里,做数据同步,同步到其他结点,具体的任务后面说。
private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);
public void addTask(String key) {
queue.offer(key);
}
这样,服务实例的注册流程基本说完了,我们可以总结下。
- 如果服务不存在,就创建服务,并开启心跳检查,来检查服务中的每个实例,并将服务注册到监听器集合中,因为服务本身实现了监听器集合,其他集群的服务数据有改动的话会通知服务。
- 添加服务实例到服务集群中,没有集群就创建一个,并刷新老的服务实例集合。并将服务
key
添加到同步任务中,同步任务根据数量和超时限制进行服务器集群的同步。
其实中间可能还开启了好多任务,比如心跳检查Service
的ClientBeatCheckTask
,Cluster
的HealthCheckTask
,数据同步的TaskDispatcher
,服务实例改变的通知器Notifier
任务。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。