2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104175253

有待处理的任务invokeHandlerAddedIfNeeded

触发通道未注册时候添加的处理器的handlerAdded事件:

        final void invokeHandlerAddedIfNeeded() {
            assert channel.eventLoop().inEventLoop();
            if (firstRegistration) {//只处理一次
                firstRegistration = false;
    
                callHandlerAddedForAllHandlers();
            }
        }

这方法哪里调用呢,当然是注册完了就调用啦:

202309132159162481.png

callHandlerAddedForAllHandlers

    //回调待添加的所有处理器HandlerAdded方法
        private void callHandlerAddedForAllHandlers() {
            final PendingHandlerCallback pendingHandlerCallbackHead;
            synchronized (this) {//需要同步,只能执行一次
                assert !registered;
    
                // This Channel itself was registered.
                registered = true;
    
                pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
                // Null out so it can be GC'ed.便于回收
                this.pendingHandlerCallbackHead = null;
            }
    
            // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
            // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
            // the EventLoop. 需要在synchronized (this)外,否则在其他线程中的处理器的handlerAdded方法又添加另外一个处理器,会对管道对象进行synchronized加锁,于是就死锁了,卡在这里了
            PendingHandlerCallback task = pendingHandlerCallbackHead;
            while (task != null) {//遍历链表执行
                task.execute();
                task = task.next;
            }
        }

PendingHandlerCallback待处理任务

其实就是一个链表的结点结构,但是有个抽象的执行方法需要子类实现,因为有添加和删除,执行实现是不一样的:

    //待处理任务
        private abstract static class PendingHandlerCallback implements Runnable {
            final AbstractChannelHandlerContext ctx;
            PendingHandlerCallback next;//单链表下一个
    
            PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
                this.ctx = ctx;
            }
    
            abstract void execute();
        }

PendingHandlerAddedTask待处理添加事件任务

可以看到最后处理还是调用callHandlerAdded0

    //待添加的处理器任务
        private final class PendingHandlerAddedTask extends PendingHandlerCallback {
    
            PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
                super(ctx);
            }
    		//给执行器执行的
            @Override
            public void run() {
                callHandlerAdded0(ctx);
            }
    		
            @Override
            void execute() {
                EventExecutor executor = ctx.executor();
                if (executor.inEventLoop()) {//为了防止多线程问题,只用单线程
                    callHandlerAdded0(ctx);//直接触发
                } else {
                    try {
                        executor.execute(this);//让执行器处理
                    } catch (RejectedExecutionException e) {
                        if (logger.isWarnEnabled()) {
                            logger.warn(
                                    "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                                    executor, ctx.name(), e);
                        }
                        atomicRemoveFromHandlerList(ctx);
                        ctx.setRemoved();
                    }
                }
            }
        }

PendingHandlerRemovedTask 待处理删除事件任务

和添加类似,所以就不多说了:

    //待删除的任务
        private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
    
            PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
                super(ctx);
            }
    
            @Override
            public void run() {
                callHandlerRemoved0(ctx);
            }
    
            @Override
            void execute() {
                EventExecutor executor = ctx.executor();
                if (executor.inEventLoop()) {
                    callHandlerRemoved0(ctx);
                } else {
                    try {
                        executor.execute(this);
                    } catch (RejectedExecutionException e) {
                        if (logger.isWarnEnabled()) {
                            logger.warn(
                                    "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
                                            " removing handler {}.", executor, ctx.name(), e);
                        }
                        // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
                        ctx.setRemoved();
                    }
                }
            }
        }
    }

入站事件的一些方法

入站事件有很多个,都是类似的原理:

202309132159172472.png

202309132159182793.png

传递入站事件fireChannelRead(Object msg)

可以看到,都是从head开始传递事件,我拿一个典型的fireChannelRead(Object msg)分析:

     @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }

AbstractChannelHandlerContext的invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)

调用传入上下文的invokeChannelRead方法:

        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);//看是不是引用计数接口类型,不是就直接返回,是就返回相应的接口类型
            EventExecutor executor = next.executor();//获取next的执行器
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);//如果执行器线程就是当前线程,就调用管道上下文的处理方法
            } else {//否则给executor提交任务
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }

AbstractChannelHandlerContext的invokeChannelRead(Object msg)

获取处理器,触发相应事件:

        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);//调用通道上下文的channelRead方法
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }

head的channelRead

其实他什么都没做,就是向后传递:

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ctx.fireChannelRead(msg);//触发fireChannelRead
            }

AbstractChannelHandlerContext的fireChannelRead

这个我前面有讲过,就是获取相应入站MASK_CHANNEL_READ的处理器上下文,然后调用invokeChannelRead获取相应的处理器来处理读事件:

        @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
            return this;
        }
    //获取处理相应事件的入站处理器上下文
        private AbstractChannelHandlerContext findContextInbound(int mask) {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;//寻找下一个能处理相应事件的
            } while ((ctx.executionMask & mask) == 0);//是否处理该mask
            return ctx;
        }

