2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104193635

简单介绍

他是一个通道的出站缓冲区,所有要写的数据都会先存在这里,等到要刷新的时候才会真的写出去。

内部类Entry

其实消息都是封装成内部的Entry类的,存储结构是一个单链表。我来看看这个类,其实他是一个对象池,可以复用:

        static final class Entry {
        //Entry对象池
            private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
                @Override
                public Entry newObject(Handle<Entry> handle) {
                    return new Entry(handle);
                }
            });
    
            private final Handle<Entry> handle;//池化操作的处理器
            Entry next;//链表的下一个
            Object msg;//信息
            ByteBuffer[] bufs;//缓存字节缓冲区数组,为了复用提高效率
            ByteBuffer buf;//缓存字节缓冲区,为了复用提高效率
            ChannelPromise promise;//回调
            long progress;//当前进度,即已经传了多少数据
            long total;//总共的数据大小
            int pendingSize;//待冲刷的评估大小,要加上96
            int count = -1;
            boolean cancelled;//是否被取消了
    
            private Entry(Handle<Entry> handle) {
                this.handle = handle;
            }
    //用对象池的方式创建实体
            static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
                Entry entry = RECYCLER.get();//从池子里获取
                entry.msg = msg;//消息
                entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;//评估的大小
                entry.total = total;//具体大小
                entry.promise = promise;
                return entry;
            }
    		//取消了,返回待冲刷的评估大小
            int cancel() {
                if (!cancelled) {
                    cancelled = true;//取消标识
                    int pSize = pendingSize;
    
                    // release message and replace with an empty buffer
                    ReferenceCountUtil.safeRelease(msg);//释放
                    msg = Unpooled.EMPTY_BUFFER;
    
                    pendingSize = 0;
                    total = 0;
                    progress = 0;
                    bufs = null;
                    buf = null;
                    return pSize;
                }
                return 0;
            }
    		//用完初始化后再放回收到池子里
            void recycle() {
                next = null;//设置成null
                bufs = null;
                buf = null;
                msg = null;
                promise = null;
                progress = 0;
                total = 0;
                pendingSize = 0;
                count = -1;
                cancelled = false;
                handle.recycle(this);
            }
    		//回收当前实体并获取下一个实体,为什么要先获取下一个再回收呢,因为回收的时候把next设置null啦
            Entry recycleAndGetNext() {
                Entry next = this.next;
                recycle();
                return next;
            }
        }

重要属性

    	static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =//出站实体的额外开销96字节
                SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
        //保存线程对应的缓冲区,默认是1024个ByteBuffer数组,FastThreadLocal比一般的ThreadLocal要快,他是利用数组,内部用的是常量索引的数组,不是hash算法
        private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
            @Override
            protected ByteBuffer[] initialValue() throws Exception {
                return new ByteBuffer[1024];
            }
        };
    // 单链表结构
        // The Entry that is the first in the linked-list structure that was flushed
        private Entry flushedEntry;//第一个要冲刷的实体
        // The Entry which is the first unflushed in the linked-list structure
        private Entry unflushedEntry;//第一个未冲刷的实体
        // The Entry which represents the tail of the buffer
        private Entry tailEntry;//尾结点实体
        // The number of flushed entries that are not written yet
        private int flushed;//要冲刷的数量,但是还没真正冲刷出去,就是出站缓冲区大小
    
     	private int nioBufferCount;//可以冲刷的缓冲区个数
        private long nioBufferSize;//可以写出的总的缓冲区数组数据大小
        private boolean inFail;//是否冲刷失败
    
    //原子操作totalPendingSize
        private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
                AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
    
        @SuppressWarnings("UnusedDeclaration")
        private volatile long totalPendingSize;//待冲刷缓冲区的字节总数
    
    //原子操作unwritable
        private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
                AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
    
        @SuppressWarnings("UnusedDeclaration")
        private volatile int unwritable;
    
        private volatile Runnable fireChannelWritabilityChangedTask;//写能力改变的任务

