2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104060633

Netty流程中的任务执行细节

选择器的策略

NioEventLoopGroup构造函数中会传入一个默认选择策略工厂的实例DefaultSelectStrategyFactory.INSTANCE

202309132157434831.png
其实内部主要还是newSelectStrategy(),这里貌似用了懒加载的多线程安全的单例模式

    public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
        public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();
    
        private DefaultSelectStrategyFactory() { }
    
        @Override
        public SelectStrategy newSelectStrategy() {
            return DefaultSelectStrategy.INSTANCE;
        }
    }
    
    final class DefaultSelectStrategy implements SelectStrategy {
        static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
    
        private DefaultSelectStrategy() { }
    //获得策略的核心方法
        @Override
        public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
            return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
        }
    }

有任务的策略

这里calculateStrategy就是计算策略,首先是考虑是否有任务,如果有就执行相应的selectSupplier.get(),这个是什么呢,其实就是事件循环NioEventLoop里的selectNowSupplier

202309132157444402.png

202309132157455673.png
也就立即返回的,所以后面才可以执行任务runAllTasks

没任务的策略

如果没有任务,否则就是SelectStrategy.SELECT,会执行NioEventLoopselect(long deadlineNanos)

202309132157464284.png

NioEventLoop的run

可以看到,如果没有截止时间就是阻塞的selector.select();,否则可能是立即返回或者超时的。下图就是执行的流程,还是先看有没有事件,有的话会返回strategy

202309132157474455.png
然后根据strategy的值来判断是否需要处理事件,还是处理任务:

202309132157487116.png
超时时间是根据ioRatioio运行时间的占比算的,初始值是50,也就是50%,所以下面这段的意思就是先算出ioTime也就是io消耗的时间,然后ioTime * (100 - ioRatio) / ioRatio就是按照比例,运行任务的时间,比如50,就是任务运行跟io一样,如果是10,那算出来刚好是9倍的ioTime,也就是io10%,任务要90%的时间,netty默认是一半一半:

202309132157503267.png

runAllTasks(long timeoutNanos)

这个就是超时的执行任务:

      protected boolean runAllTasks(long timeoutNanos) {
            fetchFromScheduledTaskQueue();//先获取要调度的任务,放入任务队列
            Runnable task = pollTask();//获取任务队列的任务
            if (task == null) {
                afterRunningAllTasks();//处理任务后,去执行tailTasks的任务
                return false;
            }
    //截至时间,已经花费调度任务的时间+超时时间  ScheduledFutureTask.nanoTime()表示从开始到现在执行任务持续的时间
            final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
            long runTasks = 0;//统计任务数
            long lastExecutionTime;//持续执行的时间
            for (;;) {
                safeExecute(task);//执行任务
    
                runTasks ++;//任务数+1
    
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                if ((runTasks & 0x3F) == 0) {//到64个任务 0x3F=0x00111111 位与==0 肯定到64了,单线程,不会有线程安全问题,不过为什么不直接写==64呢?
                    lastExecutionTime = ScheduledFutureTask.nanoTime();//每执行64个任务统计下时间
                    if (lastExecutionTime >= deadline) {//如果调度任务的时间超过截止时间了,那就退出了,否则时间太长了
                        break;
                    }
                }
    
                task = pollTask();//继续获取任务
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            afterRunningAllTasks();
            this.lastExecutionTime = lastExecutionTime;//保存持续执行任务的时间
            return true;
        }

fetchFromScheduledTaskQueue

这个就是从可调度的任务里获得任务,放入任务队列taskQueue中:

      private boolean fetchFromScheduledTaskQueue() {
            if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
                return true;//要调度的任务为空就返回
            }
            long nanoTime = AbstractScheduledEventExecutor.nanoTime();//获取已经执行任务消耗的时间
            for (;;) {
                Runnable scheduledTask = pollScheduledTask(nanoTime);//获取可以执行的调度任务
                if (scheduledTask == null) {
                    return true;//任务为空返回
                }
                if (!taskQueue.offer(scheduledTask)) {//如果无法放入任务队列,就放回去
                    // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
                    scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
                    return false;//没有放成功
                }
            }
        }
pollScheduledTask

可调度意味着延迟时间已经到了,才可以执行:

      protected final Runnable pollScheduledTask(long nanoTime) {
            assert inEventLoop();
    
            ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
            if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {//没有任务或者延迟的时间还没到
                return null;
            }
            scheduledTaskQueue.remove();//从队伍中删除
            scheduledTask.setConsumed();//设置没延迟了
            return scheduledTask;
        }

pollTask

轮询获取任务,如果不是WAKEUP_TASK就返回:

    protected Runnable pollTask() {
            assert inEventLoop();
            return pollTaskFrom(taskQueue);
        }
    
        protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
            for (;;) {
                Runnable task = taskQueue.poll();
                if (task != WAKEUP_TASK) {
                    return task;
                }
            }
        }

