本章,我先对 RPC 请求调用和结果响应两个过程分别进行详细分析,然后基于Netty完成整个RPC框架的通信协议设计和开发。
RPC 请求的过程对于服务消费者来说是出站操作,对于服务提供者来说是入站操作:
- 服务消费者将 RPC 请求信息封装成 Protocol 对象,然后通过编码器 Encoder 进行二进制编码,最后直接发送至远端即可;
- 服务提供者收到请求后,将二进制数据交给解码器 Decoder,解码后再次生成 Protocol 对象,然后传递给 Handler 执行真正的请求执行。
RPC 响应的过程对于服务消费者来说是入站操作,对于服务提供者来说是出站操作:
- 服务提供者将响应结果封装成 Protocol 对象,然后通过 Encoder 编码发送给服务消费者;
- 服务消费者对响应结果进行解码,然后由 Handler 根据响应结果找到之前的对应请求,最后将响应结果返回。
一、协议设计
本节,我们来看RPC框架的协议设计,主流的 RPC 框架都会自定义通信协议,相比于 HTTP、HTTPS、JSON 等通用的协议,自定义协议可以实现更好的性能、扩展性以及安全性。
我们把协议分为 协议头 Header 和 协议体 Body 两个部分:
- 协议头 Header 包含魔数、协议版本号、序列化算法、报文类型、状态、消息 ID、数据长度;
- 协议体 Body 只包含数据内容部分,数据内容的长度是不固定的。
+---------------------------------------------------------------+
| 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte |
+---------------------------------------------------------------+
| 状态 1byte | 消息 ID 8byte | 数据长度 4byte |
+---------------------------------------------------------------+
| 数据内容 (长度不定) |
+---------------------------------------------------------------+
1.1 协议实体类
RPC 请求和响应都可以使用上述协议进行通信,对应协议实体类的定义如下:
public class RpcProtocol<T> implements Serializable {
// 协议头
private MsgHeader header;
// 协议体
private T body;
}
协议头
public class MsgHeader implements Serializable {
// 魔数
private short magic;
// 协议版本号
private byte version;
// 序列化算法
private byte serialization;
// 报文类型
private byte msgType;
// 状态
private byte status;
// 消息 ID
private long requestId;
// 数据长度
private int msgLen;
}
协议体
在 RPC 请求调用的场景下,RpcProtocol 中泛型 T 对应 RpcRequest 类型,RpcRequest 主要包含 RPC 远程调用需要的必要参数,定义如下:
public class RpcRequest implements Serializable {
/**
* 服务版本号
*/
private String serviceVersion;
/**
* 服务类全限定名
*/
private String className;
/**
* 服务接口全限定名
*/
private String methodName;
/**
* 接口参数值
*/
private Object[] params;
/**
* 接口参数类型
*/
private Class<?>[] parameterTypes;
}
在 RPC 结果响应的场景下,RpcProtocol 中泛型 T 对应 RpcResponse 类型,RpcResponse 实体类的定义如下:
public class RpcResponse implements Serializable {
// 请求结果
private Object data;
// 错误信息
private String message;
}
1.2 序列化
上述我们定义的都是Java Bean,数据在底层以字节形式传输,所以我们需要使用序列化工具完成Java Bean和二进制数据之间的转换。目前比较常用的序列化算法包括 Json、Kryo、Hessian、Protobuf 等。
首先,我们定义一个通用的序列化接口 RpcSerialization,所有序列化算法扩展都必须实现该接口,RpcSerialization 接口分别提供了序列化 serialize()
和反序列化 deserialize()
方法:
public interface RpcSerialization {
<T> byte[] serialize(T obj) throws IOException;
<T> T deserialize(byte[] data, Class<T> clz) throws IOException;
}
接下来,我为 RpcSerialization 提供 Hessian 和 Json 两种序列化算法的实现类。以 HessianSerialization 为例,实现逻辑如下:
@Component
public class HessianSerialization implements RpcSerialization {
@Override
public <T> byte[] serialize(T object) {
if (object == null) {
throw new NullPointerException();
}
byte[] results;
HessianSerializerOutput hessianOutput;
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
hessianOutput = new HessianSerializerOutput(os);
hessianOutput.writeObject(object);
hessianOutput.flush();
results = os.toByteArray();
} catch (Exception e) {
throw new SerializationException(e);
}
return results;
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clz) {
if (bytes == null) {
throw new NullPointerException();
}
T result;
try (ByteArrayInputStream is = new ByteArrayInputStream(bytes)) {
HessianSerializerInput hessianInput = new HessianSerializerInput(is);
result = (T) hessianInput.readObject(clz);
} catch (Exception e) {
throw new SerializationException(e);
}
return result;
}
}
为了能够支持不同序列化算法,我采用 简单工厂模式 来实现不同序列化算法之间的切换,具体实现如下:
public class SerializationFactory {
public static RpcSerialization getRpcSerialization(byte serializationType) {
SerializationTypeEnum typeEnum = SerializationTypeEnum.findByType(serializationType);
switch (typeEnum) {
case HESSIAN:
return new HessianSerialization();
case JSON:
return new JsonSerialization();
default:
throw new IllegalArgumentException("serialization type is illegal, " + serializationType);
}
}
}
二、编解码器
设计完协议之后,我们需要利用Netty中的Encoder和Decoder自动对字节数据进行编解码。
2.1 RpcEncoder
先来看编码器,RpcEncoder用于将Java Bean对象自动编码成字节数据,使用在两个场景:
- 服务消费者组装完成
RpcProtocol<RpcRequest>
对象后,调用writeAndFlush()
方法发起请求; - 服务提供者组装完成
RpcProtocol<RpcResponse>
对象后,调用writeAndFlush()
方法返回响应。
RpcEncoder 继承 MessageToByteEncoder,并重写 encode() 方法:
public class RpcEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {
@Override
protected void encode(ChannelHandlerContext ctx,RpcProtocol<Object> msg, ByteBuf byteBuf) throws IOException {
MsgHeader header = msg.getHeader();
byteBuf.writeShort(header.getMagic());
byteBuf.writeByte(header.getVersion());
byteBuf.writeByte(header.getSerialization());
byteBuf.writeByte(header.getMsgType());
byteBuf.writeByte(header.getStatus());
byteBuf.writeLong(header.getRequestId());
RpcSerialization rpcSerialization = SerializationFactory.getRpcSerialization(header.getSerialization());
byte[] data = rpcSerialization.serialize(msg.getBody());
byteBuf.writeInt(data.length);
byteBuf.writeBytes(data);
}
}
2.2 RpcDecoder
再来看解码器,RpcDecoder用于将字节数据自动解码成Java Bean对象,并传递给下一个 Inbound 处理器,使用在两个场景:
- 服务消费者获取到响应后,将响应字节数据解码成
RpcProtocol<RpcResponse>
对象; - 服务提供者获取到请求后,将请求字节数据解码成
RpcProtocol<RpcRequest>
对象。
RpcDecoder 继承 MessageToByteDecoder,并重写 decode() 方法:
public class RpcDecoder extends ByteToMessageDecoder {
@Override
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IOException {
// 保证读到完整协议头
if (in.readableBytes() < ProtocolConstants.HEADER_TOTAL_LEN) {
return;
}
// mark读指针
in.markReaderIndex();
// 魔数
short magic = in.readShort();
if (magic != ProtocolConstants.MAGIC) {
throw new IllegalArgumentException("magic number is illegal, " + magic);
}
// 协议版本号
byte version = in.readByte();
// 序列号
byte serializeType = in.readByte();
// 报文类型
byte msgType = in.readByte();
// 状态
byte status = in.readByte();
// 请求ID
long requestId = in.readLong();
// 协议体长度
int dataLength = in.readInt();
// 协议体是否足够读取
if (in.readableBytes() < dataLength) {
// 重置读指针
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
MsgType msgTypeEnum = MsgType.findByType((int)msgType);
if (msgTypeEnum == null) {
return;
}
// 封装协议头对象
MsgHeader header = new MsgHeader();
header.setMagic(magic);
header.setVersion(version);
header.setSerialization(serializeType);
header.setStatus(status);
header.setRequestId(requestId);
header.setMsgType(msgType);
header.setMsgLen(dataLength);
RpcSerialization rpcSerialization = SerializationFactory.getRpcSerialization(serializeType);
switch (msgTypeEnum) {
case REQUEST:
RpcRequest request = rpcSerialization.deserialize(data, RpcRequest.class);
if (request != null) {
RpcProtocol<RpcRequest> protocol = new RpcProtocol<>();
protocol.setHeader(header);
protocol.setBody(request);
out.add(protocol);
}
break;
case RESPONSE:
RpcResponse response = rpcSerialization.deserialize(data, RpcResponse.class);
if (response != null) {
RpcProtocol<RpcResponse> protocol = new RpcProtocol<>();
protocol.setHeader(header);
protocol.setBody(response);
out.add(protocol);
}
break;
case HEARTBEAT:
// TODO
break;
}
}
}
三、请求/响应处理
3.1 请求处理
在 RPC 请求调用的场景下,服务提供者的 RpcDecoder 将二进制数据解码成 RpcProtocol<RpcRequest>
对象后,然后交给 RpcRequestHandler 执行 RPC 请求调用。
RpcRequestHandler 是一个 Inbound 处理器,继承 SimpleChannelInboundHandler 并重写 channelRead0()
方法,具体实现如下:
/**
* RPC请求处理Handler
*/
public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {
private static final Logger LOG = LoggerFactory.getLogger(RpcRequestHandler.class );
private final Map<String, Object> rpcServiceMap;
public RpcRequestHandler(Map<String, Object> rpcServiceMap) {
this.rpcServiceMap = rpcServiceMap;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> protocol) {
// 异步处理RPC请求
RpcRequestProcessor.submitRequest(() -> {
// 封装响应
RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();
MsgHeader header = protocol.getHeader();
header.setMsgType(MsgType.RESPONSE.getType().byteValue());
resProtocol.setHeader(header);
RpcResponse response = new RpcResponse();
resProtocol.setBody(response);
try {
// 处理请求(同步调用)
Object result = handle(protocol.getBody());
response.setData(result);
header.setStatus(MsgStatus.SUCCESS.getCode().byteValue());
} catch (Throwable throwable) {
header.setStatus( MsgStatus.FAIL.getCode().byteValue());
response.setMessage(throwable.toString());
LOG.error("process request {} error", header.getRequestId(), throwable);
}
ctx.writeAndFlush(resProtocol);
});
}
/**
* RPC请求处理
*/
private Object handle(RpcRequest request) throws Throwable {
String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getServiceVersion());
// 获取服务类
Object serviceBean = rpcServiceMap.get(serviceKey);
if (serviceBean == null) {
throw new RuntimeException(String.format("service not exist: %s:%s", request.getClassName(), request.getMethodName()));
}
// 通过CGLIB的FastClass机制,执行方法
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParams();
FastClass fastClass = FastClass.create(serviceClass);
int methodIndex = fastClass.getIndex(methodName, parameterTypes);
return fastClass.invoke(methodIndex, serviceBean, parameters);
}
}
RPC 请求调用是比较耗时的,所以将 RPC 请求提交到自定义的业务线程池中执行。上述的 handle() 方法是真正执行 RPC 调用的地方,我会在后续章节讲解。根据 handle() 的执行情况,RpcProtocol<RpcResponse>
最终会被设置成功或者失败的状态,以及相应的请求结果或者错误信息,最终通过 writeAndFlush() 方法将数据写回服务消费者。
3.2 响应处理
在 RPC 响应处理的场景下,服务消费者的 RpcDecoder 将二进制数据解码成 RpcProtocol<RpcResponse>
对象后,然后交给 RpcResponseHandler 处理。
/**
* RPC响应处理Handler
*/
public class RpcResponseHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) {
long requestId = msg.getHeader().getRequestId();
// 获取与请求关联的响应对象
RpcFuture<RpcResponse> future = RpcServiceHelper.REQUEST_MAP.remove(requestId);
// 设置结果
future.getPromise().setSuccess(msg.getBody());
}
}
/**
* RPC服务工具类
*/
public class RpcServiceHelper {
/**
* 请求ID生成器
*/
public static final AtomicLong REQUEST_ID_GEN = new AtomicLong(0);
/**
* 请求/响应映射缓存
*/
public static final Map<Long, RpcFuture<RpcResponse>> REQUEST_MAP = new ConcurrentHashMap<>();
/**
* 根据服务名和版本号生成Key
*
* @param serviceName 服务名称
* @param serviceVersion 服务版本号
* @return
*/
public static String buildServiceKey(String serviceName, String serviceVersion) {
return String.join("#", serviceName, serviceVersion);
}
}
服务消费者在发起调用时,维护了请求编号 requestId
和 RpcFuture<RpcResponse>
的映射关系,RpcResponseHandler 会根据请求的 requestId
找到对应发起调用的 RpcFuture
,然后为 RpcFuture
设置响应结果。
/**
* RPC请求异步封装
*
* @param <T>
*/
public class RpcFuture<T> {
private Promise<T> promise;
private long timeout;
public RpcFuture(Promise<T> promise, long timeout) {
this.promise = promise;
this.timeout = timeout;
}
public Promise<T> getPromise() {
return promise;
}
public void setPromise(Promise<T> promise) {
this.promise = promise;
}
public long getTimeout() {
return timeout;
}
public void setTimeout(long timeout) {
this.timeout = timeout;
}
}
上述代码,我采用了 Netty 提供的 Promise 工具实现 RPC 请求的同步等待,Promise 模式本质是一种异步编程模型,我们可以先拿到一个查看任务执行结果的凭证,不必等待任务执行完毕,当我们需要获取任务执行结果时,再使用凭证提供的相关接口进行获取。
四、总结
本章,我对我们的RPC 框架的协议设计以及请求响应的编解码进行了讲解,自定义协议、编解码、序列化/反序列化都是实现远程通信的必备基础知识。下一章,我将对 RPC 框架的负载均衡机制进行讲解。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。