2023-08-02  阅读(34)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/354

本章,我先对 RPC 请求调用和结果响应两个过程分别进行详细分析,然后基于Netty完成整个RPC框架的通信协议设计和开发。

RPC 请求的过程对于服务消费者来说是出站操作,对于服务提供者来说是入站操作:

  • 服务消费者将 RPC 请求信息封装成 Protocol 对象,然后通过编码器 Encoder 进行二进制编码,最后直接发送至远端即可;
  • 服务提供者收到请求后,将二进制数据交给解码器 Decoder,解码后再次生成 Protocol 对象,然后传递给 Handler 执行真正的请求执行。

RPC 响应的过程对于服务消费者来说是入站操作,对于服务提供者来说是出站操作:

  • 服务提供者将响应结果封装成 Protocol 对象,然后通过 Encoder 编码发送给服务消费者;
  • 服务消费者对响应结果进行解码,然后由 Handler 根据响应结果找到之前的对应请求,最后将响应结果返回。

202308022228561621.png

一、协议设计

本节,我们来看RPC框架的协议设计,主流的 RPC 框架都会自定义通信协议,相比于 HTTP、HTTPS、JSON 等通用的协议,自定义协议可以实现更好的性能、扩展性以及安全性。

我们把协议分为 协议头 Header协议体 Body 两个部分:

  • 协议头 Header 包含魔数、协议版本号、序列化算法、报文类型、状态、消息 ID、数据长度;
  • 协议体 Body 只包含数据内容部分,数据内容的长度是不固定的。
    +---------------------------------------------------------------+
    | 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte  |
    +---------------------------------------------------------------+
    | 状态 1byte |        消息 ID 8byte     |      数据长度 4byte     |
    +---------------------------------------------------------------+
    |                   数据内容 (长度不定)                          |
    +---------------------------------------------------------------+

1.1 协议实体类

RPC 请求和响应都可以使用上述协议进行通信,对应协议实体类的定义如下:

    public class RpcProtocol<T> implements Serializable {
        // 协议头
        private MsgHeader header; 
        // 协议体
        private T body; 
    }

协议头

    public class MsgHeader implements Serializable {
        // 魔数
        private short magic;     
        // 协议版本号
        private byte version; 
        // 序列化算法
        private byte serialization; 
        // 报文类型
        private byte msgType; 
        // 状态
        private byte status; 
        // 消息 ID
        private long requestId; 
        // 数据长度
        private int msgLen; 
    }

协议体

在 RPC 请求调用的场景下,RpcProtocol 中泛型 T 对应 RpcRequest 类型,RpcRequest 主要包含 RPC 远程调用需要的必要参数,定义如下:

    public class RpcRequest implements Serializable {
        /**
         * 服务版本号
         */
        private String serviceVersion;
        /**
         * 服务类全限定名
         */
        private String className;
        /**
         * 服务接口全限定名
         */
        private String methodName;
        /**
         * 接口参数值
         */
        private Object[] params;
        /**
         * 接口参数类型
         */
        private Class<?>[] parameterTypes;
    }

在 RPC 结果响应的场景下,RpcProtocol 中泛型 T 对应 RpcResponse 类型,RpcResponse 实体类的定义如下:

    public class RpcResponse implements Serializable {
        // 请求结果
        private Object data;
        // 错误信息
        private String message;
    }

1.2 序列化

上述我们定义的都是Java Bean,数据在底层以字节形式传输,所以我们需要使用序列化工具完成Java Bean和二进制数据之间的转换。目前比较常用的序列化算法包括 Json、Kryo、Hessian、Protobuf 等。

首先,我们定义一个通用的序列化接口 RpcSerialization,所有序列化算法扩展都必须实现该接口,RpcSerialization 接口分别提供了序列化 serialize() 和反序列化 deserialize() 方法:

    public interface RpcSerialization {
        <T> byte[] serialize(T obj) throws IOException;
        <T> T deserialize(byte[] data, Class<T> clz) throws IOException;
    }

