Netty 异步高性能通信框架

 2023-02-12
原文作者:CoderJie 原文地址:https://juejin.cn/post/6993602106355613709

Netty 异步高性能通信框架

互联网行业:RPC框架大量引入Netty,Dubbo 中默认使用Netty做通信框架,大型网络游戏,地图服务器,在大数据领域(AVRO实现数据文件共享)默认采用Netty做跨界点通信,Netty Service 对Netty二次封装...

代码地址:gitee.com/zhuhuijie/n…

1. IO模型

1.1 BIO 模型

特点:每建立一个连接就会创建一个线程,没有连接就会阻塞等待

    package com.zhj.test.bio;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author zhj
     */
    public class BIOServer {
    
        public static void main(String[] args) throws IOException {
            // 线程池机制
    
            // 思路
            // 1. 创建一个线程
            // 2. 如果有客户端连接,就创建一个线程,与之通信(单独写一个方法)
    
            ExecutorService executorService = Executors.newCachedThreadPool();
    
            // 创建ServerSocket
            ServerSocket serverSocket = new ServerSocket(6666);
            System.out.println("服务器程序启动!");
    
            while (true) {
                // 监听,等待客户端连接
                System.out.println("等待连接!!!");
                final Socket socket = serverSocket.accept();
                System.out.println("连接一个客户端(socket)!");
    
                // 创建一个线程与之通讯
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 可以与客户端通讯
                        handler(socket);
                    }
                });
            }
        }
    
        /**
         * 与客户端通讯
         */
        public static void handler(Socket socket) {
            byte[] bytes = new byte[1024];
            try {
                InputStream inputStream = socket.getInputStream();
                // 循环读取客户端读取的数据
                while (true) {
                    System.out.println("等待输入数据!!!");
                    int read = inputStream.read(bytes);
                    if (read != -1) {
                        System.out.println(Thread.currentThread().getName() + " : " + Thread.currentThread().getId());
                        System.out.println("接收:" + new String(bytes, 0, read));
                    } else {
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                System.out.println("关闭与客户端的连接!");
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

1.2 NIO

NIO 全称java non-blocking IO 是指JDK提供的新的API。从JDK1.4开始,Java提供了一系列改进输入输出的新特性,被统称NIO(New IO),是同步非阻塞的。

三大核心部分:Channel(通道),Buffer缓存区),Selector(选择题)

NIO是面向缓冲区,或者面向块编程的,数据读到一个它稍后处理的缓冲区,需要时可在缓冲区前后移动,这就增加了它处理过程中的灵活性,使他可以提供非阻塞式的高伸缩性网络。

特点:

  • 非阻塞 不需要线程一直等待,有别的任务线程也可以去执行
  • 一个线程可以处理多个连接,当大量请求到服务器,不需要每个连接开一个线程

HTTP2.0采用多路复用技术,同一个连接处理多个请求。

三大核心组件的关系

  • 每个Channel都会对应一个Buffer
  • Selector对应一个线程,一个线程对应多个Channel连接
  • 该图反应了三个Channel 注册到改Selector 程序
  • 程序切换到那个Channel是由事件决定的,Event就是一个重要的概念
  • Selector会根据不同的时间再各个通道上切换
  • Buffer就是一个内容块,底层是与一个数组的
  • 数据的读取写入是通过Buffer,这个与BIO有本质区别,BIO要么是输入流,要么是输出流,不能是双向的,NIO的Buffer是可以读也可以写的,需要flip方法切换
  • Channel是双向的,可以返回底层操作系统的情况,Linux底层的操作系统就是双向的

1.2.1 Buffer缓冲区的使用

  • Capacity 容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变
  • Limit 表示缓冲区当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的
  • Position 位置,下一个要读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写操作做准备
  • Mark 标记
    package com.zhj.test.bio;
    
    import java.nio.IntBuffer;
    
    /**
     * @author zhj
     */
    public class BasicBuffer {
    
        public static void main(String[] args) {
            // 举例说明Buffer 的使用
            // 创建一个Buffer
            IntBuffer intBuffer = IntBuffer.allocate(5);
            // 向Buffer 存数据
            for (int i = 0; i < intBuffer.capacity(); i++) {
                intBuffer.put(i * 2);
            }
            // 从Buffer 读取数据
            // 将Buffer转换,读写切换
            /*
            public final Buffer flip() {
                limit = position;
                position = 0;
                mark = -1;
                return this;
            }
            */
            intBuffer.flip();
            // 设置读取位置
            intBuffer.position(2);
            // 设置读取结束位置
            intBuffer.limit(4);
    
            while (intBuffer.hasRemaining()) {
                System.out.println(intBuffer.get());
            }
        }
    }
    
    public class NIOByteBufferPutGet {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(64);
            buffer.putInt(100);
            buffer.putLong(9L);
            buffer.putChar('强');
            buffer.putShort((short) 4);
            buffer.flip();
            System.out.println(buffer.getInt());
            System.out.println(buffer.getLong());
            System.out.println(buffer.getChar());
            System.out.println(buffer.getShort());
        }
    }
    public class ReadOnlyBuffer {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(64);
    
            for (int i = 0; i < 64; i++) {
                buffer.put((byte) i);
            }
            buffer.flip();
            ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
            System.out.println(readOnlyBuffer.getClass());
    
            while (readOnlyBuffer.hasRemaining()) {
                System.out.println(readOnlyBuffer.get());
            }
            // 只读不能放数据
            // readOnlyBuffer.put((byte) 1);
        }
    }
    /**
     * MappedByteBuffer 说明
     * 1. 可以让文件直接在内存(堆外内存)修改,操作系统不需要拷贝一次
     * @author zhj
     */
    public class MappedByteBufferTest {
        public static void main(String[] args) throws Exception {
            File file1 = new File("E:\\data_file\\log1.txt");
            File file2 = new File("E:\\data_file\\log2.txt");
            RandomAccessFile randomAccessFile = new RandomAccessFile(file1, "rw");
            FileChannel fileChannel = randomAccessFile.getChannel();
            /**
             *  参数(1读写模式,2起始位置,3映射到内存大小)
             */
            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE,0,5);
            mappedByteBuffer.put(0, (byte) 'H');
            mappedByteBuffer.put(3, (byte) '9');
    
            randomAccessFile.close();
            System.out.println("修改成功~");
        }
    }

