Netty4源码初学习-客户端连接

 2023-01-28
原文作者:东南小马哥 原文地址:https://juejin.cn/post/7028565848579112997

在上篇文章中,分析了启动的流程,遗留一个内部类ServerBootstrapAcceptor,

202212302204429161.png

这个类其实是一个inBound,我们服务器启动的eveentloop线程会回调其channelRead方法

    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
    
        child.pipeline().addLast(childHandler);
    
        for (Entry<ChannelOption<?>, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }
    
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    
        try {
        //这里看到的代码和服务端注册异曲同工
        //此处是按照顺序遍历获取eventloop,保证一个客户端channel对应一个eventloop
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }

这部分代码得结合服务端启动的代码才更好理解,在服务端启动中,我们eventloop开启了一个线程,这个线程执行代码的关键逻辑如下,是个死循环:

    @Override
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
    
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }
    
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                //此处就是监听io和执行任务cpu分配时间占比
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

ioTime/taskTime = ioRatio/(100-ioRatio);

ioTime: 监听客户端io时间花费的时间 taskTime: 任务执行花费时间 ioRatio:监听客户端io时间占比 (100-ioRatio):任务执行时间占比

按照下面的流程一路追踪,就能找到ServerBootstrapAcceptor的channelRead被执行

202212302204439782.png

read方法如下:

      @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            
            **//服务端pipeline**
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);
    
            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
    
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }
    
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                 //此处是关键,会回调添加到pipechannel的handler,当前就剩ServerBootstrapAcceptor这一个自定义业务
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
    
                if (exception != null) {
                    closed = closeOnReadError(exception);
    
                    pipeline.fireExceptionCaught(exception);
                }
    
                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

在ServerBootstrapAcceptor的read方法中,和服务端注册差不多,只不过channel由服务端channel,即serverSocketChannel变为客户端channenl,即socketChannel

只不过这里的区别是, 会有多个线程池(exccutor)执行客户端socket的请求,每个线程池的就一个线程 , 可以理解我多线程,我理解,采用线程池,可以通知根据线程池状态来控制线程的创建,这里是通过CAS来防止并发。

SingleThreadEventExecutor

    private void startThread() {
    //此处根据状态决定是否重新创建新线程
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }