2023-08-02  阅读(4)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/346

上一章,我对Java NIO进行了介绍。我们使用的很多开源中间件底层均使用了Java NIO,比如Kafka、Zookeeper就基于Java NIO构建了自己的网络通信组件。

在Java网络编程中,如果使用Java NIO,通常是和Reactor模式结合在一起,构建通信模块。本章,我就对Reactor模式进行介绍,并给出使用Java NIO实现Reactor模式的代码示例。

一、Reactor模式

Reactor模式本质是一种 事件驱动模型 ,Doug Lea 曾在《Scalable IO in Java》中对Reactor模式进行定义,Reactor模式由Reactor反应器线程、Handlers处理器两大角色组成:

  1. Reactor反应器线程:负责响应IO事件,并且分发到Handlers处理器;
  2. Handlers处理器:非阻塞的执行业务处理逻辑。

Reactor模式有多个并发输入源,一个 Service Handler ,多个 Request Handlers 。这个Service Handler会同步的将请求(Event) 多路复用 地分发给相应的Request Handler。

202308022223579671.png

1.1 核心组件

从结构上看,Reactor模式有点类似生产者/消费者模式,但是Reactor模式没有queue来做缓冲,每当一个Event输入到Service Handler后,Service Handler会主动根据Event类型分发给对应的Request Handler来处理,并且Reactor模式底层需要依赖操作系统的多路复用函数。

202308022223586632.png

我们通过上述时序图,来看下Reactor模式的具体执行流程:

  1. 初始化一个Initiation Dispatcher,相当于一个容器和Reactor模式的调用入口;
  2. 创建一系列Event Handler,每个Event Handler包含对应的Handle引用,并将Event Handler注册到Initiation Dispatcher中;
  3. 调用Initiation Dispatcher的handle_events方法,来启动事件循环;
  4. Initiation Dispatcher内部使用Synchronous Event Demultiplexer的select方法等待这些handle上事件的发生;
  5. 当某个Handle的Event发生后,select()方法返回,Initiation Dispatcher根据返回的Handle找到注册的EventHandler,并回调该Event Handler的handle_events()方法来进行事件处理。

1.2 模式演化

上述描述的是通用意义上的Reactor模式核心组件以及执行流程,具体落地时根据实现情况有所不同。Doug Lea比较好的描述了Reactor模式的几个不同变种及其演化过程。Doug Lea认为,基本上所有的I/O处理程序都可以抽象成以下处理过程:

  1. Read request;
  2. Decode request
  3. Process service
  4. Encode reply
  5. Send reply

针对处理流程的模式不同,Reactor模式也有很多变种,我在下一节详细讲解:

  • Thread-Per-Connection模式;
  • 单线程Reactor模式;
  • 多线程Reactor模式;
  • 主从Reactor模式。

二、Thread-Per-Connection模式

Thread-Per-Connection模式,就是对于每一个网络连接都分配一个线程进行处理:

202308022223594503.png

2.1 示例

示例代码如下:

    class ThreadPerConnection implements Runnable {
        public void run() {
            try {
                // 服务器监听socket
                ServerSocket serverSocket = new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
                while (!Thread.interrupted()) {
                    Socket socket = serverSocket.accept();
                    // 创建新线程,专门负责一个连接的处理
                    new Thread(new Handler(socket)).start();
                }
            } catch (IOException ex) { /* 处理异常 */ }
        }
    
        // 处理器对象
        static class Handler implements Runnable {
            final Socket socket;
            Handler(Socket s) {
                socket = s;
            }
            public void run() {
                while (true) {
                    try {
                        byte[] input = new byte[NioDemoConfig.SERVER_BUFFER_SIZE];
                        // 读取数据
                        socket.getInputStream().read(input);
                        // 处理业务逻辑,获取处理结果
                        byte[] output = null;
                        // 写入结果
                        socket.getOutputStream().write(output);
                    } catch (IOException ex) { /*处理异常*/ }
                }
            }
        }
    }

2.2 优缺点

这种方式的优点就是简单,缺点也很明显:

  1. 创建/销毁线程开销太大,且机器本身的线程资源有限;
  2. 即使使用线程池,当线程从输入流读取数据时,如果没有足够数据,线程依然会进入阻塞状态,此时线程啥也不能干,造成资源浪费;
  3. 无法应对瞬间的峰值流量,可能瞬间将线程池占满,系统中的线程也是比较昂贵的系统资源,线程数太多,系统无法承受。