1.2.2 Channel通道的使用

基本介绍

1)NIO的通道类似与流,但区别如下

  • 通道可以同时进行读写,而流只能进行读或者写
  • 通道可以实现异步读写数据
  • 通道可以从缓冲区读取数据,也可以写数据到缓冲区

2)BIO中的stream 是单向的,如FileinputStream对象只能进行读取数据的操作,而NIO中的通道是双向的,可以读,也可以写

3)Channel 在NIO中是一个接口

4)常用的Channel类有 FileChannel、DatagramChannel、ServerSocketChannel 和SocketChannel

5)FileChannel用于文件的数据读写,DatagramChannel 用于UDP的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写

FileChannel 类

  • read 将通道数据读取到缓冲区中
  • write 把缓冲区的数据写到通道
  • transferFrom() 从目标通道中复制数据到当前通道
  • transferTo() 把数据从当前通道复制给目标通道

1.3 NIO与BIO的比较

  1. BIO是以流的方式处理的,而NIO以块的方式处理数据,块IO的效率比流IO的高很多
  2. BIO是阻塞的,NIO是非阻塞的
  3. BIO基于字节流和字符流进行操作,而NIO基于Channel通道和Buffer缓冲区进行操作,数据总是从通道读到缓冲区中,或者从缓冲区写入到通道中。Sellector选择器用于监听多个通道的事件比如连接请求,数据到达等,因此使用单个线程就可以监听多个客户端通道

4 AIO 了解

  1. JDK 7 引入Asynchronous I/O ,即AIO.在进行I/O编程中,常用到两种模式;Reactor 和 Proactor。Java的NIO就是Reactor,当有事件触发时,服务器端得到通知,进行相应处理
  2. AIO即NIO 2.0 ,叫异步不阻塞IO.AIO引入异步通道的概念,采用了proactor模式,简化了程序的编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用
  3. 目前AIO没有被广泛应用,Netty也是基于NIO,而不是AIO
BIO NIO AIO
IO模型 同步阻塞 同步非阻塞(多路复用) 异步非阻塞
编程难度 简单 复杂 负载
可靠性
吞吐量

5 Netty 概述

异步的基于事件驱动的网络应用的框架,用于快速开发高性能,高可靠的网络IO程序

原生NIO存在的问题

  1. NIO的类库和API繁杂,使用麻烦:需要熟练掌握Selector、ServerSocketChannel,SocketChannel、ByteBuffer等。
  2. 需要具备其他的额外技能:要熟悉Java多线程编程,因为NIO编程涉及到 Reactor模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。一
  3. 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
  4. JDK NIO的Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector空轮询,最终导致CPU 100%。直到JDK 1.7版本该问题仍旧存在,没有被根本解决。

Netty 的优点

