2023-11-08
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://www.skjava.com/series/article/4619949496

经过前面几篇文章的介绍,我们掌握了 Netty 的 5 个核心组件,但是有了这 5 个核心组件 Netty 这个工厂还是无法很好的运转,因为缺少了一个最核心的组件:EventLoop,它 是 Netty 中最最核心的组件,也是 Netty 最精华的部分,它负责 Netty 中 I/O 事件的分发,也就是说,这个事件谁来做,它说了算。

再谈 Reactor 线程模型

要想弄明白 Netty 的 EventLoop,我们就必须先理解 Reactor 线程模型,Netty 高性能的奥秘就在于 Reactor 线程模型。

线程模型的优劣直接决定了系统的性能,一个好的线程模型比不好的线程模型性能会高出好多倍。而目前主流的网络框架选择的线程模型几乎都是 I/O 多路复用的方案,Netty 作为其中的佼佼者更是演绎地淋漓尽致。

Java 并发包(java.util.concurrent)大神 Doug Lea 在Scalable I/O in Java 一文中通过各个角度,循序渐进地阐述了服务端开发中 I/O 模型的演变过程,文中基于 Reactor 反应器模式的几种模型,被 Netty 、Mina 等大多数高性能 I/O 服务框架所采用的,下面就对这几种 Reactor 模型做一个简要说明。

Scalable IO in Java:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

单线程模型

  • 原版

  • 理解版

单线程模式是所有的 I/O 操作(建立连接、事件分发、数据读写等等)都是由一个线程完成,它的优点是模型够简单,实现起来容易,但是缺点也很明显:

  1. 单线程处理业务有限,无法充分利用多 CPU 的性能,性能方面存在瓶颈。
  2. 当多个事件同时触发,如果其中有某一个事件处理时间较长,则会导致其他事件无法被处理,容易造成消息积压。如果线程意外终止或者进入死循环了,将会导致整个系统不可用。

多线程模型

既然单线程模型存在性能瓶颈和可靠性问题,那我们就使用多线程方案。Reactor 多线程模型是将业务逻辑处理交个多个线程处理。

  • 原模型图

  • 理解模型图

多线程模型相比单线程模型,它可以充分利用多 CPU 的能力,性能方面有了较大的提升,同时也不会因为单一的业务线程处理较慢而影响整个系统,但是它依然是单线程去处理所有的事件监听及响应,在高并发下还是存在性能问题。

主从多线程模型

由于多线程模型对事件的监听和响应依然采用单线程方式,在高并发条件下还是存在性能瓶颈,所以衍生出了主从多线程模型,主从多线程模型有多个 Reactor 线程组成,每个 Reactor 有自己单独的 Selector 对象。MainReactor 负责客户端的 Accept 事件,建立连接后将其注册到 SubReactor 中,SubReactor 则负责具体的 I/O 事件。

  • 原型图:

  • 理解版

主从多线程模型足以应对高并发,轻轻松松达到成千上万规模的客户端连接。Netty 推荐采用主从多线程模型。需要明白的是一个 MainReactor 是可以对应多个 SubReactor ,在海量客户端并发请求的情况下,我们可以适当增加 SubReactor 线程的数量来提供系统的吞吐量。

对于 Reactor 模式,大明哥在【死磕 NIO】— Reactor 模式就一定意味着高性能吗?有详细的讲解,对其不是很理解的同学可以先去看看这篇文章。

EventLoop

为什么说 EventLoop 是 Netty 的精髓呢?因为 EventLoop 是 Netty Reactor 线程模型的核心处理引擎。要想明白 Netty 为什么可以轻松处理成千上万规模的客户端连接,就必须要掌握 EventLoop。由于本篇是入门篇,所以我们对其不会讲解很深入,掌握它的原理即可,为后面的进阶和源码分析打下基础。

EventLoop 从字面上来看就是 event + loop,也就是事件循环,事件就是 I/O 事件,循环则可以理解为 while(true)。直白点就是EventLoop 是通过循环方式来处理 I/O 事件的执行器。它的简要 UML 类图如下:

从上面我们可以看出 EventLoop 本质就是一个单线程执行器,这个执行器干嘛呢?就是处理 Channel 上的 IO 事件。

当客户端与服务端建立连接后,服务端会创建一个 Channel,并将该 Channel 与 EventLoop 绑定。绑定后,该 Channel 在整个生命周期内所有发生的事件都由该 EventLoop 来处理,但是一个 EventLoop 可以绑定多个 Channel。对于 EventLoop 而言,它就是通过不断循环它所绑定的 Channel 事件列表,检测是否有事件发生,如果有,则将该事件分发给 worker 线程处理。运行模式如下图:

EventLoopGroup

