Netty4.1源码阅读——核心(DefaultChannelPipeline)

 2023-01-29
原文作者:飞天御剑流 原文地址:https://juejin.cn/post/6932740953451003911

前言

本想从Bootstrap开始,跟着netty的启动流程一路分析下去,但是netty的系统的确复杂,在阅读了整个流程后,决定从ChannelPipeline开始,ChannelPipeline的特点明显而方法不多,是整个netty的大动脉。

正文

netty示例代码

    EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();
    
            ServerBootstrap server = new ServerBootstrap();
            server.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new HttpResponseEncoder());
                            ch.pipeline().addLast(new 业务逻辑())
                        }
                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture f = server.bind(8080).sync();
            f.channel().closeFuture().sync();

注意initChannel这个方法,我们调用了addLast的方法,给ChannelPipeline添加了解码器,一般在最后会添加一个自定义业务逻辑。

按照我们对这个方法的预期设想,ChannelPipeline内部应该是链表组成,我们不断添加节点,当请求到来的时候,会再每一层调用相应的方法直到尾部

ChannelPipeline接口

ChannelPipeline继承关系

202212302223438971.png

ChannelInboundInvoker和ChannelOutboundInvoker分别代表了入站事件和出站事件,非常明显的是作为服务器或者客户端要去交互,必须得有消息进入和发出;而*Iterable<Entry<String, ChannelHandler>>*表示了ChannelPipeline是一个可迭代的类,内部的链表节点是ChannelHandler

ChannelInboundInvoker

    public interface ChannelInboundInvoker {
    
        /**
         * Channel注册到EventLoop上调用
         * 调用这个方法会链式调用ChannelInboundHandler的channelRegistered方法
         */
        ChannelInboundInvoker fireChannelRegistered();
    
        /**
         * Channel取消注册注册到EventLoop上调用
         * 调用这个方法会链式调用ChannelInboundHandler的channelUnregistered方法
         */
        ChannelInboundInvoker fireChannelUnregistered();
    
        /**
         * Channel处于激活状态,代表连接已经建立完成
         * 调用这个方法会链式调用ChannelInboundHandler的channelActive
         */
        ChannelInboundInvoker fireChannelActive();
    
        /**
         * fireChannelActive的反义
         */
        ChannelInboundInvoker fireChannelInactive();
    
        /**
         * 异常相关 不重要
         */
        ChannelInboundInvoker fireExceptionCaught(Throwable cause);
    
        ChannelInboundInvoker fireUserEventTriggered(Object event);
    
        /**
         * Channel接收到了消息
         * 调用这个方法会链式调用ChannelInboundHandler的channelRead
         */
        ChannelInboundInvoker fireChannelRead(Object msg);
    
        /**
         * 阅读完毕,同上
         */
        ChannelInboundInvoker fireChannelReadComplete();
        
        ChannelInboundInvoker fireChannelWritabilityChanged();
    }

根据我写的注释,可以看到ChannelInboundInvoker会去调用ChannelInboundHandler里面的方法,可以猜到ChannelPipeline内部链表有两种类型: Inbound和Outbound ,而Invoker接口方法的含义是一个入口,会链式调用相同类型的节点

ChannelOutboundInvoker

202212302223444522.png

ChannelOutboundInvoker里面的方法我们很熟悉了,其实就传统IO的方法

In/Out总结

看一下netty官方的注释

     *                                                 I/O Request
     *                                            via {@link Channel} or
     *                                        {@link ChannelHandlerContext}
     *                                                      |
     *  +---------------------------------------------------+---------------+
     *  |                           ChannelPipeline         |               |
     *  |                                                  \|/              |
     *  |    +---------------------+            +-----------+----------+    |
     *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |              /|\                                  |               |
     *  |               |                                  \|/              |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |              /|\                                  .               |
     *  |               .                                   .               |
     *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
     *  |        [ method call]                       [method call]         |
     *  |               .                                   .               |
     *  |               .                                  \|/              |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |              /|\                                  |               |
     *  |               |                                  \|/              |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |              /|\                                  |               |
     *  +---------------+-----------------------------------+---------------+
     *                  |                                  \|/
     *  +---------------+-----------------------------------+---------------+
     *  |               |                                   |               |
     *  |       [ Socket.read() ]                    [ Socket.write() ]     |
     *  |                                                                   |
     *  |  Netty Internal I/O Threads (Transport Implementation)            |
     *  +-------------------------------------------------------------------+

根据官方的注释以及这两个接口的方法可以总结以下几点:

  • Inbound和Outbound的调用方向是不同的
  • Inbound是一个通知事件,表示某一件事情已经就绪
  • Outbound是一个请求事件,对应了传统I/O的请求

通过Inbound方法,可以总结出Channel的几个生命周期

  • channelRegistered Channel已经注册到一个EventLoop
  • channelActive Channel是活跃的,可以收发消息了
  • channelInactive Channel是非活跃的
  • channelUnregistered Channel已经创建,但是没有注册

