Netty的粘包拆包是如何避免消息在TCP传输过程中的错乱的

 2023-01-03
原文作者:Jony_zhang 原文地址:https://juejin.cn/post/7063640363264638989

Netty粘包拆包

TCP粘包拆包是指发送方发送的若干包数据到接收方接收时粘成一包或某个数据包被拆开接收。如下图所示,client发了两个数据包D1和D2,但是server端可能会收到如下几种情况的数据。

202212302152122931.png

为什么出现粘包现象

TCP 是面向连接的, 面向流的, 提供高可靠性服务。 收发两端(客户端和服务器端) 都要有成对的 socket,因此, 发送端为了将多个发给接收端的包, 更有效的发给对方, 使用了优化方法(Nagle 算法),将多次间隔较小且数据量小的数据, 合并成一个大的数据块,然后进行封包。 这样做虽然提高了效率, 但是接收端就难于分辨出完整的数据包了, 因为面向流的通信是无消息保护边界的。 如下代码展示粘包示例:

服务端代码

服务启动代码

    package com.jony.netty.chat;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class ChatServer {
    
        public static void main(String[] args) throws Exception {
    
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                //向pipeline加入解码器
                                pipeline.addLast("decoder", new StringDecoder());
                                //向pipeline加入编码器
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast(new ChatServerHandler());
                                //加入自己的业务处理handler
                            }
                        });
                System.out.println("聊天室server启动。。");
                ChannelFuture channelFuture = bootstrap.bind(9999).sync();
                //关闭通道
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

收发消息代码

    package com.jony.netty.chat;
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    import java.text.SimpleDateFormat;
    
    public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
    
        //GlobalEventExecutor.INSTANCE是全局的事件执行器,是一个单例
        private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        //表示 channel 处于就绪状态, 提示上线
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            //将该客户加入聊天的信息推送给其它在线的客户端
            //该方法会将 channelGroup 中所有的 channel 遍历,并发送消息
            channelGroup.writeAndFlush("[ 客户端 ]" + channel.remoteAddress() + " 上线了 " + sdf.format(new
                    java.util.Date())+ "\n");
            //将当前 channel 加入到 channelGroup
            channelGroup.add(channel);
            System.out.println(ctx.channel().remoteAddress() + " 上线了"+ "\n");
        }
    
        //表示 channel 处于不活动状态, 提示离线了
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            //将客户离开信息推送给当前在线的客户
            channelGroup.writeAndFlush("[ 客户端 ]" + channel.remoteAddress() + " 下线了"+ "\n");
            System.out.println(ctx.channel().remoteAddress() + " 下线了"+ "\n");
            System.out.println("channelGroup size=" + channelGroup.size());
        }
    
        //读取数据
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            //获取到当前 channel
            Channel channel = ctx.channel();
            //这时我们遍历 channelGroup, 根据不同的情况, 回送不同的消息
            channelGroup.forEach(ch -> {
                if (channel != ch) { //不是当前的 channel,转发消息
                    ch.writeAndFlush("[ 客户端 ]" + channel.remoteAddress() + " 发送了消息:" + msg + "\n");
                } else {//回显自己发送的消息给自己
                    ch.writeAndFlush("[ 自己 ]发送了消息:" + msg + "\n");
                }
            });
            System.out.println(msg);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            //关闭通道
            ctx.close();
        }
    }

客户端代码

客户端连接代码

    package com.jony.netty.chat;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.util.Scanner;
    
    public class ChatClient {
    
        public static void main(String[] args) throws Exception {
    
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap()
                        .group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(new StringDecoder());
    
                                pipeline.addLast(new ChatClientHandler());
                            }
                        });
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
                //得到 channel
                Channel channel = channelFuture.channel();
                System.out.println("========" + channel.localAddress() + "========");
                //客户端需要输入信息, 创建一个扫描器
    //            Scanner scanner = new Scanner(System.in);
    //            while (scanner.hasNextLine()) {
    //                String msg = scanner.nextLine();
    //                //通过 channel 发送到服务器端
    //                channel.writeAndFlush(msg);
    //            }
                for (int i = 0; i < 200; i++) {
                    channel.writeAndFlush("hello,jony!");
                }
            } finally {
    //            group.shutdownGracefully();
            }
        }
    }

