2023-08-02
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/354

从本章开始,我将正式进入 RPC 框架的原型开发阶段。我会从服务发布与订阅、通信协议设计、负载均衡机制、动态代理四个方面详细地讲解一个通用 RPC 框架的实现过程。

本章,我主要完成整个工程的搭建,以及服务发布与订阅功能的源码实现。

工程源码存放在Gitee,需要的童鞋请自行下载。

一、工程搭建

整个工程使用 Spring Boot 2.1.12.RELEASE + JDK 1.8.0_221 + Netty 4.1.65.Final 的技术栈,并使用 Zookeeper 3.4.14 作为注册中心。

Zookeeper的搭建我不再赘述,参考官网或我的专栏《分布式系统从理论到实战》

1.1 项目结构

整个工程划分为七个模块:

202308022228513411.png

  • rpc-provider :服务提供者,负责发布 RPC 服务,接收和处理 RPC 请求;
  • rpc-consumer :服务消费者,使用动态代理发起 RPC 远程调用,帮助使用者来屏蔽底层网络通信的细节;
  • rpc-registry :注册中心模块,提供服务注册、服务发现、负载均衡的基本功能;
  • rpc-protocol :网络通信模块,包含 RPC 协议的编解码器、序列化和反序列化工具等;
  • rpc-core :基础类库,提供通用的工具类以及模型定义,例如 RPC 请求和响应类、RPC 服务元数据类等;
  • rpc-facade :包含RPC 服务接口的存根,即服务提供者需要对外暴露的接口;
  • rpc-test :测试工程,用于RPC框架的自测。

各个模块之间的依赖关系,如下图:

202308022228518892.png

1.2 使用方式

参考Dubbo的使用方式,我们的RPC框架提供了两个核心注解:

  • @RpcService :rpc-provider模块中的服务提供方,通过@RpcService注解暴露 RPC 服务;
  • @RpcReference :rpc-consumer模块中的服务消费方,通过@RpcReference注解订阅 RPC 服务。

我接下来以一个示例,讲解该RPC框架的使用。

首先,服务提供方定义需要暴露的接口:

    public interface HelloService {
        String hello(String name);
    }

接着,服务提供方实现服务:

    @RpcService(service = HelloService.class, version = "1.0.0")
    public class HelloServiceImpl implements HelloService {
        @Override
        public String hello(String name) {
            return "hello" + name;
        }
    }

最后,服务消费方引用接口存根,并订阅自己需要的服务:

    @RestController
    public class HelloController {
        @RpcReference(version = "1.0.0", timeout = 8000)
        private HelloService helloService;
    
        @RequestMapping(value = "/hello", method = RequestMethod.GET)
        public String sayHello() {
            return helloService.hello("hello world");
        }
    }

二、服务发布

我们先从模块rpc-provider入手,实现服务发布的整体流程,部分细节实现我会在后续章节讲解。rpc-provider 一共包含四个核心流程:

  • 启动服务,并暴露服务端口;
  • 基于@RpcService注解,扫描需要对外发布的服务,并将服务元数据信息发布到注册中心;
  • 接收 RPC 请求,解码后得到请求消息;
  • 提交请求至自定义线程池进行处理,并将处理结果写回客户端。

2.1 注解定义

服务提供者,需要定义发布的服务类型、版本等属性,主流的 RPC 框架都支持以 XML 或者注解方式定义。这里,采用注解方式来实现。

@RpcService

首先,定义 @RpcService 注解,作用于代表服务提供方的类上:

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Component
    public @interface RpcService {
    
        Class<?> service() default Object.class;
    
        String version() default "1.0";
    }

@RpcService 使用了@Component注解,所以可以将对象注入到Spring 容器中进行管理,那么 serviceversion的属性值怎么才能和 Bean 关联起来呢?这就需要我们对 Spring Bean 的生命周期以及 Bean 的可扩展点有所了解。

Spring 的 BeanPostProcessor 接口给提供了对 Bean 进行再加工的扩展点,常用于处理自定义注解。自定义的 Bean 可以通过实现 BeanPostProcessor 接口,在 Bean 实例化的前后加入自定义的逻辑处理。