Netty对JDK自带的NIO的API进行了封装,解决了上述问题。

  1. 设计优雅:适用于各种传输类型的统一API阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型-单线程,一个或多个线程池
  2. 使用方便:详细记录的Javadoc,用户指南和示例;没有其他依赖项,JDK 5 (Netty3.x或6 (Netty 4.x)就足够了。
  3. 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
  4. 安全:完整的SSL/TLS和StartTLS支持。
  5. 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的Bug可以被及时修复,同时,更多的新功能会被加入

Netty版本说明

  1. netty版本分为netty3.x和netty4.x、netty5.x
  2. 因为Netty5出现重大bug,已经被官网废弃了,目前推荐使用的是Netty4.x的稳定版本
  3. 目前在官网可下载的版本netty3.x netty4.0.x和netty4.1.x4)
  4. 在本套课程中,我们讲解Netty4.1.x版本
  5. netty下载地址:bintray.com/netty/downl…

6 Netty 线程模型

6.1 线程模型

传统阻塞I/O服务模型 和 Reactor模式(单Reactor单线程、单Reactor多线程、主从Reactor多线程)

Netty基于主从Reactor多线程模型

传统IO模型

缺点

  1. 当并发数很大,就会创建大量的线程,占用很大系统资源
  2. 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费

Reactor(反应器模式,分发者模式,通知者模式)

针对传统阻塞I/o服务模型的2个缺点,解决方案:

  1. 基于I/O复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理
  2. 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。

202212302209123851.png

Reactor模式中核心组成:

  1. Reactor: Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对I0O事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;
  2. Handlers:处理程序执行I/O事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。

单Reactor单线程

202212302209177012.png

单Reactor多线程

202212302209193653.png

方案说明

  1. Reactor对象通过select监控客户端请求事件,收到事件后,通过dispatch进行分发
  2. 如果建立连接请求,则右Acceptor通过accept处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件
  3. 如果不是连接请求,则由reactor分发调用连接对 应的handler来处理
  4. handler只负责响应事件,不做具体的业务处理,通过read读取数据后,会分发给后面的worker线程池的某个线程处理业务
  5. worker线程池会分配独立线程完成真正的业务,并将结果返回给handler
  6. handler收到响应后,通过send将结果返回给client

主从Reactor多线程

202212302209206934.png

方案优缺点说明:

  1. 优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
  2. 优点:父线程与子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据。
  3. 缺点:编程复杂度较高

结合实例:这种模型在许多项目中广泛使用,包括Nginx主从Reactor多进程模型,Memcached主从多线程,Netty主从多线程模型的支持

Netty模型

202212302209230535.png

