2023-10-18
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://www.skjava.com/series/article/2054931878

上篇文章(Netty 入门 — ByteBuf,Netty 数据传输的载体),我们了解了 Netty 的数据是以 ByteBuf 为单位进行传输的,但是有了数据,你没有通道,数据是无法传输的,所以今天我们来熟悉 Netty 的第三个核心组件:Channel。ByteBuf 是数据,那 Channel 则是负责传输数据的通道,它是把握 Netty 通信的命门,没有它 Netty 是无法通信的。

Channel 简介

在 Java NIO 中我们知道,Channel,即通道,是用来传输数据的一条“管道”,它与 Buffer 相辅相成,在 Java NIO 中,我们只能从 Channel 读取数据到 Buffer 中,或者从 Buffer 读取数据到 Channel 中,如下:

在 Netty 中同样有一个 Channel,该 Channel 是 Netty 的核心概念之一,它是 Netty 网络 IO 操作的抽象,即 Netty 网络通信的主体,由它来负责对端进行网络通信、注册、数据操作等一切 IO 相关的操作,其主要功能包括:

  1. 网络 IO 的读写
  2. 客户端发起连接
  3. 关闭连接
  4. 网络连接的相关参数
  5. 绑定端口
  6. Netty 框架相关操作,如获取 Channel 相关联的 EventLoop、pipeline 等。

为什么要另起炉灶?

JDK 提供了一个 Channel,为什么 Netty 还要另起炉灶自己实现一个呢?我认为主要原因有如下几个:

  1. 原生的 Channel 功能太少,不满足 Netty 的要求。
  2. 原生的 ServerSocketChannel 和 SocketChannel 是一个 SPI 接口,具体的实现由虚拟厂商来实现,直接通过原生 ServerSocketChannel 和 SocketChannel 来实现及满足 Netty 的要求,其工作量不亚于重新开发一个。
  3. Netty 的 Channel 需要符合 Netty 的整体架构设计,他需要和 Netty 的整体架构耦合在一起,比如 IO 模型、基于元数据描述配置化的TCP参数等等,原生的 Channel 都不支持。
  4. 自定义的 Channel,灵活性更高,功能更加全面。

Channel 原理

Channel 的核心原理如下图:

  1. 客户端与服务端成功建立连接后,服务端会为该连接创建一个 Channel。
  2. Channel 从 EventLoopGroup 中获取一个 EventLoop,Channel 注册到该 EventLoop 中,从此 Channel 就与该 EventLoop 绑定在一起了,在 Channel 整个生命周期内都只会与该 EventLoop 绑定在一起。
  3. 客户端发起的 IO 操作,在 Channel 中都将产生相对应的 Event,触发与该 Channel 绑定的 EventLoop 进行处理
  4. 如果是读写事件,执行线程调度 ChannelPipeline 来处理业务逻辑。ChannelPipeline 只负责 Handler 的编排,真正执行任务的是各个具体的 ChannelHandler。

Channel 的状态转换

Channel 从创建到消亡,他有四种状态,分别是:

  1. 打开状态(Open)
    1. Channel 处于打开状态时,表示它已经被创建,但尚未绑定到任何地址或连接到远端服务器。
  2. 活动状态(Active)
    1. Channel 处于活动状态时,表示它已经成功绑定到本地地址或连接到远端服务器。
    2. 这个时候可以调用 writeAndFlush() 向对方发送数据了。
  3. 非活动状态(Inactive)
    1. Channel 处于非活动状态时,表示它已经处于活动状态,但连接已经断开或由于其他原因不可用。
    2. 当连接被关闭或出现错误时,Channel 会进入非活动状态。
    3. 无法进行读取或写入操作,但可重新激活 Channel。
  4. 关闭状态(Closed):
    1. Channel 处于关闭状态时,表示它已经完全关闭,无法再进行任何操作。

状态流转如下:

