2023-08-02
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/354

我在《透彻理解Java网络编程(二一)——Netty实战:服务发布与订阅》中给出了服务消费方的部分实现,服务消费方在调用服务时,应该只需要调用接口方法,然后就能拿到返回结果,不需要关心底层的网络通信、服务发现、负载均衡等具体细节。

那么,服务消费方在底层究竟是如何实现的呢?这其实就是RPC框架最核心的功能之一——动态代理。代理类可以看作是对被代理对象的包装,对目标方法的调用是通过代理类来完成的。所以,通过动态代理机制可以有效地将服务提供者和服务消费者进行解耦,隐藏了 RPC 调用的具体细节:

202308022229146081.png

本章,我就来完成服务消费者的动态代理机制的编码。我先来带大家回顾下Java中两种常见的动态代理机制: JDK 动态代理和 Cglib 动态代理。

一、JDK动态代理

JDK 动态代理,只能基于接口进行代理,对于没有继承任何接口的类,JDK 动态代理就没有用武之地了。JDK 动态代理的本质是通过反射调用代理类中的方法,它的实现依赖 java.lang.reflect 包中的两个核心类: InvocationHandler 接口和 Proxy 类。

1.1 InvocationHandler

每一个代理对象,必须提供 InvocationHandler 接口的实现类,该接口中只有一个invoke()方法。当我们调用 代理对象 的接口方法时,最终都会转发到 invoke() 方法执行具体的逻辑,也就是说,invoke方法返回的是代理对象的接口调用结果:

    // InvocationHandler.java
    
    public interface InvocationHandler {
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable;
    }

上述的几个参数解释如下:

  • proxy:代理对象(注意不是原始对象);
  • method:代理对象上调用的接口方法;
  • args:传入接口方法的参数。

1.2 Proxy

Proxy 类可以理解为动态创建代理类的工厂类,它提供了一组静态方法和接口用于生成代理对象,通常我们只需要使用newProxyInstance()方法:

    // Proxy.java
    
    public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h) {
        //...
    }
  • loader参数:表示需要装载的类加载器ClassLoader;
  • interfaces参数:表示代理类实现的接口列表;
  • InvocationHandler接口类型的处理器:所有动态代理类的方法调用都会交由该处理器进行处理。

1.3 使用示例

下面我用一个例子讲解 JDK 动态代理的具体使用:

    public interface Subject {
        void operation();
    }
    
    public class RealSubject implements Subject {
        @Override
        public void operation() {
            System.out.println("RealSubject operation.");
        }
    }

创建代理对象工具类:

    public class SubjectProxy {
        private Object target;
    
        public SubjectProxy(Object target) {
            this.target = target;
        }
    
        public Object genSubjectInstance() {
            return Proxy.newProxyInstance(target.getClass().getClassLoader(),
                    target.getClass().getInterfaces(),
                    (proxy, method, args) -> {
                        System.out.println("start");
                        Object result = method.invoke(target, args);
                        System.out.println("end");
                        return result;
                    });
        }
    }

测试用例:

    public class SubjectProxyTest {
        @Test
        public void testProxy() {
            Subject Subject = new RealSubject();
            Subject subjectInstance = (Subject) new SubjectProxy(subject).genSubjectInstance();
            subjectInstance.operation();
        }
    }

执行结果如下:

    start
    RealSubject operation.
    end

1.4 底层原理

我们来看下JDK动态代理的底层原理,创建代理对象的入口是 Proxy.newProxyInstance() 这个静态方法:

    // Proxy.java
    
    public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h)
        throws IllegalArgumentException {
        Objects.requireNonNull(h);
    
        // 代理接口类
        final Class<?>[] intfs = interfaces.clone();
        final SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
        }
    
        // 获取代理类
        Class<?> cl = getProxyClass0(loader, intfs);
        try {
            if (sm != null) {
                checkNewProxyPermission(Reflection.getCallerClass(), cl);
            }
            // 获取代理类的构造方法
            final Constructor<?> cons = cl.getConstructor(constructorParams);
            final InvocationHandler ih = h;
            if (!Modifier.isPublic(cl.getModifiers())) {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    public Void run() {
                        cons.setAccessible(true);
                        return null;
                    }
                });
            }
            // 创建并返回代理对象
            return cons.newInstance(new Object[]{h});
        } catch (IllegalAccessException|InstantiationException e) {
            throw new InternalError(e.toString(), e);
        } catch (InvocationTargetException e) {
            //...
        } catch (NoSuchMethodException e) {
            throw new InternalError(e.toString(), e);
        }
    }

