2023-09-15
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105846799

nacos数据一致性服务执行流程

202309152318586301.png

RaftConsistencyServiceImpl的put永久数据同步

内部是调用RaftCoresignalPublish

        @Override
        public void put(String key, Record value) throws NacosException {
            try {
                raftCore.signalPublish(key, value);
            } catch (Exception e) {
                Loggers.RAFT.error("Raft put failed.", e);
                throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
            }
        }

RaftCore的signalPublish

其实就是判断是不是leader,不是的话就要把数据发给leader,让他来处理,否则的话就进行数据改变通知,然后发送给所有结点,包括自身,用CountDownLatch计数,总共只要有过半的结点的计数就算成功了。

    public void signalPublish(String key, Record value) throws Exception {
    
            if (!isLeader()) {//不是leader
                JSONObject params = new JSONObject();
                params.put("key", key);
                params.put("value", value);
                Map<String, String> parameters = new HashMap<>(1);
                parameters.put("key", key);
                //交给leader去做/v1/ns/raft/datum
                raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
                return;
            }
    
            try {//是leader
                OPERATE_LOCK.lock();
                long start = System.currentTimeMillis();
                final Datum datum = new Datum();
                datum.key = key;
                datum.value = value;
                if (getDatum(key) == null) {
                    datum.timestamp.set(1L);
                } else {
                    datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
                }
    
                JSONObject json = new JSONObject();
                json.put("datum", datum);
                json.put("source", peers.local());
                //发布数据改变通知
                onPublish(datum, peers.local());
    
                final String content = JSON.toJSONString(json);
                //只要过半的结点数
                final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
                for (final String server : peers.allServersIncludeMyself()) {//遍历所有结点
                    if (isLeader(server)) {//自己算一次
                        latch.countDown();
                        continue;
                    }
                    final String url = buildURL(server, API_ON_PUB);///v1/ns/raft/datum/commit
                    HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                    datum.key, server, response.getStatusCode());
                                return 1;
                            }
                            latch.countDown();//异步完成
                            return 0;
                        }
    
                        @Override
                        public STATE onContentWriteCompleted() {
                            return STATE.CONTINUE;
                        }
                    });
    
                }
                //等待半数完成
                if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                    // only majority servers return success can we consider this update success
                    Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                    throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
                }
    
                long end = System.currentTimeMillis();
                Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
            } finally {
                OPERATE_LOCK.unlock();
            }
        }

RaftCore的isLeader

判断是否是leader,单机自身就是,否则的话就判断IP

        public boolean isLeader() {
            return peers.isLeader(NetUtils.localServer());
        }

RaftPeerSet的isLeader

        //检查是否是leader
        public boolean isLeader(String ip) {
            if (STANDALONE_MODE) {//单机直接就是
                return true;
            }
    
            if (leader == null) {
                Loggers.RAFT.warn("[IS LEADER] no leader is available now!");
                return false;
            }
            //集群的话要看IP对不对
            return StringUtils.equals(leader.ip, ip);
        }

RaftCore的onPublish

刷新任期是事件,添加通知任务,通知服务更新,服务也是监听器。

        public void onPublish(Datum datum, RaftPeer source) throws Exception {
            RaftPeer local = peers.local();
            if (datum.value == null) {
                Loggers.RAFT.warn("received empty datum");
                throw new IllegalStateException("received empty datum");
            }
            //不是leader不能干这个事
            if (!peers.isLeader(source.ip)) {
                Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}",
                    JSON.toJSONString(source), JSON.toJSONString(getLeader()));
                throw new IllegalStateException("peer(" + source.ip + ") tried to publish " +
                    "data but wasn't leader");
            }
            //过时了
            if (source.term.get() < local.term.get()) {
                Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}",
                    JSON.toJSONString(source), JSON.toJSONString(local));
                throw new IllegalStateException("out of date publish, pub-term:"
                    + source.term.get() + ", cur-term: " + local.term.get());
            }
            //重置任期
            local.resetLeaderDue();
    
            // if data should be persisted, usually this is true:
            if (KeyBuilder.matchPersistentKey(datum.key)) {
                raftStore.write(datum);//持久化到文件
            }
            //放入数据
            datums.put(datum.key, datum);
    
            if (isLeader()) {
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
            } else {
                if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                    //set leader term:
                    getLeader().term.set(source.term.get());
                    local.term.set(getLeader().term.get());
                } else {
                    local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
                }
            }
            raftStore.updateTerm(local.term.get());
            //添加任务
            notifier.addTask(datum.key, ApplyAction.CHANGE);
    
            Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
        }

Service的onChange

Notifier通知器会通知服务。

202309152318594232.png
服务进行更新:

202309152319000293.png

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文