客户端代码主要使用了如下代码,进行消息频繁发送

    for (int i = 0; i < 200; i++) {
        channel.writeAndFlush("hello,jony!");
    }

客户端收发消息代码

    package com.jony.netty.chat;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(msg.trim());
        }
    }

执行结果

可以看到执行的结果,即时每次for循环只发送一次消息,但是TCP经过优化处理之后,就会把消息进行粘包处理,虽然效率提升了,但是这不是我们想要的结果,下面来介绍如何拆包

    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!hello,jony!
    [ 自己 ]发送了消息:hello,jony!hello,jony!hello,jony!hello,jony!

解决方案

1)格式化数据:每条数据有固定的格式(开始符、结束符),这种方法简单易行,但选择开始符和结束符的时候一定要注意每条数据的内部一定不能出现开始符或结束符。
缺点: 此方案在发送消息的时候可以给消息添加固定的分隔符,在读取消息的再通过分隔符进行拆包,但是这种方案,代码可维护性低,一方面万一在发送消息的时候有了我们预设的分隔符,消息就会错乱,另外一方面,后面其他人员在维护代码的时候,或者增加其他逻辑的时候,在不知情的情况下,很容易忽视这个分割符。

2)发送长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度来判断每条数据的开始和结束。

通过以上方案的了解,显然第二种方案更稳妥,废话不多说,上代码。

拆包

创建消息的封装类

需要两个字段,一个字符长度,一个字符内容

    package com.jony.netty.split;
    
    /**
     * 自定义协议包
     */
    public class MyMessageProtocol {
    
        //定义一次发送包体长度
        private int len;
        //一次发送包体内容
        private byte[] content;
    
        public int getLen() {
            return len;
        }
    
        public void setLen(int len) {
            this.len = len;
        }
    
        public byte[] getContent() {
            return content;
        }
    
        public void setContent(byte[] content) {
            this.content = content;
        }
    }

服务端代码

服务端连接及添加编编解码器

如果服务端不发消息,只需要收消息,则只添加解码器即可

    package com.jony.netty.split;
    
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class MyServer {
        public static void main(String[] args) throws Exception {
    
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new MyMessageDecoder());
                                pipeline.addLast(new MyServerHandler());
                            }
                        });
    
                System.out.println("netty server start。。");
                ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

核心代码: 主要添加了解码器和消息处理器

    pipeline.addLast(new MyMessageDecoder()); 
    pipeline.addLast(new MyServerHandler());

解码器(数据处理解析)

此代码根据消息封装类中的字符串长度,然后读取字符串,发送到下一个Hander

    package com.jony.netty.split;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    import java.util.List;
    
    public class MyMessageDecoder extends ByteToMessageDecoder {
    
        int length = 0;
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            System.out.println();
            System.out.println("MyMessageDecoder decode 被调用");
            //需要将得到二进制字节码-> MyMessageProtocol 数据包(对象)
            System.out.println(in);
    
            if(in.readableBytes() >= 4) {
                if (length == 0){
                    length = in.readInt();
                }
                if (in.readableBytes() < length) {
                    System.out.println("当前可读数据不够,继续等待。。");
                    return;
                }
                byte[] content = new byte[length];
                if (in.readableBytes() >= length){
                    in.readBytes(content);
    
                    //封装成MyMessageProtocol对象,传递到下一个handler业务处理
                    MyMessageProtocol messageProtocol = new MyMessageProtocol();
                    messageProtocol.setLen(length);
                    messageProtocol.setContent(content);
                    out.add(messageProtocol);
                }
                length = 0;
            }
        }
    }

