2023-08-02  阅读(1)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/354

通过上一章的讲解,大家应该对Netty的基本使用以及整体架构有了初步了解。从本章开始,我将正式分析Netty的底层原理并对部分源码进行讲解。

我们在使用Netty时,首先就需要使用Bootstrap(客户端)和ServerBootstrap(服务端)来对Netty中的各类核心组件进行装配。本章,我就对Bootstrap和ServerBootstrap的底层原理进行讲解。

一、Bootstrap启动流程

我们先来回顾一下Netty中的 AbstractBootstrap 类,它是BootstrapServerBootstrap的抽象父类,封装了一些公有方法:

202308022224568971.png

事实上,即时我们不使用Bootstrap,也可以手动创建Channel、完成各种设置和启动、注册到EventLoop,但是整个过程会比较麻烦。所以,通常都是基于Bootstrap工具类完成Netty核心组件的拼装。

Bootstrap的启动流程,也就是Netty组件的组装、配置,以及Netty服务端或客户端的启动流程。本节,我以服务端ServerBootstrap的使用为例,讲解Netty Server的启动流程,一共可以分为以下几个步骤:

  • 配置 EventLoopGroup 线程组;
  • 配置 Channel 的类型;
  • 设置 ServerSocketChannel 对应的 Handler;
  • 设置网络监听的端口;
  • 设置 SocketChannel 对应的 Handler;
  • 配置 Channel 参数;
  • 启动 Netty Server。

202308022224581602.png

1.1 分配Reactor线程池

首先,创建一个服务端的ServerBootstrap实例,ServerBootstrap支持无参构造函数(后续小节分析ServerBootstrap源码时我会详细讲解它的内部构造):

    // 创建一个服务端的启动器
    ServerBootstrap bootstrap = new ServerBootstrap();

接着,创建两个Reactor线程池,并赋值给ServerBootstrap启动器实例:

    // boss线程池
    EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
    // worker线程池
    EventLoopGroup workerLoopGroup = new NioEventLoopGroup(10);
    // 分配线程池
    bootstrap.group(bossLoopGroup, workerLoopGroup);

上述这两个NioEventLoopGroup线程池,一个负责监听客户端的连接事件并建立连接,名为bossLoopGroup;另一个负责监听读写IO事件和Handler业务处理,名为workerLoopGroup。

事实上,ServerBootstrap支持在单线程多线程主从多线程间切换,也就是也可以只配置一个线程池,并且可以控制线程池中的线程数量。这种灵活的配置方式可以最大程度地满足不同用户的个性化定制需求。

1.2 设置父Channel类型

Netty不止支持Java NIO,也支持阻塞式IO(也叫BIO或OIO)。下面配置的是Java NIO类型的Channel,方法如下:

    bootstrap.channel(NioServerSocketChannel.class);

当然,我们可以指定OioServerSocketChannel.class等等。

ServerBootstrap内部会通过反射的方式创建NioServerSocketChannel对象,NioServerSocketChannel本质是对java.nio.channels.ServerSocketChannel的封装。

1.3 设置父Channel的Handler

设置 ServerSocketChannel 对应的 Handler:

    bootstrap.handler(new LoggingHandler(LogLevel.INFO));    // LoggingHandler是自定义的Handler

1.4 设置父Channel监听端口

底层本质是通过java.net.ServerSocket监听端口:

    bootstrap.localAddress(new InetSocketAddress(port));

1.5 配置父Channel参数

通过ServerBootstrap的option()方法,可以对NioServerSocketChannel进行参数配置,底层本质是设置ServerSocket的参数:

    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

1.6 配置子Channel参数

通过ServerBootstrap的childOption()方法,可以对每一个NioSocketChannel进行参数配置,本质也是设置底层的Socket参数:

    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.childOption(ChannelOption.TCP_NODELAY, true);

1.7 配置子Channel的Pipeline

