2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104784155

Netty的Unsafe接口

这个Unsafe可不是JDK原生的Unsafe哦,主要就是一些直接跟IO底层直接相关的通用操作:

       interface Unsafe {
    
            // 接受数据的时候用于分配字节缓冲区的处理器
    
            RecvByteBufAllocator.Handle recvBufAllocHandle();
    
            // 本地地址
    
            SocketAddress localAddress();
    
            // 远程地址
    
            SocketAddress remoteAddress();
    
            //向事件循环注册通道,完成后回调
    
            void register(EventLoop eventLoop, ChannelPromise promise);
    
            // 绑定本地地址,完成后回调
    
            void bind(SocketAddress localAddress, ChannelPromise promise);
    
            // 连接
    
            void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    
            // 断开连接,完成回调
    
            void disconnect(ChannelPromise promise);
    
            // 关闭连接,完成回调
    
            void close(ChannelPromise promise);
    
            // 立即关闭,不触发任何事件
    
            void closeForcibly();
    
            // 注销,完成回调
    
            void deregister(ChannelPromise promise);
    
            // 开始读操作
    
            void beginRead();
    
            // 写操作
             
            void write(Object msg, ChannelPromise promise);
    
            // 冲刷所有的出站数据
             
            void flush();
    
            // 特殊的占位符,不接受通知
    
            ChannelPromise voidPromise();
    
            //写操作的出站缓冲区
    
            ChannelOutboundBuffer outboundBuffer();
        }

AbstractUnsafe基本抽象实现

属性

这些就是一些基本的属性,要进行数据的读写,需要有接收缓冲区,所以有了recvHandle处理器,写出去的时候需要有写缓冲区ChannelOutboundBuffer ,注意ChannelOutboundBuffer是初始化的时候就会创建,就创建一次。

    //出站字节缓冲区
            private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
            private RecvByteBufAllocator.Handle recvHandle;//接受数据缓冲分配器的处理器
            private boolean inFlush0;//是否正在缓冲
            /** true if the channel has never been registered, false otherwise */
            private boolean neverRegistered = true;//通道没注册过
    
            private void assertEventLoop() {//断言还没注册,或者当前线程是IO线程
                assert !registered || eventLoop.inEventLoop();
            }

recvBufAllocHandle接受缓冲区处理器

缓冲区分配上次说过了,出站缓冲区以前文章也有讲过。

     @Override
            public RecvByteBufAllocator.Handle recvBufAllocHandle() {
                if (recvHandle == null) {
                    recvHandle = config().getRecvByteBufAllocator().newHandle();
                }
                return recvHandle;
            }

register注册到事件循环

注册方法其实就是判断是否当前线程就是IO线程,是的话就直接执行,不是就包装成一个任务提交给IO线程,这样就避免多线程的问题,始终是单线程操作。

      @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                ObjectUtil.checkNotNull(eventLoop, "eventLoop");
                if (isRegistered()) {//是否已经注册人到一个eventLoop
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {//是否是NioEventLoop类型
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
    
                AbstractChannel.this.eventLoop = eventLoop;
    //只能当前线程是eventLoop的线程才可以注册,防止多线程并发问题,所以即使多线程来操作,也是安全的,会按照一定顺序提交到任务队列里
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {//否则就当做任务提交给eventLoop的任务队列
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, t);
                    }
                }

register0执行注册逻辑

这里是注册过程要做的事,进行真正的注册逻辑doRegister,其实就是将NIO通道注册到Selector上,然后进行处理器的待添加事件的处理,注册回调成功,管道传递注册事件,如果是第一次注册,管道传递通道激活事件,否则是设置自动读的话就注册读监听。

     private void register0(ChannelPromise promise) {
                try {
                
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {//确保是不可取消和通道打开着,否则就返回
                        return;
                    }
                    boolean firstRegistration = neverRegistered;//设置注册标记
                    doRegister();//进行注册逻辑
                    neverRegistered = false;//AbstractUnsafe的已注册标记
                    registered = true;//channel的已注册标记
    
                
                    pipeline.invokeHandlerAddedIfNeeded();//如果在注册前有处理器添加,还没进行HandlerAdded回调,注册成功后要回调
    
                    safeSetSuccess(promise);//回调注册成功
                    pipeline.fireChannelRegistered();//通道注册事件传递
                  
                    if (isActive()) {//通道激活的话
                        if (firstRegistration) {//第一次注册要进行激活事件传递
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {//否则如果设置了自动读,就进行读监听
                          
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                 
                    closeForcibly();//强制关闭
                    closeFuture.setClosed();//关闭回调
                    safeSetFailure(promise, t);//设置失败
                }
            }

bind绑定地址

省略了部分。看主逻辑,做具体的doBind,如果通道开始没激活,绑定后激活的话,就开一个延时的任务,进行激活事件传递,最后回调绑定成功。

           @Override
            public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    			...
    
                boolean wasActive = isActive();
                try {
                    doBind(localAddress);
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    closeIfClosed();
                    return;
                }
    
                if (!wasActive && isActive()) {//绑定前没激活,绑定后激活了
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelActive();
                        }
                    });
                }
    
                safeSetSuccess(promise);
            }

disconnect断开连接

调用doDisconnect,断开连接,如果开始激活的,断开后失效了,就传递失效事件。如果通道关闭了,还要处理关闭事件closeIfClosed

       @Override
            public final void disconnect(final ChannelPromise promise) {
                assertEventLoop();
    
                if (!promise.setUncancellable()) {
                    return;
                }
    
                boolean wasActive = isActive();
                try {
                    doDisconnect();
                    // Reset remoteAddress and localAddress
                    remoteAddress = null;
                    localAddress = null;
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    closeIfClosed();
                    return;
                }
    
                if (wasActive && !isActive()) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelInactive();
                        }
                    });
                }
    
                safeSetSuccess(promise);
                closeIfClosed(); // doDisconnect() might have closed the channel
            }

