nacos数据一致性服务执行流程
RaftConsistencyServiceImpl的put永久数据同步
内部是调用RaftCore
的signalPublish
。
@Override
public void put(String key, Record value) throws NacosException {
try {
raftCore.signalPublish(key, value);
} catch (Exception e) {
Loggers.RAFT.error("Raft put failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
}
}
RaftCore的signalPublish
其实就是判断是不是leader,不是的话就要把数据发给leader,让他来处理,否则的话就进行数据改变通知,然后发送给所有结点,包括自身,用CountDownLatch
计数,总共只要有过半的结点的计数就算成功了。
public void signalPublish(String key, Record value) throws Exception {
if (!isLeader()) {//不是leader
JSONObject params = new JSONObject();
params.put("key", key);
params.put("value", value);
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
//交给leader去做/v1/ns/raft/datum
raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
return;
}
try {//是leader
OPERATE_LOCK.lock();
long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}
JSONObject json = new JSONObject();
json.put("datum", datum);
json.put("source", peers.local());
//发布数据改变通知
onPublish(datum, peers.local());
final String content = JSON.toJSONString(json);
//只要过半的结点数
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
for (final String server : peers.allServersIncludeMyself()) {//遍历所有结点
if (isLeader(server)) {//自己算一次
latch.countDown();
continue;
}
final String url = buildURL(server, API_ON_PUB);///v1/ns/raft/datum/commit
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, response.getStatusCode());
return 1;
}
latch.countDown();//异步完成
return 0;
}
@Override
public STATE onContentWriteCompleted() {
return STATE.CONTINUE;
}
});
}
//等待半数完成
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
} finally {
OPERATE_LOCK.unlock();
}
}
RaftCore的isLeader
判断是否是leader
,单机自身就是,否则的话就判断IP
。
public boolean isLeader() {
return peers.isLeader(NetUtils.localServer());
}
RaftPeerSet的isLeader
//检查是否是leader
public boolean isLeader(String ip) {
if (STANDALONE_MODE) {//单机直接就是
return true;
}
if (leader == null) {
Loggers.RAFT.warn("[IS LEADER] no leader is available now!");
return false;
}
//集群的话要看IP对不对
return StringUtils.equals(leader.ip, ip);
}
RaftCore的onPublish
刷新任期是事件,添加通知任务,通知服务更新,服务也是监听器。
public void onPublish(Datum datum, RaftPeer source) throws Exception {
RaftPeer local = peers.local();
if (datum.value == null) {
Loggers.RAFT.warn("received empty datum");
throw new IllegalStateException("received empty datum");
}
//不是leader不能干这个事
if (!peers.isLeader(source.ip)) {
Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}",
JSON.toJSONString(source), JSON.toJSONString(getLeader()));
throw new IllegalStateException("peer(" + source.ip + ") tried to publish " +
"data but wasn't leader");
}
//过时了
if (source.term.get() < local.term.get()) {
Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}",
JSON.toJSONString(source), JSON.toJSONString(local));
throw new IllegalStateException("out of date publish, pub-term:"
+ source.term.get() + ", cur-term: " + local.term.get());
}
//重置任期
local.resetLeaderDue();
// if data should be persisted, usually this is true:
if (KeyBuilder.matchPersistentKey(datum.key)) {
raftStore.write(datum);//持久化到文件
}
//放入数据
datums.put(datum.key, datum);
if (isLeader()) {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
} else {
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
raftStore.updateTerm(local.term.get());
//添加任务
notifier.addTask(datum.key, ApplyAction.CHANGE);
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
Service的onChange
Notifier
通知器会通知服务。
服务进行更新:
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。