Java NIO 之异步通道API入门

 2023-01-02
原文作者:pjmike 原文地址:https://pjmike.github.io/

NIO.2概览

NIO.2也就是人们常说的 AIO,在Java 7中引入了NIO的改进版NIO 2,它是异步非阻塞的IO方式。

AIO的核心概念就是发起非阻塞方式的I/O操作,立即响应,却不立即返回结果,当I/O操作完成时通知。

这篇文章主要介绍NIO 2的异步通道API的一些内容,后续文章再分析NIO.2的其他特性

异步通道API

从Java 7开始,java.nio.channel包中新增加4个异步通道:

  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
  • AsynchronousFileChannel
  • AsynchronousDatagramChannel

这些类在风格上与NIO 通道API相似,他们共享相同的方法与参数结构体,并且大多数对于NIO通道类可用的参数,对于新的异步版本仍然可用。

异步通道API提供了两种对已启动异步操作的监测与控制机制:

  • 第一种通过返回一个 java.util.concurrent.Future对象 来表示异步操作的结果
  • 第二种是通过传递给操作一个新类的对象 java.nio.channels.CompletionHandler 来完成,它会定义在操作完毕后所执行的处理程序方法。

Future

从Java 1.5开始,引入了Future接口,使用该接口可以在任务执行完毕之后得到任务执行结果 。在NIO 2中,Future对象表示异步操作的结果,假设我们要创建一个服务器来监听客户端连接,打开AsynchronousServerSocketChannel 并将其绑定到类似于 ServerSocketChannel的地址:

    AsynchronousServerSocketChannel server 
      = AsynchronousServerSocketChannel.open().bind(null);

方法bind() 将一个套接字地址作为其参数,这里传递了一个Null地址,它会自动将套接字绑定到本地主机地址,并使用空闲的临时端口,就像传统的 ServerSocket设置 0端口一样,也是使用操作系统随机分配的临时端口。
然后调用服务器的accept()方法:

    Future<AsynchronousSocketChannel> future = server.accept();

当我们在NIO中调用 ServerSocketChannel的accept()方法时,它会阻塞,直到从客户端收到传入连接。但是AsynchronousServerSocketChannel 的accept() 方法会立即返回 Future 对象。

Future对象的泛型类型是操作的返回类型,在上面的例子,它是 AsynchronousSocketChannel ,但它也可以是Integer或String ,具体取决于操作的最终返回类型。

我们可以使用Future对象来查询操作的状态

    future.isDone();

如果基础操作已经完成,则此API返回 true,请注意,在这种情况下,完成可能意味着正常终止,异常,或者取消。

我们还可以明确检查操作是否被取消,如果操作在正常完成之前被取消,则它返回true。如下:

    future.isCancelled();

实际的取消操作,如下:

    future.cancel(true)

cancel()方法可利用一个布尔标志来指出执行接受的线程是否可被中断。

要检索操作结果,我们使用get()方法,该方法将阻塞等待结果的返回:

    AsynchronousSocketChannel client= future.get();

另外,我们也可以设置阻塞时间,下例设置为10s:

    AsynchronousSocketChannel worker = future.get(10, TimeUnit.SECONDS);

CompletionHandler

使用 Future 来处理操作的替代方法是使用 CompletionHandler 类的回调机制。异步通道允许指定完成处理程序以使用操作的结果:

    AsynchronousServerSocketChannel listener
      = AsynchronousServerSocketChannel.open().bind(null);
     
    listener.accept(
      attachment, new CompletionHandler<AsynchronousSocketChannel, Object>() {
        public void completed(
          AsynchronousSocketChannel client, Object attachment) {
              // do whatever with client
          }
        public void failed(Throwable exc, Object attachment) {
              // handle failure
          }
      });

I/O操作成功完成时,将调用已完成的回调 API。如果操作失败,则调用失败的API.

异步通道API实例

服务端 (with Future)

下面是使用 Future的方式构建服务端。

    public class AsyncEchoServer {
        private AsynchronousServerSocketChannel server;
        private Future<AsynchronousSocketChannel> future;
        private AsynchronousSocketChannel worker;
    
        public AsyncEchoServer() throws IOException, ExecutionException, InterruptedException {
            System.out.println("Open Server Channel");
            server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 9090));
            future = server.accept();
        }
    
        public void runServer() throws ExecutionException, InterruptedException, IOException, TimeoutException {
            //获取操作结果
            worker = future.get();
            if (worker != null && worker.isOpen()) {
                ByteBuffer buffer = ByteBuffer.allocate(100);
                //将通道中的数据写入缓冲区
                worker.read(buffer).get(10,TimeUnit.SECONDS);
                System.out.println("received from client: " + new String(buffer.array()));
            }
            server.close();
        }
    
        public static void main(String[] args) throws InterruptedException, ExecutionException, IOException, TimeoutException {
            AsyncEchoServer server = new AsyncEchoServer();
            server.runServer();
        }
    }

服务端(With CompletionHandler)