close关闭通道和出站缓冲区

进行通道的关闭,主要还是出站缓冲区的处理和传递通道失效和注销事件。

     @Override
            public final void close(final ChannelPromise promise) {
                assertEventLoop();
    
                ClosedChannelException closedChannelException = new ClosedChannelException();
                close(promise, closedChannelException, closedChannelException, false);
            }
    
       		private void close(final ChannelPromise promise, final Throwable cause,
                               final ClosedChannelException closeCause, final boolean notify) {
                if (!promise.setUncancellable()) {
                    return;
                }
    
                if (closeInitiated) {//如果已经发起关闭了
                    if (closeFuture.isDone()) {//判断是否关闭完成
                        // Closed already.
                        safeSetSuccess(promise);//回调
                    } else if (!(promise instanceof VoidChannelPromise)) { 
                        closeFuture.addListener(new ChannelFutureListener() {//如果不是VoidChannelPromise,添加关闭监听
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                promise.setSuccess();
                            }
                        });
                    }
                    return;
                }
    
                closeInitiated = true;//已经开始关闭了
                //处理出站缓冲区关闭
                final boolean wasActive = isActive();
                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
                Executor closeExecutor = prepareToClose();
                if (closeExecutor != null) {
                    closeExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // Execute the close.
                                doClose0(promise);
                            } finally {
                              
                                invokeLater(new Runnable() {
                                    @Override
                                    public void run() {
                                        if (outboundBuffer != null) {
                                            // Fail all the queued messages
                                            outboundBuffer.failFlushed(cause, notify);
                                            outboundBuffer.close(closeCause);
                                        }
                                        fireChannelInactiveAndDeregister(wasActive);
                                    }
                                });
                            }
                        }
                    });
                } else {
                    try {
                     
                        doClose0(promise);
                    } finally {
                        if (outboundBuffer != null) {
                            // Fail all the queued messages.
                            outboundBuffer.failFlushed(cause, notify);
                            outboundBuffer.close(closeCause);
                        }
                    }
                    if (inFlush0) {
                        invokeLater(new Runnable() {
                            @Override
                            public void run() {
                                fireChannelInactiveAndDeregister(wasActive);
                            }
                        });
                    } else {
                        fireChannelInactiveAndDeregister(wasActive);
                    }
                }
            }
doClose0关闭通道

具体的关闭逻辑和回调,具体逻辑是在通道中实现的,后面会讲。

    private void doClose0(ChannelPromise promise) {
                try {
                    doClose();
                    closeFuture.setClosed();
                    safeSetSuccess(promise);
                } catch (Throwable t) {
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
fireChannelInactiveAndDeregister传递通道失效和注销事件

传递通道失效和注销事件。

     private void fireChannelInactiveAndDeregister(final boolean wasActive) {
                deregister(voidPromise(), wasActive && !isActive());
            }
doDeregister注销事件

提交一个任务,进行注销doDeregister,然后根据情况传递通道失效和注销事件。

     private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
                if (!promise.setUncancellable()) {
                    return;
                }
    
                if (!registered) {
                    safeSetSuccess(promise);
                    return;
                }
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            doDeregister();
                        } catch (Throwable t) {
                            logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                        } finally {
                            if (fireChannelInactive) {
                                pipeline.fireChannelInactive();
                            }
    
                            if (registered) {
                                registered = false;
                                pipeline.fireChannelUnregistered();
                            }
                            safeSetSuccess(promise);
                        }
                    }
                });
            }

shutdownOutput出站缓冲区关闭处理

清理出站缓冲区ChannelOutboundBuffer ,并传递fireUserEventTriggered事件。

            @UnstableApi
            public final void shutdownOutput(final ChannelPromise promise) {
                assertEventLoop();
                shutdownOutput(promise, null);
            }
    
           	private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
                if (!promise.setUncancellable()) {
                    return;
                }
    
                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {//如果出站缓冲区为null的话,就回调失败
                    promise.setFailure(new ClosedChannelException());
                    return;
                }
                this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.禁止添加数据到出站缓冲区了
    
                final Throwable shutdownCause = cause == null ?//根据异常创建ChannelOutputShutdownException
                        new ChannelOutputShutdownException("Channel output shutdown") :
                        new ChannelOutputShutdownException("Channel output shutdown", cause);
                Executor closeExecutor = prepareToClose();//有关闭执行器
                if (closeExecutor != null) {//提交一个任务
                    closeExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // Execute the shutdown.
                                doShutdownOutput();
                                promise.setSuccess();
                            } catch (Throwable err) {
                                promise.setFailure(err);
                            } finally {//出站缓冲区事件任务
                                // Dispatch to the EventLoop
                                eventLoop().execute(new Runnable() {
                                    @Override
                                    public void run() {//出站缓冲区事件处理
                                        closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                                    }
                                });
                            }
                        }
                    });
                } else {
                    try {//直接处理关闭
                        // Execute the shutdown.
                        doShutdownOutput();
                        promise.setSuccess();
                    } catch (Throwable err) {
                        promise.setFailure(err);
                    } finally {
                        closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                    }
                }
            }
    
            private void closeOutboundBufferForShutdown(
                    ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
                buffer.failFlushed(cause, false);//不能冲刷
                buffer.close(cause, true);//关闭出站缓冲区
                pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);//传递事件
            }

好像有点长了,下一篇继续吧。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文