Netty入门

 2023-02-02
原文作者:谋莽台 原文地址:https://juejin.cn/post/7026367494122110990

池化可以重用ByteBuf实例,高并发下,池化功能更节约内存,减少内存溢出的可能。

网络编程

核心代码

服务端

    //服务器端的启动器,负责组装netty组件,启动服务器
    new ServerBootstrap()
    //多个组,每个组包含Boss和worker(selector+thread)
    .group(new NioEventLoopGroup())
    //选择channel的具体实现
    .channel(NioServerSocketChannel.class)
    //决定worker将来能执行哪些操作
    .childHandler(
            //初始化通道,负责添加别的handler(操作)
            new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
            //添加具体的handler
            nioSocketChannel.pipeline().addLast(new StringDecoder());//将ByteBuf 转换为字符串
            nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //自定义handler
    
                //读事件
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println(msg);
                }
            });
        }
    })
    //绑定监听端口
    .bind(8282);

客户端(同步去处理结果)

    EventLoopGroup group = new NioEventLoopGroup();
    Channel channel = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new StringEncoder());
                }
            })
            //connect是异步非阻塞的,main线程发起了调用,真正执行connect是nio线程。所以下一步必须执行sync方法阻塞等待连接建立。
            //否则直接跳过阻塞获取到channel,此时还未连接就会出问题。
            .connect(new InetSocketAddress("localhost", 8282))
            //1.使用sycn方法同步处理结果,阻塞方法,直到连接建立
            .sync()
            //获取channel连接对象去读写
            .channel();
    
    while (true){
        Scanner scanner = new Scanner(System.in);
        String s = scanner.nextLine();
        if(s.equals("q")){
            //同理connect方法,channel的close方法也是异步非阻塞的,需要sync同步处理结果,直到连接断开。
            ChannelFuture closeFuture = channel.close();
            closeFuture.sync();
            break;
        }else{
            channel.writeAndFlush(s);
        }
    }
    //虽然主线程的连接断开了,但是还有nio的线程需要优雅的关闭。
    group.shutdownGracefully();

理解

  1. handler为一道工序,pipeline为流水线,多道工序合在一起就是一个流水线。
  2. 每次有一个事件发生,pipeLine就会负责发布事件,传播给每个handler,handler对自己感兴趣的事件进行处理。
  3. eventLoop为处理数据的工人,负责管理多个channel的io操作,并且一旦工人负责了某个channel就要负责到底,即绑定。(能够保证线程安全)
  4. eventLoop除了io操作还可以进行任务处理,并且有任务队列。
  5. eventLoop按照pipeline里的handler顺序去处理数据。

ByteBuf

直接内存和堆内存

  1. 直接内存创建和销毁的代价昂贵,但是读写性能高,适合配合池化功能。
  2. 直接内存对GC压力小,不受JVM垃圾回收管理,但要注意主动释放。

池化和非池化

  1. 池化可以重用ByteBuf实例,高并发下,池化功能更节约内存,减少内存溢出的可能。
  2. 没有池化,每次都要创建新的实例,这种方式对直接内存代价太高,对于堆内存会增加GC压力。

内存回收

  1. UnpooledHeapByteBuf使用JVM内存,只需要等待GC回收内存。
  2. UnpooledDirectByteBuf,需要特殊的方法来回收。
  3. PooledByteBuf采用的池化的机制,需要更复杂的规则来回收。
  4. 因为pipeline的存在,一般需要将ByteBuf传递给下一个handler,如果在finally中release了,就失去了传递性,所以谁是ByteBuf(如果某个handler对ByteBuf进行了转换也算)最后的使用者,谁来负责release。

零拷贝的体现

  1. slice方法,划分为多个ByteBuf分开进行读和修改(不能进行写操作,切片后的ByteBuf如果进行写,会影响原有的ByteBuf),但是多个ByteBuf共用的同一片内存,并没有进行数据复制。
  2. duplicate方法,相当于截取了原始的ByteBuf的全部内容,但是用的是同一块底层内存,只是读写指针式独立的。
  3. Unpooled.wrappedBuffer(buf1,buf2)将两个buf组合为一个buf。

