selector将非阻塞模式改进为无事件阻塞,有事件非阻塞。
网络编程
非阻塞,阻塞和多路复用
- 阻塞:线程没有监听到指定的操作执行时,就会停止运行。如ServerSocketChannel.accept()和channel.read(buffer)都是阻塞方法。在单线程的情况下,服务端只能执行一轮accpet一个线程并且read客户端传来的数据,但是会阻塞在下一次的accept。所以阻塞方法只能适用在多线程的情况下。但是在多线程的情况下也会有线程太多,频繁上下文切换导致性能降低.
- 非阻塞:各个操作之间没有影响,可以监听多个线程的多个操作。问题在于一直循环cpu占用率很高.
- 多路复用:单线程情况下通过配合Selector完成对多个channel可读写事件的监控,就是多路复用。 Selector保证了有可连接事件时才去连接,有可读事件时才去读取,有可写事件时才去写入.
Selector(基于事件驱动)(多路复用)
1. 核心代码
public static void main(String[] args) throws IOException {
//1.定义Selector,管理多个channel
Selector selector = Selector.open();
//服务端的channel
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8282));
//2.将ServerSocketChannel注册到selector下,参数0表示不监听任何事件,通过下一个方法进行监听
SelectionKey sscKey = ssc.register(selector,0,null);
//3.定义SelectionKey的监听事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
while(true){
//4.select方法
//当没有任何事件发生则阻塞,任一事件发生了就继续执行。
//避免了无效的空转
selector.select();
//5.处理事件,方法返回所有可用的集合事件
//利用迭代器遍历
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey curKey = iterator.next();
//非常重要,解决空指针异常,当一个selectionKey上的事件都处理完了之后,nio并不会将该selectionKey从集合中除去,下次执行就会爆空指针异常,所以要手动除去。
iterator.remove();
//区分事件类型
if(curKey.isAcceptable()){
log.debug("连接事件...");
//6.通过SelectionKey获取到关联的channel
ServerSocketChannel channel = (ServerSocketChannel)curKey.channel();
//执行对应的事件
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
//7.将SocketChannel注册到Selector中,并将buffer作为附件关联到SelectionKey,使之能够一一对应,一个channel维护一个独立的buffer,避免多线程情况下buffer中内容混乱。
ByteBuffer buffer = ByteBuffer.allocate(16);
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
}else if(curKey.isReadable()){
try {
log.debug("读取事件...");
//6.通过SelectionKey获取到关联的channel
SocketChannel channel = (SocketChannel)curKey.channel();
//7.获取附件的buffer
ByteBuffer buffer = (ByteBuffer)curKey.attachment();
int read = channel.read(buffer);
//如果客户端正常断开,返回值拿到-1,需要将事件取消
if(read == -1) {
curKey.cancel();
continue;
}
split(buffer);
//如果position和limit相同,则说明buffer已经满了
if(buffer.position()==buffer.limit()){
System.out.println("经过一次扩容");
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer);
curKey.attach(newBuffer);
}else{
for (int j = buffer.position(); j < buffer.limit(); j++) {
System.out.print((char)buffer.get(j));
}
System.out.println();
}
} catch (IOException e) {
//如果强制断开,进入异常,需要将事件取消
e.printStackTrace();
curKey.cancel();
}
}
}
}
}
2. 理解
- Selector类似于注册中心,通过selector可以获取到所有的channel。
- Selector模式下有两个集合,分别为channel集合(key为selectionKey,value为channel)和事件集合(key为selectionKey,value为事件队列)。当select()方法监听到事件后,会同时添加两个到两个集合中,而当事件执行完成后,会将事件集合中当前键值对的value去除掉已经执行的事件,但是即使value为空了当前键值对也不会删除,所以每次都需要手动地去迭代器中删除key,来解决空指针异常。
- selector.select()等待的是所有事件(包括未处理事件),所以不会造成一个事件阻塞而导致另外事件无法监听到的问题。问题在于事件必须处理,或者执行cancel方法,否则会一直轮询。
- selector将非阻塞模式改进为无事件阻塞,有事件非阻塞。
- channel通过register注册到selector,注册的同时绑定一个selectionKey,selectionKey通过interestOps监听事件,并且可以通过attach绑定buffer。
3. 处理消息边界
- 拆包粘包:LTV或者TLV协议传输
- attachment附件:channel注册时绑定对应的buffer,将一个buffer作为附件关联到selectionKey上。
- channel容量不足:扩容然后作为新的附件关联到selectionKey上。
4. buffer大小分配
- 先分配小的buffer,不断两倍扩容。消息连续易于处理,但是需要拷贝耗费性能。
- 多个数组组成buffer,一个数组不够就把多的内容写入新的数组。不连续解析复杂,但是避免了拷贝。
5. Select()何时不阻塞
- 客户端发起连接,触发accept
- 客户端发数据,客户端正常,异常关闭或者发送的数据大于buffer缓冲区触发1到n次读取事件。
- channel可写,触发1到n次写入事件。
- 调用selector的wakeup()和close()方法。
ByteBuffer 和 Channel 常用方法
- 读取 从channel读取数据填充ByteBuffer。
int readBytes = channel.read(buffer)
- 写入
ByteBuffer buffer = ...
buffer.put(..);
buffer.flip();//切换读模式
while(buffer.hasRemaining()) {
channel.write(buffer);
//channel.write(Charset.defaultCharset().encode("hello"));
}
- 文件传输
from.transferTo(position,size,to)