通过 newProxyInstance() 方法的实现可以看到,JDK 动态代理是在 getProxyClass0() 方法中完成代理类的生成和加载的,getProxyClass0() 方法的具体实现如下:

    // Proxy.java
    
    private static final WeakCache<ClassLoader, Class<?>[], Class<?>>
            proxyClassCache = new WeakCache<>(new KeyFactory(), new ProxyClassFactory());
    
        private static Class<?> getProxyClass0(ClassLoader loader, Class<?>... interfaces) {
        if (interfaces.length > 65535) {
            throw new IllegalArgumentException("interface limit exceeded");
        }
    
        // 通过ProxyClassFactory创建实现指定接口的代理类
        return proxyClassCache.get(loader, interfaces);
    }

上述 proxyClassCache 是定义在 Proxy 类中的静态字段,WeakCache类型,用于缓存已创建过的代理类。WeakCache.get() 方法首先从缓存中查找代理类,如果查找不到,则创建 Factory 对象并调用其 get() 方法获取代理类:

    // WeakCache.java
    
    public V get(K key, P parameter) {
            Objects.requireNonNull(parameter);
            expungeStaleEntries();
    
            Object cacheKey = CacheKey.valueOf(key, refQueue);
            ConcurrentMap<Object, Supplier<V>> valuesMap = map.get(cacheKey);
            if (valuesMap == null) {
                ConcurrentMap<Object, Supplier<V>> oldValuesMap = map.putIfAbsent(cacheKey,
                                      valuesMap = new ConcurrentHashMap<>());
                if (oldValuesMap != null) {
                    valuesMap = oldValuesMap;
                }
            }
    
            Object subKey = Objects.requireNonNull(subKeyFactory.apply(key, parameter));
            Supplier<V> supplier = valuesMap.get(subKey);
            Factory factory = null;
    
            while (true) {
                if (supplier != null) {
                    V value = supplier.get();
                    if (value != null) {
                        return value;
                    }
                }
                if (factory == null) {
                    factory = new Factory(key, parameter, subKey, valuesMap);
                }
    
                if (supplier == null) {
                    supplier = valuesMap.putIfAbsent(subKey, factory);
                    if (supplier == null) {
                        supplier = factory;
                    }
                } else {
                    if (valuesMap.replace(subKey, supplier, factory)) {
                        supplier = factory;
                    } else {
                        supplier = valuesMap.get(subKey);
                    }
                }
            }
        }

Factory 是 WeakCache 中的内部类,Factory.get() 方法最终会调用 ProxyClassFactory.apply() 方法创建并加载代理类。ProxyClassFactory.apply() 方法首先会检测代理类需要实现的接口集合,然后确定代理类的名称,之后创建代理类并将其写入文件中,最后加载代理类,返回对应的 Class 对象用于后续的实例化代理类对象。该方法的具体实现如下:

    // ProxyClassFactory.java
    
    public Class<?> apply(ClassLoader loader, Class<?>[] interfaces) {
        Map<Class<?>, Boolean> interfaceSet = new IdentityHashMap<>(interfaces.length);
    
        // ...interfaces校验
    
        // ... 选择定义代理类的包名
    
        // 代理类的名称是通过包名、代理类名称前缀以及编号这三项组成的
        long num = nextUniqueNumber.getAndIncrement();
        String proxyName = proxyPkg + proxyClassNamePrefix + num;
    
        // 生成代理类,并写入文件
        byte[] proxyClassFile = ProxyGenerator.generateProxyClass(proxyName, interfaces, accessFlags);
        try {
            // 加载代理类,并返回Class对象
            return defineClass0(loader, proxyName, proxyClassFile, 0, proxyClassFile.length);
        } catch (ClassFormatError e) {
            throw new IllegalArgumentException(e.toString());
        }
    }

