Netty流程中的任务执行细节
选择器的策略
在NioEventLoopGroup
构造函数中会传入一个默认选择策略工厂的实例DefaultSelectStrategyFactory.INSTANCE
:
其实内部主要还是newSelectStrategy()
,这里貌似用了懒加载的多线程安全的单例模式
:
public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();
private DefaultSelectStrategyFactory() { }
@Override
public SelectStrategy newSelectStrategy() {
return DefaultSelectStrategy.INSTANCE;
}
}
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
//获得策略的核心方法
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
有任务的策略
这里calculateStrategy
就是计算策略,首先是考虑是否有任务,如果有就执行相应的selectSupplier.get()
,这个是什么呢,其实就是事件循环NioEventLoop
里的selectNowSupplier
:
也就立即返回的,所以后面才可以执行任务runAllTasks
。
没任务的策略
如果没有任务,否则就是SelectStrategy.SELECT
,会执行NioEventLoop
的select(long deadlineNanos)
:
NioEventLoop的run
可以看到,如果没有截止时间就是阻塞的selector.select();
,否则可能是立即返回或者超时的。下图就是执行的流程,还是先看有没有事件,有的话会返回strategy
:
然后根据strategy
的值来判断是否需要处理事件,还是处理任务:
超时时间是根据ioRatio
io运行时间的占比算的,初始值是50
,也就是50%
,所以下面这段的意思就是先算出ioTime
也就是io消耗的时间,然后ioTime * (100 - ioRatio) / ioRatio
就是按照比例,运行任务的时间,比如50
,就是任务运行跟io
一样,如果是10
,那算出来刚好是9
倍的ioTime
,也就是io
占10%
,任务要90%
的时间,netty
默认是一半一半:
runAllTasks(long timeoutNanos)
这个就是超时的执行任务:
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();//先获取要调度的任务,放入任务队列
Runnable task = pollTask();//获取任务队列的任务
if (task == null) {
afterRunningAllTasks();//处理任务后,去执行tailTasks的任务
return false;
}
//截至时间,已经花费调度任务的时间+超时时间 ScheduledFutureTask.nanoTime()表示从开始到现在执行任务持续的时间
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;//统计任务数
long lastExecutionTime;//持续执行的时间
for (;;) {
safeExecute(task);//执行任务
runTasks ++;//任务数+1
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {//到64个任务 0x3F=0x00111111 位与==0 肯定到64了,单线程,不会有线程安全问题,不过为什么不直接写==64呢?
lastExecutionTime = ScheduledFutureTask.nanoTime();//每执行64个任务统计下时间
if (lastExecutionTime >= deadline) {//如果调度任务的时间超过截止时间了,那就退出了,否则时间太长了
break;
}
}
task = pollTask();//继续获取任务
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;//保存持续执行任务的时间
return true;
}
fetchFromScheduledTaskQueue
这个就是从可调度的任务里获得任务,放入任务队列taskQueue
中:
private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;//要调度的任务为空就返回
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();//获取已经执行任务消耗的时间
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);//获取可以执行的调度任务
if (scheduledTask == null) {
return true;//任务为空返回
}
if (!taskQueue.offer(scheduledTask)) {//如果无法放入任务队列,就放回去
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;//没有放成功
}
}
}
pollScheduledTask
可调度意味着延迟时间已经到了,才可以执行:
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {//没有任务或者延迟的时间还没到
return null;
}
scheduledTaskQueue.remove();//从队伍中删除
scheduledTask.setConsumed();//设置没延迟了
return scheduledTask;
}
pollTask
轮询获取任务,如果不是WAKEUP_TASK
就返回:
protected Runnable pollTask() {
assert inEventLoop();
return pollTaskFrom(taskQueue);
}
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
Runnable task = taskQueue.poll();
if (task != WAKEUP_TASK) {
return task;
}
}
}
那什么是WAKEUP_TASK
呢,就是个空任务,也就是唤醒任务,什么都不做,所以当然不用执行啦,作为特殊的唤醒任务而已,用来唤醒线程,后面出现的时候会讲:
static final Runnable WAKEUP_TASK = new Runnable() {
@Override
public void run() { } // Do nothing
};
ScheduledFutureTask.nanoTime()
这个出现了好多次,到底是什么呢,其实就是从开始到现在执行任务的时间,也就是持续执行任务的时间:
START_TIME
是什么时候执行的呢,其实就是我前面讲过的在NioEventLoopGroup
创建的时候,其父类MultithreadEventExecutorGroup
初始化,成员变量赋值的时候private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
,然后又是GlobalEventExecutor
成员变量赋值:
final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(new Runnable() {
@Override
public void run() {
// NOOP
}
}, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);
刚好又调用到ScheduledFutureTask
的初始化静态常量赋值:
也就是说START_TIME
是NioEventLoopGroup
初始化的时间。
safeExecute
这个就是真正执行任务的方法,其实很简单,就是调用Runnable
的run
方法:
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
afterRunningAllTasks
这个是执行尾部的任务,不过貌似没地方用:
runAllTasks()
这个就会执行所有可以调度的任务和队列里的任务,然后返回是否执行过任务:
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;//是否至少执行了一个任务
do {
fetchedAll = fetchFromScheduledTaskQueue();//获取调度任务
if (runAllTasksFrom(taskQueue)) {//执行所有taskQueue任务
ranAtLeastOne = true;
}
} while (!fetchedAll); //直到所有的可执行的调度任务都放进任务队列,执行完为止
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();//如果有执行任务就更新持续执行的时间
}
afterRunningAllTasks();
return ranAtLeastOne;
}
runAllTasksFrom(Queue taskQueue)
这个比较好理解,就是执行任务队列,如果没有执行任务返回false
,有执行过就返回true
:
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;//没有任务返回false
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;//执行过任务返回true
}
}
}
Epoll的空轮询BUG解决方案
这个bug
貌似是说,虽然调用了阻塞的selector.select()
,但是由于操作系统底层发现socket
断开,还是会返回0
,然后底下又没能处理相应的事件,而且任务队列也为空的情况下,就会死循环下去,造成CPU100%
。netty
的解决方案就是用了一个变量selectCnt
统计轮询的次数。一旦空循环就会去执行unexpectedSelectorWakeup(selectCnt)
检测:
里面主要一段就是说如果空轮询次数大于一定阈值后,就会重新创建一个选择器,然后把老的事件啊,通道啊,都注册到新的上面去:
下面就是真正创建新选择器的方法:
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector();//重新创建一个electorTuple
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);//重新注册到新选择器的上面
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
...
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();//关闭旧的选择器
} catch (Throwable t) {
...
}
...
}
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。