netty服务端启动流程解析

 2023-01-20
原文作者:进击的阿尉 原文地址:https://juejin.cn/post/7032868709320523783

以下思路来自黑马netty教程

JDK的nio服务端启动

因为netty底层使用的是nio,所以先回顾nio代码的流程,然后再看netty的启动的流程。 JDK的NIO代码启动主要步骤

  1. 创建ServerSocketChannel,channel配置为非阻塞,并且绑定端口
  2. 把channel关注的accept事件注册到select
  3. 循环调用select,拿到就绪的key
  4. 如果是accept事件,建立SocketChannel,并且把这个chanel关注的读写事件
  5. 如果是可写或者可写事件,通过key拿到channel进行相应的读写操作。
    public class WriteServer {
        public static void main(String[] args) throws IOException {
            // 1. channel配置为非阻塞,并且绑定端口
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.bind(new InetSocketAddress(8080));
            
            // 2. 把channel关注的accept时间注册到select
            Selector selector = Selector.open();
            ssc.register(selector, SelectionKey.OP_ACCEPT);
    
            while(true) {
                // 3. 拿到就绪的key 
                selector.select();
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    
                    if (key.isAcceptable()) {
                        // 4. 如果是accept事件,建立socketChannel,并且把这个chanel关注的读写事件注册到select
                    } else if (key.isWritable() || key.isReadable()) {
                        // 5. 如果是可写或者可写事件,通过key拿到channel进行相应的读写操作。
                    }
                }
            }
        }
    }

使用netty的主干代码

    //1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
    Selector selector = Selector.open(); 
    
    //2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
    NioServerSocketChannel attachment = new NioServerSocketChannel();
    
    //3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
    serverSocketChannel.configureBlocking(false);
    
    //4 启动 nio boss 线程执行接下来的操作
    
    //5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
    SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
    
    //6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor
    
    //7 绑定端口
    serverSocketChannel.bind(new InetSocketAddress(8080));
    
    //8 触发 channel active 事件,在 head 中关注 op_accept 事件
    selectionKey.interestOps(SelectionKey.OP_ACCEPT);

netty的启动流程

服务端启动demo

    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.group(boss, worker);
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
            }
        });
        serverSocketChannel = serverBootstrap.bind(port).sync().channel();
        serverSocketChannel.closeFuture().sync();
    } catch (InterruptedException e) {
    } finally {
    }

NioServerSocketChannel

netty的NioServerSocketChannel封装了JDK的ServerSocketChannel,NioServerSocketChannel底层还是会open这个JDK的channel。serverBootstrap.channel(NioServerSocketChannel.class) 指定了bootStrap类中的channelFactory 返回NioServerSocketChannel这个类型的channel。

bind()方法 开始启动

主要的三个步骤

  1. 初始化channel
  2. 注册channel
  3. 执行dobind0回调
    // 主干代码
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        channel = channelFactory.newChannel();
        // 初始化channel
        init(channel);
    
        // 注册channel到select
        ChannelFuture regFuture = config().group().register(channel);
        return regFuture;
    }

初始化channel

channelFactory.newChannel() 通过channelFactory会返回NioServerSocketChannel

init()中向流水线添加了一个channel初始化处理器,这个处理器只会被执行一次,那么什么时候被执行?这个放到后面讲

    channel.pipeline().addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
    
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });

注册channel

channel首先会进行一次线程切换。当前运行的是主线程,但是注册和初始化channel操作是由eventGroup中的nio线程来执行。返回的是一个promise对象用来异步拿到结果。 eventLoop.inEventLoop() 是判断当前线程是不是nio线程,显然不是,那么主线程就会封装一个任务提交给nio线程来执行。

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        }
    }

接下来继续看 register0(promise)。JavaChannel() 拿到 NioServerSocketChannel创建的JDK ServerSocketChannel。把这个原生channel注册到eventLoop的select上,绑定附件是NioServerSocketChannel,之后可以通过附件拿到这个channel,并且这个附件应该是唯一的。

202212302204048641.png

注册完毕之后通过pipeline.invokeHandlerAddedIfNeeded(); 调用init中注册的初始化处理器,向nioChannel中添加了一个handler,在accept事件发生后执行这个handler建立连接。ServerBootstrapAcceptor继承自chnnelInboundHandlerAdapter

    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

执行doBind0回调

前面提到过注册和初始化操作是主线程提交一个任务由nio线程异步执行,提交之后返回一个promise对象。在初始化channel完成、注册channel完成之后,nio线程通过safeSetSuccess();向promise中设置一个成功值,使得该promise的回调被执行。

202212302204057342.png

    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause != null) {
                // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                // IllegalStateException once we try to access the EventLoop of the Channel.
                promise.setFailure(cause);
            } else {
                // Registration was successful, so set the correct executor to use.
                // See https://github.com/netty/netty/issues/2586
                promise.registered();
    
                doBind0(regFuture, channel, localAddress, promise);
            }
        }
    });

回调中执行doBind0()方法,调用链最后就是调用原生channel进行端口绑定

    protected void doBind(SocketAddress localAddress) throws Exception {
       if (PlatformDependent.javaVersion() >= 7) {
           javaChannel().bind(localAddress, config.getBacklog());
       } else {
           javaChannel().socket().bind(localAddress, config.getBacklog());
       }
    }

在执行完doBind方法后,向select注册channel的accept事件,这里看上去是read事件呢,其实做完位运算得到的是16对应accept事件。

202212302204067423.png

202212302204082464.png

202212302204090445.png

然后到accept时间侦听到连接之后,会执行ServerBootstrapAcceptor这个handler这个处理器。

至此启动流程结束

总结

总结一下启动流程

  1. 初始化channel
  2. 把这个channel注册到select上,这个注册关注的事件是0
  3. 前面两步是异步操作,当执行完之后执行回调
  4. 在回调方法中 绑定指定的端口
  5. 注册accept事件
  6. 当由accept事件之后调用ServerBootstrapAcceptor handler建立连接