Netty 提供了四个方法来判断 Channel 的状态:

  • isOpen():检查 Channel 是否为 open 状态。
  • isRegistered():检查 Channel 是否为 registered 状态。
  • isActive():检查 Channel 是否为 active 状态。
  • isWritable():这个方法有误导性,它并不是判断当前 Channel 是否可写,实际上它是用来检测当前 Channel 的写操作是否可以立刻被 IO 线程处理,当该方法返回 false 时,任何写请求都会被阻塞,知道 I/O 线程有能力能处理这些请求。

各个状态以及他们对应的操作如下表格:

状态 isOpen() isActive() close() writeAndFlush() 读操作 写操作
打开(Open) true false true false true true
活动(Active) true true true true true true
非活动(Inactive) true false true false false false
关闭(Closed) false false false false false false

Channel 的 API

Channel 常用的 API 非常多,如下图(部分):

方法虽然多,但是总体大致分为如下几类:

类 getter API

这里方法主要用于获取 Channel 相关的属性,如绑定地址,相关配置等等

  • SocketAddress localAddress():返回与 Channel 绑定的本地地址
  • SocketAddress remoteAddress():返回与 Channel 绑定的远端地址
  • ChannelConfig config():返回一个 ChannelConfig 对象,通过这个对象可以配置Channel相关的参数
  • ChannelMetadata metadata():返回一个 ChannelMetadata 对象,ChannelMetadata 可以查询 Channel 实现是否支持某种操作,目前它还只要一个方法 hasDisconnect(),用来判断 Channel 实现是否支持 disconnect() 操作。
  • Channel parent():返回 Channel 的 parent Channel。SocketChannel 返回的是相对一个的 ServerSocketChannel,而 ServerSocketChannel 则返回 null。为什么 SocketChannel 返回的是 ServerSocketChannel 呢?因为所有的 SocketChannel (客户端发起连接)都是由 ServerSocketChannel 接受连接而创建的,所以 SocketChannel 的 parent() 返回的就是对应的 ServerSocketChannel 。
  • EventLoop eventLoop():返回 Channel 注册的 EventLoop。
  • ChannelPipeline pipeline():返回与 Channel 关联的 ChannelPipeline。
  • ByteBufAllocator alloc():返回与 Channel 关联的 ByteBufAllocator 对象。
  • Unsafe unsafe():返回 Channel 的 Unsafe 对象。Unsafe 是 Channel 的内部类,只供 Channel 内部使用。

Future 相关 API

Netty 中的所有 IO 操作都是异步的,这就意味着任何的 IO 调用都将立刻返回,但是并不能保证所有的操作都在调用结束后就完成了,而且我们也不知道 IO 操作执行的结果。Netty 在完成 IO 调用后会返回一个 Future 对象,该对象就是 Channel 异步 IO 的结果。Channel 提供了我们操作这些 Future 的方法:

  • ChannelFuture closeFuture():当 Channel 关闭时返回一个 ChannelFuture,我们可以通过该方法来来对 Channel 关闭后做一些处理。
  • ChannelPromise voidPromise():返回一个 ChannelPromise 实例对象。
  • ChannelPromise newPromise():返回一个 ChannelPromise。
  • ChannelProgressivePromise newProgressivePromise():返回一个新的 ChannelProgressivePromise 实例对象。
  • ChannelFuture newSucceededFuture():创建一个新的 ChannelFuture,并将其标注为 succeed 。
  • ChannelFuture newFailedFuture(Throwable cause):创建一个新的 ChannelFuture,并将其标注为 failed。

ChannelFutureChannelPromise 是 Netty 提供的两个特殊 Future,利用他们我们能够在 Netty 完成一些异步操作的处理。

判断状态 API

Channel 提供了四个 isXxx 方法用来判断 Channel 的状态:

  • boolean isOpen():判断 Channel 是否 opened
  • boolean isRegistered():判断 Channel 是否 registered
  • boolean isActive():判断 Channel 是否 active
  • boolean isWritable():判断 Channel 是否可以立刻处理 IO 事件

事件触发类方法