EventLoopGroup 就是一组 EventLoop ,它主要是来维护和管理 EventLoop,一般我们是不会直接使用 EventLoop 的,而是通过 EventLoopGroup 来使用它。我们看 EventLoopGroup 的源码就知道 EventLoopGroup 就提供了两个方法:

  • register() :当服务端创建一个 Channel 后,会调用 EventLoopGroup 的 register() 将 Channel 绑定到其中一个 EventLoop 上。
  • next():返回 EventLoopGroup 中维护的 EventLoop。

所以 Channel、EventLoop、EventLoopGroup 之间的关系如下:

  • 一个 EventLoopGroup 包含一个或者多个 EventLoop。
  • 一个 Channel 在它的声明周期中只会与一个 EventLoop 绑定。
  • 一个 EventLoop 可以绑定多个 Channel。
  • 一个 EventLoop 只会与一个 Thread 绑定,同时该 EventLoop 的所有 IO 事件都由这一个 Thread 执行。

如下图:

到这里我相信小伙伴们基本上知道了 EventLoop、EventLoopGroup 是干啥的了,他们的基本原理也差不多了解了,在上面 Reactor 线程模型中,大明哥提到,Netty 推荐我们采用主从多线程模型,这了大明哥就主从多线程模型把 Channel、EventLoop、EventLoopGroup 再阐述一遍,便于小伙伴们理解更深。先看图:

  • BoosEventLoopGroup 负责监听客户端的 Accept 事件,当事件触发时,会新建一个 Channel,同时将 Channel 注册到 WorkEventLoopGroup 中的某个 EventLoop 上。
  • WorkEventLoopGroup 中的 EventLoop 负责监听 Channel 的 read/write 事件,当有事件发生时,EventLoop 读取数据将交给与之绑定的 ChannelPipeline 的 ChannelHandler 处理。
  • 这里有两个一一对应的关系
    • 一个 Channel 对应一个 EventLoop,且整个生命周期内所有 IO 事件都是该 EventLoop 处理。
    • 一个 EventLoop 对应一个 Thread,该 EventLoop 中的所有 IO 事件都由该 Thread 处理。
    • 由于这两个一一对应的关系,Channel 生命周期的所有 IO 事件都是线程独立的。

我们在使用 EventLoop 的时候一般不会直接使用它,而是通过 EventLoopGroup 来使用,我们常用的 EventLoopGroup 有两种:

  • NioEventLoopGroup:它处理IO事件、普通任务、定时任务
  • DefaultEventLoopGroup:处理普通任务,定时任务

示例

上面大明哥详细阐述了 EventLoop、EventLoopGroup 的核心原理,下面我们来使用他们,进一步掌握他们。

获取 EventLoop

EventLoopGroup 提供了 next() 方法,用于我们获取它管理的 EventLoop。

public class EventLoopTest_1 {
    public static void main(String[] args) {
        // 创建两个线程
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);

        log.info("{}",eventLoopGroup.next());
        log.info("{}",eventLoopGroup.next());
        log.info("{}",eventLoopGroup.next());
        log.info("{}",eventLoopGroup.next());
    }
}

执行结果

...EventLoopTest_1 - io.netty.channel.nio.NioEventLoop@6ea6d14e
...EventLoopTest_1 - io.netty.channel.nio.NioEventLoop@6ad5c04e
...EventLoopTest_1 - io.netty.channel.nio.NioEventLoop@6ea6d14e
...EventLoopTest_1 - io.netty.channel.nio.NioEventLoop@6ad5c04e

从执行结果可以看出,next() 按照顺序的方式返回 NioEventLoop,其实next() 是通过一个算法来选择返回哪一个 NioEventLoop 的。

new NioEventLoopGroup(2) 指定创建的 NioEventLoop 线程数,如果我们不指定则默认为 cpu * 2。

执行普通任务

EventLoop 继承 Executor,可定可以执行任务拉。它提供了 submit()execute() 两个方法来执行任务。

public class EventLoopTest_2 {
    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);

        eventLoopGroup.next().submit(() -> {
            log.info("hello,我是大明哥,这是 死磕 Netty 系列-111");
        });

        eventLoopGroup.next().submit(() -> {
            log.info("hello,我是大明哥,这是 死磕 Netty 系列-222");
        });

        log.info("这是主线程拉..");
    }
 }

执行结果

22:50:16.221 [main] INFO ...EventLoopTest_2 - 这是主线程拉..
22:50:16.222 [nioEventLoopGroup-2-1] INFO ...EventLoopTest_2 - hello,我是大明哥,这是 死磕 Netty 系列-111
22:50:16.222 [nioEventLoopGroup-2-2] INFO ...EventLoopTest_2 - hello,我是大明哥,这是 死磕 Netty 系列-222