6.2 Netty简单案例

    <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.20.Final</version>
            </dependency>
    </dependencies>
    // 服务端
    public class SimpleNettyServer {
        public static void main(String[] args) throws InterruptedException {
            // 创建两个线程组 BossGroup 和 WorkerGroup
            // BossGroup 只处理连接请求 WorkerGroup 处理与客户端的业务处理
            // 两个线程组都是无限循环
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(32);
            // 创建服务器端启动对象,配置参数
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                // 链式编程
                serverBootstrap.group(bossGroup,workerGroup) // 设置线程组
                        .channel(NioServerSocketChannel.class) // 设置NIO通道实现
                        .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列连接个数
                        .childOption(ChannelOption.SO_KEEPALIVE,true) // 设置保持活动连接状态
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            // 创建通道初始化对象
                            // 给pipeline 设置处理器
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast(new SimpleNettyServerHandler());
                            }
                        }); // 给workerGroup管道设置处理器
                System.out.println("服务器初始化完毕!!!");
                // 启动服务器,并绑定端口,并且同步处理
                ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
    
                // 对关闭通道进行监听 (异步模型)
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    // 服务端处理器
    public class SimpleNettyServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 读取数据
         * @param ctx 上下文对象,含有管道pipeline,通道channel,地址
         * @param msg 客户端发送的数据 默认是Object
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("服务器读取线程 " + Thread.currentThread().getName());
            System.out.println("server ctx = " + ctx);
            // 将msg转为一个ByteBuffer
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址为:" + ctx.channel().remoteAddress());
                    // 比如 这有一个非常耗时的业务 需要异步执行
            // 解决方案1 用户自定义普通任务 taskQueue
            ctx.channel().eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(5*1000);
                        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2",CharsetUtil.UTF_8));
                    } catch (InterruptedException e) {
                        System.out.println("服务端发生异常了!!!");
                    }
                }
            });
            ctx.channel().eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 15 秒
                        Thread.sleep(10*1000);
                        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵3",CharsetUtil.UTF_8));
                    } catch (InterruptedException e) {
                        System.out.println("服务端发生异常了!!!");
                    }
                }
            });
            // 解决方案2 用户自定义定时任务 scheduleTaskQueue
            ctx.channel().eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 20 秒
                        Thread.sleep(5*1000);
                        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵4",CharsetUtil.UTF_8));
                    } catch (InterruptedException e) {
                        System.out.println("服务端发生异常了!!!");
                    }
                }
            },5, TimeUnit.SECONDS);
            // 解决方案3 非当前Reactor 线程调用Channel的各种方法
            System.out.println("Go to...");
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // write + flush 将数据写入缓冲并刷新
            // 发送的数据进行编码
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 发生异常,关闭通道
            ctx.close();
        }
    }
    // 客户端
    public class SimpleNettyClient {
        public static void main(String[] args) throws InterruptedException {
            // 客户端需要一个事件循环组
            EventLoopGroup eventExecutors = new NioEventLoopGroup();
            // 创建客户端启动对象
            try {
                Bootstrap bootstrap = new Bootstrap();
                // 设置相关参数
                bootstrap.group(eventExecutors)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new SimpleNettyClientHandler());
                            }
                        });
                System.out.println("客户端启动完成!!!");
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                eventExecutors.shutdownGracefully();
            }
        }
    }
    // 客户端处理器
    public class SimpleNettyClientHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // 通道就绪就可以发送消息
            System.out.println("client active ctx = " + ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务器!",CharsetUtil.UTF_8));
        }
    
        /**
         * 通道有读取数据时,会触发
         * @param ctx 上下文对象,含有管道pipeline,通道channel,地址
         * @param msg 客户端发送的数据 默认是Object
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("client read ctx = " + ctx);
            // 将msg转为一个ByteBuffer
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("服务器回复的消息是:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("服务器端地址为:" + ctx.channel().remoteAddress());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            // 发生异常,关闭通道
            ctx.close();
        }
    }

7 Netty异步模型

基本介绍

  1. 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
  2. Netty中的I/O操作是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture。
  3. 调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
  4. Netty的异步模型是建立在future和callback的之上的。callback就是回调。重点说Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个Future,后续可以通过Future去监控方法fun的处理过程(即:Future-Listener机制)

8 Netty入门实例 Http服务

    public class TestHttpServer {
        public static void main(String[] args) {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup,workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new TestHttpServerInitializer());
                ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    public class TestHttpServerInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            // 向管道加入处理器
            // 得到管道
            ChannelPipeline pipeline = socketChannel.pipeline();
            // 加入一个netty 提供的httpServerCodec => [coder - decoder]
            // HttpServerCodec 说明
            // 1.HttpServerCodec 是netty提供的处理http的编码解码器
            pipeline.addLast("MyHttpServerCodeC", new HttpServerCodec());
            // 2.增加自定义handler
            pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
        }
    }
    /**
     * 说明
     * 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
     * 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject
     * @author zhj
     */
    public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
        // 读取客户端数据
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
            if (msg instanceof HttpRequest) {
                HttpRequest httpRequest = (HttpRequest) msg;
                URI uri = new URI(httpRequest.uri());
                if ("/favicon.ico".equals(uri.getPath())) {
                    System.out.println("请求了favicon.ico资源 不做处理!!!");
                    return;
                }
                // 每次请求都会产生新的
                System.out.println("pipeline hashcode" + ctx.pipeline().hashCode());
                System.out.println("TestHttpServerHandler hashcode" + this.hashCode());
    
                System.out.println("msg 类型 : " + msg.getClass());
                System.out.println("客户端地址 : " + ctx.channel().remoteAddress());
    
                // 回复信息给浏览器 [http协议]
                ByteBuf content = Unpooled.copiedBuffer("Hello,服务器", CharsetUtil.UTF_8);
                FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
                response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
                // 将构建好 response 返回
                ctx.writeAndFlush(response);
            }
        }
    }

9 Netty 核心模块

Bootstrap、ServerBootstrap

  1. Bootstrap意思是引导,一个Netty应用通常由一个 Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中 Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类

  2. 常见的方法有 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),该方法用于服务器端,用来设置两个EventLoop public B group(EventLoopGroup group),该方法用于客户端,用来设置一个EventLoopGrouppublic B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现public B option(ChannelOption option, Tvalue),用来给ServerChannel添加配置 public ServerBootstrap childOption(ChannelOption childOption, Tvalue),用来给接收到的通道添加配置

    public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler) WorkerGroup

    public ServerBootstrap Handler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler) BossGroup public ChannelFuture bind(int inetPort),该方法用于服务器端,用来设置占用的端口号 public ChannelFuture connect(String inetHost, int inetPort),该方法用于客户端,用来连接服务器端