接下来,我为 RpcSerialization 提供 Hessian 和 Json 两种序列化算法的实现类。以 HessianSerialization 为例,实现逻辑如下:

    @Component
    public class HessianSerialization implements RpcSerialization {
    
        @Override
        public <T> byte[] serialize(T object) {
            if (object == null) {
                throw new NullPointerException();
            }
            byte[] results;
    
            HessianSerializerOutput hessianOutput;
            try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
                hessianOutput = new HessianSerializerOutput(os);
                hessianOutput.writeObject(object);
                hessianOutput.flush();
                results = os.toByteArray();
            } catch (Exception e) {
                throw new SerializationException(e);
            }
    
            return results;
        }
    
        @Override
        public <T> T deserialize(byte[] bytes, Class<T> clz) {
            if (bytes == null) {
                throw new NullPointerException();
            }
            T result;
    
            try (ByteArrayInputStream is = new ByteArrayInputStream(bytes)) {
                HessianSerializerInput hessianInput = new HessianSerializerInput(is);
                result = (T) hessianInput.readObject(clz);
            } catch (Exception e) {
                throw new SerializationException(e);
            }
    
            return result;
        }
    }

为了能够支持不同序列化算法,我采用 简单工厂模式 来实现不同序列化算法之间的切换,具体实现如下:

    public class SerializationFactory {
    
        public static RpcSerialization getRpcSerialization(byte serializationType) {
            SerializationTypeEnum typeEnum = SerializationTypeEnum.findByType(serializationType);
    
            switch (typeEnum) {
                case HESSIAN:
                    return new HessianSerialization();
                case JSON:
                    return new JsonSerialization();
                default:
                    throw new IllegalArgumentException("serialization type is illegal, " + serializationType);
            }
        }
    }

二、编解码器

设计完协议之后,我们需要利用Netty中的Encoder和Decoder自动对字节数据进行编解码。

2.1 RpcEncoder

先来看编码器,RpcEncoder用于将Java Bean对象自动编码成字节数据,使用在两个场景:

  • 服务消费者组装完成RpcProtocol<RpcRequest>对象后,调用writeAndFlush()方法发起请求;
  • 服务提供者组装完成RpcProtocol<RpcResponse>对象后,调用writeAndFlush()方法返回响应。

RpcEncoder 继承 MessageToByteEncoder,并重写 encode() 方法:

    public class RpcEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {
        @Override
        protected void encode(ChannelHandlerContext ctx,RpcProtocol<Object> msg, ByteBuf byteBuf) throws IOException {
            MsgHeader header = msg.getHeader();
            byteBuf.writeShort(header.getMagic());
            byteBuf.writeByte(header.getVersion());
            byteBuf.writeByte(header.getSerialization());
            byteBuf.writeByte(header.getMsgType());
            byteBuf.writeByte(header.getStatus());
            byteBuf.writeLong(header.getRequestId());
    
            RpcSerialization rpcSerialization = SerializationFactory.getRpcSerialization(header.getSerialization());
            byte[] data = rpcSerialization.serialize(msg.getBody());
            byteBuf.writeInt(data.length);
            byteBuf.writeBytes(data);
        }
    }

2.2 RpcDecoder

再来看解码器,RpcDecoder用于将字节数据自动解码成Java Bean对象,并传递给下一个 Inbound 处理器,使用在两个场景:

  • 服务消费者获取到响应后,将响应字节数据解码成RpcProtocol<RpcResponse>对象;
  • 服务提供者获取到请求后,将请求字节数据解码成RpcProtocol<RpcRequest>对象。

