2023-09-15
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105855838

raft竞选基本流程

202309152319175221.png

MasterElection

前面说了选举测初始化,现在已经可以选举了,最开始的时候每个结点的状态都是FOLLOWER跟随者,这里就是一个开始随机时间的等待,只要开始选举了,就会等待,直到第一个任期时间到了,重置时间,为什么要重置呢,因为如果不重置的话就一直投票啦,只有当没有leader的时候或者长时间没接受到leader心跳(只有leader才会发心跳)才要投票,所以会一定时间间隔如果没有收到leader的心跳,才可能会开始投票,只要收到了,时间又会被重置,也就不需要选举leader。只要任期时间过了就开始拉票了:

        public class MasterElection implements Runnable {
            @Override
            public void run() {
                try {
    				//可以开始选举的标记
                    if (!peers.isReady()) {
                        return;
                    }
    				//获取本地结点
                    RaftPeer local = peers.local();
                    local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;//任期-500毫秒
    				//任期超时没到
                    if (local.leaderDueMs > 0) {
                        return;
                    }
    
                    // 重置任期超时时间和心跳时间,准备开始拉票
                    local.resetLeaderDue();
                    local.resetHeartbeatDue();
    				//拉票
                    sendVote();
                } catch (Exception e) {
                    Loggers.RAFT.warn("[RAFT] error while master election {}", e);
                }
    
            }

sendVote拉票

首先任期term+1,其实就是说明状态是新的,给自己头一票,然后状态设置为CANDIDATE候选者,然后把自己的信息发给其他集群结点,告诉其他结点我这次的任期状态,为谁投票。然后异步等待响应处理,决定leader

    public void sendVote() {
    
                RaftPeer local = peers.get(NetUtils.localServer());
                Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
                    JSON.toJSONString(getLeader()), local.term);
    
                peers.reset();
    
                local.term.incrementAndGet();//轮次+1
                local.voteFor = local.ip;//为自己投票
                local.state = RaftPeer.State.CANDIDATE;//候选者
    
                Map<String, String> params = new HashMap<>(1);
                params.put("vote", JSON.toJSONString(local));
                for (final String server : peers.allServersWithoutMySelf()) {//除自己外的其他结点
                    final String url = buildURL(server, API_VOTE);// /v1/ns/raft/vote
                    try {
                        HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                            @Override
                            public Integer onCompleted(Response response) throws Exception {
                                if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                    Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
                                    return 1;
                                }
    							//解析其他结点的信息
                                RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
    
                                Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));
                                //有成功响应
                                peers.decideLeader(peer);
    
                                return 0;
                            }
                        });
                    } catch (Exception e) {
                        Loggers.RAFT.warn("error while sending vote to server: {}", server);
                    }
                }
            }
        }

其他结点处理拉票请求

RaftController的vote

接受拉票请求,然后调用raftCore.receivedVote处理。

        @PostMapping("/vote")
        public JSONObject vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
    
            RaftPeer peer = raftCore.receivedVote(
                JSON.parseObject(WebUtils.required(request, "vote"), RaftPeer.class));
    
            return JSON.parseObject(JSON.toJSONString(peer));
        }

RaftCore的receivedVote

首先注意这个方法是synchronized 修饰的,也就是同时只能一个线程来访问,先来后到,先判断任期,如果任期状态比自己小的,那还是自己状态最新,就投自己,然后返回。如果拉票的结点任期状态新,就投他,然后状态改为FOLLOWER跟随者,重新计时,同步任期状态。为什么任期状态这个时候会一样,因为初始的时候可能是0,一旦超时时间过了,就开始竞选了,任期状态+1,变为CANDIDATE候选者了呀,这样就跟拉票的任期可能一样,都是1。还有可能拉票的那个很久没同步了,突然leader挂了,但是他的状态是旧的,所以也不能选他,只能选自己。

     
        public synchronized RaftPeer receivedVote(RaftPeer remote) {
            if (!peers.contains(remote)) {//不存在
                throw new IllegalStateException("can not find peer: " + remote.ip);
            }
            //本机RaftPeer
            RaftPeer local = peers.get(NetUtils.localServer());
            if (remote.term.get() <= local.term.get()) {//任期数少于自己的
                String msg = "received illegitimate vote" +
                    ", voter-term:" + remote.term + ", votee-term:" + local.term;
    
                Loggers.RAFT.info(msg);
                if (StringUtils.isEmpty(local.voteFor)) {
                    local.voteFor = local.ip;//没投过的话投自己
                }
    
                return local;
            }
            //设置任期到期时间,重新选举计时
            local.resetLeaderDue();
            //作为跟随者
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;//投发来的结点
            local.term.set(remote.term.get());//同步term
    
            Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
    
            return local;
        }

接受投票后决定leader

RaftCore的decideLeader

拉票有结果返回后,就会开始决定leader,将投票者的信息更新进集群,然后遍历所有投票者,统计最多的候选人,如果票数过半就选为leader,并改变器状态,如果是本机的话还要进行LeaderElectFinishedEvent事件通知。

     public RaftPeer decideLeader(RaftPeer candidate) {
            peers.put(candidate.ip, candidate);//放入结果RaftPeer
    
            SortedBag ips = new TreeBag();
            int maxApproveCount = 0;
            String maxApprovePeer = null;
            for (RaftPeer peer : peers.values()) {
                if (StringUtils.isEmpty(peer.voteFor)) {
                    continue;
                }
                //有投票过的
                ips.add(peer.voteFor);
                if (ips.getCount(peer.voteFor) > maxApproveCount) {//超过最多选票数的就更新
                    maxApproveCount = ips.getCount(peer.voteFor);//最多选票数
                    maxApprovePeer = peer.voteFor;//最多候选者
                }
            }
            //超过半数
            if (maxApproveCount >= majorityCount()) {
                RaftPeer peer = peers.get(maxApprovePeer);//查找票最多的候选者
                peer.state = RaftPeer.State.LEADER;//选为leader
    
                if (!Objects.equals(leader, peer)) {//如果leader有改变的话就通知
                    leader = peer;
                    applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader));
                    Loggers.RAFT.info("{} has become the LEADER", leader.ip);
                }
            }
    
            return leader;
        }

现在有结点leader选出来了,但是其他结点不知道啊,所以要进行心跳同步,心跳同步只有leader可以,我们下篇来看。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文