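GitHub repository: github.com/lhj502819/n…; the sample code is in the example module
Series articles
- Which I/O models do you know?
- The three core roles of Java NIO: Channel, Buffer, Selector
- A translation of Doug Lea's "Scalable IO in Java"
- Which Reactor models do you know?
- Netty server creation: a source-code walkthrough
- What exactly is EventLoopGroup?
- Netty in depth: getting to the bottom of EventLoop
- To be continued...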
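In the previous article we analysed the parent classes of NioEventLoop in detail; today we take NioEventLoop itself apart. It is fairly complex and needs patient reading. First, let's recap what we covered earlier, because it only makes sense once those pieces are connected. Start with the class diagram again; the NioEventLoop-related classes are on the left:
NioEventLoop pulls together functionality from many classes. As I see it, its main job is to control the overall execution flow, while the details of running tasks and implementing features are delegated to its parent classes. Task execution, for example, is implemented in SingleThreadEventExecutor. NioEventLoop mainly decides timing: when to run tasks, under which conditions the Selector must be rebuilt, how much time to spend on event handling, and so on. If that sounds abstract, don't worry; the analysis below should make it concrete.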
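Feature overview
NioEventLoop maintains a single thread. When the thread starts it calls NioEventLoop's run method, which executes I/O tasks and non-I/O tasks.
- I/O tasks: accept, connect, read, write, and so on
- Non-I/O tasks: tasks added to the taskQueue, such as register0 and bind0
Each of these features is fairly complex to implement, so let's take them one at a time.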
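Where it comes from
Before digging into NioEventLoop, we should know where it comes from. Looking at the call sites of its constructor, there is only one: NioEventLoopGroup#newChild. NioEventLoopGroup is what we use when starting a Netty server, and we covered it in an earlier article: it mainly wraps the executors (the NioEventLoop instances). If you are not familiar with it, read "What exactly is EventLoopGroup?" first, paying particular attention to MultithreadEventExecutorGroup and NioEventLoopGroup.
The code is reproduced here; it is simple enough that it needs no further explanation: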
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
SelectorProvider selectorProvider = (SelectorProvider) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
Initialization
Member variables
/**
* Whether the SelectionKey set optimization is disabled; defaults to false
*/
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
/**
* If the configured threshold is below this value, rebuilding the Selector on empty polling is not enabled
*/
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
/**
* Rebuild the Selector after N consecutive empty polls of the NIO Selector. Defaults to 512; a value below 3 disables the rebuild. Used in {@link #unexpectedSelectorWakeup(int)} when handling unexpected wakeups
*/
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
/**
* Used together with {@link SelectStrategy}
*/
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
/**
* The NIO {@link Selector}, wrapped and optimized by Netty
*/
private Selector selector;
private Selector unwrappedSelector;
/**
* The set of selected (ready) SelectionKeys; Netty's own array-backed implementation that replaces the JDK set
*/
private SelectedSelectionKeySet selectedKeys;
/**
* Used to create the Selector
*/
private final SelectorProvider provider;
private static final long AWAKE = -1L;
private static final long NONE = Long.MAX_VALUE;
// nextWakeupNanos is:
// AWAKE when EL is awake
// NONE when EL is waiting with no wakeup scheduled
// other value T when EL is waiting with wakeup scheduled at time T
// The next wakeup time; defaults to -1 (AWAKE), meaning the loop is already awake. Mainly used when scheduled tasks are involved
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
/**
* The select strategy, used in the run loop
*/
private final SelectStrategy selectStrategy;
/**
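* The share of the total processing time spent on the Channels' ready events.
* NioEventLoop handles three kinds of work:
* 1: ready I/O events of Channels
* 2: ordinary tasks
* 3: scheduled tasks
* ioRatio is the percentage of the total time that goes to ready I/O events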
*/
private volatile int ioRatio = 50;
/**
* Number of cancelled SelectionKeys
*/
private int cancelledKeys;
/**
* Whether the Selector needs to be selected again
*/
private boolean needsToSelectAgain;
Constructor
/**
* @param parent the owning EventLoopGroup
* @param executor defaults to {@link ThreadPerTaskExecutor}, which starts one thread per task; set when {@link MultithreadEventExecutorGroup} is initialized
* @param selectorProvider SelectorProvider.provider(); {@link Selector#open()} itself uses SelectorProvider.provider().openSelector() to create the
* {@link Selector}. Set in {@link NioEventLoopGroup}
* @param strategy the select strategy, explained below; by default created by {@link DefaultSelectStrategyFactory}
* @param rejectedExecutionHandler the rejection policy; by default throws {@link RejectedExecutionException}
* @param taskQueueFactory factory for the ordinary task queue; null by default
* @param tailTaskQueueFactory factory for the tail task queue; null by default
*/
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// Create the Selector
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
// Keep a reference to the unwrapped Selector
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
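Optimization: the Selector
In the constructor, creating the Selector returns a SelectorTuple, a holder class that wraps the Selector. The JDK NIO selected-key set is a HashSet, which is backed by a HashMap: every add has to hash the key and deal with collisions, so although it is O(1) on average, it can degrade to O(log n) in a heavily collided bucket. Netty replaces that storage with a plain array, where add is just an append at the next index (amortized O(1), with no hashing) and iteration is a linear scan. It shows how fine-grained Netty's optimizations are. The array-backed set is then combined with the native Selector through the custom SelectedSelectionKeySetSelector. The source below shows how this is done; the replacement relies mainly on reflection.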
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
/**
* Check whether the SelectionKeySet optimization is disabled; the default is false, i.e. the optimization stays on
*/
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
/**
* Load the class sun.nio.ch.SelectorImpl
*/
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
// If SelectorImpl could not be loaded (or the selector is not an instance of it), return a SelectorTuple holding only the raw, unwrapped selector
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
/**
* Netty's optimization of the SelectionKeySet: SelectedSelectionKeySet is array-based
*/
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
/**
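* Use reflection to replace the Selector's selectedKeys field (a Set) with Netty's array-based {@link SelectedSelectionKeySet}.
* With an array, add is a plain append with no hashing, whereas the HashMap behind a HashSet must hash each key and can degrade to O(log n) under collisions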
*/
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
// Adaptation for Java 9+
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
// Make both fields accessible
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
/**
* Replace the Selector's SelectionKeySet with Netty's own
*/
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
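The field replacement at the heart of openSelector can be tried in isolation. The following is a minimal sketch using only the JDK; FakeSelector, ArrayKeySet and FieldSwapDemo are hypothetical stand-ins (not Netty or JDK classes) that play the roles of sun.nio.ch.SelectorImpl and SelectedSelectionKeySet, and only the plain reflection path is shown, not the Unsafe path.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Hypothetical stand-in for sun.nio.ch.SelectorImpl: it owns a private Set
// and adds to it whenever a "key" becomes ready.
class FakeSelector {
    private Set<String> selectedKeys = new HashSet<>();

    void markReady(String key) {
        selectedKeys.add(key);
    }
}

// Hypothetical array-backed set, playing the role of SelectedSelectionKeySet.
class ArrayKeySet extends AbstractSet<String> {
    String[] keys = new String[4];
    int size;

    @Override
    public boolean add(String key) {
        if (key == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1); // double when full
        }
        keys[size++] = key; // plain append, no hashing
        return true;
    }

    @Override
    public Iterator<String> iterator() {
        return Arrays.asList(keys).subList(0, size).iterator();
    }

    @Override
    public int size() {
        return size;
    }
}

class FieldSwapDemo {
    // Swaps FakeSelector's private field for an ArrayKeySet.
    // Returns the injected set, or null if reflection failed.
    static ArrayKeySet instrument(FakeSelector selector) {
        try {
            Field field = FakeSelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            ArrayKeySet replacement = new ArrayKeySet();
            field.set(selector, replacement);
            return replacement;
        } catch (ReflectiveOperationException | RuntimeException e) {
            return null; // the caller falls back to the original set
        }
    }
}
```

After instrument succeeds, every markReady call on the FakeSelector lands directly in the array that the caller holds, which is the same sharing relationship that NioEventLoop's selectedKeys has with the unwrapped Selector.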
SelectedSelectionKeySetSelector
/**
* The optimized Selector, which wraps (composes) the native Selector
*/
final class SelectedSelectionKeySetSelector extends Selector {
/**
* The custom SelectionKeySet implementation; note that this selectionKeys and the key set held inside {@link #delegate} are the same instance
*/
private final SelectedSelectionKeySet selectionKeys;
/**
* The wrapped native Selector, {@link NioEventLoop#unwrappedSelector}
*/
private final Selector delegate;
SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
this.delegate = delegate;
this.selectionKeys = selectionKeys;
}
// ... some code omitted ...
/**
* Called from {@link NioEventLoop#run()}
*/
@Override
public int select() throws IOException {
selectionKeys.reset();
return delegate.select();
}
@Override
public Selector wakeup() {
return delegate.wakeup();
}
@Override
public void close() throws IOException {
delegate.close();
}
}
SelectedSelectionKeySet
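It is backed by an array, so add is a plain append (amortized O(1), no hashing), whereas a HashSet goes through HashMap, whose put must hash the key and can degrade to O(log n) under heavy collisions.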
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
@Override
public Iterator<SelectionKey> iterator() {
return new Iterator<SelectionKey>() {
private int idx;
@Override
public boolean hasNext() {
return idx < size;
}
@Override
public SelectionKey next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return keys[idx++];
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
}
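The listing above leaves out how the array is emptied between select calls. As a rough sketch of the append, grow and reset cycle, here is a generic variant; ReusableKeyArray is a hypothetical name, and the reset(int start) shown here (null out the remaining entries, then set size to 0) is an assumption about how such a reset would behave, not a copy of Netty's code.

```java
import java.util.Arrays;

// Hypothetical generic variant of an array-backed key container.
class ReusableKeyArray<T> {
    Object[] keys;
    int size;

    // initialCapacity is assumed to be at least 1.
    ReusableKeyArray(int initialCapacity) {
        keys = new Object[initialCapacity];
    }

    // Append at the next index; double the array as soon as it becomes full.
    boolean add(T key) {
        if (key == null) {
            return false;
        }
        keys[size++] = key;
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1);
        }
        return true;
    }

    // Null out entries from 'start' (which must be <= size) up to the current
    // size so they can be garbage collected, then mark the container empty.
    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    int capacity() {
        return keys.length;
    }
}
```

Because the array is reused across iterations, the only allocation happens when the capacity doubles; a reset costs one fill over the entries that were actually used.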
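The run method: executing tasks
run is the most important method in NioEventLoop. So which tasks does NioEventLoop execute? IDEA shows that run is called from SingleThreadEventExecutor#doStartThread. We covered SingleThreadEventExecutor in detail in the previous article, so I won't repeat it here. Debugging shows that doStartThread is called from #execute, and #execute is also the entry point for adding asynchronous tasks, so #run most likely executes those added tasks. Let's trace the source to find out.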
protected void run() {
// Select counter
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
// Compute the select strategy <1>
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
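// With no ordinary tasks pending, get the deadline of the nearest scheduled task; returns -1 if there are no scheduled tasks
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
// No scheduled tasks: use NONE (Long.MAX_VALUE) as the deadline
curDeadlineNanos = NONE; // nothing on the calendar
}
// Set the next wakeup time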
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
// select to see whether any registered Channel has ready events
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
// Lazily mark the loop as awake to block unnecessary Selector wakeups
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
// Rebuild the Selector
rebuildSelector0();
// Reset the counter
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
// Ready events were selected, so process them
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
// ioRatio == 100: no time limit; run all tasks (ordinary and scheduled)
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
// Record the start time
final long ioStartTime = System.nanoTime();
try {
// Process the Channels' ready events
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// Time-limited: run ordinary and scheduled tasks within a budget
// derived from the time just spent processing the Channels' ready events
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
// If tasks ran or ready events were processed, reset the select counter
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
// Events were processed or tasks ran: reset the empty-poll count to 0
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
// The unexpected wakeup was handled: reset the counter
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
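// The EventLoop is shutting down, shut down, or terminated: run the shutdown logic, which cancels the Channels' registrations with the Selector and closes the Channels
closeAll();
// Check whether shutdown can complete
if (confirmShutdown()) {
// Exit the NioEventLoop thread loop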
return;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
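The overall flow of the run method is as follows. It first determines whether there are tasks to run (taskQueue plus scheduled tasks) or ready events, then:
- if there are ready events, it processes them first
- if there are tasks to run, it runs them
- it checks whether the loop should shut down
These three steps repeat in a loop.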
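The loop described above can be reduced to a few lines of plain JDK NIO. This is a deliberately simplified sketch, not Netty's implementation: MiniEventLoop and its methods are hypothetical names, there are no scheduled tasks, no ioRatio and no Selector rebuild, and a key's attachment is assumed to be a Runnable that handles its event.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, heavily simplified event loop: select, process keys, run tasks, check shutdown.
class MiniEventLoop {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private volatile boolean shuttingDown;

    MiniEventLoop() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Entry point for non-I/O tasks; wakes the selector so the task is not delayed.
    void execute(Runnable task) {
        taskQueue.add(task);
        selector.wakeup();
    }

    void shutdown() {
        shuttingDown = true;
        selector.wakeup();
    }

    // One loop iteration. Returns false once the loop should exit.
    boolean runOnce() {
        try {
            // Pending tasks: poll without blocking. No tasks: block (with a timeout here).
            int ready = taskQueue.isEmpty() ? selector.select(100) : selector.selectNow();
            if (ready > 0) {
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    Object attachment = key.attachment();
                    if (attachment instanceof Runnable) {
                        ((Runnable) attachment).run(); // assumed per-key event handler
                    }
                }
            }
            Runnable task;
            while ((task = taskQueue.poll()) != null) {
                task.run();
            }
            if (shuttingDown) {
                selector.close();
                return false;
            }
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void run() {
        while (runOnce()) {
            // keep looping until shutdown
        }
    }
}
```

In real use, run would be called on a dedicated thread while other threads call execute and shutdown; runOnce is split out here only so that a single iteration can be exercised on its own.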
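One aside: in run, NioEventLoop calls the underlying JDK NIO select to check for ready events. When the server has just started, the event of interest is necessarily a client connection (ACCEPT). How is that interest set? Recall from the server-creation source analysis that after the Channel is registered, AbstractNioChannel#doBeginRead is eventually called, and that is where the interest ops are set to OP_ACCEPT.
SelectStrategy is the select strategy interface. It contains: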
/**
* Indicates a blocking select should follow.
* The blocking-select strategy
*/
int SELECT = -1;
/**
* Indicates the IO loop should be retried, no blocking select to follow directly.
* The retry strategy; not returned by the default strategy
*/
int CONTINUE = -2;
/**
* Indicates the IO loop to poll for new events without blocking.
* Not used by NioEventLoop (busy-wait is not supported with NIO)
*
*/
int BUSY_WAIT = -3;
/**
* The {@link SelectStrategy} can be used to steer the outcome of a potential select
* call.
*
* @param selectSupplier The supplier with the result of a select result.
* @param hasTasks true if tasks are waiting to be processed.
* @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
* the next step should be to not select but rather jump back to the IO loop and try
* again. Any value >= 0 is treated as an indicator that work needs to be done.
*/
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
DefaultSelectStrategy is used by default; the comments explain it:
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
// If there are tasks, do a non-blocking selectNow and return the number of ready keys
// If there are no tasks, return the blocking SELECT strategy
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
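To make the two outcomes concrete, here is a small standalone sketch of the same decision. It uses java.util.function.IntSupplier as a stand-in for Netty's own IntSupplier interface, and SelectStrategySketch is a hypothetical name.

```java
import java.util.function.IntSupplier;

// Hypothetical standalone version of the default strategy's decision.
class SelectStrategySketch {
    static final int SELECT = -1; // same value as SelectStrategy.SELECT in the listing above

    // With pending tasks: ask the supplier (a non-blocking selectNow) for the ready-key count.
    // Without tasks: tell the caller to do a blocking select.
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }
}
```

With a supplier that reports 3 ready keys, calculateStrategy returns 3 when tasks are pending and -1 otherwise; in the second case the supplier is never invoked, so no select happens until the loop reaches the blocking branch.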
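Key points
Back to the run method. I have annotated the main logic with comments, so read those for the details. Here we focus on the following:
- Controlling the share of time spent on I/O: ioRatio
- Processing ready events
- Running the queued tasks
- Rebuilding the Selector to work around the JDK empty-polling bug: rebuildSelector0
- Graceful shutdown handling
Controlling the I/O time ratio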
if (ioRatio == 100) {
try {
if (strategy > 0) {
// Ready events were selected, so process them
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
// ioRatio == 100: no time limit; run all tasks (ordinary and scheduled)
ranTasks = runAllTasks();
}
} else if (strategy > 0) { // there are ready events
// Record the start time
final long ioStartTime = System.nanoTime();
try {
// Process the Channels' ready events
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// Time-limited: run ordinary and scheduled tasks within a budget
// derived from the time just spent processing the Channels' ready events
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
// No ready events: run tasks with a zero time budget
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
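- When ioRatio equals 100, task execution is not time-limited: ready events, if any, are processed first, and then all pending tasks are run.
- When ioRatio is less than 100:
  - If there are ready events, they are processed first; the time that took is then used to compute the maximum time allowed for running tasks.
  - If there are no ready events, tasks are run with an argument of 0, which means the minimum number of tasks is executed.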
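The budget formula from the listing, ioTime * (100 - ioRatio) / ioRatio, is easier to follow with numbers. The helper below is a hypothetical illustration (IoRatioBudget is not a Netty class) that applies the same arithmetic for ratios strictly between 0 and 100.

```java
// Hypothetical helper that reproduces the task-time budget arithmetic from run().
class IoRatioBudget {
    // ioTimeNanos: time just spent on ready I/O events.
    // ioRatio: percentage of loop time intended for I/O, here restricted to 1..99.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99 for a time-limited budget");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

With the default ioRatio of 50, 10 ms of I/O yields a 10 ms task budget. At ioRatio 80, 8 ms of I/O leaves 2 ms for tasks, and at ioRatio 20, 2 ms of I/O leaves 8 ms. In each case I/O ends up with roughly ioRatio percent of the combined time.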
Processing ready events
private void processSelectedKeys() {
// Check whether the optimized SelectionKey set is in use
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
processSelectedKeysOptimized
Handles the optimized SelectionKeySet.
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
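// The attachment is fetched here and checked for being an AbstractNioChannel. Why would it be one?
// In the registration flow from the server-creation source analysis, AbstractNioChannel#doRegister passes itself as the attachment when it registers the Channel with the Selector:
// selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
// So this check is true here. NioTask is rarely used, so we do not analyse it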
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// <2>
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
At <2> the code checks whether another select is needed. needsToSelectAgain defaults to false; IDEA shows that it is set to true in the #cancel method:
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
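#cancel is called from AbstractNioChannel#doDeregister. doDeregister is the opposite of doRegister: it unbinds the Channel from the EventLoop. Each unbinding increments cancelledKeys by 1. Once cancelledKeys reaches the threshold of 256 (CLEANUP_INTERVAL), needsToSelectAgain is set to true. When needsToSelectAgain is true, a cleanup runs: the remaining entries of the SelectionKeySet are cleared, and selectAgain is called to select once more and refill the set with the keys that are still ready. This mainly addresses memory that cannot be reclaimed because the server-side SelectionKey is still held after a Channel has disconnected.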
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
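The counting logic behind needsToSelectAgain can be isolated as follows. CancelledKeyTracker is a hypothetical class that keeps only the counter and the flag; it does not touch a real Selector, and the threshold of 256 is the value quoted in the text above.

```java
// Hypothetical extraction of the cancelled-key bookkeeping.
class CancelledKeyTracker {
    static final int CLEANUP_INTERVAL = 256; // threshold quoted in the article

    private int cancelledKeys;
    private boolean needsToSelectAgain;

    // Called once per cancelled key (i.e. per deregistered Channel).
    void onKeyCancelled() {
        cancelledKeys++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }

    boolean needsToSelectAgain() {
        return needsToSelectAgain;
    }

    // Called after the extra select has run, mirroring the reset at the top of each loop iteration.
    void selectAgainDone() {
        needsToSelectAgain = false;
    }
}
```

Batching the cleanup this way means the extra select runs at most once per 256 cancellations rather than after every closed connection.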
processSelectedKey
Different events are handled differently. The events are the JDK NIO ones: SelectionKey#OP_ACCEPT, SelectionKey#OP_CONNECT, SelectionKey#OP_READ, and SelectionKey#OP_WRITE.
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// On the server side the Unsafe here is io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe, which we also covered in the server-creation source analysis
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
// The ready operations of this key; when a client first connects, this is OP_ACCEPT
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// OP_CONNECT: the connection has been established
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// Finish the connect; this fires the connection-established event, which the Handlers receive and process
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// OP_WRITE is ready
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
// Flush the queued outbound data to the Channel
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// OP_READ or OP_ACCEPT; readyOps == 0 is also handled here to work around the JDK Selector empty-polling bug
// Read data (or accept new connections); the Handlers receive the resulting read events
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
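The bit tests in processSelectedKey can be explored without a live Channel. The sketch below uses only the SelectionKey constants from the JDK; ReadyOpsDispatch and the action names it returns are hypothetical labels for the three branches, in the order the method evaluates them.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder that lists which branches a given readyOps value would trigger.
class ReadyOpsDispatch {
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect"); // connect is completed before any read or write
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush"); // writes go first so queued buffers can be freed
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read"); // readyOps == 0 is treated as a read to avoid a spin loop
        }
        return actions;
    }

    // Clears OP_CONNECT from an interest set, as done before finishConnect.
    static int withoutConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }
}
```

A key that is ready for both reading and writing therefore flushes first and reads second, and a key that reports no ready operations at all still goes through the read branch.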
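processSelectedKeysPlain
Handles the non-optimized SelectionKeySet. It also ends up calling #processSelectedKey, so we won't analyse it further.
Running the queued tasks: runAllTasks
runAllTasks was covered in the previous article; please refer to it.
Rebuilding the Selector to work around the JDK empty-polling bug
unexpectedSelectorWakeup
Its main job is to decide whether the criteria for rebuilding the Selector have been met.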
/**
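* Handles unexpected wakeups. With the JDK empty-polling bug, select returns immediately even though no event occurred; this method exists mainly to work around that bug.
* If the Selector rebuild threshold has been reached, the Selector is rebuilt and true is returned so that the select counter is reset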
*/
private boolean unexpectedSelectorWakeup(int selectCnt) {
if (Thread.interrupted()) {
// The thread was interrupted
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
return true;
}
// Check whether the Selector rebuild threshold has been reached
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
// The Selector returned prematurely many times in a row
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
return true;
}
return false;
}
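The interplay between selectCnt in run and the threshold check in unexpectedSelectorWakeup amounts to counting consecutive unproductive select returns. A minimal sketch of that counter follows; PrematureReturnGuard is a hypothetical class, and the interrupt handling and the actual rebuild are left out.

```java
// Hypothetical counter for consecutive select() returns that produced no work.
class PrematureReturnGuard {
    private final int rebuildThreshold; // 512 by default per the article; 0 disables the rebuild
    private int selectCnt;
    private int rebuilds;

    PrematureReturnGuard(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    // didWork: whether this iteration ran tasks or processed ready keys.
    // Returns true when this call decided that the Selector should be rebuilt.
    boolean onSelectReturned(boolean didWork) {
        selectCnt++;
        if (didWork) {
            selectCnt = 0; // a productive iteration breaks the streak
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            rebuilds++;
            selectCnt = 0; // start counting afresh on the new Selector
            return true;
        }
        return false;
    }

    int rebuilds() {
        return rebuilds;
    }
}
```

Only an unbroken streak triggers a rebuild: with a threshold of 512, 511 empty returns followed by one productive iteration reset the count, and the next rebuild needs another 512 empty returns in a row.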
rebuildSelector
/**
* Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
* around the infamous epoll 100% CPU bug.
* Rebuilds the Selector to work around the JDK epoll 100% CPU bug. Netty does not actually fix this JDK NIO problem; it only sidesteps it
*/
public void rebuildSelector() {
if (!inEventLoop()) {
// Not on the event loop thread: submit the rebuild as a task
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}
rebuildSelector0
The core idea is to create a new Selector and re-register every previously registered Channel on it.
/**
* Creates a new Selector, moves the Channels registered on the old Selector over to the new one, and closes the old Selector
*/
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
// Create the new Selector
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
// Iterate over all SelectionKeys and re-register each Channel on the new Selector
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
// Cancel the old key
key.cancel();
// Register the Channel on the new Selector
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
// Point the Channel's SelectionKey at the new SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
// Switch NioEventLoop over to the new Selector
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
}
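The same migration can be reproduced with plain JDK NIO. The sketch below is an illustration under simplifying assumptions, not Netty's code: SelectorMigration is a hypothetical class, there is no wrapped selector or per-channel error handling, and the demo registers an unbound ServerSocketChannel purely to have a key to move.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical helper that moves every registration from one Selector to a fresh one.
class SelectorMigration {
    static Selector migrate(Selector oldSelector) {
        try {
            Selector newSelector = Selector.open();
            for (SelectionKey key : oldSelector.keys()) {
                // Skip keys that are already invalid or already registered on the new Selector.
                if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                    continue;
                }
                int interestOps = key.interestOps();
                Object attachment = key.attachment();
                key.cancel(); // drop the old registration
                key.channel().register(newSelector, interestOps, attachment);
            }
            oldSelector.close();
            return newSelector;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Registers one channel, migrates it, and reports whether the registration survived intact.
    static boolean demo() {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(false);
            Selector oldSelector = Selector.open();
            channel.register(oldSelector, SelectionKey.OP_ACCEPT, "listener");
            Selector newSelector = migrate(oldSelector);
            try {
                SelectionKey moved = channel.keyFor(newSelector);
                return !oldSelector.isOpen()
                        && moved != null
                        && moved.isValid()
                        && moved.interestOps() == SelectionKey.OP_ACCEPT
                        && "listener".equals(moved.attachment());
            } finally {
                newSelector.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The interest ops and the attachment are read before the old key is cancelled, because a cancelled key no longer allows interestOps() to be queried.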
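Graceful shutdown handling
After it starts, NioEventLoop keeps checking whether the shutdown condition state >= ST_SHUTTING_DOWN has been reached. Once it has, all Channel connections are closed first, and then #confirmShutdown decides whether the loop can exit. We covered #confirmShutdown in the previous article; it mainly bases its decision on the quiet period and the maximum wait time of the graceful shutdown.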
if (isShuttingDown()) {
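// The EventLoop is shutting down, shut down, or terminated: run the shutdown logic, which cancels the Channels' registrations with the Selector and closes the Channels
closeAll();
// Check whether shutdown can complete
if (confirmShutdown()) {
// Exit the NioEventLoop thread loop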
return;
}
}
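Summary
Today we took a detailed look at NioEventLoop, from the feature overview through initialization to task execution. It is a long article, but if you read it patiently you should come away with something useful.
I'm 壹氿. Thanks for the likes, bookmarks, and comments; the series will keep being updated, and I'll see you in the next article!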