父Channel会负责连接的建立,连接建立完成后都会创建一个新的子Channel,即 NioSocketChannel 。每一个子Channel都拥有自己的独立Pipeline,我们可以对它进行装配:

    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        // 建立客户端连接时,会创建一个子通道并初始化
        protected void initChannel(SocketChannel ch) throws Exception {
            // 向子通道的流水线添加Handler业务处理器
            ch.pipeline().addLast(new XXXXHandler());
        }
    });

上述ChannelInitializer.initChannel()方法会在子Channel被创建后调用。另外,父Channel也就是NioServerSocketChannel,也是拥有Pipeline的,但是它的处理逻辑是固定的:接受新连接,创建子通道,初始化子通道,所以不需要特别配置。如果我们希望NioServerSocketChannel在建立连接后进行特殊的业务处理,可以使用ServerBootstrap.handler(ChannelHandler handler)方法,为父通道设置ChannelInitializer初始化器,后面分析源码时会详细讲解。

NioSocketChannel本质是对java.nio.channels.SocketChannel的封装。

1.8 绑定并启动

ServerBootstrap的bind()方法会绑定端口并返回一个ChannelFuture对象,因为整个过程是异步的,所以调用ChannelFuture.sync()同步等待绑定过程的完成:

    ChannelFuture channelFuture = bootstrap.bind().sync();
    Logger.info(" 服务器启动成功,监听端口: " + channelFuture.channel().localAddress());

至此,Netty Server启动完成。

Netty中的IO操作,都会返回异步任务实例(如ChannelFuture实例),可以通过阻塞等待或者增加事件监听器两种方式,获得IO操作的真正结果。

1.9 等待Channel关闭

如果要阻塞当前线程直到通道关闭,可以使用Channel.closeFuture()方法,获取通道关闭的异步任务。然后调用sync()方法,阻塞等待直到通道被关闭:

    ChannelFuture closeFuture = channelFuture.channel().closeFuture();
    closeFuture.sync();

另外还需要注意,关闭Channel后,需要释放Reactor线程池资源:

    // 释放掉所有资源,包括创建的反应器线程
    workerLoopGroup.shutdownGracefully();
    bossLoopGroup.shutdownGracefully();

关闭EventLoopGroup线程池时,会关闭内部的EventLoop线程,也会关闭内部的Selector选择器、轮询线程以及所有子通道。在子通道关闭后,会释放掉底层的资源,如TCP Socket文件描述符等。

二、ServerBootstrap源码分析

我们已经从使用层面了解了ServerBootstrap的启动流程,再来从源码层面分析ServerBootstrap的机理。ServerBootstrap本质是对父类AbstractBootstrap的增强,增加了对主从Reactor模式的支持。

2.1 构造

ServerBootstrap提供了两种类型的构造器,我们一般都是使用无参构造器:

    // ServerBootstrap.java
    
    public ServerBootstrap() { }
    
    private ServerBootstrap(ServerBootstrap bootstrap) {
        super(bootstrap);
        childGroup = bootstrap.childGroup;
        childHandler = bootstrap.childHandler;
        synchronized (bootstrap.childOptions) {
            childOptions.putAll(bootstrap.childOptions);
        }
        childAttrs.putAll(bootstrap.childAttrs);
    }

父类AbstractBootstrap的构造:

    // AbstractBootstrap.java
    
    AbstractBootstrap() {
    }
    
    AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
        group = bootstrap.group;
        channelFactory = bootstrap.channelFactory;
        handler = bootstrap.handler;
        localAddress = bootstrap.localAddress;
        synchronized (bootstrap.options) {
            options.putAll(bootstrap.options);
        }
        attrs.putAll(bootstrap.attrs);
    }

2.2 group方法

ServerBootstrap的group方法用于分配EventLoopGroup线程池,通过方法的重载信息可以看出,Netty可以在单线程多线程主从多线程之间切换:

    // ServerBootstrap.java
    
    public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    
        private volatile EventLoopGroup childGroup;
    
        @Override
        public ServerBootstrap group(EventLoopGroup group) {
            // 如果只使用一个线程池,则主从Reactor模式退化为单Reactor模式
            return group(group, group);
        }
    
        public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
            super.group(parentGroup);
            if (this.childGroup != null) {
                throw new IllegalStateException("childGroup set already");
            }
            this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
            return this;
        }
        //...
    }

