An In-Depth Look at Netty: Getting to the Bottom of NioEventLoop

 2023-01-12
Original author: 壹氿 Original URL: https://juejin.cn/post/7054037340968189982

GitHub: github.com/lhj502819/n…, with the sample code in the example module


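In the previous article we analyzed the parent classes of NioEventLoop in detail. Today we take apart NioEventLoop itself. It is fairly complex and needs patient reading and thought. First, let's review the related material covered earlier, because NioEventLoop is only easy to understand once those pieces are connected. Start with the class diagram again; the classes related to NioEventLoop are on the left: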

[Figure: class diagram of NioEventLoop and its related classes (202212302155345761.png)]

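NioEventLoop pulls together the functionality of many classes. As I see it, NioEventLoop is mainly responsible for controlling the overall execution flow, while the details of running tasks and implementing features are delegated to its parent classes; task execution, for example, is implemented in SingleThreadEventExecutor. NioEventLoop decides timing: when to run tasks, under what conditions the Selector must be rebuilt, and how much time to spend on event processing. If that sounds abstract, don't worry; the analysis below will make it concrete.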

Overview of main functions

[Figure: overview of NioEventLoop's main functions (202212302155355292.png)]

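NioEventLoop maintains a single thread. When that thread starts, it calls NioEventLoop's run method, which executes both I/O tasks and non-I/O tasks.

  • I/O tasks: accept, connect, read, write, and so on
  • Non-I/O tasks: tasks added to the taskQueue, such as register0 and bind0

Each of these features is fairly complex to implement, so let's work through them one at a time.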

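Where it comes from

Before digging into NioEventLoop, we should know where it is created. Looking at the callers of its constructor shows a single call site: NioEventLoopGroup#newChild. NioEventLoopGroup is the class we use when starting a Netty server, and it was covered in an earlier article: its main job is to wrap the Executor (NioEventLoop). If you are not familiar with it, read "What exactly is EventLoopGroup?" first, paying particular attention to MultithreadEventExecutorGroup and NioEventLoopGroup.

The code is reproduced here; it is simple, so I won't describe it further: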

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        SelectorProvider selectorProvider = (SelectorProvider) args[0];
        SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
        RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
        EventLoopTaskQueueFactory taskQueueFactory = null;
        EventLoopTaskQueueFactory tailTaskQueueFactory = null;
    
        int argsLength = args.length;
        if (argsLength > 3) {
            taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
        }
        if (argsLength > 4) {
            tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
        }
        return new NioEventLoop(this, executor, selectorProvider,
                selectStrategyFactory.newSelectStrategy(),
                rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
    }

Initialization

Member variables

    /**
     * Whether to disable the SelectionKey set optimization; defaults to false
     */
    private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
            SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
    
    /**
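     * If the configured rebuild threshold is below this value, rebuilding the Selector on empty polling is disabled
     */
    private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
    /**
     * After the NIO Selector returns prematurely (empty polling) N times in a row, it is replaced by a newly built Selector. Defaults to 512; a value below 3 disables the rebuild. Used in {@link #unexpectedSelectorWakeup(int)} when handling unexpected wakeups
     */
    private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
    /**
     * Used together with {@link SelectStrategy}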
     */
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };
    
    /**
     * The NIO {@link Selector}, wrapped and optimized by Netty
     */
    private Selector selector;
    private Selector unwrappedSelector;
    /**
     * The set of selected (ready) SelectionKeys: Netty's own array-backed implementation
     */
    private SelectedSelectionKeySet selectedKeys;
    
    /**
     * Used to create the Selector
     */
    private final SelectorProvider provider;
    
    private static final long AWAKE = -1L;
    private static final long NONE = Long.MAX_VALUE;
    
    // nextWakeupNanos is:
    //    AWAKE            when EL is awake
    //    NONE             when EL is waiting with no wakeup scheduled
    //    other value T    when EL is waiting with wakeup scheduled at time T
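    //The next wakeup time; defaults to -1 (already awake). Mainly used in connection with scheduled tasks
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
    /**
     * The select strategy, used when the loop runs
     */
    private final SelectStrategy selectStrategy;
    
    /**
     * Share of the total processing time spent on the Channels' ready events
     *  NioEventLoop handles three kinds of work
     *      1: ready I/O events on Channels
     *      2: normal tasks
     *      3: scheduled tasks
     *    ioRatio is the percentage of total processing time given to ready I/O events
     */
    private volatile int ioRatio = 50;
    
    /**
     * Number of cancelled SelectionKeys
     */
    private int cancelledKeys;
    /**
     * Whether the Selector needs to be selected again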
     */
    private boolean needsToSelectAgain;
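
The three states of nextWakeupNanos described in the comment above (AWAKE, NONE, or a deadline T) can be made concrete with a small stand-alone sketch. This is not Netty's code: the class WakeupStateSketch and its method names are hypothetical, and the rule that a submitting thread only needs to wake the selector when the previous state was not AWAKE is an assumption about how such a field is typically used.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the nextWakeupNanos state machine; not Netty's implementation.
final class WakeupStateSketch {
    static final long AWAKE = -1L;
    static final long NONE = Long.MAX_VALUE;

    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

    // Called by the loop thread just before it blocks in select.
    // A deadline of -1 means "no scheduled task" and is stored as NONE.
    void beforeSelect(long scheduledDeadlineNanos) {
        nextWakeupNanos.set(scheduledDeadlineNanos == -1L ? NONE : scheduledDeadlineNanos);
    }

    // Called by the loop thread once select has returned.
    void afterSelect() {
        nextWakeupNanos.lazySet(AWAKE);
    }

    // Assumed rule: mark the loop AWAKE and report whether it was waiting,
    // in which case the caller would need to wake the selector.
    boolean markAwakeAndCheckIfWakeupNeeded() {
        return nextWakeupNanos.getAndSet(AWAKE) != AWAKE;
    }

    public static void main(String[] args) {
        WakeupStateSketch state = new WakeupStateSketch();
        System.out.println(state.markAwakeAndCheckIfWakeupNeeded()); // loop starts awake
        state.beforeSelect(-1L);                                     // loop goes to wait
        System.out.println(state.markAwakeAndCheckIfWakeupNeeded());
    }
}
```

The point of the atomic getAndSet in this sketch is that only the first caller after the loop went to sleep sees a non-AWAKE value, so repeated submissions do not trigger repeated selector wakeups.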

Constructor

    /**
     * @param parent the EventLoopGroup this loop belongs to
     * @param executor defaults to {@link ThreadPerTaskExecutor}, which starts one thread per task; set when {@link MultithreadEventExecutorGroup} is initialized
     * @param selectorProvider SelectorProvider.provider(); {@link Selector#open()} itself uses SelectorProvider.provider().openSelector() to create the
     *                          {@link Selector}; set in {@link NioEventLoopGroup}
     * @param strategy the select strategy, explained below; by default created by {@link DefaultSelectStrategyFactory}
     * @param rejectedExecutionHandler  rejection policy; by default throws {@link RejectedExecutionException}
     * @param taskQueueFactory factory for the normal task queue; null by default
     * @param tailTaskQueueFactory  factory for the tail task queue; null by default
     */
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
        super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        //Create the Selector
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        //Keep the unwrapped (raw JDK) Selector
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

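Optimization: the Selector

In the constructor, creating the Selector returns a SelectorTuple, a small holder class that pairs the raw Selector with its wrapped counterpart. The JDK NIO selector stores its selected keys in a HashSet, which is backed by a HashMap: every add has to hash the key and may hit collisions or trigger a resize (O(1) on average, slower in the worst case), and iteration walks the hash table. Netty replaces this with a plain array, so add is a simple O(1) store at the end of the array and iteration is a linear scan. Netty's attention to detail shows here. The array-backed set is then combined with the raw Selector through the custom SelectedSelectionKeySetSelector. The source below shows how it is done; the replacement itself relies mainly on reflection.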

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        /**
         * Check whether the SelectionKey set optimization is disabled; the default is false, i.e. the optimization stays on
         */
        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }
    
        /**
         * Load the sun.nio.ch.SelectorImpl class
         */
        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });
    
        if (!(maybeSelectorImplClass instanceof Class) ||
            // ensure the current selector implementation is what we can instrument.
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            //If the SelectorImpl class could not be loaded, or the selector is not an instance of it, return a SelectorTuple holding only the raw, unwrapped selector
            return new SelectorTuple(unwrappedSelector);
        }
    
        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        /**
         * Netty's optimization of the selected-key set: SelectedSelectionKeySet is array-based
         */
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    /**
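                     * Use reflection to replace the Selector's selectedKeys (Set) field with Netty's array-based {@link SelectedSelectionKeySet}
                     * With an array, add is a plain O(1) append, whereas the JDK Set is backed by a HashMap whose put must hash and may collide or resize; this is one of Netty's optimizations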
                     */
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                    //Adaptation for Java 9 and later
                    if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                        // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                        // This allows us to also do this in Java9+ without any extra flags.
                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);
    
                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        // We could not retrieve the offset, lets try reflection as last-resort.
                    }
    
    
                    //Make both fields accessible
                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
    
                    /**
                     * Replace the Selector's selected-key sets with Netty's own
                     */
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });
    
        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }
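
The reflection step in openSelector can be tried in isolation. The following is a minimal sketch of the same idea under stated assumptions: FakeSelector is a hypothetical stand-in for sun.nio.ch.SelectorImpl, ArrayKeySet is a simplified String-based analogue of SelectedSelectionKeySet, and plain Field#set is used instead of the Unsafe path. None of these names come from Netty.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;

// Hypothetical stand-in for sun.nio.ch.SelectorImpl: it owns a private Set field.
final class FakeSelector {
    private Set<String> selectedKeys = new HashSet<>();

    // Simulates the selector recording a ready key during select.
    void markReady(String key) {
        selectedKeys.add(key);
    }
}

// Simplified array-backed set in the spirit of SelectedSelectionKeySet.
final class ArrayKeySet extends AbstractSet<String> {
    String[] keys = new String[4];
    int size;

    @Override
    public boolean add(String key) {
        if (key == null) {
            return false;
        }
        if (size == keys.length) {
            String[] bigger = new String[keys.length << 1];
            System.arraycopy(keys, 0, bigger, 0, size);
            keys = bigger;
        }
        keys[size++] = key;
        return true;
    }

    @Override
    public Iterator<String> iterator() {
        return new Iterator<String>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return keys[idx++];
            }
        };
    }

    @Override
    public int size() {
        return size;
    }
}

final class FieldSwapDemo {
    // Swaps the private field for our own set and returns that set,
    // so the caller can read ready keys straight from the array.
    static ArrayKeySet instrument(FakeSelector selector) {
        try {
            ArrayKeySet replacement = new ArrayKeySet();
            Field field = FakeSelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            field.set(selector, replacement);
            return replacement;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not instrument selector", e);
        }
    }

    public static void main(String[] args) {
        FakeSelector selector = new FakeSelector();
        ArrayKeySet keys = instrument(selector);
        selector.markReady("channel-1");
        System.out.println("ready keys seen through the array: " + keys.size);
    }
}
```

Because the selector and the event loop end up sharing one set instance, whatever the selector adds during select is immediately visible to the loop through the array, without iterating a hash table.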

SelectedSelectionKeySetSelector

    /**
     * The optimized Selector, which wraps (composes) the native Selector
     */
    final class SelectedSelectionKeySetSelector extends Selector {
        /**
         * The custom selected-key set; note that it is the same instance that was installed into the delegate's selectedKeys field
         */
        private final SelectedSelectionKeySet selectionKeys;
        /**
         * The wrapped native Selector, {@link NioEventLoop#unwrappedSelector}
         */
        private final Selector delegate;
        
        SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
            this.delegate = delegate;
            this.selectionKeys = selectionKeys;
        }
    
       // ... some code omitted ...
    
        /**
         * Called from {@link NioEventLoop#run()}
         */
        @Override
        public int select() throws IOException {
            selectionKeys.reset();
            return delegate.select();
        }
    
        @Override
        public Selector wakeup() {
            return delegate.wakeup();
        }
    
        @Override
        public void close() throws IOException {
            delegate.close();
        }
    }

SelectedSelectionKeySet

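It is implemented with an array, so add is a plain O(1) append at the end; the JDK's HashSet is backed by a HashMap, whose put has to hash the key and may hit collisions or a resize (O(1) on average, slower in the worst case).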

    final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    
        SelectionKey[] keys;
        int size;
    
        SelectedSelectionKeySet() {
            keys = new SelectionKey[1024];
        }
    
        @Override
        public boolean add(SelectionKey o) {
            if (o == null) {
                return false;
            }
    
            keys[size++] = o;
            if (size == keys.length) {
                increaseCapacity();
            }
    
            return true;
        }
        @Override
        public Iterator<SelectionKey> iterator() {
            return new Iterator<SelectionKey>() {
                private int idx;
    
                @Override
                public boolean hasNext() {
                    return idx < size;
                }
    
                @Override
                public SelectionKey next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return keys[idx++];
                }
    
                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };
        }
    
        private void increaseCapacity() {
            SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
            System.arraycopy(keys, 0, newKeys, 0, size);
            keys = newKeys;
        }
    }
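
The excerpt above omits reset, although select() calls selectionKeys.reset() and the event loop calls selectedKeys.reset(i + 1). A sketch consistent with those two call sites might look like the following. It is an assumption about the behavior, not the library's source, and ResettableKeyArray is a hypothetical String-based analogue.

```java
import java.util.Arrays;

// Hypothetical String-based analogue of SelectedSelectionKeySet, focused on reset.
final class ResettableKeyArray {
    String[] keys = new String[4];
    int size;

    boolean add(String key) {
        if (key == null) {
            return false;
        }
        keys[size++] = key;
        if (size == keys.length) {
            // Double the capacity as soon as the array is full.
            keys = Arrays.copyOf(keys, keys.length << 1);
        }
        return true;
    }

    void reset() {
        reset(0);
    }

    // Null out entries from start (inclusive) up to size (exclusive) so the
    // referenced keys can be garbage collected, then mark the set as empty.
    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    public static void main(String[] args) {
        ResettableKeyArray set = new ResettableKeyArray();
        set.add("a");
        set.add("b");
        set.reset();
        System.out.println("size after reset: " + set.size);
    }
}
```

The start parameter exists because a caller that has already processed and nulled entries 0..i only needs to clear the remainder of the array.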

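The run method: executing tasks

run is the most important method in NioEventLoop. So which tasks does NioEventLoop execute? IDEA shows that run is called from SingleThreadEventExecutor#doStartThread. SingleThreadEventExecutor was covered in detail in the previous article, so I won't repeat that here. Debugging shows that doStartThread is invoked from #execute, and #execute is the entry point for adding asynchronous tasks, so #run most likely executes the tasks added there. Let's trace the source to find out.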

    protected void run() {
        //Counts consecutive select iterations
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    //Calculate the select strategy <1>
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;
    
                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO
    
                    case SelectStrategy.SELECT:
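                        //Reached when there are no normal tasks: get the deadline of the nearest scheduled task, or -1 if there are no scheduled tasks
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            //No scheduled tasks: set the deadline to NONE (Long.MAX_VALUE)
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        //Record the next wakeup time
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                //select to see whether any events of interest are ready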
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            //Lazily mark the loop as awake; this only helps suppress unnecessary Selector wakeups
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    //Rebuild the Selector
                    rebuildSelector0();
                    //Reset the counter
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }
    
                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            //Ready events were selected, so process them
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        //Run all tasks (normal and scheduled) with no time limit
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    //Record the current time
                    final long ioStartTime = System.nanoTime();
                    try {
                        //Process the Channels' ready events
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        //Measure the time spent on I/O events, then run tasks (normal and scheduled) under a time limit
                        //The task time budget is derived from the I/O time and ioRatio
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
                //If tasks were run or ready events were selected, reset the select counter
                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    //There were ready events or tasks ran, so reset the empty-poll count to 0
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    //The unexpected wakeup was handled: reset the counter
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Error e) {
                throw e;
            } catch (Throwable t) {
                handleLoopException(t);
            } finally {
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
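                        //If the EventLoop is shutting down, shut down, or terminated, run the close logic: close every Channel and cancel its registration with the Selector
                        closeAll();
                        //Check whether shutdown can complete
                        if (confirmShutdown()) {
                            //Exit the NioEventLoop thread's loop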
                            return;
                        }
                    }
                } catch (Error e) {
                    throw e;
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    }

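The overall flow of run is as follows. It first determines whether there are tasks to run (taskQueue plus scheduled tasks) or ready events of interest, then:

  1. If there are ready events of interest, process those events first
  2. If there are tasks to run, run the tasks
  3. Check whether the loop should shut down

These three steps repeat in a loop.

One aside: in run, NioEventLoop calls the underlying JDK NIO select to check for ready events of interest. Right after the server starts, the event of interest is the client connection (ACCEPT) event. So how is that interest set? As covered in the server startup source analysis, after the Channel is registered, AbstractNioChannel#doBeginRead is eventually called, and that is where the interest set is given OP_ACCEPT.

SelectStrategy is the select strategy interface. Its constants and its single method are: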

     /**
      * Indicates a blocking select should follow.
      * Strategy value meaning a blocking select should be used
      */
     int SELECT = -1;
     /**
      * Indicates the IO loop should be retried, no blocking select to follow directly.
      * Strategy value meaning the loop should retry; not currently used
      */
     int CONTINUE = -2;
     /**
      * Indicates the IO loop to poll for new events without blocking.
      * Not currently used by NioEventLoop, which falls through to SELECT
      *
      */
     int BUSY_WAIT = -3;
    
     /**
      * The {@link SelectStrategy} can be used to steer the outcome of a potential select
      * call.
      *
      * @param selectSupplier The supplier with the result of a select result.
      * @param hasTasks true if tasks are waiting to be processed.
      * @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
      *         the next step should be to not select but rather jump back to the IO loop and try
      *         again. Any value >= 0 is treated as an indicator that work needs to be done.
      */
     int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;

DefaultSelectStrategy is used by default; the comments explain it:

    final class DefaultSelectStrategy implements SelectStrategy {
        static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
    
        private DefaultSelectStrategy() { }
    
        @Override
        public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
            //If there are tasks, do a non-blocking selectNow and return the number of ready I/O events
            //If there are no tasks, return the blocking-select strategy
            return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
        }
    }
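
To see what the strategy returns in each case, the decision can be reproduced without Netty on the classpath. This sketch substitutes java.util.function.IntSupplier for Netty's io.netty.util.IntSupplier and hard-codes SELECT as -1, matching the constant shown above; the class name SelectStrategySketch is hypothetical.

```java
import java.util.function.IntSupplier;

// Stand-alone sketch of the default strategy decision; not the Netty class itself.
final class SelectStrategySketch {
    static final int SELECT = -1; // same value as SelectStrategy.SELECT above

    // With pending tasks: poll without blocking and report the ready-key count.
    // Without pending tasks: ask the loop to do a blocking select.
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    public static void main(String[] args) {
        IntSupplier twoReadyKeys = () -> 2; // pretend selectNow() found two ready keys
        System.out.println("with tasks:    " + calculateStrategy(twoReadyKeys, true));
        System.out.println("without tasks: " + calculateStrategy(twoReadyKeys, false));
    }
}
```

The design choice is that a loop with queued tasks must never block in select, so it only peeks at the selector; a loop with nothing queued can afford to block until an event or a scheduled deadline arrives.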

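Key points

Let's continue with run. I have annotated the main logic with comments, so read those for the details. The points worth discussing are:

  • Controlling the share of time spent on I/O: ioRatio
  • Processing ready events of interest
  • Running the tasks in the task queue
  • Rebuilding the Selector to work around the JDK empty-polling bug: rebuildSelector0
  • Graceful shutdown handling

Controlling the I/O time ratio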

    if (ioRatio == 100) {
        try {
            if (strategy > 0) {
            //Ready events were selected, so process them
                processSelectedKeys();
            }
        } finally {
            // Ensure we always run tasks.
            //Run all tasks (normal and scheduled) with no time limit
            ranTasks = runAllTasks();
        }
    } else if (strategy > 0) {//There are ready events of interest
        //Record the current time
        final long ioStartTime = System.nanoTime();
        try {
            //Process the Channels' ready events
            processSelectedKeys();
        } finally {
            // Ensure we always run tasks.
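            //Measure the time spent on I/O events, then run tasks (normal and scheduled) under a time limit
            //The task time budget is derived from the time spent processing the Channels' ready events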
            final long ioTime = System.nanoTime() - ioStartTime;
            ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    } else {
        //No ready events of interest: run only the minimum number of tasks
        ranTasks = runAllTasks(0); // This will run the minimum number of tasks
    }
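
  • When ioRatio equals 100, task execution has no time limit: any ready events of interest are processed in full, and then all pending tasks are run in full

  • When ioRatio is less than 100

    • If there are ready events of interest, they are processed first; the time that took is then used to compute the maximum time allowed for running tasks
    • If there are no ready events of interest, tasks are run with a timeout argument of 0, which means running only the minimum number of tasks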
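
The budget formula ioTime * (100 - ioRatio) / ioRatio is easier to judge with numbers. The helper below is a hypothetical stand-alone sketch (IoRatioBudget is not a Netty class) that applies the same arithmetic for ratios strictly between 0 and 100. For example, with ioRatio = 80 and 10 ms spent on I/O, tasks get 10 ms * 20 / 80 = 2.5 ms.

```java
// Hypothetical helper that reproduces the task-budget arithmetic from run().
final class IoRatioBudget {
    // Returns how many nanoseconds tasks may run, given the time just spent on I/O.
    // ioRatio == 100 is handled by a separate branch in run(), so it is rejected here.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 10_000_000L; // 10 ms spent processing ready events
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ratio + " -> task budget ns = "
                    + taskBudgetNanos(ioTime, ratio));
        }
    }
}
```

With the default ioRatio of 50, the task budget simply equals the time spent on I/O; a higher ratio shrinks the task budget and a lower ratio enlarges it.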

Processing ready events of interest

    private void processSelectedKeys() {
        //Check whether the optimized selected-key set is in use
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

processSelectedKeysOptimized

Processes the optimized selected-key set

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;
    
            final Object a = k.attachment();
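            //The attachment is fetched here and checked for being an AbstractNioChannel. Why would it be one?
            // In the server startup source analysis, when walking through registration, AbstractNioChannel#doRegister passed itself as the attachment when registering the Channel with the Selector:
            //  selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            //So this check is true here. NioTask is rarely used, so we will not analyze it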
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            // <2>
            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);
    
                selectAgain();
                i = -1;
            }
        }
    }

At <2> the loop checks whether it needs to select again. The flag defaults to false; IDEA shows that needsToSelectAgain is set to true in the #cancel method

    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }

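#cancel is called from AbstractNioChannel#doDeregister. doDeregister is the opposite of doRegister: it unbinds the Channel from the EventLoop. Each unbind increments cancelledKeys by 1, and once cancelledKeys reaches the threshold of 256 (CLEANUP_INTERVAL), needsToSelectAgain is set to true. When needsToSelectAgain is true, a cleanup runs: the selected-key set is cleared, and selectAgain performs another select so that the remaining ready SelectionKeys are filled back into the set. The main purpose is to avoid the situation where a Channel has been disconnected but its SelectionKey is still referenced on the server side, so that its memory cannot be reclaimed.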

    if (needsToSelectAgain) {
        // null out entries in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.reset(i + 1);
    
        selectAgain();
        i = -1;
    }
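
The counting logic behind needsToSelectAgain can be isolated into a few lines. The sketch below is a hypothetical model (CancelCounterSketch is not a Netty class): it keeps the 256 threshold stated above, drops the actual SelectionKey handling, and assumes that selecting again clears the flag.

```java
// Hypothetical model of the cancelledKeys / needsToSelectAgain bookkeeping.
final class CancelCounterSketch {
    static final int CLEANUP_INTERVAL = 256; // threshold stated in the article

    private int cancelledKeys;
    private boolean needsToSelectAgain;

    // Models cancel(SelectionKey): count the cancellation and raise the flag
    // once enough keys have been cancelled since the last cleanup.
    void cancel() {
        cancelledKeys++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }

    boolean needsToSelectAgain() {
        return needsToSelectAgain;
    }

    // Assumption: selecting again clears the flag (the real method would also
    // call selectNow() on the selector).
    void selectAgain() {
        needsToSelectAgain = false;
    }

    public static void main(String[] args) {
        CancelCounterSketch loop = new CancelCounterSketch();
        for (int i = 0; i < CLEANUP_INTERVAL; i++) {
            loop.cancel();
        }
        System.out.println("select again needed: " + loop.needsToSelectAgain());
    }
}
```

Batching the cleanup this way means the loop pays for an extra select only once per 256 cancellations rather than on every deregistration.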

processSelectedKey

Each kind of event is handled differently. The events are the standard JDK NIO ones: SelectionKey#OP_ACCEPT, SelectionKey#OP_CONNECT, SelectionKey#OP_READ, and SelectionKey#OP_WRITE

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        //On the server side the Unsafe here is io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe, which was also covered in the server startup source analysis
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop == this) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
            }
            return;
        }
    
        try {
            //The ready operation set of this key; when a client first connects to the server this is OP_ACCEPT
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                //OP_CONNECT: the connection was established
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                //Finish the connect operation; this fires the connection-established event, and Handlers are notified to process it
                unsafe.finishConnect();
            }
    
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // OP_WRITE is ready
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                //Flush the data queued for the Channel
                ch.unsafe().forceFlush();
            }
    
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
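                //OP_READ or OP_ACCEPT is ready; readyOps == 0 is also handled here to work around the JDK Selector empty-polling bug
                //This triggers a read (or an accept on a server channel), and Handlers are notified to process the result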
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
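
The order of the bit tests in processSelectedKey matters: connect first, then write, then read or accept. The sketch below replays only that dispatch logic using the real java.nio.channels.SelectionKey constants. ReadyOpsSketch and the action names it returns are hypothetical labels for illustration, not Netty calls.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the readyOps dispatch order; returns labels instead of doing I/O.
final class ReadyOpsSketch {
    static List<String> dispatchOrder(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is treated like a read, mirroring the workaround in the source.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    // Clears OP_CONNECT from an interest set, as done before finishConnect().
    static int withoutConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }

    public static void main(String[] args) {
        System.out.println(dispatchOrder(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        System.out.println(dispatchOrder(SelectionKey.OP_ACCEPT));
    }
}
```

Handling OP_WRITE before OP_READ follows the reasoning in the source comment: flushing queued buffers first can free memory before more data is read in.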

processSelectedKeysPlain

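Processes the plain (non-optimized) selected-key set. It also ends up calling #processSelectedKey, so we won't analyze it further

Running task-queue tasks: runAllTasks

runAllTasks was covered in the previous article; please refer to it there

Rebuilding the Selector to work around the JDK empty-polling bug

unexpectedSelectorWakeup

Its main job is to decide whether the criteria for rebuilding the Selector have been met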

    /**
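     * Handles unexpected wakeups. With the JDK empty-polling bug, select returns immediately even though no event occurred; this method exists mainly to work around that bug
     * If the Selector rebuild threshold has been reached, the Selector is rebuilt and true is returned so that the select counter is reset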
     */
    private boolean unexpectedSelectorWakeup(int selectCnt) {
        if (Thread.interrupted()) {
            //The thread was interrupted
            // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
            // As this is most likely a bug in the handler of the user or it's client library we will
            // also log it.
            //
            // See https://github.com/netty/netty/issues/2426
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely because " +
                        "Thread.currentThread().interrupt() was called. Use " +
                        "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
            }
            return true;
        }
        //Check whether the Selector rebuild threshold has been reached
        if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.
            logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
            rebuildSelector();
            return true;
        }
        return false;
    }
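
How selectCnt interacts with the rebuild threshold across loop iterations can be modeled with a simple counter. This is a hypothetical sketch (PrematureWakeupCounter is not a Netty class): it collapses run() and unexpectedSelectorWakeup into one method and only counts rebuilds instead of performing them.

```java
// Hypothetical model of the empty-poll counter; it counts rebuilds instead of rebuilding.
final class PrematureWakeupCounter {
    private final int rebuildThreshold;
    private int selectCnt;
    private int rebuilds;

    PrematureWakeupCounter(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    // One loop iteration. didWork is true when events were ready or tasks ran.
    // Returns true when this iteration would trigger a Selector rebuild.
    boolean onLoopIteration(boolean didWork) {
        selectCnt++;
        if (didWork) {
            selectCnt = 0; // a productive iteration breaks the streak
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            rebuilds++;
            selectCnt = 0; // start counting again on the new Selector
            return true;
        }
        return false;
    }

    int rebuilds() {
        return rebuilds;
    }

    public static void main(String[] args) {
        PrematureWakeupCounter counter = new PrematureWakeupCounter(512);
        for (int i = 0; i < 512; i++) {
            counter.onLoopIteration(false); // 512 empty wakeups in a row
        }
        System.out.println("rebuilds: " + counter.rebuilds());
    }
}
```

Only an unbroken streak of empty wakeups reaches the threshold, which is what distinguishes the JDK bug from an ordinary idle loop that occasionally wakes with nothing to do.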

rebuildSelector

    /**
     * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
     * around the infamous epoll 100% CPU bug.
     * Rebuilds the Selector to work around the JDK epoll 100% CPU bug. Netty does not actually fix this JDK NIO problem; it only sidesteps it
     */
    public void rebuildSelector() {
        if (!inEventLoop()) {
            //Not on the event loop thread: submit the rebuild as a task
            execute(new Runnable() {
                @Override
                public void run() {
                    rebuildSelector0();
                }
            });
            return;
        }
        rebuildSelector0();
    }

rebuildSelector0

The core idea is to create a new Selector and re-register all previously registered Channels on it

    /**
     * Creates a new Selector, moves the Channels registered on the old selector over to the new one, and closes the old Selector
     */
    private void rebuildSelector0() {
        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;
    
        if (oldSelector == null) {
            return;
        }
    
        try {
            //Create the new Selector
            newSelectorTuple = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }
    
        // Register all channels to the new Selector.
        int nChannels = 0;
        //Iterate over all SelectionKeys and re-register each Channel on the new Selector
        for (SelectionKey key: oldSelector.keys()) {
            Object a = key.attachment();
            try {
                if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                    continue;
                }
    
                int interestOps = key.interestOps();
                //Cancel the old key
                key.cancel();
                //Register the Channel on the new Selector
                SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    // Update SelectionKey
                    // Point the Channel's selectionKey at the new SelectionKey
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                nChannels ++;
            } catch (Exception e) {
                logger.warn("Failed to re-register a Channel to the new Selector.", e);
                if (a instanceof AbstractNioChannel) {
                    AbstractNioChannel ch = (AbstractNioChannel) a;
                    ch.unsafe().close(ch.unsafe().voidPromise());
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    invokeChannelUnregistered(task, key, e);
                }
            }
        }
        //Switch NioEventLoop over to the new Selector
        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;
    
    }
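
The same migration can be tried with nothing but JDK NIO. The sketch below is a simplified illustration, not Netty's method: SelectorRebuildSketch is a hypothetical class, error handling is reduced to a single unchecked exception, and the old selector is closed at the end, as the Javadoc above describes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical JDK-only sketch of moving registrations to a fresh Selector.
final class SelectorRebuildSketch {
    static Selector rebuild(Selector oldSelector) {
        try {
            Selector newSelector = Selector.open();
            for (SelectionKey key : oldSelector.keys()) {
                // Skip dead keys and channels that are already on the new selector.
                if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                    continue;
                }
                int interestOps = key.interestOps();
                Object attachment = key.attachment();
                key.cancel(); // drop the registration on the old selector
                key.channel().register(newSelector, interestOps, attachment);
            }
            oldSelector.close();
            return newSelector;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        Selector old = Selector.open();
        pipe.source().register(old, SelectionKey.OP_READ, "attachment");

        Selector fresh = rebuild(old);
        System.out.println("registered on new selector: " + (pipe.source().keyFor(fresh) != null));

        fresh.close();
        pipe.source().close();
        pipe.sink().close();
    }
}
```

Carrying the interest ops and the attachment over unchanged is what lets the rest of the loop keep working as if nothing had happened; only the selector behind the keys is different.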

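Graceful shutdown handling

Once started, NioEventLoop keeps checking whether the shutdown condition state >= ST_SHUTTING_DOWN has been met. When it has, the loop first closes all Channel connections and then checks whether it may exit (#confirmShutdown, covered in the previous article, which mainly does its calculation based on the quiet period and the maximum wait time of graceful shutdown).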

    if (isShuttingDown()) {
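        //If the EventLoop is shutting down, shut down, or terminated, run the close logic: close every Channel and cancel its registration with the Selector
        closeAll();
        //Check whether shutdown can complete
        if (confirmShutdown()) {
            //Exit the NioEventLoop thread's loop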
            return;
        }
    }

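Summary

Today we covered NioEventLoop in detail, from the feature overview through initialization to task execution. It is a long article, but if you read it through patiently you will certainly get something out of it.

I'm 壹氿. Thanks to everyone for the likes, bookmarks, and comments. The series will keep being updated; see you next time!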