2023-09-15
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105835207

Service模型大致结构

202309152318294741.png

DistroConsistencyServiceImpl的put临时实例集合一致性服务

添加集合,然后

        @Override
        public void put(String key, Record value) throws NacosException {
            onPut(key, value);
            taskDispatcher.addTask(key);
        }

onPut添加临时实例集合

如果是临时的就添加一个Datum,将实例集合放入。如果有监听器监听的,立即通知,否则就返回,怎么通知的后面说。

        //临时的实例
        public void onPut(String key, Record value) {
            //如果是临时的服务实例集合
            if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
                Datum<Instances> datum = new Datum<>();//创建临时数据
                datum.value = (Instances) value;
                datum.key = key;
                datum.timestamp.incrementAndGet();
                dataStore.put(key, datum);//放进一个map里
            }
    
            if (!listeners.containsKey(key)) {//没有监听器就返回
                return;
            }
            //有监听立即通知服务有改变
            notifier.addTask(key, ApplyAction.CHANGE);
        }

RaftConsistencyServiceImpl的put永久实例集合一致性服务

raft选举算法有关。

        @Override
        public void put(String key, Record value) throws NacosException {
            try {
                raftCore.signalPublish(key, value);
            } catch (Exception e) {
                Loggers.RAFT.error("Raft put failed.", e);
                throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
            }
        }

signalPublish

这个其实涉及到raft选举的协议,如果本服务不是leader就要交给leader去处理,就发一个http请求给leaderleader接受到之后还是会到他的signalPublish里处理。如果是leader的话就进行服务实例改变通知,通知本地的监听器,并且要同步到其他结点,使用过半机制,刚好CountDownLatch可以用,只要有过半响应成功就算同步成功,具体的细节后面会讲。

     public void signalPublish(String key, Record value) throws Exception {
    
            if (!isLeader()) {//不是leader
                JSONObject params = new JSONObject();
                params.put("key", key);
                params.put("value", value);
                Map<String, String> parameters = new HashMap<>(1);
                parameters.put("key", key);
                //交给leader去做/v1/ns/raft/datum
                raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
                return;
            }
    
            try {//是leader
                OPERATE_LOCK.lock();
                long start = System.currentTimeMillis();
                final Datum datum = new Datum();
                datum.key = key;
                datum.value = value;
                if (getDatum(key) == null) {
                    datum.timestamp.set(1L);
                } else {
                    datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
                }
    
                JSONObject json = new JSONObject();
                json.put("datum", datum);
                json.put("source", peers.local());
                //发布数据改变通知
                onPublish(datum, peers.local());
    
                final String content = JSON.toJSONString(json);
                //只要过半的结点数
                final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
                for (final String server : peers.allServersIncludeMyself()) {//遍历所有结点
                    if (isLeader(server)) {//自己算一次
                        latch.countDown();
                        continue;
                    }
                    final String url = buildURL(server, API_ON_PUB);///v1/ns/raft/datum/commit
                    HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                    datum.key, server, response.getStatusCode());
                                return 1;
                            }
                            latch.countDown();//异步完成
                            return 0;
                        }
    
                        @Override
                        public STATE onContentWriteCompleted() {
                            return STATE.CONTINUE;
                        }
                    });
    
                }
                //等待半数完成
                if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                    // only majority servers return success can we consider this update success
                    Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                    throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
                }
    
                long end = System.currentTimeMillis();
                Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
            } finally {
                OPERATE_LOCK.unlock();
            }
        }

TaskDispatcher的addTask

负载均衡的获取一个任务执行器,添加一个新任务,这个就是来做数据同步的,具体做什么后面一起说。

        public void addTask(String key) {
            taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
        }

TaskScheduler的addTask

放到一个阻塞队列里,做数据同步,同步到其他结点,具体的任务后面说。

    private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);
    
    
     public void addTask(String key) {
                queue.offer(key);
            }

这样,服务实例的注册流程基本说完了,我们可以总结下。

  1. 如果服务不存在,就创建服务,并开启心跳检查,来检查服务中的每个实例,并将服务注册到监听器集合中,因为服务本身实现了监听器集合,其他集群的服务数据有改动的话会通知服务。
  2. 添加服务实例到服务集群中,没有集群就创建一个,并刷新老的服务实例集合。并将服务key添加到同步任务中,同步任务根据数量和超时限制进行服务器集群的同步。

其实中间可能还开启了好多任务,比如心跳检查ServiceClientBeatCheckTaskClusterHealthCheckTask,数据同步的TaskDispatcher,服务实例改变的通知器Notifier任务。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文