ProxyGenerator.generateProxyClass() 方法会按照指定的名称和接口集合生成代理类的字节码,并根据条件决定是否保存到磁盘上。该方法的具体代码如下:

    public static byte[] generateProxyClass(final String name, Class[] interfaces) {
    
        // 动态生成代理类的字节码
        ProxyGenerator gen = new ProxyGenerator(name, interfaces);
        final byte[] classFile = gen.generateClassFile();
    
        // 如果saveGeneratedFiles值为true,会将生成的代理类的字节码保存到文件中
        if (saveGeneratedFiles) { 
            java.security.AccessController.doPrivileged(
                new java.security.PrivilegedAction() {
                    public Void run() {
                        // ...省略try/catch代码块
                        FileOutputStream file = new FileOutputStream(dotToSlash(name) + ".class");
                        file.write(classFile);
                        file.close();
                        return null;
                    }
                }
            );
        }
        return classFile; // 返回上面生成的代理类的字节码
    }

最后,为了清晰地看到JDK动态生成的代理类的真正定义,我们需要将上述生成的代理类的字节码进行反编译。上述示例为RealSubject生成的代理类,反编译后得到的代码如下:

    public final class $Proxy37 extends Proxy implements Subject {  // 实现了Subject接口
    
        // ...省略从Object类继承下来的相关方法和属性
        private static Method m3;
    
        static {
            // 记录了operation()方法对应的Method对象
            m3 = Class.forName("com.xxx.Subject").getMethod("operation", new Class[0]);
        }
    
        // 构造方法的参数就是我们在示例中使用的DemoInvokerHandler对象
        public $Proxy11(InvocationHandler var1) throws {
            super(var1); 
        }
    
        public final void operation() throws {
            // 调用DemoInvokerHandler对象的invoke()方法
            // 最终调用RealSubject对象的对应方法
            super.h.invoke(this, m3, (Object[]) null);
        }
    }

至此JDK 动态代理的基本使用以及核心原理就介绍完了。总结一下,JDK 动态代理的实现原理是动态创建代理类并通过指定类加载器进行加载,在创建代理对象时将InvocationHandler对象作为构造参数传入。当调用代理对象时,会调用 InvocationHandler.invoke() 方法,从而执行代理逻辑,并最终调用真正业务对象的相应方法。

二、Cglib动态代理

Cglib 动态代理,底层基于 ASM 字节码框架,对指定类以字节码的方式生成一个子类,并重写其中的方法,以此来实现动态代理。Cglib 所代理的类,其类型不受限制,也无须实现任何接口,可以做到对目标类的零侵入。

Cglib 动态代理创建的是目标类的子类, 目标类必须要有无参构造函数,而且目标类不要用 final 进行修饰

Cglib 动态代理的实现需要依赖两个核心组件:MethodInterceptor 接口和 Enhancer 类,类似于 JDK 动态代理中的InvocationHandler 接口和 Proxy 类:

  • Enhancer :指定要代理的目标对象以及实际处理代理逻辑的对象,最终通过调用 create() 方法得到代理对象,对这个对象所有的非 final 方法的调用都会转发给 MethodInterceptor 进行处理;
  • MethodInterceptor :动态代理对象的方法调用都会转发到intercept方法进行增强。

2.1 MethodInterceptor

MethodInterceptor 接口只有一个 intercept() 方法,所有被代理类的方法执行最终都会转发到 intercept() 方法中执行,真实方法的执行逻辑则通过 Method 或者 MethodProxy 对象进行调用:

    // MethodInterceptor.java
    
    public interface MethodInterceptor extends Callback {
        public Object intercept(Object obj, java.lang.reflect.Method method, Object[] args,
                                   MethodProxy proxy) throws Throwable;
    }

