从本章开始,我将正式进入 RPC 框架的原型开发阶段。我会从服务发布与订阅、通信协议设计、负载均衡机制、动态代理四个方面详细地讲解一个通用 RPC 框架的实现过程。
本章,我主要完成整个工程的搭建,以及服务发布与订阅功能的源码实现。
工程源码存放在Gitee,需要的童鞋请自行下载。
一、工程搭建
整个工程使用 Spring Boot 2.1.12.RELEASE + JDK 1.8.0_221 + Netty 4.1.65.Final 的技术栈,并使用 Zookeeper 3.4.14 作为注册中心。
Zookeeper的搭建我不再赘述,参考官网或我的专栏《分布式系统从理论到实战》。
1.1 项目结构
整个工程划分为七个模块:
- rpc-provider :服务提供者,负责发布 RPC 服务,接收和处理 RPC 请求;
- rpc-consumer :服务消费者,使用动态代理发起 RPC 远程调用,帮助使用者来屏蔽底层网络通信的细节;
- rpc-registry :注册中心模块,提供服务注册、服务发现、负载均衡的基本功能;
- rpc-protocol :网络通信模块,包含 RPC 协议的编解码器、序列化和反序列化工具等;
- rpc-core :基础类库,提供通用的工具类以及模型定义,例如 RPC 请求和响应类、RPC 服务元数据类等;
- rpc-facade :包含RPC 服务接口的存根,即服务提供者需要对外暴露的接口;
- rpc-test :测试工程,用于RPC框架的自测。
各个模块之间的依赖关系,如下图:
1.2 使用方式
参考Dubbo的使用方式,我们的RPC框架提供了两个核心注解:
- @RpcService :rpc-provider模块中的服务提供方,通过
@RpcService
注解暴露 RPC 服务; - @RpcReference :rpc-consumer模块中的服务消费方,通过
@RpcReference
注解订阅 RPC 服务。
我接下来以一个示例,讲解该RPC框架的使用。
首先,服务提供方定义需要暴露的接口:
public interface HelloService {
String hello(String name);
}
接着,服务提供方实现服务:
@RpcService(service = HelloService.class, version = "1.0.0")
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String name) {
return "hello" + name;
}
}
最后,服务消费方引用接口存根,并订阅自己需要的服务:
@RestController
public class HelloController {
@RpcReference(version = "1.0.0", timeout = 8000)
private HelloService helloService;
@RequestMapping(value = "/hello", method = RequestMethod.GET)
public String sayHello() {
return helloService.hello("hello world");
}
}
二、服务发布
我们先从模块rpc-provider
入手,实现服务发布的整体流程,部分细节实现我会在后续章节讲解。rpc-provider
一共包含四个核心流程:
- 启动服务,并暴露服务端口;
- 基于
@RpcService
注解,扫描需要对外发布的服务,并将服务元数据信息发布到注册中心; - 接收 RPC 请求,解码后得到请求消息;
- 提交请求至自定义线程池进行处理,并将处理结果写回客户端。
2.1 注解定义
服务提供者,需要定义发布的服务类型、版本等属性,主流的 RPC 框架都支持以 XML 或者注解方式定义。这里,采用注解方式来实现。
@RpcService
首先,定义 @RpcService
注解,作用于代表服务提供方的类上:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Component
public @interface RpcService {
Class<?> service() default Object.class;
String version() default "1.0";
}
@RpcService 使用了@Component
注解,所以可以将对象注入到Spring 容器中进行管理,那么 service
、version
的属性值怎么才能和 Bean 关联起来呢?这就需要我们对 Spring Bean 的生命周期以及 Bean 的可扩展点有所了解。
Spring 的 BeanPostProcessor
接口给提供了对 Bean 进行再加工的扩展点,常用于处理自定义注解。自定义的 Bean 可以通过实现 BeanPostProcessor 接口,在 Bean 实例化的前后加入自定义的逻辑处理。
所以,我们通过一个 RpcProviderInitializer
类,完成容器启动过程中的服务Bean装配,该类实现了 BeanPostProcessor 接口,可以对@RpcService
注解的服务进行自定义处理:
/**
* RPC服务初始化器
*/
public class RpcProviderInitializer implements InitializingBean, BeanPostProcessor {
private static final Logger LOG = LoggerFactory.getLogger(RpcProviderInitializer.class);
private final Integer serverPort;
private final String serverAddress;
private final RegistryService serviceRegistry;
/**
* 服务实例缓存<服务Key,服务实例对象>
*/
private final Map<String, Object> rpcServiceMap = new ConcurrentHashMap<>();
public RpcProviderInitializer(Integer serverPort, RegistryService serviceRegistry) {
try {
this.serverAddress = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
throw new RuntimeException("unknown host", e);
}
this.serverPort = serverPort;
this.serviceRegistry = serviceRegistry;
}
@Override
public void afterPropertiesSet() {
new Thread(() -> {
try {
startRpcServer();
} catch (Exception e) {
LOG.error("start rpc server error.", e);
}
}).start();
}
private void startRpcServer() throws Exception {
// 基于Netty启动RPC服务Server
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
.addLast(new RpcEncoder())
.addLast(new RpcDecoder())
.addLast(new RpcRequestHandler(rpcServiceMap));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync();
LOG.info("server addr {} started on port {}", this.serverAddress, this.serverPort);
channelFuture.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 如果类注解了@RpcService,则执行服务注册
RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
if (rpcService != null) {
// 服务类的全限定名
String serviceName = rpcService.service().getName();
// 服务版本号
String serviceVersion = rpcService.version();
try {
// 创建服务元数据对象
ServiceMeta serviceMeta = new ServiceMeta();
serviceMeta.setAddress(serverAddress);
serviceMeta.setPort(serverPort);
serviceMeta.setService(serviceName);
serviceMeta.setVersion(serviceVersion);
// 注册服务
serviceRegistry.register(serviceMeta);
// 缓存服务
String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
rpcServiceMap.put(serviceKey, bean);
} catch (Exception e) {
LOG.error("failed to register service {}#{}", serviceName, serviceVersion, e);
}
}
return bean;
}
}
上述 RpcProviderInitializer 类重写了 BeanPostProcessor.postProcessAfterInitialization
方法,对所有初始化完成后的 Bean 进行扫描。如果 Bean 包含 @RpcService 注解,那么获取注解上的信息,然后创建 ServiceMeta 对象,将服务元数据发布至注册中心,注册中心的实现我先暂且跳过,后续章节讲解。
RpcProviderInitializer 维护了一个 rpcServiceMap,缓存服务对象,在处理 RPC 请求时可以直接通过 rpcServiceMap 拿到对应的服务对象进行调用。
2.2 参数配置
服务提供方需要配置参数,我们不应该把这些参数写死在代码里,一般通过配置文件方式进行输入。我们定义三个参数,分别为:RPC服务端口 servicePort、注册中心地址 registryAddr 和注册中心类型 registryType,然后使用 Spring Boot 的 @ConfigurationProperties
注解实现配置项的加载:
@ConfigurationProperties(prefix = "rpc")
public class RpcConfigProperties {
/**
* RPC服务端口
*/
private int servicePort;
/**
* 服务注册中心地址
*/
private String registryAddr;
/**
* 服务注册中心类型
*/
private String registryType;
//...省略get/set
}
@ConfigurationProperties
通过 prefix 属性指定配置参数的前缀,默认会与全局配置文件 application.properties 或者 application.yml 中的参数进行一一绑定。如果你想自定义一个配置文件,可以通过@PropertySource
注解指定配置文件的位置。
下面我在 rpc-provider
模块的 resources 目录下创建全局配置文件 application.properties
,并配置以上三个参数:
rpc.service.port=2781
rpc.registry.type=ZOOKEEPER
rpc.registry.addr=127.0.0.1:2181
注意,只配置 @ConfigurationProperties 注解,Spring 容器并不能获取配置文件的内容并映射为对象,需要与@EnableConfigurationProperties
注解配合使用,该注解的作用是将声明了 @ConfigurationProperties
注解的类注入为 Spring 容器中的 Bean。具体用法如下:
@Configuration
@EnableConfigurationProperties(RpcConfigProperties.class)
public class RpcProviderAutoConfiguration {
@Resource
private RpcConfigProperties rpcProperties;
@Bean
public RpcProviderInitializer init() throws Exception {
RegistryType type = RegistryType.valueOf(rpcProperties.getRegistryType());
RegistryService serviceRegistry = RegistryFactory.getInstance(rpcProperties.getRegistryAddr(), type);
return new RpcProviderInitializer(rpcProperties.getServicePort(), serviceRegistry);
}
}
上述的@Configuration
注解主要用于定义配置类,配置类内部可以包含多个@Bean
注解的方法,用于创建Bean并注入到Spring容器中,这样就替换了传统的 XML 定义Bean方式。
三、服务订阅
服务消费者的实现要复杂一些,对于声明了 @RpcReference 注解的成员变量,我们需要通过动态代理,构造出一个可以真正可以进行 RPC 调用的 Bean,然后将它注册到 Spring 容器中。
3.1 注解定义
@RpcReference
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired
public @interface RpcReference {
String version() default "1.0";
String registryType() default "ZOOKEEPER";
String registryAddr() default "127.0.0.1:2181";
long timeout() default 5000;
}
@RpcReference 注解提供了服务版本 version、注册中心类型 registryType、注册中心地址 registryAddr 和超时时间 timeout 四个属性。接下来,我们需要使用这些属性,构造出一个自定义的 Bean,然后对该 Bean 的所有方法调用进行拦截。
3.2 Bean定义信息
由于@RpcReference注解是作用在类的字段上的,所以我们就得思考下,如何创建服务接口类对应的对象呢?比如,下面的HelloService接口:
public class HelloController {
@RpcReference(version = "1.0.0", timeout = 8000)
private HelloService helloService;
//...
}
Spring 的 FactoryBean 接口,可以看成是创建Bean的工厂,可以帮助我们实现自定义 Bean 的创建,它的 getObject()
方法用于返回一个对象。所以,我们可以将代理类对象的创建逻辑放在FactoryBean实现类中:
public class RpcReferenceBean implements FactoryBean<Object> {
private Class<?> interfaceClass;
private String serviceVersion;
private String registryType;
private String registryAddr;
private long timeout;
private Object object;
@Override
public Object getObject() {
return object;
}
/**
* FactoryBean创建Bean对象时,会调用该方法对对象进行初始化
* @throws Exception
*/
public void init() throws Exception {
// 创建服务提供方接口的代理对象
RegistryService registryService = RegistryFactory.getInstance(this.registryAddr, RegistryType.valueOf(this.registryType));
this.object = Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new RpcInvokerProxy(serviceVersion, timeout, registryService));
}
//...省略get/set
}
我们可以使用 Spring 的 BeanFactoryPostProcessor 扫描所有引用@RpcReference
注解的类字段,生成自定义的RpcReferenceBean对象,注入到容器中,然后Spring容器会根据Bean定义信息创建对象。
BeanFactoryPostProcessor 是 Spring 容器加载 Bean 的定义之后以及 Bean 实例化之前执行,所以 BeanFactoryPostProcessor 可以在 Bean 实例化之前获取 Bean 的配置元数据,并允许用户对其修改。而 BeanPostProcessor 是在 Bean 初始化 前后执行,它并不能修改 Bean 的配置信息。
@Component
public class RpcConsumerPostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
private static final Logger LOG = LoggerFactory.getLogger(RpcConsumerPostProcessor.class);
private ApplicationContext context;
private ClassLoader classLoader;
/**
* Bean定义信息缓存:<服务提供方的类全限定名,Bean定义对象>
*/
private final Map<String, BeanDefinition> rpcRefBeanDefinitions = new LinkedHashMap<>();
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.context = applicationContext;
}
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
// 遍历Spring容器中所有的类
for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
String beanClassName = beanDefinition.getBeanClassName();
if (beanClassName != null) {
Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);
ReflectionUtils.doWithFields(clazz, new ReflectionUtils.FieldCallback() {
@Override
public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
// 对类的字段引用进行处理
RpcConsumerPostProcessor.this.parseRpcReference(field);
}
});
}
}
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
this.rpcRefBeanDefinitions.forEach((beanName, beanDefinition) -> {
if (context.containsBean(beanName)) {
throw new IllegalArgumentException("spring context already has a bean named " + beanName);
}
registry.registerBeanDefinition(beanName,beanDefinition);
LOG.info("registered RpcReferenceBean {} success.", beanName);
});
}
private void parseRpcReference(Field field) {
// 如果字段使用了@RpcReference注解,则进行处理
RpcReference annotation = AnnotationUtils.getAnnotation(field, RpcReference.class);
if (annotation != null) {
// 创建Bean定义对象
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(RpcReferenceBean.class);
builder.setInitMethodName("init");
builder.addPropertyValue("interfaceClass", field.getType());
builder.addPropertyValue("serviceVersion", annotation.version());
builder.addPropertyValue("registryType", annotation.registryType());
builder.addPropertyValue("registryAddr", annotation.registryAddr());
builder.addPropertyValue("timeout", annotation.timeout());
BeanDefinition beanDefinition = builder.getBeanDefinition();
rpcRefBeanDefinitions.put(field.getName(), beanDefinition);
}
}
}
RpcConsumerPostProcessor 类重写了 BeanFactoryPostProcessor.postProcessBeanFactory
方法,从 beanFactory 中获取所有 Bean 定义信息,然后对每个 Bean 的所有 field 进行检测,如果 field 声明了 @RpcReference 注解,则进行如下处理:
- 通过 BeanDefinitionBuilder 构造 RpcReferenceBean 的定义,并为 RpcReferenceBean 的成员变量赋值,同时执行了创建对象时的初始化方法
init
,我们就是在该方法中创建代理对象的; - 构造完 RpcReferenceBean 的定义之后,将RpcReferenceBean 的 BeanDefinition 重新注册到 Spring 容器中。
四、总结
本章,我对RPC框架的工程结构进行了讲解,着重介绍了服务提供者使用@RpcService
注解是如何发布服务的,服务消费者使用@RpcReference
注解是如何订阅服务的,特别要注意 @RpcReference 注解,被该注解修饰的field变量都会被构造成 RpcReferenceBean,然后为该RpcReferenceBean生成BeanDefinition对象注入到Spring容器中。
本章关于@RpcReference和@RpcService这两个注解的开发其实是很有参考意义的,我们的应用可以基于Spring框架开发各类自定义注解,封装出一套自己的开发框架,这在实际项目开发过程中是很常见的。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。