2023-08-02  阅读(23)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/354

Netty服务端通过ServerBootstrap启动的时候,会创建两个EventLoopGroup,它们本质是两个 Reactor 线程池:一个用于与客户端建立TCP连接,另一个用于处理IO相关的读写操作。下图是ServerBootstrap启动时,EventLoopGroup的核心工作流程图:

202308022225062791.png

EventLoopGroup属于Netty最核心的接口之一,Netty默认提供了 NioEventLoopGroup、OioEventLoopGroup 等多种实现。而EventLoop则是EventLoopGroup内部的事件处理器,每个EventLoop内部都有一个线程、一个Java NIO Selector和一个taskQueue任务队列,负责处理客户端请求和内部任务等。

本章,我就对EventLoopGroup和EventLoop的核心原理进行分析讲解。这里补充一句,如果觉得干巴巴看源码难度比较大,完全可以通过调试的方式跟踪,比如本章,我们可以自己创建EventLoopGroup,注册Channel:

    public class Demo {
        public static void main(String[] args) throws InterruptedException {
            // 1.创建一个ServerSocketChannel
            ChannelFactory channelFactory = new ReflectiveChannelFactory(NioServerSocketChannel.class);
            Channel channel = channelFactory.newChannel();
            // 2.初始化Pipeline
            channel.pipeline().addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) {
                    System.out.println("Chanel:" +ch);
                }
            });
            // 3.创建EventLoopGroup
            EventLoopGroup mainGroup = new NioEventLoopGroup(2);
            // 4.注册Channel
            ChannelFuture future = mainGroup.register(channel);
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("future:" + future);
                }
            });
            Thread.sleep(60 * 3600 * 1000);
        }
    }

一、继承体系

在正式分析EventLoopGroup和EventLoop之前,我们需要先从全局了解下Netty的任务调度框架体系:

202308022225114342.png

从上图可以看到,Netty的任务调度框架还是遵循了 java.util.concurrent 中的 Executor 调度体系:

  • io.netty.util.concurrent是Netty对J.U.C并发包功能的扩充,相当于Netty基于J.U.C实现了一套自己的任务调度基础体系;
  • io.netty.channel是Netty基于自己的任务调度框架,实现的一套Channel调度组件。

由于我们一般只使用Netty进行NIO网络编程,所以重点关注包io.netty.channel中的EventLoopGroup和EventLoop即可,它们是负责对Channel事件进行调度的处理器。

二、EventLoopGroup

前面说了,EventLoopGroup本质是一个线程池。既然是线程池,就可以调整内部的线程数量,默认情况下,EventLoopGroup中的线程数是CPU核数 * 2。

EventLoopGroup 内部管理着很多EventLoop对象,这些对象可以看成是EventLoopGroup内的线程,EventLoop是在EventLoopGroup对象构造时创建的。每一个EventLoop负责处理一系列的Channel,我们可以通过下图理解EventLoopGroup和EventLoop之间的关系:

202308022225150633.png

Netty的事件处理机制采用的是 无锁串行化的设计思路

- BossEventLoopGroupWorkerEventLoopGroup 包含一个或者多个 NioEventLoop。BossEventLoopGroup 负责监听客户端的 Accept 事件,当事件触发时,将事件注册至 WorkerEventLoopGroup 中的一个 NioEventLoop 上。每新建一个 Channel, 只选择一个 NioEventLoop 与其绑定。所以说 Channel 生命周期的所有事件处理都是 线程独立 的,不同的 NioEventLoop 线程之间不会发生任何交集;
- NioEventLoop 完成数据读取后,会调用绑定的 ChannelPipeline 进行事件传播,ChannelPipeline 也是 线程安全 的,数据会被传递到 ChannelPipeline 的第一个 ChannelHandler 中。数据处理完成后,将加工完成的数据再传递给下一个 ChannelHandler,整个过程是 串行化 执行,不会发生线程上下文切换的问题。

无锁串行化的设计不仅使系统吞吐量达到最大化,而且降低了用户开发业务逻辑的难度,不需要花太多精力关心线程安全问题。虽然单线程执行避免了线程切换,但是它的缺陷就是不能执行时间过长的 I/O 操作,一旦某个 I/O 事件发生阻塞,那么后续的所有 I/O 事件都无法执行,甚至造成事件积压。在使用 Netty 进行程序开发时,我们一定要对 ChannelHandler 的实现逻辑有充分的风险意识。

