《Java 源码分析》:Java NIO 之 Selector(第二部分selector.select())
上篇博文《Java 源码分析》:Java NIO 之 Selector(第一部分Selector.open())从源码的角度主要介绍了Selector.open()这个方法背后主要做了什么,发生了什么。
本篇就是第二部分:从源码的角度来看下selector.select()背后做了些什么,怎么做的
在看这篇博文之前,希望你已经阅读了上一篇博文:《Java 源码分析》:Java NIO 之 Selector(第一部分Selector.open())。因为这篇博文是在上篇博文的基础上来进行介绍的。
为了更好的方便理解这篇博文所介绍的内容,我们先来回顾下上篇博文中所介绍的内容。
Selector selector = Selector.open();这行代码简单来说:实例化了一个WindowSelectorImpl类的对象。并且在windows下通过两个链接的socketChannel实现了Pipe。
知道上面一点知识就更加方便的来理解了。下面开始详细的介绍。
selector.select() 介绍
selector.select()在Selector类中此方法是一个抽象的。
如下:
public abstract int select() throws IOException;
函数功能(根据源码上的注释翻译而来):选择一些I/O操作已经准备好的管道。每个管道对应着一个key。这个方法 是一个阻塞的选择操作。当至少有一个通道被选择时才返回。当这个方法被执行时,当前线程是允许被中断的。
除了这个方法,还有两个重载方法,如下
1、select(long timeout)
public abstract int select(long timeout)
throws IOException;
select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
这个方法并不能提供精确时间的保证,和当执行wait(long timeout)方法时并不能保证会延时timeout道理一样。
这里的timeout说明如下:
- 如果 timeout为正,则select(long timeout)在等待有通道被选择时至多会阻塞timeout毫秒
- 如果timeout为零,则永远阻塞直到有至少一个通道准备就绪。
- timeout不能为负数
2、selectNow()
public abstract int selectNow() throws IOException;
这个方法与select()的区别在于,是非阻塞的,即当前操作即使没有通道准备好也是立即返回。只是返回的是0.
值得注意的是:调用这个方法会清除所有之前执行了wakeup方法的作用。
以上就是select()以及其两个重载方法的一点说明。
下面来看select()方法在其子类的具体实现
在上篇博文中我们通过源码的角度知道Selector selector = Selector.open();代码中的selector实际是指向的是其子类WindowsSelectorImpl 的对象实例。
因此在 我们执行 selector.select()方法时,实际上时调用的是 WindowsSelectorImpl 中的select()方法
在找这个函数的时候,觉得有必要说下Selector, WindowsSelectorImpl 之间的继承关系:
- WindowsSelectorImpl 的直接父类是 SelectorImpl(select方法在这个里面实现);
- SelectorImpl 的直接父类是 AbstractSelector
- AbstractSelector 的直接父类是 Selector.
以上就是他们的继承关系,其中select()方法是在 SelectorImpl类中进行实现的。
——SelectorImpl.java中的部分代码如下———–
public int select(long timeout)
throws IOException
{
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}
public int select() throws IOException {
return select(0);
}
public int selectNow() throws IOException {
return lockAndDoSelect(0);
}
其中select()方法调用了select(timeout)方法,只是timeout=0.
以上三个函数都是调用了 lockAndDoSelect()方法(此方法也是在SelectorImpl中实现的)
private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) {
if (!isOpen()) //检查这个Selector是否打开
throw new ClosedSelectorException();
//双重锁
//publicKeys和 publicSelectedKeys的定义如下:
//private Set<SelectionKey> publicKeys; // Immutable
//private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
return doSelect(timeout);
}
}
}
}
1、doSelect(timeout)方法在第2点中进行说明,这里先看下这个isOpen()函数的具体实现,这个函数是在 AbstractSelector 中实现的。
public final boolean isOpen() {
return selectorOpen.get();//
}
函数的功能:检查这个Selector是否打开;
在isOpen()方法体中的selectorOpen变量在AbstractSelector类中定义如下:
private AtomicBoolean selectorOpen = new AtomicBoolean(true);
即selectorOpen是一个原子性的变量;如果在执行selector.select()方法之前执行了Selector selector = Selector.open();则selectorOpen就进行了初始化,为true。否则为false。
2、doSelect(timeout)方法的介绍
这个是lockAndDoSelect()方法中的关键,下面具体来看
protected int doSelect(long timeout) throws IOException {
if (channelArray == null)
throw new ClosedSelectorException();
this.timeout = timeout; // set selector timeout
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll();
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue();
int updated = updateSelectedKeys();
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket();
return updated;
}
这个方法的代码还是比较复杂的哈(这次就不详细的对每行代码进行一个分析了哈,以后遇到问题了再详细的来理解分析),但是我们首先要关注一点,就是subSelector.poll()这行代码,这个是一个核心,也就是轮训pollWrapper中保存的FD;具体实现是调用native方法poll0:
SubSelector 是 WindowsSelectorImpl的一个内部类。
SubSelector 类中的poll()方法源码如下:
// These arrays will hold result of native select().
// The first element of each array is the number of selected sockets.
// Other elements are file descriptors of selected sockets.
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];//readFds保存发生read的FD
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];//writeFds保存发生写的FD
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];//exceptFds保存发生except的FD
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
其中poll0是一个native方法
/*
参数说明:readFds保存发生read的FD,writeFds保存发生写的FD,
exceptFds保存发生except的FD
*/
private native int poll0(long pollAddress, int numfds,
int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
这个poll0()会监听pollWrapper中的FD有没有数据进出,这会造成IO阻塞,直到有数据读写事件发生。
比如,由于pollWrapper中保存的也有ServerSocketChannel的FD(在上篇博文中提到),所以只要ClientSocket发一份数据到ServerSocket,那么poll0()就会返回;
又由于pollWrapper中保存的也有pipe的write端的FD,所以只要pipe的write端向FD发一份数据,也会造成poll0()返回;
如果这两种情况都没有发生,那么poll0()就一直阻塞,也就是selector.select()会一直阻塞;如果有任何一种情况发生,那么selector.select()就会返回,
所有在OperationServer的run()里要用while (true) {,这样就可以保证在selector接收到数据并处理完后继续监听poll();
WindowsSelectorImpl.wakeup()
看完了select()方法的内部实现思路,最后来看下:WindowsSelectorImpl.wakeup()的具体实现.
wakeup()方法源码如下:
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);
native实现摘要:
——-WindowsSelectorImpl.c —-
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
jint scoutFd)
{
/* Write one byte into the pipe */
send(scoutFd, (char*)&POLLIN, 1, 0);
}
这里完成了向最开始建立的pipe的sink端写入了一个字节,source文件描述符就会处于就绪状态,poll方法会返回,从而导致select方法返回。(原来自己建立一个socket链着自己另外一个socket就是为了干这事)
sun.nio.ch包下面所有的类库可以在这里看到:http://www.docjar.com/docs/api/sun/nio/ch/package-index.html
小结
关于selector.select()方法中的脉络就这样的顺了一遍,还是有很多的细节自己由于水平的原因没有理解清楚,如有错误,请批评指正。
由于目前自己从源码的角度只看了Selector.open()方法和selector.select()方法的内部实现。还有以下几块内容有待自己去理解:
1、Channel 类中的 register()方法,其中涉及到ServerSocketChannel 类和SocketChannel类。
2、selector.selectedKey()方法返回的Set集合中的值是何时添加进去的,以及SelectionKey类相关的一些操作的具体实现。
这就是两块比较重要内容,这对理解整个的Java NIO有很大帮助,因此,自己也会在最近的时间来啃以上相关的内容。
最后,还是贴上下面的框图。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。