Java NIO 模型之NIO、BIO、AIO

 2022-07-26
原文作者:程序员青峰

同步阻塞模型,一个客户端连接对应一个处理线程。

缺点

  1. IO代码里read是阻塞操作,如果连接不做读写操作会导致线程阻塞,浪费资源
  2. 如果读写很多,会导致服务器线程过多,压力太大。

应用场景

BIO适用于连接数目较小且固定的架构,这种方式对服务器资源的要求比较高,但是程序简单易理解。

202207262313553041.png

示例代码

    /**
     * 服务端
     * @author 风信子
     */
    public class SocketServer {
        public static void main(String[] args) throws IOException {
            ServerSocket serverSocket = new ServerSocket(9000);
            while (true){
                System.out.println("等待连接...");
                //阻塞方法
                final Socket socket =  serverSocket.accept();
                System.out.println("有客户端连接...");
                new Thread(new Runnable() {
                    public void run() {
                        try{
                            handler(socket);
                        }catch (Exception e){
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
        }
        public static void handler(Socket socket) throws IOException{
            System.out.println("当前线程:"+Thread.currentThread().getId());
            byte[] bytes = new byte[1024];
            System.out.println("准备read...");
            // 接受客户端数据没有就阻塞
            int read = socket.getInputStream().read(bytes);
            if(read!=-1){
                System.out.println("接收到客户端的信息为:"+new String(bytes,0,read));
                System.out.println("当前线程:"+Thread.currentThread().getId());
            }
            socket.getOutputStream().write("hello client".getBytes());
            socket.getOutputStream().flush();
        }
    }
    
    /**
     * 客户端
     * @author 风信子
     */
    public class SocketClient {
        public static void main(String[] args) throws IOException {
            Socket socket = new Socket("127.0.0.1",9000);
            socket.getOutputStream().write("Hello BIO".getBytes());
            socket.getOutputStream().flush();
            System.out.println("数据发送结束!");
            byte[] bytes = new byte[1024];
            // 接受服务器传回的数据
            socket.getInputStream().read(bytes);
            System.out.println("接收到的信息为:"+new String(bytes));
            socket.close();
        }
    }
    

NIO (Non Blocking IO)

同步非阻塞模型,服务实现模型为一个线程可以处理多个连接(请求),客户端的连接都会注册到 多路复用器selector 上面,多路复用轮询到连接用IO请求就进行处理,I/O 多路复用底层一般用的是Linux API (select,poll,epoll)来实现,区别见下面表格:

select poll epoll(jdk1.5及以上)
select poll epoll(jdk1.5及以上)
操作方式 遍历 遍历 回调
底层实现 数组 链表 哈希表
IO效率 每次调用进行线性遍历,时间复杂度为O(n) 每次调用进行线性遍历,时间复杂度为O(n) 事件通知方式,每当有io事件就绪,系统注册的回调函数就会被调用,时间复杂度为O(1)
最大连接 有上限 无上限 无上限

应用场景

NIO 适用于连接数目多且连接时间短(轻量级)的架构,比如聊天服务器,弹幕系统,服务间通信。编程比较复杂。jdk1.4开始支持。

202207262313563552.png

NIO 有三大组件: Channel(通道)、Buffer(缓冲区),Selector(选择器)

202207262313571043.png

  1. channel 类似于流,每个channel对应一个buffer缓冲区,buffer底层是个数组
  2. channel 会注册到selector上面,由selector 根据channel读写事件的发生将其交给空闲的线程处理。
  3. selector 可以对应一个或多个线程
  4. NIO 的Buffer和Channel都是既可以读也可以写的

NIO 服务端程序分析

  1. 创建一个ServerSocketChannel和Selector,将serverSocketChannel注册到Selector上
  2. selector通过select()方法监听channel事件,当客户端连接时selector监听到连接事件,获取到ServerSocketChannel注册时绑定的selectionKey
  3. selectionKey通过channel()方法可以获取绑定的ServerSocketChannel
  4. ServerSocketChannel通过accept()方法得到SocketChannel
  5. 将SocketChannel注册到Selector上,关心read事件
  6. 注册后返回一个SelectionKey,会和该SocketChannel关联
  7. selector继续通过select()方法监听事件,当客户端发送数据给服务端,selector监听到read事件,获取到SocketChannel注册时绑定的selectionKey
  8. selectionKey通过channel()方法可以获取绑定的socketChannel
  9. 将socketChannel里的数据读取出来
  10. 用socketChannel将服务端数据写回客户端

总结:NIO模型的selector 就像一个大总管,负责监听各种IO事件,然后转交给后端线程去处理 。 NIO相对于BIO非阻塞的体现就在,BIO的后端线程需要阻塞等待客户端写数据(比如read方法),如果客户端不写数据线程就要阻塞, NIO把等待客户端操作的事情交给了大总管 selector,selector 负责轮询所有已注册的客户端,发现有事件发生了才转交给后端线程处 理,后端线程不需要做任何阻塞等待,直接处理客户端事件的数据即可,处理完马上结束,或返回线程池供其他客户端事件继续使用。还 有就是 channel 的读写是非阻塞的

Redis就是典型的NIO线程模型,selector收集所有连接的事件并且转交给后端线程,线程连续执行所有事件命令并将结果写回客户端*

示例代码

    /**
     * @author 风信子
     * NIO 服务端实现
     */
    public class NioServer {
    
        public static void main(String[] args) throws IOException {
            // 创建一个本地端口监听的服务socket通道,并设置为非阻塞方式
            ServerSocketChannel ssc = ServerSocketChannel.open();
            // selector是非阻塞模式,必须设置为非阻塞才能在selector上注册,否则会报错.
            ssc.configureBlocking(false);
            ssc.socket().bind(new InetSocketAddress(9000));
            // 创建一个selector
            Selector selector = Selector.open();
            // 把ServerSocketChannel 注册到selector上面,并设置对客户端的accept感兴趣
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            while (true){
                System.out.println("等待事件发生..");
                // 轮询监听channel里面的key,select 是阻塞的 accept也是阻塞的
                selector.select();
                System.out.println("有事件发生了..");
                // 轮询监听到客户端请求
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()){
                    SelectionKey key = it.next();
                    // 删除本次处理的key ,防止下次select 重复处理
                    it.remove();
                    handler(key);
                }
            }
        }
    
        public static void handler(SelectionKey key) throws IOException{
            if(key.isAcceptable()){
                System.out.println("有客户端连接事件发生了..");
                ServerSocketChannel ssc =(ServerSocketChannel) key.channel();
                // NIO非阻塞体现:此处accept方法会阻塞 但它是连接事件所有很快就会执行完,不会阻塞
                // 处理完连接请求不会继续等待客户端的数据发送
                SocketChannel sc= ssc.accept();
                sc.configureBlocking(false);
                // 通过Selector 监听Channel 时对读事件感兴趣
                sc.register(key.selector(), SelectionKey.OP_READ);
            }else if(key.isReadable()){
                System.out.println("有客户端可读数据事件发生..");
                SocketChannel sc =(SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                // NIO非阻塞体现:首先read请求不会阻塞,其次这种事件响应模型,当调用到read方法是肯定是客户端发生了发送数据的事件
                int len = sc.read(byteBuffer);
                if(len!=-1){
                    System.out.println("接收到了客户端的消息:"+new String(byteBuffer.array(),0,len));
                }
                ByteBuffer bufferWrite = ByteBuffer.wrap("hello client".getBytes());
                sc.write(bufferWrite);
                key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                sc.close();
            }
        }
    }
    
    /**
     * @author 风信子
     * NIO 客户端实现
     */
    public class NioClient {
        Selector selector;
    
        public static void main(String[] args) throws IOException{
            NioClient nioClient = new NioClient();
            nioClient.initClient("127.0.0.1",9000);
            nioClient.connection();
        }
    
        public void initClient(String ip,int port) throws IOException {
            // 获取一个socket 通道
            SocketChannel socketChannel = SocketChannel.open();
            // 设置通道为非阻塞
            socketChannel.configureBlocking(false);
            // 获取一个通道管理器
            this.selector = Selector.open();
            // 客户端连接服务器,其实方法执行并没实现连接,需要在listen()方法中
            // 调用channel.finishConnection才能完成连接
            socketChannel.connect(new InetSocketAddress(ip,port));
            // 将管道管理器和通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件
            socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
        }
    
        public void connection() throws IOException{
            // 轮询访问selector
            while(true){
                // 选择一组可以进行I/O操作的事件,放在selector中,客户端该方法不会阻塞
                // 这里和服务端的方法不一样,查看api注释可以知道,服务端当至少一个通道被选中时
                // selector的wakeup方法被调用,方法返回,而对于客户端来说,通道是一直被选中的
                this.selector.select();
                Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
                while (it.hasNext()){
                    SelectionKey key = it.next();
                    it.remove();
                    // 连接事件发生
                    if(key.isConnectable()){
                        SocketChannel socketChannel =(SocketChannel) key.channel();
                        // 如果正在连接则完成连接
                        if(socketChannel.isConnectionPending()){
                            socketChannel.finishConnect();
                        }
                        // 设置成非阻塞
                        socketChannel.configureBlocking(false);
                        // 向服务器发送信息
                        ByteBuffer byteBuffer = ByteBuffer.wrap("hello server".getBytes());
                        socketChannel.write(byteBuffer);
                        // 连接成功之后注册读取服务器信息事件
                        socketChannel.register(this.selector,SelectionKey.OP_READ);
                    }else if(key.isReadable()){
                        read(key);
                    }
                }
            }
        }
    
        public void read(SelectionKey key) throws IOException{
            SocketChannel channel = (SocketChannel)key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int len = channel.read(byteBuffer);
            if(len!=-1){
                System.out.println("接收到服务端信息:"+new String(byteBuffer.array(),0,len));
            }
        }
    }
    

AIO(NIO 2.0)

异步非阻塞模型,由操作系统完成后回调通知服务端程序启用线程去处理,一般使用于连接数较多且连接时间长的应用。

应用场景

AIO方式适用于连接数目多且连接比较长(重操作)的架构,jdk7开始支持。

代码示例

    /**
     * 服务端
     * @author 风信子
     */
    public class AIOServer {
        public static void main(String[] args) throws Exception {
            final AsynchronousServerSocketChannel serverChannel =
                    AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(9000)) ;
            // 异步点: 通过钩子函数处理连接请求
            serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
                @Override
                public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
                    try{
                        // 此处接收客户端请求,不写这行代码客户端连接不上服务器
                        serverChannel.accept(attachment,this);
                        System.out.println(socketChannel.getRemoteAddress());
                        ByteBuffer byteBuffer  = ByteBuffer.allocate(1024);
                        // 异步点: 通过钩子函数处理数据接收操作
                        socketChannel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                            @Override
                            public void completed(Integer result, ByteBuffer attachment) {
                                byteBuffer.flip();
                                System.out.println("接收到客户端消息:"+new String(byteBuffer.array(),0,result));
                                socketChannel.write(ByteBuffer.wrap("hello client".getBytes()));
                            }
    
                            @Override
                            public void failed(Throwable exc, ByteBuffer attachment) {
                                exc.printStackTrace();
                            }
                        });
    
                    }catch (IOException e){
    
                    }
    
                }
    
                @Override
                public void failed(Throwable exc, Object attachment) {
                    exc.printStackTrace();
                }
            });
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    
    /**
     * 客户端
     * @author 风信子
     */
    public class AIOClient {
    
        public static void main(String... args) throws Exception {
            AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 9000)).get();
            socketChannel.write(ByteBuffer.wrap("HelloServer".getBytes()));
            ByteBuffer buffer = ByteBuffer.allocate(512);
            Integer len = socketChannel.read(buffer).get();
            if (len != -1) {
                System.out.println("客户端收到信息:" + new String(buffer.array(), 0, len));
            }
        }
    }
    

BIO、NIO、AIO 对比

BIO NIO AIO
BIO NIO AIO
IO模型 同步阻塞 同步非阻塞(多路复用) 异步非阻塞
编程难度 简单 复杂 复杂
可靠性
吞吐量

网络段子

老张爱喝茶,废话不说,煮开水。 出场人物:老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶)。

老张把水壶放到火上,立等水开。( 同步阻塞 ) 老张觉得自己有点傻 老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。( 同步非阻塞 ) 老张还是觉得自己有点傻,于是变高端了,买了把会响笛的那种水壶。水开之后,能大声发出嘀~~~~的噪音。 老张把响水壶放到火上,立等水开。( 异步阻塞 ) 老张觉得这样傻等意义不大 老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。( 异步非阻塞 ) 老张觉得自己聪明了。

    所谓同步异步,只是对于水壶而言。 
    普通水壶,同步;
    响水壶,异步。 
    虽然都能干活,但响水壶可以在自己完工之后,提示老张水开了。
    这是普通水壶所不能及的。
    同步只能让调用者去轮询自己(情况2中),造成老张效率的低下。
    所谓阻塞非阻塞,仅仅对于老张而言。
    立等的老张,阻塞;看电视的老张,非阻塞。