2023-09-15
Original author: 王伟王胖胖. Original article: https://blog.csdn.net/wangwei19871103/article/details/105858072

Basic flow of a Raft election

[Figure: basic flow of a Raft election (202309152319288051.png)]

Cluster nodes request data synchronization

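As mentioned earlier, the heartbeat that the leader sends also carries the keys of its data, so the other nodes may need to synchronize data when they receive it.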

RaftCore's receivedBeat reads the key information

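This is a long stretch of code, but all it does is compare the local data against the keys the leader sent. Keys whose local copy is missing or older are fetched from the leader in batches of up to 50. When the response arrives, each datum is handled as either a Service or an InstanceList (Instances) entry, then stored and announced through a notification, and the local term is updated. Local keys that the leader no longer reports are deleted at the end.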
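The comparison described above can be sketched in isolation. This is a minimal sketch, not the Nacos implementation: `BeatKeyDiff`, `SyncPlan`, and `plan` are hypothetical names, both sides are reduced to plain key-to-timestamp maps, and the key normalization and the HTTP fetch are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of Nacos.
class BeatKeyDiff {

    /** Hypothetical result holder: key batches to fetch and local keys to delete. */
    static final class SyncPlan {
        final List<List<String>> batches = new ArrayList<>();
        final List<String> deadKeys = new ArrayList<>();
    }

    /**
     * @param local     local key -> timestamp
     * @param beat      key -> timestamp as reported by the leader's beat
     * @param batchSize maximum number of keys per fetch (the listing uses 50)
     */
    static SyncPlan plan(Map<String, Long> local, Map<String, Long> beat, int batchSize) {
        SyncPlan plan = new SyncPlan();
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Long> e : beat.entrySet()) {
            Long localTs = local.get(e.getKey());
            // Fetch when the key is unknown locally or the local copy is older.
            if (localTs == null || localTs < e.getValue()) {
                batch.add(e.getKey());
            }
            // Close the batch once it is full.
            if (batch.size() == batchSize) {
                plan.batches.add(batch);
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            plan.batches.add(batch);
        }
        // Keys we hold that the leader did not mention are considered dead.
        for (String key : local.keySet()) {
            if (!beat.containsKey(key)) {
                plan.deadKeys.add(key);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Long> local = new LinkedHashMap<>();
        local.put("a", 10L);
        local.put("b", 5L);
        local.put("stale", 1L);

        Map<String, Long> beat = new LinkedHashMap<>();
        beat.put("a", 10L); // same timestamp: nothing to do
        beat.put("b", 7L);  // leader is newer: fetch
        beat.put("c", 3L);  // unknown locally: fetch

        SyncPlan plan = plan(local, beat, 50);
        System.out.println(plan.batches);  // [[b, c]]
        System.out.println(plan.deadKeys); // [stale]
    }
}
```

The real method, shown next, interleaves this bookkeeping with the asynchronous HTTP fetch instead of building the whole plan up front.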

            // the beat carries more than a bare heartbeat
            if (!switchDomain.isSendBeatOnly()) {
    
                Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
                // put in every local datum key, marked 0 (not yet seen in this beat)
                for (Map.Entry<String, Datum> entry : datums.entrySet()) {
                    receivedKeysMap.put(entry.getKey(), 0);
                }
    
                // now check datums
                List<String> batch = new ArrayList<>();
    
                int processedCount = 0;
                if (Loggers.RAFT.isDebugEnabled()) {
                    Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                        beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
                }
                // iterate over every key carried by the beat
                for (Object object : beatDatums) {
                    processedCount = processedCount + 1;
    
                    JSONObject entry = (JSONObject) object;
                    String key = entry.getString("key");
                    final String datumKey;
    
                    if (KeyBuilder.matchServiceMetaKey(key)) {
                        datumKey = KeyBuilder.detailServiceMetaKey(key);
                    } else if (KeyBuilder.matchInstanceListKey(key)) {
                        datumKey = KeyBuilder.detailInstanceListkey(key);
                    } else {
                        // ignore corrupted key:
                        continue;
                    }
    
                    long timestamp = entry.getLong("timestamp");
    
                    receivedKeysMap.put(datumKey, 1);
    
                    try {
                        if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
                            continue;
                        }
    
                        if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                            batch.add(datumKey);
                        }
    
                        if (batch.size() < 50 && processedCount < beatDatums.size()) {
                            continue;
                        }
    
                        String keys = StringUtils.join(batch, ",");
    
                        if (batch.size() <= 0) {
                            continue;
                        }
    
                        Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}"
                            , getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());
    
                        // update datum entry
                        String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
                        HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
                            @Override
                            public Integer onCompleted(Response response) throws Exception {
                                if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                    return 1;
                                }
    
                                List<JSONObject> datumList = JSON.parseObject(response.getResponseBody(), new TypeReference<List<JSONObject>>() {
                                });
    
                                for (JSONObject datumJson : datumList) {
                                    OPERATE_LOCK.lock();
                                    Datum newDatum = null;
                                    try {
    
                                        Datum oldDatum = getDatum(datumJson.getString("key"));
    
                                        if (oldDatum != null && datumJson.getLongValue("timestamp") <= oldDatum.timestamp.get()) {
                                            Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                                datumJson.getString("key"), datumJson.getLongValue("timestamp"), oldDatum.timestamp);
                                            continue;
                                        }
    
                                        if (KeyBuilder.matchServiceMetaKey(datumJson.getString("key"))) {
                                            Datum<Service> serviceDatum = new Datum<>();
                                            serviceDatum.key = datumJson.getString("key");
                                            serviceDatum.timestamp.set(datumJson.getLongValue("timestamp"));
                                            serviceDatum.value =
                                                JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Service.class);
                                            newDatum = serviceDatum;
                                        }
    
                                        if (KeyBuilder.matchInstanceListKey(datumJson.getString("key"))) {
                                            Datum<Instances> instancesDatum = new Datum<>();
                                            instancesDatum.key = datumJson.getString("key");
                                            instancesDatum.timestamp.set(datumJson.getLongValue("timestamp"));
                                            instancesDatum.value =
                                                JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Instances.class);
                                            newDatum = instancesDatum;
                                        }
    
                                        if (newDatum == null || newDatum.value == null) {
                                            Loggers.RAFT.error("receive null datum: {}", datumJson);
                                            continue;
                                        }
    
                                        raftStore.write(newDatum);
    
                                        datums.put(newDatum.key, newDatum);
                                        notifier.addTask(newDatum.key, ApplyAction.CHANGE);
    
                                        local.resetLeaderDue();
    
                                        if (local.term.get() + 100 > remote.term.get()) {
                                            getLeader().term.set(remote.term.get());
                                            local.term.set(getLeader().term.get());
                                        } else {
                                            local.term.addAndGet(100);
                                        }
    
                                        raftStore.updateTerm(local.term.get());
    
                                        Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                            newDatum.key, newDatum.timestamp, JSON.toJSONString(remote), local.term);
    
                                    } catch (Throwable e) {
                                        Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);
                                    } finally {
                                        OPERATE_LOCK.unlock();
                                    }
                                }
                                TimeUnit.MILLISECONDS.sleep(200);
                                return 0;
                            }
                        });
    
                        batch.clear();
    
                    } catch (Exception e) {
                        Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
                    }
    
                }
    
                List<String> deadKeys = new ArrayList<>();
                for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
                    if (entry.getValue() == 0) {
                        deadKeys.add(entry.getKey());
                    }
                }
    
                for (String deadKey : deadKeys) {
                    try {
                        deleteDatum(deadKey);
                    } catch (Exception e) {
                        Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
                    }
                }
    
            }
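One detail in the listing that is easy to miss is the term update that runs after each datum is written. Reading the quoted code literally, the follower adopts the leader's term when it is within 100 of it, and otherwise only advances by 100. The sketch below isolates that branch; `TermCatchUp`, `advance`, and `STEP` are hypothetical names, and the leader-side bookkeeping and the `raftStore.updateTerm` call are omitted.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, for illustration only; not part of Nacos.
class TermCatchUp {

    // Mirrors the literal 100 used in the listing.
    static final long STEP = 100;

    /** Applies one term update and returns the resulting local term. */
    static long advance(AtomicLong localTerm, long remoteTerm) {
        if (localTerm.get() + STEP > remoteTerm) {
            // Within one step of the leader: adopt the leader's term.
            localTerm.set(remoteTerm);
        } else {
            // Far behind: move forward by one step only.
            localTerm.addAndGet(STEP);
        }
        return localTerm.get();
    }

    public static void main(String[] args) {
        System.out.println(advance(new AtomicLong(950), 1000)); // 1000
        System.out.println(advance(new AtomicLong(200), 1000)); // 300
    }
}
```

Because this runs once per synchronized datum, a follower that is far behind moves toward the leader's term in increments rather than in a single jump.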

RaftController's get

It simply looks up the datum for each requested key and sends them all back.

        @GetMapping("/datum")
        public String get(HttpServletRequest request, HttpServletResponse response) throws Exception {
    
            response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
            response.setHeader("Cache-Control", "no-cache");
            response.setHeader("Content-Encode", "gzip");
            String keysString = WebUtils.required(request, "keys");
            keysString = URLDecoder.decode(keysString, "UTF-8");
            String[] keys = keysString.split(",");
            List<Datum> datums = new ArrayList<Datum>();
    
            for (String key : keys) {
                Datum datum = raftCore.getDatum(key);
                datums.add(datum);
            }
    
            return JSON.toJSONString(datums);
        }
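The follower and the controller agree on a very simple wire format for the `keys` parameter: the keys are joined with commas and URL-encoded on one side, then decoded and split on the other. A minimal round-trip sketch using only the JDK follows; `KeysParam`, `encode`, and `decode` are hypothetical names, and the sample keys are made up rather than real Nacos keys.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only; not part of Nacos.
class KeysParam {

    /** Sender side: join the batch with commas, then URL-encode it. */
    static String encode(List<String> keys) throws UnsupportedEncodingException {
        return URLEncoder.encode(String.join(",", keys), "UTF-8");
    }

    /** Receiver side: URL-decode, then split back into individual keys. */
    static List<String> decode(String param) throws UnsupportedEncodingException {
        return Arrays.asList(URLDecoder.decode(param, "UTF-8").split(","));
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        String param = encode(Arrays.asList("key-a", "key-b"));
        System.out.println(param);         // key-a%2Ckey-b
        System.out.println(decode(param)); // [key-a, key-b]
    }
}
```

The scheme relies on keys never containing a comma; a key that did would be split into two lookups on the receiving side.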

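That covers the basic election and data synchronization flows. The write path was covered earlier: a write is forwarded to the leader, which then synchronizes it to the other nodes. It does not appear to be a two-phase process here, though; the leader seems to synchronize the data directly. The remaining details are left for you to explore. There is a lot going on internally, and it takes time to study.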

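That's all for today. I hope this helps with learning and understanding. Experts, please go easy on me: these are only my own study notes and my ability is limited, so please bear with me.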