decrementPendingOutboundBytes

减少待出站的字节数,默认是提交任务延迟触发的:

     void decrementPendingOutboundBytes(long size) {
            decrementPendingOutboundBytes(size, true, true);
        }
    
        private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
            if (size == 0) {
                return;
            }
    		//总数减少
            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
            if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {//如果小于阈值就说明可写了,提交触发通道的任务
                setWritable(invokeLater);
            }
        }

setWritable

原子操作修改成可写状态:

     private void setWritable(boolean invokeLater) {
            for (;;) {
                final int oldValue = unwritable;
                final int newValue = oldValue & ~1;
                if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                    if (oldValue != 0 && newValue == 0) {//可写的值由非0到0,能写了
                        fireChannelWritabilityChanged(invokeLater);//能写了就要触发事件了
                    }
                    break;
                }
            }
        }

fireChannelWritabilityChanged

根据传入的参数判断是否要立即触发还时提交任务:

      private void fireChannelWritabilityChanged(boolean invokeLater) {
            final ChannelPipeline pipeline = channel.pipeline();
            if (invokeLater) {
                Runnable task = fireChannelWritabilityChangedTask;
                if (task == null) {
                    fireChannelWritabilityChangedTask = task = new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelWritabilityChanged();
                        }
                    };
                }
                channel.eventLoop().execute(task);
            } else {
                pipeline.fireChannelWritabilityChanged();
            }
        }

total

获取要写出去消息的真正字节大小:

    //获取大小
        private static long total(Object msg) {
            if (msg instanceof ByteBuf) {
                return ((ByteBuf) msg).readableBytes();
            }
            if (msg instanceof FileRegion) {
                return ((FileRegion) msg).count();
            }
            if (msg instanceof ByteBufHolder) {
                return ((ByteBufHolder) msg).content().readableBytes();
            }
            return -1;
        }

这个就是真实的大小,其实和默认的评估大小类似,只是FileRegion类型不一样,这个具体后面会说:

202309132159519381.png

current

获取当前冲刷的实体消息:

     public Object current() {
            Entry entry = flushedEntry;
            if (entry == null) {
                return null;
            }
    
            return entry.msg;
        }

currentProgress

获取当前实体冲刷的进度:

    public long currentProgress() {
            Entry entry = flushedEntry;
            if (entry == null) {
                return 0;
            }
            return entry.progress;
        }

progress

通知当前实体冲刷的进度:

     public void progress(long amount) {
            Entry e = flushedEntry;
            assert e != null;
            ChannelPromise p = e.promise;
            long progress = e.progress + amount;
            e.progress = progress;
            if (p instanceof ChannelProgressivePromise) {
                ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
            }
        }

size

这个flushed就是出站缓冲区大小:

     public int size() {
            return flushed;
        }
        
     public boolean isEmpty() {
           return flushed == 0;
       }

clearNioBuffers

清除缓存区数组中的数据,都设置为null

        private void clearNioBuffers() {
            int count = nioBufferCount;
            if (count > 0) {
                nioBufferCount = 0;
                Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
            }
        }

removeEntry

从单链表中删除实体:

    //链表中删除实体
        private void removeEntry(Entry e) {
            if (-- flushed == 0) {//最后一个了
                // processed everything
                flushedEntry = null;
                if (e == tailEntry) {
                    tailEntry = null;
                    unflushedEntry = null;
                }
            } else {
                flushedEntry = e.next;//跳过e,指向下一个
            }
        }

remove

从当前已经标记为冲刷的实体:

     public boolean remove() {
            Entry e = flushedEntry;//删除当前消息
            if (e == null) {//没有要冲刷的了,就清除缓存
                clearNioBuffers();
                return false;
            }
            Object msg = e.msg;
    
            ChannelPromise promise = e.promise;
            int size = e.pendingSize;
    
            removeEntry(e);//从链表中删除实体
    
            if (!e.cancelled) {//没取消就要释放消息
                // only release message, notify and decrement if it was not canceled before.
                ReferenceCountUtil.safeRelease(msg);//释放消息缓存区
                safeSuccess(promise);//设置回调成功
                decrementPendingOutboundBytes(size, false, true);//减少待冲刷缓冲区大小,立即触发相应事件
            }
    
            // recycle the entry
            e.recycle();//回收实体
    
            return true;
        }

