2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104696437

继续flush

监听器添加完了, 要开始发送了。我们一路下来,先到HeadContextflush

202309132210057711.png

AbstractUnsafe的flush

202309132210063032.png

ChannelOutboundBuffer的addFlush

这里就是快要发送了,要把消息设置成不可取消状态:

202309132210081233.png

DefaultPromise的setUncancellable

如果设置成功,就说明已经不可取消了,如果没有什么设置过就能设置成功,result变为UNCANCELLABLE对象:

202309132210091964.png

AbstractUnsafe的flush0

202309132210101025.png

NioSocketChannel的doWrite

然后来到这里,发送完成之后要进行删除操作:

202309132210109226.png

ChannelOutboundBuffer的removeBytes

202309132210118247.png

ChannelOutboundBuffer的remove

总算看到我们写的时候的回调对象了,编号1811

202309132210126268.png
然后就是这里的safeSuccess就是写成功就结果,我们看上面那句,就是我们前面讲的netty自动释放的地方,原来是一起的,发送完成了就释放和回调:

202309132210157759.png

ChannelOutboundBuffer的safeSuccess

看到这里resultnull哦:

2023091322101702310.png

ChannelOutboundBuffer的trySuccess

调用了PromisetrySuccess

2023091322101796511.png

DefaultPromise的setSuccess0

刚好可以设置SUCCESS对象。

2023091322101908812.png
这个时候你会发现result其实有值了,就是前面设置的UNCANCELLABLE对象。所以第一个判断为false,但是第二个还有呢,第二个是true

2023091322101963913.png

DefaultPromise的checkNotifyWaiters

通知之前,得检查下是否有监听器,没有就不通知了,而且还要看是否有人因为等通知而wait阻塞在,也需要唤醒。

2023091322102416014.png

DefaultPromise的notifyListeners

这里有个栈深度的检查,避免溢出,但是后面发现这个地方好像没啥用,因为里面有个同步代码块,如果在处理监听了就直接返回了,就算立面处理的监听有再添加其他监听器,也进了方法,但是直接会返回,所以栈不积压的,最多是2,我想这么做或许是其他我没想到的情况吧。

2023091322102520915.png

DefaultPromise的notifyListenersNow

这里就要进行真正的回调啦。为了线程安全,用了同步代码块,然后循环通知所有监听器,因为同步代码块的粒度还是比较细的,所以可能通知完了之后又有新的添加到this.listeners,所以还要循环处理,直到没有监听器位置才返回。

     //现在通知
        private void notifyListenersNow() {
            Object listeners;
            synchronized (this) {
                if (notifyingListeners || this.listeners == null) {
                    return;
                }
                notifyingListeners = true;//设置已经通知
                listeners = this.listeners;
                this.listeners = null;
            }
            for (;;) {//循环通知
                if (listeners instanceof DefaultFutureListeners) {
                    notifyListeners0((DefaultFutureListeners) listeners);
                } else {
                    notifyListener0(this, (GenericFutureListener<?>) listeners);
                }
                synchronized (this) {
                    if (this.listeners == null) {//处理完了
                        notifyingListeners = false;//设置回来,返回
                        return;
                    }
                    listeners = this.listeners;//还有新的加进来了继续处理
                    this.listeners = null;
                }
            }
        }

DefaultPromise的notifyListeners0

调用我们的操作完成回调,结果编号1811没变。

2023091322102647916.png

2023091322102739017.png

至此,我们的异步回调终于完成了,我们现在知道,原来我们write的时候是不回调成功的,除非出了什么异常,否则只有当flush的完成了才回调成功。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文