2023-09-23
原文作者:李林超 原文地址: https://www.lilinchao.com/archives/2113.html

一次无法写完例子

  • 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)

  • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略

    • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
    • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
    • 如果不取消,会每次可写均会触发 write 事件
  • 服务端代码

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    
    /**
     * Created by lilinchao
     * Date 2022/6/3
     * Description 可写事件  服务端
     */
    public class WriteServer {
    
        public static void main(String[] args) throws IOException {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.bind(new InetSocketAddress(8080));
    
            Selector selector = Selector.open();
            ssc.register(selector, SelectionKey.OP_ACCEPT);
    
            while (true){
                selector.select();
    
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()){
                    SelectionKey key = iter.next();
                    iter.remove();
                    if(key.isAcceptable()){
                        SocketChannel sc = ssc.accept();
                        sc.configureBlocking(false);
                        SelectionKey sckey = sc.register(selector,SelectionKey.OP_READ);
    
                        //1.向客户端发送内容
                        StringBuilder sb = new StringBuilder();
                        for (int i=0;i<30000000;i++){
                            sb.append("a");
                        }
                        ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
    
                        //2.write表示实际写了多少字节
                        int write = sc.write(buffer);
                        System.out.println("实际写入字节:" + write);
                        //3.如果有剩余未读字节,才需要关注写事件
                        if(buffer.hasRemaining()){
                            // read 1 write 4
                            //在原有关注事件的基础上,多关注一个写事件
                            sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
                            //把buffer作为附件加入sckey
                            sckey.attach(buffer);
                        }
    
                    }else if(key.isWritable()){
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        SocketChannel sc = (SocketChannel) key.channel();
                        int write = sc.write(buffer);
                        System.out.println("实际写入字节:" + write);
                        if (!buffer.hasRemaining()) { // 写完了
                            key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                            key.attach(null);
                        }
                    }
                }
    
            }
    
        }
    }
  • 客户端代码
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    /**
     * Created by lilinchao
     * Date 2022/6/3
     * Description 客户端
     */
    public class WriteClient {
        public static void main(String[] args) throws IOException {
            Selector selector = Selector.open();
            SocketChannel sc = SocketChannel.open();
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
            sc.connect(new InetSocketAddress("localhost", 8080));
            int count = 0;
            while (true) {
                selector.select();
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isConnectable()) {
                        System.out.println(sc.finishConnect());
                    } else if (key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                        count += sc.read(buffer);
                        buffer.clear();
                        System.out.println(count);
                    }
                }
            }
        }
    }
  • 服务端运行结果
    实际写入字节:5242840
    实际写入字节:3014633
    实际写入字节:4063201
    实际写入字节:4718556
    实际写入字节:2490349
    实际写入字节:2621420
    实际写入字节:2621420
    实际写入字节:2621420
    实际写入字节:2606161

附参考原文:

《黑马程序员Netty教程》

阅读全文