RpcDecoder 继承 MessageToByteDecoder,并重写 decode() 方法:

    public class RpcDecoder extends ByteToMessageDecoder {
        @Override
        public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IOException {
            // 保证读到完整协议头
            if (in.readableBytes() < ProtocolConstants.HEADER_TOTAL_LEN) {
                return;
            }
            // mark读指针
            in.markReaderIndex();
            // 魔数
            short magic = in.readShort();
            if (magic != ProtocolConstants.MAGIC) {
                throw new IllegalArgumentException("magic number is illegal, " + magic);
            }
            // 协议版本号
            byte version = in.readByte();
            // 序列号
            byte serializeType = in.readByte();
            // 报文类型
            byte msgType = in.readByte();
            // 状态
            byte status = in.readByte();
            // 请求ID
            long requestId = in.readLong();
            // 协议体长度
            int dataLength = in.readInt();
    
            // 协议体是否足够读取
            if (in.readableBytes() < dataLength) {
                // 重置读指针
                in.resetReaderIndex();
                return;
            }
            byte[] data = new byte[dataLength];
            in.readBytes(data);
    
            MsgType msgTypeEnum = MsgType.findByType((int)msgType);
            if (msgTypeEnum == null) {
                return;
            }
            // 封装协议头对象
            MsgHeader header = new MsgHeader();
            header.setMagic(magic);
            header.setVersion(version);
            header.setSerialization(serializeType);
            header.setStatus(status);
            header.setRequestId(requestId);
            header.setMsgType(msgType);
            header.setMsgLen(dataLength);
    
            RpcSerialization rpcSerialization = SerializationFactory.getRpcSerialization(serializeType);
            switch (msgTypeEnum) {
                case REQUEST:
                    RpcRequest request = rpcSerialization.deserialize(data, RpcRequest.class);
                    if (request != null) {
                        RpcProtocol<RpcRequest> protocol = new RpcProtocol<>();
                        protocol.setHeader(header);
                        protocol.setBody(request);
                        out.add(protocol);
                    }
                    break;
                case RESPONSE:
                   RpcResponse response = rpcSerialization.deserialize(data, RpcResponse.class);
                    if (response != null) {
                        RpcProtocol<RpcResponse> protocol = new RpcProtocol<>();
                        protocol.setHeader(header);
                        protocol.setBody(response);
                        out.add(protocol);
                    }
                    break;
                case HEARTBEAT:
                    // TODO
                    break;
            }
        }
    }

三、请求/响应处理

3.1 请求处理

在 RPC 请求调用的场景下,服务提供者的 RpcDecoder 将二进制数据解码成 RpcProtocol<RpcRequest> 对象后,然后交给 RpcRequestHandler 执行 RPC 请求调用。

RpcRequestHandler 是一个 Inbound 处理器,继承 SimpleChannelInboundHandler 并重写 channelRead0() 方法,具体实现如下:

    /**
     * RPC请求处理Handler
     */
    public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {
    
        private static final Logger LOG = LoggerFactory.getLogger(RpcRequestHandler.class );
    
        private final Map<String, Object> rpcServiceMap;
    
        public RpcRequestHandler(Map<String, Object> rpcServiceMap) {
            this.rpcServiceMap = rpcServiceMap;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> protocol) {
    
            // 异步处理RPC请求
            RpcRequestProcessor.submitRequest(() -> {
                // 封装响应
                RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();
    
                MsgHeader header = protocol.getHeader();
                header.setMsgType(MsgType.RESPONSE.getType().byteValue());
                resProtocol.setHeader(header);
    
                RpcResponse response = new RpcResponse();
                resProtocol.setBody(response);
    
                try {
                    // 处理请求(同步调用)
                    Object result = handle(protocol.getBody());
                    response.setData(result);
                    header.setStatus(MsgStatus.SUCCESS.getCode().byteValue());
                } catch (Throwable throwable) {
                    header.setStatus( MsgStatus.FAIL.getCode().byteValue());
                    response.setMessage(throwable.toString());
                    LOG.error("process request {} error", header.getRequestId(), throwable);
                }
                ctx.writeAndFlush(resProtocol);
            });
        }
    
        /**
         * RPC请求处理
         */
        private Object handle(RpcRequest request) throws Throwable {
            String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getServiceVersion());
            // 获取服务类
            Object serviceBean = rpcServiceMap.get(serviceKey);
    
            if (serviceBean == null) {
                throw new RuntimeException(String.format("service not exist: %s:%s", request.getClassName(), request.getMethodName()));
            }
    
            // 通过CGLIB的FastClass机制,执行方法
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = request.getMethodName();
            Class<?>[] parameterTypes = request.getParameterTypes();
            Object[] parameters = request.getParams();
    
            FastClass fastClass = FastClass.create(serviceClass);
            int methodIndex = fastClass.getIndex(methodName, parameterTypes);
            return fastClass.invoke(methodIndex, serviceBean, parameters);
        }
    }