本节,我主要分析EventLoopGroup的实现类NioEventLoopGroup的源码:

202308022225179364.png

2.1 创建NioEventLoopGroup

我们先来看NioEventLoopGroup的构造,NioEventLoopGroup对象的创建流程可以用下面这张时序图表示:

202308022225195375.png

NioEventLoopGroup本身提供了很多构造器,我们重点关注下面这个即可:

    // NioEventLoopGroup.java
    
    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory,
                             final RejectedExecutionHandler rejectedExecutionHandler,
                             final EventLoopTaskQueueFactory taskQueueFactory) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
              rejectedExecutionHandler, taskQueueFactory);
    }

构造时可以指定:

  • nThreads:内部的NioEventLoop数量,如果不指定则为CPU核数的2倍;
  • Executor:J.U.C中的任务执行器,每一个EventLoop对象内部都会包含一个Executor,默认为Netty自定义的ThreadPerTaskExecutor,即为每个任务创建一个线程处理;
  • EventExecutorChooserFactory:用于创建Chooser对象的工厂类,Chooser可以看成一个负载均衡器,用于从NioEventLoopGroup中选择一个EventLoop,默认采用round-robin算法;
  • SelectorProvider:Java NIO提供的工具类,SelectorProvider使用了 JDK 的 SPI 机制来创建Selector、ServerSocketChannel、SocketChannel 等对象;
  • SelectStrategyFactory:用来创建SelectStrategy的工厂,SelectStrategy是Netty用来控制EventLoop轮询方式的策略,默认为DefaultSelectStrategy;
  • RejectedExecutionHandler:线程池的任务拒绝策略,默认是抛出RejectedExecutionException异常;
  • EventLoopTaskQueueFactory:任务队列工厂类,每一个EventLoop对象内部都会包含一个任务队列,这个工厂类就是用来创建队列的,默认会创建一个Netty自定义线程安全的MpscUnboundedArrayQueue无锁队列。

最终调用了父类MultiThreadEventExecutorGroup的构造方法,它内部维护了一个children数组,保存EventLoop对象:

    // MultiThreadEventExecutorGroup.java
    
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        checkPositive(nThreads, "nThreads");
        if (executor == null) {
            // 创建一个任务执行器
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        // 创建内部的EventLoop数组,nThreads默认为CPU核数*2
        children = new EventExecutor[nThreads];
    
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 创建EventLoop对象
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    //...
                }
            }
        }
        // Chooser本质可以看成一个负载均衡器,用于选择一个内部的EventLoop,默认采用round-robin算法
        chooser = chooserFactory.newChooser(children);
       //...
    }

2.2 创建EventLoop

MultiThreadEventExecutorGroup内部通过newChild方法创建NioEventLoop:

    // NioEventLoopGroup.java
    
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }

NioEventLoop对象的构造,首先调用了父类SingleThreadEventLoop的构造方法,核心是初始化一个任务队列保存到内部:

    // SingleThreadEventLoop.java
    
    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue, 
                                    Queue<Runnable> tailTaskQueue,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        // addTaskWakesUp默认为false,用于标记添加任务是否会唤醒线程
        super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
        // 忽略这个tailTasks
        tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
    }

SingleThreadEventLoop又调用了父类SingleThreadEventExecutor的构造方法,本质就是设置一些字段值:

    // SingleThreadEventLoop.java
    
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        this.executor = ThreadExecutorMap.apply(executor, this);
        // 初始化NioEventLoop内部的任务队列
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

2.3 注册Channel

前一章我已经讲解了ServerBootStrap的启动流程,ServerBootStrap注册ServerSocketChannel就是通过NioEventLoopGroup.register()方法完成的(定义在父类MultithreadEventLoopGroup中):

    // MultithreadEventLoopGroup.java
    
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    
    public EventLoop next() {
        return (EventLoop) super.next();
    }

选择EventLoop