来看父类AbstractBootstrap的group(EventLoopGroup group)方法:

    // AbstractBootstrap.java
    
    volatile EventLoopGroup group;
    
    public B group(EventLoopGroup group) {
        ObjectUtil.checkNotNull(group, "group");
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

也就是说,Main Reactor线程池本质是在父类中设置的,ServerBootstrap只是配置子线程池和相关参数。

2.3 channel方法

ServerBootstrap的channel方法继承自父类AbstractBootstrap:

    // AbstractBootstrap.java
    
    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }
    
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        ObjectUtil.checkNotNull(channelFactory, "channelFactory");
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }
    
        this.channelFactory = channelFactory;
        return self();
    }

其实就是实例化了一个用于创建Channel的工厂,后续ServerBootstrap调用bind方法时会通过反射创建NioServerSocketChannel对象:

    // ReflectiveChannelFactory.java
    
    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    
        private final Constructor<? extends T> constructor;
    
        public ReflectiveChannelFactory(Class<? extends T> clazz) {
            ObjectUtil.checkNotNull(clazz, "clazz");
            try {
                this.constructor = clazz.getConstructor();
            } 
            //...
        }
    
        @Override
        public T newChannel() {
            try {
                // 通过反射调用Channel的无参构造函数,完成对象实例化
                return constructor.newInstance();
            } 
            //...
        }
    }

我们来看下NioServerSocketChannel的构造就会明白,它的底层实际是封装了JDK的ServerSocketChannel:

    // NioServerSocketChannel.java
    
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    
    public NioServerSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }
    
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                "Failed to open a server socket.", e);
        }
    }

SelectorProvider 是 JDK NIO 中的抽象类实现,通过openServerSocketChannel()方法可以创建服务端的 ServerSocketChannel。SelectorProvider 会根据OS类型和版本的不同,返回不同的实现类。

2.4 option和attr方法

option用于对父Channel(即NioServerSocketChannel)本身的底层TCP参数进行配置,而attr用于配置父Channel的自定义属性(后续可以可以在Pipineline中使用这些属性)。

这个两个方法同样是父类AbstractBootstrap的方法:

    // AbstractBootstrap.java
    
    public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
        private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
        private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    
        public <T> B option(ChannelOption<T> option, T value) {
            ObjectUtil.checkNotNull(option, "option");
            synchronized (options) {
                if (value == null) {
                    options.remove(option);
                } else {
                    options.put(option, value);
                }
            }
            return self();
        }
    
        public <T> B attr(AttributeKey<T> key, T value) {
            ObjectUtil.checkNotNull(key, "key");
            if (value == null) {
                attrs.remove(key);
            } else {
                attrs.put(key, value);
            }
            return self();
        }    
    }

AbstractBootstrap只是将这些配置值保存到内部的字段中,后续初始化Channel时再使用它们。

AttributeKey

通过ServerBootstrap.attr()设置的Channel自定义属性,都保存到了AbstractBootstrap内部的一个ConcurrentHashMap中,这个Map的键是AttributeKey。ServerBootstrap在后续的初始化Channel过程中,会将这些属性值设置到Channel中。

Netty中有一个AttributeMap接口,根据接口契约,它的实现类必须是线程安全的,attr方法的入参就是一个AttributeKey对象,泛型用来指明Value值类型,返回的是一个Attribute对象,内部封装了实际的Value值:

    public interface AttributeMap {
        /**
         * 获取指定Key对应的Value,Value的类型即泛型T
         */
        <T> Attribute<T> attr(AttributeKey<T> key);
    
        /**
         * 指定的Key是否存在
         */
        <T> boolean hasAttr(AttributeKey<T> key);
    }

我们使用AttributeMap时,本质把原始Key封装成AttributeKey,原始Value封装成Attribute:

202308022224599653.png

Netty中的所有Channel都实现了AttributeMap接口:

    public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> { }