那什么是WAKEUP_TASK呢,就是个空任务,也就是唤醒任务,什么都不做,所以当然不用执行啦,作为特殊的唤醒任务而已,用来唤醒线程,后面出现的时候会讲:

       static final Runnable WAKEUP_TASK = new Runnable() {
           @Override
           public void run() { } // Do nothing
        };

ScheduledFutureTask.nanoTime()

这个出现了好多次,到底是什么呢,其实就是从开始到现在执行任务的时间,也就是持续执行任务的时间:

202309132157512788.png
START_TIME是什么时候执行的呢,其实就是我前面讲过的在NioEventLoopGroup创建的时候,其父类MultithreadEventExecutorGroup初始化,成员变量赋值的时候private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);,然后又是GlobalEventExecutor成员变量赋值:

    final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
                this, Executors.<Void>callable(new Runnable() {
            @Override
            public void run() {
                // NOOP
            }
        }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);

刚好又调用到ScheduledFutureTask的初始化静态常量赋值:

202309132157519429.png
也就是说START_TIMENioEventLoopGroup初始化的时间。

safeExecute

这个就是真正执行任务的方法,其实很简单,就是调用Runnablerun方法:

        protected static void safeExecute(Runnable task) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception. Task: {}", task, t);
            }
        }

afterRunningAllTasks

这个是执行尾部的任务,不过貌似没地方用:

2023091321575278410.png

2023091321575355711.png

runAllTasks()

这个就会执行所有可以调度的任务和队列里的任务,然后返回是否执行过任务:

     protected boolean runAllTasks() {
            assert inEventLoop();
            boolean fetchedAll;
            boolean ranAtLeastOne = false;//是否至少执行了一个任务
    
            do {
                fetchedAll = fetchFromScheduledTaskQueue();//获取调度任务
                if (runAllTasksFrom(taskQueue)) {//执行所有taskQueue任务
                    ranAtLeastOne = true;
                }
            } while (!fetchedAll); //直到所有的可执行的调度任务都放进任务队列,执行完为止
    
            if (ranAtLeastOne) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();//如果有执行任务就更新持续执行的时间
            }
            afterRunningAllTasks();
            return ranAtLeastOne;
        }

runAllTasksFrom(Queue taskQueue)

这个比较好理解,就是执行任务队列,如果没有执行任务返回false,有执行过就返回true

        protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
            Runnable task = pollTaskFrom(taskQueue);
            if (task == null) {
                return false;//没有任务返回false
            }
            for (;;) {
                safeExecute(task);
                task = pollTaskFrom(taskQueue);
                if (task == null) {
                    return true;//执行过任务返回true
                }
            }
        }

Epoll的空轮询BUG解决方案

这个bug貌似是说,虽然调用了阻塞的selector.select(),但是由于操作系统底层发现socket断开,还是会返回0,然后底下又没能处理相应的事件,而且任务队列也为空的情况下,就会死循环下去,造成CPU100%netty的解决方案就是用了一个变量selectCnt统计轮询的次数。一旦空循环就会去执行unexpectedSelectorWakeup(selectCnt)检测:

2023091321575456912.png
里面主要一段就是说如果空轮询次数大于一定阈值后,就会重新创建一个选择器,然后把老的事件啊,通道啊,都注册到新的上面去:

2023091321575543913.png

2023091321575613614.png
下面就是真正创建新选择器的方法:

        private void rebuildSelector0() {
            final Selector oldSelector = selector;
            final SelectorTuple newSelectorTuple;
    
            if (oldSelector == null) {
                return;
            }
    
            try {
                newSelectorTuple = openSelector();//重新创建一个electorTuple
            } catch (Exception e) {
                logger.warn("Failed to create a new Selector.", e);
                return;
            }
    
            // Register all channels to the new Selector.
            int nChannels = 0;
            for (SelectionKey key: oldSelector.keys()) {
                Object a = key.attachment();
                try {
                    if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                        continue;
                    }
    
                    int interestOps = key.interestOps();
                    key.cancel();
                    SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);//重新注册到新选择器的上面
                    if (a instanceof AbstractNioChannel) {
                        // Update SelectionKey
                        ((AbstractNioChannel) a).selectionKey = newKey;
                    }
                    nChannels ++;
                } catch (Exception e) {
    				...
                }
            }
    
            selector = newSelectorTuple.selector;
            unwrappedSelector = newSelectorTuple.unwrappedSelector;
    
            try {
                // time to close the old selector as everything else is registered to the new one
                oldSelector.close();//关闭旧的选择器
            } catch (Throwable t) {
               ...
            }
    
      		 ...
        }

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文