2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104182571

简单介绍

这个类是比较核心的类,这个类定义了很多关键的操作,算比较底层的了,其实以前我们有讲过他的两个子类,一个就是AbstractNioMessageChannelNioMessageUnsafe,这个是用于NioServerSocketChannelunsafe类,另外一个AbstractNioByteChannelNioByteUnsafe,用于NioSocketChannel,其实也就是一些读取方法不一样,毕竟一个是取接受连接,一个是读取数据,不一样。我们前面其实有分析过一些他们的read方法。今天主要来分析下NioSocketChannelUnsafewriteflush方法,。
先看下这个NioSocketChannelUnsafe的结构,他是专门用来操作写和刷新的:

202309132159467231.png

write(Object msg, ChannelPromise promise)

会将数据msg封装成直接缓冲区,然后添加到出站缓冲区中:

      @Override
            public final void write(Object msg, ChannelPromise promise) {
                assertEventLoop();
    
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    // If the outboundBuffer is null we know the channel was closed and so
                    // need to fail the future right away. If it is not null the handling of the rest
                    // will be done in flush0()
                    // See https://github.com/netty/netty/issues/2362
                    safeSetFailure(promise, newClosedChannelException(initialCloseCause));
                    // release message now to prevent resource-leak
                    ReferenceCountUtil.release(msg);
                    return;
                }
    
                int size;
                try {
                    msg = filterOutboundMessage(msg);//封装成直接缓冲区
                    size = pipeline.estimatorHandle().size(msg);//获取缓冲区大小
                    if (size < 0) {
                        size = 0;
                    }
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    ReferenceCountUtil.release(msg);
                    return;
                }
    
                outboundBuffer.addMessage(msg, size, promise);//往出站缓冲区添加消息
            }

filterOutboundMessage(Object msg)

这个是专门把数据封装成直接缓冲区,以便于进行零拷贝,利用堆外内存,提高效率,关于零拷贝,可以看看这篇文章,其实这里可以立即为没有用CPU将数据从内核拷贝到Java虚拟机中:

      @Override
        protected final Object filterOutboundMessage(Object msg) {
            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                if (buf.isDirect()) {//如果是直接缓冲区就返回
                    return msg;
                }
    
                return newDirectBuffer(buf);//否则封装成直接缓冲区就可以零拷贝
            }
    
            if (msg instanceof FileRegion) {//文件缓冲区也可以零拷贝
                return msg;
            }
    //剩下的就不支持了
            throw new UnsupportedOperationException(
                    "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
        }

AbstractNioChannel的newDirectBuffer(ByteBuf buf)

其实就是获得一个新的直接缓冲区,把旧的缓冲区释放了:

    protected final ByteBuf newDirectBuffer(ByteBuf buf) {
            final int readableBytes = buf.readableBytes();
            if (readableBytes == 0) {//如果没有数据,就释放,返回一个空的
                ReferenceCountUtil.safeRelease(buf);
                return Unpooled.EMPTY_BUFFER;
            }
    //字节缓冲区分配器
            final ByteBufAllocator alloc = alloc();
            if (alloc.isDirectBufferPooled()) {//是直接缓冲区池化的
                ByteBuf directBuf = alloc.directBuffer(readableBytes);//申请直接缓冲区
                directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);//写入直接缓冲区
                ReferenceCountUtil.safeRelease(buf);//释放原来的缓冲区
                return directBuf;//返回直接缓冲区
            }
    
            final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();//线程中的直接缓冲区
            if (directBuf != null) {
                directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
                ReferenceCountUtil.safeRelease(buf);
                return directBuf;
            }
    
            // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
            return buf;//如果申请或者释放未池化的直接缓冲区消耗太大,就直接返回原来的
        }

ChannelOutboundBuffer的addMessage

这里才是将直接缓冲区添加到出站缓冲区中,不过是会创建一个实体Entry,然后用一个单链表结构来存取的,这个后面会讲:

     public void addMessage(Object msg, int size, ChannelPromise promise) {
            Entry entry = Entry.newInstance(msg, size, total(msg), promise);//创建实体
            if (tailEntry == null) {
                flushedEntry = null;
            } else {
                Entry tail = tailEntry;
                tail.next = entry;
            }
            tailEntry = entry;
            if (unflushedEntry == null) {
                unflushedEntry = entry;//指向第一个未冲刷的实体
            }
    
            // increment pending bytes after adding message to the unflushed arrays.
            // See https://github.com/netty/netty/issues/1619
            incrementPendingOutboundBytes(entry.pendingSize, false);//增加待冲刷的消息
        }

链表结构大致是这样:

202309132159476302.png

ChannelOutboundBuffer的incrementPendingOutboundBytes

这个就是增加待出站的字节数,如果超过上限的话,就设置不可以写了:

     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
            if (size == 0) {
                return;
            }
    
            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
            if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {//如果大于配置的16位大小
                setUnwritable(invokeLater);//设置不可写
            }
        }

ChannelOutboundBuffer的setUnwritable

我们先来看看可写是怎么判断的:

       public boolean isWritable() {
            return unwritable == 0;
        }

然后设置不可写就是这样,将unwritable 原子操作改为非0,然后触发fireChannelWritabilityChanged,也就是写能力改变了,不可写了:

      private void setUnwritable(boolean invokeLater) {
            for (;;) {
                final int oldValue = unwritable;//可写的时候是0
                final int newValue = oldValue | 1;
                if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                    if (oldValue == 0 && newValue != 0) {//开始是0,后来变为非0,就是不可写了
                        fireChannelWritabilityChanged(invokeLater);
                    }
                    break;
                }
            }
        }