remove(Throwable cause)

失败后删除:

     	public boolean remove(Throwable cause) {
            return remove0(cause, true);
        }
    	//失败后删除,要抛出异常,立即触发事件,成功删除一个实体返回true,没有实体删除了就清楚缓存,返回false
        private boolean remove0(Throwable cause, boolean notifyWritability) {
            Entry e = flushedEntry;
            if (e == null) {//删完为止
                clearNioBuffers();
                return false;
            }
            Object msg = e.msg;
    
            ChannelPromise promise = e.promise;
            int size = e.pendingSize;
    
            removeEntry(e);
    
            if (!e.cancelled) {
                // only release message, fail and decrement if it was not canceled before.
                ReferenceCountUtil.safeRelease(msg);
    
                safeFail(promise, cause);//回调失败,把异常传递进去
                decrementPendingOutboundBytes(size, false, notifyWritability);//减少待冲刷缓冲区大小,立即触发相应事件
            }
    
            // recycle the entry
            e.recycle();
    
            return true;
        }

removeBytes(long writtenBytes)

删除所有已经冲刷出去的字节数据,假设缓存区中数据都是ByteBuf类型的,其实这个就是当数据全部冲刷出去之后,要把缓存清空:

      public void removeBytes(long writtenBytes) {
            for (;;) {
                Object msg = current();//获取当前要冲刷的实体数据
                if (!(msg instanceof ByteBuf)) {//没有实体或者类型不是ByteBuf的直接跳出循环
                    assert writtenBytes == 0;
                    break;
                }
    
                final ByteBuf buf = (ByteBuf) msg;
                final int readerIndex = buf.readerIndex();//读索引
                final int readableBytes = buf.writerIndex() - readerIndex;//可读的数据大小
    
                if (readableBytes <= writtenBytes) {//可读的数据小于等于要写的数据大小
                    if (writtenBytes != 0) {
                        progress(readableBytes);//刷新写进度
                        writtenBytes -= readableBytes;//处理完了就减去
                    }
                    remove();//删除当前的实体
                } else { // readableBytes > writtenBytes
                    if (writtenBytes != 0) {
                        buf.readerIndex(readerIndex + (int) writtenBytes);//设置读索引
                        progress(writtenBytes);//刷新写进度
                    }
                    break;
                }
            }
            clearNioBuffers();//清楚缓存
        }

nioBuffers

获取直接数组缓冲区,就是要冲刷的时候用的:

        public ByteBuffer[] nioBuffers() {
            return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
        }

nioBuffers(int maxCount, long maxBytes)