EventLoopGroup管理着内部的EventLoop数组,所以上述的next()方法就是选择一个EventLoop:

    // MultithreadEventExecutorGroup.java
    
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;
    
    public EventExecutor next() {
        return chooser.next();
    }

具体的选择策略通过EventExecutorChooser完成,默认为DefaultEventExecutorChooserFactory,采用了Round-Robin轮询策略:

    // DefaultEventExecutorChooserFactory.java
    
    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;
    
        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }
    
        @Override
        public EventExecutor next() {
            // Round-Robin轮询算法
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

异步注册

接着,调用父类SingleThreadEventLoop的register方法注册Channel。这里将NioServerSocketChannel和SingleThreadEventLoop封装成了一个DefaultChannelPromise对象,DefaultChannelPromise可以看成是一种特殊的ChannelFuture,可以手动设置异步Future的执行结果状态:

    // SingleThreadEventLoop.java
    
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

Channel内部有一个UnSafe类,封装了对底层Java NIO SocketChannel的许多方法的调用:

    // AbstractChannel.AbstractUnsafe.java
    
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(eventLoop, "eventLoop");
        // 如果已经注册过
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        // ...
    
        AbstractChannel.this.eventLoop = eventLoop;
        // 如果是当前线程是EventLoop内部的工作线程
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } 
        else {
            try {
                // 提交一个异步任务
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
               //...
            }
        }
    }

从上面代码可以看到,Channel的注册是EventLoop通过内部的Executor执行器提交一个异步任务完成的,之所以这样做是因为了使得EventLoop通过一个内部的工作线程就可以实现线程安全的任务执行。

本节我们先重点关注Channel的注册,后面我会分析EventLoop的异步任务执行逻辑:

    // AbstractChannel.java
    
    private void register0(ChannelPromise promise) {
        try {
            // 由于是异步执行,所以检查一下任务状态,避免Channel已经关闭
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            // 执行注册
            doRegister();
            neverRegistered = false;
            registered = true;
    
            // 触发handlerAdded事件,在Pipeline中传播
            pipeline.invokeHandlerAddedIfNeeded();
    
            safeSetSuccess(promise);
            // 触发channelRegistered事件,在Pipeline中传播
            pipeline.fireChannelRegistered();
    
            // 如果有新连接建立,则触发channelActive事件
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            //...
        }
    }

对于NioServerSocketChannel来说,doRegister方法在AbstractNioChannel类中实现,它的内部实际就是通过Java NIO的java.nio.channels.SelectableChannel完成在java.nio.channels.Selector上注册的:

    // AbstractNioChannel.java
    
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 这里调用了
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                //...
            }
        }
    }

Pipieline事件传播

回到AbstractChannel.register0()方法中,Channel注册完成后,会生成一系列事件,并在该Channel的pipeline中传播,主要有三类事件:

  • handlerAdded事件:通过pipeline.invokeHandlerAddedIfNeeded()触发;
  • channelRegistered事件:通过pipeline.fireChannelRegistered()触发;
  • channelActive事件:通过pipeline.fireChannelActive()触发。
    // AbstractChannel.java
    
    private void register0(ChannelPromise promise) {
        try {
            //...
            // 执行注册
            doRegister();
            neverRegistered = false;
            registered = true;
    
            // 首次注册成功时,触发handlerAdded事件,在Pipeline中传播
            pipeline.invokeHandlerAddedIfNeeded();
    
            safeSetSuccess(promise);
            // 触发channelRegistered事件,在Pipeline中传播
            pipeline.fireChannelRegistered();
    
            // 如果有新连接建立,则触发channelActive事件
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            //...
        }
    }

我们先来看pipeline.invokeHandlerAddedIfNeeded(),我之前分析过ServerBootStrap的启动流程,那时我们创建了一个ChannelInitializer对象,并添加到Channel的pipeline中。

事实上,Channel注册完成后,会调用Pipeline的invokeHandlerAddedIfNeeded方法,而该方法ChannelHandler的handlerAdded方法,由于初始时我们只往Pipeline中添加了一个ChannelInitializer,所以会触发 ChannelInitializer 对象的 handlerAdded 方法:

    // ChannelInitializer.java
    
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // 初始化Channel
            if (initChannel(ctx)) {
                removeState(ctx);
            }
        }
    }

