2023-09-15
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105857120

raft竞选基本流程

202309152319235621.png

RaftController的beat处理心跳

202309152319249592.png

RaftCore的receivedBeat

核心的列出来了,主要就是比对一下信息,更新本地信息。当然如果有需要同步信息的话,还要给leader发消息,获取同步的信息,我这里基本略过了,很长,有兴趣的可以细看。

        public RaftPeer receivedBeat(JSONObject beat) throws Exception {
            final RaftPeer local = peers.local();//本地的
            final RaftPeer remote = new RaftPeer();//远程的leader
            remote.ip = beat.getJSONObject("peer").getString("ip");
            remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));
            remote.term.set(beat.getJSONObject("peer").getLongValue("term"));
            remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");
            remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");
            remote.voteFor = beat.getJSONObject("peer").getString("voteFor");
            //如果远程不是leader,无效
            if (remote.state != RaftPeer.State.LEADER) {
                Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",
                    remote.state, JSON.toJSONString(remote));
                throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
            }
            //如果本地大于远程的,心跳信息过期了
            if (local.term.get() > remote.term.get()) {
                Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"
                    , remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);
                throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
                    + ", beat-to-term: " + local.term.get());
            }
            //本地不是跟随者,可能是候选人,所以将自己变为跟随者,选远程的leader
            if (local.state != RaftPeer.State.FOLLOWER) {
    
                Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
                // mk follower
                local.state = RaftPeer.State.FOLLOWER;
                local.voteFor = remote.ip;
            }
    		//数据key集合
            final JSONArray beatDatums = beat.getJSONArray("datums");
            local.resetLeaderDue();//刷新竞选时间
            local.resetHeartbeatDue();
            //设置leader
            peers.makeLeader(remote);
            //不只是心跳,可能还要获取leader的数据同步
            if (!switchDomain.isSendBeatOnly()) {
    			...
            }
    
            return local;
        }

RaftPeerSet的makeLeader

因为现在集群里面可能有其以前的leader存在,并不是和新的leader是同一个,所以要去获得以前leader的信息进行更新,其实就是把集群信息更新下。其实除了leader可以获取到其他结点的信息,其他结点只知道自己和leader的最新信息。

    public RaftPeer makeLeader(RaftPeer candidate) {
            if (!Objects.equals(leader, candidate)) {//leader改变了
                leader = candidate;//设置
                applicationContext.publishEvent(new MakeLeaderEvent(this, leader));//通知
                Loggers.RAFT.info("{} has become the LEADER, local: {}, leader: {}",
                    leader.ip, JSON.toJSONString(local()), JSON.toJSONString(leader));
            }
    
            for (final RaftPeer peer : peers.values()) {
                Map<String, String> params = new HashMap<>(1);
                if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {//如果存在其他以前的leader
                    try {//请求以前过期的leader的信息
                        String url = RaftCore.buildURL(peer.ip, RaftCore.API_GET_PEER);
                        HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() {
                            @Override
                            public Integer onCompleted(Response response) throws Exception {
                                if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                    Loggers.RAFT.error("[NACOS-RAFT] get peer failed: {}, peer: {}",
                                        response.getResponseBody(), peer.ip);
                                    peer.state = RaftPeer.State.FOLLOWER;
                                    return 1;
                                }
                                //更新
                                update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
    
                                return 0;
                            }
                        });
                    } catch (Exception e) {
                        peer.state = RaftPeer.State.FOLLOWER;
                        Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);
                    }
                }
            }
            //更新
            return update(candidate);
        }

RaftController的getPeer获取结点信息

     @GetMapping("/peer")
        public JSONObject getPeer(HttpServletRequest request, HttpServletResponse response) {
            List<RaftPeer> peers = raftCore.getPeers();
            RaftPeer peer = null;
    
            for (RaftPeer peer1 : peers) {
                if (StringUtils.equals(peer1.ip, NetUtils.localServer())) {
                    peer = peer1;
                }
            }
    
            if (peer == null) {
                peer = new RaftPeer();
                peer.ip = NetUtils.localServer();
            }
    
            return JSON.parseObject(JSON.toJSONString(peer));
        }

还有一部分就是心跳带来的服务信息的同步下次说。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文