Netty异步Future源码解读

 2023-02-05
原文作者:LemonNan 原文地址:https://juejin.cn/post/6844904021887565831

本文地址: juejin.im/post/684490…

说在前面

本文的 Netty源码使用的是 4.1.31.Final 版本,不同版本会有一些差异.

JDK Future

在说Netty的异步Future之前,先简单介绍一下JDK自带的Future机制.

首先先上一段代码

    public class JDKFuture {
    
        static ExecutorService executors = new ThreadPoolExecutor(1,
                1,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(16));
        public static void main(String[] args) throws Exception{
            int cnt = 1;
            Future[] jdkFuture=new Future[cnt];
            Object jdkFutureResult;
            for(int i = 0;i < cnt; i++){
                jdkFuture[i] = executors.submit(new JDKCallable(i));
            }
            System.out.println(String.format("%s 在 %s 即将获取任务执行结果", Thread.currentThread(), new Date()));
            jdkFutureResult = jdkFuture[0].get();
            System.out.println(String.format("%s 在 %s 任务结果获取完毕 %s", Thread.currentThread(), new Date(), jdkFutureResult));
            executors.shutdown();
        }
    
        static class JDKCallable implements Callable{
    
            int index;
    
            JDKCallable(int ind){
                this.index = ind;
            }
    
            public Object call() throws Exception {
                try {
                    System.out.println(String.format("线程 [%s] 提交任务[%s]", Thread.currentThread(), this.index));
                  // 耗时2秒,模拟耗时操作
                    Thread.sleep(2000);
                    System.out.println(String.format("线程 [%s] 执行任务[%s]执行完毕", Thread.currentThread(), this.index));
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
                return String.format("任务%s执行结果",this.index);
            }
        }
    }

输出结果为:

    线程 [Thread[pool-1-thread-1,5,main]] 提交任务[0]
    Thread[main,5,main] 在 Mon Dec 16 16:40:38 CST 2019 即将获取任务执行结果
    线程 [Thread[pool-1-thread-1,5,main]] 执行任务[0]执行完毕
    Thread[main,5,main] 在 Mon Dec 16 16:40:40 CST 2019 任务结果获取完毕 任务0执行结果

可以看到主线程在使用 future.get() 的时候,因为子线程还未处理完返回结果而导致主线程活生生的等了2秒钟(耗时操作),这也是JDK自带的Future机制不够完善的地方.因为jdk自身的future机制不够完善,所以Netty自实现了一套Future机制.

Netty 异步Future/Promise

Netty的Future是异步的,那他是怎么实现的呢?接下来就从源码开始探究.

先看一下 Netty 的 FuturePromise 这两个接口

Future

    /**
     * The result of an asynchronous operation
     * 异步操作的结果
     * 对状态的判断、添加listener、获取结果
     */
    public interface Future<V> extends java.util.concurrent.Future<V> {
        boolean isSuccess();
        boolean isCancellable();
        Throwable cause();
        Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
        Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
        Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
        Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
        Future<V> sync() throws InterruptedException;
        Future<V> syncUninterruptibly();
        Future<V> await() throws InterruptedException;
        Future<V> awaitUninterruptibly();
        boolean await(long timeout, TimeUnit unit) throws InterruptedException;
        boolean await(long timeoutMillis) throws InterruptedException;
        boolean awaitUninterruptibly(long timeout, TimeUnit unit);
        boolean awaitUninterruptibly(long timeoutMillis);
        V getNow();
      
        @Override
        boolean cancel(boolean mayInterruptIfRunning);
    }

Promise

Promise是一个特殊的Future,它可写,可写意味着可以修改里面的结果.

    /**
     * Special {@link Future} which is writable.
     * 一个可写的特殊的Future
     * 继承 Future, 继承的方法就不列出
     */
    public interface Promise<V> extends Future<V> {
    
        /**
         * Marks this future as a success and notifies all
         * listeners.
         * If it is success or failed already it will throw an {@link IllegalStateException}.
         * 将这个 future 标记为 success 并且通知所有的 listeners
         * 如果已经成功或者失败将会抛出异常
         */
        Promise<V> setSuccess(V result);
    
        /**
         * Marks this future as a success and notifies all
         * listeners.
         *
         * @return {@code true} if and only if successfully marked this future as
         *         a success. Otherwise {@code false} because this future is
         *         already marked as either a success or a failure.
         * 尝试设置结果,成功返回true, 失败 false, 上面的方法设置失败会抛出异常
         */
        boolean trySuccess(V result);
    
      	// 这2个跟上面的差不多
        Promise<V> setFailure(Throwable cause);
        boolean tryFailure(Throwable cause);
    
        /**
         * Make this future impossible to cancel.
         *
         * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
         *         without being cancelled.  {@code false} if this future has been cancelled already.
         */
        boolean setUncancellable();
    }

源码解读

看到这里都同学都默认是用netty写过程序的~,还没写过的话可以看看官方文档或者我的另一篇Netty使用.

接下来就开始源码的解读.

那么从哪里开始呢?

总所周知!,我们使用Netty开发的时候,写出数据用的是 writeAndFlush(msg), 至于 write(msg) 嘛, 不就是少了个 flush (没错,是我比较懒).

开始

在大家知道 channel().writectx.write 的区别后, 我们就从 channel().write 开始讲起.

不行,我感觉还是要说一下一些补充的,不然心里不舒服.

Netty中有一个pipeline,也就是事件调用链,开发的时候在调用链里面加入自己处理事件的handle,但是在这条 pipeline 中, Netty给我们加上了 Headtail 这两个handle,方便Netty框架处理事件.

先看 DefaultChannelPipeline 的初始化,在初始化代码里给我们添加了2个handle, head 和 tail, 这2个东西很有用,为什么这么说呢?详情看后面解答

        protected DefaultChannelPipeline(Channel channel) {
            this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
            this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
            this.voidPromise = new VoidChannelPromise(channel, true);
            // ChannelInboundHandler
            this.tail = new DefaultChannelPipeline.TailContext(this);
    	      // ChannelInboundHandler && ChannelOutboundHandler
            this.head = new DefaultChannelPipeline.HeadContext(this);
            this.head.next = this.tail;
            this.tail.prev = this.head;
        }

Real 开始

没错,还是从 channel().write(msg) 开始说起(为什么我要用还是).

跟踪代码 channel().write(), 首先会调用到 DefaultChannelPipeline的 writeAndFlush 方法.

1.DefaultChannelPipeline#writeAndFlush

        public final ChannelFuture writeAndFlush(Object msg) {
            return this.tail.writeAndFlush(msg);
        }

this.tail 就是上面构造函数里面初始化的 tailHandle, 而 write 是出栈事件, 会从 tailHandle 开始往前传递,最后传递到 headHandle(怎么感觉好像提前剧透了).

    public ChannelFuture writeAndFlush(Object msg) {
      	// 这里new了一个 promise, 然后这个promise将会一直传递,一直传递.....
        return this.writeAndFlush(msg, this.newPromise());
    }

接下来来到了AbstractChannelHandlerContext 的 writeAndFlush.

        /**
         * 执行 write and flush 操作
         * @param msg
         * @param promise
         */
        private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
            // 这个方法在 ChannelHandler#handlerAdded 调用后,才会返回 true
            if (invokeHandler()) {
                // write 继续传递
                invokeWrite0(msg, promise);
                // flush data
                invokeFlush0();
            } else {
                writeAndFlush(msg, promise);
            }
        }
    
        private void write(Object msg, boolean flush, ChannelPromise promise) {
            // 查找下一个 OutboundHandle, 因为是要输出
            AbstractChannelHandlerContext next = findContextOutbound();
            final Object m = pipeline.touch(msg, next);
            // 下一个 OutboundHandle 所在的线程
            EventExecutor executor = next.executor();
            // 如果在是同一个线程(由于Netty的channel在一个ThreadPool中只绑定一个Thread, 不同线程的话也意味着是不同线程池)
            if (executor.inEventLoop()) {
                // 在同一个线程池(这里意味着同一个线程)中,
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    next.invokeWrite(m, promise);
                }
            } else {
                // 在不同线程池(不同线程池那自然就是不同线程),需要创建一个任务,提交到下一个线程池
                final AbstractWriteTask task;
                if (flush) {
                    // 提交给下一个线程池 && flush
                    task = WriteAndFlushTask.newInstance(next, m, promise);
                }  else {
                    task = WriteTask.newInstance(next, m, promise);
                }
                // 因为是 write 事件, so 接下来提交任务到下一个 OutboundHandle(出栈) 所在的线程, 由它执行
                if (!safeExecute(executor, task, promise, m)) {
                    // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
                    // and put it back in the Recycler for re-use later.
                    //
                    // See https://github.com/netty/netty/issues/8343.
                    // 任务提交失败,取消任务
                    task.cancel();
                }
            }
        }