其实只要在处理器里调用 ctx.fireChannelRead就可以把事件往后传,很灵活,而且你再调用出站的方法,又会往前传递了,出站事件的原理类似,我就不多讲了。

HeadContext的作用

我们继续上一篇,我们知道,在管道的头尾是不一样的处理器上下文和处理器,他们都集中于一个类啦,我们先来看下头上下文:

202309132159193494.png
基本上把我上一篇介绍的接口和上下文抽象类都涵盖了,所以他自身既是上下文,也是出入站处理器。

构造方法

先调用父类的上下文构造方法,然后获得通道的unsafe对象,后续的一些操作都是他做的,然后设置添加完成,因为他是在管道构造函数中创建的,所以创建了就是添加完成的:

    		private final Unsafe unsafe;
    
            HeadContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, HEAD_NAME, HeadContext.class);
                unsafe = pipeline.channel().unsafe();//通道的unsafe
                setAddComplete();
            }

接下去是他的处理器就是自身,然后添加和删除事件都是空实现:

202309132159208675.png

channelRegistered

通道注册事件里会调用一次invokeHandlerAddedIfNeeded,检查是否有处理器的待添加任务,然后就往后传递了,其实一般情况下,没做什么事,只是做了向后传递:

            @Override
            public void channelRegistered(ChannelHandlerContext ctx) {
                invokeHandlerAddedIfNeeded();
                ctx.fireChannelRegistered();
            }

channelUnregistered

通道卸载事件先会进行传递,如果最后通道关闭了就销毁所有的处理器,其实也就是删除每一个处理器上下文,除了头和尾节点外:

            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) {
                ctx.fireChannelUnregistered();
    
                // Remove all handlers sequentially if channel is closed and unregistered.
                if (!channel.isOpen()) {
                    destroy();
                }
            }

channelActive

先往后传递通道激活事件,最后判断是否可以自动读,因为通道注册完了之后就会触发激活,所以就可以开始读取数据了,读取到有连接也是读取事件:

     @Override
            public void channelActive(ChannelHandlerContext ctx) {
                ctx.fireChannelActive();
    
                readIfIsAutoRead();
            }

readIfIsAutoRead

如果配置了自动读,就会开始去设置监听通道事件:

       private void readIfIsAutoRead() {
                if (channel.config().isAutoRead()) {
                    channel.read();
                }
            }

默认是自动读的:

202309132159224666.png
调用通道的read,还是会调用管道的read

202309132159235297.png
最终调用了尾结点tailread

202309132159240588.png
其实也就是AbstractChannelHandlerContextread

     @Override
        public ChannelHandlerContext read() {
        //从尾到头找到一个出站的读,开始初始化的时候next就是head,如果你自定了,没处理好的话,可能后面就读不到数据了
            final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {//如果是同一个线程,就开始读
                next.invokeRead();
            } else {//否则就添加一个任务
                Tasks tasks = next.invokeTasks;
                if (tasks == null) {
                    next.invokeTasks = tasks = new Tasks(next);
                }
                executor.execute(tasks.invokeReadTask);
            }
    
            return this;
        }

202309132159247499.png
最终还是回到headread
其实这个很重要,如果你自定义的出站处理器的read方法没有处理好,那后面可能就读不到数据了,所以一般还是交给unsafe去处理读:

     @Override
            public void read(ChannelHandlerContext ctx) {
                unsafe.beginRead();
            }
     @Override
            public final void beginRead() {
                assertEventLoop();
    
                if (!isActive()) {
                    return;
                }
    
                try {
                    doBeginRead();
                } catch (final Exception e) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireExceptionCaught(e);
                        }
                    });
                    close(voidPromise());
                }
            }

最后其实就是这段,以前分析过,就是初始化的时候事件循环执行的最后一个任务,其实就是设置相应的监听事件,当然可以监听读写事件:

     @Override
        protected void doBeginRead() throws Exception {
            // Channel.read() or ChannelHandlerContext.read() was called
            final SelectionKey selectionKey = this.selectionKey;
            if (!selectionKey.isValid()) {
                return;
            }
    
            readPending = true;
    
            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }

channelReadComplete

刚好一起把这个也说了,等读数据处理完了,会再次去监听:

    @Override
            public void channelReadComplete(ChannelHandlerContext ctx) {
                ctx.fireChannelReadComplete();
                //传递读事件完成后自动读
                readIfIsAutoRead();
            }

所以这里要注意就是自己自定义的处理器不要随意去覆盖read方法,因为处理器上下文的read方法会从尾部开始遍历,找到第一个可以处理read的处理器,如果是你自定义的,你又没处理好,那就可能再也收不到消息了。
我画了一个headread设置监听的大致情况:

2023091321592564910.png
这里要注意,就是无论是接受连接还是读取数据,都是属于读的,只是设置的事件标记不一样。

篇幅有点长了,下一篇继续吧。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文