Future、ChannelFuture

  1. Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和—ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
  2. 常见的方法有Channel channel(),返回当前正在进行IO操作的通道ChannelFutyre sync(),等待异步操作执行完毕

Channel

  1. Netty 网络通信的组件,能够用于执行网络I/o操作。

  2. 通过Channel可获得当前网络连接的通道的状态

  3. 通过Channel可获得网络连接的配置参数(例如接收缓冲区大小)

  4. Channel提供异步的网络I/O操作(如建立连接,读写,绑定端口),异步调用意味着任何I/O调用都将立即返回,并且不保证在调用结束时所请求的I/O操作已完成

  5. 调用立即返回一个 ChannelFuture实例,通过注册监听器到ChannelFuture上,可以I/O操作成功、失败或取消时回调通知调用方

  6. 支持关联I/O操作与对应的处理程序

  7. 不同协议、不同的阻塞类型的连接都有不同的 Channel类型与之对应

    常用的Channel类型:

    • NioSocketChannel,异步的客户端 TCP Socket 连接。
    • NioServerSocketChannel,异步的服务器端TCP Socket连接。
    • NioDatagramChannel,异步的UDP连接。
    • NioSctpChannel,异步的客户端 Sctp连接。
    • NioSctpServerChannel,异步的Sctp服务器端连接,这些通道涵盖了UDP和TCP网络IO以及文件IO。

Selector

  1. Netty基于Selector对象实现I/O多路复用,通过Selector一个线程可以监听多个连接的 Channel事件。
  2. 当向一个Selector中注册Channel后,Selector内部的机制就可以自动不断地查询(Select) 这些注册的Channel是否有己就绪的I/O事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个Channel

10 Netty 群聊

    // 服务端
    public class GroupChatServer {
        private int port;
        public GroupChatServer(int port) {
            this.port = port;
        }
        public void run() throws InterruptedException {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG,128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                // 加入解码器
                                pipeline.addLast("decoder", new StringDecoder());
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast(new GroupChatServerHandler());
                            }
                        });
                ChannelFuture cf = serverBootstrap.bind(port).sync();
                System.out.println("【服务器】启动完成~");
                cf.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            new GroupChatServer(7000).run();
        }
    }
    public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
    
        // 定义一个channelGroup 管理所有的channel
        // GlobalEventExecutor.INSTANCE 全局的事件执行器,是一个单列
        private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        
        // 私聊
        // private static Map<String, Channel> channelMap = new HashMap<>();
    
        private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        // 连接建立,第一个被执行 将 channel 加入 channelGroup
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            // 将客户加入聊天的信息推送其他客户端
            // 不需要自己遍历
            channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + " 加入聊天室。" + sdf.format(new Date()) + "\n");
            channelGroup.add(channel);
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            // 可以自动执行 channelGroup.remove(channel);
            channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + " 离开聊天室。" + sdf.format(new Date()) + "\n");
        }
    
        // channel 处于活动状态
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("【客户端】" + ctx.channel().remoteAddress() + " 上线了。\n");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("【客户端】" + ctx.channel().remoteAddress() + " 跑路了。\n");
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
            Channel channel = ctx.channel();
            channelGroup.forEach(ch -> {
                if (channel != ch) {
                    ch.writeAndFlush("【客户端】" + channel.remoteAddress() + " : " + s + "\n");
                } else {
                    // 回显
                    ch.writeAndFlush("【我】" + " : " + s + "\n");
                }
            });
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("【服务端】 开小差了~");
            ctx.close();
        }
    }
    // 客户端
    public class GroupChatClient {
        private final String host;
        private final int port;
    
        public GroupChatClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
        public void run() throws InterruptedException {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(bossGroup)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                // 加入解码器
                                pipeline.addLast("decoder", new StringDecoder());
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast(new GroupChatClientHandler());
                            }
                        });
                ChannelFuture cf = bootstrap.connect(host, port).sync();
                Channel channel = cf.channel();
                System.out.println("【客户端】" + channel.localAddress() + " 启动完成~");
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String msg = scanner.nextLine();
                    channel.writeAndFlush(msg + "\r\n");
                }
            } finally {
                bossGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            new GroupChatClient("127.0.0.1", 7000).run();
        }
    }
    public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
            System.out.println(s.trim());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("【客户端】 开小差了~");
            ctx.close();
        }
    }