ChannelInitializer的handlerAdded方法内部,调用了initChannel这个模板方法,它的内部又调用了我们自己覆写的initChannel,其实就是将我们自定义的ChannelHandler添加到Pipeline中:

    // ChannelInitializer.java
    
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { 
            try {
                // 这个initChannel方法是一个抽象方法,一般我们自己的实现是将自定义的handlers添加到pipeline中
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                // 然后将ChannelInitializer自身移除 
                if (pipeline.context(this) != null) {
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

通过上面的代码可以看出,ChannelInitializer最终会将自己从Pipeline移除,避免重复执行。下图示意了这一过程,LoggingHandler和EchoClientHandler是我们自定义的业务Handler:

202308022225210656.png


我们再来看Pipeline.fireChannelRegistered()方法,这个方法就是往 Pipeline 中扔一个 channelRegistered 事件,register 属于Inbound(入站)事件,Pipeline 接下来要做的就是执行流水线中所有 Inbound 类型的 handlers 中的 channelRegistered() 方法:

    // DefaultChannelPipeline.java
    
    public final ChannelPipeline fireChannelRegistered() {
        // Register事件属于入站事件,所以从head头开始触发
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }
    // AbstractChannelHandlerContext.java
    
    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        // 执行Handler节点的invokeChannelRegistered方法
        // 如果当前线程不是EventLoop内的工作线程,依然以提交异步任务方式执行
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }
    
    private void invokeChannelRegistered() {
        if (invokeHandler()) {    // 确保ChannelHandler#handlerAdded方法已经执行过
            try {
                // 调用当前Handler的channelRegistered方法
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRegistered();
        }
    }
    // DefaultChannelPipeline.java
    
    public void channelRegistered(ChannelHandlerContext ctx) {
        // 判断是否需要执行当前节点的handlerAdded方法
        invokeHandlerAddedIfNeeded();
        // 向后传播注册inbound事件
        ctx.fireChannelRegistered();
    }

关键看当前Handler节点的fireChannelRegistered方法,它内部的findContextInbound()方法会沿着 pipeline 找到下一个 Inbound 类型的 handler:

    // AbstractChannelHandlerContext.java
    
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
        return this;
    }
    
    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        // 找到下一个Inbound Handler
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
        return ctx;
    }

通过上面的源码,我们也就理解了pipeline.fireChannelRegistered()方法和context.fireChannelRegistered()方法的区别:

  • pipeline.fireChannelRegistered() 是将 channelRegistered 事件抛到 pipeline的head 中,pipeline 中的 handlers 依次处理该事件;
  • context.fireChannelRegistered() 是当前 handler 处理完该事件后,向后传播给下一个 handler,依次执行。

最后,我们来看channelActive事件,它是通过pipeline.fireChannelActive()触发的,原理和channelRegistered 事件是一样的:

    // DefaultChannelPipeline.java
    
    public final ChannelPipeline fireChannelActive() {
        // 从pipeline的头部Handler开始向后传播事件
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
    // AbstractChannelHandlerContext.java
    
    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }
    
    private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelActive();
        }
    }

三、EventLoop

EventLoopGroup可以看成是一个管理EventLoop的容器,真正执行任务的是EventLoop,它是 Netty Reactor 线程模型的核心处理引擎。Netty中的所有I/O操作都是异步执行的,比如上一节中讲解的Channel注册,其实只是向EventLoop中提交了一个任务,由EventLoop负责异步执行,那么它是如何高效地实现事件循环和任务处理机制的呢?

本节,我就来分析NioEventLoop的底层原理,NioEventLoop的类继承体系如下图:

202308022225221237.png

NioEventLoop 需要负责 IO 事件和非 IO 事件,NioEventLoop 内部有一个线程一直在执行 Selector 的 select 方法或者正在处理 SelectedKeys,如果有外部线程提交一个任务给NioEventLoop ,任务就会被放到 它内部的taskQueue 中,该队列是线程安全的,默认容量为 16:

202308022225231898.png

我们之前分析EventLoopGroup源码时已经看过EventLoop的构造了,那里只是创建了EventLoop对象,并没有启动它的内部线程。事实上,EventLoop创建内部工作线程的时机是在第一个任务提交时,一般就是SocketChannel的register操作。

