2023-09-15  阅读(39)
原文作者:王伟王胖胖 原文地址: https://blog.csdn.net/wangwei19871103/article/details/105839360

心跳处理基本流程

202309152318396071.png

ClientBeatProcessor的run处理临时实例心跳

这里就体现出RsInfo的用途啦,其实就是保存下实例的相关信息,IP,端口,集群。遍历实例集群的实例,找出对应的IP和端口的实例进行状态更新。如果发现有问题,还要用PushService进行UDP进行通知,UDP端口是客户端请求的时候刷新服务实例列表的使用传上来的,客户端也有个PushReceiver就是来接受UDP报文信息,具体可以看这篇文章

     @Override
        public void run() {
            Service service = this.service;
            ...
            String ip = rsInfo.getIp();//IP
            String clusterName = rsInfo.getCluster();//集群名字
            int port = rsInfo.getPort();//端口
            Cluster cluster = service.getClusterMap().get(clusterName);//获取集群
            List<Instance> instances = cluster.allIPs(true);//获取集群所有的临时服务实例
    		//遍历更新对应的状态
            for (Instance instance : instances) {
                if (instance.getIp().equals(ip) && instance.getPort() == port) {
                    if (Loggers.EVT_LOG.isDebugEnabled()) {
                        Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                    }
                    instance.setLastBeat(System.currentTimeMillis());//刷新心跳时间
                    if (!instance.isMarked()) {//没被标记的
                        if (!instance.isHealthy()) {//不健康的
                            instance.setHealthy(true);//设置为健康
                            Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
                            getPushService().serviceChanged(service);//UDP发送服务改变通知
                        }
                    }
                }
            }
        }

PushService的serviceChanged服务有改变UDP客户端

实现了监听接口:

202309152318400282.png
缓存里没有的话,服务有改变的时候用上下文来发送ServiceChangeEvent事件。

202309152318405603.png

UDP调度任务

其实就是遍历需要推送的客户端,然后封装数据,推送。

        @Override
        public void onApplicationEvent(ServiceChangeEvent event) {
            Service service = event.getService();
            String serviceName = service.getName();
            String namespaceId = service.getNamespaceId();
    
            Future future = udpSender.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        ...
                        //获取所有需要推送的PushClient
                        ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                        if (MapUtils.isEmpty(clients)) {
                            return;
                        }
    
                        Map<String, Object> cache = new HashMap<>(16);
                        long lastRefTime = System.nanoTime();
                        for (PushClient client : clients.values()) {
                            if (client.zombie()) {//超时的不删除不处理
                              	...
                                clients.remove(client.toString());
                               	...
                                continue;
                            }
    
                            Receiver.AckEntry ackEntry;
                           	...
                            String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                            byte[] compressData = null;
                            Map<String, Object> data = null;
                            //有压缩数据
                            if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                                org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                                compressData = (byte[]) (pair.getValue0());
                                data = (Map<String, Object>) pair.getValue1();
    							...
                            }
    						//准备UDP数据
                            if (compressData != null) {
                                ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                            } else {
                                ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                                if (ackEntry != null) {
                                    cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                                }
                            }
    
                            Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                                client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key));
    						//发送
                            udpPush(ackEntry);
                        }
                    } catch (Exception e) {
                        Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
    
                    } finally {
                    	//发送完删除
                        futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                    }
    
                }
            }, 1000, TimeUnit.MILLISECONDS);
    		//放缓存,不会重复发送
            futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
    
        }

udpPush

判断是否大于重试次数,因为UDP不可靠,可能发出去没收到,也可能客户端发来的没收到,所以要尝试,后面有开启任务重试的。然后封装好数据发送,再开启一个Retransmitter任务10秒后看有没有成功响应,没有的话就重新发送。

     private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
            if (ackEntry == null) {
                Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
                return null;
            }
            //大于尝试的次数
            if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
                Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
                ackMap.remove(ackEntry.key);
                udpSendTimeMap.remove(ackEntry.key);
                failedPush += 1;
                return ackEntry;
            }
    
            try {
                if (!ackMap.containsKey(ackEntry.key)) {
                    totalPush++;
                }
                ackMap.put(ackEntry.key, ackEntry);
                udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
    
                Loggers.PUSH.info("send udp packet: " + ackEntry.key);
                udpSocket.send(ackEntry.origin);//发送UDP报文
    
                ackEntry.increaseRetryTime();
                //10秒没应答就再尝试一次
                executorService.schedule(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS),
                    TimeUnit.MILLISECONDS);
    
                return ackEntry;
            } catch (Exception e) {
                Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}",
                    ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);
                ackMap.remove(ackEntry.key);
                udpSendTimeMap.remove(ackEntry.key);
                failedPush += 1;
    
                return null;
            }
        }

Retransmitter

如果发现ackMap中还有,说明没收到客户端响应。

        public static class Retransmitter implements Runnable {
            Receiver.AckEntry ackEntry;
    
            public Retransmitter(Receiver.AckEntry ackEntry) {
                this.ackEntry = ackEntry;
            }
    
            @Override
            public void run() {
                if (ackMap.containsKey(ackEntry.key)) {//没接受到响应
                    Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
                    udpPush(ackEntry);
                }
            }
        }

Receiver

如果收到客户端响应的话会从ackMap中删除ackEntry.key

202309152318411544.png

好了,心跳的接受,处理,如果有数据改变的通知和UDP推送原理基本都讲了,具体细节看源码吧。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文