11 Netty 实现Websocket

改变http协议的状态码为101,升级成为ws协议实现全双工长连接通信

服务端

    package com.zhj.test.netty.websocket;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    
    /**
     * @author zhj
     */
    public class WebSocketServer {
        public static void main(String[] args) throws InterruptedException {
            // 创建两个线程组 BossGroup 和 WorkerGroup
            // BossGroup 只处理连接请求 WorkerGroup 处理与客户端的业务处理
            // 两个线程组都是无限循环
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            // 创建服务器端启动对象,配置参数
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                // 链式编程
                serverBootstrap.group(bossGroup,workerGroup) // 设置线程组
                        .channel(NioServerSocketChannel.class) // 设置NIO通道实现
                        .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列连接个数
                        .childOption(ChannelOption.SO_KEEPALIVE,true) // 设置保持活动连接状态
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            // 创建通道初始化对象
                            // 给pipeline 设置处理器
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                // 基于Http协议的,添加Http编解码器
                                pipeline.addLast(new HttpServerCodec());
                                // 以块的方式写,添加ChunkedWriteHandler
                                pipeline.addLast(new ChunkedWriteHandler());
                                // http数据在传输是分段的 HttpObjectAggregator 可以将多段聚合
                                pipeline.addLast(new HttpObjectAggregator(8192));
                                // websocket 数据以帧(frame)形式传递
                                // websocket 有六个子类
                                // 浏览器请求时 ws://localhost:7000/xxx 表示请求的uri
                                // websocket 核心功能是将http协议升级为ws 保持长连接
                                pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
    
                                // 自定义handler ,处理业务逻辑
                                pipeline.addLast(new WebSocketHandler());
                            }
                        }); // 给workerGroup管道设置处理器
                System.out.println("服务器初始化完毕!!!");
                // 启动服务器,并绑定端口,并且同步处理
                ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
                // 对关闭通道进行监听 (异步模型)
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    package com.zhj.test.netty.websocket;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    
    import java.time.LocalDateTime;
    
    /**
     * 这里TextWebSocketFrame 类型,表示一个文本帧(frame)
     * @author zhj
     */
    public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
            System.out.println("服务器接收消息:" + msg.text());
            // 回复消息
            ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器响应时间:" + LocalDateTime.now() + msg.text()));
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            // id 表示唯一的一个值
            System.out.println("handler added 被调用 " + ctx.channel().id().asLongText());
            System.out.println("handler added 被调用 " + ctx.channel().id().asShortText());
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handler removed 被调用 " + ctx.channel().id().asLongText());
            System.out.println("handler removed 被调用 " + ctx.channel().id().asShortText());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("异常发生 " + cause.getMessage());
            ctx.close();
        }
    }

客户端

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>websocket</title>
    </head>
    <body>
    <script>
        var socket;
        // 判断当前浏览器是否支持websocket编程
        if (window.WebSocket) {
            // go on
            socket = new WebSocket("ws://localhost:7000/hello")
            // ev收到服务器端发送的消息
            socket.onmessage = function (ev) {
                var rt = document.getElementById('responseText')
                rt.value = rt.value + "\n" + ev.data
            }
            // 相当于连接开启
            socket.onopen = function (ev) {
                var rt = document.getElementById('responseText')
                rt.value = '连接开启~'
            }
            socket.onclose = function (ev) {
                var rt = document.getElementById('responseText')
                rt.value = rt.value + '\n' + '连接关闭~'
            }
        } else {
            alert("您的浏览器太菜了,不支持websocket")
        }
    
        function send(message) {
            // 发送消息到服务器
            if (!window.socket) {
                return
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            }
        }
    </script>
        <form onsubmit="return false">
            <textarea name="message" style="height: 300px;width: 300px" ></textarea>
            <input type="button" value="发送消息" onclick="send(this.form.message.value)">
            <textarea id="responseText" style="height: 300px;width: 300px"></textarea>
            <input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
        </form>
    </body>
    </html>

12 Protobuf 序列化数据

