2023-09-15
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105856832

raft竞选基本流程

202309152319206531.png

RaftCore的HeartBeat心跳

我们可以看到,心跳也有超时时间,到了后重置然后发心跳。

    @Override
            public void run() {
                try {
    
                    if (!peers.isReady()) {
                        return;
                    }
    
                    RaftPeer local = peers.local();
                    local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                    if (local.heartbeatDueMs > 0) {
                        return;
                    }
                    //重置心跳任期
                    local.resetHeartbeatDue();
    
                    sendBeat();
                } catch (Exception e) {
                    Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
                }
    
            }

RaftCore的HeartBeat心跳

但是可惜的是集群环境不是leader不能发,所以只有选为leader的那个结点才会发。每次发后会刷新任期时间,保证不用去竞选逻辑。然后封装leader结点信息,服务数据ServiceInstanceListkey等信息,发送给其他结点。收到响应后异步更新其他结点信息。

      public void sendBeat() throws IOException, InterruptedException {
                RaftPeer local = peers.local();
                if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {//不是leader且集群模式不能发
                    return;
                }
    
                if (Loggers.RAFT.isDebugEnabled()) {
                    Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
                }
                //重置任期
                local.resetLeaderDue();
    
                // build data
                JSONObject packet = new JSONObject();
                packet.put("peer", local);//放入leader结点信息
    
                JSONArray array = new JSONArray();
    
                if (switchDomain.isSendBeatOnly()) {
                    Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
                }
    
                if (!switchDomain.isSendBeatOnly()) {
                	//还要带数据key
                    for (Datum datum : datums.values()) {
    
                        JSONObject element = new JSONObject();
    					//加入不同类型的key
                        if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                            element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                        } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                            element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                        }
                        element.put("timestamp", datum.timestamp);
    
                        array.add(element);
                    }
                }
    			//带上数据的key
                packet.put("datums", array);
                // broadcast
                Map<String, String> params = new HashMap<String, String>(1);
                params.put("beat", JSON.toJSONString(packet));
    
                String content = JSON.toJSONString(params);
    
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                GZIPOutputStream gzip = new GZIPOutputStream(out);
                gzip.write(content.getBytes(StandardCharsets.UTF_8));
                gzip.close();
    
                byte[] compressedBytes = out.toByteArray();
                String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
    
                if (Loggers.RAFT.isDebugEnabled()) {
                    Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}",
                        content.length(), compressedContent.length());
                }
                //发送给除了自己以外的
                for (final String server : peers.allServersWithoutMySelf()) {
                    try {
                        final String url = buildURL(server, API_BEAT);
                        if (Loggers.RAFT.isDebugEnabled()) {
                            Loggers.RAFT.debug("send beat to server " + server);
                        }
                        HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
                            @Override
                            public Integer onCompleted(Response response) throws Exception {
                                if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                    Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}",
                                        response.getResponseBody(), server);
                                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                                    return 1;
                                }
                                //收到响应就添加RaftPeer
                                peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
                                if (Loggers.RAFT.isDebugEnabled()) {
                                    Loggers.RAFT.debug("receive beat response from: {}", url);
                                }
                                return 0;
                            }
    
                            @Override
                            public void onThrowable(Throwable t) {
                                Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
                                MetricsMonitor.getLeaderSendBeatFailedException().increment();
                            }
                        });
                    } catch (Exception e) {
                        Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                    }
                }
    
            }

RaftPeerSet的update更新结点信息

其实就是新的结点信息覆盖旧的。

        public RaftPeer update(RaftPeer peer) {
            peers.put(peer.ip, peer);
            return peer;
        }

然后就是其他结点收到心跳后的处理,我们下次说吧。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文