在分布式系统中,服务消费者和服务提供者都存在多个节点,所以 RPC 框架需要实现合理的负载均衡算法,控制流量能够均匀地分摊到每个服务提供者,避免单点故障导致的服务调用异常。所以,本章我就来实现 RPC 框架的负载均衡机制。
一、注册中心
服务消费者在发起 RPC 调用之前,需要知道服务提供者有哪些节点是可用的。由于服务提供者节点会存在上/下线的情况,所以服务消费者需要感知服务提供者节点的动态变化。
RPC 框架一般采用注册中心来实现服务的注册和发现,主流的注册中心有 ZooKeeper、Eureka、Etcd、Consul、Nacos 等。在本工程中,我使用 Zookeeper,并设计一个通用的注册中心接口,各类注册中心都可以按该接口规范进行扩展。
1.1 接口设计
注册中心会存储服务提供方的元数据信息,我们需要将服务元数据信息封装成一个对象,该对象包括 服务全限定名称 、 服务版本 、 服务地址/端口 :
/**
* 服务元数据
*/
public class ServiceMeta {
/**
* 服务类全限定名
*/
private String service;
/**
* 服务版本号
*/
private String version;
/**
* 服务实例地址
*/
private String address;
/**
* 服务端实例端口
*/
private Integer port;
//...省略get/set
}
接着,需要设计通用的注册中心管理接口,包含注册中心四个基本操作:
- 服务注册 register
- 服务注销 unRegister
- 服务发现 discovery
- 注册中心销毁 destroy
/**
* 服务注册管理
*/
public interface RegistryService {
void register(ServiceMeta serviceMeta) throws Exception;
void unRegister(ServiceMeta serviceMeta) throws Exception;
ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception;
void destroy() throws IOException;
}
来看具体的实现ZookeeperRegistryService:
public class ZookeeperRegistryService implements RegistryService {
/**
* 注册中心连接失败时的重试间隔
*/
public static final int BASE_SLEEP_TIME_MS = 1000;
/**
* 注册中心连接失败时的重试次数
*/
public static final int MAX_RETRIES = 3;
/**
* 注册中心中的RPC服务根节点路径
*/
public static final String ZK_BASE_PATH = "/rpc";
private final ServiceDiscovery<ServiceMeta> serviceDiscovery;
public ZookeeperRegistryService(String registryAddr) throws Exception {
// 1.连接Zookeeper注册中心
CuratorFramework client = CuratorFrameworkFactory.newClient(registryAddr,
new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
client.start();
JsonInstanceSerializer<ServiceMeta> serializer = new JsonInstanceSerializer<>(ServiceMeta.class);
// 2.基于Curator框架构建一个服务注册工具类
this.serviceDiscovery = ServiceDiscoveryBuilder
.builder(ServiceMeta.class) // 自定义数据
.client(client)
.serializer(serializer)
.basePath(ZK_BASE_PATH)
.build();
this.serviceDiscovery.start();
}
@Override
public void register(ServiceMeta serviceMeta) throws Exception {
String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
// 构建一个服务实例对象
ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance.<ServiceMeta>builder()
.name(serviceKey)
.address(serviceMeta.getAddress())
.port(serviceMeta.getPort())
.payload(serviceMeta)
.build();
// 注册服务实例
serviceDiscovery.registerService(serviceInstance);
}
@Override
public void unRegister(ServiceMeta serviceMeta) throws Exception {
String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance.<ServiceMeta>builder()
.name(serviceKey)
.address(serviceMeta.getAddress())
.port(serviceMeta.getPort())
.payload(serviceMeta)
.build();
serviceDiscovery.unregisterService(serviceInstance);
}
@Override
public ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception {
Collection<ServiceInstance<ServiceMeta>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);
ServiceLoadBalancer balancer = new ZKConsistentHashLoadBalancer();
// 基于一致性Hash算法,选择一个服务节点
ServiceInstance<ServiceMeta> instance = (ServiceInstance<ServiceMeta>) balancer.select(new ArrayList(serviceInstances), invokerHashCode);
if (instance != null) {
return instance.getPayload();
}
return null;
}
@Override
public void destroy() throws IOException {
serviceDiscovery.close();
}
}
上述代码,核心是创建了 ServiceDiscovery 对象, ServiceDiscovery 负责服务的注册和发现。
1.2 服务注册
服务注册通过ServiceDiscovery.register()
方法完成:
@Override
public void register(ServiceMeta serviceMeta) throws Exception {
String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
// 构建一个服务实例对象
ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance.<ServiceMeta>builder()
.name(serviceKey)
.address(serviceMeta.getAddress())
.port(serviceMeta.getPort())
.payload(serviceMeta)
.build();
// 注册服务实例
serviceDiscovery.registerService(serviceInstance);
}
ServiceInstance 对象代表一个服务实例,包含服务名称 name、唯一标识 id、地址 address、端口 port 以及用户自定义的可选属性 payload,ServiceInstance 在 Zookeeper 服务器中的存储形式如下:
RpcProviderInitializer 在启动过程中,根据 @RpcService
注解识别需要发布的服务,然后调用 RegistryService 接口的 register() 方法,将服务发布到注册中心:
// RpcProviderInitializer.java
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 如果类注解了@RpcService,则执行服务注册
RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
if (rpcService != null) {
// 服务类的全限定名
String serviceName = rpcService.service().getName();
// 服务版本号
String serviceVersion = rpcService.version();
try {
// 创建服务元数据对象
ServiceMeta serviceMeta = new ServiceMeta();
serviceMeta.setAddress(serverAddress);
serviceMeta.setPort(serverPort);
serviceMeta.setService(serviceName);
serviceMeta.setVersion(serviceVersion);
// 注册服务
serviceRegistry.register(serviceMeta);
// 缓存服务
String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
rpcServiceMap.put(serviceKey, bean);
} catch (Exception e) {
LOG.error("failed to register service {}#{}", serviceName, serviceVersion, e);
}
}
return bean;
}
至此,服务提供者在启动后就可以将 @RpcService
注解修饰的服务发布到注册中心了。
二、负载均衡
服务消费者在发起 RPC 调用之前,需要感知有多少服务端节点可用,然后从中选取一个进行调用。之前我提到了几种常用的负载均衡策略:Round-Robin 轮询、Weighted Round-Robin 权重轮询、Least Connections 最少连接数、一致性 Hash 等。
本节,我针对一致性 Hash 算法进行实现, 一致性 Hash 算法可以保证每个服务节点分摊的流量尽可能均匀,而且能够把服务节点伸缩带来的影响降到最低 。
一致性Hash算法的原理请参见我的专栏《分布式系统从理论到实战》。
2.1 一致性Hash实现
首先,定义一个通用的负载均衡接口,Round-Robin 、一致性 Hash 等负载均衡算法都需要实现该接口,接口定义如下:
/**
* 负载均衡器
* @param <T>
*/
public interface ServiceLoadBalancer<T> {
T select(List<T> servers, int hashCode);
}
上述的select() 方法的传入参数是一批服务节点以及客户端对象的 hashCode,针对 Zookeeper 的场景,我们可以实现一个比较通用的一致性 Hash 算法:
public class ZKConsistentHashLoadBalancer implements ServiceLoadBalancer<ServiceInstance<ServiceMeta>> {
/**
* 每个实节点对应的虚拟节点数
*/
private final static int VIRTUAL_NODE_SIZE = 10;
private final static String VIRTUAL_NODE_SPLIT = "#";
@Override
public ServiceInstance<ServiceMeta> select(List<ServiceInstance<ServiceMeta>> servers, int hashCode) {
// 构造一个Hash环
TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = makeConsistentHashRing(servers);
// 选择一个节点
return allocateNode(ring, hashCode);
}
private ServiceInstance<ServiceMeta> allocateNode(TreeMap<Integer, ServiceInstance<ServiceMeta>> ring, int hashCode) {
// 获取第一个大于等于该hashCode的KEY对应的节点
Map.Entry<Integer, ServiceInstance<ServiceMeta>> entry = ring.ceilingEntry(hashCode);
if (entry == null) {
entry = ring.firstEntry();
}
return entry.getValue();
}
private TreeMap<Integer, ServiceInstance<ServiceMeta>> makeConsistentHashRing(List<ServiceInstance<ServiceMeta>> servers) {
TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = new TreeMap<>();
// 遍历每一个服务实例
for (ServiceInstance<ServiceMeta> instance : servers) {
// 每个实例映射10个虚拟节点
for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
ring.put((buildHashKey(instance) + VIRTUAL_NODE_SPLIT + i).hashCode(), instance);
}
}
return ring;
}
private String buildHashKey(ServiceInstance<ServiceMeta> instance) {
ServiceMeta payload = instance.getPayload();
return String.join(":", payload.getAddress(), String.valueOf(payload.getPort()));
}
}
JDK 提供了 TreeMap 数据结构,可以非常方便地构造哈希环。TreeMap 底层基于红黑树实现,可以维持Key的有序性,所以我们构造Hash环的思路就是:
- 遍历所有服务节点,为每个服务节点创建10个虚拟节点,每个虚拟节点的唯一标识是:
服务IP:服务端口#[虚拟节点编号]
,比如192.168.1.1:28083#1
; - 计算虚拟节点唯一标识的hash值,将该值作为Key,服务实例作为Value存入TreeMap;
- 当客户端请求一个服务节点时,以客户端hashcode作为KEY,去TreeMap查找,
TreeMap.ceilingEntry()
方法会返回大于或等于给定KEY的最小键值对,即为客户端对应要调用的服务节点。
2.2 服务发现
服务发现的实现思路比较简单,首先找出被调用服务所有的节点列表,然后通过 ServiceLoadBalancer
提供的负载均衡算法,找出一个相应的服务节点。具体代码实现如下:
@Override
public ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception {
Collection<ServiceInstance<ServiceMeta>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);
ServiceLoadBalancer balancer = new ZKConsistentHashLoadBalancer();
// 基于一致性Hash算法,选择一个服务节点
ServiceInstance<ServiceMeta> instance = (ServiceInstance<ServiceMeta>) balancer.select(new ArrayList(serviceInstances), invokerHashCode);
if (instance != null) {
return instance.getPayload();
}
return null;
}
三、总结
本章,我对RPC框架的负载均衡机制进行了讲解,主要涉及服务注册和服务发现的负载均衡机制的讲解,我基于TreeMap实现了一致性Hash算法,事实上在生成hash值时,可以使用 Google Guava 工具库中的MurmurHash,它具有更好的防Hash碰撞的特性。
服务消费者通过服务发现接口获取到可调用的服务节点后,是通过动态代理机制完成请求调用的,我会在下一章详细讲解。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。