下面我们将了解如何使用 CompletionHandler 方法而不是 Future 方法实现相同的服务端代码。

    public class AsyncEchoServerWithCallBack {
        private AsynchronousServerSocketChannel server;
        private AsynchronousSocketChannel worker;
        private AsynchronousChannelGroup group;
        public AsyncEchoServerWithCallBack() throws IOException, ExecutionException, InterruptedException {
            System.out.println("Open Server Channel");
            group = AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());
            server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress("127.0.0.1", 9090));
            //当有新连接建立时会调用 CompletionHandler接口实现对象中的 completed()方法
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
                @Override
                public void completed(AsynchronousSocketChannel result, Object attachment) {
                    if (server.isOpen()) {
                        server.accept(null, this);
                    }
                    worker = result;
                    if ((worker != null) && (worker.isOpen())) {
                        ByteBuffer byteBuffer = ByteBuffer.allocate(100);
                        worker.read(byteBuffer);
                        System.out.println("received the client: "+new String(byteBuffer.array()));
                    }
                }
    
                @Override
                public void failed(Throwable exc, Object attachment) {
                    //TODO
                }
            });
        }
        public static void main(String[] args) throws InterruptedException, ExecutionException, IOException, TimeoutException {
            AsyncEchoServerWithCallBack server = new AsyncEchoServerWithCallBack();
        }
    }

当有新连接建立时会调用 CompletionHandler接口实现对象中的 completed()方法,当出现错误时,会调用 failed方法。

accept方法的第一个参数可以是一个任意类型的对象,称为调用时的” 附加对象”。附件对象在 accept()方法调用时传入,可以在 CompletionHandler 接口的实现对象中从 completed 和 failed 方法的参数(attachment)中获取,这样就可以进行数据的传递。使用 CompletionHandler接口的方法都支持使用附件对象来传递数据。

AsynchronousChannelGroup类

异步通道在处理 I/O请求时,需要使用一个AsynchronousChannelGroup类,该类的对象表示的是一个异步通道的分组,每一个分组都有一个线程池与之对应 ,需要使用AsynchronousChannelGroup类的静态工厂方法 withFixedThreadPool,withCachedThreadPool或者 withThreaPool设置线程池。这个线程池中的线程用来处理 I/O 事件。多个异步通道可以共用一个分组的线程池资源。

调用AsynchronousSocketChannel 和 AsynchronousServerSocketChannel 类的 open 方法 打开异步套接字通道时,可以传入一个AsynchronousChannelGroup类的对象。如果调用的 open 方法没有传入 AsynchronousChannelGroup 类的对象,默认使用系统提供的分组,系统分组对应的线程池中的线程是 守护线程 ,如果使用默认分组,程序启动之后很快就退出了, 因为系统分组使用的守护线程不会阻止虚拟机的退出。

客户端

    public class AsyncEchoClient {
        private AsynchronousSocketChannel client;
        private Future<Void> future;
    
        public AsyncEchoClient() throws IOException {
            //打开一个异步channel
            System.out.println("Open client channel");
            client = AsynchronousSocketChannel.open();
            //连接本地端口和地址,在连接成功后不返回任何内容,但是,我们仍然可以使用Future对象来监视异步操作的状态
            System.out.println("Connect to server");
            future = client.connect(new InetSocketAddress("127.0.0.1", 9090));
        }
    
        /**
         * 向服务端发送消息
         *
         * @param message
         * @return
         */
        public void sendMessage(String message) throws ExecutionException, InterruptedException {
            if (!future.isDone()) {
                future.cancel(true);
                return;
            }
            //将一个字节数组封装到ByteBuffer中
            ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBytes());
            System.out.println("Sending message to the server");
            //将数据写入通道
            int numberBytes = client.write(byteBuffer).get();
            byteBuffer.clear();
        }
    
        public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
            AsyncEchoClient client = new AsyncEchoClient();
            client.sendMessage("hello world");
        }
    }

测试结果

客户端:

    Open client channel
    Connect to server

服务端:

    Open Server Channel
    received the client: hello world

Java NIO 2异步IO的体现

我们都知道由 JDK 1.7提供的NIO 2.0 新增了异步的套接字通道,它是真正的异步 IO,在异步IO操作的时候可以传递变量,当操作完成之后会回调相关的方法。那么NIO 2的异步非阻塞特性是如何体现的呢?从之前的描述就可以窥见很多细节:

异步的体现

AsynchronousServerSocketChannel为例,当调用该类的对象的 accept()方法时,其返回了一个 Future<AsynchronousSocketChannel>对象,调用accept()方法就像调用传统I/O中的ServerSocket的accept()一样,本质上都是接收客户端连接请求,只不过AsynchronousServerSocketChannel对象没有一直阻塞等待,而是立马返回一个Future对象,利用Futureget方法去获取连接结果,Future对象就是异步操作的结果,我们还可以利用Future的isDone方法查询操作完成的状态,这就是异步的体现。

当然在使用 CompletionHandler 方法中一样的道理,有新连接建立时会回调 CompletionHandler接口实现对象中的 completed()方法,当出现错误时,会调用 failed方法。

非阻塞的体现

当调用AsynchronousServerSocketChannel对象的 accept()方法后,返回Future对象,此时线程可以接着干其他事情,这是非阻塞的,要想获得操作结果,就调用 Future的 isDone方法查询操作是否完毕,使用 get()去获取结果,典型的非阻塞操作。而在传统 I/O模型中,套接字类对象的 accept方法会一直阻塞等待,直到有新连接接入进来才停止阻塞。

小结

NIO.2,也叫AIO,了解其异步通道API,也能更好地帮助我们去理解异步IO操作。当我们学习NIO2的API时,也可以对照NIO中的通道API进行学习,它们还是有很多相似的地方。

参考资料 & 鸣谢