基于Netty实现简化的RPC
一些理论
- 当你的项目太大,业务越来越多的时候,需要将服务拆分,RPC就可以用于服务于服务之间的调用问题。系统中的内部服务之间的调用用RPC。
- RPC的架构主要包括三个部分:
- Register注册中心:将本地服务发布成远程服务,管理远程服务,提供给服务消费者使用。
- Server服务提供者:提供服务接口的定义和实现类。
- Client服务消费者:通过远程代理对象调用远程服务。
- RPC就是将以下这些步骤封装起来,使得客户端能够像调用本地服务一样调用远程的服务。
- 接收调用
- 将方法参数等封装成能够进行网络传输的消息体序列化后发送到服务端
- 将服务端处理的结果反序列化后返回给客户端。
实践
服务端代码
- 服务端在Handler部分根据传过来的RPC请求体进行解析,调用相应的方法,返回RPC相应消息体。
- 上一步的解析部分通过Spring的反射获取类名和方法名。
RPC请求消息体和RPC响应消息体
- 调用的接口全限定名
- 调用接口中的方法名
- 方法返回类型
- 方法参数类型数组
- 方法参数值数组
/**
* RPC请求体
*/
@Data
public class RpcRequestMessage extends Message{
//调用的接口全限定名
private String className;
//调用接口中的方法名
private String methodName;
//方法返回类型
private Class<?> returnType;
//方法参数类型数组
private Class[] parameterTypes;
//方法参数值数组
private Object[] parameters;
public RpcRequestMessage(int sequenceId, String className, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameters) {
super(sequenceId);
this.className = className;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameters = parameters;
}
@Override
public int getMessageType() {
return MessageConstant.RPC_REQUEST_MESSAGE;
}
}
处理Rpc请求的Handler
/**
* 处理Rpc请求的Handler,通过反射的机制来创建对象,调用方法,最后返回数据。
*/
public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) throws Exception {
RpcResponseMessage rpcResponseMessage = new RpcResponseMessage();
try {
//Class.forName获取类对象本身的.class属性
Class<?> aClass = Class.forName(msg.getClassName());
//获取方法对象
Method method = aClass.getMethod(msg.getMethodName(), msg.getParameterTypes());
//调用方法 .newInstance()调用构造函数生成对象
Object invoke = method.invoke(aClass.newInstance(), msg.getParameters());
rpcResponseMessage.setReturnValue(invoke);
} catch (Exception e){
e.printStackTrace();
rpcResponseMessage.setException(new Exception(e.getMessage()));
}
ctx.writeAndFlush(rpcResponseMessage);
}
}
客户端代码
- writeAndFlush方法的调试(出现了没有报错的难以查明的问题可以用这个方法):writeAndFlush返回的是ChannelFuture对象,有sync同步和addListener异步两种方法,通过异步和promise可以进行两个线程之间的通信。
channelFuture.addListener(promise -> {
if (!promise.isSuccess()){
System.out.println(promise.cause());
}
});
- 客户端应该生成一个单例模式的channel对象,可以供其他方法来一起调用。
- 关闭channel的方法应该设置为异步,而不是同步等待。否则初始化channel的过程中会一直阻塞住,导致无法获取到channel对象。
- 单例模式采用双重检查锁+volatite。
- 使用代理模式对请求的参数进行封装,并且将数据发送出去,使之能够像调用本地方法一样调用远程方法。
- 通过代理模式调用方法获取返回数据是在主线程中操作的,但是数据的处理是在NIO线程中,也就是在RpcResponseHandler执行,线程之间需要使用Promise进行通信。Promise就是一个容器,可以在多个线程中交换结果。
- 一次方法的调用对应一个Promise,通过方法请求时带的序列号作为Key,将Promise存入到Map当中。消息接收完毕再把Promise去掉。
- 通过一个全局的id变量来作为消息的序列号。
单例模式获取channel
/**
* 用于Rpc的客户端启动器
*/
public class RpcClient {
public static volatile Channel channel = null;
private static final Object LOCK = new Object();
public static Channel getChannel(){
if(channel==null){
synchronized (LOCK){
if(channel==null){
initChannel();
}
}
}
return channel;
}
/**
* 初始化Channel
*/
public static void initChannel(){
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ClientChannelInitializer());
ChannelFuture future = bootstrap.connect("127.0.0.1",8282).sync();
channel = future.channel();
// 以异步的方式去关闭channel,防止线程堵塞。
channel.closeFuture().addListener( e -> {
group.shutdownGracefully();
});
} catch (Exception e){
e.printStackTrace();
}
}
}
代理模式封装请求
/**
* 消息序列号和Promise的对应
*/
private final static Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();
public static Promise<Object> getPromise(int sequenceId){
return PROMISES.get(sequenceId);
}
public static <T> T getProxyService(Class<T> service){
ClassLoader classLoader = service.getClassLoader();
int sequenceId = SequenceIdGenerator.getSequenceId();
// 生成代理对象实例
Object o = Proxy.newProxyInstance(classLoader, service.getInterfaces(), (proxy, method, args) -> {
// proxy 代理对象 method 代理对象执行的方法 args 执行方法的参数列表
// 1. 将方法调用转换为 消息对象
RpcRequestMessage rpcRequestMessage = new RpcRequestMessage(
sequenceId,
service.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 2. 将请求发送出去
getChannel().writeAndFlush(rpcRequestMessage).addListener(future -> {
if(!future.isSuccess()){
System.out.println(future.cause());
}
});
// 3. 准备一个Promise对象来接受结果并放入map容器中 指定Promise异步接受结果的线程
DefaultPromise promise = new DefaultPromise(getChannel().eventLoop());
MessageConstant.putPromise(sequenceId,promise);
// 4. 同步等待结果
promise.await();
// 5. 返回数据
if(promise.isSuccess()){
return promise.getNow();
}else{
return promise.cause();
}
});
return (T) o;
}
遇到的问题
- 堆栈溢出