RPC 请求调用是比较耗时的,所以将 RPC 请求提交到自定义的业务线程池中执行。上述的 handle() 方法是真正执行 RPC 调用的地方,我会在后续章节讲解。根据 handle() 的执行情况,RpcProtocol<RpcResponse> 最终会被设置成功或者失败的状态,以及相应的请求结果或者错误信息,最终通过 writeAndFlush() 方法将数据写回服务消费者。

3.2 响应处理

在 RPC 响应处理的场景下,服务消费者的 RpcDecoder 将二进制数据解码成 RpcProtocol<RpcResponse> 对象后,然后交给 RpcResponseHandler 处理。

    /**
     * RPC响应处理Handler
     */
    public class RpcResponseHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) {
            long requestId = msg.getHeader().getRequestId();
            // 获取与请求关联的响应对象
            RpcFuture<RpcResponse> future = RpcServiceHelper.REQUEST_MAP.remove(requestId);
            // 设置结果
            future.getPromise().setSuccess(msg.getBody());
        }
    }
    /**
     * RPC服务工具类
     */
    public class RpcServiceHelper {
        /**
         * 请求ID生成器
         */
        public static final AtomicLong REQUEST_ID_GEN = new AtomicLong(0);
    
        /**
         * 请求/响应映射缓存
         */
        public static final Map<Long, RpcFuture<RpcResponse>> REQUEST_MAP = new ConcurrentHashMap<>();
    
        /**
         * 根据服务名和版本号生成Key
         *
         * @param serviceName    服务名称
         * @param serviceVersion 服务版本号
         * @return
         */
        public static String buildServiceKey(String serviceName, String serviceVersion) {
            return String.join("#", serviceName, serviceVersion);
        }
    }

服务消费者在发起调用时,维护了请求编号 requestIdRpcFuture<RpcResponse> 的映射关系,RpcResponseHandler 会根据请求的 requestId 找到对应发起调用的 RpcFuture,然后为 RpcFuture 设置响应结果。

    /**
     * RPC请求异步封装
     *
     * @param <T>
     */
    public class RpcFuture<T> {
        private Promise<T> promise;
        private long timeout;
    
        public RpcFuture(Promise<T> promise, long timeout) {
            this.promise = promise;
            this.timeout = timeout;
        }
    
        public Promise<T> getPromise() {
            return promise;
        }
    
        public void setPromise(Promise<T> promise) {
            this.promise = promise;
        }
    
        public long getTimeout() {
            return timeout;
        }
    
        public void setTimeout(long timeout) {
            this.timeout = timeout;
        }
    }

上述代码,我采用了 Netty 提供的 Promise 工具实现 RPC 请求的同步等待,Promise 模式本质是一种异步编程模型,我们可以先拿到一个查看任务执行结果的凭证,不必等待任务执行完毕,当我们需要获取任务执行结果时,再使用凭证提供的相关接口进行获取。

四、总结

本章,我对我们的RPC 框架的协议设计以及请求响应的编解码进行了讲解,自定义协议、编解码、序列化/反序列化都是实现远程通信的必备基础知识。下一章,我将对 RPC 框架的负载均衡机制进行讲解。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文