所以,我们可以在自定义的ChannelHandler业务处理器中直接使用AttributeKey,比如:

    public class DispatcherHandler extends ChannelInboundHandlerAdapter {
    
        private AttributeKey<String> key = AttributeKey.valueOf("Id");
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Attribute<String> channelAttr = ctx.channel().attr(key);
            // 将Value值设置为1
            channelAttr.set("1");
    
            Attribute<String> contextAttr = ctx.attr(key);
            assert contextAttr.get() == "1"
        }
    }

注意,在Netty 4.1版本以后ChannelHandlerContextChannelattr方法设置的属性作用域是完全相同的,也就是说:

    Channel.attr() == ChannelHandlerContext.attr()

我们可以看下AbstractChannelHandlerContext的attr方法,内部就是先获取所属的Channel:

    // AbstractChannelHandlerContext.java
    
    public <T> Attribute<T> attr(AttributeKey<T> key) {
        return channel().attr(key);
    }

ChannelOption

再来看ChannelOption, ChannelOption主要是用于配置Channel底层的原生参数,比如TCP参数等等。这些参数的Key已经在ChannelOption中以静态变量的方式写死了,我们只能指定其value值,如果通过ChannelOption设置了一个不存在的Key,Netty会以日志形式提示错误信息,但是不会抛出异常。

我们通过ServerBootstrap.option()的代码可以看到,ServerBootstrap会将属于Channel的option对象和value存放到内部的LinkedHashMap中。后续当ServerBootstrap 绑定到具体的端口时,在其init()方法当中,会将这个Map中的每一项绑定到具体Channel中:

    // ServerBootstrap.java
    
    @Override
    void init(Channel channel) {
        setChannelOptions(channel, newOptionsArray(), logger);
        //...
    }
    // AbstractBootstrap.java
    
    static void setChannelOptions(
        Channel channel, Map.Entry<ChannelOption<?>, Object>[] options, InternalLogger logger) {
        // 遍历并为Channel设置参数
        for (Map.Entry<ChannelOption<?>, Object> e: options) {
            setChannelOption(channel, e.getKey(), e.getValue(), logger);
        }
    }
    
    private static void setChannelOption(
        Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
        try {
            if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
                logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
            }
        }
        //...
    }

内部调用了Channel的config()方法,返回一个ChannelConfig对象,然后调用该对象的setOption方法:

    // DefaultServerSocketChannelConfig.java
    
    public <T> boolean setOption(ChannelOption<T> option, T value) {
        validate(option, value);
        // 限定了Option只能是指定范围的类型
        if (option == SO_RCVBUF) {
            setReceiveBufferSize((Integer) value);
        } else if (option == SO_REUSEADDR) {
            setReuseAddress((Boolean) value);
        } else if (option == SO_BACKLOG) {
            setBacklog((Integer) value);
        } else {
            return super.setOption(option, value);
        }
        return true;
    }

最后,我们来看下常用的ChannelOption有哪些:

SO_RCVBUF / SO_SNDBUF
此为TCP参数。每个TCP socket(套接字)在内核中都有一个发送缓冲区和一个接收缓冲区,这两个选项就是用来设置TCP连接的这两个缓冲区大小的。TCP的全双工的工作模式以及TCP的滑动窗口便是依赖于这两个独立的缓冲区及其填充的状态。

TCP_NODELAY
此为TCP参数。表示是否立即发送数据,默认值为True(Netty默认为true,而操作系统默认为false)。该值用于设置是否关闭Nagle算法,该算法将小的碎片数据连接成更大的报文(或数据包)来最小化所发送报文的数量,如果需要发送一些较小的报文,则需要禁用该算法。Netty默认禁用该算法,从而最小化报文传输的延时。

SO_KEEPALIVE
此为TCP参数。表示底层TCP协议的心跳机制。true为连接保持心跳,默认值为false。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s,即2小时。Netty默认关闭该功能。

