2023-09-15  阅读(3)
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105856832

raft竞选基本流程

202309152319206531.png

RaftCore的HeartBeat心跳

我们可以看到,心跳也有超时时间,到了后重置然后发心跳。

    @Override
            public void run() {
                try {
    
                    if (!peers.isReady()) {
                        return;
                    }
    
                    RaftPeer local = peers.local();
                    local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                    if (local.heartbeatDueMs > 0) {
                        return;
                    }
                    //重置心跳任期
                    local.resetHeartbeatDue();
    
                    sendBeat();
                } catch (Exception e) {
                    Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
                }
    
            }

RaftCore的HeartBeat心跳

但是可惜的是集群环境不是leader不能发,所以只有选为leader的那个结点才会发。每次发后会刷新任期时间,保证不用去竞选逻辑。然后封装leader结点信息,服务数据ServiceInstanceListkey等信息,发送给其他结点。收到响应后异步更新其他结点信息。

      public void sendBeat() throws IOException, InterruptedException {
                RaftPeer local = peers.local();
                if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {//不是leader且集群模式不能发
                    return;
                }
    
                if (Loggers.RAFT.isDebugEnabled()) {
                    Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
                }
                //重置任期
                local.resetLeaderDue();
    
                // build data
                JSONObject packet = new JSONObject();
                packet.put("peer", local);//放入leader结点信息
    
                JSONArray array = new JSONArray();
    
                if (switchDomain.isSendBeatOnly()) {
                    Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
                }
    
                if (!switchDomain.isSendBeatOnly()) {
                	//还要带数据key
                    for (Datum datum : datums.values()) {
    
                        JSONObject element = new JSONObject();
    					//加入不同类型的key
                        if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                            element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                        } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                            element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                        }
                        element.put("timestamp", datum.timestamp);
    
                        array.add(element);
                    }
                }
    			//带上数据的key
                packet.put("datums", array);
                // broadcast
                Map<String, String> params = new HashMap<String, String>(1);
                params.put("beat", JSON.toJSONString(packet));
    
                String content = JSON.toJSONString(params);
    
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                GZIPOutputStream gzip = new GZIPOutputStream(out);
                gzip.write(content.getBytes(StandardCharsets.UTF_8));
                gzip.close();
    
                byte[] compressedBytes = out.toByteArray();
                String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
    
                if (Loggers.RAFT.isDebugEnabled()) {
                    Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}",
                        content.length(), compressedContent.length());
                }
                //发送给除了自己以外的
                for (final String server : peers.allServersWithoutMySelf()) {
                    try {
                        final String url = buildURL(server, API_BEAT);
                        if (Loggers.RAFT.isDebugEnabled()) {
                            Loggers.RAFT.debug("send beat to server " + server);
                        }
                        HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
                            @Override
                            public Integer onCompleted(Response response) throws Exception {
                                if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                    Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}",
                                        response.getResponseBody(), server);
                                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                                    return 1;
                                }
                                //收到响应就添加RaftPeer
                                peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
                                if (Loggers.RAFT.isDebugEnabled()) {
                                    Loggers.RAFT.debug("receive beat response from: {}", url);
                                }
                                return 0;
                            }
    
                            @Override
                            public void onThrowable(Throwable t) {
                                Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
                                MetricsMonitor.getLeaderSendBeatFailedException().increment();
                            }
                        });
                    } catch (Exception e) {
                        Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                    }
                }
    
            }

RaftPeerSet的update更新结点信息

其实就是新的结点信息覆盖旧的。

        public RaftPeer update(RaftPeer peer) {
            peers.put(peer.ip, peer);
            return peer;
        }

然后就是其他结点收到心跳后的处理,我们下次说吧。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文