Raft election: the basic flow
Cluster nodes request data synchronization
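As mentioned earlier, the heartbeat the leader sends carries the keys of its data (each with a timestamp), so the other nodes may need to synchronize their data.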
RaftCore's receivedBeat: reading the heartbeat
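It is a long stretch of code, but all it does is compare the local data against the data keys the leader brings. Keys that need updating are fetched from the leader in batches. When a response arrives, each datum is identified as either a Service or an InstanceList entry, then stored, listeners are notified, and the term state is updated.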
// Not a heartbeat-only beat: it also carries data keys
if (!switchDomain.isSendBeatOnly()) {
    Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
    // Register every local datum key with 0 (not yet seen in this beat)
    for (Map.Entry<String, Datum> entry : datums.entrySet()) {
        receivedKeysMap.put(entry.getKey(), 0);
    }
    // now check datums
    List<String> batch = new ArrayList<>();
    int processedCount = 0;
    if (Loggers.RAFT.isDebugEnabled()) {
        Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
            beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
    }
    // Iterate over every data key carried by the beat
    for (Object object : beatDatums) {
        processedCount = processedCount + 1;
        JSONObject entry = (JSONObject) object;
        String key = entry.getString("key");
        final String datumKey;
        if (KeyBuilder.matchServiceMetaKey(key)) {
            datumKey = KeyBuilder.detailServiceMetaKey(key);
        } else if (KeyBuilder.matchInstanceListKey(key)) {
            datumKey = KeyBuilder.detailInstanceListkey(key);
        } else {
            // ignore corrupted key:
            continue;
        }
        long timestamp = entry.getLong("timestamp");
        receivedKeysMap.put(datumKey, 1);
        try {
            if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
                continue;
            }
            if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                batch.add(datumKey);
            }
            if (batch.size() < 50 && processedCount < beatDatums.size()) {
                continue;
            }
            String keys = StringUtils.join(batch, ",");
            if (batch.size() <= 0) {
                continue;
            }
            Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}"
                , getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());
            // update datum entry
            String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
            HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        return 1;
                    }
                    List<JSONObject> datumList = JSON.parseObject(response.getResponseBody(), new TypeReference<List<JSONObject>>() {
                    });
                    for (JSONObject datumJson : datumList) {
                        OPERATE_LOCK.lock();
                        Datum newDatum = null;
                        try {
                            Datum oldDatum = getDatum(datumJson.getString("key"));
                            if (oldDatum != null && datumJson.getLongValue("timestamp") <= oldDatum.timestamp.get()) {
                                Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                    datumJson.getString("key"), datumJson.getLongValue("timestamp"), oldDatum.timestamp);
                                continue;
                            }
                            if (KeyBuilder.matchServiceMetaKey(datumJson.getString("key"))) {
                                Datum<Service> serviceDatum = new Datum<>();
                                serviceDatum.key = datumJson.getString("key");
                                serviceDatum.timestamp.set(datumJson.getLongValue("timestamp"));
                                serviceDatum.value =
                                    JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Service.class);
                                newDatum = serviceDatum;
                            }
                            if (KeyBuilder.matchInstanceListKey(datumJson.getString("key"))) {
                                Datum<Instances> instancesDatum = new Datum<>();
                                instancesDatum.key = datumJson.getString("key");
                                instancesDatum.timestamp.set(datumJson.getLongValue("timestamp"));
                                instancesDatum.value =
                                    JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Instances.class);
                                newDatum = instancesDatum;
                            }
                            if (newDatum == null || newDatum.value == null) {
                                Loggers.RAFT.error("receive null datum: {}", datumJson);
                                continue;
                            }
                            raftStore.write(newDatum);
                            datums.put(newDatum.key, newDatum);
                            notifier.addTask(newDatum.key, ApplyAction.CHANGE);
                            local.resetLeaderDue();
                            if (local.term.get() + 100 > remote.term.get()) {
                                getLeader().term.set(remote.term.get());
                                local.term.set(getLeader().term.get());
                            } else {
                                local.term.addAndGet(100);
                            }
                            raftStore.updateTerm(local.term.get());
                            Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                newDatum.key, newDatum.timestamp, JSON.toJSONString(remote), local.term);
                        } catch (Throwable e) {
                            Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);
                        } finally {
                            OPERATE_LOCK.unlock();
                        }
                    }
                    TimeUnit.MILLISECONDS.sleep(200);
                    return 0;
                }
            });
            batch.clear();
        } catch (Exception e) {
            Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
        }
    }
    // Keys still marked 0 were not in the beat, so the leader no longer has them
    List<String> deadKeys = new ArrayList<>();
    for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
        if (entry.getValue() == 0) {
            deadKeys.add(entry.getKey());
        }
    }
    for (String deadKey : deadKeys) {
        try {
            deleteDatum(deadKey);
        } catch (Exception e) {
            Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
        }
    }
}
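To make the comparison easier to follow, here is a minimal sketch of just the bookkeeping, stripped of HTTP, locking and logging. It is not the Nacos code: `BeatDiffSketch`, `Plan` and `plan` are names I made up, and the datums are reduced to plain key-to-timestamp maps. It only shows the three decisions the method makes: skip keys whose local timestamp is not older, group the rest into batches, and collect local keys that the beat no longer mentions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BeatDiffSketch {

    /** Hypothetical holder for the result of one comparison pass. */
    static final class Plan {
        final List<List<String>> batches = new ArrayList<>(); // keys to fetch from the leader
        final List<String> deadKeys = new ArrayList<>();      // local keys absent from the beat
    }

    /**
     * @param local     local key -> timestamp (stands in for RaftCore.datums)
     * @param beat      key -> timestamp carried by the leader's beat
     * @param batchSize maximum keys per fetch (the original code uses 50); must be > 0
     */
    static Plan plan(Map<String, Long> local, Map<String, Long> beat, int batchSize) {
        Plan plan = new Plan();
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Long> e : beat.entrySet()) {
            Long localTs = local.get(e.getKey());
            // Fetch only when the key is unknown locally or the local copy is older.
            if (localTs == null || localTs < e.getValue()) {
                batch.add(e.getKey());
            }
            // A full batch is closed and a new one is started.
            if (batch.size() == batchSize) {
                plan.batches.add(batch);
                batch = new ArrayList<>();
            }
        }
        // Whatever is left after the last beat entry forms the final, smaller batch.
        if (!batch.isEmpty()) {
            plan.batches.add(batch);
        }
        // Local keys the leader did not mention are candidates for deletion.
        for (String key : local.keySet()) {
            if (!beat.containsKey(key)) {
                plan.deadKeys.add(key);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Long> local = new LinkedHashMap<>();
        local.put("a", 10L);
        local.put("b", 20L);
        local.put("gone", 5L);

        Map<String, Long> beat = new LinkedHashMap<>();
        beat.put("a", 10L); // same timestamp: skipped
        beat.put("b", 25L); // newer on the leader: fetched
        beat.put("c", 1L);  // unknown locally: fetched
        beat.put("d", 2L);  // unknown locally: fetched

        Plan p = plan(local, beat, 2);
        System.out.println(p.batches);  // [[b, c], [d]]
        System.out.println(p.deadKeys); // [gone]
    }
}
```

In the real method each batch becomes one asynchronous GET to the leader, and each dead key is passed to `deleteDatum`.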
RaftController's get
It simply looks up the service data for every requested key and sends it back.
@GetMapping("/datum")
public String get(HttpServletRequest request, HttpServletResponse response) throws Exception {
    response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
    response.setHeader("Cache-Control", "no-cache");
    response.setHeader("Content-Encode", "gzip");
    String keysString = WebUtils.required(request, "keys");
    keysString = URLDecoder.decode(keysString, "UTF-8");
    String[] keys = keysString.split(",");
    List<Datum> datums = new ArrayList<Datum>();
    for (String key : keys) {
        Datum datum = raftCore.getDatum(key);
        datums.add(datum);
    }
    return JSON.toJSONString(datums);
}
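The follower and this endpoint agree on a very simple wire format: the keys are joined with commas and URL-encoded on one side, then decoded and split on the other. Below is a small sketch of that round trip using only the JDK. `KeysParamSketch`, `encodeKeys` and `lookup` are hypothetical names, the store is a plain map of strings rather than real `Datum` objects, and the example keys are invented.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeysParamSketch {

    /** Follower side: join the batch with commas and URL-encode it for the "keys" parameter. */
    static String encodeKeys(List<String> keys) {
        try {
            return URLEncoder.encode(String.join(",", keys), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a compliant JVM.
            throw new IllegalStateException(e);
        }
    }

    /** Leader side: decode, split on commas, and look each key up in the store. */
    static List<String> lookup(String encodedKeys, Map<String, String> store) {
        String decoded;
        try {
            decoded = URLDecoder.decode(encodedKeys, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
        List<String> found = new ArrayList<>();
        for (String key : decoded.split(",")) {
            String value = store.get(key);
            // Assumption of this sketch: unknown keys are skipped instead of returned as null.
            if (value != null) {
                found.add(value);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        Map<String, String> store = new LinkedHashMap<>();
        store.put("svc@@a", "datum-a");
        store.put("svc@@b", "datum-b");

        String param = encodeKeys(Arrays.asList("svc@@a", "missing", "svc@@b"));
        System.out.println(lookup(param, store)); // [datum-a, datum-b]
    }
}
```

Note one difference from the handler above: the handler adds whatever `raftCore.getDatum(key)` returns without a null check, so a key the leader does not hold may end up as a null element in the JSON list. Skipping such keys is my own choice for the sketch.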
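That covers the basic election and data synchronization flow. As covered earlier, a data update is forwarded to the leader, which then synchronizes it. It does not seem to be two-phase here, though: the leader just synchronizes the data directly. The remaining details are left for you to dig into; there is a lot going on internally and it takes time to study.
That's all for today. I hope this helps with learning and understanding. Experts, please go easy on me: these are only my own study notes, and my ability is limited, so please bear with me.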