SO_REUSEADDR
此为TCP参数。设置为true时表示地址复用,默认值为false。有四种情况需要用到这个参数设置:

  1. 当有一个有相同本地地址和端口的socket1处于TIME_WAIT状态时,而我们希望启动的程序的socket2要占用该地址和端口。例如在重启服务且保持先前端口时;
  2. 有多块网卡或用IP Alias技术的机器在同一端口启动多个进程,但每个进程绑定的本地IP地址不能相同;
  3. 单个进程绑定相同的端口到多个socket(套接字)上,但每个socket绑定的IP地址不同;
  4. 完全相同的地址和端口的重复绑定。但这只用于UDP的多播,不用于TCP。

SO_LINGER
此为TCP参数。表示关闭socket的延迟时间,默认值为-1,表示禁用该功能:

  • -1表示socket.close()方法立即返回,但操作系统底层会将发送缓冲区全部发送到对端;
  • 0表示socket.close()方法立即返回,操作系统放弃发送缓冲区的数据,直接向对端发送RST包,对端收到复位错误。
  • 正整数值表示调用socket.close()方法的线程被阻塞,直到延迟时间到来、发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。

SO_BACKLOG
此为TCP参数。表示服务器端接收连接的队列长度,如果队列已满,客户端连接将被拒绝。Linux中默认为128。如果连接建立频繁,服务器处理新连接较慢,可以适当调大这个参数。

SO_BROADCAST
此为TCP参数。表示设置广播模式。

这里提一句,笔者本人非常不喜欢Netty的AttributeKey和ChannelOption的设计(包含很多其它开源框架也有类似问题,比如Eureka),明明简单的KV存储就可以搞定的事情,非得整了一大套框架代码,搞得过于晦涩!其实完全可以像Kafka那样只使用常量、枚举、原生Map作为配置项,辅以文档说明,清晰简洁。

2.5 childOption和childAttr方法

childOption和childAttr这两个方法与上一节中的option和attr方法类似,只不过它们是作用余子Channel,也就是NioSocketChannel:

    // ServerBootstrap.java
    
    public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    
        private final Map<ChannelOption<?>, Object> childOptions = 
            new LinkedHashMap<ChannelOption<?>, Object>();
    
        private final Map<AttributeKey<?>, Object> childAttrs = 
            new ConcurrentHashMap<AttributeKey<?>, Object>();
    
        public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
            ObjectUtil.checkNotNull(childOption, "childOption");
            synchronized (childOptions) {
                if (value == null) {
                    childOptions.remove(childOption);
                } else {
                    childOptions.put(childOption, value);
                }
            }
            return this;
        }
    
        public <T> ServerBootstrap childAttr(AttributeKey<T> childKey, T value) {
            ObjectUtil.checkNotNull(childKey, "childKey");
            if (value == null) {
                childAttrs.remove(childKey);
            } else {
                childAttrs.put(childKey, value);
            }
            return this;
        }    
    }

ServerBootstrap只是将这些配置值保存到内部的字段中,后续初始化Channel时再使用它们。

2.6 childHandler方法

ServerBootstrap的childHandler方法用于给子Channel分配一个业务处理器:

    // ServerBootstrap.java
    
    private volatile ChannelHandler childHandler;
    
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }

我们在装配ServerBootstrap时,一般会使用 ChannelInitializer 这个类,关于它的作用我这里简单提一下,后续对ChannelHandler源码讲解时再深入说明。

Netty中的每一个Channel通道,都拥有一条自己的Pipeline流水线,流水线负责装配自己的Handler业务处理器。那么,什么时候进行装配呢?一般是在Channel初始化时就完成的。比如下面的示例代码:

    // 装配子Channel流水线
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        // 有连接到达时,会创建一个NioSocketChannel
        protected void initChannel(SocketChannel ch) throws Exception {
            // 为这个NioSocketChannel的Pipeline流水线添加一个Handler业务处理器
            ch.pipeline().addLast(new NettyDiscardHandler());
        }
    });

2.7 bind方法

ServerBootstrap的bind方法最为复杂,核心流程可以分为四个阶段:

  1. 创建NioServerSocketChannel对象;
  2. 初始化NioServerSocketChannel对象,比如设置Channel参数,装配Pipeline等等;
  3. 将NioServerSocketChannel注册到EventLoopGroup的某个EventLoop中;
  4. 为NioServerSocketChannel执行端口绑定。

