2023-09-15  阅读(61)
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105835207

Service模型大致结构

202309152318294741.png

DistroConsistencyServiceImpl的put临时实例集合一致性服务

添加集合,然后

        @Override
        public void put(String key, Record value) throws NacosException {
            onPut(key, value);
            taskDispatcher.addTask(key);
        }

onPut添加临时实例集合

如果是临时的就添加一个Datum,将实例集合放入。如果有监听器监听的,立即通知,否则就返回,怎么通知的后面说。

        //临时的实例
        public void onPut(String key, Record value) {
            //如果是临时的服务实例集合
            if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
                Datum<Instances> datum = new Datum<>();//创建临时数据
                datum.value = (Instances) value;
                datum.key = key;
                datum.timestamp.incrementAndGet();
                dataStore.put(key, datum);//放进一个map里
            }
    
            if (!listeners.containsKey(key)) {//没有监听器就返回
                return;
            }
            //有监听立即通知服务有改变
            notifier.addTask(key, ApplyAction.CHANGE);
        }

RaftConsistencyServiceImpl的put永久实例集合一致性服务

raft选举算法有关。

        @Override
        public void put(String key, Record value) throws NacosException {
            try {
                raftCore.signalPublish(key, value);
            } catch (Exception e) {
                Loggers.RAFT.error("Raft put failed.", e);
                throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
            }
        }

signalPublish

这个其实涉及到raft选举的协议,如果本服务不是leader就要交给leader去处理,就发一个http请求给leaderleader接受到之后还是会到他的signalPublish里处理。如果是leader的话就进行服务实例改变通知,通知本地的监听器,并且要同步到其他结点,使用过半机制,刚好CountDownLatch可以用,只要有过半响应成功就算同步成功,具体的细节后面会讲。

     public void signalPublish(String key, Record value) throws Exception {
    
            if (!isLeader()) {//不是leader
                JSONObject params = new JSONObject();
                params.put("key", key);
                params.put("value", value);
                Map<String, String> parameters = new HashMap<>(1);
                parameters.put("key", key);
                //交给leader去做/v1/ns/raft/datum
                raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
                return;
            }
    
            try {//是leader
                OPERATE_LOCK.lock();
                long start = System.currentTimeMillis();
                final Datum datum = new Datum();
                datum.key = key;
                datum.value = value;
                if (getDatum(key) == null) {
                    datum.timestamp.set(1L);
                } else {
                    datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
                }
    
                JSONObject json = new JSONObject();
                json.put("datum", datum);
                json.put("source", peers.local());
                //发布数据改变通知
                onPublish(datum, peers.local());
    
                final String content = JSON.toJSONString(json);
                //只要过半的结点数
                final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
                for (final String server : peers.allServersIncludeMyself()) {//遍历所有结点
                    if (isLeader(server)) {//自己算一次
                        latch.countDown();
                        continue;
                    }
                    final String url = buildURL(server, API_ON_PUB);///v1/ns/raft/datum/commit
                    HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                    datum.key, server, response.getStatusCode());
                                return 1;
                            }
                            latch.countDown();//异步完成
                            return 0;
                        }
    
                        @Override
                        public STATE onContentWriteCompleted() {
                            return STATE.CONTINUE;
                        }
                    });
    
                }
                //等待半数完成
                if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                    // only majority servers return success can we consider this update success
                    Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
                    throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
                }
    
                long end = System.currentTimeMillis();
                Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
            } finally {
                OPERATE_LOCK.unlock();
            }
        }

TaskDispatcher的addTask

负载均衡的获取一个任务执行器,添加一个新任务,这个就是来做数据同步的,具体做什么后面一起说。

        public void addTask(String key) {
            taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
        }

TaskScheduler的addTask

放到一个阻塞队列里,做数据同步,同步到其他结点,具体的任务后面说。

    private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);
    
    
     public void addTask(String key) {
                queue.offer(key);
            }

这样,服务实例的注册流程基本说完了,我们可以总结下。

  1. 如果服务不存在,就创建服务,并开启心跳检查,来检查服务中的每个实例,并将服务注册到监听器集合中,因为服务本身实现了监听器集合,其他集群的服务数据有改动的话会通知服务。
  2. 添加服务实例到服务集群中,没有集群就创建一个,并刷新老的服务实例集合。并将服务key添加到同步任务中,同步任务根据数量和超时限制进行服务器集群的同步。

其实中间可能还开启了好多任务,比如心跳检查ServiceClientBeatCheckTaskClusterHealthCheckTask,数据同步的TaskDispatcher,服务实例改变的通知器Notifier任务。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文