所以,我们通过一个 RpcProviderInitializer 类,完成容器启动过程中的服务Bean装配,该类实现了 BeanPostProcessor 接口,可以对@RpcService注解的服务进行自定义处理:

    /**
     * RPC服务初始化器
     */
    public class RpcProviderInitializer implements InitializingBean, BeanPostProcessor {
        private static final Logger LOG = LoggerFactory.getLogger(RpcProviderInitializer.class);
    
        private final Integer serverPort;
        private final String serverAddress;
        private final RegistryService serviceRegistry;
        /**
         * 服务实例缓存<服务Key,服务实例对象>
         */
        private final Map<String, Object> rpcServiceMap = new ConcurrentHashMap<>();
    
        public RpcProviderInitializer(Integer serverPort, RegistryService serviceRegistry) {
            try {
                this.serverAddress = InetAddress.getLocalHost().getHostAddress();
            } catch (UnknownHostException e) {
                throw new RuntimeException("unknown host", e);
            }
            this.serverPort = serverPort;
            this.serviceRegistry = serviceRegistry;
        }
    
        @Override
        public void afterPropertiesSet() {
            new Thread(() -> {
                try {
                    startRpcServer();
                } catch (Exception e) {
                    LOG.error("start rpc server error.", e);
                }
            }).start();
        }
    
        private void startRpcServer() throws Exception {
            // 基于Netty启动RPC服务Server
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss, worker)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) {
                                socketChannel.pipeline()
                                        .addLast(new RpcEncoder())
                                        .addLast(new RpcDecoder())
                                        .addLast(new RpcRequestHandler(rpcServiceMap));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync();
                LOG.info("server addr {} started on port {}", this.serverAddress, this.serverPort);
                channelFuture.channel().closeFuture().sync();
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            // 如果类注解了@RpcService,则执行服务注册
            RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
            if (rpcService != null) {
                // 服务类的全限定名
                String serviceName = rpcService.service().getName();
                // 服务版本号
                String serviceVersion = rpcService.version();
    
                try {
                    // 创建服务元数据对象
                    ServiceMeta serviceMeta = new ServiceMeta();
                    serviceMeta.setAddress(serverAddress);
                    serviceMeta.setPort(serverPort);
                    serviceMeta.setService(serviceName);
                    serviceMeta.setVersion(serviceVersion);
                    // 注册服务
                    serviceRegistry.register(serviceMeta);
                    // 缓存服务
                    String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
                    rpcServiceMap.put(serviceKey, bean);
                } catch (Exception e) {
                    LOG.error("failed to register service {}#{}", serviceName, serviceVersion, e);
                }
            }
            return bean;
        }
    }

上述 RpcProviderInitializer 类重写了 BeanPostProcessor.postProcessAfterInitialization 方法,对所有初始化完成后的 Bean 进行扫描。如果 Bean 包含 @RpcService 注解,那么获取注解上的信息,然后创建 ServiceMeta 对象,将服务元数据发布至注册中心,注册中心的实现我先暂且跳过,后续章节讲解。

RpcProviderInitializer 维护了一个 rpcServiceMap,缓存服务对象,在处理 RPC 请求时可以直接通过 rpcServiceMap 拿到对应的服务对象进行调用。

2.2 参数配置

服务提供方需要配置参数,我们不应该把这些参数写死在代码里,一般通过配置文件方式进行输入。我们定义三个参数,分别为:RPC服务端口 servicePort、注册中心地址 registryAddr 和注册中心类型 registryType,然后使用 Spring Boot 的 @ConfigurationProperties注解实现配置项的加载:

    @ConfigurationProperties(prefix = "rpc")
    public class RpcConfigProperties {
    
        /**
         * RPC服务端口
         */
        private int servicePort;
    
        /**
         * 服务注册中心地址
         */
        private String registryAddr;
    
        /**
         * 服务注册中心类型
         */
        private String registryType;
    
        //...省略get/set
    }