这个方法会将缓冲区数组数据限制在传入参数中,其实内部就是获取每一个实体中的消息ByteBuf,将他们放入缓冲区数组,统计有多少可读的缓冲区和总的数据大小:

     public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
            assert maxCount > 0;//最大缓冲区数量
            assert maxBytes > 0;//最大字节数
            long nioBufferSize = 0;//缓冲区字节数
            int nioBufferCount = 0;//缓冲区数量
            final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);//快速获取当前线程对应的缓冲区数组
            Entry entry = flushedEntry;//获得要冲刷的实体
            while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {//有冲刷标记的,且实体消息是ByteBuf类型
                if (!entry.cancelled) {//没有取消的
                    ByteBuf buf = (ByteBuf) entry.msg;//获得消息
                    final int readerIndex = buf.readerIndex();//读索引
                    final int readableBytes = buf.writerIndex() - readerIndex;//可读字节
    
                    if (readableBytes > 0) {
                        if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {//超过最大字节数且缓冲区数量不为0
                            // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
                            // we stop populate the ByteBuffer array. This is done for 2 reasons:
                            // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
                            // and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
                            // on the architecture and kernel but to be safe we also enforce the limit here.
                            // 2. There is no sense in putting more data in the array than is likely to be accepted by the
                            // OS.
                            // 底层不允许写的字节数大于Integer.MAX_VALUE,否则会报错。
                            // See also:
                            // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
                            // - http://linux.die.net/man/2/writev
                            break;
                        }
                        nioBufferSize += readableBytes;//累加字节数
                        int count = entry.count;//数量,默认是-1
                        if (count == -1) {
                            //noinspection ConstantValueVariableUse
                            entry.count = count = buf.nioBufferCount();//默认返回是1个缓冲区,也可能是其他,那可能就是entry的bufs
                        }
                        int neededSpace = min(maxCount, nioBufferCount + count);//需要的空间,取最小
                        if (neededSpace > nioBuffers.length) {//如果大于1024就进行扩张
                            nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                            NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                        }
                        if (count == 1) {
                            ByteBuffer nioBuf = entry.buf;//默认buf是空的
                            if (nioBuf == null) {
                                // 缓存一个缓冲区,实体可以复用,到时候就不用创建了
                                entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                            }
                            nioBuffers[nioBufferCount++] = nioBuf;//用实体的缓冲区,添加到缓存区数组里
                        } else {//为了内联,又封装了一个方法来获得缓冲区个数,不过这个条件分支不会经常进来
                            //用实体的缓冲区数组
                            nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
                        }
                        if (nioBufferCount == maxCount) {//达到上限就跳出循环
                            break;
                        }
                    }
                }
                entry = entry.next;
            }
            this.nioBufferCount = nioBufferCount;//缓冲区个数
            this.nioBufferSize = nioBufferSize;//缓冲区总数据大小
    
            return nioBuffers;
        }

expandNioBufferArray

进行缓冲区数组的扩张,每次是原来的2倍大小:

     private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
            int newCapacity = array.length;
            do {
                // double capacity until it is big enough
                // See https://github.com/netty/netty/issues/1890
                newCapacity <<= 1;//每次扩张为原来的2倍
    
                if (newCapacity < 0) {
                    throw new IllegalStateException();
                }
    
            } while (neededSpace > newCapacity);
    
            ByteBuffer[] newArray = new ByteBuffer[newCapacity];
            System.arraycopy(array, 0, newArray, 0, size);//复制原来的数据到新数组中
    
            return newArray;
        }

nioBuffers(Entry entry, ByteBuf buf, ByteBuffer[] nioBuffers, int nioBufferCount, int maxCount)

这里是用实体的缓冲区数组,统计可以读的缓冲区个数,以及将实体缓冲区数组中的可读的缓冲区放入缓冲区数组中:

    private static int nioBuffers(Entry entry, ByteBuf buf, ByteBuffer[] nioBuffers, int nioBufferCount, int maxCount) {
            ByteBuffer[] nioBufs = entry.bufs;//默认都是空
            if (nioBufs == null) {
                // cached ByteBuffers as they may be expensive to create in terms
                // of Object allocation //缓存一份,因为创建成本高,而且这个实体是池化的,所以要复用
                entry.bufs = nioBufs = buf.nioBuffers();
            }
            //遍历实体的缓冲区数组
            for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
                ByteBuffer nioBuf = nioBufs[i];
                if (nioBuf == null) {//获得空了就说明后面就没有了,跳出循环即可
                    break;
                } else if (!nioBuf.hasRemaining()) {//缓冲区不可读了,就继续下一个
                    continue;
                }
                nioBuffers[nioBufferCount++] = nioBuf;//将可读的缓冲区添加到缓冲区数组,然后nioBufferCount+1
            }
            return nioBufferCount;
        }

所以这里可能 缓冲区个数nioBufferCount可能是会比实体的消息个数多

用户定义写标志

