简单介绍
这个类是比较核心的类,这个类定义了很多关键的操作,算比较底层的了,其实以前我们有讲过他的两个子类,一个就是AbstractNioMessageChannel
的NioMessageUnsafe
,这个是用于NioServerSocketChannel
的unsafe
类,另外一个AbstractNioByteChannel
的NioByteUnsafe
,用于NioSocketChannel
,其实也就是一些读取方法不一样,毕竟一个是取接受连接,一个是读取数据,不一样。我们前面其实有分析过一些他们的read
方法。今天主要来分析下NioSocketChannelUnsafe
的write
和flush
方法,。
先看下这个NioSocketChannelUnsafe
的结构,他是专门用来操作写和刷新的:
write(Object msg, ChannelPromise promise)
会将数据msg
封装成直接缓冲区,然后添加到出站缓冲区中:
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, newClosedChannelException(initialCloseCause));
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);//封装成直接缓冲区
size = pipeline.estimatorHandle().size(msg);//获取缓冲区大小
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);//往出站缓冲区添加消息
}
filterOutboundMessage(Object msg)
这个是专门把数据封装成直接缓冲区,以便于进行零拷贝,利用堆外内存,提高效率,关于零拷贝,可以看看这篇文章,其实这里可以立即为没有用CPU
将数据从内核拷贝到Java
虚拟机中:
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {//如果是直接缓冲区就返回
return msg;
}
return newDirectBuffer(buf);//否则封装成直接缓冲区就可以零拷贝
}
if (msg instanceof FileRegion) {//文件缓冲区也可以零拷贝
return msg;
}
//剩下的就不支持了
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
AbstractNioChannel的newDirectBuffer(ByteBuf buf)
其实就是获得一个新的直接缓冲区,把旧的缓冲区释放了:
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {//如果没有数据,就释放,返回一个空的
ReferenceCountUtil.safeRelease(buf);
return Unpooled.EMPTY_BUFFER;
}
//字节缓冲区分配器
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {//是直接缓冲区池化的
ByteBuf directBuf = alloc.directBuffer(readableBytes);//申请直接缓冲区
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);//写入直接缓冲区
ReferenceCountUtil.safeRelease(buf);//释放原来的缓冲区
return directBuf;//返回直接缓冲区
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();//线程中的直接缓冲区
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
// Allocating and deallocating an unpooled direct buffer is very expensive; give up.
return buf;//如果申请或者释放未池化的直接缓冲区消耗太大,就直接返回原来的
}
ChannelOutboundBuffer的addMessage
这里才是将直接缓冲区添加到出站缓冲区中,不过是会创建一个实体Entry,然后用一个单链表结构来存取的,这个后面会讲:
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);//创建实体
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;//指向第一个未冲刷的实体
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);//增加待冲刷的消息
}
链表结构大致是这样:
ChannelOutboundBuffer的incrementPendingOutboundBytes
这个就是增加待出站的字节数,如果超过上限的话,就设置不可以写了:
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {//如果大于配置的16位大小
setUnwritable(invokeLater);//设置不可写
}
}
ChannelOutboundBuffer的setUnwritable
我们先来看看可写是怎么判断的:
public boolean isWritable() {
return unwritable == 0;
}
然后设置不可写就是这样,将unwritable
原子操作改为非0
,然后触发fireChannelWritabilityChanged
,也就是写能力改变了,不可写了:
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;//可写的时候是0
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {//开始是0,后来变为非0,就是不可写了
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
如果是立即改变,就会调用pipeline.fireChannelWritabilityChanged();
,就会从头结点开始传递这个事件,否则就给通道的事件循环提交个任务:
private void fireChannelWritabilityChanged(boolean invokeLater) {
final ChannelPipeline pipeline = channel.pipeline();
if (invokeLater) {
Runnable task = fireChannelWritabilityChangedTask;
if (task == null) {
fireChannelWritabilityChangedTask = task = new Runnable() {
@Override
public void run() {
pipeline.fireChannelWritabilityChanged();
}
};
}
channel.eventLoop().execute(task);
} else {
pipeline.fireChannelWritabilityChanged();
}
}
至此,write
写数据就完成了,其实就是写入出站缓冲区里面,并没有将数据冲刷到对端,要进行flush才会将数据发出去。
flush()
可以看到这里才会开始冲刷,会先进行打标记,然后冲刷:
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;//获得出站缓冲区
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();//添加冲刷计数
flush0();//冲刷
}
ChannelOutboundBuffer的addFlush
这里会将flushedEntry
设置为要冲刷的第一个entry
,然后遍历链表,冲刷计数flushed
+1,如果此时请求取消的话,就进行取消和出站字节数的减少,最后将为冲刷实体unflushedEntry
设为空,表示这些都已经要冲刷的了,后续会根据flushed
来进行冲刷;
public void addFlush() {
Entry entry = unflushedEntry;//第一个没冲刷的数据,也是链表的第一个
if (entry != null) {//有数据才刷了
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;//设置第一个要冲刷的实体
}
do {
flushed ++;//冲刷数+1
if (!entry.promise.setUncancellable()) {//如果取消的话需要回收内存
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);//遍历冲刷是否有取消的
// All flushed so reset unflushedEntry
unflushedEntry = null;//重置未冲刷的
}
}
AbstractNioUnsafe的flush0
要冲刷了:
@Override
protected final void flush0() {
if (!isFlushPending()) {//没有待冲刷的操作
super.flush0();
}
}
AbstractNioUnsafe的isFlushPending
先判断下是否已经有待冲刷存在,也就是有设置OP_WRITE
事件:
private boolean isFlushPending() {
SelectionKey selectionKey = selectionKey();
return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
AbstractUnsafe的flush0
这里就要开始真正的冲刷了,省略了非核心的操作:
protected void flush0() {
...
doWrite(outboundBuffer);
...
}
NioSocketChannel的doWrite(ChannelOutboundBuffer in)
最终当然还是封装了NIO
的SocketChannel
的write
方法来进行写数据啦,他会进行16
次自旋尝试,来写消息,直到出站缓冲区的数据全部写出去了,然后就clearOpWrite
清除OP_WRITE
设置,返回,否则要去设置任务是否写操作incompleteWrite
:
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();//内部还是用NIO的操作的
int writeSpinCount = config().getWriteSpinCount();//写自旋的次数,默认是16次
do {
if (in.isEmpty()) {
// All written so clear OP_WRITE
clearOpWrite();//全部写完后,进行写事件的清除
// Directly return here so incompleteWrite(...) is not called.
return;//直接返回,不需要调用incompleteWrite
}
// Ensure the pending writes are made of ByteBufs only.
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();//获取最大的待写字节数16384
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);//获取ByteBuffer数组
int nioBufferCnt = in.nioBufferCount();
// Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
writeSpinCount -= doWrite0(in);
break;
case 1: {
// Only one ByteBuf so use non-gathering write 一个就不用gathering
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero. 0字节的不会放进缓冲区里,不用检查
ByteBuffer buffer = nioBuffers[0];//只有一个
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);//用NIO的SocketChannel写出去
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
long attemptedBytes = in.nioBufferSize();//获取发多少字节数据
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);//一起写出去了
if (localWrittenBytes <= 0) {//没完成
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
上面这个方法涉及到的东西比较多,讲起来东西很多,因为还涉及到ChannelOutboundBuffer
,还有一些ByteBuf
相关的东西等等,下一篇详细的来介绍下这些东西,不然很难讲清楚doWrite
。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。