整个流程大量运用了ChannelFuture,所以比较晦涩,我们来通过源码看一下:

    // AbstractBootstrap.java
    
    public ChannelFuture bind() {
        validate();
        SocketAddress localAddress = this.localAddress;
        if (localAddress == null) {
            throw new IllegalStateException("localAddress not set");
        }
        return doBind(localAddress);
    }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
        // 1.创建、初始化、注册Channel
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
    
        // 2.1 如果注册完成
        if (regFuture.isDone()) {
            // 3.绑定端口
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } 
        // 2.2 如果没有注册完成
        else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            // 添加一个回调监听器
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        // 3.绑定端口
                        promise.registered();
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

其实核心步骤就是在initAndRegisterdoBind0这两个方法中完成的: initAndRegister() 负责 Channel 初始化和注册,doBind0() 用于端口绑定。

initAndRegister

initAndRegister方法分为三个步骤:

  1. 通过ChannelFactory工厂创建了Channel;
  2. 对Channel进行初始化配置;
  3. 从EventLoopGroup中选择一个EventLoop,注册该Channel。
    // AbstractBootstrap.java
    
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 1.创建一个Channel,对于ServerBootstrap来说,一般是NioServerSocketChannel
            channel = channelFactory.newChannel();
            // 2.初始化Channel
            init(channel);
        } catch (Throwable t) {
            //...
        }
        // 3.注册Channel到Selector(每个NioEventLoop内部都有一个java.nio.channels.Selector)
        ChannelFuture regFuture = config().group().register(channel);
        return regFuture;
    }

EventLoopGroup中的每一个EventLoop对象内部都封装了java.nio.channels.Selector。

我们来看下ServerBootstrap.init()方法是如何对Channel进行初始化的:

    // ServerBootstrap.java
    
    @Override
    void init(Channel channel) {
        // 1.为ServerSocketChannel配置TCP等参数
        setChannelOptions(channel, newOptionsArray(), logger);
        // 2.为ServerSocketChannel配置自定义属性
        setAttributes(channel, newAttributesArray());
    
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
    
        // 3.装配pipeline流水线
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                // 注意:这里的ch就是上面的ServerSocketChannel
                final ChannelPipeline pipeline = ch.pipeline();
                // 这个Handler就是我们通过ServerBootstrap.handler(XXX)装配的
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    // 将自定义的Handler添加到Pipeline中
                    pipeline.addLast(handler);
                }
                // 向ServerSocketChannel(父Channel)所属的EventLoop中提交一个异步任务
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // ServerBootstrapAcceptor用于将建立连接的SocketChannel转发给子Reactor线程池
                        pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

上面的重点是对ServerSocketChannel的pipeline流水线的装配:

  1. 首先,ServerSocketChannel初始化时,会在流水线中添加一个ChannelInitializer处理器。这是一个特殊的 入站 处理器,它的initChannel方法会在该ServerSocketChannel注册完成后被调用;
  2. ChannelInitializer的initChannel方法,会向Pipieline添加我们自定义的Handler;
  3. 接着,向所属的EventLoop中提交一个异步任务,这个任务的作用就是在ServerSocketChannel的流水线中添加一个ServerBootstrapAcceptor处理器。ServerBootstrapAcceptor也是一个特殊的 入站 处理器,它的作用就是当建立新的SocketChannel连接时,将SocketChannel注册到子Reactor线程池中的一个EventLoop上;
  4. 最后,ChannelInitializer会被从ServerSocketChannel的流水线中移除,防止多次执行。

也就是说,当ServerSocketChannel初始化并注册完成后,它的Pipeline流水线最终只有我们自定义的Handler和一个ServerBootstrapAcceptor处理器:

202308022225011094.png

doBind0

ServerSocketChannel的初始化和注册完成后,还需要进行最后一步操作——绑定端口。整个流程的核心操作就是:调用 JDK 底层进行端口绑定,并触发Pipeline的channelActive事件,把OP_ACCEPT事件注册到 Channel 的事件集合中。

    // AbstractBootstrap.java
    
    private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    
        // 向ServerSocketChannel所属的EventLoop中提交一个异步任务
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

