2023-08-02  阅读(0)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/354

在分布式系统中,服务消费者和服务提供者都存在多个节点,所以 RPC 框架需要实现合理的负载均衡算法,控制流量能够均匀地分摊到每个服务提供者,避免单点故障导致的服务调用异常。所以,本章我就来实现 RPC 框架的负载均衡机制。

一、注册中心

服务消费者在发起 RPC 调用之前,需要知道服务提供者有哪些节点是可用的。由于服务提供者节点会存在上/下线的情况,所以服务消费者需要感知服务提供者节点的动态变化。

RPC 框架一般采用注册中心来实现服务的注册和发现,主流的注册中心有 ZooKeeper、Eureka、Etcd、Consul、Nacos 等。在本工程中,我使用 Zookeeper,并设计一个通用的注册中心接口,各类注册中心都可以按该接口规范进行扩展。

1.1 接口设计

注册中心会存储服务提供方的元数据信息,我们需要将服务元数据信息封装成一个对象,该对象包括 服务全限定名称服务版本服务地址/端口

    /**
     * 服务元数据
     */
    public class ServiceMeta {
    
        /**
         * 服务类全限定名
         */
        private String service;
    
        /**
         * 服务版本号
         */
        private String version;
    
        /**
         * 服务实例地址
         */
        private String address;
    
        /**
         * 服务端实例端口
         */
        private Integer port;
    
        //...省略get/set
    }

接着,需要设计通用的注册中心管理接口,包含注册中心四个基本操作:

  • 服务注册 register
  • 服务注销 unRegister
  • 服务发现 discovery
  • 注册中心销毁 destroy
    /**
     * 服务注册管理
     */
    public interface RegistryService {
    
        void register(ServiceMeta serviceMeta) throws Exception;
    
        void unRegister(ServiceMeta serviceMeta) throws Exception;
    
        ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception;
    
        void destroy() throws IOException;
    }

来看具体的实现ZookeeperRegistryService:

    public class ZookeeperRegistryService implements RegistryService {
        /**
         * 注册中心连接失败时的重试间隔
         */
        public static final int BASE_SLEEP_TIME_MS = 1000;
    
        /**
         * 注册中心连接失败时的重试次数
         */
        public static final int MAX_RETRIES = 3;
    
        /**
         * 注册中心中的RPC服务根节点路径
         */
        public static final String ZK_BASE_PATH = "/rpc";
    
        private final ServiceDiscovery<ServiceMeta> serviceDiscovery;
    
        public ZookeeperRegistryService(String registryAddr) throws Exception {
            // 1.连接Zookeeper注册中心
            CuratorFramework client = CuratorFrameworkFactory.newClient(registryAddr,
                    new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
            client.start();
    
            JsonInstanceSerializer<ServiceMeta> serializer = new JsonInstanceSerializer<>(ServiceMeta.class);
    
            // 2.基于Curator框架构建一个服务注册工具类
            this.serviceDiscovery = ServiceDiscoveryBuilder
                    .builder(ServiceMeta.class) // 自定义数据
                    .client(client)
                    .serializer(serializer)
                    .basePath(ZK_BASE_PATH)
                    .build();
            this.serviceDiscovery.start();
        }
    
        @Override
        public void register(ServiceMeta serviceMeta) throws Exception {
            String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
    
            // 构建一个服务实例对象
            ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance.<ServiceMeta>builder()
                    .name(serviceKey)
                    .address(serviceMeta.getAddress())
                    .port(serviceMeta.getPort())
                    .payload(serviceMeta)
                    .build();
            // 注册服务实例
            serviceDiscovery.registerService(serviceInstance);
        }
    
        @Override
        public void unRegister(ServiceMeta serviceMeta) throws Exception {
            String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
            ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance.<ServiceMeta>builder()
                    .name(serviceKey)
                    .address(serviceMeta.getAddress())
                    .port(serviceMeta.getPort())
                    .payload(serviceMeta)
                    .build();
            serviceDiscovery.unregisterService(serviceInstance);
        }
    
        @Override
        public ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception {
            Collection<ServiceInstance<ServiceMeta>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);
            ServiceLoadBalancer balancer = new ZKConsistentHashLoadBalancer();
            // 基于一致性Hash算法,选择一个服务节点
            ServiceInstance<ServiceMeta> instance = (ServiceInstance<ServiceMeta>) balancer.select(new ArrayList(serviceInstances), invokerHashCode);
            if (instance != null) {
                return instance.getPayload();
            }
            return null;
        }
    
        @Override
        public void destroy() throws IOException {
            serviceDiscovery.close();
        }
    }

上述代码,核心是创建了 ServiceDiscovery 对象, ServiceDiscovery 负责服务的注册和发现。

1.2 服务注册

服务注册通过ServiceDiscovery.register()方法完成:

    @Override
    public void register(ServiceMeta serviceMeta) throws Exception {
        String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
    
        // 构建一个服务实例对象
        ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance.<ServiceMeta>builder()
            .name(serviceKey)
            .address(serviceMeta.getAddress())
            .port(serviceMeta.getPort())
            .payload(serviceMeta)
            .build();
        // 注册服务实例
        serviceDiscovery.registerService(serviceInstance);
    }

ServiceInstance 对象代表一个服务实例,包含服务名称 name、唯一标识 id、地址 address、端口 port 以及用户自定义的可选属性 payload,ServiceInstance 在 Zookeeper 服务器中的存储形式如下:

202308022229104681.png