3.1 工作线程创建

我们从Channel的注册入手,看下EventLoop是如何进行任务调度的:

    // AbstractChannel.java
    
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        //...
        AbstractChannel.this.eventLoop = eventLoop;
    
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                // 重点看这里
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

在EventLoop中,有一个重要的execute(Runnable command)方法(事实上继承自父类SingleThreadEventExecutor)

    // SingleThreadEventExecutor.java
    
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }
    
    private void execute(Runnable task, boolean immediate) {
        // 1.判断当前执行线程是否为EventLoop内部的工作线程
        boolean inEventLoop = inEventLoop();
        // 2.添加任务到内部队列
        addTask(task);
        // 3.如果不是EventLoop内部线程提交的task,则判断内部线程是否已经启动,没有则启动内部线程
        if (!inEventLoop) {
            // 启动内部线程
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                }
                if (reject) {
                    // 执行拒绝策略
                    reject();
                }
            }
        }
    
        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }

继续看startThread()方法:

    // SingleThreadEventExecutor.java
    
    private void startThread() {
        if (state == ST_NOT_STARTED) {
            // 这里用了CAS操作,以非阻塞的线程安全方式更新
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    // 执行启动内部线程
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

doStartThread方法看起来很长,其实就是向NioEventLoop内部的ThreadPerTaskExecutor提交一个任务,ThreadPerTaskExecutor会创建一个新线程来执行这个任务,这个线程就是NioEventLoop内部的工作线程:

    // SingleThreadEventExecutor.java
    
    private volatile Thread thread;
    
    private void doStartThread() {
        assert thread == null;
        // 对于NioEventLoop,这个executor就是ThreadPerTaskExecutor
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // 将这个线程设置为NioEventLoop的内部工作线程
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
    
                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 执行 SingleThreadEventExecutor 的 run() 方法,它在 NioEventLoop 中实现了
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                   //...
                }
            }
        });
    }

工作线程会执行到上述代码的SingleThreadEventExecutor.this.run()这一行,这是一个抽象方法,需要子类也就是NioEventLoop自己实现,NioEventLoop会在此处实现自己的核心任务调用逻辑。

3.2 任务调度流程

我们继续看NioEventLoop的run方法,这是NioEventLoop的核心逻辑,NioEventLoop 每次循环的处理流程都包含 IO事件轮询 select事件处理 processSelectedKeys任务处理 runAllTasks 几个步骤,其中有几个关键点我重点说一下:

  1. selectStrategy用于控制工作线程的select策略,在存在异步任务的场景,NioEventLoop 会优先保证 CPU 能够及时处理异步任务;
  2. ioRatio参数用于控制 I/O 事件处理和内部任务处理的时间比例,默认值为50。如果 ioRatio = 100,表示每次处理完 I/O 事件后,会执行所有的 task。如果 ioRatio < 100,也会优先处理完 I/O 事件,再处理异步任务队列。所以无论如何,processSelectedKeys() 都是先执行的。
    // NioEventLoop.java
    
    protected void run() {
        int selectCnt = 0;
        // 一直循环执行
        for (;;) {
            try {
                int strategy;
                try {
                    // select处理策略,用于控制select循环行为,包含CONTINUE、SELECT、BUSY_WAIT三种策略
                    // Netty不支持BUSY_WAIT,所以 BUSY_WAIT 与 SELECT 的执行逻辑是一样的
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.BUSY_WAIT:
                        case SelectStrategy.SELECT:
                            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                            if (curDeadlineNanos == -1L) {
                                curDeadlineNanos = NONE; 
                            }
                            nextWakeupNanos.set(curDeadlineNanos);
                            try {
                                if (!hasTasks()) {
                                    // 轮询I/O事件
                                    strategy = select(curDeadlineNanos);
                                }
                            } finally {
                                nextWakeupNanos.lazySet(AWAKE);
                            }
                        default:
                    }
                } catch (IOException e) {
                    //...
                    continue;
                }
    
                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                // 根据ioRatio,选择执行IO操作还是内部队列中的任务
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
    
                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // 解决JDK的epoll空轮询问题
                    selectCnt = 0;
                }
            }
            //...
        }
    }