这些方法都会触发 IO 事件,他们都会通过 ChannelPipeline 传播然后被 ChannelHandler 处理。

  • ChannelFuture bind(SocketAddress localAddress):服务端绑定本地端口,开始监听客户端的连接请求。
  • ChannelFuture connect(SocketAddress remoteAddress):客户端向服务端发起连接请求。
  • ChannelFuture disconnect():断开连接,但是需要注意的是该方法不会释放资源,它还可以再次通过 connect() 再次与服务端建立连接。
  • ChannelFuture close():关闭通道,释放资源。
  • Channel read():读取通道
  • ChannelFuture write(Object msg):向 Channel 中写入数据,该方法并不会将数据真实地写入通道,它只将数据写入到了通断缓存区,我们需要调用 flush() 将缓存区的数据刷入到 Channel 中。
  • Channel flush():将数据刷写到 Channel 中。
  • ChannelFuture writeAndFlush(Object msg):相当于调用了 write()flush()

Channel 的配置

ChannelConfig

在 Netty 中,每个 Channel 都有与之相对应的 ChannelConfig , 可以通过调用 config() 来获取。ChannelConfig 是一个接口,每个特定的 Channel 都有具体的 ChannelConfig 实现类,例如:

  • NioSocketChannel 的对应的配置类为 NioSocketChannelConfig。
  • NioServerSocketChannel 的对应的配置类为 NioServerSocketChannelConfig。

整体的 UML 图如下:

具体的实现我们这篇文章不需要关系,我们需要关注的是它提供了哪些 Config。

  • ChannelConfig 提供通用型配置

    • ChannelOption.CONNECT_TIMEOUT_MILLIS:连接超时时间,默认值30000毫秒即30秒。
    • ChannelOption.WRITE_SPIN_COUNT:写操作的最大循环数,即一次写事件处理期间最多调用 write() 的次数。它有点儿像 Java 中的自旋锁。引入该参数的主要木的是为了避免一个 Channel 写入大量数据,对其他网络通道的读写处理带来延时。
    • ChannelOption.ALLOCATOR:设置内存分配器。
    • ChannelOption.RCVBUF_ALLOCATOR:对读事件设置内存分配器。
    • ChannelOption.AUTO_READ:配置是否自动触发 read() ,默认为 True,程序不需要显示调用 read()
    • ChannelOption.AUTO_CLOSE:配置当写事件失败时,是否自动关闭 Channel,默认为 True。
    • ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK:设置写缓存区的高水位线。如果写缓存区中的数据超过该值, Channel#isWritable() 方法将返回 false。
    • ChannelOption.WRITE_BUFFER_LOW_WATER_MARK:设置写缓存区的低水位线。如果写缓存区的数据超过高水位线后,通道将变得不可写,等写缓存数据降低到低水位线后通道恢复可写状态(Channel#isWritable()将再次返回true)。
    • ChannelOption.MESSAGE_SIZE_ESTIMATOR:设置用于检测通道消息大小的检测器:MessageSizeEstimator。

    这里引入了两个概念:高水位线和低水位线,这两个概念我们在讲缓冲区的时候再细说。

  • NioSocketChannelConfig

    NioSocketChannelConfig 在 ChannelConfig 的基础上增加了如下几个配置:

    • ChannelOption.SO_KEEPALIVE: 连接保持,默认为 False,我们可以将这个参数视为 TCP 的心跳机制。
    • ChannelOption.SO_REUSEADDR:地址复用,默认值False。
    • ChannelOption.SO_LINGER:关闭 Socket 的延迟时间,默认值为 -1,表示禁用该功能
    • ChannelOption.TCP_NODELAY:立即发送数据,默认值为 Ture。该值其实是设置 Nagle 算法的启用。关于 Nagle 算法我们后面再细说。
    • ChannelOption.SO_RCVBUF:TCP 数据接收缓冲区大小。该缓冲区即 TCP 接收滑动窗口。
    • ChannelOption.SO_SNDBUF:TCP 数据发送缓冲区大小。该缓冲区即 TCP 发送滑动窗口。
    • ChannelOption.IP_TOS:IP 参数,设置 IP 头部的 Type-of-Service 字段,用于描述 IP 包的优先级和 QoS 选项。
    • ChannelOption.ALLOW_HALF_CLOSURE:一个连接的远端关闭时本地端是否关闭,默认值为False。
  • NioServerSocketChannelConfig

    • ChannelOption.SO_REUSEADDR:地址复用,默认值False。
    • ChannelOption.SO_RCVBUF:TCP 数据接收缓冲区大小。该缓冲区即 TCP 接收滑动窗口。
    • ChannelOption.SO_BACKLOG:服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。

从上面我们可以看到 ChannelConfig 提供的都是一些通用型的配置,而 NioSocketChannelConfig 和 NioServerSocketChannelConfig 提供的基本上都是 Socket 相关的配置参数,每个都与 java.net.StandardSocketOptions 定义的标准 TCP 参数一一对应。

由于这个是入门篇,所以这里大明哥就不再扩展阐述了,对这些配置参数更加详细的说明,大明哥后面会专门有文章类分析的,这里我们只需要了解 ChannelConfig 是我们配置 Channel 通道相关参数的服务类即可。

Channel 的使用方法

看完上面部分,大明哥相信你对 Channel 有了一个基本的了解。其实Channel 的 API 没啥好演示的,因为这些 API 都不是单独使用的,需要一些其他的组件来配合,但是咱们还是要有仪式感对吧,就写一个简单的 demo 来看看 Channel 的状态变化以及简单感受下异步的风采。

  • 服务端
public static void main(String[] args) throws InterruptedException {
  Channel channel = new ServerBootstrap()
                  .group(new NioEventLoopGroup())
                  .channel(NioServerSocketChannel.class)
                  .childHandler(new ChannelInitializer<NioSocketChannel>() {
                      @Override
                      protected void initChannel(NioSocketChannel ch) throws Exception {
                          ch.pipeline().addLast(new LoggingHandler());
                      }
                  })
                  .bind(8081)
                  .channel();
  System.out.println("isOpen:" + channel.isOpen() + ";;;isRegistered:" + channel.isRegistered() + ";;;isActive:" + channel.isActive());

  System.out.println("eventLoop():" + channel.eventLoop());
  System.out.println("pipeline():" + channel.pipeline());

  TimeUnit.SECONDS.sleep(5);
  System.out.println("=============================");
  System.out.println("isOpen:" + channel.isOpen() + ";;;isRegistered:" + channel.isRegistered() + ";;;isActive:" + channel.isActive());
}
  • 客户端
public static void main(String[] args) throws InterruptedException {
    Channel channel = new Bootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LoggingHandler());
                }
            })
            .connect("127.0.0.1",8081)
            .channel();

    System.out.println("isOpen:" + channel.isOpen() + ";;;isRegistered:" + channel.isRegistered() + ";;;isActive:" + channel.isActive());

    System.out.println("eventLoop():" + channel.eventLoop());
    System.out.println("pipeline():" + channel.pipeline());

    TimeUnit.SECONDS.sleep(5);
    System.out.println("=============================");
    System.out.println("isOpen:" + channel.isOpen() + ";;;isRegistered:" + channel.isRegistered() + ";;;isActive:" + channel.isActive());
}
  • 运行结果
// server
isOpen:true;;;isRegistered:false;;;isActive:false
eventLoop():io.netty.channel.nio.NioEventLoop@2f333739
pipeline():DefaultChannelPipeline{(ServerBootstrap$1#0 = io.netty.bootstrap.ServerBootstrap$1)}
=============================
isOpen:true;;;isRegistered:true;;;isActive:true

//client
isOpen:true;;;isRegistered:false;;;isActive:false
eventLoop():io.netty.channel.nio.NioEventLoop@6ed3ef1
pipeline():DefaultChannelPipeline{(ChannelTestClient$1#0 = com.sike.netty.rumen.ChannelTestClient$1)}
=============================
isOpen:true;;;isRegistered:true;;;isActive:true

从结果中可以看出,无论是服务端还是客户端,Channel 都是异步的,当服务端调用 bind() 方法后返回的 Channel,它仅仅只完成了新建,注册以及绑定工作都没有完成,等待 5 秒后,我们再看其状态,则都是 true 了。

代码地址:http://m6z.cn/5O6hON

阅读全文