上述整个流程是异步的,也就是说只是向EventLoop(确切说是NioEventLoop)提交了一个异步任务,EventLoop内部包含了一个任务队列,以及唯一个工作线程,会不断的从队列取出任务执行。

我们来看实际的端口绑定操作:

    // DefaultChannelPipeline.java
    
    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        // 选择Pipeline中的队尾节点进行端口绑定
        return tail.bind(localAddress, promise);
    }

很奇怪,端口绑定操作竟然是在Pipeline中执行的,而且是选择了尾部的Handler执行:

    // AbstractChannelHandlerContext.java
    
    @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        //...
        // 从tail开始往前,找到第一个出站的Handler,此时只有ServerBootstrapAcceptor满足
        final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // 绑定端口
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null, false);
        }
        return promise;
    }
    
    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                // 调用出站Handler的bind方法
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);
        }
    }

最后看invokeBind操作,又回到了DefaultChannelPipeline:

    // DefaultChannelPipeline.java
    
    public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
        // 关键是这里
        unsafe.bind(localAddress, promise);
    }

上面的unsafe其实是一个定义在io.netty.channel.Channel中的类,封装了Channel对底层的NIO操作,所以bind操作本质就是ServerSocketChannel的bind操作:

    // AbstractChannel.AbstractUnsafe.java
    
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        assertEventLoop();
    
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
    
        boolean wasActive = isActive();
        try {
            // 利用原生的NIO ServerSocketChannel完成端口绑定
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
    
        if (!wasActive && isActive()) {
            // 完成端口绑定后,Channel处于Active状态,调用pipeline.fireChannelActive()触发channelActive事件
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelActive();
                }
            });
        }
    
        safeSetSuccess(promise);
    }

最终会执行NioServerSocketChannel的doBind方法:

    // NioServerSocketChannel.java
    
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        // 获取Java NIO中的原生ServerSocketChannel绑定端口
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

从上面代码可以看出,绕了这么一大圈,本质就是用Java NIO的ServerSocketChannel完成端口的绑定。Netty之所以绕这么一大圈,是因为 端口绑定 这一操作在Netty里定义为 出站 操作,Netty中Channel相关的所有操作都会通过Pipeline流水线触发,这也是为什么了在Pipeline接口中定义bind方法的原因。

此外,端口绑定完成后,会触发Pipeline的channelActive事件:

    // DefaultChannelPipeline.java
    
    public final ChannelPipeline fireChannelActive() {
        // 从head开始触发channelActive事件
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
    // AbstractChannelHandlerContext.java
    
    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }

可以看到,事件从Head节点开始触发,执行完 channelActive 事件传播之后,Head节点会调用readIfIsAutoRead()方法触发 Channel 的 read 事件:

    // DefaultChannelPipeline.HeadContext.java
    
    public void channelActive(ChannelHandlerContext ctx) {
        // 传播channelActive事件
        ctx.fireChannelActive();
    
        readIfIsAutoRead();
    }
    
    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

最终调用到 AbstractNioChannel 中的 read() 方法,又从Pipieline的tail节点开始触发传递read事件,注意这个read是一个Outbound出站事件:

    // AbstractNioChannel.java
    
    public Channel read() {
        pipeline.read();
        return this;
    }
    // DefaultChannelPipeline.java
    
    public final ChannelPipeline read() {
        // 触发传递read出站事件
        tail.read();
        return this;
    }
    // AbstractChannelHandlerContext.java
    
    public ChannelHandlerContext read() {
        // 获取下一个Outbound Handler
        final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // 触发read事件
            next.invokeRead();
        } else {
            Tasks tasks = next.invokeTasks;
            if (tasks == null) {
                next.invokeTasks = tasks = new Tasks(next);
            }
            executor.execute(tasks.invokeReadTask);
        }
        return this;
    }

最终会传递到head节点:

    // DefaultChannelPipeline.HeadContext.java
    
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

