一、背景
Netty 是由 JBOSS 提供的一个 java 开源框架,现为 Github 上的独立项目。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用 Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty 相当于简化和流线化了网络应用的编程开发过程,例如:基于 TCP 和 UDP 的 socket 服务开发。
今天,这里就分享一个基于 Netty 实现的 RPC 框架。
什么是RPC
RPC,是指 Remote Procedure Call Protocol,中文释义为远程过程调用,是一种进程间通信方式,它是一种技术的思想,它允许程序调用另一个地址空间(共享网络的另一台机器上)的过程或函数。
简单来说,调用远程的服务时,给你的感觉就像是调用本地的服务一样,RPC 通信对用户来说是完全透明的,使用者无需关心请求是如何发出去的、发到了哪里,每次调用只需要拿到正确的调用结果就行。
主流的 RPC 框架有 dubbo、spring-cloud 等,而 dubbo 的底层通信正是基于 Netty 实现的。
项目基本流程
1. 客户端 Clustomer 调用远程访问接口 PersonService
2. PersonService 在客户端启动时会自动创建 PersonService 的代理对象 Proxy
3. Proxy 将请求数据发给客户端处理器 RpcClientHandler
4. RpcClientHandler 将请求数据发给服务端处理器 RpcServerHandler
5. RpcServerHandler 通过反射 Reflect 找到远程访问接口实现类 PersonServiceImpl
6. 执行 PersonServiceImpl 的方法,获取执行结果
7. RpcServerHandler 将执行结果传输给 RpcClientHandler
8. RpcClientHandler 接收到服务端的响应后,通过调用链路将执行结果回传 Clustomer
复制代码
代码中的出现自定义注解/自定义类,如果没有将它们的代码贴出来,这些注解或对象只需知道它们是干嘛的,暂时不理会它们是如何实现的(因为该项目还自定义了 IOC 容器,这里暂时不对 IOC 容器进行讲解,而且有其他简便的方式可以替换它们实现的功能),想要了解可以前往 github 地址看源码
项目源码: github.com/yehuisheng-…
二、引入 maven 依赖
<dependencies>
<!-- 引入netty依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.76.Final</version>
</dependency>
<!-- 反射库 -->
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.10.2</version>
</dependency>
<!-- 序列化 -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
</dependencies>
复制代码
三、定义传输格式
1、自定义协议
- MsgProtocol 是自定义协议,规定了通信双方数据处理的格式
public class MsgProtocol {
private int length;
private byte[] msg;
public int getLength() {
return length;
}
public byte[] getMsg() {
return msg;
}
public void setLength(int length) {
this.length = length;
}
public void setMsg(byte[] msg) {
this.msg = msg;
}
public MsgProtocol() {
}
public MsgProtocol(byte[] msg) {
this.msg = msg;
this.length = msg.length;
}
}
复制代码
2、请求对象
- RequestMsg 是请求对象,包含了客户端请求的接口、方法、形参等数据
public class RequestMsg implements Serializable {
/** 请求参数 */
private Object[] params;
/** 参数类型 */
private Class<?>[] paramsType;
/** 请求接口 */
private Class<?> clazz;
/** 请求方法 */
private String methodName;
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
public Class<?> getClazz() {
return clazz;
}
public void setClazz(Class<?> clazz) {
this.clazz = clazz;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParamsType() {
return paramsType;
}
public void setParamsType(Class<?>[] paramsType) {
this.paramsType = paramsType;
}
@Override
public String toString() {
return "RpcRequestMsg{" +
"params=" + Arrays.toString(params) +
", clazz=" + clazz +
", methodName=" + methodName +
'}';
}
}
复制代码
3、解码器/编码器
- ProDeCoder 自定义解码器,将 Pipeline 上接收的数据转为 MsgProtocol 对象
public class ProDeCoder extends ReplayingDecoder<Void> {
/**
* 解码器 - 从字节流的缓冲区中解析通信节点发送过来的数据
*/
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
// 获取数据长度
int length = byteBuf.readInt();
// 建立缓存区
byte[] bytes = new byte[length];
// 正确读取数据到缓冲区中
byteBuf.readBytes(bytes);
/*
* 将解码的数据通过 pipeline 调用链给下一个处理器
*/
MsgProtocol protocol = new MsgProtocol();
protocol.setLength(length);
protocol.setMsg(bytes);
list.add(protocol);
}
}
复制代码
- ProEnCoder 自定义编码器,将 Pipeline 上要传输出去的 MsgProtocol 对象转为 byte 数组
public class ProEnCoder extends MessageToByteEncoder<MsgProtocol> {
/**
* 编码器 - 将数据的长度放入缓冲区中,发送给通信的节点
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MsgProtocol msgProtocol, ByteBuf byteBuf) {
byteBuf.writeInt(msgProtocol.getLength());
byteBuf.writeBytes(msgProtocol.getMsg());
}
}
复制代码
4、字节操作工具类
- ByteUtil 是字节操作工具类,只有两个方法:将字节数组转为 Java 对象、将 Java 对象转为字节数组
public class ByteUtil {
/**
* @param bytes 字节数组
* @param clazz 类型
* @param <T> 泛型转换
* @return 将字节数组转为 Java 对象
*/
public static <T> T cast(byte[] bytes, Class<T> clazz) {
if (bytes == null || clazz == null) {
return null;
}
// 读取字节数组,转为 Object
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis)) {
Object obj = ois.readObject();
if (obj instanceof String) {
// 用 Gson 进行反序列化
Gson gson = new Gson();
return gson.fromJson(obj.toString(), clazz);
} else {
return clazz.cast(obj);
}
} catch (Exception e) {
throw new ClassCastException("类型转换异常," + e.getMessage());
}
}
/**
* @param obj Object 类型的对象
* @return 将对象转为字节数组
*/
public static byte[] getBytes(Object obj) {
if (obj instanceof Serializable) {
// 已经序列化了
byte[] bytes = null;
try (ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bo)) {
oos.writeObject(obj);
bytes = bo.toByteArray();
} catch (Exception e) {
e.printStackTrace();
}
return bytes;
} else {
// 使用 Gson 进行序列化
Gson gson = new Gson();
String json = gson.toJson(obj);
return getBytes(json);
}
}
}
复制代码
四、搭建服务端
- ConfigProperties 是自定义配置类,作用是加载 properties 文件的配置信息,这里用到的配置只有 IP 地址和端口,感兴趣的话也可以将 ServerBootstrap 对象的配置加到配置文件,由用户决定配置项
- @ObjectScan 是自定义注解,类似于 Spring 的 @Component 注解,作用是帮助 IOC 容器启动时自动创建对象
@ObjectScan
public class ConfigProperties {
private final Properties properties;
public ConfigProperties() {
this.properties = new Properties();
loadConfig();
}
/** 地址 */
private String address;
/** 端口 */
private int port;
/**
* 加载配置文件
*/
private void loadConfig() {
File directory = new File("");
String courseFile;
try {
courseFile = directory.getCanonicalPath();
} catch (IOException e) {
throw new RuntimeException(e);
}
String path = courseFile + "\target\classes\rpc.properties";
try (FileInputStream fis = new FileInputStream(path);
InputStream is = new BufferedInputStream(fis)) {
properties.load(is);
} catch (FileNotFoundException ignored) {
// 配置文件不存在就使用默认值
} catch (Exception e) {
e.printStackTrace();
}
refreshContext();
}
/**
* 刷新配置信息
*/
private void refreshContext() {
this.address = temple("address", "127.0.0.1", String.class);
this.port = temple("port", 9999, Integer.class);
System.out.println("加载配置文件,address = " + address
+ ", port = " + port);
}
/**
*
* @param fieldName 字段名
* @param defaultValue 默认值
* @param clazz 类型
* @param <T> 泛型
* @return 返回配置文件中的值,如果没有则使用默认值
*/
private <T> T temple(String fieldName, T defaultValue, Class<T> clazz) {
Object o = properties.get(fieldName);
if (o != null) {
String s = o.toString().replaceAll(" ", "");
if ("".equals(s)) {
return defaultValue;
} else if (Integer.class.equals(clazz) && isNumber(s)) {
return clazz.cast(Integer.parseInt(s));
} else if (String.class.equals(clazz)) {
return clazz.cast(s);
} else {
throw new RuntimeException("配置文件属性" + fieldName + "类型错误,请检查");
}
}
return defaultValue;
}
/**
* @param value 字段值
* @return 正则表达式判断是否整数
*/
private boolean isNumber(String value) {
return value.matches("-?\d+");
}
public String getAddress() {
return address;
}
public int getPort() {
return port;
}
}
复制代码
1、channel 处理器
- @AutoImport 是自定义注解,类似于 Spring 的 @Autowired 注解,作用是对添加注解的字段进行自动注入
- BeanFactory 是自定义 Bean 工厂(IOC容器),类似于 Spring 的 BeanFactory 对象,作用是管理 IOC 容器的对象
/**
*
* @ChannelHandler.Sharable 添加注解只是标明当前 Handler 是可共享的,会在添加到 Pipeline 时去做判断,
* 如果 Handler 是单例,但是没有添加 Sharable 注解,Netty 就会抛异常。
* Netty 并不会帮你实现单例,你添加了注解后,还需要自行将 Handler 设置为单例。
*
* @author yehuisheng
*/
@ObjectScan
@ChannelHandler.Sharable
public class RpcServerHandler extends SimpleChannelInboundHandler<MsgProtocol> {
@AutoImport
private BeanFactory beanFactory;
/**
* 有客户端连接,就会触发该方法
* @param ctx channel上下文对象
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("有客户端连接了");
}
/**
* 读取客户端的请求,进行处理
* @param ctx channel上下文对象
* @param msg 消息体
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, MsgProtocol msg) throws Exception {
// 解析远程服务调用的请求数据
RequestMsg request = ByteUtil.cast(msg.getMsg(), RequestMsg.class);
// 通过 Bean 工厂获取接口实现类
Object instance = beanFactory.getServiceImpl(request.getClazz());
// 根据参数类型和方法名找到方法对象,执行该方法获得接口的执行结果
Class<?>[] paramsType = request.getParamsType();
Method method = ObjectUtil.isEmpty(paramsType)
? instance.getClass().getDeclaredMethod(request.getMethodName())
: instance.getClass().getDeclaredMethod(request.getMethodName(), paramsType);
Object res = method.invoke(instance, request.getParams());
/*
* 判断有无返回值,Void 只实现了 Serializable 接口的空对象,
* 仅仅表示没有返回值,在客户端获取结果的方法中可以看到它们的使用
*/
boolean hasReturn = !Void.class.getSimpleName().toLowerCase()
.equalsIgnoreCase(method.getReturnType().getName());
/*
* Null 只实现了 Serializable 接口的空对象,仅仅表示数据为空,
* 为了在调用 ByteUtil.getBytes() 可以将空值序列化
*/
Object data = hasReturn ? (res == null ? new Null() : res) : new Void();
// 将数据封装为自定义协议,返回客户端
byte[] bytes = ByteUtil.getBytes(data);
ctx.writeAndFlush(new MsgProtocol(bytes));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
cause.printStackTrace();
}
}
复制代码
2、服务端
@ObjectScan
public class RpcNettyServer {
@AutoImport
private ConfigProperties configProperties;
@AutoImport
private RpcServerHandler rpcServerHandler;
private EventExecutorGroup eventExecutors;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public void start() {
final int processors = Runtime.getRuntime().availableProcessors();
// 负责接收客户端连接的线程池
bossGroup = new NioEventLoopGroup(processors > 1 ? processors/2 : processors);
// 负责IO操作/任务处理的线程池
workerGroup = new NioEventLoopGroup(processors * 2);
// 自定义异步任务线程组
eventExecutors = new DefaultEventExecutorGroup(processors);
try {
/*
* 1、初始化两个线程组
* 2、设置 NIO 通信 channel
* 3、定义阻塞队列的长度
* 4、设置是否监控客户端的连接状态
* 5、添加信道(channel)的处理器
*/
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加编码器,解码器,handler,处理 handler 的线程池
pipeline.addLast(new ProDeCoder())
.addLast(new ProEnCoder())
.addLast(eventExecutors, rpcServerHandler);
}
});
// 绑定配置类加载的端口
ChannelFuture channelFuture = bootstrap.bind(configProperties.getPort()).sync();
System.out.println("RPC 服务启动成功。。。");
// 阻塞当前代码,使 netty 服务器一直处于运行状态
channelFuture.channel().closeFuture().sync();
} catch (Throwable e) {
e.printStackTrace();
} finally {
close();
}
}
public void close() {
boolean success = false;
if (ObjectUtil.canShutdownThreadPool(bossGroup)) {
bossGroup.shutdownGracefully();
success = true;
}
if (ObjectUtil.canShutdownThreadPool(workerGroup)) {
workerGroup.shutdownGracefully();
success = true;
}
if (ObjectUtil.canShutdownThreadPool(eventExecutors)) {
eventExecutors.shutdownGracefully();
success = true;
}
if (success) {
System.out.println("服务端关闭服务了。。。");
}
}
}
复制代码
五、搭建客户端
1、chanel 处理器
客户端处理器做这样的事情:
- 获取接口的请求方法、形参,将这些数据发送到服务端
- 数据发送后,调用 Object 的 wait 方法,线程进入等待状态
- 处理器接收到服务端的处理结果后,就调用 Object 的 notify 方法唤醒等待的线程
- 等待的线程唤醒后,执行下一步操作,将处理结果返回给远程访问接口的调用方
/**
*
* @ChannelHandler.Sharable 添加注解只是标明当前 Handler 是可共享的,会在添加到 Pipeline 时去做判断,
* 如果 Handler 是单例,但是没有添加 Sharable 注解,Netty 就会抛异常。
* Netty 并不会帮你实现单例,你添加了注解后,还需要自行将 Handler 设置为单例。
*
* @author yehuisheng
*/
@ObjectScan
@ChannelHandler.Sharable
public class RpcClientHandler<T> extends SimpleChannelInboundHandler<MsgProtocol> implements Supplier<T> {
private ChannelHandlerContext channel;
/** 请求结果 */
private Object result;
/** 请求对象 */
private RequestMsg request;
public void setRequest(RequestMsg request) {
this.request = request;
}
/**
* 与服务端成功建立连接
* @param ctx channel上下文对象
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
channel = ctx;
}
/**
* 读取服务器的信息
* @param ctx channel上下文对象
* @param msg 消息体
*/
@Override
protected synchronized void channelRead0(ChannelHandlerContext ctx, MsgProtocol msg) {
this.result = ByteUtil.cast(msg.getMsg(), Object.class);
// 接收到服务端的请求后,唤醒 get 方法继续执行
this.notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
/**
* @return 获取服务端的远程接口的处理结果
*/
@Override
@SuppressWarnings("unchecked")
public synchronized T get() {
if (request == null) {
throw new IllegalArgumentException("找不到请求信息");
}
if (channel == null) {
throw new IllegalArgumentException("服务端未开启");
}
try {
byte[] bytes = ByteUtil.getBytes(this.request);
// 向服务端发送请求
channel.writeAndFlush(new MsgProtocol(bytes));
// 等待 channelRead0() 响应服务端的请求结果
this.wait();
} catch (Exception e) {
e.printStackTrace();
}
/*
* 如果方法无返回值,服务端也不回传消息,客户端就会一直阻塞在 Object.wait() 中,
* 因此需要使用特定对象表示
*/
return (result instanceof Void || result instanceof Null) ? null : (T) result;
}
}
复制代码
2、客户端
客户端主要做两个事情:
- 建立与服务端的连接
- 给远程访问接口创建代理对象,当远程代理对象调用方法时,将当前接口的类型、方法和参数封装成 RequestMsg 对象,再由 clientHandler 对象携带数据访问服务端,获取接口的处理结果
clientHandler 是共享对象,为了线程安全需要在代理接口调用的位置加锁。
这里有一个问题,clientHandler 内部使用了 synchronized + wait/notify,wait 的方法会释放当前线程持有的对象锁。因此,如果代理接口调用 clientHandler 时使用 synchronized 对 clientHandler 对象加锁,那么当 clientHandler 内部调用 Object 的 wait 方法时,也会把这个位置的锁给释放掉,就会出现线程安全问题。
如何解决这个问题呢?
方案一: 代理接口调用处理器的位置 synchronized 不直接锁 clientHandler 对象,而是创建一个 Object 对象当作锁,锁定 clientHandler 对象
方案二: clientHandler 对象内部的 wait/notify 换成 Lock 接口的 await/signal
@ObjectScan
public class RpcNettyClient {
@AutoImport
private ConfigProperties configProperties;
@AutoImport
private RpcClientHandler<?> clientHandler;
private EventLoopGroup eventLoopGroup;
private final Object lock = new Object();
/**
* @param clazz 获取的接口类型
* @param <T> 泛型
* @return 获取接口的代理对象
*/
public <T> T getBeanInterface(Class<T> clazz) {
if (!clazz.isInterface()) {
throw new IllegalArgumentException("clazz不是接口类型");
}
// 通过 JDK 动态代理创建代理对象
Object instance = Proxy.newProxyInstance(
Thread.currentThread().getContextClassLoader(),
new Class[]{clazz},
(proxy, method, args) -> {
// 设置请求对象的参数
RequestMsg request = new RequestMsg();
request.setClazz(clazz);
request.setParamsType(method.getParameterTypes());
request.setMethodName(method.getName());
request.setParams(args);
synchronized (lock) {
clientHandler.setRequest(request);
return clientHandler.get();
}
}
);
return clazz.cast(instance);
}
/**
* 开启客户端,连接 netty 服务器
*/
public void start() {
eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
// 设置线程池
.group(eventLoopGroup)
// 设置 NIO 通道
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
// 添加编解码器和自定义业务处理器
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ProDeCoder())
.addLast(new ProEnCoder())
.addLast(clientHandler);
}
});
System.out.println("连接远程服务成功。。。");
// 绑定服务器地址和端口号
ChannelFuture channelFuture = bootstrap.connect(configProperties.getAddress(), configProperties.getPort());
// 不阻塞客户端
channelFuture.sync();
// 阻塞客户端
// channelFuture.channel().closeFuture().sync();
} catch (Throwable e) {
e.printStackTrace();
close();
}
}
public void close() {
boolean success = false;
if (ObjectUtil.canShutdownThreadPool(eventLoopGroup)) {
eventLoopGroup.shutdownGracefully();
success = true;
}
if (success) {
System.out.println("客户端关闭服务了。。。");
}
}
}
复制代码
六、创建远程访问接口
1、接口
public interface PersonService {
void add(String name, String sex, int age);
Object get(String name);
Object remove(String name);
int count();
int number(Integer n);
int number(int n);
}
复制代码
2、接口实现类
- @Service 是自定义注解,作用是服务端将当前对象向外暴露,使得客户端可以远程访问这个对象
@Service
public class PersonServiceImpl implements PersonService {
private final Map<String, JsonObject> personMap = new ConcurrentHashMap<>();
@Override
public void add(String name, String sex, int age) {
if (ObjectUtil.isEmpty(name) || ObjectUtil.isEmpty(sex)
|| (!"男".equals(sex) && !"女".equals(sex))) {
return;
}
JsonObject person = new JsonObject();
person.addProperty("name", name);
person.addProperty("sex", sex);
person.addProperty("age", age);
personMap.put(name, person);
}
@Override
public JsonObject get(String name) {
return ObjectUtil.isEmpty(name) ? null : personMap.get(name);
}
@Override
public JsonObject remove(String name) {
return ObjectUtil.isEmpty(name) ? null : personMap.remove(name);
}
@Override
public int count() {
return personMap.size();
}
@Override
public int number(Integer n) {
return n == null ? -1 : n + 10;
}
@Override
public int number(int n) {
return n;
}
}
复制代码
七、创建服务提供者和消费者
1、线程池
- RpcThreadPool 自定义线程池
- SelfRejectedPolicy 自定义拒绝策略,线程被拒绝后,睡眠一段时间,再继续尝试加入线程池的工作队列,如果某线程超过指定的时间阈值,还加不入线程池的工作队列,就抛弃该线程(也可以短信邮件报警、记录到日志文件中等其他处理方式)
@ObjectScan
public class RpcThreadPool extends ThreadPoolExecutor {
private static final int PROCESSORS = Runtime.getRuntime().availableProcessors();
public RpcThreadPool() {
// 根据服务器配置和项目处理的任务复杂度,配置合适的线程池参数
super(PROCESSORS > 2 ? PROCESSORS/3 : PROCESSORS,
PROCESSORS*2,
3L, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(PROCESSORS * 500),
Executors.defaultThreadFactory(),
new SelfRejectedPolicy());
}
}
class SelfRejectedPolicy implements RejectedExecutionHandler {
/**
* 尝试再次进入线程池工作队列的时间(单位毫秒)
* 如果线程超过这个时间还未进入工作队列,则抛弃当前线程
*/
private final long againTryAddQueueTime;
/** 被拒绝的线程睡眠多长时间后再尝试加入线程池的工作队列(单位毫秒) */
private final long sleepTime;
SelfRejectedPolicy() {
// 默认 againTryAddQueueTime = 300
this(300);
}
SelfRejectedPolicy(long againTryAddQueueTime) {
// 默认 sleepTime = 50
this(againTryAddQueueTime, 50);
}
SelfRejectedPolicy(long againTryAddQueueTime, long sleepTime) {
this.againTryAddQueueTime = againTryAddQueueTime;
this.sleepTime = sleepTime;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
Runnable runnable;
// 判断当前线程是否被线程池拒绝过
if (r instanceof RejectedRunnable) {
RejectedRunnable rejectedRunnable = (RejectedRunnable) r;
// 判断线程花费在进入工作队列的时间,有没有超出阈值
if (rejectedRunnable.getAliveTime() > againTryAddQueueTime) {
System.err.println("抛弃线程:" + rejectedRunnable);
return;
}
runnable = r;
} else {
// 包装该线程,并默认记录花费在进入工作队列的时间
runnable = new RejectedRunnable(r);
}
try {
// 睡眠一段时间后,再使用线程池调用该任务
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 无论是 Runnable 还是 Callable 接口,最终都是使用 execute() 加入任务队列
executor.execute(runnable);
}
}
复制代码
- RejectedRunnable 拒绝线程类,包装被线程池拒绝的任务,并在自定义拒绝策略中尝试再次加入线程池的工作队列
public class RejectedRunnable implements Runnable {
/** 被线程池拒绝的线程 */
private final Runnable runnable;
/** 当前对象的创建时间 */
private final long createTime;
public RejectedRunnable(Runnable runnable) {
this.runnable = runnable;
this.createTime = System.currentTimeMillis();
}
@Override
public void run() {
runnable.run();
}
/**
* @return 获取当前对象的存活时间
*/
public long getAliveTime() {
return System.currentTimeMillis() - createTime;
}
}
复制代码
2、入口类
- Applications 服务开启/关闭的入口类,当前类只有两个重要的方法
- run() 通过注解判断当前是客户端还是服务端,并最终调用 RpcNettyClient 或 RpcNettyServer 的 start 方法开启服务
- close() 关闭当前的服务和线程池
public class Applications {
private static BeanFactory clientBeanFactory;
private static BeanFactory serverBeanFactory;
/**
* 获取方法调用者,并且判断是否有开启注解,然后运行对应的 IOC 容器
*/
public static void run() {
try {
// 获取调用方的类对象
String className = new Exception().getStackTrace()[1].getClassName();
Class<?> clazz = Class.forName(className);
BeanFactory beanFactory;
// 注解判断服务端还是客户端
if (clazz.isAnnotationPresent(EnableRpcClient.class)) {
clientBeanFactory = new ClientBeanFactory();
beanFactory = clientBeanFactory;
} else if (clazz.isAnnotationPresent(EnableRpcServer.class)) {
serverBeanFactory = new ServerBeanFactory();
beanFactory = serverBeanFactory;
} else {
throw new RuntimeException("请开启RPC注解功能");
}
// 启动服务,客户端默认不阻塞,而服务端会在这个方法阻塞,监听 Channel 事件
beanFactory.refresh();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 关闭客户端或服务端
*/
public static void close() {
if (serverBeanFactory != null) {
close(serverBeanFactory);
}
if (clientBeanFactory != null) {
close(clientBeanFactory);
}
}
/**
* 关闭对象工厂和线程池
* @param beanFactory 对象工厂
*/
private static void close(BeanFactory beanFactory) {
RpcThreadPool threadPool = beanFactory.get(RpcThreadPool.class);
if (threadPool != null && !threadPool.isShutdown()) {
threadPool.shutdown();
}
beanFactory.close();
}
}
复制代码
3、服务提供者
- @EnableRpcServer 自定义注解,作用是开启服务提供者的注解功能
@EnableRpcServer
public class Provider {
public static void main(String[] args) {
Applications.run();
}
}
复制代码
4、消费者
- @EnableRpcClient 自定义注解,作用是开启服务消费者的注解功能
- @Reference 自定义注解,作用是标注当前接口是远程访问接口,和上文创建接口实现类的 @Service 注解一一对应
消费者这里使用了线程池,创建多个线程同时访问服务端,测试高并发下是否存在某些问题
@EnableRpcClient
public class Customer {
@Reference
private static PersonService personService;
@AutoImport
private static RpcThreadPool rpcThreadPool;
/** 调用次数 */
private final static int TIMES = 5000;
private final static Random RANDOM = new Random();
private static Integer num;
public static void main(String[] args) {
Applications.run();
long millis = System.currentTimeMillis();
// 多线程执行远程访问
for (int i = 0; i < TIMES; i++) {
int finalI = i;
num = i;
rpcThreadPool.execute(() -> {
String name = name();
String sex = sex();
int age = age();
personService.add(name, sex, age);
name = name();
Object person = personService.get(name);
System.out.println(finalI + " -> call get(" + name + ") = " + person);
name = name();
Object remove = personService.remove(name);
System.out.println(finalI + " -> call remove(" + name + ") = " + remove);
System.out.println(finalI + " -> call count = " + personService.count());
System.out.println(finalI + " -> call number(int) = " + personService.number(finalI));
System.out.println(num + " -> call number(Integer) = " + personService.number(num));
});
}
// 线程池的任务执行完毕,才执行剩下的代码
while (rpcThreadPool.getTaskCount() != rpcThreadPool.getCompletedTaskCount()) {}
System.err.println("交由线程池的总任务数量:" + TIMES);
System.err.println("线程池完成的任务数量:" + rpcThreadPool.getCompletedTaskCount());
System.err.println("线程池未完成的任务数量:" + (TIMES - rpcThreadPool.getCompletedTaskCount()));
long time = System.currentTimeMillis() - millis;
System.err.println("耗时:" + (time/1000) + "秒" + (time%1000) + "毫秒");
Applications.close();
}
/**
* @return 自动生成名字
*/
private static String name() {
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
int nameLength = RANDOM.nextInt(6);
return uuid.length() < nameLength ? uuid : uuid.substring(0, nameLength);
}
/**
* @return 随机选择性别
*/
private static String sex() {
return (RANDOM.nextInt() % 2 == 0) ? "男" : "女";
}
/**
* @return 随机获取年龄
*/
private static int age() {
return RANDOM.nextInt(100);
}
}
复制代码
八、测试
- 启动服务端
- 启动客户端,并通过线程池调用远程访问接口
可以看到测试成功,功能都是正常的,好了,一个基于 Netty 实现的 RPC 框架就此完成