三、单Reactor单线程模式

在Reactor模式中,有两类重要的角色——Reactor反应器和Handler处理器:

  1. Reactor反应器:负责查询IO事件,当检测到一个IO事件,将其发送给相应的Handler处理器去处理。这里的IO事件,就是NIO中Selector监控的通道IO事件;
  2. Handler处理器:与IO事件(或者选择键)绑定,负责IO事件的处理,完成真正的连接建立、通道读取、处理业务逻辑、将结果写入通道等操作。

单Reactor单线程模式,是指Reactor反应器和Handers处理器处于一个线程中执行。这种模式下,Reactor线程是个多面手,负责多路分离套接字,accept新连接,并分派请求到处理器链中:

202308022224010044.png

3.1 示例

我这里也给出单Reactor单线程的示例代码,一共三个重要类:

  • Reactor:Reactor类,负责建立新连接,分发处理;
  • AcceptorHandler:负责对已经建立连接的Channel进行处理;
  • IOHandler:完成业务逻辑。
    public class Reactor implements Runnable {
        private Selector selector;
        private ServerSocketChannel serverSocket;
    
        Reactor() throws IOException {
            //...获取选择器、开启ServerSocket服务监听通道
            //...绑定AcceptorHandler新连接处理器到selectKey
        }
    
        public static void main(String[] args) throws IOException {
            new Thread(new Reactor()).start();
        }
    
        // 轮询和分发事件
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        SelectionKey sk = it.next();
                        dispatch(sk);
                    }
                    selected.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    
        void dispatch(SelectionKey sk) {
            Runnable handler = (Runnable) sk.attachment();
            if (handler != null) {
                handler.run();
            }
        }
    
        class AcceptorHandler implements Runnable {
            public void run() {
                try {
                    SocketChannel channel = serverSocket.accept();
                    if (channel != null)
                        new IOHandler(selector, channel);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    class IOHandler implements Runnable {
        private static final int RECIEVING = 0, SENDING = 1;
        private final SocketChannel channel;
        private final SelectionKey sk;
        private final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        private int state = RECIEVING;
    
        IOHandler(Selector selector, SocketChannel c) throws IOException {
            this.channel = c;
            c.configureBlocking(false);
            this.sk = this.channel.register(selector, 0);
            this.sk.attach(this);
            this.sk.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }
    
        public void run() {
            try {
                if (state == SENDING) {
                    channel.write(byteBuffer);
                    byteBuffer.clear();
                    sk.interestOps(SelectionKey.OP_READ);
                    state = RECIEVING;
                } else if (state == RECIEVING) {
                    int length = 0;
                    while ((length = channel.read(byteBuffer)) > 0) {
                        Logger.info(new String(byteBuffer.array(), 0, length));
                    }
                    byteBuffer.flip();
                    sk.interestOps(SelectionKey.OP_WRITE);
                    state = SENDING;
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

在上面的代码中,设计了一个AcceptorHandler处理器,并attach到ServerSocketChannel中。AcceptorHandler处理器的两大职责:一是接受新连接,二是在为新连接创建一个输入输出的IOHandler。IOHandler,负责Socket的数据输入、业务处理、结果输出。

上述示例的单Reactor单线程模式Reactor反应器和所有的Handler处理器,都执行在同一条线程中。

3.2 优缺点

单Reactor单线程模式,适用于处理器链中业务处理组件能快速完成的场景。这种模型不能充分利用多核资源,所以实际使用的不多,此外它还有以下缺点:

  1. Reactor反应器和Handler处理器,都执行在同一条线程上。一个NIO线程同时处理成百上千的链路,性能上无法支撑。即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送;
  2. 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往进行重发,这更加重了NIO线程的负载,最终导致大量消息积压和处理超时,NIO线程会成为系统的性能瓶颈。

四、单Reactor多线程模式

单Reactor多线程模式与单Reactor单线程模式最大区别就是有一组NIO线程处理I/O操作,它的特点如下:

  • 有一个专门的NIO线程——acceptor线程,用于监听服务端,接收客户端的TCP连接请求;
  • 网络读写等I/O操作由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;
  • 1个acceptor线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题。

202308022224015845.png

4.1 示例

我们来看一个单Reactor多线程模式的示例,它是我在《透彻理解分布式存储》专栏中,模仿Kafka Broker网络通信模块——NIO Server的一个实现:

202308022224023166.png

上述这张NIO Server的架构图,核心组件定义如下:

  • DataNodeNIOServer: 相当于Kafka中的Acceptor线程,负责监听客户端的连接事件,并把建立完成连接的SocketChannel交给各个Processor线程;
  • NioProcessor: 相当于Kafka中的Processor线程,负责监听SocketChannel的OP_READ/OP_WRITE事件,解析客户端请求交给业务线程处理,并从响应队列中获取业务线程处理完的结果,响应返回客户端;
  • IOThread: 业务线程,负责处理Processor线程解析完的请求,执行业务逻辑,然后将响应扔到Processor线程对应的响应队列中;
  • NetworkRequestQueue: 全局请求队列,NioProcessor线程解析完请求后,会将请求封装成NetworkRequest对象,扔到该队列中,IOThread线程会从该队列中获取请求并处理;
  • NetworkResponseQueues: 响应队列,内部为每个Processor线程分配了一个队列,IOThread线程会将处理结果扔到该队列中;
  • NetworkRequest: 请求对象的抽象,负责从SocketChannel中读取完整请求的字节流;
  • NetworkResponse: 响应对象的抽象,负责向SocketChannel写入完整响应的字节流。

DataNodeNIOServer

我们先来看DataNodeNIOServer,它负责监听客户端的连接请求,并将建立好的连接交给Processor线程处理。可以看到,DataNodeNIOServer在构造过程中会创建一系列的Processor线程和IO线程,并给每个Processor线程分配一个响应队列:

    /**
     * DataNode NIO Server
     *
     * @author Ressmix
     */
    public class DataNodeNIOServer extends Thread {
        public static final Integer PROCESSOR_NUM = 10;
        public static final Integer IO_THREAD_NUM = 10;
    
        private Selector selector;
        private List<NioProcessor> processors = new ArrayList<>();
        private NameNodeRpcClient rpcClient;
    
        public DataNodeNIOServer(NameNodeRpcClient rpcClient) {
            this.rpcClient = rpcClient;
            init();
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    // 阻塞等待
                    selector.select();
                    Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                    while (keysIterator.hasNext()) {
                        SelectionKey key = (SelectionKey) keysIterator.next();
                        keysIterator.remove();
                        // 建立连接
                        if (key.isAcceptable()) {
                            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                            SocketChannel channel = serverSocketChannel.accept();
                            if (channel != null) {
                                // 将建立连接的SocketChannel交给Processor处理
                                channel.configureBlocking(false);
                                Integer processorIndex = new Random().nextInt(PROCESSOR_NUM);
                                NioProcessor processor = processors.get(processorIndex);
                                processor.addChannel(channel);
                            }
                        }
                    }
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        }
    
        /*---------------------------------PRIVATE METHOD--------------------------------*/
    
        private void init() {
            ServerSocketChannel serverChannel = null;
            try {
                // 监听OP_ACCEPT事件
                selector = Selector.open();
                serverChannel = ServerSocketChannel.open();
                serverChannel.configureBlocking(false);
                serverChannel.socket().bind(new InetSocketAddress(NIO_PORT), 100);
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    
                System.out.println("NIOServer已经启动,开始监听端口:" + NIO_PORT);
    
                // 创建响应队列
                NetworkResponseQueues responseQueues = NetworkResponseQueues.getInstance();
                // 创建Processor线程,每个线程关联一个响应队列
                for (int i = 0; i < PROCESSOR_NUM; i++) {
                    NioProcessor processor = new NioProcessor(i);
                    processors.add(processor);
                    processor.start();
                    // 每个Processor线程分配一个响应队列
                    responseQueues.assignQueue(i);
                }
    
                // 创建IO线程
                for (int i = 0; i < IO_THREAD_NUM; i++) {
                    new IOThread(rpcClient).start();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

NioProcessor

NioProcessor负责监听已经建立连接的SocketChannel的OP_READOP_WRITE事件,它的整个处理流程遵循一定的模式:

  1. 从内存队列中移除一个已经建立连接的SocketChannel,将它注册到自己的Selector上,并监听OP_READ事件;

  2. 从自己的响应队列中移除一个响应,并在该响应相关的SocketChannel上监听OP_WRITE事件;

  3. 不断轮询Selector监听的发生事件的SocketChannel:

    • 如果是OP_READ事件,则创建一个NetworkRequest对象并将完整请求缓存其中,然后取消对该SocketChannel的OP_READ事件的关注,并交给IO线程处理;
    • 如果是OP_WRITE事件,则向该SocketChannel写入完整响应,并让其取消对OP_WRITE事件的关注。

经过上面这样的处理模式, 一定能保证对于同一个客户端的请求,肯定可以处理完一个完整请求/响应后,再进行下一个请求的处理,这是一种”无锁串行化”的设计思想,在NIO编程中很常见

    /**
     * Processor线程
     *
     * @author Ressmix
     */
    public class NioProcessor extends Thread {
        // Processor唯一标识
        private volatile Integer processorId;
    
        // 等待注册连接的队列
        private ConcurrentLinkedQueue<SocketChannel> channelQueue = new ConcurrentLinkedQueue<>();
    
        // 多路复用监听时的最大阻塞时间
        private static final Long POLL_BLOCK_MAX_TIME = 1000L;
    
        // 每个Processor私有的Selector多路复用器
        private Selector selector;
    
        // 缓存未读完的请求,Key为客户端IP
        private Map<String, NetworkRequest> cachedRequests = new HashMap<>();
    
        // 缓存未发送完的响应,Key为客户端IP
        private Map<String, NetworkResponse> cachedResponses = new HashMap<>();
    
        // 当前Processor维护的所有SelectionKey,Key为客户端IP
        private Map<String, SelectionKey> cachedKeys = new HashMap<>();
    
        public NioProcessor(Integer processorId) {
            super();
            this.processorId = processorId;
            try {
                this.selector = Selector.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public Integer getProcessorId() {
            return this.processorId;
        }
    
        public void addChannel(SocketChannel channel) {
            channelQueue.offer(channel);
            // 唤醒Selector
            // 因为Processor自身线程可能在阻塞等待,所以当有新连接添加队列时,需要由server线程唤起它
            selector.wakeup();
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    // 1.不断对已经建立连接的SocketChannel监听OP_READ事件
                    registerQueuedClients();
                    // 2.不断对需要返回响应的SocketChannel监听OP_WRITE事件
                    cacheQueuedResponse();
                    // 3.处理OP_READ事件和OP_WRITE事件
                    poll();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        /*----------------------- PRIVATE METHOD -----------------------------*/
        private void registerQueuedClients() {
            SocketChannel channel = null;
            // 不断出队元素
            while ((channel = channelQueue.poll()) != null) {
                try {
                    // 将已经建立连接的Channel注册到Selector上,并监听它的OP_READ事件
                    channel.register(selector, SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void cacheQueuedResponse() {
            NetworkResponseQueues responseQueues = NetworkResponseQueues.getInstance();
            NetworkResponse response = null;
            // 遍历当前Processor自己的响应队列中的响应
            while ((response = responseQueues.poll(processorId)) != null) {
                String client = response.getClient();
                cachedResponses.put(client, response);
                // 关注OP_WRITE事件
                cachedKeys.get(client).interestOps(SelectionKey.OP_WRITE);
            }
        }
    
        private void poll() {
            try {
                // 这里Processor线程可能会阻塞等待
                int keys = selector.select(POLL_BLOCK_MAX_TIME);
                if (keys > 0) {
                    Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                    while (keyIterator.hasNext()) {
                        try {
                            SelectionKey key = keyIterator.next();
                            keyIterator.remove();
    
                            SocketChannel channel = (SocketChannel) key.channel();
                            // 客户端IP地址
                            String client = channel.getRemoteAddress().toString();
    
                            // 1.发生读事件
                            if (key.isReadable()) {
                                NetworkRequest request = null;
                                if (cachedRequests.get(client) != null) {
                                    // 缓存中有,说明上一次未读完,出现了拆包
                                    request = cachedRequests.get(client);
                                } else {
                                    request = new NetworkRequest();
                                }
                                // 执行读取操作
                                request.setChannel(channel);
                                request.setKey(key);
                                request.read();
                                // 1.1读取完成
                                if (request.hasCompletedRead()) {
                                    // 将完整的请求分发到一个全局请求队列中,由IO线程处理
                                    request.setClient(client);
                                    NetworkRequestQueue.getInstance().offer(request);
                                    cachedKeys.put(client, key);
                                    // 删除缓存
                                    cachedRequests.remove(client);
                                    // 取消对OP_READ的关注
                                    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
                                }
                                // 1.2 没有读取完成,缓存等待下次继续读取
                                else {
                                    cachedRequests.put(client, request);
                                }
                            }
                            // 2.发生写事件
                            else if (key.isWritable()) {
                                NetworkResponse response = cachedResponses.get(client);
                                // 发送响应
                                channel.write(response.getBuffer());
                                cachedResponses.remove(client);
                                cachedKeys.remove(client);
                                // 取消对OP_WRITE事件的关注
                                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

上述代码中的cachedRequestscachedResponses主要是用来处理 拆包 问题,针对没有读取完的请求,按照客户端IP进行缓存,这样就保证了一定能够读完一个完整请求。

NetworkRequestQueue

NetworkRequestQueue是一个全局请求队列,Processor线程解析完SocketChannel后,会将包含完整请求的NetworkRequest对象扔到该队列中。IO线程会从该队列中获取请求进行处理:

    /**
     * 全局请求队列
     *
     * @author Ressmix
     */
    public class NetworkRequestQueue {
        private final ConcurrentLinkedQueue<NetworkRequest> requestQueue = new ConcurrentLinkedQueue<>();
    
        private NetworkRequestQueue() {
        }
    
        private static class InstanceHolder {
            private static final NetworkRequestQueue instance = new NetworkRequestQueue();
        }
    
        public static NetworkRequestQueue getInstance() {
            return InstanceHolder.instance;
        }
    
        public void offer(NetworkRequest request) {
            requestQueue.offer(request);
        }
    
        public NetworkRequest poll() {
            return requestQueue.poll();
        }
    }

IOThread

IOThread负责处理业务逻辑,它会从全局请求队列NetworkRequestQueue中不断获取请求,然后进行处理,最后将处理结果封装成NetworkResponse对象,存放到Processor线程的响应队列中:

    /**
     * 业务线程
     */
    public class IOThread extends Thread {
        // 文件上传
        public static final Integer REQUEST_SEND_FILE = 1;
        // 文件下载
        public static final Integer REQUEST_READ_FILE = 2;
    
        // 全局请求队列
        private NetworkRequestQueue requestQueue = NetworkRequestQueue.getInstance();
    
        private final NameNodeRpcClient rpcClient;
    
        public IOThread(NameNodeRpcClient rpcClient) {
            super();
            this.rpcClient = rpcClient;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    // 1.不断从全局请求队列中获取NetworkRequest
                    NetworkRequest request = requestQueue.poll();
                    if (request == null) {
                        Thread.sleep(100);
                        continue;
                    }
    
                    Integer requestType = request.getRequestType();
                    // 如果是文件上传请求
                    if (requestType.equals(REQUEST_SEND_FILE)) {
                        writeFileToLocalDisk(request);
                    }
                    // 如果是文件下载请求
                    else if (requestType.equals(REQUEST_READ_FILE)) {
                        readFileFromLocalDisk(request);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 写本地磁盘
         */
        private void writeFileToLocalDisk(NetworkRequest request) {
            // 构建针对本地文件的输出流
            FileOutputStream localFileOut = null;
            FileChannel localFileChannel = null;
    
            try {
                // 1.写磁盘
                localFileOut = new FileOutputStream(request.getFilename());
                localFileChannel = localFileOut.getChannel();
                localFileChannel.position(localFileChannel.size());
                System.out.println("对本地磁盘文件定位到position=" + localFileChannel.size());
    
                int written = localFileChannel.write(request.getFileContent());
                System.out.println("本次文件上传完毕,将" + written + " bytes的数据写入本地磁盘文件.......");
    
                // 2.增量上报
                rpcClient.deltaReportDataNodeInfo(request.getFilename(), request.getFilesize());
                System.out.println("增量上报收到的文件副本给NameNode节点......");
    
                // 3.封装响应
                NetworkResponse response = new NetworkResponse();
                response.setClient(request.getClient());
                response.setBuffer(ByteBuffer.wrap("SUCCESS".getBytes()));
                NetworkResponseQueues.getInstance().offer(request.getProcessorId(), response);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    localFileChannel.close();
                    localFileOut.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 从本地磁盘读文件
         */
        private void readFileFromLocalDisk(NetworkRequest request) {
            FileInputStream localFileIn = null;
            FileChannel localFileChannel = null;
    
            try {
                // 从磁盘读取文件
                File file = new File(request.getFilename());
                Long fileLength = file.length();
                localFileIn = new FileInputStream(request.getFilename());
                localFileChannel = localFileIn.getChannel();
    
                // 响应buffer:8字节响应头(存文件大小)+文件内容
                ByteBuffer buffer = ByteBuffer.allocate(8 + Integer.valueOf(String.valueOf(fileLength)));
                buffer.putLong(fileLength);
                int hasReadImageLength = localFileChannel.read(buffer);
                System.out.println("从本次磁盘文件中读取了" + hasReadImageLength + " bytes的数据");
    
                buffer.rewind();
    
                // 封装响应,扔到处理该请求的Processor的响应队列中
                NetworkResponse response = new NetworkResponse();
                response.setClient(request.getClient());
                response.setBuffer(buffer);
                NetworkResponseQueues.getInstance().offer(request.getProcessorId(), response);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (localFileChannel != null) {
                        localFileChannel.close();
                    }
                    if (localFileIn != null) {
                        localFileIn.close();
                    }
                } catch (Exception ex2) {
                    ex2.printStackTrace();
                }
            }
        }
    }

NetworkResponseQueues

NetworkResponseQueues内部封装了每个Processor独占的响应队列:

    /**
     * 响应队列
     */
    public class NetworkResponseQueues {
    
        private NetworkResponseQueues() {
        }
    
        // KEY为Processor标识,每个Processor线程对应一个响应队列
        private Map<Integer, ConcurrentLinkedQueue<NetworkResponse>> responseQueues = new HashMap<>();
    
        public void assignQueue(Integer processorId) {
            ConcurrentLinkedQueue<NetworkResponse> queue = new ConcurrentLinkedQueue<>();
            responseQueues.put(processorId, queue);
        }
    
        private static class InstanceHolder {
            private static final NetworkResponseQueues instance = new NetworkResponseQueues();
        }
    
        public static NetworkResponseQueues getInstance() {
            return InstanceHolder.instance;
        }
    
        // 添加一个响应
        public void offer(Integer processorId, NetworkResponse response) {
            responseQueues.get(processorId).offer(response);
        }
    
        // 获取一个响应
        public NetworkResponse poll(Integer processorId) {
            return responseQueues.get(processorId).poll();
        }
    }

NetworkRequest

NetworkRequest内部包含了一个完整请求的数据,并提供read接口从SocketChannel中读取字节:

    public class NetworkRequest {
        // 文件上传
        public static final Integer REQUEST_SEND_FILE = 1;
        // 文件下载
        public static final Integer REQUEST_READ_FILE = 2;
    
        // Processor标识
        private Integer processorId;
    
        // 该请求是哪个客户端发送过来的
        private String client;
    
        // 本次网络请求对应的SelectionKey
        private SelectionKey key;
    
        // 本次网络请求对应的Channel
        private SocketChannel channel;
    
        // 缓存的数据,处理拆包
        private InflightRequest cachedRequest = new InflightRequest();
        private ByteBuffer cachedRequestTypeBuffer;
        private ByteBuffer cachedFilenameLengthBuffer;
        private ByteBuffer cachedFilenameBuffer;
        private ByteBuffer cachedFileLengthBuffer;
        private ByteBuffer cachedFileBuffer;
    
        /**
         * 读取字节流
         */
        public void read() {
            try {
                Integer requestType = null;
                if (cachedRequest.requestType != null) {
                    requestType = cachedRequest.requestType;
                } else {
                    requestType = getRequestType(channel);
                }
                if (requestType == null) {
                    return;
                }
                System.out.println("从请求中解析出来请求类型:" + requestType);
    
                if (REQUEST_SEND_FILE.equals(requestType)) {
                    // 处理上传文件请求
                    handleSendFileRequest(channel, key);
                } else if (REQUEST_READ_FILE.equals(requestType)) {
                    // 处理下载文件请求
                    handleReadFileRequest(channel, key);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 获取本次请求的类型
         */
        public Integer getRequestType(SocketChannel channel) throws Exception {
            Integer requestType = null;
    
            if (cachedRequest.requestType != null) {
                return cachedRequest.requestType;
            }
    
            ByteBuffer requestTypeBuffer = null;
            if (cachedRequestTypeBuffer != null) {
                requestTypeBuffer = cachedRequestTypeBuffer;
            } else {
                requestTypeBuffer = ByteBuffer.allocate(4);
            }
    
            channel.read(requestTypeBuffer);
            if (!requestTypeBuffer.hasRemaining()) {
                // 已经读取出来了4个字节,可以提取出来requestType了
                // 将position变为0,limit还是维持着4
                requestTypeBuffer.rewind();
                requestType = requestTypeBuffer.getInt();
                cachedRequest.requestType = requestType;
            } else {
                cachedRequestTypeBuffer = requestTypeBuffer;
            }
            return requestType;
        }
    
        /**
         * 发送文件
         */
        private void handleSendFileRequest(SocketChannel channel, SelectionKey key) throws Exception {
            // 从请求中解析文件名
            String filename = getFilename(channel);
            System.out.println("从网络请求中解析出来文件名:" + filename);
            if (filename == null) {
                return;
            }
            // 从请求中解析文件大小
            Long fileLength = getFileLength(channel);
            System.out.println("从网络请求中解析出来文件大小:" + fileLength);
            if (fileLength == null) {
                return;
            }
    
            // 循环不断的从channel里读取数据,并写入磁盘文件
            ByteBuffer fileBuffer = null;
            if (cachedFileBuffer != null) {
                fileBuffer = cachedFileBuffer;
            } else {
                fileBuffer = ByteBuffer.allocate(Integer.valueOf(String.valueOf(fileLength)));
            }
    
            channel.read(fileBuffer);
            if (!fileBuffer.hasRemaining()) {
                fileBuffer.rewind();
                cachedRequest.fileContent = fileBuffer;
                cachedRequest.hasCompletedRead = true;
                System.out.println("本次文件上传请求读取完毕.......");
            } else {
                cachedFileBuffer = fileBuffer;
                System.out.println("本次文件上传出现拆包问题,缓存起来,下次继续读取.......");
            }
        }
    
        /**
         * 获取文件名
         */
        private String getFilename(SocketChannel channel) throws Exception {
            String filename = null;
            if (cachedRequest.filename != null) {
                return cachedRequest.filename;
            } else {
                Integer filenameLength = null;
                // 读取文件名的大小
                if(cachedRequest.filenameLength == null) {
                    ByteBuffer filenameLengthBuffer = null;
                    if(cachedFilenameLengthBuffer != null) {
                        filenameLengthBuffer = cachedFilenameLengthBuffer;
                    } else {
                        filenameLengthBuffer = ByteBuffer.allocate(4);
                    }
    
                    channel.read(filenameLengthBuffer);
    
                    if(!filenameLengthBuffer.hasRemaining()) {
                        filenameLengthBuffer.rewind();
                        filenameLength = filenameLengthBuffer.getInt();
                        cachedRequest.filenameLength = filenameLength;
                    } else {
                        cachedFilenameLengthBuffer = filenameLengthBuffer;
                        return null;
                    }
                }
    
                // 读取文件名
                ByteBuffer filenameBuffer = null;
                if(cachedFilenameBuffer != null) {
                    filenameBuffer = cachedFilenameBuffer;
                } else {
                    filenameBuffer = ByteBuffer.allocate(filenameLength);
                }
    
                channel.read(filenameBuffer);
                if(!filenameBuffer.hasRemaining()) {
                    filenameBuffer.rewind();
                    filename = new String(filenameBuffer.array());
                } else {
                    cachedFilenameBuffer = filenameBuffer;
                }
                cachedRequest.filename = filename;
            }
            return filename;
        }
    
        /**
         * 获取文件大小
         */
        private Long getFileLength(SocketChannel channel) {
            Long fileLength = null;
    
            if(cachedRequest.filesize != null) {
                return cachedRequest.filesize;
            } else {
                ByteBuffer filesizeBuffer = null;
                if(cachedFileLengthBuffer != null) {
                    filesizeBuffer = cachedFileLengthBuffer;
                } else {
                    filesizeBuffer = ByteBuffer.allocate(8);
                }
    
    
                if(!filesizeBuffer.hasRemaining()) {
                    filesizeBuffer.rewind();
                    fileLength = filesizeBuffer.getLong();
                    cachedRequest.filesize = fileLength;
                } else {
                    cachedFileLengthBuffer = filesizeBuffer;
                }
            }
            return fileLength;
        }
    
    
        /**
         * 读取文件
         */
        private void handleReadFileRequest(SocketChannel channel, SelectionKey key) throws Exception {
            // 从请求中解析文件名
            String filename = getFilename(channel);
            System.out.println("从网络请求中解析出来文件名:" + filename);
            if(filename == null) {
                return;
            }
            cachedRequest.hasCompletedRead = true;
        }
    
        /**
         * 本次请求是否已经读取完成
         */
        public boolean hasCompletedRead() {
            Long hasReaded = cachedRequest.hasReadedSize;
            Long total = cachedRequest.filesize;
            if (hasReaded == null) {
                return false;
            }
            if (total == null) {
                return false;
            }
            return hasReaded.equals(total);
        }
    
    
        /**
         * 缓存数据
         */
        class InflightRequest {
            // 请求类型
            Integer requestType;
            // 文件名,以前缀分隔符开始,比如/dir/enclosure/qq.jpg
            String filename;
            // 文件名大小
            Integer filenameLength;
            // 文件总大小
            Long filesize;
            // 文件内容
            ByteBuffer fileContent;
            // 已读取的大小
            Long hasReadedSize;
            // 是否读取完整
            Boolean hasCompletedRead = false;
    
            public InflightRequest(String filename, Long imageSize, Long hasReadedSize, Integer requestType) {
                this.filename = filename;
                this.filesize = imageSize;
                this.hasReadedSize = hasReadedSize;
                this.requestType = requestType;
            }
    
            public InflightRequest() {
            }
        }
        //...
    }

上述的整个处理流程是很清晰,就是按照我们自定义好的报文格式解析请求,核心点就是对 拆包 的处理逻辑:

  1. 判断缓存的数据是否存在,存在则直接返回;
  2. 判断缓存数据的ByteBuffer是否已经读完,读完则缓存数据,否则把ByteBuffer缓存起来。

NetworkResponse

NetworkResponse内部包含了一个完整响应的数据,由IO线程创建并写入数据:

    public class NetworkResponse {
        private String client;
        private ByteBuffer buffer;
    
        public ByteBuffer getBuffer() {
            return buffer;
        }
    
        public void setBuffer(ByteBuffer buffer) {
            this.buffer = buffer;
        }
    
        public String getClient() {
            return client;
        }
    
        public void setClient(String client) {
            this.client = client;
        }
    }

4.2 优缺点

单Reactor多线程模式,是实际使用最多的Reactor模式,在绝大多数场景都可以满足性能需求。Kafka和Zookeeper底层都是采用了多线程Reactor模式。

但是在极特殊应用场景中,由一个Acceptor线程负责监听和处理所有的客户端连接,可能会存在性能问题。例如,百万客户端并发连接,或者服务端需要对客户端的握手信息进行安全认证,而认证本身非常损耗性能。

在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了 主从Reactor多线程模型

五、主从Reactor多线程模式

主从Reactor多线程模式比起单Reactor多线程模式,是将Reactor分成两部分:

  • mainReactor负责监听ServerSocket,accept新连接,并将建立的Socket分派给subReactor;
  • subReactor负责多路分离已连接的Socket,读写网络数据,并交给worker线程池处理。

通常,mainReactor和subReactor这两个线程池中的线程数与CPU核数有关,具体还是要根据机器的压测情况确定:

202308022224033807.png

特点:

  • 服务端不再只用一个Acceptor线程接收客户端连接,而是一个独立的MainReactor线程池;
  • mainReactor线程池中的每个线程都可以接收客户端的TCP连接请求,并将建立完连接后的SocketChannel交给subReactor线程池处理;
  • subReactor线程池中的线程会将接收到的SocketChannel注册到自己的Selector上,并负责监听和处理该SocketChannel的读/写事件;
  • mainReactor线程池只用于客户端的登录、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的线程上,由它们负责后续的I/O操作。

Netty框架底层就是采用主从Reactor多线程模式,后续我在Netty源码分析章节会详细介绍。

六、总结

本章,我对Reactor模式进行了讲解,并使用Java NIO给出了 单Reactor多线程模式 的代码示例。事实上,Kafka和Zookeeper也是基于Java NIO和Reactor模式构建了自己的底层通信组件,感兴趣的读者可以阅读它们的源码,或者参考我其它专栏中对这两个开源中间件的剖析。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文