上篇文章(Netty 入门 — ByteBuf,Netty 数据传输的载体),我们了解了 Netty 的数据是以 ByteBuf 为单位进行传输的,但是有了数据,你没有通道,数据是无法传输的,所以今天我们来熟悉 Netty 的第三个核心组件:Channel。ByteBuf 是数据,那 Channel 则是负责传输数据的通道,它是把握 Netty 通信的命门,没有它 Netty 是无法通信的。
Channel 简介
在 Java NIO 中我们知道,Channel,即通道,是用来传输数据的一条“管道”,它与 Buffer 相辅相成,在 Java NIO 中,我们只能从 Channel 读取数据到 Buffer 中,或者从 Buffer 读取数据到 Channel 中,如下:
在 Netty 中同样有一个 Channel,该 Channel 是 Netty 的核心概念之一,它是 Netty 网络 IO 操作的抽象,即 Netty 网络通信的主体,由它来负责对端进行网络通信、注册、数据操作等一切 IO 相关的操作,其主要功能包括:
- 网络 IO 的读写
- 客户端发起连接
- 关闭连接
- 网络连接的相关参数
- 绑定端口
- Netty 框架相关操作,如获取 Channel 相关联的 EventLoop、pipeline 等。
为什么要另起炉灶?
JDK 提供了一个 Channel,为什么 Netty 还要另起炉灶自己实现一个呢?我认为主要原因有如下几个:
- 原生的 Channel 功能太少,不满足 Netty 的要求。
- 原生的 ServerSocketChannel 和 SocketChannel 是一个 SPI 接口,具体的实现由虚拟厂商来实现,直接通过原生 ServerSocketChannel 和 SocketChannel 来实现及满足 Netty 的要求,其工作量不亚于重新开发一个。
- Netty 的 Channel 需要符合 Netty 的整体架构设计,他需要和 Netty 的整体架构耦合在一起,比如 IO 模型、基于元数据描述配置化的TCP参数等等,原生的 Channel 都不支持。
- 自定义的 Channel,灵活性更高,功能更加全面。
Channel 原理
Channel 的核心原理如下图:
- 客户端与服务端成功建立连接后,服务端会为该连接创建一个 Channel。
- Channel 从 EventLoopGroup 中获取一个 EventLoop,Channel 注册到该 EventLoop 中,从此 Channel 就与该 EventLoop 绑定在一起了,在 Channel 整个生命周期内都只会与该 EventLoop 绑定在一起。
- 客户端发起的 IO 操作,在 Channel 中都将产生相对应的 Event,触发与该 Channel 绑定的 EventLoop 进行处理
- 如果是读写事件,执行线程调度 ChannelPipeline 来处理业务逻辑。ChannelPipeline 只负责 Handler 的编排,真正执行任务的是各个具体的 ChannelHandler。
Channel 的状态转换
Channel 从创建到消亡,他有四种状态,分别是:
- 打开状态(Open):
- Channel 处于打开状态时,表示它已经被创建,但尚未绑定到任何地址或连接到远端服务器。
- 活动状态(Active):
- Channel 处于活动状态时,表示它已经成功绑定到本地地址或连接到远端服务器。
- 这个时候可以调用
writeAndFlush()
向对方发送数据了。
- 非活动状态(Inactive):
- Channel 处于非活动状态时,表示它已经处于活动状态,但连接已经断开或由于其他原因不可用。
- 当连接被关闭或出现错误时,Channel 会进入非活动状态。
- 无法进行读取或写入操作,但可重新激活 Channel。
- 关闭状态(Closed):
- Channel 处于关闭状态时,表示它已经完全关闭,无法再进行任何操作。
状态流转如下:
Netty 提供了四个方法来判断 Channel 的状态:
isOpen()
:检查 Channel 是否为 open 状态。isRegistered()
:检查 Channel 是否为 registered 状态。isActive()
:检查 Channel 是否为 active 状态。isWritable()
:这个方法有误导性,它并不是判断当前 Channel 是否可写,实际上它是用来检测当前 Channel 的写操作是否可以立刻被 IO 线程处理,当该方法返回 false 时,任何写请求都会被阻塞,知道 I/O 线程有能力能处理这些请求。
各个状态以及他们对应的操作如下表格:
状态 | isOpen() | isActive() | close() | writeAndFlush() | 读操作 | 写操作 |
---|---|---|---|---|---|---|
打开(Open) | true | false | true | false | true | true |
活动(Active) | true | true | true | true | true | true |
非活动(Inactive) | true | false | true | false | false | false |
关闭(Closed) | false | false | false | false | false | false |
Channel 的 API
Channel 常用的 API 非常多,如下图(部分):
方法虽然多,但是总体大致分为如下几类:
类 getter API
这里方法主要用于获取 Channel 相关的属性,如绑定地址,相关配置等等
SocketAddress localAddress()
:返回与 Channel 绑定的本地地址SocketAddress remoteAddress()
:返回与 Channel 绑定的远端地址ChannelConfig config()
:返回一个 ChannelConfig 对象,通过这个对象可以配置Channel相关的参数ChannelMetadata metadata()
:返回一个 ChannelMetadata 对象,ChannelMetadata 可以查询 Channel 实现是否支持某种操作,目前它还只要一个方法hasDisconnect()
,用来判断 Channel 实现是否支持disconnect()
操作。Channel parent()
:返回 Channel 的 parent Channel。SocketChannel 返回的是相对一个的 ServerSocketChannel,而 ServerSocketChannel 则返回 null。为什么 SocketChannel 返回的是 ServerSocketChannel 呢?因为所有的 SocketChannel (客户端发起连接)都是由 ServerSocketChannel 接受连接而创建的,所以 SocketChannel 的parent()
返回的就是对应的 ServerSocketChannel 。EventLoop eventLoop()
:返回 Channel 注册的 EventLoop。ChannelPipeline pipeline()
:返回与 Channel 关联的 ChannelPipeline。ByteBufAllocator alloc()
:返回与 Channel 关联的 ByteBufAllocator 对象。Unsafe unsafe()
:返回 Channel 的 Unsafe 对象。Unsafe 是 Channel 的内部类,只供 Channel 内部使用。
Future 相关 API
Netty 中的所有 IO 操作都是异步的,这就意味着任何的 IO 调用都将立刻返回,但是并不能保证所有的操作都在调用结束后就完成了,而且我们也不知道 IO 操作执行的结果。Netty 在完成 IO 调用后会返回一个 Future 对象,该对象就是 Channel 异步 IO 的结果。Channel 提供了我们操作这些 Future 的方法:
ChannelFuture closeFuture()
:当 Channel 关闭时返回一个 ChannelFuture,我们可以通过该方法来来对 Channel 关闭后做一些处理。ChannelPromise voidPromise()
:返回一个ChannelPromise
实例对象。ChannelPromise newPromise()
:返回一个 ChannelPromise。ChannelProgressivePromise newProgressivePromise()
:返回一个新的 ChannelProgressivePromise 实例对象。ChannelFuture newSucceededFuture()
:创建一个新的 ChannelFuture,并将其标注为 succeed 。ChannelFuture newFailedFuture(Throwable cause)
:创建一个新的 ChannelFuture,并将其标注为 failed。
ChannelFuture
和 ChannelPromise
是 Netty 提供的两个特殊 Future,利用他们我们能够在 Netty 完成一些异步操作的处理。
判断状态 API
Channel 提供了四个 isXxx 方法用来判断 Channel 的状态:
boolean isOpen()
:判断 Channel 是否 openedboolean isRegistered()
:判断 Channel 是否 registeredboolean isActive()
:判断 Channel 是否 activeboolean isWritable()
:判断 Channel 是否可以立刻处理 IO 事件
事件触发类方法
这些方法都会触发 IO 事件,他们都会通过 ChannelPipeline 传播然后被 ChannelHandler 处理。
ChannelFuture bind(SocketAddress localAddress)
:服务端绑定本地端口,开始监听客户端的连接请求。ChannelFuture connect(SocketAddress remoteAddress)
:客户端向服务端发起连接请求。ChannelFuture disconnect()
:断开连接,但是需要注意的是该方法不会释放资源,它还可以再次通过connect()
再次与服务端建立连接。ChannelFuture close()
:关闭通道,释放资源。Channel read()
:读取通道ChannelFuture write(Object msg)
:向 Channel 中写入数据,该方法并不会将数据真实地写入通道,它只将数据写入到了通断缓存区,我们需要调用flush()
将缓存区的数据刷入到 Channel 中。Channel flush()
:将数据刷写到 Channel 中。ChannelFuture writeAndFlush(Object msg)
:相当于调用了write()
和flush()
。
Channel 的配置
ChannelConfig
在 Netty 中,每个 Channel 都有与之相对应的 ChannelConfig , 可以通过调用 config()
来获取。ChannelConfig 是一个接口,每个特定的 Channel 都有具体的 ChannelConfig 实现类,例如:
- NioSocketChannel 的对应的配置类为 NioSocketChannelConfig。
- NioServerSocketChannel 的对应的配置类为 NioServerSocketChannelConfig。
整体的 UML 图如下:
具体的实现我们这篇文章不需要关系,我们需要关注的是它提供了哪些 Config。
-
ChannelConfig 提供通用型配置
ChannelOption.CONNECT_TIMEOUT_MILLIS
:连接超时时间,默认值30000毫秒即30秒。ChannelOption.WRITE_SPIN_COUNT
:写操作的最大循环数,即一次写事件处理期间最多调用write()
的次数。它有点儿像 Java 中的自旋锁。引入该参数的主要木的是为了避免一个 Channel 写入大量数据,对其他网络通道的读写处理带来延时。ChannelOption.ALLOCATOR
:设置内存分配器。ChannelOption.RCVBUF_ALLOCATOR
:对读事件设置内存分配器。ChannelOption.AUTO_READ
:配置是否自动触发read()
,默认为 True,程序不需要显示调用read()
。ChannelOption.AUTO_CLOSE
:配置当写事件失败时,是否自动关闭 Channel,默认为 True。ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
:设置写缓存区的高水位线。如果写缓存区中的数据超过该值,Channel#isWritable()
方法将返回 false。ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
:设置写缓存区的低水位线。如果写缓存区的数据超过高水位线后,通道将变得不可写,等写缓存数据降低到低水位线后通道恢复可写状态(Channel#isWritable()
将再次返回true)。ChannelOption.MESSAGE_SIZE_ESTIMATOR
:设置用于检测通道消息大小的检测器:MessageSizeEstimator。
这里引入了两个概念:高水位线和低水位线,这两个概念我们在讲缓冲区的时候再细说。
-
NioSocketChannelConfig
NioSocketChannelConfig 在 ChannelConfig 的基础上增加了如下几个配置:
ChannelOption.SO_KEEPALIVE
: 连接保持,默认为 False,我们可以将这个参数视为 TCP 的心跳机制。ChannelOption.SO_REUSEADDR
:地址复用,默认值False。ChannelOption.SO_LINGER
:关闭 Socket 的延迟时间,默认值为 -1,表示禁用该功能ChannelOption.TCP_NODELAY
:立即发送数据,默认值为 Ture。该值其实是设置 Nagle 算法的启用。关于 Nagle 算法我们后面再细说。ChannelOption.SO_RCVBUF
:TCP 数据接收缓冲区大小。该缓冲区即 TCP 接收滑动窗口。ChannelOption.SO_SNDBUF
:TCP 数据发送缓冲区大小。该缓冲区即 TCP 发送滑动窗口。ChannelOption.IP_TOS
:IP 参数,设置 IP 头部的 Type-of-Service 字段,用于描述 IP 包的优先级和 QoS 选项。ChannelOption.ALLOW_HALF_CLOSURE
:一个连接的远端关闭时本地端是否关闭,默认值为False。
-
NioServerSocketChannelConfig
ChannelOption.SO_REUSEADDR
:地址复用,默认值False。ChannelOption.SO_RCVBUF
:TCP 数据接收缓冲区大小。该缓冲区即 TCP 接收滑动窗口。ChannelOption.SO_BACKLOG
:服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。
从上面我们可以看到 ChannelConfig 提供的都是一些通用型的配置,而 NioSocketChannelConfig 和 NioServerSocketChannelConfig 提供的基本上都是 Socket 相关的配置参数,每个都与 java.net.StandardSocketOptions
定义的标准 TCP 参数一一对应。
由于这个是入门篇,所以这里大明哥就不再扩展阐述了,对这些配置参数更加详细的说明,大明哥后面会专门有文章类分析的,这里我们只需要了解 ChannelConfig 是我们配置 Channel 通道相关参数的服务类即可。
Channel 的使用方法
看完上面部分,大明哥相信你对 Channel 有了一个基本的了解。其实Channel 的 API 没啥好演示的,因为这些 API 都不是单独使用的,需要一些其他的组件来配合,但是咱们还是要有仪式感对吧,就写一个简单的 demo 来看看 Channel 的状态变化以及简单感受下异步的风采。
- 服务端
public static void main(String[] args) throws InterruptedException {
Channel channel = new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
}
})
.bind(8081)
.channel();
System.out.println("isOpen:" + channel.isOpen() + ";;;isRegistered:" + channel.isRegistered() + ";;;isActive:" + channel.isActive());
System.out.println("eventLoop():" + channel.eventLoop());
System.out.println("pipeline():" + channel.pipeline());
TimeUnit.SECONDS.sleep(5);
System.out.println("=============================");
System.out.println("isOpen:" + channel.isOpen() + ";;;isRegistered:" + channel.isRegistered() + ";;;isActive:" + channel.isActive());
}
- 客户端
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
}
})
.connect("127.0.0.1",8081)
.channel();
System.out.println("isOpen:" + channel.isOpen() + ";;;isRegistered:" + channel.isRegistered() + ";;;isActive:" + channel.isActive());
System.out.println("eventLoop():" + channel.eventLoop());
System.out.println("pipeline():" + channel.pipeline());
TimeUnit.SECONDS.sleep(5);
System.out.println("=============================");
System.out.println("isOpen:" + channel.isOpen() + ";;;isRegistered:" + channel.isRegistered() + ";;;isActive:" + channel.isActive());
}
- 运行结果
// server
isOpen:true;;;isRegistered:false;;;isActive:false
eventLoop():io.netty.channel.nio.NioEventLoop@2f333739
pipeline():DefaultChannelPipeline{(ServerBootstrap$1#0 = io.netty.bootstrap.ServerBootstrap$1)}
=============================
isOpen:true;;;isRegistered:true;;;isActive:true
//client
isOpen:true;;;isRegistered:false;;;isActive:false
eventLoop():io.netty.channel.nio.NioEventLoop@6ed3ef1
pipeline():DefaultChannelPipeline{(ChannelTestClient$1#0 = com.sike.netty.rumen.ChannelTestClient$1)}
=============================
isOpen:true;;;isRegistered:true;;;isActive:true
从结果中可以看出,无论是服务端还是客户端,Channel 都是异步的,当服务端调用 bind()
方法后返回的 Channel,它仅仅只完成了新建,注册以及绑定工作都没有完成,等待 5 秒后,我们再看其状态,则都是 true 了。
代码地址:http://m6z.cn/5O6hON