按照给出的顺序,这几个状态是以该顺序循环转换的,而ChannelPipeline收到状态变化请求的时候,会链式调用Inbound类型的节点,可以自定义对该状态感兴趣的代码来处理。

DefaultChannelPipeline

了解到了ChannelPipeline接口方法的含义,看一下实现类DefaultChannelPipeline的细节

     final AbstractChannelHandlerContext head;
     final AbstractChannelHandlerContext tail;
    
    protected DefaultChannelPipeline(Channel channel) {
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise =  new VoidChannelPromise(channel, true);
    
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            head.next = tail;
            tail.prev = head;
        }

用AbstractChannelHandlerContext类型的head和tail来表示链表的头节点和尾节点,而head和tail却又不是标准的AbstractChannelHandlerContext实现类,而是内部实现的HeadContext和TailContext

HeadContext

    final class HeadContext extends AbstractChannelHandlerContext
                implements ChannelOutboundHandler, ChannelInboundHandler {
    
            private final Unsafe unsafe;
    
            HeadContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, HEAD_NAME, HeadContext.class);
                unsafe = pipeline.channel().unsafe();
                setAddComplete();
            }

HeadContext既实现了ChannelOutboundHandler又实现了ChannelInboundHandler

    @Override
            public void connect(
                    ChannelHandlerContext ctx,
                    SocketAddress remoteAddress, SocketAddress localAddress,
                    ChannelPromise promise) {
                unsafe.connect(remoteAddress, localAddress, promise);
            }
    
    @Override
            public void channelInactive(ChannelHandlerContext ctx) {
                ctx.fireChannelInactive();
            }

HeadContext作为一个Outbound,将实现交给unsafe处理,可以把unsafe理解为java底层I/O交互方式;作为一个Inbound,仅仅是把这个请求传递下去。

TailContext

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    
            TailContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, TAIL_NAME, TailContext.class);
                setAddComplete();
            }
    
            @Override
            public ChannelHandler handler() {
                return this;
            }
    
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) { }
    
            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) { }
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) {
                onUnhandledInboundChannelActive();
            }
    
            @Override
            public void channelInactive(ChannelHandlerContext ctx) {
                onUnhandledInboundChannelInactive();
            }

tail作为尾节点,只继承了ChannelInboundHandler,而且里面的方法都是空方法。

有HeadContext和TailContext内部可以看出来:

Inbound是由head到tail,head用于传递,tail进行收尾不负责任何实现

OutBound是由tail到head,head交给unsafe处理

DefaultChannelPipeline方法

看一下最常用的addLast

    public final ChannelPipeline addLast(ChannelHandler handler) {
            return addLast(null, handler);
        }
    
    
    @Override
        public final ChannelPipeline addLast(String name, ChannelHandler handler) {
            return addLast(null, name, handler);
        }
    
    @Override
        public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
            ObjectUtil.checkNotNull(handlers, "handlers");
    
            for (ChannelHandler h: handlers) {
                if (h == null) {
                    break;
                }
                addLast(executor, null, h);
            }
    
            return this;
        }

可以看出来addLast可以指定group和name

    @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
                //检查是否有重复的
                checkMultiplicity(handler);
    
                newCtx = newContext(group, filterName(name, handler), handler);
    
                addLast0(newCtx);
    
                // If the registered is false it means that the channel was not registered on an eventLoop yet.
                // In this case we add the context to the pipeline and add a task that will call
                // ChannelHandler.handlerAdded(...) once the channel is registered.
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
    
                EventExecutor executor = newCtx.executor();
                if (!executor.inEventLoop()) {
                    callHandlerAddedInEventLoop(newCtx, executor);
                    return this;
                }
            }
            callHandlerAdded0(newCtx);
            return this;
        }

流程如下:

1、检查重复性

    private static void checkMultiplicity(ChannelHandler handler) {
            if (handler instanceof ChannelHandlerAdapter) {
                ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
                if (!h.isSharable() && h.added) {
                    throw new ChannelPipelineException(
                            h.getClass().getName() +
                            " is not a @Sharable handler, so can't be added or removed multiple times.");
                }
                h.added = true;
            }
        }

同一个ChannelHandler用add来标记是否添加,一般不支持重复添加,可以添加@Sharable注解来支持

    public boolean isSharable() {
            Class<?> clazz = getClass();
            Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
            Boolean sharable = cache.get(clazz);
            if (sharable == null) {
                sharable = clazz.isAnnotationPresent(Sharable.class);
                cache.put(clazz, sharable);
            }
            return sharable;
        }

2、根据handler创建一个AbstractChannelHandlerContext

     private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
            return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
        }

3、调用addLast0将节点添加到链表中

    private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }

