Java NIO 传送文件

 2022-08-08
原文地址:https://blog.51cto.com/guotong1988/5487387
    package tools.distributedclient;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    /**
     * NIO客户端
     */
    public class MasterClient
        // 通道管理器
        private Selector selector;
    
        /**
         * 获得一个Socket通道,并对该通道做一些初始化的工作
         * 
         * @param ip
         *            连接的服务器的ip
         * @param port
         *            连接的服务器的端口号
         * @throws
        public void initClient(String ip, int port) throws IOException {
            // 获得一个Socket通道
            SocketChannel channel = SocketChannel.open();
            // 设置通道为非阻塞
            channel.configureBlocking(false);
            // 获得一个通道管理器
            this.selector = Selector.open();
    
            // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调
            // 用channel.finishConnect();才能完成连接
            channel.connect(new InetSocketAddress(ip, port));
            // 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。
            channel.register(selector, SelectionKey.OP_CONNECT);
        }
    
        /**
         * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
         * 
         * @throws
        @SuppressWarnings("unchecked")
        public void listen() throws IOException {
            // 轮询访问selector
            while (true) {
                selector.select();
                // 获得selector中选中的项的迭代器
                Iterator ite = this.selector.selectedKeys().iterator();
                while (ite.hasNext()) {
                    SelectionKey key = (SelectionKey) ite.next();
                    // 删除已选的key,以防重复处理
                    ite.remove();
                    // 连接事件发生
                    if (key.isConnectable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        // 如果正在连接,则完成连接
                        if (channel.isConnectionPending()) {
                            channel.finishConnect();
    
                        }
                        // 设置成非阻塞
                        channel.configureBlocking(false);
    
                        File file = new File("D:/mysql-5.6.23.zip");
                        FileInputStream input = new FileInputStream(file);
                        byte[] buffer = new byte[256 * 1024];
                        int read = 0;
                        while ((read = input.read(buffer)) != -1) {
                            System.out.println(read);
                            ByteBuffer w = ByteBuffer.wrap(buffer, 0, read);
                            while (w.hasRemaining()) {
                                channel.write(w);
                            }
                        }
                        // 在这里可以给服务端发送信息哦
                        // channel.write(ByteBuffer.wrap(new String("向服务端发送了一条信息")
                        // .getBytes()));
    
                        // 在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
                        channel.register(this.selector, SelectionKey.OP_READ);
    
                        // 获得了可读的事件
                    } else if (key.isReadable()) {
                        read(key);
                    }
    
                }
    
            }
        }
    
        /**
         * 处理读取服务端发来的信息 的事件
         * 
         * @param key
         * @throws
        public void read(SelectionKey key) throws IOException {
            // 和服务端的read方法一样
        }
    
        /**
         * 启动客户端测试
         * 
         * @throws
        public static void main(String[] args) throws IOException {
            Thread t = new ClientThread("localhost", 8000);
            Thread t2 = new ClientThread("localhost", 8001);
            t.start();
            t2.start();
        }
    
    }
    
    class ClientThread extends Thread {
        int port;
        String server;
    
        public ClientThread(String server, int port) {
            this.port = port;
            this.server = server;
        }
    
        public void run() {
            MasterClient client = new MasterClient();
            try {
                client.initClient(server, port);
                client.listen();
            } catch
    package tools.distributedclient;
    
    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    /**
     * NIO服务端
     */
    public class SlaveClient2
        // 通道管理器
        private Selector selector;
        File file ;
        long readed = 0;
    
        /**
         * 获得一个ServerSocket通道,并对该通道做一些初始化的工作
         * 
         * @param port
         *            绑定的端口号
         * @throws
        public void initServer(int port) throws IOException {
            // 获得一个ServerSocket通道
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            // 设置通道为非阻塞
            serverChannel.configureBlocking(false);
            // 将该通道对应的ServerSocket绑定到port端口
            serverChannel.socket().bind(new InetSocketAddress(port));
            // 获得一个通道管理器
            this.selector = Selector.open();
            // 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
            // 当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        }
    
        /**
         * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
         * 
         * @throws
        @SuppressWarnings("unchecked")
        public void listen() throws IOException {
            System.out.println("服务端启动成功!");
            // 轮询访问selector
            while (true) {
                // 当注册的事件到达时,方法返回;否则,该方法会一直阻塞
                selector.select();
                // 获得selector中选中的项的迭代器,选中的项为注册的事件
                Iterator ite = this.selector.selectedKeys().iterator();
                while (ite.hasNext()) {
                    SelectionKey key = (SelectionKey) ite.next();
                    // 删除已选的key,以防重复处理
                    ite.remove();
                    // 客户端请求连接事件
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key
                                .channel();
                        // 获得和客户端连接的通道
                        SocketChannel channel = server.accept();
                        // 设置成非阻塞
                        channel.configureBlocking(false);
    
                        // 在这里可以给客户端发送信息哦
                        channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息")
                                .getBytes()));
                        // 在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
                        channel.register(this.selector, SelectionKey.OP_READ);
    
                        // 获得了可读的事件
                    } else if (key.isReadable()) {
                        read(key);
                    }
    
                }
    
            }
        }
    
        /**
         * 处理读取客户端发来的信息 的事件
         * 
         * @param key
         * @throws
        public void read(SelectionKey key) throws IOException {
            // 服务器可读取消息:得到事件发生的Socket通道
            SocketChannel socketChannel = (SocketChannel) key.channel();
            // 创建读取的缓冲区
    
            FileOutputStream output = new FileOutputStream(file,true);
            byte[] singleBytes = new byte[16 * 1024];
            ByteBuffer singleReadBuffer = ByteBuffer.wrap(singleBytes);
            singleReadBuffer.clear();
            int size = 0;
            while (true) {
                // do a single read
                int read = socketChannel.read(singleReadBuffer);
                if (read == -1) {
                    // this means we reached the end of socket stream.
                    // close();
                    break;
                }
                if (read > 0) {
                    System.out.println(read);
                    readed += read;
                    // flip input buffer
                    singleReadBuffer.flip();
                    // process input buffer
                    output.write(singleReadBuffer.array(), 0, read);
                    // clear input buffer
                    singleReadBuffer.clear();
                }
                if (read == 0) {
                    // since no more data available, just break to wait for
                    // more in coming data
                    break;
                }
            }
            output.flush();
            // String msg = new String(data).trim();
            System.out.println("服务端收到信息");
            // ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
            // channel.write(outBuffer);// 将消息回送给客户端
        }
    
        /**
         * 启动服务端测试
         * 
         * @throws
        public static void main(String[] args) throws IOException {
            SlaveClient2 server = new SlaveClient2();
            server.file = new File("D:/mysql-5.6.23.zip3");
            if(server.file.exists()){
                server.file.delete();
            }
            server.file.createNewFile();
            server.initServer(8001);
            server.listen();
        }
    
    }