2.HeadContext#write、flush

接下来本篇文章最重要的地方了, HeadContext !

HeadContext的write和flush方法 实际上都是调用 unsafe的方法实现.

write

    	// 如果是 writeAndFlush ,调用 write后会调用flush
    	@Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                // 这个调用 AbstrachUnsafe.write
                unsafe.write(msg, promise);
            }
    
    	// 这是 unsafe 的 write 方法
            @Override
            public final void write(Object msg, ChannelPromise promise) {
                assertEventLoop();
    
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                // outboundBuffer = null 表明 channel已经关闭并且需要将 future 结果设置为 false
                if (outboundBuffer == null) {
                    // If the outboundBuffer is null we know the channel was closed and so
                    // need to fail the future right away. If it is not null the handling of the rest
                    // will be done in flush0()
                    // See https://github.com/netty/netty/issues/2362
                    safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
                    // release message now to prevent resource-leak
                    ReferenceCountUtil.release(msg);
                    return;
                }
    
                int size;
                try {
                    msg = filterOutboundMessage(msg);
                    size = pipeline.estimatorHandle().size(msg);
                    if (size < 0) {
                        size = 0;
                    }
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    ReferenceCountUtil.release(msg);
                    return;
                }
                // 将 msg添加进 buffer 中
                outboundBuffer.addMessage(msg, size, promise);
            }

