上一章,我对Java NIO进行了介绍。我们使用的很多开源中间件底层均使用了Java NIO,比如Kafka、Zookeeper就基于Java NIO构建了自己的网络通信组件。
在Java网络编程中,如果使用Java NIO,通常是和Reactor模式结合在一起,构建通信模块。本章,我就对Reactor模式进行介绍,并给出使用Java NIO实现Reactor模式的代码示例。
一、Reactor模式
Reactor模式本质是一种 事件驱动模型 ,Doug Lea 曾在《Scalable IO in Java》中对Reactor模式进行定义,Reactor模式由Reactor反应器线程、Handlers处理器两大角色组成:
- Reactor反应器线程:负责响应IO事件,并且分发到Handlers处理器;
- Handlers处理器:非阻塞的执行业务处理逻辑。
Reactor模式有多个并发输入源,一个 Service Handler ,多个 Request Handlers 。这个Service Handler会同步的将请求(Event) 多路复用 地分发给相应的Request Handler。
1.1 核心组件
从结构上看,Reactor模式有点类似生产者/消费者模式,但是Reactor模式没有queue来做缓冲,每当一个Event输入到Service Handler后,Service Handler会主动根据Event类型分发给对应的Request Handler来处理,并且Reactor模式底层需要依赖操作系统的多路复用函数。
我们通过上述时序图,来看下Reactor模式的具体执行流程:
- 初始化一个Initiation Dispatcher,相当于一个容器和Reactor模式的调用入口;
- 创建一系列Event Handler,每个Event Handler包含对应的Handle引用,并将Event Handler注册到Initiation Dispatcher中;
- 调用Initiation Dispatcher的handle_events方法,来启动事件循环;
- Initiation Dispatcher内部使用Synchronous Event Demultiplexer的select方法等待这些handle上事件的发生;
- 当某个Handle的Event发生后,select()方法返回,Initiation Dispatcher根据返回的Handle找到注册的EventHandler,并回调该Event Handler的handle_events()方法来进行事件处理。
1.2 模式演化
上述描述的是通用意义上的Reactor模式核心组件以及执行流程,具体落地时根据实现情况有所不同。Doug Lea比较好的描述了Reactor模式的几个不同变种及其演化过程。Doug Lea认为,基本上所有的I/O处理程序都可以抽象成以下处理过程:
- Read request;
- Decode request
- Process service
- Encode reply
- Send reply
针对处理流程的模式不同,Reactor模式也有很多变种,我在下一节详细讲解:
- Thread-Per-Connection模式;
- 单线程Reactor模式;
- 多线程Reactor模式;
- 主从Reactor模式。
二、Thread-Per-Connection模式
Thread-Per-Connection模式,就是对于每一个网络连接都分配一个线程进行处理:
2.1 示例
示例代码如下:
class ThreadPerConnection implements Runnable {
public void run() {
try {
// 服务器监听socket
ServerSocket serverSocket = new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
while (!Thread.interrupted()) {
Socket socket = serverSocket.accept();
// 创建新线程,专门负责一个连接的处理
new Thread(new Handler(socket)).start();
}
} catch (IOException ex) { /* 处理异常 */ }
}
// 处理器对象
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) {
socket = s;
}
public void run() {
while (true) {
try {
byte[] input = new byte[NioDemoConfig.SERVER_BUFFER_SIZE];
// 读取数据
socket.getInputStream().read(input);
// 处理业务逻辑,获取处理结果
byte[] output = null;
// 写入结果
socket.getOutputStream().write(output);
} catch (IOException ex) { /*处理异常*/ }
}
}
}
}
2.2 优缺点
这种方式的优点就是简单,缺点也很明显:
- 创建/销毁线程开销太大,且机器本身的线程资源有限;
- 即使使用线程池,当线程从输入流读取数据时,如果没有足够数据,线程依然会进入阻塞状态,此时线程啥也不能干,造成资源浪费;
- 无法应对瞬间的峰值流量,可能瞬间将线程池占满,系统中的线程也是比较昂贵的系统资源,线程数太多,系统无法承受。
三、单Reactor单线程模式
在Reactor模式中,有两类重要的角色——Reactor反应器和Handler处理器:
- Reactor反应器:负责查询IO事件,当检测到一个IO事件,将其发送给相应的Handler处理器去处理。这里的IO事件,就是NIO中Selector监控的通道IO事件;
- Handler处理器:与IO事件(或者选择键)绑定,负责IO事件的处理,完成真正的连接建立、通道读取、处理业务逻辑、将结果写入通道等操作。
单Reactor单线程模式,是指Reactor反应器和Handers处理器处于一个线程中执行。这种模式下,Reactor线程是个多面手,负责多路分离套接字,accept新连接,并分派请求到处理器链中:
3.1 示例
我这里也给出单Reactor单线程的示例代码,一共三个重要类:
- Reactor:Reactor类,负责建立新连接,分发处理;
- AcceptorHandler:负责对已经建立连接的Channel进行处理;
- IOHandler:完成业务逻辑。
public class Reactor implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocket;
Reactor() throws IOException {
//...获取选择器、开启ServerSocket服务监听通道
//...绑定AcceptorHandler新连接处理器到selectKey
}
public static void main(String[] args) throws IOException {
new Thread(new Reactor()).start();
}
// 轮询和分发事件
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
dispatch(sk);
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKey sk) {
Runnable handler = (Runnable) sk.attachment();
if (handler != null) {
handler.run();
}
}
class AcceptorHandler implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
if (channel != null)
new IOHandler(selector, channel);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
class IOHandler implements Runnable {
private static final int RECIEVING = 0, SENDING = 1;
private final SocketChannel channel;
private final SelectionKey sk;
private final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
private int state = RECIEVING;
IOHandler(Selector selector, SocketChannel c) throws IOException {
this.channel = c;
c.configureBlocking(false);
this.sk = this.channel.register(selector, 0);
this.sk.attach(this);
this.sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
public void run() {
try {
if (state == SENDING) {
channel.write(byteBuffer);
byteBuffer.clear();
sk.interestOps(SelectionKey.OP_READ);
state = RECIEVING;
} else if (state == RECIEVING) {
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
byteBuffer.flip();
sk.interestOps(SelectionKey.OP_WRITE);
state = SENDING;
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
在上面的代码中,设计了一个AcceptorHandler处理器,并attach到ServerSocketChannel中。AcceptorHandler处理器的两大职责:一是接受新连接,二是在为新连接创建一个输入输出的IOHandler。IOHandler,负责Socket的数据输入、业务处理、结果输出。
上述示例的单Reactor单线程模式Reactor反应器和所有的Handler处理器,都执行在同一条线程中。
3.2 优缺点
单Reactor单线程模式,适用于处理器链中业务处理组件能快速完成的场景。这种模型不能充分利用多核资源,所以实际使用的不多,此外它还有以下缺点:
- Reactor反应器和Handler处理器,都执行在同一条线程上。一个NIO线程同时处理成百上千的链路,性能上无法支撑。即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送;
- 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往进行重发,这更加重了NIO线程的负载,最终导致大量消息积压和处理超时,NIO线程会成为系统的性能瓶颈。
四、单Reactor多线程模式
单Reactor多线程模式与单Reactor单线程模式最大区别就是有一组NIO线程处理I/O操作,它的特点如下:
- 有一个专门的NIO线程——acceptor线程,用于监听服务端,接收客户端的TCP连接请求;
- 网络读写等I/O操作由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;
- 1个acceptor线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题。
4.1 示例
我们来看一个单Reactor多线程模式的示例,它是我在《透彻理解分布式存储》专栏中,模仿Kafka Broker网络通信模块——NIO Server的一个实现:
上述这张NIO Server的架构图,核心组件定义如下:
- DataNodeNIOServer: 相当于Kafka中的Acceptor线程,负责监听客户端的连接事件,并把建立完成连接的SocketChannel交给各个Processor线程;
- NioProcessor: 相当于Kafka中的Processor线程,负责监听SocketChannel的
OP_READ
/OP_WRITE
事件,解析客户端请求交给业务线程处理,并从响应队列中获取业务线程处理完的结果,响应返回客户端; - IOThread: 业务线程,负责处理Processor线程解析完的请求,执行业务逻辑,然后将响应扔到Processor线程对应的响应队列中;
- NetworkRequestQueue: 全局请求队列,NioProcessor线程解析完请求后,会将请求封装成NetworkRequest对象,扔到该队列中,IOThread线程会从该队列中获取请求并处理;
- NetworkResponseQueues: 响应队列,内部为每个Processor线程分配了一个队列,IOThread线程会将处理结果扔到该队列中;
- NetworkRequest: 请求对象的抽象,负责从SocketChannel中读取完整请求的字节流;
- NetworkResponse: 响应对象的抽象,负责向SocketChannel写入完整响应的字节流。
DataNodeNIOServer
我们先来看DataNodeNIOServer,它负责监听客户端的连接请求,并将建立好的连接交给Processor线程处理。可以看到,DataNodeNIOServer在构造过程中会创建一系列的Processor线程和IO线程,并给每个Processor线程分配一个响应队列:
/**
* DataNode NIO Server
*
* @author Ressmix
*/
public class DataNodeNIOServer extends Thread {
public static final Integer PROCESSOR_NUM = 10;
public static final Integer IO_THREAD_NUM = 10;
private Selector selector;
private List<NioProcessor> processors = new ArrayList<>();
private NameNodeRpcClient rpcClient;
public DataNodeNIOServer(NameNodeRpcClient rpcClient) {
this.rpcClient = rpcClient;
init();
}
@Override
public void run() {
while (true) {
try {
// 阻塞等待
selector.select();
Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
while (keysIterator.hasNext()) {
SelectionKey key = (SelectionKey) keysIterator.next();
keysIterator.remove();
// 建立连接
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel channel = serverSocketChannel.accept();
if (channel != null) {
// 将建立连接的SocketChannel交给Processor处理
channel.configureBlocking(false);
Integer processorIndex = new Random().nextInt(PROCESSOR_NUM);
NioProcessor processor = processors.get(processorIndex);
processor.addChannel(channel);
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
}
/*---------------------------------PRIVATE METHOD--------------------------------*/
private void init() {
ServerSocketChannel serverChannel = null;
try {
// 监听OP_ACCEPT事件
selector = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(NIO_PORT), 100);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("NIOServer已经启动,开始监听端口:" + NIO_PORT);
// 创建响应队列
NetworkResponseQueues responseQueues = NetworkResponseQueues.getInstance();
// 创建Processor线程,每个线程关联一个响应队列
for (int i = 0; i < PROCESSOR_NUM; i++) {
NioProcessor processor = new NioProcessor(i);
processors.add(processor);
processor.start();
// 每个Processor线程分配一个响应队列
responseQueues.assignQueue(i);
}
// 创建IO线程
for (int i = 0; i < IO_THREAD_NUM; i++) {
new IOThread(rpcClient).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
NioProcessor
NioProcessor负责监听已经建立连接的SocketChannel的OP_READ
和OP_WRITE
事件,它的整个处理流程遵循一定的模式:
-
从内存队列中移除一个已经建立连接的SocketChannel,将它注册到自己的Selector上,并监听
OP_READ
事件; -
从自己的响应队列中移除一个响应,并在该响应相关的SocketChannel上监听
OP_WRITE
事件; -
不断轮询Selector监听的发生事件的SocketChannel:
- 如果是
OP_READ
事件,则创建一个NetworkRequest对象并将完整请求缓存其中,然后取消对该SocketChannel的OP_READ
事件的关注,并交给IO线程处理; - 如果是
OP_WRITE
事件,则向该SocketChannel写入完整响应,并让其取消对OP_WRITE
事件的关注。
- 如果是
经过上面这样的处理模式, 一定能保证对于同一个客户端的请求,肯定可以处理完一个完整请求/响应后,再进行下一个请求的处理,这是一种”无锁串行化”的设计思想,在NIO编程中很常见 :
/**
* Processor线程
*
* @author Ressmix
*/
public class NioProcessor extends Thread {
// Processor唯一标识
private volatile Integer processorId;
// 等待注册连接的队列
private ConcurrentLinkedQueue<SocketChannel> channelQueue = new ConcurrentLinkedQueue<>();
// 多路复用监听时的最大阻塞时间
private static final Long POLL_BLOCK_MAX_TIME = 1000L;
// 每个Processor私有的Selector多路复用器
private Selector selector;
// 缓存未读完的请求,Key为客户端IP
private Map<String, NetworkRequest> cachedRequests = new HashMap<>();
// 缓存未发送完的响应,Key为客户端IP
private Map<String, NetworkResponse> cachedResponses = new HashMap<>();
// 当前Processor维护的所有SelectionKey,Key为客户端IP
private Map<String, SelectionKey> cachedKeys = new HashMap<>();
public NioProcessor(Integer processorId) {
super();
this.processorId = processorId;
try {
this.selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
public Integer getProcessorId() {
return this.processorId;
}
public void addChannel(SocketChannel channel) {
channelQueue.offer(channel);
// 唤醒Selector
// 因为Processor自身线程可能在阻塞等待,所以当有新连接添加队列时,需要由server线程唤起它
selector.wakeup();
}
@Override
public void run() {
while (true) {
try {
// 1.不断对已经建立连接的SocketChannel监听OP_READ事件
registerQueuedClients();
// 2.不断对需要返回响应的SocketChannel监听OP_WRITE事件
cacheQueuedResponse();
// 3.处理OP_READ事件和OP_WRITE事件
poll();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/*----------------------- PRIVATE METHOD -----------------------------*/
private void registerQueuedClients() {
SocketChannel channel = null;
// 不断出队元素
while ((channel = channelQueue.poll()) != null) {
try {
// 将已经建立连接的Channel注册到Selector上,并监听它的OP_READ事件
channel.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
}
private void cacheQueuedResponse() {
NetworkResponseQueues responseQueues = NetworkResponseQueues.getInstance();
NetworkResponse response = null;
// 遍历当前Processor自己的响应队列中的响应
while ((response = responseQueues.poll(processorId)) != null) {
String client = response.getClient();
cachedResponses.put(client, response);
// 关注OP_WRITE事件
cachedKeys.get(client).interestOps(SelectionKey.OP_WRITE);
}
}
private void poll() {
try {
// 这里Processor线程可能会阻塞等待
int keys = selector.select(POLL_BLOCK_MAX_TIME);
if (keys > 0) {
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
try {
SelectionKey key = keyIterator.next();
keyIterator.remove();
SocketChannel channel = (SocketChannel) key.channel();
// 客户端IP地址
String client = channel.getRemoteAddress().toString();
// 1.发生读事件
if (key.isReadable()) {
NetworkRequest request = null;
if (cachedRequests.get(client) != null) {
// 缓存中有,说明上一次未读完,出现了拆包
request = cachedRequests.get(client);
} else {
request = new NetworkRequest();
}
// 执行读取操作
request.setChannel(channel);
request.setKey(key);
request.read();
// 1.1读取完成
if (request.hasCompletedRead()) {
// 将完整的请求分发到一个全局请求队列中,由IO线程处理
request.setClient(client);
NetworkRequestQueue.getInstance().offer(request);
cachedKeys.put(client, key);
// 删除缓存
cachedRequests.remove(client);
// 取消对OP_READ的关注
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
}
// 1.2 没有读取完成,缓存等待下次继续读取
else {
cachedRequests.put(client, request);
}
}
// 2.发生写事件
else if (key.isWritable()) {
NetworkResponse response = cachedResponses.get(client);
// 发送响应
channel.write(response.getBuffer());
cachedResponses.remove(client);
cachedKeys.remove(client);
// 取消对OP_WRITE事件的关注
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
上述代码中的cachedRequests
和cachedResponses
主要是用来处理 拆包 问题,针对没有读取完的请求,按照客户端IP进行缓存,这样就保证了一定能够读完一个完整请求。
NetworkRequestQueue
NetworkRequestQueue是一个全局请求队列,Processor线程解析完SocketChannel后,会将包含完整请求的NetworkRequest对象扔到该队列中。IO线程会从该队列中获取请求进行处理:
/**
* 全局请求队列
*
* @author Ressmix
*/
public class NetworkRequestQueue {
private final ConcurrentLinkedQueue<NetworkRequest> requestQueue = new ConcurrentLinkedQueue<>();
private NetworkRequestQueue() {
}
private static class InstanceHolder {
private static final NetworkRequestQueue instance = new NetworkRequestQueue();
}
public static NetworkRequestQueue getInstance() {
return InstanceHolder.instance;
}
public void offer(NetworkRequest request) {
requestQueue.offer(request);
}
public NetworkRequest poll() {
return requestQueue.poll();
}
}
IOThread
IOThread负责处理业务逻辑,它会从全局请求队列NetworkRequestQueue
中不断获取请求,然后进行处理,最后将处理结果封装成NetworkResponse对象,存放到Processor线程的响应队列中:
/**
* 业务线程
*/
public class IOThread extends Thread {
// 文件上传
public static final Integer REQUEST_SEND_FILE = 1;
// 文件下载
public static final Integer REQUEST_READ_FILE = 2;
// 全局请求队列
private NetworkRequestQueue requestQueue = NetworkRequestQueue.getInstance();
private final NameNodeRpcClient rpcClient;
public IOThread(NameNodeRpcClient rpcClient) {
super();
this.rpcClient = rpcClient;
}
@Override
public void run() {
while (true) {
try {
// 1.不断从全局请求队列中获取NetworkRequest
NetworkRequest request = requestQueue.poll();
if (request == null) {
Thread.sleep(100);
continue;
}
Integer requestType = request.getRequestType();
// 如果是文件上传请求
if (requestType.equals(REQUEST_SEND_FILE)) {
writeFileToLocalDisk(request);
}
// 如果是文件下载请求
else if (requestType.equals(REQUEST_READ_FILE)) {
readFileFromLocalDisk(request);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 写本地磁盘
*/
private void writeFileToLocalDisk(NetworkRequest request) {
// 构建针对本地文件的输出流
FileOutputStream localFileOut = null;
FileChannel localFileChannel = null;
try {
// 1.写磁盘
localFileOut = new FileOutputStream(request.getFilename());
localFileChannel = localFileOut.getChannel();
localFileChannel.position(localFileChannel.size());
System.out.println("对本地磁盘文件定位到position=" + localFileChannel.size());
int written = localFileChannel.write(request.getFileContent());
System.out.println("本次文件上传完毕,将" + written + " bytes的数据写入本地磁盘文件.......");
// 2.增量上报
rpcClient.deltaReportDataNodeInfo(request.getFilename(), request.getFilesize());
System.out.println("增量上报收到的文件副本给NameNode节点......");
// 3.封装响应
NetworkResponse response = new NetworkResponse();
response.setClient(request.getClient());
response.setBuffer(ByteBuffer.wrap("SUCCESS".getBytes()));
NetworkResponseQueues.getInstance().offer(request.getProcessorId(), response);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
localFileChannel.close();
localFileOut.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 从本地磁盘读文件
*/
private void readFileFromLocalDisk(NetworkRequest request) {
FileInputStream localFileIn = null;
FileChannel localFileChannel = null;
try {
// 从磁盘读取文件
File file = new File(request.getFilename());
Long fileLength = file.length();
localFileIn = new FileInputStream(request.getFilename());
localFileChannel = localFileIn.getChannel();
// 响应buffer:8字节响应头(存文件大小)+文件内容
ByteBuffer buffer = ByteBuffer.allocate(8 + Integer.valueOf(String.valueOf(fileLength)));
buffer.putLong(fileLength);
int hasReadImageLength = localFileChannel.read(buffer);
System.out.println("从本次磁盘文件中读取了" + hasReadImageLength + " bytes的数据");
buffer.rewind();
// 封装响应,扔到处理该请求的Processor的响应队列中
NetworkResponse response = new NetworkResponse();
response.setClient(request.getClient());
response.setBuffer(buffer);
NetworkResponseQueues.getInstance().offer(request.getProcessorId(), response);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (localFileChannel != null) {
localFileChannel.close();
}
if (localFileIn != null) {
localFileIn.close();
}
} catch (Exception ex2) {
ex2.printStackTrace();
}
}
}
}
NetworkResponseQueues
NetworkResponseQueues内部封装了每个Processor独占的响应队列:
/**
* 响应队列
*/
public class NetworkResponseQueues {
private NetworkResponseQueues() {
}
// KEY为Processor标识,每个Processor线程对应一个响应队列
private Map<Integer, ConcurrentLinkedQueue<NetworkResponse>> responseQueues = new HashMap<>();
public void assignQueue(Integer processorId) {
ConcurrentLinkedQueue<NetworkResponse> queue = new ConcurrentLinkedQueue<>();
responseQueues.put(processorId, queue);
}
private static class InstanceHolder {
private static final NetworkResponseQueues instance = new NetworkResponseQueues();
}
public static NetworkResponseQueues getInstance() {
return InstanceHolder.instance;
}
// 添加一个响应
public void offer(Integer processorId, NetworkResponse response) {
responseQueues.get(processorId).offer(response);
}
// 获取一个响应
public NetworkResponse poll(Integer processorId) {
return responseQueues.get(processorId).poll();
}
}
NetworkRequest
NetworkRequest内部包含了一个完整请求的数据,并提供read
接口从SocketChannel中读取字节:
public class NetworkRequest {
// 文件上传
public static final Integer REQUEST_SEND_FILE = 1;
// 文件下载
public static final Integer REQUEST_READ_FILE = 2;
// Processor标识
private Integer processorId;
// 该请求是哪个客户端发送过来的
private String client;
// 本次网络请求对应的SelectionKey
private SelectionKey key;
// 本次网络请求对应的Channel
private SocketChannel channel;
// 缓存的数据,处理拆包
private InflightRequest cachedRequest = new InflightRequest();
private ByteBuffer cachedRequestTypeBuffer;
private ByteBuffer cachedFilenameLengthBuffer;
private ByteBuffer cachedFilenameBuffer;
private ByteBuffer cachedFileLengthBuffer;
private ByteBuffer cachedFileBuffer;
/**
* 读取字节流
*/
public void read() {
try {
Integer requestType = null;
if (cachedRequest.requestType != null) {
requestType = cachedRequest.requestType;
} else {
requestType = getRequestType(channel);
}
if (requestType == null) {
return;
}
System.out.println("从请求中解析出来请求类型:" + requestType);
if (REQUEST_SEND_FILE.equals(requestType)) {
// 处理上传文件请求
handleSendFileRequest(channel, key);
} else if (REQUEST_READ_FILE.equals(requestType)) {
// 处理下载文件请求
handleReadFileRequest(channel, key);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取本次请求的类型
*/
public Integer getRequestType(SocketChannel channel) throws Exception {
Integer requestType = null;
if (cachedRequest.requestType != null) {
return cachedRequest.requestType;
}
ByteBuffer requestTypeBuffer = null;
if (cachedRequestTypeBuffer != null) {
requestTypeBuffer = cachedRequestTypeBuffer;
} else {
requestTypeBuffer = ByteBuffer.allocate(4);
}
channel.read(requestTypeBuffer);
if (!requestTypeBuffer.hasRemaining()) {
// 已经读取出来了4个字节,可以提取出来requestType了
// 将position变为0,limit还是维持着4
requestTypeBuffer.rewind();
requestType = requestTypeBuffer.getInt();
cachedRequest.requestType = requestType;
} else {
cachedRequestTypeBuffer = requestTypeBuffer;
}
return requestType;
}
/**
* 发送文件
*/
private void handleSendFileRequest(SocketChannel channel, SelectionKey key) throws Exception {
// 从请求中解析文件名
String filename = getFilename(channel);
System.out.println("从网络请求中解析出来文件名:" + filename);
if (filename == null) {
return;
}
// 从请求中解析文件大小
Long fileLength = getFileLength(channel);
System.out.println("从网络请求中解析出来文件大小:" + fileLength);
if (fileLength == null) {
return;
}
// 循环不断的从channel里读取数据,并写入磁盘文件
ByteBuffer fileBuffer = null;
if (cachedFileBuffer != null) {
fileBuffer = cachedFileBuffer;
} else {
fileBuffer = ByteBuffer.allocate(Integer.valueOf(String.valueOf(fileLength)));
}
channel.read(fileBuffer);
if (!fileBuffer.hasRemaining()) {
fileBuffer.rewind();
cachedRequest.fileContent = fileBuffer;
cachedRequest.hasCompletedRead = true;
System.out.println("本次文件上传请求读取完毕.......");
} else {
cachedFileBuffer = fileBuffer;
System.out.println("本次文件上传出现拆包问题,缓存起来,下次继续读取.......");
}
}
/**
* 获取文件名
*/
private String getFilename(SocketChannel channel) throws Exception {
String filename = null;
if (cachedRequest.filename != null) {
return cachedRequest.filename;
} else {
Integer filenameLength = null;
// 读取文件名的大小
if(cachedRequest.filenameLength == null) {
ByteBuffer filenameLengthBuffer = null;
if(cachedFilenameLengthBuffer != null) {
filenameLengthBuffer = cachedFilenameLengthBuffer;
} else {
filenameLengthBuffer = ByteBuffer.allocate(4);
}
channel.read(filenameLengthBuffer);
if(!filenameLengthBuffer.hasRemaining()) {
filenameLengthBuffer.rewind();
filenameLength = filenameLengthBuffer.getInt();
cachedRequest.filenameLength = filenameLength;
} else {
cachedFilenameLengthBuffer = filenameLengthBuffer;
return null;
}
}
// 读取文件名
ByteBuffer filenameBuffer = null;
if(cachedFilenameBuffer != null) {
filenameBuffer = cachedFilenameBuffer;
} else {
filenameBuffer = ByteBuffer.allocate(filenameLength);
}
channel.read(filenameBuffer);
if(!filenameBuffer.hasRemaining()) {
filenameBuffer.rewind();
filename = new String(filenameBuffer.array());
} else {
cachedFilenameBuffer = filenameBuffer;
}
cachedRequest.filename = filename;
}
return filename;
}
/**
* 获取文件大小
*/
private Long getFileLength(SocketChannel channel) {
Long fileLength = null;
if(cachedRequest.filesize != null) {
return cachedRequest.filesize;
} else {
ByteBuffer filesizeBuffer = null;
if(cachedFileLengthBuffer != null) {
filesizeBuffer = cachedFileLengthBuffer;
} else {
filesizeBuffer = ByteBuffer.allocate(8);
}
if(!filesizeBuffer.hasRemaining()) {
filesizeBuffer.rewind();
fileLength = filesizeBuffer.getLong();
cachedRequest.filesize = fileLength;
} else {
cachedFileLengthBuffer = filesizeBuffer;
}
}
return fileLength;
}
/**
* 读取文件
*/
private void handleReadFileRequest(SocketChannel channel, SelectionKey key) throws Exception {
// 从请求中解析文件名
String filename = getFilename(channel);
System.out.println("从网络请求中解析出来文件名:" + filename);
if(filename == null) {
return;
}
cachedRequest.hasCompletedRead = true;
}
/**
* 本次请求是否已经读取完成
*/
public boolean hasCompletedRead() {
Long hasReaded = cachedRequest.hasReadedSize;
Long total = cachedRequest.filesize;
if (hasReaded == null) {
return false;
}
if (total == null) {
return false;
}
return hasReaded.equals(total);
}
/**
* 缓存数据
*/
class InflightRequest {
// 请求类型
Integer requestType;
// 文件名,以前缀分隔符开始,比如/dir/enclosure/qq.jpg
String filename;
// 文件名大小
Integer filenameLength;
// 文件总大小
Long filesize;
// 文件内容
ByteBuffer fileContent;
// 已读取的大小
Long hasReadedSize;
// 是否读取完整
Boolean hasCompletedRead = false;
public InflightRequest(String filename, Long imageSize, Long hasReadedSize, Integer requestType) {
this.filename = filename;
this.filesize = imageSize;
this.hasReadedSize = hasReadedSize;
this.requestType = requestType;
}
public InflightRequest() {
}
}
//...
}
上述的整个处理流程是很清晰,就是按照我们自定义好的报文格式解析请求,核心点就是对 拆包 的处理逻辑:
- 判断缓存的数据是否存在,存在则直接返回;
- 判断缓存数据的ByteBuffer是否已经读完,读完则缓存数据,否则把ByteBuffer缓存起来。
NetworkResponse
NetworkResponse内部包含了一个完整响应的数据,由IO线程创建并写入数据:
public class NetworkResponse {
private String client;
private ByteBuffer buffer;
public ByteBuffer getBuffer() {
return buffer;
}
public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
}
public String getClient() {
return client;
}
public void setClient(String client) {
this.client = client;
}
}
4.2 优缺点
单Reactor多线程模式,是实际使用最多的Reactor模式,在绝大多数场景都可以满足性能需求。Kafka和Zookeeper底层都是采用了多线程Reactor模式。
但是在极特殊应用场景中,由一个Acceptor线程负责监听和处理所有的客户端连接,可能会存在性能问题。例如,百万客户端并发连接,或者服务端需要对客户端的握手信息进行安全认证,而认证本身非常损耗性能。
在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了 主从Reactor多线程模型 。
五、主从Reactor多线程模式
主从Reactor多线程模式比起单Reactor多线程模式,是将Reactor分成两部分:
- mainReactor负责监听ServerSocket,accept新连接,并将建立的Socket分派给subReactor;
- subReactor负责多路分离已连接的Socket,读写网络数据,并交给worker线程池处理。
通常,mainReactor和subReactor这两个线程池中的线程数与CPU核数有关,具体还是要根据机器的压测情况确定:
特点:
- 服务端不再只用一个Acceptor线程接收客户端连接,而是一个独立的MainReactor线程池;
- mainReactor线程池中的每个线程都可以接收客户端的TCP连接请求,并将建立完连接后的SocketChannel交给subReactor线程池处理;
- subReactor线程池中的线程会将接收到的SocketChannel注册到自己的Selector上,并负责监听和处理该SocketChannel的读/写事件;
- mainReactor线程池只用于客户端的登录、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的线程上,由它们负责后续的I/O操作。
Netty框架底层就是采用主从Reactor多线程模式,后续我在Netty源码分析章节会详细介绍。
六、总结
本章,我对Reactor模式进行了讲解,并使用Java NIO给出了 单Reactor多线程模式 的代码示例。事实上,Kafka和Zookeeper也是基于Java NIO和Reactor模式构建了自己的底层通信组件,感兴趣的读者可以阅读它们的源码,或者参考我其它专栏中对这两个开源中间件的剖析。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。