一、多路复用IO模型
场景描述
一个餐厅同时有100位客人到店,当然到店后第一件要做的事情就是点菜。但是问题来了,餐厅老板为了节约人力成本目前只有一位大堂服务员拿着唯一的一本菜单等待客人进行服务。
方法A: 无论有多少客人等待点餐,服务员都把仅有的一份菜单递给其中一位客人,然后站在客人身旁等待这个客人完成点菜过程。在记录客人点菜内容后,把点菜记录交给后堂厨师。然后是第二位客人。。。。然后是第三位客人。很明显,只有脑袋被门夹过的老板,才会这样设置服务流程。因为随后的80位客人,再等待超时后就会离店(还会给差评)。
方法B: 老板马上新雇佣99名服务员,同时印制99本新的菜单。每一名服务员手持一本菜单负责一位客人(关键不只在于服务员,还在于菜单。因为没有菜单客人也无法点菜)。在客人点完菜后,记录点菜内容交给后堂厨师(当然为了更高效,后堂厨师最好也有100名)。这样每一位客人享受的就是VIP服务咯,当然客人不会走,但是人力成本可是一个大头哦(亏死你)。
方法C: 就是改进点菜的方式,当客人到店后,自己申请一本菜单。想好自己要点的才后,就呼叫服务员。服务员站在自己身边后记录客人的菜单内容。将菜单递给厨师的过程也要进行改进,并不是每一份菜单记录好以后,都要交给后堂厨师。服务员可以记录号多份菜单后,同时交给厨师就行了。那么这种方式,对于老板来说人力成本是最低的;对于客人来说,虽然不再享受VIP服务并且要进行一定的等待,但是这些都是可接受的;对于服务员来说,基本上她的时间都没有浪费,基本上被老板压杆了最后一滴油水。
到店情况:并发量 。到店情况不理想时,一个服务员一本菜单,当然是足够了。所以不同的老板在不同的场合下,将会灵活选择服务员和菜单的配置。
客人:客户端请求
点餐内容:客户端发送的实际数据
老板:操作系统
人力成本:系统资源
**菜单:文件状态描述符。**操作系统对于一个进程能够同时持有的文件状态描述符的个数是有限制的,在linux系统中$ulimit -n查看这个限制值,当然也是可以(并且应该)进行内核参数调整的。
服务员:操作系统内核用于IO操作的线程(内核线程)
厨师:应用程序线程(当然厨房就是应用程序进程咯)
餐单传递方式:包括了阻塞式和非阻塞式两种。
- 方法A:阻塞式/非阻塞式 同步IO
- 方法B:使用线程进行处理的 阻塞式/非阻塞式 同步IO
- 方法C:阻塞式/非阻塞式 多路复用IO
多路复用IO实现方式
目前流程的多路复用IO实现主要包括四种:select、poll、epoll、kqueue。下表是他们的一些重要特性的比较:
IO模型 | 相对性能 | 关键思路 | 操作系统 | JAVA支持情况 |
---|---|---|---|---|
select | 较高 | Reactor | windows/Linux | 支持,Reactor模式(反应器设计模式)。Linux操作系统的kernels2.4内核版本之前,默认使用select;而目前windows下对同步IO的支持,都是select模型 |
poll | 较高 | Reactor | Linux | Linux下的JAVANIO框架,Linuxkernels2.6内核版本之前使用poll进行支持。也是使用的Reactor模式 |
epoll | 高 | Reactor/Proactor | Linux | Linuxkernels2.6内核版本及以后使用epoll进行支持;Linuxkernels2.6内核版本之前使用poll进行支持;另外一定注意,由于Linux下没有Windows下的IOCP技术提供真正的异步IO支持,所以Linux下使用epoll模拟异步IO |
kqueue | 高 | ProactorLinux | 目前JAVA的版本不支持 |
多路复用IO技术最适用的是“高并发”场景,所谓高并发是指1毫秒内至少同时有上千个连接请求准备好。其他情况下多路复用IO技术发挥不出来它的优势。另一方面,使用JAVA NIO进行功能实现,相对于传统的Socket套接字实现要复杂一些,所以实际应用中,需要根据自己的业务需求进行技术选择。
二、NIO
JDK 1.4中的java.nio.*包中引入新的Java I/O库,其目的是提高速度。实际上,“旧”的I/O包已经使用NIO重新实现过,即使我们不显式的使用NIO编程,也能从中受益。速度的提高在文件I/O和网络I/O中都可能会发生,但本文只讨论后者。
NIO我们一般认为是New I/O(也是官方的叫法),因为它是相对于老的I/O类库新增的(其实在JDK 1.4中就已经被引入了,但这个名词还会继续用很久,即使它们在现在看来已经是“旧”的了,所以也提示我们在命名时,需要好好考虑),做了很大的改变。但民间跟多人称之为Non-block I/O,即非阻塞I/O,因为这样叫,更能体现它的特点。而下文中的NIO,不是指整个新的I/O库,而是非阻塞I/O。
Channel
通道,被建立的一个应用程序和操作系统交互事件、传递内容的渠道(注意是连接到操作系统)。一个通道会有一个专属的文件状态描述符。那么既然是和操作系统进行内容的传递,那么说明应用程序可以通过通道读取数据,也可以通过通道向操作系统写数据。
JDK API中的Channel的描述是:
A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.
A channel is either open or closed. A channel is open upon creation, and once closed it remains closed. Once a channel is closed, any attempt to invoke an I/O operation upon it will cause a ClosedChannelException to be thrown. Whether or not a channel is open may be tested by invoking its isOpen method.
JAVA NIO 框架中,自有的Channel通道包括:
- 所有被Selector(选择器)注册的通道,只能是继承了SelectableChannel类的子类。如上图所示
- ServerSocketChannel:应用服务器程序的监听通道。只有通过这个通道,应用程序才能向操作系统注册支持“多路复用IO”的端口监听。同时支持UDP协议和TCP协议。
- ScoketChannel:TCP Socket套接字的监听通道,一个Socket套接字对应了一个客户端IP:端口 到 服务器IP:端口的通信连接。
- DatagramChannel:UDP 数据报文的监听通道。
Buffer
数据缓存区:在JAVA NIO 框架中,为了保证每个通道的数据读写速度JAVA NIO 框架为每一种需要支持数据读写的通道集成了Buffer的支持。
这句话怎么理解呢?例如ServerSocketChannel通道它只支持对OP_ACCEPT事件的监听,所以它是不能直接进行网络数据内容的读写的。所以ServerSocketChannel是没有集成Buffer的。
Buffer有两种工作模式:写模式和读模式。在读模式下,应用程序只能从Buffer中读取数据,不能进行写操作。但是在写模式下,应用程序是可以进行读操作的,这就表示可能会出现脏读的情况。所以一旦您决定要从Buffer中读取数据,一定要将Buffer的状态改为读模式。
表 1.Buffer 中的参数项
索引 | 说明 |
---|---|
capacity | 缓冲区数组的总长度 |
position | 下一个要操作的数据元素的位置 |
limit | 缓冲区数组中不可操作的下一个元素的位置,limit<=capacity |
mark | 用于记录当前position的前一个位置或者默认是0 |
在实际操作数据时它们有如下关系图:
我们通过 ByteBuffer.allocate(11) 方法创建一个 11 个 byte 的数组缓冲区,初始状态如上图所示,position 的位置为 0,capacity 和 limit 默认都是数组长度。当我们写入 5 个字节时位置变化如下图所示:
这时我们需要将缓冲区的 5 个字节数据写入 Channel 通信信道,所以我们需要调用 byteBuffer.flip() 方法,数组的状态又发生如下变化:
这时底层操作系统就可以从缓冲区中正确读取这 5 个字节数据发送出去了。在下一次写数据之前我们在调一下 clear() 方法。缓冲区的索引状态又回到初始位置。
这里还要说明一下 mark,当我们调用 mark() 时,它将记录当前 position 的前一个位置,当我们调用 reset 时,position 将恢复 mark 记录下来的值。
还有一点需要说明,通过 Channel 获取的 I/O 数据首先要经过操作系统的 Socket 缓冲区再将数据复制到 Buffer 中,这个的操作系统缓冲区就是底层的 TCP 协议关联的 RecvQ 或者 SendQ 队列,从操作系统缓冲区到用户缓冲区复制数据比较耗性能,Buffer 提供了另外一种直接操作操作系统缓冲区的的方式即 ByteBuffer.allocateDirector(size),这个方法返回的 byteBuffer 就是与底层存储空间关联的缓冲区,它的操作方式与 linux2.4 内核的 sendfile 操作方式类似。
Selector
Selector的英文含义是“选择器”,不过根据我们详细介绍的Selector的岗位职责,您可以把它称之为“轮询代理器”、“事件订阅器”、“channel容器管理机”都行。
事件订阅和Channel管理:
应用程序将向Selector对象注册需要它关注的Channel,以及具体的某一个Channel会对哪些IO事件感兴趣。Selector中也会维护一个“已经注册的Channel”的容器。以下代码来自WindowsSelectorImpl实现类中,对已经注册的Channel的管理容器:
// Initial capacity of the poll array
private final int INIT_CAP = 8;
// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
private final static int MAX_SELECTABLE_FDS = 1024;
// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array, where the corresponding entry is occupied by the wakeupSocket
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
- 轮询代理: 应用层不再通过阻塞模式或者非阻塞模式直接询问操作系统“事件有没有发生”,而是由Selector代其询问。
- 实现不同操作系统的支持: 之前已经提到过,多路复用IO技术是需要操作系统进行支持的,其特点就是操作系统可以同时扫描同一个端口上不同网络连接的时间。所以作为上层的JVM,必须要为不同操作系统的多路复用IO实现编写不同的代码。
三、JAVA NIO 框架简要设计分析
通过上文的描述,我们知道了多路复用IO技术是操作系统的内核实现。在不同的操作系统,甚至同一系列操作系统的版本中所实现的多路复用IO技术都是不一样的。那么作为跨平台的JAVA JVM来说如何适应多种多样的多路复用IO技术实现呢?面向对象的威力就显现出来了: 无论使用哪种实现方式,他们都会有“选择器”、“通道”、“缓存”这几个操作要素,那么可以为不同的多路复用IO技术创建一个统一的抽象组,并且为不同的操作系统进行具体的实现 。JAVA NIO中对各种多路复用IO的支持,主要的基础是java.nio.channels.spi.SelectorProvider抽象类,其中的几个主要抽象方法包括:
- public abstract DatagramChannel openDatagramChannel():创建和这个操作系统匹配的UDP
通道实现。 - public abstract AbstractSelector
openSelector():创建和这个操作系统匹配的NIO选择器,就像上文所述,不同的操作系统,不同的版本所默认支持的NIO模型是不一样的。 - public abstract ServerSocketChannel
openServerSocketChannel():创建和这个NIO模型匹配的服务器端通道。 - public abstract SocketChannel openSocketChannel():创建和这个NIO模型匹配的TCP
Socket套接字通道(用来反映客户端的TCP连接)
由于JAVA NIO框架的整个设计是很大的,所以我们只能还原一部分我们关心的问题。这里我们以JAVA NIO框架中对于不同多路复用IO技术的选择器 进行实例化创建的方式作为例子,以点窥豹观全局:
很明显,不同的SelectorProvider实现对应了不同的 选择器。由具体的SelectorProvider实现进行创建。另外说明一下,实际上netty底层也是通过这个设计获得具体使用的NIO模型。
代码示例
客户端和上就篇一样,这里就不写了
服务端:
package demo.com.test.io.nio;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class NioSocketServer {
private static final Log LOGGER = LogFactory.getLog(NioSocketServer.class);
public static void main(String[] args) throws Exception {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket serverSocket = serverChannel.socket();
serverSocket.setReuseAddress(true);
serverSocket.bind(new InetSocketAddress(8083));
Selector selector = Selector.open();
//注意、服务器通道只能注册SelectionKey.OP_ACCEPT事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
try {
while(true) {
//如果条件成立,说明本次询问selector,并没有获取到任何准备好的、感兴趣的事件
//java程序对多路复用IO的支持也包括了阻塞模式 和非阻塞模式两种。
if(selector.select(100) == 0) {
//================================================
// 这里视业务情况,可以做一些然并卵的事情
//================================================
//System.out.println("100次还没有发现感兴趣的事");
continue;
}
//这里就是本次询问操作系统,所获取到的“所关心的事件”的事件类型(每一个通道都是独立的)
Iterator<SelectionKey> selecionKeys = selector.selectedKeys().iterator();
while(selecionKeys.hasNext()) {
SelectionKey readyKey = selecionKeys.next();
//这个已经处理的readyKey一定要移除。如果不移除,就会一直存在在selector.selectedKeys集合中
//待到下一次selector.select() > 0时,这个readyKey又会被处理一次
selecionKeys.remove();
SelectableChannel selectableChannel = readyKey.channel();
if(readyKey.isValid() && readyKey.isAcceptable()) {
NioSocketServer.LOGGER.info("======channel通道已经准备好=======");
/*
* 当server socket channel通道已经准备好,就可以从server socket channel中获取socketchannel了
* 拿到socket channel后,要做的事情就是马上到selector注册这个socket channel感兴趣的事情。
* 否则无法监听到这个socket channel到达的数据
* */
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectableChannel;
SocketChannel socketChannel = serverSocketChannel.accept();
registerSocketChannel(socketChannel , selector);
} else if(readyKey.isValid() && readyKey.isConnectable()) {
NioSocketServer.LOGGER.info("======socket channel 建立连接=======");
} else if(readyKey.isValid() && readyKey.isReadable()) {
NioSocketServer.LOGGER.info("======socket channel 数据准备完成,可以去读==读取=======");
readSocketChannel(readyKey);
}
}
}
} catch(Exception e) {
NioSocketServer.LOGGER.error(e.getMessage() , e);
} finally {
serverSocket.close();
}
//这个wait不涉及到具体的实验逻辑,只是为了保证守护线程在启动所有线程后,进入等待状态
synchronized (NioSocketServer.class) {
NioSocketServer.class.wait();
}
}
/**
* 在server socket channel接收到/准备好 一个新的 TCP连接后。
* 就会向程序返回一个新的socketChannel。<br>
* 但是这个新的socket channel并没有在selector“选择器/代理器”中注册,
* 所以程序还没法通过selector通知这个socket channel的事件。
* 于是我们拿到新的socket channel后,要做的第一个事情就是到selector“选择器/代理器”中注册这个
* socket channel感兴趣的事件
* @param socketChannel 新的socket channel
* @param selector selector“选择器/代理器”
* @throws Exception
*/
private static void registerSocketChannel(SocketChannel socketChannel , Selector selector) throws Exception {
socketChannel.configureBlocking(false);
//socket通道可以且只可以注册三种事件SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
socketChannel.register(selector, SelectionKey.OP_READ , ByteBuffer.allocate(2048));
}
/**
* 这个方法用于读取从客户端传来的信息。
* 并且观察从客户端过来的socket channel在经过多次传输后,是否完成传输。
* 如果传输完成,则返回一个true的标记。
* @param socketChannel
* @throws Exception
*/
private static void readSocketChannel(SelectionKey readyKey) throws Exception {
SocketChannel clientSocketChannel = (SocketChannel)readyKey.channel();
//获取客户端使用的端口
InetSocketAddress sourceSocketAddress = (InetSocketAddress)clientSocketChannel.getRemoteAddress();
Integer resoucePort = sourceSocketAddress.getPort();
//拿到这个socket channel使用的缓存区,准备读取数据
//在后文,将详细讲解缓存区的用法概念,实际上重要的就是三个元素capacity,position和limit。
ByteBuffer contextBytes = (ByteBuffer)readyKey.attachment();
//将通道的数据写入到缓存区,注意是写入到缓存区。
//由于之前设置了ByteBuffer的大小为2048 byte,所以可以存在写入不完的情况
//没关系,我们后面来调整代码。这里我们暂时理解为一次接受可以完成
int realLen = -1;
try {
realLen = clientSocketChannel.read(contextBytes);
} catch(Exception e) {
//这里抛出了异常,一般就是客户端因为某种原因终止了。所以关闭channel就行了
NioSocketServer.LOGGER.error(e.getMessage());
clientSocketChannel.close();
return;
}
//如果缓存区中没有任何数据(但实际上这个不太可能,否则就不会触发OP_READ事件了)
if(realLen == -1) {
NioSocketServer.LOGGER.warn("====缓存区没有数据?====");
return;
}
//将缓存区从写状态切换为读状态(实际上这个方法是读写模式互切换)。
//这是java nio框架中的这个socket channel的写请求将全部等待。
contextBytes.flip();
//注意中文乱码的问题,我个人喜好是使用URLDecoder/URLEncoder,进行解编码。
//当然java nio框架本身也提供编解码方式,看个人咯
byte[] messageBytes = contextBytes.array();
String messageEncode = new String(messageBytes , "UTF-8");
String message = URLDecoder.decode(messageEncode, "UTF-8");
//如果收到了“over”关键字,才会清空buffer,并回发数据;
//否则不清空缓存,还要还原buffer的“写状态”
if(message.indexOf("over") != -1) {
//清空已经读取的缓存,并从新切换为写状态(这里要注意clear()和capacity()两个方法的区别)
contextBytes.clear();
NioSocketServer.LOGGER.info("端口:" + resoucePort + "客户端发来的信息======message : " + message);
//======================================================
// 当然接受完成后,可以在这里正式处理业务了
//======================================================
//回发数据,并关闭channel
ByteBuffer sendBuffer = ByteBuffer.wrap(URLEncoder.encode("你好客户端,这是服务器的返回数据", "UTF-8").getBytes());
clientSocketChannel.write(sendBuffer);
clientSocketChannel.close();
} else {
NioSocketServer.LOGGER.info("端口:" + resoucePort + "客户端信息还未接受完,继续接受======message : " + message);
//这是,limit和capacity的值一致,position的位置是realLen的位置
contextBytes.position(realLen);
contextBytes.limit(contextBytes.capacity());
}
}
}
代码中的注释是比较清楚的,但是还是要对几个关键点进行一下讲解:
serverChannel.register(Selector sel, int ops, Object att):实际上register(Selector sel, int ops, Object att)方法是ServerSocketChannel类的父类AbstractSelectableChannel提供的一个方法,表示只要继承了AbstractSelectableChannel类的子类都可以注册到选择器中。通过观察整个AbstractSelectableChannel继承关系,下图中的这些类可以被注册到选择器中:
SelectionKey.OP_ACCEPT:不同的Channel对象可以注册的“我关心的事件”是不一样的。例如ServerSocketChannel除了能够被允许关注OP_ACCEPT时间外,不允许再关心其他事件了(否则运行时会抛出异常)。以下梳理了常使用的AbstractSelectableChannel子类可以注册的事件列表:
实际上通过每一个AbstractSelectableChannel子类所实现的public final int validOps()方法,就可以查看这个通道“可以关心的IO事件”。
selector.selectedKeys().iterator():当选择器Selector收到操作系统的IO操作事件后,它的selectedKeys将在下一次轮询操作中,收到这些事件的关键描述字(不同的channel,就算关键字一样,也会存储成两个对象)。 但是每一个“事件关键字”被处理后都必须移除,否则下一次轮询时,这个事件会被重复处理。
Returns this selector’s selected-key set.
Keys may be removed from, but not directly added to, the selected-key set. Any attempt to add an object to the key set will cause an UnsupportedOperationException to be thrown.
The selected-key set is not thread-safe.
四、多路复用IO的优缺点
- 不用再使用多线程来进行IO处理了(包括操作系统内核IO管理模块和应用程序进程而言)。当然实际业务的处理中,应用程序进程还是可以引入线程池技术的
- 同一个端口可以处理多种协议,例如,使用ServerSocketChannel测测的服务器端口监听,既可以处理TCP协议又可以处理UDP协议。
- 操作系统级别的优化:多路复用IO技术可以是操作系统级别在一个端口上能够同时接受多个客户端的IO事件。同时具有之前我们讲到的阻塞式同步IO和非阻塞式同步IO的所有特点。Selector的一部分作用更相当于“轮询代理器”。
- 都是同步IO:目前我们介绍的阻塞式IO、非阻塞式IO甚至包括多路复用IO,这些都是基于操作系统级别对“同步IO”的实现。我们一直在说“同步IO”,一直都没有详细说,什么叫做“同步IO”。实际上一句话就可以说清楚:只有上层(包括上层的某种代理机制)系统询问我是否有某个事件发生了,否则我不会主动告诉上层系统事件发生了