2.2 Enhancer

Enhancer 类是 Cglib 中的一个字节码增强器,我们可以通过 Enhancer.setCallback() 设置 Callback 接口,对代理类方法的前后增加一些自定义行为。

    // Enhancer.java
    
    public void setCallback(final Callback callback) {
        setCallbacks(new Callback[]{ callback });
    }

MethodInterceptor 接口默认就继承了 Callback 接口。

2.3 使用示例

下面,我以一个示例,讲解 Cglib 动态代理的具体使用。首先,引入Maven依赖:

    <dependency>
        <groupId>cglib</groupId>
        <artifactId>cglib</artifactId>
        <version>3.3.0</version>
    </dependency>

CglibTransactionProxy的实现如下:

    public class CglibTransactionProxy implements MethodInterceptor {
        private Enhancer enhancer = new Enhancer();
        private Object target;
    
        public CglibTransactionProxy(Object target) {
            this.target = target;
        }
    
        public Object genProxyInstance() {
            enhancer.setSuperclass(target.getClass());
            enhancer.setCallback(this);
            return enhancer.create();
        }
    
        @Override
        public Object intercept(Object object, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
            System.out.println("start transaction");
            Object result = methodProxy.invokeSuper(object, args);
            System.out.println("submit transaction");
            return result;
        }
    }

测试用例:

    public class CglibTransactionProxyTest {
    
        public static void main(String[] args) {
            UserDao userDao = new UserDaoImpl();
            UserDao proxyInstance = (UserDao) new CglibTransactionProxy(userDao).genProxyInstance();
            proxyInstance.insert();
        }
    }

执行结果如下:

    start transaction
    insert user success.
    submit transaction

三、Javassist

Javassist 是一个开源的生成 Java 字节码的类库 ,其主要优点在于简单、快速,直接使用Javassist 提供的 Java API 就能动态修改类的结构,或是动态生成类。

3.1 使用示例

Javassist 的使用比较简单,首先来看如何使用 Javassist 提供的 Java API 动态创建类。示例代码如下:

    public class JavassistMain {
    
        public static void main(String[] args) throws Exception {
            // 创建ClassPool
            ClassPool cp = ClassPool.getDefault(); 
    
            // 要生成的类名称为com.test.JavassistDemo
            CtClass clazz = cp.makeClass("com.test.JavassistDemo");
    
            StringBuffer body = null;
    
            // 创建字段,指定了字段类型、字段名称、字段所属的类
            CtField field = new CtField(cp.get("java.lang.String"), "prop", clazz);
    
            // 指定该字段使用private修饰
            field.setModifiers(Modifier.PRIVATE);
    
            // 设置prop字段的getter/setter方法
            clazz.addMethod(CtNewMethod.setter("getProp", field));
            clazz.addMethod(CtNewMethod.getter("setProp", field));
    
            // 设置prop字段的初始化值,并将prop字段添加到clazz中
            clazz.addField(field, CtField.Initializer.constant("MyName"));
    
            // 创建构造方法,指定了构造方法的参数类型和构造方法所属的类
            CtConstructor ctConstructor = new CtConstructor(new CtClass[]{}, clazz);
    
            // 设置方法体
            StringBuffer body = new StringBuffer();
            body.append("{\n prop=\"MyName\";\n}");
            ctConstructor.setBody(body.toString());
            // 将构造方法添加到clazz中
            clazz.addConstructor(ctConstructor);
    
            // 创建execute()方法,指定了方法返回值、方法名称、方法参数列表以及方法所属的类
            CtMethod ctMethod = new CtMethod(CtClass.voidType, "execute", new CtClass[]{}, clazz);
    
            // 指定该方法使用public修饰
            ctMethod.setModifiers(Modifier.PUBLIC);
    
            // 设置方法体
            body = new StringBuffer();
            body.append("{\n System.out.println(\"execute():\" " + "+ this.prop);");
            body.append("\n}");
    
            ctMethod.setBody(body.toString());
    
            // 将execute()方法添加到clazz中
            clazz.addMethod(ctMethod); 
    
            // 将上面定义的JavassistDemo类保存到指定的目录
            clazz.writeFile("/Users/xxx/"); 
    
            // 加载clazz类,并创建对象
            Class<?> c = clazz.toClass();
            Object o = c.newInstance();
    
            // 调用execute()方法
            Method method = o.getClass().getMethod("execute", new Class[]{});
            method.invoke(o, new Object[]{});
        }
    }

