Netty源码深度解析-Pipeline(1) Pipeline的构造

 2023-01-28
原文作者:王建新199003 原文地址:https://juejin.cn/post/6969167133930422309

导读

原创文章,转载请注明出处。

本文源码地址:netty-source-code-analysis

本文所使用的netty版本4.1.6.Final:带注释的netty源码

Pipeline这个词翻译过来就是“流水线”的意思,读到这里有了解过设计模式的同学应该已经想到了,这里用到的是“责任链模式”。本文我们将以DefaultChannelPipeline为例看一下Pipeline的构造以及其中重要的数据结构。

1 和Pipeline相关的其他组件

1.1 ChannnelHandler

这是ChannelHandler中的注释,翻译过来就是“处理IO事件或者拦截IO操作,并且将其向ChannelPipeline中的下一个handler传递”,说白了就是在责任链中注册的一系列回调方法。

Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in its ChannelPipeline

这里的I/O event就是很多书中提到的“入站事件”,而I/O operation就是很多书中提到的“出站事件”,前面我说过,这里我并不准备这么叫,按我的理解我习惯把这两者称之为“事件”和“命令”。很显然这里eventoperation的含义是不一样的,event更多地多表示事件发生了,我们被动地收到,而operation则表示我们主动地发起一个动作或者命令。

1.2 ChannelHandlerContext

每一个ChannelHandler在被添加进ChannelPipeline时会被包装进一个ChannelHandlerContext。有两个特殊的ChannelHandlerContext除外,分别是HeadContextTailContextHeadContext继承了ChannelInBoundHandlerChannelOutBoundHandler,而TailContext继承了ChannelInBoundHandler。 每个ChannelHandlerContext中有两个指针nextprev,这是用来将多个ChannelHandlerContext构成双向链表的。

2 Pipeline的构造方法

我们以DefaultChannelPipeline为例,从它的构造方法开始。这里首先将Channel保存到Pipeline的属性中,又初始化了两个属性succeedFuturevoidPromise。这是两个特殊的可以共享的Promise,这两个Promise不是重点,不理解也没关系。

接下来的tailhead是两个特殊的ChannelHandlerContext,这两个是Pipeline中的重要组件。

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
    
        tail = new TailContext(this);
        head = new HeadContext(this);
    
        head.next = tail;
        tail.prev = head;
    }

Pipeline在执行完构造方法以后的结构如下图所示,headtail构成了最简单的双向链表。

202212302216278641.png 图中蓝色填充的就是ChannelHandlerContext,目前只有HeadContextTailContextChannelHandlerContext中的较窄的矩形表示ChannelHandler,由于HeadContextTailContext并没有包含ChannelHandler,而是继承ChannelHandler,所以这里我们用虚线表示。上下贯通的ChannelHandler表示既是ChannelInBoundHandler又是ChannelOutBoundHandler,只有上半部分的表示是ChannelInBoundHandler,只有下半部分的表示是ChannelOutBoundHandler

3 添加ChannelHandler

ChannelPipeline中有很多以add开头的方法,这些方法就是向ChannelPipeline中添加ChannelHandler的方法。

  • addAfter:向某个ChannelHandler后边添加
  • addBefore:向某个ChannelHandler前面添加
  • addFirst:添加到头部,不能在head的前面,而是紧挨着head,在head的后面
  • addLast:添加到尾部,不能在tail的后面,而是紧挨着tail,在tail的前面

我们以最常用的的addLast方法为例来分析一下Pipeline中添加ChannelHandler的操作。 这里所贴出的addLast方法其实我们已经在“服务端启动流程”这篇文章中打过照面了。方法参数中的EventExecutorGroup意味着我们可以为这个ChannelHandler单独设置Excutor而不使用Channel所绑定的EventLoop,一般情况下我们不这么做,所以group参数为null

这里先把ChannelHandler包装成ChannelHandlerContext,再添加到尾部,随后调用ChannelHandlerHandlerAdded方法。

在调用HandlerAdded方法时有一点问题,添加ChannelHandler的操作不需要在EventLoop线程中进行,而HandlerAdded方法则必须在EventLoop线程中进行。也就是说存在添加Handler时还未绑定EventLoop的情况,此时则调用newCtx.setAddPending()将当前HandlerContext设置为ADD_PENDING状态,并且调用callHandlerCallbackLater(newCtx, true)将一个异步任务添加到一个单向队链表中,即pendingHandlerCallbackHead这个链表。

