本文地址: juejin.im/post/684490…
说在前面
本文的 Netty源码使用的是 4.1.31.Final
版本,不同版本会有一些差异.
JDK Future
在说Netty的异步Future之前,先简单介绍一下JDK自带的Future机制.
首先先上一段代码
public class JDKFuture {
static ExecutorService executors = new ThreadPoolExecutor(1,
1,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(16));
public static void main(String[] args) throws Exception{
int cnt = 1;
Future[] jdkFuture=new Future[cnt];
Object jdkFutureResult;
for(int i = 0;i < cnt; i++){
jdkFuture[i] = executors.submit(new JDKCallable(i));
}
System.out.println(String.format("%s 在 %s 即将获取任务执行结果", Thread.currentThread(), new Date()));
jdkFutureResult = jdkFuture[0].get();
System.out.println(String.format("%s 在 %s 任务结果获取完毕 %s", Thread.currentThread(), new Date(), jdkFutureResult));
executors.shutdown();
}
static class JDKCallable implements Callable{
int index;
JDKCallable(int ind){
this.index = ind;
}
public Object call() throws Exception {
try {
System.out.println(String.format("线程 [%s] 提交任务[%s]", Thread.currentThread(), this.index));
// 耗时2秒,模拟耗时操作
Thread.sleep(2000);
System.out.println(String.format("线程 [%s] 执行任务[%s]执行完毕", Thread.currentThread(), this.index));
}catch(InterruptedException e){
e.printStackTrace();
}
return String.format("任务%s执行结果",this.index);
}
}
}
输出结果为:
线程 [Thread[pool-1-thread-1,5,main]] 提交任务[0]
Thread[main,5,main] 在 Mon Dec 16 16:40:38 CST 2019 即将获取任务执行结果
线程 [Thread[pool-1-thread-1,5,main]] 执行任务[0]执行完毕
Thread[main,5,main] 在 Mon Dec 16 16:40:40 CST 2019 任务结果获取完毕 任务0执行结果
可以看到主线程在使用 future.get()
的时候,因为子线程还未处理完返回结果而导致主线程活生生的等了2秒钟(耗时操作),这也是JDK自带的Future机制不够完善的地方.因为jdk自身的future机制不够完善,所以Netty自实现了一套Future机制.
Netty 异步Future/Promise
Netty的Future是异步的,那他是怎么实现的呢?接下来就从源码开始探究.
先看一下 Netty 的 Future
和 Promise
这两个接口
Future
/**
* The result of an asynchronous operation
* 异步操作的结果
* 对状态的判断、添加listener、获取结果
*/
public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isSuccess();
boolean isCancellable();
Throwable cause();
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> sync() throws InterruptedException;
Future<V> syncUninterruptibly();
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
V getNow();
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
Promise
Promise是一个特殊的Future,它可写,可写意味着可以修改里面的结果.
/**
* Special {@link Future} which is writable.
* 一个可写的特殊的Future
* 继承 Future, 继承的方法就不列出
*/
public interface Promise<V> extends Future<V> {
/**
* Marks this future as a success and notifies all
* listeners.
* If it is success or failed already it will throw an {@link IllegalStateException}.
* 将这个 future 标记为 success 并且通知所有的 listeners
* 如果已经成功或者失败将会抛出异常
*/
Promise<V> setSuccess(V result);
/**
* Marks this future as a success and notifies all
* listeners.
*
* @return {@code true} if and only if successfully marked this future as
* a success. Otherwise {@code false} because this future is
* already marked as either a success or a failure.
* 尝试设置结果,成功返回true, 失败 false, 上面的方法设置失败会抛出异常
*/
boolean trySuccess(V result);
// 这2个跟上面的差不多
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
/**
* Make this future impossible to cancel.
*
* @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
* without being cancelled. {@code false} if this future has been cancelled already.
*/
boolean setUncancellable();
}
源码解读
看到这里都同学都默认是用netty写过程序的~,还没写过的话可以看看官方文档或者我的另一篇Netty使用.
接下来就开始源码的解读.
那么从哪里开始呢?
总所周知!
,我们使用Netty开发的时候,写出数据用的是 writeAndFlush(msg)
, 至于 write(msg)
嘛, 不就是少了个 flush (没错,是我比较懒).
开始
在大家知道 channel().write
和 ctx.write
的区别后, 我们就从 channel().write
开始讲起.
不行,我感觉还是要说一下一些补充的,不然心里不舒服.
Netty中有一个pipeline
,也就是事件调用链,开发的时候在调用链里面加入自己处理事件的handle,但是在这条 pipeline 中, Netty给我们加上了 Head
和 tail
这两个handle,方便Netty框架处理事件.
先看 DefaultChannelPipeline 的初始化,在初始化代码里给我们添加了2个handle, head 和 tail, 这2个东西很有用,为什么这么说呢?详情看后面解答
protected DefaultChannelPipeline(Channel channel) {
this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
this.voidPromise = new VoidChannelPromise(channel, true);
// ChannelInboundHandler
this.tail = new DefaultChannelPipeline.TailContext(this);
// ChannelInboundHandler && ChannelOutboundHandler
this.head = new DefaultChannelPipeline.HeadContext(this);
this.head.next = this.tail;
this.tail.prev = this.head;
}
Real 开始
没错,还是从 channel().write(msg)
开始说起(为什么我要用还是).
跟踪代码 channel().write(), 首先会调用到 DefaultChannelPipeline的 writeAndFlush 方法.
1.DefaultChannelPipeline#writeAndFlush
public final ChannelFuture writeAndFlush(Object msg) {
return this.tail.writeAndFlush(msg);
}
this.tail 就是上面构造函数里面初始化的 tailHandle
, 而 write
是出栈事件, 会从 tailHandle 开始往前传递,最后传递到 headHandle
(怎么感觉好像提前剧透了).
public ChannelFuture writeAndFlush(Object msg) {
// 这里new了一个 promise, 然后这个promise将会一直传递,一直传递.....
return this.writeAndFlush(msg, this.newPromise());
}
接下来来到了AbstractChannelHandlerContext
的 writeAndFlush.
/**
* 执行 write and flush 操作
* @param msg
* @param promise
*/
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
// 这个方法在 ChannelHandler#handlerAdded 调用后,才会返回 true
if (invokeHandler()) {
// write 继续传递
invokeWrite0(msg, promise);
// flush data
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 查找下一个 OutboundHandle, 因为是要输出
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
// 下一个 OutboundHandle 所在的线程
EventExecutor executor = next.executor();
// 如果在是同一个线程(由于Netty的channel在一个ThreadPool中只绑定一个Thread, 不同线程的话也意味着是不同线程池)
if (executor.inEventLoop()) {
// 在同一个线程池(这里意味着同一个线程)中,
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
// 在不同线程池(不同线程池那自然就是不同线程),需要创建一个任务,提交到下一个线程池
final AbstractWriteTask task;
if (flush) {
// 提交给下一个线程池 && flush
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
// 因为是 write 事件, so 接下来提交任务到下一个 OutboundHandle(出栈) 所在的线程, 由它执行
if (!safeExecute(executor, task, promise, m)) {
// We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
// 任务提交失败,取消任务
task.cancel();
}
}
}
2.HeadContext#write、flush
接下来本篇文章最重要的地方了, HeadContext !
HeadContext的write和flush方法 实际上都是调用 unsafe的方法实现.
write
// 如果是 writeAndFlush ,调用 write后会调用flush
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 这个调用 AbstrachUnsafe.write
unsafe.write(msg, promise);
}
// 这是 unsafe 的 write 方法
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// outboundBuffer = null 表明 channel已经关闭并且需要将 future 结果设置为 false
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
// 将 msg添加进 buffer 中
outboundBuffer.addMessage(msg, size, promise);
}
flush
如果是WriteAndFlush, 则在调用write后,会调用Head的flush方法,同 write是调用AbstractUnsafe的flush
/**
* write 之后再调用这个 flush
*/
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// buffer 标记为可以被 flush
outboundBuffer.addFlush();
// 接下来就是真正的 flush
flush0();
}
ChannelOutboundBuffer 是个啥呢?
ChannelOutboundBuffer 简单来说就是存储当前channel写出的数据
, 并且在调用flush的时候将他们都写出去.
跟着源码一直走,在 flush0
之后,最终会调用到 AbstractNioMessageChannel#doWrite
方法.(上面还有doRead方法,是接收数据的时候调用的)
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
for (;;) {
//
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
// 判断写事件
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
break;
}
try {
// 循环写出数据
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
// 真正的写出数据
// 最终会调用 javaChannel().send(nioData, mi);
// 很眼熟吧,这个是java nio的方法,注册的时候也是javaChannel().register()
if (doWriteMessage(msg, in)) {
done = true;
break;
}
}
// 成功写出,从 buffer 中移除刚才写出的数据
if (done) {
in.remove();
} else {
// Did not write all messages.
// 写出失败
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
break;
}
} catch (Exception e) {
// 出错后是否继续写出后面的数据
if (continueOnWriteError()) {
in.remove(e);
} else {
throw e;
}
}
}
}
3.Promise
到上面位置,数据是写出去了,那promise的相关作用呢?没看出来啊?
说实话,这个藏得挺深,居然! 放在了 buffer.remove() 里
!
public boolean remove() {
// 刚写出去数据的Entry
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
// 这个就是writeAndFlush 的时候 new 的 DefaultPromise()
ChannelPromise promise = e.promise;
int size = e.pendingSize;
// buffer 中移除
removeEntry(e);
if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before.
ReferenceCountUtil.safeRelease(msg);
// !!! 划重点 !!!
// 这里设置了 promise 的结果, 调用了 trySuccess, 通知所有 listener
// !!! 划重点 !!!
safeSuccess(promise);
decrementPendingOutboundBytes(size, false, true);
}
// recycle the entry
// 重置Entry的信息,方便重用.
// 跟 Entry entry = Entry.newInstance(msg, size, total(msg), promise); 相对应, newInstance 是获取一个缓存的 Entry
e.recycle();
return true;
}
promise 通知所有 listener 是在写数据成功,并且在 buffer.remove()
调用的时候在里面 safeSuccess(promise)
, 最终调用 Promise 的 trySuccess()
从而触发 notifyListeners()
通知所有 listeners.
4.NotifyListener
这个是在 Promise#trySuccess的时候调用的,通知所有listeners操作已经完成.
/**
* 通知监听者,任务已经完成
*/
private void notifyListeners() {
// 获取future所属线程(池)
EventExecutor executor = executor();
// 执行通知是当前线程 则直接回调信息
// currentThread == this.executor
if (executor.inEventLoop()) {
// 获取 ThreadLocal 变量
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
// listen 的层级数
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
// 通知所有的 listener
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 如果 executor 不是当前线程, 则交给 future 所属 executor 去执行
// 意思是添加通知的 executor 可能是前面的 executor , 然后到后面的 executor 也就是当前线程才执行通知
// 此时将通知交回给之前的 executor
// 执行通知的不是当前线程, 封装成一个任务, 由之前提交的线程完成通知(回调)
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
总结
Netty 的 Future 异步机制是在操作完成后,将通知封装成Task,由Promise所属线程(Executors)执行.