RpcProviderInitializer 在启动过程中,根据 @RpcService 注解识别需要发布的服务,然后调用 RegistryService 接口的 register() 方法,将服务发布到注册中心:

    // RpcProviderInitializer.java
    
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 如果类注解了@RpcService,则执行服务注册
        RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
        if (rpcService != null) {
            // 服务类的全限定名
            String serviceName = rpcService.service().getName();
            // 服务版本号
            String serviceVersion = rpcService.version();
    
            try {
                // 创建服务元数据对象
                ServiceMeta serviceMeta = new ServiceMeta();
                serviceMeta.setAddress(serverAddress);
                serviceMeta.setPort(serverPort);
                serviceMeta.setService(serviceName);
                serviceMeta.setVersion(serviceVersion);
                // 注册服务
                serviceRegistry.register(serviceMeta);
                // 缓存服务
                String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
                rpcServiceMap.put(serviceKey, bean);
            } catch (Exception e) {
                LOG.error("failed to register service {}#{}", serviceName, serviceVersion, e);
            }
        }
        return bean;
    }

至此,服务提供者在启动后就可以将 @RpcService 注解修饰的服务发布到注册中心了。

二、负载均衡

服务消费者在发起 RPC 调用之前,需要感知有多少服务端节点可用,然后从中选取一个进行调用。之前我提到了几种常用的负载均衡策略:Round-Robin 轮询、Weighted Round-Robin 权重轮询、Least Connections 最少连接数、一致性 Hash 等。

本节,我针对一致性 Hash 算法进行实现, 一致性 Hash 算法可以保证每个服务节点分摊的流量尽可能均匀,而且能够把服务节点伸缩带来的影响降到最低

一致性Hash算法的原理请参见我的专栏《分布式系统从理论到实战》

2.1 一致性Hash实现

首先,定义一个通用的负载均衡接口,Round-Robin 、一致性 Hash 等负载均衡算法都需要实现该接口,接口定义如下:

    /**
     * 负载均衡器
     * @param <T>
     */
    public interface ServiceLoadBalancer<T> {
        T select(List<T> servers, int hashCode);
    }

上述的select() 方法的传入参数是一批服务节点以及客户端对象的 hashCode,针对 Zookeeper 的场景,我们可以实现一个比较通用的一致性 Hash 算法:

    public class ZKConsistentHashLoadBalancer implements ServiceLoadBalancer<ServiceInstance<ServiceMeta>> {
        /**
         * 每个实节点对应的虚拟节点数
         */
        private final static int VIRTUAL_NODE_SIZE = 10;
        private final static String VIRTUAL_NODE_SPLIT = "#";
    
        @Override
        public ServiceInstance<ServiceMeta> select(List<ServiceInstance<ServiceMeta>> servers, int hashCode) {
            // 构造一个Hash环
            TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = makeConsistentHashRing(servers);
            // 选择一个节点
            return allocateNode(ring, hashCode);
        }
    
        private ServiceInstance<ServiceMeta> allocateNode(TreeMap<Integer, ServiceInstance<ServiceMeta>> ring, int hashCode) {
            // 获取第一个大于等于该hashCode的KEY对应的节点
            Map.Entry<Integer, ServiceInstance<ServiceMeta>> entry = ring.ceilingEntry(hashCode);
            if (entry == null) {
                entry = ring.firstEntry();
            }
            return entry.getValue();
        }
    
        private TreeMap<Integer, ServiceInstance<ServiceMeta>> makeConsistentHashRing(List<ServiceInstance<ServiceMeta>> servers) {
            TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = new TreeMap<>();
            // 遍历每一个服务实例
            for (ServiceInstance<ServiceMeta> instance : servers) {
                // 每个实例映射10个虚拟节点
                for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
                    ring.put((buildHashKey(instance) + VIRTUAL_NODE_SPLIT + i).hashCode(), instance);
                }
            }
            return ring;
        }
    
        private String buildHashKey(ServiceInstance<ServiceMeta> instance) {
            ServiceMeta payload = instance.getPayload();
            return String.join(":", payload.getAddress(), String.valueOf(payload.getPort()));
        }
    }

JDK 提供了 TreeMap 数据结构,可以非常方便地构造哈希环。TreeMap 底层基于红黑树实现,可以维持Key的有序性,所以我们构造Hash环的思路就是:

  1. 遍历所有服务节点,为每个服务节点创建10个虚拟节点,每个虚拟节点的唯一标识是:服务IP:服务端口#[虚拟节点编号],比如192.168.1.1:28083#1
  2. 计算虚拟节点唯一标识的hash值,将该值作为Key,服务实例作为Value存入TreeMap;
  3. 当客户端请求一个服务节点时,以客户端hashcode作为KEY,去TreeMap查找,TreeMap.ceilingEntry()方法会返回大于或等于给定KEY的最小键值对,即为客户端对应要调用的服务节点。

2.2 服务发现

服务发现的实现思路比较简单,首先找出被调用服务所有的节点列表,然后通过 ServiceLoadBalancer 提供的负载均衡算法,找出一个相应的服务节点。具体代码实现如下:

    @Override
    public ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception {
        Collection<ServiceInstance<ServiceMeta>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);
        ServiceLoadBalancer balancer = new ZKConsistentHashLoadBalancer();
        // 基于一致性Hash算法,选择一个服务节点
        ServiceInstance<ServiceMeta> instance = (ServiceInstance<ServiceMeta>) balancer.select(new ArrayList(serviceInstances), invokerHashCode);
        if (instance != null) {
            return instance.getPayload();
        }
        return null;
    }

三、总结

本章,我对RPC框架的负载均衡机制进行了讲解,主要涉及服务注册和服务发现的负载均衡机制的讲解,我基于TreeMap实现了一致性Hash算法,事实上在生成hash值时,可以使用 Google Guava 工具库中的MurmurHash,它具有更好的防Hash碰撞的特性。

服务消费者通过服务发现接口获取到可调用的服务节点后,是通过动态代理机制完成请求调用的,我会在下一章详细讲解。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文