相比较ByteBuffer的优势

  1. 池化,可以重用ByteBuf实例,节约内存,减少内存溢出的可能。
  2. 读写指针分离,不需要切换读写模式。
  3. 自动扩容。
  4. 支持链式调用。
  5. 许多方法体现零拷贝。

双向通信的实现

服务端

    new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringDecoder());
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    log.debug(msg.toString());
                    ByteBuf buffer = ctx.alloc().buffer();
                    buffer.writeBytes(msg.toString().getBytes(StandardCharsets.UTF_8));
                    ctx.writeAndFlush(buffer);
                    super.channelRead(ctx, msg);
    
                }
            });
        }
    })
    .bind(8282);

客户端

    EventLoopGroup group = new NioEventLoopGroup();
    ChannelFuture future = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new StringEncoder());
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            log.debug(msg.toString());
                        }
                    });
                }
            })
            .connect(new InetSocketAddress("localhost", 8282));
    Channel channel = future.sync().channel();
    
    while(true){
        Scanner scanner = new Scanner(System.in);
        String s = scanner.nextLine();
        if(s.equals("q")){
            break;
        }else{
            channel.writeAndFlush(s);
        }
    }
    ChannelFuture channelFuture = channel.close();
    channelFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            group.shutdownGracefully();
        }
    });

粘包半包

原因

  • 本质上是因为TCP是流式协议,消息无边界。
  • 为了减少传输中的消耗,需要用算法(nagle算法,尽可能多的传输数据)进行处理而导致粘包,因为TCP传输时需要给报文加上若干字节的头部信息,即使是1个字节的信息。
  • MSS限制,当发送的数据超过MSS限制后,会将数据切分发送,造成半包。
  • 应用层:接收方的ByteBuf设置的太大太小。

滑动窗口

  1. 设置一个滑动窗口大小,在窗口以内的数据可以不等待服务器响应的结果就可以直接发送。在应答未到达之前,窗口必须停止滑动。接收方也要维护一个窗口,只有落在窗口内的数据才能被接受。
  2. 滑动窗口起着缓冲区的作用和流量控制的作用。

解决

  1. 短连接,发送了完整数据后就断开连接。将连接建立到连接断开作为消息边界。能解决粘包但不能解决半包,并且效率低。
  2. 定长解码器,FixedLengthFrameDecoder(和客户端约定的长度)。会造成很多内存的浪费。
  3. 分隔符来界定消息的边界,LineBasedFrameDecoder(maxLength),用\n和\r\n定界。DelimiterBasedFrameDecoder自定义分隔符。底层实现还是一个个字节去找分隔符,效率较低。
  4. 基于长度字段的LTC解码器。LengthFieldBasedFrameDecoder(最大值,长度偏移量,长度字段的长度,跳过多少字节,去掉头部多少字节)。

协议的设计与解析

redis协议

    final static byte[] LINE = {13,10};
    public static void send(ByteBuf buf,String command){
        String[] words = command.split(" ");
        buf.writeBytes(("*"+words.length).getBytes());
        buf.writeBytes(LINE);
        for(String word:words){
            buf.writeBytes(("$"+word.length()).getBytes());
            buf.writeBytes(LINE);
            buf.writeBytes(word.getBytes());
            buf.writeBytes(LINE);
        }
    }

http协议

自定义协议

要素
  1. 魔数: 用来第一时间判定是否为无效数据包。
  2. 版本号: 可以支持协议的升级
  3. 序列化算法: 消息正文到底采用哪种序列化反序列化方式,如json,protobuf。
  4. 指令类型:登陆,注册,单聊,群聊等业务相关。
  5. 请求序号:为了双工通信,提供异步能力。
  6. 正文长度。
  7. 消息正文:Json,xml,对象流等。