Protobuf基本介绍

  1. Protobuf是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或RPC[远程过程调用remote procedure call ]数据交换格式。 自前很多公司http+json ---> tcp+protobuf
  2. 参考文档:developers.google.com/protocol-bu… 语言指南
  3. Protobuf是以 message的方式来管理数据的.
  4. 支持跨平台、跨语言,即客户端和服务器端可以是不同的语言编写的(支持目前绝 大多数语言,例如C++、C#、Java、python等)
  5. 高性能,高可靠性
  6. 使用propobuf编译能自动生成代码,Protobuf是将类定义使用.proto文件进行描述。说明,在idea中编写.proto文件时,会自动提示是否下载.ptotot编写插件.可以让语法高亮。
  7. 然后通过protoc.exe编译器根据.proto自动生成.java文件
  8. protobuf使用 user.proto -> protoc.exe ->user.java 编码 传递二进制 服务端解码
    protoc.exe --java_out=. Student.proto
    syntax = "proto3"; // 版本
    option java_outer_classname = "StudentPOJO"; // 生成的外部类名 同时也是文件名
    // protobuf 使用message 管理数据
    message Student { // 内部类 真正发送的POJO对象 1表示属性序号
      int32 id = 1;
      string name = 2;
    }
    
    syntax = "proto3"; // 版本
    option optimize_for = SPEED; // 加快解析
    option java_package = "com.zhj.test.netty.codec2"; // 指生成到哪个包下
    option java_outer_classname = "DataInfo"; // 生成的外部类名 同时也是文件名
    // protobuf 使用message 管理数据 其他的message
    message MyMessage {
      // 定义一个枚举
      enum DataType {
        StudentType = 0; // 在proto3 要求enum 编号从0开始
        workerType = 1;
      }
    
      // 用data_type来标识传的是哪一个枚举类型
      DataType data_type = 1;
    
      // 表示每次枚举类型只能出现其中的一个,节省空间
      oneof dataBody {
        Student student = 2;
        Worker worker = 3;
      }
    
    }
    message Student { // 内部类 真正发送的POJO对象 1表示属性序号
      int32 id = 1;
      string name = 2;
    }
    message Worker {
      string name = 1;
      int32 age = 2;
    }

13 Netty 编解码

入站先解码,再执行自己的业务处理器,出站先执行自己的业务处理器,再编码

解码器-ReplayingDecoder

  1. public abstract class ReplayingDecoder extends ByteToMessageDecoder
  2. ReplayingDecoder扩展了ByteToMessageDecoder类,使用这个类,我们不必调用readableBytes()方法。参数T指定了用户状态管理的类型,其中Void代表不需要状态管理
  3. ReplayingDecoder使用方便,但它也有一些局限性:并不是所有的ByteBuf操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedoperationException。ReplayingDecoder在某些情况下可能稍慢于ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢

其它解码器

  1. LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(In或者Irln)作为分隔符来解析数据。
  2. DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
  3. HttpObjectDecoder:一个HTTP数据的解码器
  4. LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。

14 TCP粘包和拆包基本介绍

  1. TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有—一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为 面向流的通信是无消息保护边界的
  2. 由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘

202212302209249066.png

传输对象(协议包)

    package com.zhj.test.netty.protocoltcp;
    
    import java.util.Arrays;
    
    /**
     * 协议包
     * @author zhj
     */
    public class MessageProtocol {
        private int len;
        private byte[] content;
    
        public MessageProtocol() {
    
        }
    
        public MessageProtocol(int len, byte[] content) {
            this.len = len;
            this.content = content;
        }
    
        public int getLen() {
            return len;
        }
    
        public void setLen(int len) {
            this.len = len;
        }
    
        public byte[] getContent() {
            return content;
        }
    
        public void setContent(byte[] content) {
            this.content = content;
        }
    
        @Override
        public String toString() {
            return "MessageProtocol{" +
                    "len=" + len +
                    ", content=" + Arrays.toString(content) +
                    '}';
        }
    }

编解码器

    package com.zhj.test.netty.protocoltcp;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ReplayingDecoder;
    
    import java.util.List;
    
    /**
     * @author zhj
     */
    public class MyMessageDecoder extends ReplayingDecoder<Void> {
    
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            System.out.println("MyMessageDecoder decode 方法被调用");
            int length = byteBuf.readInt();
            byte[] content = new byte[length];
            byteBuf.readBytes(content);
            MessageProtocol messageProtocol = new MessageProtocol(length, content);
            list.add(messageProtocol);
        }
    }
    
    package com.zhj.test.netty.protocoltcp;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    /**
     * @author zhj
     */
    public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol, ByteBuf byteBuf) throws Exception {
            System.out.println("MyMessageEncoder encoder方法被调用");
            byteBuf.writeInt(messageProtocol.getLen());
            byteBuf.writeBytes(messageProtocol.getContent());
        }
    }

