2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104114379

worker组注册NioSocketChannel

先补一张比较好理解的注册图,这里暂且把所有新的通道注册到同一个worker组的某个事件循环上,实际上是负载均衡注册的:

202309132158171071.png
注册其实跟boss组一样的,只是这个时候参数是NioSocketChannel类型,会从worker组的NioEventLoop选一个进行注册:

202309132158221822.png
这里要注意的是,里面的Unsafe其实是NioSocketChannel.NioSocketChannelUnsafe类型的,继承了NioByteUnsafe一看这个就知道是跟字节相关的,所以是用于读取和写入数据:

202309132158232303.png
我们有看到两个unsafe子类,上面那个是NioServerSocketChannel用的,下面的是NioSocketChannel的,其实他们只是read不一样:

202309132158243154.png
register是父类的方法一样的。跟前面的NioServerSocketChannel注册一样的,就是提交注册任务,开启线程,你可以看到worker组的线程开启来了:

202309132158254295.png
之后就是worker组的线程进行执行注册任务,像选择器注册SocketChannel实例:

202309132158263776.png
然后进行pipeline.invokeHandlerAddedIfNeeded();的时候会执行我们自定义的childHandlerhandlerAdded方法,会执行ChannelInitializerinitChannel方法,讲我们自定义的处理器添加进去:

202309132158272937.png
然后出发相应的handlerAddedchannelRegisteredchannelActive方法。即处理器添加了,注册了,又激活了。
完成任务之后,就进入select阻塞了。

读取消息

一旦建立好的连接发送消息过来,就会处理,跟前面一样调用processSelectedKeys等一系列方法,直到read方法,主要还是doReadBytes(byteBuf)方法:

      public final void read() {
                final ChannelConfig config = config();
                if (shouldBreakReadReady(config)) {
                    clearReadPending();
                    return;
                }
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();//字节缓冲区分配器
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));//这里是关键,读取出具
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            if (close) {
                                // There is nothing left to read as we received an EOF.
                                readPending = false;
                            }
                            break;
                        }
    
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);//传递读事件
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }

doReadBytes

        @Override
        protected int doReadBytes(ByteBuf byteBuf) throws Exception {
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();//获取分配处理器
            allocHandle.attemptedBytesRead(byteBuf.writableBytes());//设置可写的字节
            return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());//读取通道的数据,写入字节缓冲区
        }

writeBytes

向缓冲区写入数据这个是关键:

        @Override
        public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
            ensureWritable(length);
            int writtenBytes = setBytes(writerIndex, in, length);
            if (writtenBytes > 0) {
                writerIndex += writtenBytes;
            }
            return writtenBytes;
        }

setBytes

in.read就是通道从底层去读取socket缓冲区的数据到字节缓冲区里:

      @Override
        public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
            try {
                return in.read(internalNioBuffer(index, length));
            } catch (ClosedChannelException ignored) {
                return -1;
            }
        }
internalNioBuffer

这个方法其实返回的是ByteBuffer ,也就是说底层是封装了ByteBuffer 的:

     @Override
        public final ByteBuffer internalNioBuffer(int index, int length) {
            checkIndex(index, length);
            return _internalNioBuffer(index, length, false);
        }
_internalNioBuffer

果然是这样:

    final ByteBuffer _internalNioBuffer(int index, int length, boolean duplicate) {
            index = idx(index);
            ByteBuffer buffer = duplicate ? newInternalNioBuffer(memory) : internalNioBuffer();
            buffer.limit(index + length).position(index);
            return buffer;
        }

而且是直接缓冲区DirectByteBuffer,少了一次从内核到用户空间的数据拷贝:

202309132158281428.png

allocHandle.lastBytesRead(doReadBytes(byteBuf))

之后就做一些记录,比如这次我们读了11个字节,为什么是11个呢,因为我客户端发送了个hello world,后面会看到:

202309132158291489.png

pipeline.fireChannelRead(byteBuf)

接着就是把缓冲区数据传递到管道里,让处理器处理啦,这个上一篇讲过,怎么传递的,就不多说了,但是这里要注意的是:

2023091321583004610.png
如果是引用计数累心的话,会进行封装:

2023091321583148411.png

2023091321583241312.png
我们可以看到,netty自定义的所有的字节缓冲区都是引用计数类型的:

2023091321583358213.png
所以最后touch执行的是AbstractReferenceCountedByteBuftouch(java.lang.Object),貌似没啥改变:

2023091321583463914.png
最后传递到我自定义的处理器中,读取出来:

2023091321583543115.png
我客户端是用NIO发的:

2023091321583656316.png

总结

基本你上的流程跟上一篇的差不多,只是读取数据方面有点不一样。现在我们知道大致的流程了,后面会继续看细节,其实我讲的这些也都是个大概,但是很重要,骨架要清楚,不然开始就我往细节里钻,会出不来了。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文