如果当前已经绑定了EventLoop,则看当前调用线程是否为EventLoop线程,如果不是则向EventLoop提交一个异步任务调用callHandlerAdded0方法,否则直接调用callHandlerAdded0方法。

下面咱们依次分析一下newContextcallHandlerCallbackLatercallHandlerAdd0方法。

    @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
               //先把`handler`包装成`HandlerContext`
                newCtx = newContext(group, filterName(name, handler), handler);
                //添加到尾部
                addLast0(newCtx);
                //如果还未绑定`EventLoop`则稍后再发起对`HandlerAdded`方法的调用。
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
                //如果已经绑定了EventLoop,并且当前线程非EventLoop线程的话就提交一个异步任务,就发起一个异步任务去调用HandlerAdded方法。
                EventExecutor executor = newCtx.executor();
                if (!executor.inEventLoop()) {
                    newCtx.setAddPending();
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            callHandlerAdded0(newCtx);
                        }
                    });
                    return this;
                }
            }
            //如果当前线程是EventLoop线程,就直接调用HandlerAdded方法。
            callHandlerAdded0(newCtx);
            return this;
        }

3.1 newContext

先看来一下newContext方法,这里直接调用了DefaultChannelHandlerContext的构造方法,咱们跟进去看看。

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

DefaultChannelHandlerContext的构造方法中又调用了父类AbstractChannelHandlerContext的构造方法,保存了handler属性。在调用父类构造方法之前调用了isInboudisOutbound方法判断当前的Handler是否为ChannelInBoundHandler或者ChannelOutBoundHandler,这两个方法很简单,不再展开。

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

接下来看AbstractChannelHandlerContext的构造方法,这里非常简单,保存了几个属性,咱们看一下ordered这个属性。ordered表示EventExecutor在执行异步任务时是否按添加顺序执行,这里一般情况下executornull,表示使用Channel所绑定的EventLoop线程,而EventLoop线程都是OrderedEventExecutor的实现类。所以这里我们不考虑orderedfalse的情况。

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

上面提到了ChannelHandlerContext可以在构造方法里单独指定EventExecutor,如果没有单独指定的话就使用Channel所绑定的EventLoop,代码在哪里呢,就在AbstractChannelHandlerContext#executor方法,非常简单,如果没有为当前ChannelHandler指定excutor则返回Channel所绑定的EventLoop

    @Override
    public EventExecutor executor() {
        if (executor == null) {
            return channel().eventLoop();
        } else {
            return executor;
        }
    }

3.2 callHandlerCallbackLater

在添加完ChannelHandler之后将调用ChannledHandlerhandlerAdded方法,但是此时有可能还未绑定EventLoop,而handlerAdded方法的调用必须在EventLoop线程内执行,此时就需要调用callHandlerCallbackLater方法在pendingHandlerCallbackHead链表中添加一个PendingHandlerAddedTask

    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;
    
        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }

接下来咱们看一下PendingHandlerAddedTask的代码,逻辑在execute方法里,这里直接调用了callHandlerAdded0

    private final class PendingHandlerAddedTask extends PendingHandlerCallback {
    
        PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }
    
        @Override
        public void run() {
            callHandlerAdded0(ctx);
        }
    
        @Override
        void execute() {
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                callHandlerAdded0(ctx);
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException e) {
                    
                }
            }
        }
    }

3.3 callHandlerAdded0

不管是在未绑定EventLoop的情况下延迟调用handlerAdded还是在已经绑定了EventLoop的情况下立即调用HandlerAdded,最终都会调用到callHandlerAdded0方法。这里干了两件事,一是调用ChannelHandlerhandlerAdded方法,二是将HandlerContext的状态设置为ADD_COMPLETE状态。

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();
        } catch (Throwable t) {
               
    }

3.4 添加多个ChannelHandler后的Pipeline

还记得咱们的“Netty整体架构图”吗,在这里咱们把Pipeline部分单独放大拿出来看一下,在添加完多个ChannelHandler之后,Pipeline的结构是这样的。

202212302216287652.png

4 删除ChannelHandler