总之,NioEventLoop的run方法是一个无限循环,不间断执行以下三件事情:

  • select:轮询 Selector 选择器中已经注册的所有 Channel 的 I/O 事件;
  • processSelectedKeys:根据SelectedKeys,处理已经准备就绪的 I/O 事件;
  • runAllTasks:执行内部队列中的任务。

202308022225257169.png

同时,为了提升效率,NioEventLoop会就是根据一定的策略和ioRatio参数配置,选择究竟是执行内部队列中的任务,还是执行IO操作。

epoll空轮询

注意,NioEventLoop的run方法中,有这么一段代码:

    // NioEventLoop.java
    
    protected void run() {
        int selectCnt = 0;
        // 一直循环执行
        for (;;) {
            //...
            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("..."");
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // 解决JDK的epoll空轮询问题
                selectCnt = 0;
            }
            //...
        }
    }
    
    private boolean unexpectedSelectorWakeup(int selectCnt) {
        if (Thread.interrupted()) {
            return true;
        }
        if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            rebuildSelector();
            return true;
        }
        return false;
    }

上述的unexpectedSelectorWakeup方法用于解决JDK NIO中 Epoll 实现的空轮询问题。所谓 JDK Epoll 空轮询 ,是指NIO线程在没有感知到select事件时,应该处于阻塞状态,但是JDK的epoll实现会出现即使 Selector 轮询的事件列表为空,NIO线程一样可以被唤醒,导致 CPU 100% 占用。

Netty 作为一个高性能、高可靠的网络框架,需要保证 I/O 线程的安全性,Netty很巧妙的规避了这个问题, 它提供了一种检测机制判断I/O线程是否可能陷入空轮询,具体的实现方式如下:

  1. 首先,EventLoop在每次执行 select 操作前,都会记录当前时间 currentTimeNanos。通过时间比对,如果Netty发现NIO线程的阻塞时间并未达到预期,则认为可能触发了空轮询的 Bug;
  2. 同时,Netty 引入了计数变量 selectCnt。在正常情况下,selectCnt 会重置,否则会对 selectCnt 自增计数。当 selectCnt 达到 SELECTOR_AUTO_REBUILD_THRESHOLD(默认512) 阈值时,会触发重建 Selector 对象;
  3. 然后,Netty会将异常的 Selector 中所有的 SelectionKey 会重新注册到新建的 Selector 上,重建完成之后异常的 Selector 就可以废弃了。

总结一下,Netty解决 JDK Epoll 空轮询 问题的思路就是:引入计数器变量,统计一定时间窗口内 select 操作的执行次数,识别出可能存在异常的 Selector 对象,然后采用重建 Selector 的方式巧妙地避免了 JDK epoll 空轮询的问题。

3.3 轮询I/O事件

先来看轮询IO事件,比较简单,就是调用底层Java NIO Selector的select方法

    // NioEventLoop.java
    
    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            // 调用了底层NIO Selector的select方法
            return selector.select();
        }
        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

3.4 处理I/O事件