执行上述代码之后,在指定的目录下可以找到生成的 JavassistDemo.class 文件,将其反编译,得到 JavassistDemo 的代码如下:

    public class JavassistDemo {
        private String prop = "MyName";
    
        public JavassistDemo() {
            prop = "MyName";
        }
    
        public void setProp(String paramString) {
            this.prop = paramString;
        }
    
        public String getProp() {
            return this.prop;
        }
    
        public void execute() {
            System.out.println("execute():" + this.prop);
        }
    }

3.2 动态代理

Javassist 也可以实现动态代理功能,底层的原理也是通过创建目标类的子类的方式实现的。这里使用 Javassist 为上面生成的 JavassitDemo 创建一个代理对象,具体实现如下:

    public class JavassitMain2 {
        public static void main(String[] args) throws Exception {
            ProxyFactory factory = new ProxyFactory();
    
            // 指定父类,ProxyFactory会动态生成继承该父类的子类
            factory.setSuperclass(JavassistDemo.class);
    
            // 设置过滤器,判断哪些方法调用需要被拦截
            factory.setFilter(new MethodFilter() {
                public boolean isHandled(Method m) {
                    if (m.getName().equals("execute")) {
                        return true;
                    }
                    return false;
                }
            });
    
            // 设置拦截处理
            factory.setHandler(new MethodHandler() {
                @Override
                public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) 
                    throws Throwable {
                    System.out.println("前置处理");
                    Object result = proceed.invoke(self, args);
                    System.out.println("执行结果:" + result);
                    System.out.println("后置处理");
                    return result;
                }
            });
    
            // 创建JavassistDemo的代理类,并创建代理对象
            Class<?> c = factory.createClass();
            JavassistDemo JavassistDemo = (JavassistDemo) c.newInstance();
            JavassistDemo.execute(); // 执行execute()方法,会被拦截
            System.out.println(JavassistDemo.getProp());
        }
    }

Javassist可以直接使用 Java 语言的字符串生成类,还是比较好用的。 Javassist 的性能也比较好,是 Dubbo 默认的代理生成方式

四、RPC调用

《透彻理解Java网络编程(二一)——Netty实战:服务发布与订阅》一章中,我讲解了 @RpcReference 注解的实现过程,通过一个自定义的 RpcReferenceBean 完成代理对象的创建。

4.1 代理对象创建

先来看代理对象的创建,这里直接使用了JDK动态代理:

    package com.tpvlog.rpc.consumer.annotation;
    
    import com.tpvlog.rpc.consumer.RpcInvokerProxy;
    import com.tpvlog.rpc.registry.RegistryFactory;
    import com.tpvlog.rpc.registry.RegistryService;
    import com.tpvlog.rpc.registry.RegistryType;
    import org.springframework.beans.factory.FactoryBean;
    
    import java.lang.reflect.Proxy;
    
    public class RpcReferenceBean implements FactoryBean<Object> {
    
        private Class<?> interfaceClass;
    
        private String serviceVersion;
    
        private String registryType;
    
        private String registryAddr;
    
        private long timeout;
    
        private Object object;
    
        /**
         * FactoryBean创建Bean对象时,会调用该方法对对象进行初始化
         * @throws Exception
         */
        public void init() throws Exception {
            // 创建服务提供方接口的代理对象
            RegistryService registryService = RegistryFactory.getInstance(this.registryAddr, RegistryType.valueOf(this.registryType));
            this.object = Proxy.newProxyInstance(
                    interfaceClass.getClassLoader(),
                    new Class<?>[]{interfaceClass},
                    new RpcInvokerProxy(serviceVersion, timeout, registryService));
        }
    }