4、触发添加成功的回调callHandlerAdded0,如果这个管道还没有注册,就延迟回调;如果不是当前的eventLoop,开辟一个新线程回调

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
            try {
                ctx.callHandlerAdded();
            } catch (Throwable t) {
                boolean removed = false;
                try {
                    atomicRemoveFromHandlerList(ctx);
                    ctx.callHandlerRemoved();
                    removed = true;
                } catch (Throwable t2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                    }
                }
    
                if (removed) {
                    fireExceptionCaught(new ChannelPipelineException(
                            ctx.handler().getClass().getName() +
                            ".handlerAdded() has thrown an exception; removed.", t));
                } else {
                    fireExceptionCaught(new ChannelPipelineException(
                            ctx.handler().getClass().getName() +
                            ".handlerAdded() has thrown an exception; also failed to remove.", t));
                }
            }
        }
    
    final void callHandlerAdded() throws Exception {
            //在调用handlerAdded之前,必须先调用setAddComplete。否则,如果handlerAdded方法生成
            // 任何管道事件ctx.handler()都会错过它们,因为状态不允许这样做。
            if (setAddComplete()) {
                handler().handlerAdded(this);
            }
        }

调用自定义的handlerAdded方法,如果其中抛出了异常,会将这个节点给删除了,并且触发删除回调

    private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
            AbstractChannelHandlerContext prev = ctx.prev;
            AbstractChannelHandlerContext next = ctx.next;
            prev.next = next;
            next.prev = prev;
        }

删除节点


看一下fire开头的方法,前面提到过,例如fireChannelActive是用于触发Channel生命周期事件,而生命周期的开始是HeadContext传递。看一下fireChannelActive实现细节可以知道其他几个fire的细节了

    @Override
        public final ChannelPipeline fireChannelActive() {
            AbstractChannelHandlerContext.invokeChannelActive(head);
            return this;
        }

调用invokeChannelActive,传入的参数是head,表示从head开始

    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelActive();
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelActive();
                    }
                });
            }
        }

调用head的invokeChannelActive方法,这个方法是AbstractChannelHandlerContext的公有方法

    private void invokeChannelActive() {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelActive(this);
                } catch (Throwable t) {
                    invokeExceptionCaught(t);
                }
            } else {
                fireChannelActive();
            }
        }

调用head的channelActive方法

    @Override
            public void channelActive(ChannelHandlerContext ctx) {
                ctx.fireChannelActive();
    
                readIfIsAutoRead();
            }

传入的ctx是head自身,所以又回到了AbstractChannelHandlerContext的 fireChannelActive

    @Override
        public ChannelHandlerContext fireChannelActive() {
            invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
            return this;
        }
    
    //找到Inbound
        private AbstractChannelHandlerContext findContextInbound(int mask) {
            AbstractChannelHandlerContext ctx = this;
            EventExecutor currentExecutor = executor();
            do {
                ctx = ctx.next;
            } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
            return ctx;
        }
    
    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelActive();
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelActive();
                    }
                });
            }
        }

注意findContextInbound,这个方法就是找到下一个Inbound节点,不断用next遍历整个链表节点来找到属于Inbound的,然后又回到invokeChannelActive来调用channelActive

Inbound是不断next找到下一个,而Outbound就是从tail出发,不断的prev遍历直到head,实现方法大同小异


     private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
            return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
        }

注意到新建AbstractChannelHandlerContext的代码中,ChannelPipeline将自己放入的了context中,这表示我们可以自定义ChannelHandler,并且调用ChannelPipeline去动态删除Handler

比如需要一次性handler,可以在自定义方法里面使用后后用remove掉自己

ChannelInitializer细节

在使用ServerBootstrap的时候,添加自定义handler的时候是new了一个ChannelInitializer类,并重写了initChannel方法,但实现很简单

    protected abstract void initChannel(C ch) throws Exception;
    
        @Override
        @SuppressWarnings("unchecked")
        public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            if (initChannel(ctx)) {
    
                ctx.pipeline().fireChannelRegistered();
                removeState(ctx);
            } else {
                ctx.fireChannelRegistered();
            }
        }

ChannelInitializer是一个Inbound添加到head后面,head将注册事件传到下一个Inbound的时候会调用initChannel方法

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
            if (initMap.add(ctx)) {
                try {
                    initChannel((C) ctx.channel());
                } catch (Throwable cause) {
                    exceptionCaught(ctx, cause);
                } finally {
                    ChannelPipeline pipeline = ctx.pipeline();
                    if (pipeline.context(this) != null) {
                        pipeline.remove(this);
                    }
                }
                return true;
            }
            return false;
        }

initChannel会调用我们重写的initChannel(C ch)方法,initMap是为了防止重复调用,当initChannel调用完成后,finally处会将自身remove掉,这也是上面提到了动态删除。

总结

DefaultChannelPipeline内部由于AbstractChannelHandlerContext的链表组成,而链表又分为ChannelOutboundHandler和ChannelInboundHandler,根据不同的事件找到不同的bound链式调用方法,这也是ChannelPipeline的作用。