通过 select 过程,NioEventLoop已经获取到准备就绪的 I/O 事件,接下来需要调用 processSelectedKeys() 方法处理IO事件:

    // NioEventLoop.java
    
    private SelectedSelectionKeySet selectedKeys;    // 保存java.nio.channels.SelectionKey的集合
    
    private void processSelectedKeys() {
        if (selectedKeys != null) {
            // Netty优化过的selectedKeys
            processSelectedKeysOptimized();
        } else {
            // 正常处理逻辑
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

处理 I/O 事件时有两种选择,Netty 是否采用优化策略由 DISABLE_KEYSET_OPTIMIZATION参数决定:

  1. 处理 Netty 优化过的 selectedKeys;
  2. 正常的处理逻辑。

根据是否设置了 selectedKeys 来判断采用哪种策略,这两种策略的差异就是使用的 selectedKeys 集合不同。第一种Netty 优化过的 selectedKeys 是 SelectedSelectionKeySet 类型,而正常逻辑使用的是 JDK HashSet 类型。

processSelectedKeysPlain

我们先来看正常的处理逻辑:

    // NioEventLoop.java
    
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        if (selectedKeys.isEmpty()) {
            return;
        }
    
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();
    
            if (a instanceof AbstractNioChannel) {
                // I/O 事件由 Netty 负责处理
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                // 用户自定义任务
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
    
            if (!i.hasNext()) {
                break;
            }
    
            // Netty在处理I/O事件时,如果发现超过256个(默认)Channel从Selector对象中移除,就会将needsToSelectAgain置true,重新做一次轮询,从而确保 keySet 的有效性
            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }

NioEventLoop会遍历已经就绪的 SelectionKey,SelectionKey 的类型可能是AbstractNioChannelNioTask,这两种类型对应的处理方式是不同的,我们关注AbstractNioChannel类型即可:

    // NioEventLoop.java
    
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {    // 检查 Key 是否合法
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                return;
            }
            if (eventLoop == this) {    
                unsafe.close(unsafe.voidPromise());    // Key 不合法,直接关闭连接
            }
            return;
        }
    
        try {
            int readyOps = k.readyOps();
            // 处理连接事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // 将该事件从事件集合中清除,避免事件集合中一直存在连接建立事件
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                // 通知上层连接已经建立
                unsafe.finishConnect();
            }
            // 处理可写事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
            // 处理可读事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
  • OP_CONNECT连接事件 :EventLoop内部调用了unsafe.finishConnect(),底层调用pipeline().fireChannelActive() 方法,这时会产生一个 Inbound 事件,在Pipeline中传播,依次调用 ChannelHandler 的 channelActive() 方法,通知各个 ChannelHandler 连接建立成功;

  • OP_WRITE可写事件 :内部会执行ch.unsafe().forceFlush()操作,将数据刷到对端,最终会调用Java NIO中Channel .write()方法执行底层写操作;

  • OP_READ可读事件 :Netty 将 OP_READ和 OP_ACCEPT事件进行了统一封装,都通过unsafe.read()进行处理,unsafe.read() 的处理逻辑如下:

    • 从 Channel 中读取数据并存储到分配的 ByteBuf;
    • 调用pipeline.fireChannelRead()方法产生 Inbound 事件,在Pipeline中传播,依次调用 ChannelHandler 的 channelRead() 方法处理数据;
    • 调用pipeline.fireChannelReadComplete()方法完成读操作,同样在Pipeline传播;
    • 执行 removeReadOp()清除 OP_READ 事件。

processSelectedKeysOptimized

介绍完正常的 I/O 事件处理 processSelectedKeysPlain 后,我们再分析 Netty 优化的 processSelectedKeysOptimized 源码就轻松很多:

    // NioEventLoop.java
    
    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            selectedKeys.keys[i] = null;
    
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
    
            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);
    
                selectAgain();
                i = -1;
            }
        }
    }

可以发现 processSelectedKeysOptimized 与 processSelectedKeysPlain 的主要区别就是 selectedKeys 的遍历方式不同,来看下 SelectedSelectionKeySet 的源码:

    // SelectedSelectionKeySet.java
    
    final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
        SelectionKey[] keys;
        int size;
    
        SelectedSelectionKeySet() {
            keys = new SelectionKey[1024];
        }
    
        public boolean add(SelectionKey o) {
            if (o == null) {
                return false;
            }
    
            keys[size++] = o;
            if (size == keys.length) {
                increaseCapacity();
            }
    
            return true;
        }
        //...
    }

可以看到,SelectedSelectionKeySet内部维护了一个SelectionKey数组,所以processSelectedKeysOptimized可以直接通过遍历数组取出 I/O 事件,相比processSelectedKeysPlain采用的 JDK HashSet 的遍历效率更高。

SelectedSelectionKeySet 内部通过 size 变量记录数组的逻辑长度(每次执行 add 操作,会将元素添加到 SelectionKey[] 尾部,当 size 等于 SelectionKey[] 的真实长度时,自动扩容),相比于HashSet,由于不需要考虑哈希冲突的问题,所以可以实现O(1)时间复杂度的 add 操作。

Netty在创建Selector时,会通过反射的方式,将 Selector 对象内部的 selectedKeys 和 publicSelectedKeys 替换为 SelectedSelectionKeySet,而原先 selectedKeys 和 publicSelectedKeys 这两个字段都是 HashSet 类型。