底层最终调用AbstractNioChannel的doBeginRead方法,:

    // AbstractNioChannel.java
    
    protected void doBeginRead() throws Exception {
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
    
        readPending = true;
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

上述的readInterestOp参数就是在前面初始化 ServerSocketChannel 所传入的SelectionKey.OP_ACCEPT事件,所以至此EventLoop才会开始监听该ServerSocketChannel上的OP_ACCEPT事件。

2.8 ServerBootstrapAcceptor

最后,我们来看下ServerBootstrapAcceptor这个 入站 Handler处理器,它的作用就是当NioServerSocketChannel完成新的SocketChannel连接建立后,将这些Channel注册到子Reactor线程池中:

    // ServerBootstrap.java
    
    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    
        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;
        private final Runnable enableAutoReadTask;
    
        ServerBootstrapAcceptor(
            final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;
            enableAutoReadTask = new Runnable() {
                @Override
                public void run() {
                    channel.config().setAutoRead(true);
                }
            };
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // 1.收到一个新建立连接的SocketChannel
            final Channel child = (Channel) msg;
            // 2.在Channel的pipeline中添加业务处理器
            child.pipeline().addLast(childHandler);
            // 3.配置Channel的参数和自定义属性
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);
    
            try {
                // 4.向子Reactor线程池(也就是子EventLoopGroup)注册该Channel
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
        //...
    }

服务端ServerSocketChannel 的 channelRead 事件只会在新连接接入时触发。

ServerBootstrapAcceptor 通过childGroup.register()方法,将 NioSocketChannel 注册到 Worker 工作线程中,并注册OP_READ事件到 NioSocketChannel 的事件集合。

关于服务端如何处理客户端新建连接的具体源码(ServerBootstrap.register()),我这里不贴了,它的内部会调用pipeline.fireChannelRegistered()方法传播channelRegistered事件,然后再调用 pipeline.fireChannelActive()方法传播channelActive事件,最终会将SelectionKey.OP_READ事件注册到 Channel 的事件集合。


至此,ServerBootstrap的源码就分析完了,从源码层面看Netty Server的启动流程, 对后续Netty的深入使用非常有帮助,我总结一下整个流程:

  • 创建服务端 Channel :本质是创建 JDK 底层原生的 Channel,并初始化几个重要的属性,包括 id、unsafe、pipeline 等;
  • 初始化服务端 Channel :设置 Socket 参数以及用户自定义属性,并添加两个特殊的处理器 ChannelInitializer 和 ServerBootstrapAcceptor;
  • 注册服务端 Channel :调用 JDK 底层将 Channel 注册到 Selector 上;
  • 端口绑定 :调用 JDK 底层进行端口绑定,并触发 channelActive 事件,把OP_ACCEPT事件注册到 Channel 的事件集合中。

至于客户端使用的Bootstrap,底层源码和ServerBootstrap是类似的,我就不开展了,读者可以自行阅读Netty源码。

三、总结

本章,我对Bootstrap装配类的启动流程以及ServerBootstrap源码进行了深入分析和讲解。最后,来总结一下:

  1. ServerBootstrap支持主从Reactor模式,我们可以配置主从Reactor线程池,本质都是EventLoopGroup对象;
  2. EventLoopGroup内部包含了很多EventLoop对象,而每个EventLoop都封装了Java NIO中的Selector选择器,同时包含一个工作线程,一个任务队列,一个任务执行器;
  3. ServerBootstrap会创建并初始化ServerSocketChannel,然后注册到主Reactor线程池内部的一个EventLoop中,EventLoop中的工作线程会监听该Channel上的OP_ACCEPT事件,当发生该事件时会进行新连接的建立,并将建立成功后新连接——SocketChannel,在ServerSocketChannel的pipeline中传播处理;
  4. ServerSocketChannel的pipeline中的尾部有一个特殊的 入站 处理器——ServerBootstrapAcceptor,当它接收到SocketChannel后,会将该Channel注册到子Reactor线程池中的一个EventLoop中,由该EventLoop的pipeline进行处理。

可以用下面这张图来描述上述的整个流程,里卖弄还有很多关于NioEventLoopGroup和NioEventLoop的细节,我会在下一章详解讲解:

202308022225020575.png


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文