Netty的Unsafe接口
这个Unsafe
可不是JDK
原生的Unsafe
哦,主要就是一些直接跟IO
底层直接相关的通用操作:
interface Unsafe {
// 接受数据的时候用于分配字节缓冲区的处理器
RecvByteBufAllocator.Handle recvBufAllocHandle();
// 本地地址
SocketAddress localAddress();
// 远程地址
SocketAddress remoteAddress();
//向事件循环注册通道,完成后回调
void register(EventLoop eventLoop, ChannelPromise promise);
// 绑定本地地址,完成后回调
void bind(SocketAddress localAddress, ChannelPromise promise);
// 连接
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
// 断开连接,完成回调
void disconnect(ChannelPromise promise);
// 关闭连接,完成回调
void close(ChannelPromise promise);
// 立即关闭,不触发任何事件
void closeForcibly();
// 注销,完成回调
void deregister(ChannelPromise promise);
// 开始读操作
void beginRead();
// 写操作
void write(Object msg, ChannelPromise promise);
// 冲刷所有的出站数据
void flush();
// 特殊的占位符,不接受通知
ChannelPromise voidPromise();
//写操作的出站缓冲区
ChannelOutboundBuffer outboundBuffer();
}
AbstractUnsafe基本抽象实现
属性
这些就是一些基本的属性,要进行数据的读写,需要有接收缓冲区,所以有了recvHandle
处理器,写出去的时候需要有写缓冲区ChannelOutboundBuffer
,注意ChannelOutboundBuffer
是初始化的时候就会创建,就创建一次。
//出站字节缓冲区
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
private RecvByteBufAllocator.Handle recvHandle;//接受数据缓冲分配器的处理器
private boolean inFlush0;//是否正在缓冲
/** true if the channel has never been registered, false otherwise */
private boolean neverRegistered = true;//通道没注册过
private void assertEventLoop() {//断言还没注册,或者当前线程是IO线程
assert !registered || eventLoop.inEventLoop();
}
recvBufAllocHandle接受缓冲区处理器
缓冲区分配上次说过了,出站缓冲区以前文章也有讲过。
@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
if (recvHandle == null) {
recvHandle = config().getRecvByteBufAllocator().newHandle();
}
return recvHandle;
}
register注册到事件循环
注册方法其实就是判断是否当前线程就是IO
线程,是的话就直接执行,不是就包装成一个任务提交给IO
线程,这样就避免多线程的问题,始终是单线程操作。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {//是否已经注册人到一个eventLoop
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {//是否是NioEventLoop类型
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
//只能当前线程是eventLoop的线程才可以注册,防止多线程并发问题,所以即使多线程来操作,也是安全的,会按照一定顺序提交到任务队列里
if (eventLoop.inEventLoop()) {
register0(promise);
} else {//否则就当做任务提交给eventLoop的任务队列
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
register0执行注册逻辑
这里是注册过程要做的事,进行真正的注册逻辑doRegister
,其实就是将NIO
通道注册到Selector
上,然后进行处理器的待添加事件的处理,注册回调成功,管道传递注册事件,如果是第一次注册,管道传递通道激活事件,否则是设置自动读的话就注册读监听。
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {//确保是不可取消和通道打开着,否则就返回
return;
}
boolean firstRegistration = neverRegistered;//设置注册标记
doRegister();//进行注册逻辑
neverRegistered = false;//AbstractUnsafe的已注册标记
registered = true;//channel的已注册标记
pipeline.invokeHandlerAddedIfNeeded();//如果在注册前有处理器添加,还没进行HandlerAdded回调,注册成功后要回调
safeSetSuccess(promise);//回调注册成功
pipeline.fireChannelRegistered();//通道注册事件传递
if (isActive()) {//通道激活的话
if (firstRegistration) {//第一次注册要进行激活事件传递
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {//否则如果设置了自动读,就进行读监听
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();//强制关闭
closeFuture.setClosed();//关闭回调
safeSetFailure(promise, t);//设置失败
}
}
bind绑定地址
省略了部分。看主逻辑,做具体的doBind
,如果通道开始没激活,绑定后激活的话,就开一个延时的任务,进行激活事件传递,最后回调绑定成功。
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {//绑定前没激活,绑定后激活了
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
disconnect断开连接
调用doDisconnect
,断开连接,如果开始激活的,断开后失效了,就传递失效事件。如果通道关闭了,还要处理关闭事件closeIfClosed
。
@Override
public final void disconnect(final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable()) {
return;
}
boolean wasActive = isActive();
try {
doDisconnect();
// Reset remoteAddress and localAddress
remoteAddress = null;
localAddress = null;
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
safeSetSuccess(promise);
closeIfClosed(); // doDisconnect() might have closed the channel
}
close关闭通道和出站缓冲区
进行通道的关闭,主要还是出站缓冲区的处理和传递通道失效和注销事件。
@Override
public final void close(final ChannelPromise promise) {
assertEventLoop();
ClosedChannelException closedChannelException = new ClosedChannelException();
close(promise, closedChannelException, closedChannelException, false);
}
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
if (!promise.setUncancellable()) {
return;
}
if (closeInitiated) {//如果已经发起关闭了
if (closeFuture.isDone()) {//判断是否关闭完成
// Closed already.
safeSetSuccess(promise);//回调
} else if (!(promise instanceof VoidChannelPromise)) {
closeFuture.addListener(new ChannelFutureListener() {//如果不是VoidChannelPromise,添加关闭监听
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
return;
}
closeInitiated = true;//已经开始关闭了
//处理出站缓冲区关闭
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the close.
doClose0(promise);
} finally {
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// Fail all the queued messages
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
}
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive);
}
}
}
doClose0关闭通道
具体的关闭逻辑和回调,具体逻辑是在通道中实现的,后面会讲。
private void doClose0(ChannelPromise promise) {
try {
doClose();
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
fireChannelInactiveAndDeregister传递通道失效和注销事件
传递通道失效和注销事件。
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
deregister(voidPromise(), wasActive && !isActive());
}
doDeregister注销事件
提交一个任务,进行注销doDeregister
,然后根据情况传递通道失效和注销事件。
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) {
safeSetSuccess(promise);
return;
}
invokeLater(new Runnable() {
@Override
public void run() {
try {
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive();
}
if (registered) {
registered = false;
pipeline.fireChannelUnregistered();
}
safeSetSuccess(promise);
}
}
});
}
shutdownOutput出站缓冲区关闭处理
清理出站缓冲区ChannelOutboundBuffer
,并传递fireUserEventTriggered
事件。
@UnstableApi
public final void shutdownOutput(final ChannelPromise promise) {
assertEventLoop();
shutdownOutput(promise, null);
}
private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
if (!promise.setUncancellable()) {
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {//如果出站缓冲区为null的话,就回调失败
promise.setFailure(new ClosedChannelException());
return;
}
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.禁止添加数据到出站缓冲区了
final Throwable shutdownCause = cause == null ?//根据异常创建ChannelOutputShutdownException
new ChannelOutputShutdownException("Channel output shutdown") :
new ChannelOutputShutdownException("Channel output shutdown", cause);
Executor closeExecutor = prepareToClose();//有关闭执行器
if (closeExecutor != null) {//提交一个任务
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the shutdown.
doShutdownOutput();
promise.setSuccess();
} catch (Throwable err) {
promise.setFailure(err);
} finally {//出站缓冲区事件任务
// Dispatch to the EventLoop
eventLoop().execute(new Runnable() {
@Override
public void run() {//出站缓冲区事件处理
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
}
});
}
}
});
} else {
try {//直接处理关闭
// Execute the shutdown.
doShutdownOutput();
promise.setSuccess();
} catch (Throwable err) {
promise.setFailure(err);
} finally {
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
}
}
}
private void closeOutboundBufferForShutdown(
ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
buffer.failFlushed(cause, false);//不能冲刷
buffer.close(cause, true);//关闭出站缓冲区
pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);//传递事件
}
好像有点长了,下一篇继续吧。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。