服务端

    package com.zhj.test.netty.protocoltcp;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * @author zhj
     */
    public class TcpClient {
        public static void main(String[] args) throws InterruptedException {
            // 客户端需要一个事件循环组
            EventLoopGroup eventExecutors = new NioEventLoopGroup();
            // 创建客户端启动对象
            try {
                Bootstrap bootstrap = new Bootstrap();
                // 设置相关参数
                bootstrap.group(eventExecutors)
                        .channel(NioSocketChannel.class)
                        .handler(new TcpClientInitializer());
                System.out.println("客户端启动完成!!!");
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                eventExecutors.shutdownGracefully();
            }
        }
    }
    
    package com.zhj.test.netty.protocoltcp;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    
    /**
     * @author zhj
     */
    public class TcpServerInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            // 向管道加入处理器
            // 得到管道
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new MyMessageDecoder());
            pipeline.addLast(new MyMessageEncoder());
            // 2.增加自定义handler
            pipeline.addLast("MyTcpServerHandler", new TcpServerHandler());
        }
    }
    
    package com.zhj.test.netty.protocoltcp;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;
    
    import java.nio.charset.StandardCharsets;
    import java.util.UUID;
    
    /**
     * 说明
     * 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
     * 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject
     * @author zhj
     */
    public class TcpServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    
        private int count;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
            // System.out.println(new String(msg.array(), CharsetUtil.UTF_8));
            System.out.println("服务端接收的数据 " + new String(msg.getContent(), CharsetUtil.UTF_8));
            System.out.println("服务端接收的数据长度 " + msg.getLen());
            System.out.println("服务器接收到的消息量:" + (++this.count));
    
            String responseContent = UUID.randomUUID().toString();
            int len = responseContent.getBytes(StandardCharsets.UTF_8).length;
            MessageProtocol messageProtocol = new MessageProtocol(len, responseContent.getBytes(StandardCharsets.UTF_8));
            ctx.writeAndFlush(messageProtocol);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("服务器异常:" + cause.getMessage());
            ctx.close();
        }
    }

客户端

    package com.zhj.test.netty.protocoltcp;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * @author zhj
     */
    public class TcpClient {
        public static void main(String[] args) throws InterruptedException {
            // 客户端需要一个事件循环组
            EventLoopGroup eventExecutors = new NioEventLoopGroup();
            // 创建客户端启动对象
            try {
                Bootstrap bootstrap = new Bootstrap();
                // 设置相关参数
                bootstrap.group(eventExecutors)
                        .channel(NioSocketChannel.class)
                        .handler(new TcpClientInitializer());
                System.out.println("客户端启动完成!!!");
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                eventExecutors.shutdownGracefully();
            }
        }
    }
    
    package com.zhj.test.netty.protocoltcp;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    
    /**
     * @author zhj
     */
    public class TcpClientInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            // 向管道加入处理器
            // 得到管道
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new MyMessageEncoder());
            pipeline.addLast(new MyMessageDecoder());
            // 2.增加自定义handler
            pipeline.addLast("MyTestTcpClientHandler", new TcpClientHandler());
        }
    }
    
    package com.zhj.test.netty.protocoltcp;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;
    
    import java.nio.charset.StandardCharsets;
    
    /**
     * 说明
     * 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
     * 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject
     * @author zhj
     */
    public class TcpClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    
        private int count;
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // 使用客户端发送10条数据 hello server
            for (int i = 0; i < 60; i++) {
                String mes = "今天真带劲。。。";
                byte[] content = mes.getBytes(StandardCharsets.UTF_8);
                int length = mes.getBytes(StandardCharsets.UTF_8).length;
                MessageProtocol messageProtocol = new MessageProtocol(length, content);
                ctx.writeAndFlush(messageProtocol);
            }
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
            System.out.println("客户端接收的数据 " + new String(msg.getContent(), CharsetUtil.UTF_8));
            System.out.println("客户端接收的数据长度 " + msg.getLen());
            System.out.println("客户端接收到的消息量:" + (++this.count));
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("异常信息:" + cause.getMessage());
            ctx.close();
        }
    }

15 RPC 调用流程

RPC 基本介绍

  1. RPC (Remote Procedure Call) —远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
  2. 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样
  3. 常见的RPC框架:Dubbo、google 的 gRPC、Go语言的rpxc、Apache的thrift,Spring旗下的Spring Cloud

202212302209267717.png

自己实现dubbo RPC(基于Netty)需求说明

  1. dubbo底层使用了Netty作为网络通讯框架,要求用Netty实现一个简单的RPC框
  2. 模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用Netty 4.x

设计说明

  1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
  2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
  3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用Netty请求提供者返回数据

代码实现:

gitee.com/zhuhuijie/n…