消息读写

    package com.jony.netty.split;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;
    
    
    public class MyServerHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {
    
        private int count;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
            System.out.println("====服务端接收到消息如下====");
            System.out.println("长度=" + msg.getLen());
            System.out.println("内容=" + new String(msg.getContent(), CharsetUtil.UTF_8));
    
            System.out.println("服务端接收到消息包数量=" + (++this.count));
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

客户端代码

客户端连接及添加相关组件

    package com.jony.netty.split;
    
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class MyClient {
        public static void main(String[] args)  throws  Exception{
    
            EventLoopGroup group = new NioEventLoopGroup();
    
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new MyMessageEncoder());
                                pipeline.addLast(new MyClientHandler());
                            }
                        });
    
                System.out.println("netty client start。。");
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
                channelFuture.channel().closeFuture().sync();
            }finally {
                group.shutdownGracefully();
            }
        }
    }

客户端编码器

    package com.jony.netty.split;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    public class MyMessageEncoder extends MessageToByteEncoder<MyMessageProtocol> {
        @Override
        protected void encode(ChannelHandlerContext ctx, MyMessageProtocol msg, ByteBuf out) throws Exception {
            System.out.println("MyMessageEncoder encode 方法被调用");
            out.writeInt(msg.getLen());
            out.writeBytes(msg.getContent());
        }
    }

客户端消息处理器

将每次发送的消息转为byte,然后将消息长度和内容set到消息封装类中,进行发送

    package com.jony.netty.split;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;
    
    public class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            for(int i = 0; i< 200; i++) {
                String msg = "你好,我是张三!";
                //创建协议包对象
                MyMessageProtocol messageProtocol = new MyMessageProtocol();
                messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);
                messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));
                ctx.writeAndFlush(messageProtocol);
            }
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
    
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }

执行结果

正常读取

    MyMessageDecoder decode 被调用
    PooledUnsafeDirectByteBuf(ridx: 0, widx: 1024, cap: 1024)
    ====服务端接收到消息如下====
    长度=24
    内容=你好,我是张三!
    服务端接收到消息包数量=1
    
    MyMessageDecoder decode 被调用
    PooledUnsafeDirectByteBuf(ridx: 28, widx: 1024, cap: 1024)
    ====服务端接收到消息如下====
    长度=24
    内容=你好,我是张三!
    服务端接收到消息包数量=2
    
    MyMessageDecoder decode 被调用
    PooledUnsafeDirectByteBuf(ridx: 56, widx: 1024, cap: 1024)
    ====服务端接收到消息如下====
    长度=24
    内容=你好,我是张三!
    服务端接收到消息包数量=3

根据以上信息看如下代码,我们可以知道当前一次我们读取了1024个长度的消息,并且消息内容不在有粘包的情况。

    PooledUnsafeDirectByteBuf(ridx: 0, widx: 1024, cap: 1024)

消息不连续

    MyMessageDecoder decode 被调用
    PooledUnsafeDirectByteBuf(ridx: 980, widx: 1024, cap: 1024)
    ====服务端接收到消息如下====
    长度=24
    内容=你好,我是张三!
    服务端接收到消息包数量=36
    
    MyMessageDecoder decode 被调用
    PooledUnsafeDirectByteBuf(ridx: 1008, widx: 1024, cap: 1024)
    当前可读数据不够,继续等待。。
    
    MyMessageDecoder decode 被调用
    PooledUnsafeDirectByteBuf(ridx: 1012, widx: 1024, cap: 1024)
    当前可读数据不够,继续等待。。
    
    MyMessageDecoder decode 被调用
    PooledUnsafeDirectByteBuf(ridx: 1012, widx: 4172, cap: 8192)
    ====服务端接收到消息如下====
    长度=24
    内容=你好,我是张三!
    服务端接收到消息包数量=37

根据以上代码可以看到消息长度不够的时候,程序会等待读取,等到下次消息长度够的时候,再继续读取,此时我们看到

    PooledUnsafeDirectByteBuf(ridx: 1012, widx: 4172, cap: 8192)

ridx为1012,之前的widx为1024,也就是有2个剩余带读取的长度不够,然后程序不进行读取,等到下次字符长度没问题的时候继续读取,这样就避免了TCP在数据传输的时候,自动给我们粘包或者拆包造成数据错乱的问题。