2023-09-13  阅读(15)
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104069043

boss组如何将连接分发给worker组

我觉得我们可以先看一个简单的客户端连接到boss组的接收器分发,然后到客户端发送数据,worker组处理,大致的了解下,这样讲到细节的地方才好理解,当然也只是大致的流程,很多细节画出来就有点乱了,下面会讲一些,先上图吧:

202309132158024721.png

processSelectedKeys

当我们服务器启动后,就等待着事件,如果有连接事件了,就会去执行processSelectedKeys方法:
这里的selectedKeys是做了封装的,类型是SelectedSelectionKeySet,方便操作。

       private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized();//优化处理
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }

processSelectedKeysOptimized

我抽出了部分重要的,他会取出key,然后取出附件ServerSocketChannelImpl,一起传入处理:

     private void processSelectedKeysOptimized() {
            for (int i = 0; i < selectedKeys.size; ++i) {
                final SelectionKey k = selectedKeys.keys[i];
                selectedKeys.keys[i] = null;//拿出来后设为null一旦通道关闭就可以释放,否则可能内存泄露
                final Object a = k.attachment();//从附件中获得通道
    			...
                processSelectedKey(k, (AbstractNioChannel) a);//处理key
    			,,,
    
               
            }
        }

processSelectedKey

主要的就是找到读或者事件,然后读取数据,这里的unsafeio.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe类型,通道创建的时候创建的:

     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
           	...
            try {
                int readyOps = k.readyOps();
    			...
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();//读取
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }

read

    
      private final List<Object> readBuf = new ArrayList<Object>();
    
      public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();//缓冲区分配器
                allocHandle.reset(config);//重置参数
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            int localRead = doReadMessages(readBuf);//读取数据到readBuf
    						...
                            allocHandle.incMessagesRead(localRead);//总共的消息数+1
                        } while (allocHandle.continueReading());//是否还要继续读取
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));//遍历传播读事件,参数是NioSocketChannel
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();//传播读完成事件
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                  
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }

读取连接的数据,封装成NioSocketChannel,然后进行管道的事件传递:

202309132158040772.png

doReadMessages

接受一个SocketChannel ,封装成NioSocketChannel然后加入缓冲区里:

      protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = SocketUtils.accept(javaChannel());//进行接收,返回SocketChannel
    
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch));//创建NioSocketChannel
                    return 1;
                }
            } catch (Throwable t) {
               ...
            }
    
            return 0;
        }
SocketUtils.accept

其实这个最终就是调用了serverSocketChannel.accept(),只是现在已经有连接了,所以不会阻塞,直接返回一个SocketChannel了。

     public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
            try {
                return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                    @Override
                    public SocketChannel run() throws IOException {
                        return serverSocketChannel.accept();
                    }
                });
            } catch (PrivilegedActionException e) {
                throw (IOException) e.getCause();
            }
        }

pipeline.fireChannelRead(readBuf.get(i))

管道传递读事件,参数就是刚封装的NioSocketChannel,首先调用了通道上下文的invokeChannelRead方法,传入了头上下文head,也就是初始化时候的HeadContext,也就是从头开始传递:

        @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
AbstractChannelHandlerContext的invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)

可以传入上下文对象和消息体,让上下文对象去传递消息体:

        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);//看msg是不是引用计数接口ReferenceCounted类型,不是就直接返回msg,其实是做资源泄露检测
            EventExecutor executor = next.executor();//获取next的执行器,如果为null就是通道的NioEventLoop
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);//如果执行器线程就是当前线程,就调用invokeChannelRead,传入NioSocketChannel
            } else {//否则给executor提交任务
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }
HeadContext的invokeChannelRead(Object msg)

直接的上下文对象传递消息体:

     private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {//是否已经被添加到管道
                try {
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);//调用处理器的channelRead方法
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);//直接传递到下一个可以处理消息的通道上下文
            }
        }
HeadContext的channelRead(ChannelHandlerContext ctx, Object msg)

直接调用父类的方法,传递到下一个去:

       public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ctx.fireChannelRead(msg);//直接传递到下一个可以处理消息的通道上下文
            }
AbstractChannelHandlerContext的fireChannelRead(final Object msg)
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
            return this;
        }
AbstractChannelHandlerContext的findContextInbound(int mask)

先看一个标记mask,不同的上下文可能有不标记,可以理解为响应不同的事件:

202309132158050363.png
定义了出站入站的总mask

202309132158061364.png
这个会在通道上下文初始化的时候创建:

202309132158071285.png
最终处理是这个方法,默认入站和出站都是全部的标记:

202309132158081286.png

202309132158093157.png
还有个判断函数,是否要略过,根据反射出的方法和类的标注,声明了Skip标注,才会去除某个mask事件:

202309132158103228.png
因此这个方法就会去找下一个能不能处理这个事件的入站上下文,找不到就返回自己:

       private AbstractChannelHandlerContext findContextInbound(int mask) {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while ((ctx.executionMask & mask) == 0);//寻找下一个能处理相应事件的
            return ctx;
        }

这里就会去找HeadContext的下一个,也就是我们放进去的含有ServerBootstrapAcceptor的上下文DefaultChannelHandlerContext

又是AbstractChannelHandlerContext的invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)

这次的nextDefaultChannelHandlerContext

DefaultChannelHandlerContext的invokeChannelRead(Object msg)

这次我们可以看到处理器就是ServerBootstrapAcceptor

202309132158112859.png

ServerBootstrapAcceptor的channelRead(ChannelHandlerContext ctx, Object msg)

这里就是将NioSocketChannel注册到worker组里的代码啦:

      public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;//得到NioSocketChannel
    
                child.pipeline().addLast(childHandler);//在得到NioSocketChannel的管道中添加我们自定义的初始化处理器
    
                setChannelOptions(child, childOptions, logger);//设置选项
                setAttributes(child, childAttrs);//设置属性
    
                try {//向workerGroupt注册NioSocketChannel并添加完成监听
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
                    forceClose(child, t);
                }
            }

boss组接受连接的事基本讲完了,剩下的worker组干的事我们后面讲吧,完美待续…

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文