@ConfigurationProperties通过 prefix 属性指定配置参数的前缀,默认会与全局配置文件 application.properties 或者 application.yml 中的参数进行一一绑定。如果你想自定义一个配置文件,可以通过@PropertySource注解指定配置文件的位置。

下面我在 rpc-provider 模块的 resources 目录下创建全局配置文件 application.properties,并配置以上三个参数:

    rpc.service.port=2781
    rpc.registry.type=ZOOKEEPER
    rpc.registry.addr=127.0.0.1:2181

注意,只配置 @ConfigurationProperties 注解,Spring 容器并不能获取配置文件的内容并映射为对象,需要与@EnableConfigurationProperties 注解配合使用,该注解的作用是将声明了 @ConfigurationProperties 注解的类注入为 Spring 容器中的 Bean。具体用法如下:

    @Configuration
    @EnableConfigurationProperties(RpcConfigProperties.class)
    public class RpcProviderAutoConfiguration {
    
        @Resource
        private RpcConfigProperties rpcProperties;
    
        @Bean
        public RpcProviderInitializer init() throws Exception {
            RegistryType type = RegistryType.valueOf(rpcProperties.getRegistryType());
            RegistryService serviceRegistry = RegistryFactory.getInstance(rpcProperties.getRegistryAddr(), type);
            return new RpcProviderInitializer(rpcProperties.getServicePort(), serviceRegistry);
        }
    }

上述的@Configuration注解主要用于定义配置类,配置类内部可以包含多个@Bean注解的方法,用于创建Bean并注入到Spring容器中,这样就替换了传统的 XML 定义Bean方式。

三、服务订阅

服务消费者的实现要复杂一些,对于声明了 @RpcReference 注解的成员变量,我们需要通过动态代理,构造出一个可以真正可以进行 RPC 调用的 Bean,然后将它注册到 Spring 容器中。

3.1 注解定义

@RpcReference

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @Autowired
    public @interface RpcReference {
    
        String version() default "1.0";
    
        String registryType() default "ZOOKEEPER";
    
        String registryAddr() default "127.0.0.1:2181";
    
        long timeout() default 5000;
    }

@RpcReference 注解提供了服务版本 version、注册中心类型 registryType、注册中心地址 registryAddr 和超时时间 timeout 四个属性。接下来,我们需要使用这些属性,构造出一个自定义的 Bean,然后对该 Bean 的所有方法调用进行拦截。

3.2 Bean定义信息

由于@RpcReference注解是作用在类的字段上的,所以我们就得思考下,如何创建服务接口类对应的对象呢?比如,下面的HelloService接口:

    public class HelloController {
    
        @RpcReference(version = "1.0.0", timeout = 8000)
        private HelloService helloService;
    
        //...
    }

Spring 的 FactoryBean 接口,可以看成是创建Bean的工厂,可以帮助我们实现自定义 Bean 的创建,它的 getObject()方法用于返回一个对象。所以,我们可以将代理类对象的创建逻辑放在FactoryBean实现类中:

    public class RpcReferenceBean implements FactoryBean<Object> {
    
        private Class<?> interfaceClass;
    
        private String serviceVersion;
    
        private String registryType;
    
        private String registryAddr;
    
        private long timeout;
    
        private Object object;
    
        @Override
        public Object getObject() {
            return object;
        }
    
        /**
         * FactoryBean创建Bean对象时,会调用该方法对对象进行初始化
         * @throws Exception
         */
        public void init() throws Exception {
            // 创建服务提供方接口的代理对象
            RegistryService registryService = RegistryFactory.getInstance(this.registryAddr, RegistryType.valueOf(this.registryType));
            this.object = Proxy.newProxyInstance(
                    interfaceClass.getClassLoader(),
                    new Class<?>[]{interfaceClass},
                    new RpcInvokerProxy(serviceVersion, timeout, registryService));
        }
        //...省略get/set
    }

我们可以使用 Spring 的 BeanFactoryPostProcessor 扫描所有引用@RpcReference注解的类字段,生成自定义的RpcReferenceBean对象,注入到容器中,然后Spring容器会根据Bean定义信息创建对象。