4.2 代理调用

RpcInvokerProxy 是实现动态代理逻辑的核心所在,其中包含 RPC 调用时的底层网络通信、服务发现、负载均衡等具体细节:

    public class RpcInvokerProxy implements InvocationHandler {
    
        private final String serviceVersion;
        private final long timeout;
        private final RegistryService registryService;
    
        public RpcInvokerProxy(String serviceVersion, long timeout, RegistryService registryService) {
            this.serviceVersion = serviceVersion;
            this.timeout = timeout;
            this.registryService = registryService;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            long requestId = RpcServiceHelper.REQUEST_ID_GEN.incrementAndGet();
    
            RpcProtocol<RpcRequest> protocol = new RpcProtocol<>();
    
            // 协议头
            MsgHeader header = new MsgHeader();
            header.setMagic(ProtocolConstants.MAGIC);
            header.setVersion(ProtocolConstants.VERSION);
            header.setRequestId(requestId);
            header.setSerialization( SerializationTypeEnum.HESSIAN.getType().byteValue());
            header.setMsgType( MsgType.REQUEST.getType().byteValue());
            header.setStatus(MsgStatus.FAIL.getCode().byteValue());
            protocol.setHeader(header);
    
            // 协议体
            RpcRequest request = new RpcRequest();
            request.setServiceVersion(this.serviceVersion);
            request.setClassName(method.getDeclaringClass().getName());
            request.setMethodName(method.getName());
            request.setParameterTypes(method.getParameterTypes());
            request.setParams(args);
            protocol.setBody(request);
    
            // 发起请求
            RpcConsumer rpcConsumer = new RpcConsumer();
            RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()), timeout);
            // 关联请求和响应Future对象
            RpcServiceHelper.REQUEST_MAP.put(requestId, future);
            rpcConsumer.sendRequest(protocol, this.registryService);
    
            return future.getPromise().get(future.getTimeout(), TimeUnit.MILLISECONDS).getData();
        }
    }

RpcInvokerProxy实现了 InvocationHandler 接口的 invoke() 方法,代理对象的 RPC 接口调用时,都会转发到 invoke() 方法上,invoke() 的核心流程分为三步:

  1. 构造 RPC 协议对象;
  2. 发起 RPC 远程调用;
  3. 等待 RPC 调用执行结果。

我们来看RpcConsumer.sendRequest()方法的请求调用:

    public class RpcConsumer {
        private static final Logger LOG = LoggerFactory.getLogger(RpcConsumer.class);
    
        private final Bootstrap bootstrap;
        private final EventLoopGroup eventLoopGroup;
    
        public RpcConsumer() {
            this.eventLoopGroup = new NioEventLoopGroup(4);
            this.bootstrap = new Bootstrap();
            this.bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new RpcEncoder())
                                    .addLast(new RpcDecoder())
                                    .addLast(new RpcResponseHandler());
                        }
                    });
        }
    
        public void sendRequest(RpcProtocol<RpcRequest> protocol, RegistryService registryService) throws Exception {
            // 1.选择一个服务提供方实例
            RpcRequest request = protocol.getBody();
            Object[] params = request.getParams();
            String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getServiceVersion());
            int invokerHashCode = request.getParams().length > 0 ? params[0].hashCode() : serviceKey.hashCode();
            ServiceMeta serviceMetadata = registryService.discovery(serviceKey, invokerHashCode);
            if (serviceMetadata == null) {
                throw new RuntimeException("No service provider founded:" + request);
            }
    
            // 2.基于Netty发起请求
            ChannelFuture future = bootstrap.connect(serviceMetadata.getAddress(), serviceMetadata.getPort()).sync();
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        LOG.info("connect rpc server {} on port {} success.", serviceMetadata.getAddress(), serviceMetadata.getPort());
                    } else {
                        LOG.error("connect rpc server {} on port {} failed.", serviceMetadata.getAddress(), serviceMetadata.getPort());
                        future.cause().printStackTrace();
                        eventLoopGroup.shutdownGracefully();
                    }
                }
            });
            future.channel().writeAndFlush(protocol);
        }
    }