Pipeline中有几个以remove开头的方法,这些方法的作用就是删除ChannelHandler

  • remove(ChannelHandler handler):从headtail查找,用==判断是否为同一实例,只删除第1个。
  • remove(Class<T> handlerType):从headtail查找,用isAssignableFrom方法判断是否为符合条件的类型,只删除第1个。
  • remove(String name):从headtail查找,用name精确匹配查找,只删除第1个,因为name不能重复,所以这里删除第1个也是唯一的1个。
  • removeFirst:删除head的后一个,不能删除tail
  • removeLast:删除tail的前一个,不能删除head

上述无论哪种删除方式在查找到对应的HandlerContext后都会调用到remove(final AbstractChannelHandlerContext ctx)方法,查找过程比较简单,咱们不再展开,直接看remove(final AbstractChannelHandlerContext ctx)方法。

看看这个方法的实现,是不是和addLast(EventExecutorGroup group, String name, ChannelHandler handler)很相似,非常相似。首先从双向链表中删除ChannelHandlerContext,再调用callHandlerRemoved0方法,callHandlerRemoved0方法内会调用handlerRemoved方法,这个调用必须在EventLoop线程内进行。如果删除时还未绑定EventLoop则添加一个异步任务到链表pendingHandlerCallbackHead中。

如果已经绑定了EventLoop并且当前线程非EventLoop线程则向EventLoop提交一个异步任务,否则直接调用callHandlerRemoved0方法。

    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        synchronized (this) {
            //从双向链表中删除`ChannelHandlerContext`
            remove0(ctx);
            
            //如果还未绑定`EventLoop`,则稍后调用`handlerRemoved`方法
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }
            //如果已经绑定了`EventLoop`,但是当前线程非`EventLoop`线程的话,就发起一个异步任务调用callHandlerRemoved0方法
            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        //如果当前线程就是`EventLoop`线程,则直接调用callHandlerRemoved0方法。
        callHandlerRemoved0(ctx);
        return ctx;
    }

callHandlerCallbackLater方法咱们前面已经分析过,和添加ChannelHandler时不同的是,这里向链表添加的是PendingHandlerRemovedTask,这个类也很简单,不再展开。

这里咱们只看一下callHandlerRemoved0方法。这个方法很简单,调用handlerRemoved方法,再把ChannelHandlerContext的状态设置为REMOVE_COMPLETE

    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        // Notify the complete removal.
        try {
            try {
                ctx.handler().handlerRemoved(ctx);
            } finally {
                ctx.setRemoved();
            }
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }

4 pendingHandlerCallbackHead链表中的任务什么时候调用

AbstractUnsaferegister0方法中,在绑定EventLoop以后,会调用pipeline.invokeHandlerAddedIfNeeded()方法,我们看一下pipeline.invokeHandlerAddedIfNeeded()方法。

    private void register0(ChannelPromise promise) {
    try {
        
        // 去完成那些在绑定EventLoop之前触发的添加handler操作,这些操作被放在pipeline中的pendingHandlerCallbackHead中,是个链表
        pipeline.invokeHandlerAddedIfNeeded();
        
    }

invokeHandlerAddedIfNeeded方法调用了callHandlerAddedForAllHandlers方法,咱们接着看下去。

    final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
    
            callHandlerAddedForAllHandlers();
        }
    }

callHandlerAddedForAllHandlers方法的逻辑咱就不再展开来说了,非常简单,就是遍历pendingHandlerCallbackHead这个单向链表,依次调用每个元素的execute方法,并且清空这个单向链表。

    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;
    
            registered = true;
    
            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
    
            this.pendingHandlerCallbackHead = null;
        }
    
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }

5 总结

Pipeline中的最重要的数据结构就是由多个ChannelHandlerContext组成的双向链表,而每个ChannelHandlerContext中包含一个ChannelHandlerChannelHandler既可以添加也可以删除。在Pipeline中有两个特殊的ChannelHandlerContext分别是HeadContextTailContext,这两个ChannelHandlerContext中不包含ChannelHandler,而是采用继承的方式。HeadContext实现了ChannelOutBoundHandlerChannelInBoundHandler,而TailContext实现了ChannelInBoundHandler


关于作者

王建新,转转架构部资深Java工程师,主要负责服务治理、RPC框架、分布式调用跟踪、监控系统等。爱技术、爱学习,欢迎联系交流。

原创文章,码字不易,点赞分享,手有余香。