执行定时任务

EventLoop 也继承 ScheduledExecutorService ,所以它也具备执行定时任务的功能,它提供了四个 scheduleXxx() 方法来执行定时任务。

public class EventLoopTest_3 {
    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);

        eventLoopGroup.next().scheduleAtFixedRate(() ->{
            log.info("hello,我是大明哥,这是 死磕 Netty 系列");
        },1,2, TimeUnit.SECONDS);

        log.info("这是主线程拉..");
    }
}

执行结果

23:09:07.615 [main] INFO ...EventLoopTest_3 - 这是主线程拉..
23:09:08.616 [nioEventLoopGroup-2-1] INFO ...EventLoopTest_3 - hello,我是大明哥,这是 死磕 Netty 系列
23:09:10.616 [nioEventLoopGroup-2-1] INFO ...EventLoopTest_3 - hello,我是大明哥,这是 死磕 Netty 系列
23:09:12.616 [nioEventLoopGroup-2-1] INFO ...EventLoopTest_3 - hello,我是大明哥,这是 死磕 Netty 系列
23:09:14.615 [nioEventLoopGroup-2-1] INFO ...EventLoopTest_3 - hello,我是大明哥,这是 死磕 Netty 系列
23:09:16.615 [nioEventLoopGroup-2-1] INFO ...EventLoopTest_3 - hello,我是大明哥,这是 死磕 Netty 系列
23:09:18.615 [nioEventLoopGroup-2-1] INFO ...EventLoopTest_3 - hello,我是大明哥,这是 死磕 Netty 系列
23:09:20.614 [nioEventLoopGroup-2-1] INFO ...EventLoopTest_3 - hello,我是大明哥,这是 死磕 Netty 系列

从执行结果中可以看出,延后 1 秒后,每隔 2 秒就执行一次任务。

处理 IO 事件

处理 IO 事件才是 EventLoop 的本职工作。

// 服务端
public class EventLoopTest_4_server {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                log.info(byteBuf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8081);
    }
}

// 客户端
public class EventLoopTest_4_client {
    public static void main(String[] args) throws Exception {
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect("127.0.0.1",8081)
                .sync()
                .channel();

        for (int i = 1 ; i <= 5 ; i++) {
            channel.writeAndFlush("hello,我是 sike-java - " + i);
            TimeUnit.SECONDS.sleep(2);
        }
    }
}

服务端接收客户端信息打印出来,客户端每个 2 秒向服务端发送一次消息。

执行结果如下:

23:35:59.412 [nioEventLoopGroup-2-2] ...EventLoopTest_4_server - hello,我是 sike-java - 1
23:36:01.349 [nioEventLoopGroup-2-2] ...EventLoopTest_4_server - hello,我是 sike-java - 2
23:36:03.354 [nioEventLoopGroup-2-2] ...EventLoopTest_4_server - hello,我是 sike-java - 3
23:36:05.357 [nioEventLoopGroup-2-2] ...EventLoopTest_4_server - hello,我是 sike-java - 4
23:36:07.357 [nioEventLoopGroup-2-2] ...EventLoopTest_4_server - hello,我是 sike-java - 5

这里看到了服务端打印日志的线程一直都是 nioEventLoopGroup-2-2 ,这就充分表明了 Channel 是绑定在 EventLoop 的,它所有的 IO 时间都由该 EventLoop 处理。不信的话我们修改下客户端发送的消息:“hello,我是 sike-netty - i”,再来观察服务端打印的日志:

观察下蓝色部分和红色部分,是不是线程名不一样?那一个 EventLoop 绑定多个 Channel 呢?这个任务就交给小伙伴去验证吧。

EventLoop 线程细分

Netty 推荐我们采用主从多线程模型,但是从上面的示例其实使用的一直都是一个 EventLoopGroup,也就是多线程模式,其实切换成主从多线程的模式也是非常简单的。Netty 提供了 group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 方法,我们利用这个方法可以将 BossEventLoopGroup 和 WorkerEventLoopGroup 进行分开,其中** BossEventLoopGroup 专门负责 ServerSocketChannel 上的 Accept 事件,WorkerEventLoopGroup 则负责 SocketChannel 上的读写**。

这种分工比较简单,大明哥就不演示了,我们来说说另外一种分工,我们知道 EventLoop 是需要绑定很多 Channel 的,且它是单线程处理的,假如它处理任务当中某个部分耗时非常长,则势必会影响到其他 Channel,降低整个系统的吞吐量,那怎么办呢?有两种方案:

  1. 将这部分逻辑抽离出来,采用异步方式,用其他线程来处理这部分逻辑
  2. 与上面方式一样,同样是将这部分逻辑抽离处理,只不过我们采用不同的 EventLoop 来执行这部分逻辑