BeanFactoryPostProcessor 是 Spring 容器加载 Bean 的定义之后以及 Bean 实例化之前执行,所以 BeanFactoryPostProcessor 可以在 Bean 实例化之前获取 Bean 的配置元数据,并允许用户对其修改。而 BeanPostProcessor 是在 Bean 初始化 前后执行,它并不能修改 Bean 的配置信息。

    @Component
    public class RpcConsumerPostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
        private static final Logger LOG = LoggerFactory.getLogger(RpcConsumerPostProcessor.class);
    
        private ApplicationContext context;
    
        private ClassLoader classLoader;
    
        /**
         * Bean定义信息缓存:<服务提供方的类全限定名,Bean定义对象>
         */
        private final Map<String, BeanDefinition> rpcRefBeanDefinitions = new LinkedHashMap<>();
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.context = applicationContext;
        }
    
        @Override
        public void setBeanClassLoader(ClassLoader classLoader) {
            this.classLoader = classLoader;
        }
    
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
            // 遍历Spring容器中所有的类
            for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
                BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
                String beanClassName = beanDefinition.getBeanClassName();
                if (beanClassName != null) {
                    Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);
                    ReflectionUtils.doWithFields(clazz, new ReflectionUtils.FieldCallback() {
                        @Override
                        public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                            // 对类的字段引用进行处理
                            RpcConsumerPostProcessor.this.parseRpcReference(field);
                        }
                    });
                }
            }
    
            BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
            this.rpcRefBeanDefinitions.forEach((beanName, beanDefinition) -> {
                if (context.containsBean(beanName)) {
                    throw new IllegalArgumentException("spring context already has a bean named " + beanName);
                }
                registry.registerBeanDefinition(beanName,beanDefinition);
                LOG.info("registered RpcReferenceBean {} success.", beanName);
            });
        }
    
        private void parseRpcReference(Field field) {
            // 如果字段使用了@RpcReference注解,则进行处理
            RpcReference annotation = AnnotationUtils.getAnnotation(field, RpcReference.class);
            if (annotation != null) {
                // 创建Bean定义对象
                BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(RpcReferenceBean.class);
                builder.setInitMethodName("init");
                builder.addPropertyValue("interfaceClass", field.getType());
                builder.addPropertyValue("serviceVersion", annotation.version());
                builder.addPropertyValue("registryType", annotation.registryType());
                builder.addPropertyValue("registryAddr", annotation.registryAddr());
                builder.addPropertyValue("timeout", annotation.timeout());
    
                BeanDefinition beanDefinition = builder.getBeanDefinition();
                rpcRefBeanDefinitions.put(field.getName(), beanDefinition);
            }
        }
    }

RpcConsumerPostProcessor 类重写了 BeanFactoryPostProcessor.postProcessBeanFactory 方法,从 beanFactory 中获取所有 Bean 定义信息,然后对每个 Bean 的所有 field 进行检测,如果 field 声明了 @RpcReference 注解,则进行如下处理:

  1. 通过 BeanDefinitionBuilder 构造 RpcReferenceBean 的定义,并为 RpcReferenceBean 的成员变量赋值,同时执行了创建对象时的初始化方法init,我们就是在该方法中创建代理对象的;
  2. 构造完 RpcReferenceBean 的定义之后,将RpcReferenceBean 的 BeanDefinition 重新注册到 Spring 容器中。

四、总结

本章,我对RPC框架的工程结构进行了讲解,着重介绍了服务提供者使用@RpcService注解是如何发布服务的,服务消费者使用@RpcReference注解是如何订阅服务的,特别要注意 @RpcReference 注解,被该注解修饰的field变量都会被构造成 RpcReferenceBean,然后为该RpcReferenceBean生成BeanDefinition对象注入到Spring容器中。

本章关于@RpcReference和@RpcService这两个注解的开发其实是很有参考意义的,我们的应用可以基于Spring框架开发各类自定义注解,封装出一套自己的开发框架,这在实际项目开发过程中是很常见的。

阅读全文