flush

如果是WriteAndFlush, 则在调用write后,会调用Head的flush方法,同 write是调用AbstractUnsafe的flush

            /**
             * write 之后再调用这个 flush
             */
            @Override
            public final void flush() {
                assertEventLoop();
    
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    return;
                }
    
                // buffer 标记为可以被 flush
                outboundBuffer.addFlush();
                // 接下来就是真正的 flush
                flush0();
            }

ChannelOutboundBuffer 是个啥呢?

ChannelOutboundBuffer 简单来说就是存储当前channel写出的数据, 并且在调用flush的时候将他们都写出去.

跟着源码一直走,在 flush0 之后,最终会调用到 AbstractNioMessageChannel#doWrite 方法.(上面还有doRead方法,是接收数据的时候调用的)

        @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            final SelectionKey key = selectionKey();
            final int interestOps = key.interestOps();
    
            for (;;) {
                //
                Object msg = in.current();
                if (msg == null) {
                    // Wrote all messages.
                    // 判断写事件
                    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
                    }
                    break;
                }
                try {
                    // 循环写出数据
                    boolean done = false;
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
                        // 真正的写出数据
                        // 最终会调用 javaChannel().send(nioData, mi); 
                        // 很眼熟吧,这个是java nio的方法,注册的时候也是javaChannel().register()
                        if (doWriteMessage(msg, in)) {
                            done = true;
                            break;
                        }
                    }
    
                    // 成功写出,从 buffer 中移除刚才写出的数据
                    if (done) {
                        in.remove();
                    } else {
                        // Did not write all messages.
                        // 写出失败
                        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                            key.interestOps(interestOps | SelectionKey.OP_WRITE);
                        }
                        break;
                    }
                } catch (Exception e) {
                    // 出错后是否继续写出后面的数据
                    if (continueOnWriteError()) {
                        in.remove(e);
                    } else {
                        throw e;
                    }
                }
            }
        }

3.Promise

到上面位置,数据是写出去了,那promise的相关作用呢?没看出来啊?

说实话,这个藏得挺深,居然! 放在了 buffer.remove() 里!

        public boolean remove() {
            // 刚写出去数据的Entry
            Entry e = flushedEntry;
            if (e == null) {
                clearNioBuffers();
                return false;
            }
            Object msg = e.msg;
    
          	// 这个就是writeAndFlush 的时候 new 的 DefaultPromise()
            ChannelPromise promise = e.promise;
            int size = e.pendingSize;
    
    	      // buffer 中移除
            removeEntry(e);
    
            if (!e.cancelled) {
                // only release message, notify and decrement if it was not canceled before.
                ReferenceCountUtil.safeRelease(msg);
    	          // !!! 划重点 !!!
    						// 这里设置了 promise 的结果, 调用了 trySuccess, 通知所有 listener
                // !!! 划重点 !!!
                safeSuccess(promise);
                decrementPendingOutboundBytes(size, false, true);
            }
    
            // recycle the entry
    	      // 重置Entry的信息,方便重用.
            // 跟 Entry entry = Entry.newInstance(msg, size, total(msg), promise); 相对应, newInstance 是获取一个缓存的 Entry
            e.recycle();
            return true;
        }

promise 通知所有 listener 是在写数据成功,并且在 buffer.remove() 调用的时候在里面 safeSuccess(promise) , 最终调用 Promise 的 trySuccess() 从而触发 notifyListeners() 通知所有 listeners.

4.NotifyListener

这个是在 Promise#trySuccess的时候调用的,通知所有listeners操作已经完成.

        /**
         * 通知监听者,任务已经完成
         */
        private void notifyListeners() {
            // 获取future所属线程(池)
            EventExecutor executor = executor();
            // 执行通知是当前线程 则直接回调信息
            // currentThread == this.executor
            if (executor.inEventLoop()) {
                // 获取 ThreadLocal 变量
                final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
                // listen 的层级数
                final int stackDepth = threadLocals.futureListenerStackDepth();
                if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                    threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                    try {
                        // 通知所有的 listener
                        notifyListenersNow();
                    } finally {
                        threadLocals.setFutureListenerStackDepth(stackDepth);
                    }
                    return;
                }
            }
    
            // 如果 executor 不是当前线程, 则交给 future 所属 executor 去执行
            // 意思是添加通知的 executor 可能是前面的 executor , 然后到后面的 executor 也就是当前线程才执行通知
            // 此时将通知交回给之前的 executor
            // 执行通知的不是当前线程, 封装成一个任务, 由之前提交的线程完成通知(回调)
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    notifyListenersNow();
                }
            });
        }

总结

Netty 的 Future 异步机制是在操作完成后,将通知封装成Task,由Promise所属线程(Executors)执行.