如果是立即改变,就会调用pipeline.fireChannelWritabilityChanged();,就会从头结点开始传递这个事件,否则就给通道的事件循环提交个任务:

        private void fireChannelWritabilityChanged(boolean invokeLater) {
            final ChannelPipeline pipeline = channel.pipeline();
            if (invokeLater) {
                Runnable task = fireChannelWritabilityChangedTask;
                if (task == null) {
                    fireChannelWritabilityChangedTask = task = new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelWritabilityChanged();
                        }
                    };
                }
                channel.eventLoop().execute(task);
            } else {
                pipeline.fireChannelWritabilityChanged();
            }
        }

至此,write写数据就完成了,其实就是写入出站缓冲区里面,并没有将数据冲刷到对端,要进行flush才会将数据发出去。

flush()

可以看到这里才会开始冲刷,会先进行打标记,然后冲刷:

    @Override
            public final void flush() {
                assertEventLoop();
    
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;//获得出站缓冲区
                if (outboundBuffer == null) {
                    return;
                }
    
                outboundBuffer.addFlush();//添加冲刷计数
                flush0();//冲刷
            }

ChannelOutboundBuffer的addFlush

这里会将flushedEntry设置为要冲刷的第一个entry,然后遍历链表,冲刷计数flushed+1,如果此时请求取消的话,就进行取消和出站字节数的减少,最后将为冲刷实体unflushedEntry设为空,表示这些都已经要冲刷的了,后续会根据flushed来进行冲刷;

      public void addFlush() {
    
            Entry entry = unflushedEntry;//第一个没冲刷的数据,也是链表的第一个
            if (entry != null) {//有数据才刷了
                if (flushedEntry == null) {
                    // there is no flushedEntry yet, so start with the entry
                    flushedEntry = entry;//设置第一个要冲刷的实体
                }
                do {
                    flushed ++;//冲刷数+1
                    if (!entry.promise.setUncancellable()) {//如果取消的话需要回收内存
                        // Was cancelled so make sure we free up memory and notify about the freed bytes
                        int pending = entry.cancel();
                        decrementPendingOutboundBytes(pending, false, true);
                    }
                    entry = entry.next;
                } while (entry != null);//遍历冲刷是否有取消的
    
                // All flushed so reset unflushedEntry
                unflushedEntry = null;//重置未冲刷的
            }
        }

AbstractNioUnsafe的flush0

要冲刷了:

     @Override
            protected final void flush0() {
                if (!isFlushPending()) {//没有待冲刷的操作
                    super.flush0();
                }
            }

AbstractNioUnsafe的isFlushPending

先判断下是否已经有待冲刷存在,也就是有设置OP_WRITE事件:

     private boolean isFlushPending() {
                SelectionKey selectionKey = selectionKey();
                return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
            }

AbstractUnsafe的flush0

这里就要开始真正的冲刷了,省略了非核心的操作:

      protected void flush0() {
     			...
                    doWrite(outboundBuffer);
    			...
            }

NioSocketChannel的doWrite(ChannelOutboundBuffer in)

最终当然还是封装了NIOSocketChannelwrite方法来进行写数据啦,他会进行16次自旋尝试,来写消息,直到出站缓冲区的数据全部写出去了,然后就clearOpWrite清除OP_WRITE设置,返回,否则要去设置任务是否写操作incompleteWrite

     @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            SocketChannel ch = javaChannel();//内部还是用NIO的操作的
            int writeSpinCount = config().getWriteSpinCount();//写自旋的次数,默认是16次
            do {
                if (in.isEmpty()) {
                    // All written so clear OP_WRITE
                    clearOpWrite();//全部写完后,进行写事件的清除
                    // Directly return here so incompleteWrite(...) is not called.
                    return;//直接返回,不需要调用incompleteWrite
                }
    
                // Ensure the pending writes are made of ByteBufs only.
                int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();//获取最大的待写字节数16384
                ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);//获取ByteBuffer数组
                int nioBufferCnt = in.nioBufferCount();
    
                // Always us nioBuffers() to workaround data-corruption.
                // See https://github.com/netty/netty/issues/2761
                switch (nioBufferCnt) {
                    case 0:
                        // We have something else beside ByteBuffers to write so fallback to normal writes.
                        writeSpinCount -= doWrite0(in);
                        break;
                    case 1: {
                        // Only one ByteBuf so use non-gathering write 一个就不用gathering
                        // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                        // to check if the total size of all the buffers is non-zero. 0字节的不会放进缓冲区里,不用检查
                        ByteBuffer buffer = nioBuffers[0];//只有一个
                        int attemptedBytes = buffer.remaining();
                        final int localWrittenBytes = ch.write(buffer);//用NIO的SocketChannel写出去
                        if (localWrittenBytes <= 0) {
                            incompleteWrite(true);
                            return;
                        }
                        adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                        in.removeBytes(localWrittenBytes);
                        --writeSpinCount;
                        break;
                    }
                    default: {
                        // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                        // to check if the total size of all the buffers is non-zero.
                        // We limit the max amount to int above so cast is safe
                        long attemptedBytes = in.nioBufferSize();//获取发多少字节数据
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);//一起写出去了
                        if (localWrittenBytes <= 0) {//没完成
                            incompleteWrite(true);
                            return;
                        }
                        // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                        adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                                maxBytesPerGatheringWrite);
                        in.removeBytes(localWrittenBytes);
                        --writeSpinCount;
                        break;
                    }
                }
            } while (writeSpinCount > 0);
    
            incompleteWrite(writeSpinCount < 0);
        }

上面这个方法涉及到的东西比较多,讲起来东西很多,因为还涉及到ChannelOutboundBuffer,还有一些ByteBuf相关的东西等等,下一篇详细的来介绍下这些东西,不然很难讲清楚doWrite

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文