这里大明哥只演示第 2 中方案。ChannelPipeline 提供了一个方法 addLast(EventExecutorGroup group, String name, ChannelHandler handler); 该方法接受三个参数:

  1. group:指定执行的 group
  2. name:name
  3. handler:处理 handler

我们可以利用该方法来实现方案 2 ,达到线程细分。如下:

public class EventLoopTest_5_server {
    public static void main(String[] args) {
        // 处理耗时较长的 EventLoopGroup
        EventLoopGroup otherGroup = new DefaultEventLoopGroup(2);

        new ServerBootstrap()
                .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                String objectMsg = byteBuf.toString(Charset.defaultCharset());
                                log.info(objectMsg);

                                ctx.fireChannelRead(objectMsg);
                            }
                        });
                        ch.pipeline().addLast(otherGroup,"other-eventGroup",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                TimeUnit.SECONDS.sleep(10);
                                log.info("{}",msg);
                            }
                        });
                    }
                })
                .bind(8081);
    }
}

服务到端,在处理第二步的时候,使用了一个 otherGroup,在 handler 内部我们等待 10 秒。然后我们需要 3 个客户端:

  • 客户端1:发送“ hello,我是 sike-netty...”
  • 客户端2:发送“ hello,我是 sike-java...”
  • 客户端3:发送“ hello,我是 sike-redis...”

执行结果如下:

10:09:43.761 [nioEventLoopGroup-4-1] ...EventLoopTest_5_server - hello,我是 sike-netty...
10:09:49.889 [nioEventLoopGroup-4-2] ...EventLoopTest_5_server - hello,我是 sike-java...
10:09:53.763 [defaultEventLoopGroup-2-1] ...EventLoopTest_5_server - hello,我是 sike-netty...
10:09:56.974 [nioEventLoopGroup-4-1] ...EventLoopTest_5_server - hello,我是 sike-redis...
10:09:59.890 [defaultEventLoopGroup-2-2] ...EventLoopTest_5_server - hello,我是 sike-java...
10:10:06.977 [defaultEventLoopGroup-2-1] ...EventLoopTest_5_server - hello,我是 sike-redis...

从执行结果中我们可以看出,同一个客户端的消息是由两个不同的 EventLoop 来执行的,一个是 nioEventLoopGroup,一个是 defaultEventLoopGroup,同时 defaultEventLoopGroup 等待 10 秒,并不影响 nioEventLoopGroup 处理任务。这样就已经将耗时较长的任务隔离开了,达到了线程细分的任务。

总结

到这里 Netty 最精华的部分 EventLoop 就已经介绍完毕了,希望小伙伴们有所收获,这里大明哥简要总结下:

  1. EventLoop 是 Netty 实现 Reactor 线程模型的核心处理引擎。
  2. Netty 推荐采用主从多线程模型,其中 BossEventLoopGroup 负责 ServerSocketChannel 的 Accept 事件,WorkerEventLoopGroup 负责 SocketChannel 的读写事件。
  3. 当客户端与服务端建立连接时,服务端会为该客户端新建一个 Channel,并将该 Channel 与一个 EventLoop 绑定起来,在该 Channel 的整个生命周期中,它所有的 IO 事件都由该 EventLoop 处理。
  4. EventLoop 其本质就是一个单线程任务执行器,它内部维护着一个 Selector ,通过 while(true) 的方式来判断所绑定的 Channel 是否有 I/O 事件发生,如果有则交给 ChannelHandler 处理。
  5. EventLoop 与一个 Thread 绑定,该 EventLoop 所有的 I/O 事件都是由该 Thread 来处理,所以我们需要将耗时较长的逻辑分离出来,以免影响到其他 Channel 的事件处理。
  6. 对应关系如下:
    • 一个 EventLoopGroup 包含一个或者多个 EventLoop。
    • 一个 Channel 在它的声明周期中只会与一个 EventLoop 绑定。
    • 一个 EventLoop 可以绑定多个 Channel。
    • 一个 EventLoop 只会与一个 Thread 绑定。

看到这里可能有小伙伴觉得 EventLoop 有点儿简单,觉得 EventLoop 不就是继承了 Executor 的线程执行器么?这里怪我,因为上面的 EventLoop UML 类图也确实是简单,这里大明哥贴出来一个完整的,各位小伙伴先感受下:

够复杂吧!这里大明哥就不展开阐述了,感兴趣的小伙伴可以先研究研究,后面大明哥会专门写文章来分析这个继承关系的。

源码地址:http://b.nxw.so/1YfuVb

阅读全文