3.5 内部任务处理

NioEventLoop 不仅负责处理 I/O 事件,还要兼顾执行任务队列中的任务。任务队列遵循 FIFO 规则,可以保证任务执行的公平性,Netty没有使用J.U.C中的并发队列,而是自己实现了 多生产者单消费者队列 MpscChunkedArrayQueue ,这个我会在后续章节分析。

NioEventLoop 处理的任务类型基本可以分为三类:

  • 普通任务 :通过NioEventLoop.execute()方法向任务队列 taskQueue 中添加任务,例如NioServerSocketChannel 的 注册;
  • 定时任务 :通过 NioEventLoop.schedule()方法向定时任务队列 scheduledTaskQueue 添加一个定时任务,用于周期性执行该任务,例如,心跳消息发送等。定时任务队列 scheduledTaskQueue 采用优先队列 PriorityQueue 实现;
  • 尾部队列 :tailTasks 相比于普通任务队列优先级较低,在每次执行完 taskQueue 中任务后会去获取尾部队列中任务执行。尾部任务并不常用,主要用于做一些收尾工作,例如统计事件循环的执行时间、监控信息上报等。

EventLoop处理内部任务有两个方法:

  • runAllTasks;
  • runAllTasks(long timeoutNanos)。

第二种方式携带了超时时间,防止NIO工作线程因为处理任务时间过长而导致 I/O 事件阻塞,我们来看下NioEventLoop中的runAllTasks(long timeoutNanos)方法:

    // NioEventLoop.java
    
    protected boolean runAllTasks(long timeoutNanos) {
        // 合并定时任务到普通任务队列
        fetchFromScheduledTaskQueue();
        // 从普通任务队列中取出任务并处理
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }
        // 计算任务处理的超时时间
        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            // 安全执行任务,执行调用Runnable任务的run()方法同步执行
            safeExecute(task);
            runTasks ++;
            // 每执行 64 个任务检查一下是否超时
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            // 取出下一个任务,继续执行
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        // 收尾工作,执行尾部队列tailTasks的任务
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

EventLoop内部的定时任务队列scheduledTaskQueue本质是一个优先级队列,里面的定时任务只有满足截止时间 deadlineNanos小于当前时间,才可以取出合并到普通任务队列中。由于优先级队列是有序的,所以只要取出的定时任务不满足合并条件,那么队列中剩下的任务都不会满足条件,合并操作完成并退出。

最后,NioEventLoop执行内部队列中的任务的流程是比较简单的,就是不断取出Runnable任务,最终直接调用run方法执行:

    // AbstractEventExecutor.java
    
    protected static void safeExecute(Runnable task) {
        try {
            // 安全执行任务,执行调用Runnable任务的run()方法同步执行
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }

另外,整个执行过程中还有一个收尾动作afterRunningAllTasks,主要是执行尾部队列tailTasks的任务。尾部队列并不常用,主要用于一些统计场景,比如,我们想对 Netty 的运行状态进行统计分析,如任务循环耗时、占用内存大小等等,则可以向尾部队列添加一个任务执行统计。

四、总结

本章,我对Netty中核心的EventLoopGroup和EventLoop的底层原理进行了分析,核心是EventLoop的设计和实现。EventLoop的设计思想被广泛运用于各类的开源框架中,如 Kafka、Redis、Nginx等等,学习它的源码可以让我们对Java NIO有更深层次的理解。

通过EventLoop的底层源码解读,我们应该意识到在使用Netty进行日常开发时,需要遵循一些原则:

  1. 尽量使用主从Reactor模型,即采用 Boss 和 Worker 两个 EventLoopGroup,分担 Reactor 线程的压力;
  2. Reactor线程模式只适合处理耗时短的任务场景,对于耗时较长的业务 ChannelHandler,最好自己维护一个业务线程池,异步处理处理任务,避免因为ChannelHandler阻塞而造成EventLoop不可用。
  3. 在设计业务架构时,需要明确业务分层和 Netty 分层之间的界限,不要一味地将业务逻辑都添加到 ChannelHandler 中,完全可以基于MQ等中间件进行解耦。

Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文