这个我就不多说了,就是用户自定来定义哪一位代表可不可写入,第131位,默认是第0位的:

     /**
         * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
         * not exceed the write watermark of the {@link Channel} and
         * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
         * {@code false}.
         */
        public boolean isWritable() {
            return unwritable == 0;
        }
    
        /** 用户定义标志位代表可不可写
         * Returns {@code true} if and only if the user-defined writability flag at the specified index is set to
         * {@code true}.
         */
        public boolean getUserDefinedWritability(int index) {
            return (unwritable & writabilityMask(index)) == 0;
        }
    
        /**
         * Sets a user-defined writability flag at the specified index.
         */
        public void setUserDefinedWritability(int index, boolean writable) {
            if (writable) {
                setUserDefinedWritability(index);
            } else {
                clearUserDefinedWritability(index);
            }
        }
    
        private void setUserDefinedWritability(int index) {
            final int mask = ~writabilityMask(index);
            for (;;) {
                final int oldValue = unwritable;
                final int newValue = oldValue & mask;
                if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                    if (oldValue != 0 && newValue == 0) {
                        fireChannelWritabilityChanged(true);
                    }
                    break;
                }
            }
        }
    
        private void clearUserDefinedWritability(int index) {
            final int mask = writabilityMask(index);
            for (;;) {
                final int oldValue = unwritable;
                final int newValue = oldValue | mask;
                if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                    if (oldValue == 0 && newValue != 0) {
                        fireChannelWritabilityChanged(true);
                    }
                    break;
                }
            }
        }
    
        private static int writabilityMask(int index) {
            if (index < 1 || index > 31) {
                throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
            }
            return 1 << index;
        }

failFlushed

冲刷失败了就将实体全部删除:

    void failFlushed(Throwable cause, boolean notify) {
            // Make sure that this method does not reenter.  A listener added to the current promise can be notified by the
            // current thread in the tryFailure() call of the loop below, and the listener can trigger another fail() call
            // indirectly (usually by closing the channel.)
            //
            // See https://github.com/netty/netty/issues/1501
            if (inFail) {//失败就返回了
                return;
            }
    
            try {
                inFail = true;
                for (;;) {
                //这里就是前面的失败就删除,默认删除一个会返回true,循环继续,继续删除下一个,直到全部删除位置才返回false,才跳出循环
                    if (!remove0(cause, notify)) {
                        break;
                    }
                }
            } finally {
                inFail = false;
            }
        }

bytesBeforeUnwritable

不可写之前还有多少字节可以写:

     public long bytesBeforeUnwritable() {
            long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize;
            // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check writability.
            // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
            // together. totalPendingSize will be updated before isWritable().
            if (bytes > 0) {
                return isWritable() ? bytes : 0;
            }
            return 0;
        }

bytesBeforeWritable

可写之前还有多少个字节在等待排队要写的:

     public long bytesBeforeWritable() {
            long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark();
            // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
            // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
            // together. totalPendingSize will be updated before isWritable().
            if (bytes > 0) {
                return isWritable() ? 0 : bytes;
            }
            return 0;
        }

forEachFlushedMessage

刷新每一个消息冲刷的进度:

     public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
            ObjectUtil.checkNotNull(processor, "processor");
    
            Entry entry = flushedEntry;
            if (entry == null) {
                return;
            }
    
            do {
                if (!entry.cancelled) {
                    if (!processor.processMessage(entry.msg)) {
                        return;
                    }
                }
                entry = entry.next;
            } while (isFlushedEntry(entry));//是否标记了冲刷的
        }
    
    //有冲刷标志的
        private boolean isFlushedEntry(Entry e) {
            return e != null && e != unflushedEntry;
        }

至此方法都讲完了,其实这个就是一个出站的数据缓存的地方,你来一个数据,我就给你封装成一个实体,然后最后要写出去之前给你打好标记,然后装进我的缓存区数组里,让通道写出,最后再删除写完实体,清空缓存区数组。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文