Netty的异步理解
Netty是以事件驱动的,那什么是事件驱动,简单理解就是闹钟,你以前读书的时候用过闹钟吧,你晚上10
点定时到噪声6
点闹,你不会等着他走到6
点到,你肯定睡觉去啦,6
点了闹钟闹铃响了,你就知道要起床了,这就是事件驱动,闹钟闹铃就是一个事件,驱动着你得起来了。这个是最常见的生活中的例子啊,Netty
基本上所有的逻辑都是事件驱动的,常见的就是处理器的监听读写事件,你应该没看到处理器是在一个死循环里轮询读写吧,只有当有读写事件的时候,从缓冲区读出事件了,才会去向管道里发送,然后被相应的处理器方法执行。而且他也提供了异步回调的机制,几乎所有的输出事件都是有异步回调结果的,一个ChannelFuture
对象,这个就是用来接收回调事件,从而驱动你做其他事。
JDK的Future
要讲异步回调,得先讲下这个,这个是Netty
对JDK
的Future
的封装,JDK
已经有了,为什么还要封装,那我们就要来看下JDK
的Future
接口:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
可以看到,对于事情的进度,只有一个isDone
方法,只是告诉你完成了没有,具体是成功了,失败了,还是取消了,还是异常了,其实是不清楚。另外get
方法是阻塞的,就算你用超时的方法,你也得轮询,你压根不知道什么时候有返回。基于这些问题,netty
对他进行了升级。
Netty的Future
首先Netty希望能知道到底是不是成功了还是失败了,还要能知道如果失败了,是否有异常,最重要的还是想要完成的时候通知我,而不是我一直去轮询或者阻塞,所以使用了观察者模式,可以添加监听器,主要是下列4
个方法:
boolean isSuccess();
Throwable cause();
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
ChannelFuture
前面有了回调,那现在我们想要看一些IO
操作的回调,也就是通道里的操作回调,而且回调后能对通道再做一些操作,所以我们希望回调能把通道实例传进来,于是有了这个接口,加了一个获取通道的方法,有了通道基本上已经万能了。
我们可以根据一些方法可以判断具体的完成情况,比如:
Promise
前面定义了一大堆,但是怎么样算成功,怎么样算失败呢,当然是需要根据情况设置啦,所以需要可以设置的接口,这个接口就是可以设置成功与失败的。主要是下列方法,为什么有tryXXX
,因为规定成功和失败结果只能设置一次,不能重复设置,重复设置就报异常,所以可以尝试,如果设置了就返回失败:
Promise<V> setSuccess(V result);
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
ProgressivePromise
除了想知道结果外,能不能知道运行的进度啊,比如为发一个大文件,你能告诉我现在发送了百分之多少么,于是又加了进度的方法,try
是因为可能已经完成了或者超出进度最大范围了,就什么都不做返回失败:
ProgressivePromise<V> setProgress(long progress, long total);
boolean tryProgress(long progress, long total);
其他的一些异步接口也差不多,我就不多说了,下面分析下使用和原理。
进行write
如果我们希望发送一个消息出去,等发完了告诉我,于是我这么写,为什么我把write
和flush
分开呢,因为连在一起可能会直接发出去,这个时候连事件还没监听就有了结果了,再监听就直接返回结果了,感觉就是同步的了,后面会源码解释:
ChannelFuture channelFuture = ctx.write(buf);
channelFuture.addListener(future -> {
if (future.isSuccess()) {
System.out.println(future);
}
});
ctx.flush();
事件完成监听器
我们普通的监听器就是这样,有个回调方法:
事件进度监听器
还有一类是有进度的回调,多了一个进度回调方法:
追踪写回调结果channelFuture
我们还是一步步debug
来弄清楚这个异步回调这么来的,又是怎么回调的。当你调用写的时候他会创建一个可写的异步回调。
创建异步回调newPromise
会将当前的通道
和事件循环执行器
传进去,这样就能 保证回调的时候我们就能拿到通道实例 ,就可以干好多事啦,这里为什么还要有个执行器,通道里不是有么,其实这里的执行器是处理器上下文的执行器,可能和通道的不一样,看你创建的时候有没传了:
可以看到,执行器不一定是通道的,如果创建的时候没传,那就用通道的,否则可能是其他通道的执行器哦。
创建的时候可能会传,不过一般是null
。
成功创建异步回调promise编号1811
HeadContext的write
随后传入这里:
ChannelOutboundBuffer的addMessage
然后又到这里:
Entry的newInstance
最后被封装到Entry
中,也就是出站缓冲区里,居然没有写什么结果,是的,write
是没回调的,只有flush
了才有。
监听写回调结果
可见这个时候还没任何结果,于是我们添加监听器:
DefaultChannelPromise的addListener
最后到了这里:
@Override
public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.addListener(listener);
return this;
}
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
//同步添加
synchronized (this) {
addListener0(listener);
}
if (isDone()) {//如果完成了,就直接通知
notifyListeners();
}
return this;
}
DefaultChannelPromise的addListener0
这个方法在外面添加了同步操作,避免多线同时添加的安全问题,然后把监听器添加进去,如果监听器是DefaultFutureListeners
类型就直接添加,否则就把监听器封装成DefaultFutureListeners
类型。
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;//没有监听器
} else if (listeners instanceof DefaultFutureListeners) {//是DefaultFutureListeners就添加
((DefaultFutureListeners) listeners).add(listener);
} else {//创建一个,放进去
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
DefaultFutureListeners
这个是个单独的类型,是个容器,主要是用来放监听器的,我们可以看到,初始某人有个监听器数组,长度是2
,如果超出了就会扩容2
倍。而且他还能统计有多少个是GenericProgressiveFutureListener
类型的。
final class DefaultFutureListeners {
private GenericFutureListener<? extends Future<?>>[] listeners;//监听器数组
private int size;//监听器个数,也是添加的索引
private int progressiveSize; // the number of progressive listeners 进度监听器数
@SuppressWarnings("unchecked")
DefaultFutureListeners(
GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2;
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
//添加监听回调器,满了就扩容,每次2倍
public void add(GenericFutureListener<? extends Future<?>> l) {
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
final int size = this.size;
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
listeners[size] = l;
this.size = size + 1;
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize ++;//进度监听器数+1
}
}
//移除回调监听器
public void remove(GenericFutureListener<? extends Future<?>> l) {
final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
int size = this.size;
for (int i = 0; i < size; i ++) {
if (listeners[i] == l) {
int listenersToMove = size - i - 1;
if (listenersToMove > 0) {//向前移动数组元素
System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
}
listeners[-- size] = null;
this.size = size;
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize --;
}
return;
}
}
}
//所有监听器
public GenericFutureListener<? extends Future<?>>[] listeners() {
return listeners;
}
public int size() {
return size;
}
public int progressiveSize() {
return progressiveSize;
}
}
DefaultChannelPromise的isDone
这个时候还没结果哦:
当然是没完成啦:
于是没完成,这里就是我前面说了,如果完成了,就直接通知了,但是没完成,就不执行:
至此写操作和添加监听操作完成了,接下去就是要flush啦,我们下一篇讲吧。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。