继续flush
监听器添加完了, 要开始发送了。我们一路下来,先到HeadContext
的flush
:
AbstractUnsafe的flush
ChannelOutboundBuffer的addFlush
这里就是快要发送了,要把消息设置成不可取消状态:
DefaultPromise的setUncancellable
如果设置成功,就说明已经不可取消了,如果没有什么设置过就能设置成功,result
变为UNCANCELLABLE
对象:
AbstractUnsafe的flush0
NioSocketChannel的doWrite
然后来到这里,发送完成之后要进行删除操作:
ChannelOutboundBuffer的removeBytes
ChannelOutboundBuffer的remove
总算看到我们写的时候的回调对象了,编号1811
。
然后就是这里的safeSuccess
就是写成功就结果,我们看上面那句,就是我们前面讲的netty
自动释放的地方,原来是一起的,发送完成了就释放和回调:
ChannelOutboundBuffer的safeSuccess
看到这里result
是null
哦:
ChannelOutboundBuffer的trySuccess
调用了Promise
的trySuccess
:
DefaultPromise的setSuccess0
刚好可以设置SUCCESS
对象。
这个时候你会发现result
其实有值了,就是前面设置的UNCANCELLABLE
对象。所以第一个判断为false
,但是第二个还有呢,第二个是true
。
DefaultPromise的checkNotifyWaiters
通知之前,得检查下是否有监听器,没有就不通知了,而且还要看是否有人因为等通知而wait
阻塞在,也需要唤醒。
DefaultPromise的notifyListeners
这里有个栈深度的检查,避免溢出,但是后面发现这个地方好像没啥用,因为里面有个同步代码块,如果在处理监听了就直接返回了,就算立面处理的监听有再添加其他监听器,也进了方法,但是直接会返回,所以栈不积压的,最多是2
,我想这么做或许是其他我没想到的情况吧。
DefaultPromise的notifyListenersNow
这里就要进行真正的回调啦。为了线程安全,用了同步代码块,然后循环通知所有监听器,因为同步代码块的粒度还是比较细的,所以可能通知完了之后又有新的添加到this.listeners
,所以还要循环处理,直到没有监听器位置才返回。
//现在通知
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;//设置已经通知
listeners = this.listeners;
this.listeners = null;
}
for (;;) {//循环通知
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {//处理完了
notifyingListeners = false;//设置回来,返回
return;
}
listeners = this.listeners;//还有新的加进来了继续处理
this.listeners = null;
}
}
}
DefaultPromise的notifyListeners0
调用我们的操作完成回调,结果编号1811
没变。
至此,我们的异步回调终于完成了,我们现在知道,原来我们write
的时候是不回调成功的,除非出了什么异常,否则只有当flush
的完成了才回调成功。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。