4.3 请求处理

当服务提供者收到 RPC 请求后,由 RpcRequestHandler 最终执行 RPC 请求调用:

    /**
     * RPC请求处理Handler
     */
    public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {
    
        private static final Logger LOG = LoggerFactory.getLogger(RpcRequestHandler.class );
    
        private final Map<String, Object> rpcServiceMap;
    
        public RpcRequestHandler(Map<String, Object> rpcServiceMap) {
            this.rpcServiceMap = rpcServiceMap;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> protocol) {
    
            // 异步处理RPC请求
            RpcRequestProcessor.submitRequest(() -> {
                // 封装响应
                RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();
    
                MsgHeader header = protocol.getHeader();
                header.setMsgType(MsgType.RESPONSE.getType().byteValue());
                resProtocol.setHeader(header);
    
                RpcResponse response = new RpcResponse();
                resProtocol.setBody(response);
    
                try {
                    // 处理请求(同步调用)
                    Object result = handle(protocol.getBody());
                    response.setData(result);
                    header.setStatus(MsgStatus.SUCCESS.getCode().byteValue());
                } catch (Throwable throwable) {
                    header.setStatus( MsgStatus.FAIL.getCode().byteValue());
                    response.setMessage(throwable.toString());
                    LOG.error("process request {} error", header.getRequestId(), throwable);
                }
                ctx.writeAndFlush(resProtocol);
            });
        }
        //...
    }

因为 RPC 请求调用是比较耗时的,所以这里将 RPC 请求提交到自定义的业务线程池中执行,我们来看 handle() 方法的处理逻辑:

    // RpcRequestHandler.java
    
    private Object handle(RpcRequest request) throws Throwable {
        String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getServiceVersion());
        // 获取服务类
        Object serviceBean = rpcServiceMap.get(serviceKey);
    
        if (serviceBean == null) {
            throw new RuntimeException(String.format("service not exist: %s:%s", request.getClassName(), request.getMethodName()));
        }
    
        // 通过CGLIB的FastClass机制,执行方法
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParams();
    
        FastClass fastClass = FastClass.create(serviceClass);
        int methodIndex = fastClass.getIndex(methodName, parameterTypes);
        return fastClass.invoke(methodIndex, serviceBean, parameters);
    }

rpcServiceMap 中存放着服务提供者所有对外发布的服务接口,我们可以通过服务名+版本号找到对应的服务接口。然后通过 Cglib 提供的 FastClass 机制直接调用方法。

Cglib 中 MethodProxy 对象就是采用了 FastClass 机制,它可以和 Method 对象完成同样的事情,但是相比于反射性能更高。

FastClass 机制并没有采用反射的方式调用被代理的方法,而是运行时动态生成一个新的 FastClass 子类,向子类中写入直接调用目标方法的逻辑。同时该子类会为代理类分配一个 int 类型的 index 索引,FastClass 即可通过 index 索引定位到需要调用的方法。

五、总结

本章,我对动态代理的基本原理进行了讲解,并使用动态代理技术完成了 RPC 请求的调用和处理。动态代理技术是 RPC 框架的核心技术之一,也是很重要的一个性能优化点。

在 JDK 1.8 版本之后 JDK 动态代理在运行多次之后比 Cglib 的速度更快了,但是它还是有使用的局限性;虽然 Javassist 字节码生成的性能相比 JDK 动态代理和 Cglib 动态代理更好,但是 Javassist 在生成动态代理类上性能较慢的。

至此,基于Netty实现的RPC框架实战到此结束,源码我放在Gitee上,需要的童鞋自行下载阅读。本专栏我实现的 RPC 框架只是一个基础的demo, RPC 框架还有更多高阶特性值得我们去挖掘,如服务治理、线程池隔离、集群容错、熔断限流等,感兴趣的童鞋可以去阅读下最新版本的Dubbo3源码,然后基于该demo打磨自己的RPC框架。

阅读全文