Kryo+Netty传输序列化对象

 2023-01-09
原文作者:Pymjl 原文地址:https://juejin.cn/post/7094169128843345933

Kryo+Netty传输序列化对象

1.概述

这段时间不是一直想要使用Netty模仿一下dubbo,自己写一个RPC框架嘛,然后在学习的过程中还是学到了不少新东西。我们知道在网络传输中,数据都是以二进制流来传输,但是在Java中数据都是以对象的形式来存储,所以我们想要传输数据,这就涉及到对象的序列化以及反序列化了。而我们知道,不同的序列化协议适用不同的应用场景,jdk原生的序列化方式因为其性能原因绝大多数的人都不会考虑使用它,而我们想要写一个高性能的RPC框架,一个合适的序列化协议自然也是重中之重,因为目前所构思的RPC框架只是Java to Java,所以我选择Kryo作为序列/反序列化的方式。

其他的序列化方式还有很多,他们都各自不同的优缺点,和不同的使用场景。想要深入了解的同学不妨参考一下下面的的文章: 美团技术团队:序列化和反序列化

202212302147488621.png

序列化协议是应用层的协议

kryo的使用方法可以参靠下面这篇文章: Kryo 使用指南

2.核心代码

    目录结构
    C:.
    ├─client 客户端代码
    ├─codec 编解码器
    ├─entity 实体类
    └─server 服务端代码

2.1.导入依赖

首先,先导入Netty的依赖,我是直接导入的Netty的所有模块

            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.68.Final</version>
            </dependency>

然后导入Kryo的相关依赖,因为我使用的KryoUtil,还需要导入commons-codec的依赖,因为spring的相关依赖会和Kryo的依赖冲突,所以直接导入的Kryo-shaded,具体的原因读者可以自行百度,我不再赘述

    <!-- https://mvnrepository.com/artifact/com.esotericsoftware/kryo -->
            <dependency>
                <groupId>com.esotericsoftware</groupId>
                <artifactId>kryo-shaded</artifactId>
                <version>4.0.0</version>
            </dependency>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>

再导入一些其他的依赖

     <!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-api -->
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter-api</artifactId>
                <version>5.8.2</version>
                <scope>test</scope>
            </dependency>
            <!-- 添加日志支持Log4j2 -->
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>2.6.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.6.2</version>
            </dependency>
            <!-- 代码简化 -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.16</version>
            </dependency>

2.2.KryoUtil

编写Kryo工具类,用于后面的序列反序列化对象

    package cuit.epoch.pymjl.util;
    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import org.apache.commons.codec.binary.Base64;
    import org.objenesis.strategy.StdInstantiatorStrategy;
    ​
    import java.io.*;
    /**
     * @author Pymjl
     * @version 1.0
     * @date 2022/4/18 20:07
     **/
    public class KryoUtil {
        private static final String DEFAULT_ENCODING = "UTF-8";
    ​
        //每个线程的 Kryo 实例
        private static final ThreadLocal<Kryo> kryoLocal = new ThreadLocal<Kryo>() {
            @Override
            protected Kryo initialValue() {
                Kryo kryo = new Kryo();
    ​
                /**
                 * 不要轻易改变这里的配置!更改之后,序列化的格式就会发生变化,
                 * 上线的同时就必须清除 Redis 里的所有缓存,
                 * 否则那些缓存再回来反序列化的时候,就会报错
                 */
                //支持对象循环引用(否则会栈溢出)
                kryo.setReferences(true); //默认值就是 true,添加此行的目的是为了提醒维护者,不要改变这个配置
    ​
                //不强制要求注册类(注册行为无法保证多个 JVM 内同一个类的注册编号相同;而且业务系统中大量的 Class 也难以一一注册)
                kryo.setRegistrationRequired(false); //默认值就是 false,添加此行的目的是为了提醒维护者,不要改变这个配置
    ​
                //Fix the NPE bug when deserializing Collections.
                ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
                        .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
    ​
                return kryo;
            }
        };
    ​
        /**
         * 获得当前线程的 Kryo 实例
         *
         * @return 当前线程的 Kryo 实例
         */
        public static Kryo getInstance() {
            return kryoLocal.get();
        }
    ​
        //-----------------------------------------------
        //          序列化/反序列化对象,及类型信息
        //          序列化的结果里,包含类型的信息
        //          反序列化时不再需要提供类型
        //-----------------------------------------------
    ​
        /**
         * 将对象【及类型】序列化为字节数组
         *
         * @param obj 任意对象
         * @param <T> 对象的类型
         * @return 序列化后的字节数组
         */
        public static <T> byte[] writeToByteArray(T obj) {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            Output output = new Output(byteArrayOutputStream);
    ​
            Kryo kryo = getInstance();
            kryo.writeClassAndObject(output, obj);
            output.flush();
    ​
            return byteArrayOutputStream.toByteArray();
        }
    ​
        /**
         * 将对象【及类型】序列化为 String
         * 利用了 Base64 编码
         *
         * @param obj 任意对象
         * @param <T> 对象的类型
         * @return 序列化后的字符串
         */
        public static <T> String writeToString(T obj) {
            try {
                return new String(Base64.encodeBase64(writeToByteArray(obj)), DEFAULT_ENCODING);
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e);
            }
        }
    ​
        /**
         * 将字节数组反序列化为原对象
         *
         * @param byteArray writeToByteArray 方法序列化后的字节数组
         * @param <T>       原对象的类型
         * @return 原对象
         */
        @SuppressWarnings("unchecked")
        public static <T> T readFromByteArray(byte[] byteArray) {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArray);
            Input input = new Input(byteArrayInputStream);
    ​
            Kryo kryo = getInstance();
            return (T) kryo.readClassAndObject(input);
        }
    ​
        /**
         * 将 String 反序列化为原对象
         * 利用了 Base64 编码
         *
         * @param str writeToString 方法序列化后的字符串
         * @param <T> 原对象的类型
         * @return 原对象
         */
        public static <T> T readFromString(String str) {
            try {
                return readFromByteArray(Base64.decodeBase64(str.getBytes(DEFAULT_ENCODING)));
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e);
            }
        }
    ​
        //-----------------------------------------------
        //          只序列化/反序列化对象
        //          序列化的结果里,不包含类型的信息
        //-----------------------------------------------
    ​
        /**
         * 将对象序列化为字节数组
         *
         * @param obj 任意对象
         * @param <T> 对象的类型
         * @return 序列化后的字节数组
         */
        public static <T> byte[] writeObjectToByteArray(T obj) {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            Output output = new Output(byteArrayOutputStream);
    ​
            Kryo kryo = getInstance();
            kryo.writeObject(output, obj);
            output.flush();
    ​
            return byteArrayOutputStream.toByteArray();
        }
    ​
        /**
         * 将对象序列化为 String
         * 利用了 Base64 编码
         *
         * @param obj 任意对象
         * @param <T> 对象的类型
         * @return 序列化后的字符串
         */
        public static <T> String writeObjectToString(T obj) {
            try {
                return new String(Base64.encodeBase64(writeObjectToByteArray(obj)), DEFAULT_ENCODING);
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e);
            }
        }
    ​
        /**
         * 将字节数组反序列化为原对象
         *
         * @param byteArray writeToByteArray 方法序列化后的字节数组
         * @param clazz     原对象的 Class
         * @param <T>       原对象的类型
         * @return 原对象
         */
        @SuppressWarnings("unchecked")
        public static <T> T readObjectFromByteArray(byte[] byteArray, Class<T> clazz) {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArray);
            Input input = new Input(byteArrayInputStream);
    ​
            Kryo kryo = getInstance();
            return kryo.readObject(input, clazz);
        }
    ​
        /**
         * 将 String 反序列化为原对象
         * 利用了 Base64 编码
         *
         * @param str   writeToString 方法序列化后的字符串
         * @param clazz 原对象的 Class
         * @param <T>   原对象的类型
         * @return 原对象
         */
        public static <T> T readObjectFromString(String str, Class<T> clazz) {
            try {
                return readObjectFromByteArray(Base64.decodeBase64(str.getBytes(DEFAULT_ENCODING)), clazz);
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e);
            }
        }
    ​
    }

2.3.编写实体类

  1. 编写RPC请求类
    package cuit.epoch.pymjl.nettydemo.entity;
    ​
    import lombok.*;
    ​
    /**
     * @author Pymjl
     * @version 1.0
     * @date 2022/4/18 22:06
     **/
    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    @Builder
    @ToString
    public class RpcRequest {
        private String interfaceName;
        private String methodName;
    }
  1. 编写RPC响应类
    package cuit.epoch.pymjl.nettydemo.entity;
    ​
    import lombok.*;
    ​
    /**
     * 服务端响应实体类
     *
     * @author Pymjl
     * @version 1.0
     * @date 2022/4/18 21:48
     **/
    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    @ToString
    @Builder
    public class RpcResponse {
        private String message;
    }

2.4.编写编解码器

注意,对数据进行编码时因为TCP粘包/拆包的原因,我们这里需要自定义传输协议,然后我这里是:把传输数据的长度写在字节数组的前面四个字节中,服务端在读取数据时会先从前四个字节获取到这次传输数据的长度,在对数据进行都写操作

另外,我们需要在编解码器中将对象序列化成字节数组或者将字节数组反序列化成原对象

  1. 编码器
    package cuit.epoch.pymjl.nettydemo.codec;
    ​
    import cuit.epoch.pymjl.util.KryoUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    import io.netty.handler.codec.MessageToByteEncoder;
    import lombok.AllArgsConstructor;
    ​
    /**
     * @author Pymjl
     * @version 1.0
     * @date 2022/4/18 22:03
     **/
    @AllArgsConstructor
    public class NettyKryoEncoder extends MessageToByteEncoder<Object> {
        private final Class<?> clazz;
    ​
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
            if (clazz.isInstance(o)) {
                //将对象转换为byte
                byte[] bytes = KryoUtil.writeObjectToByteArray(o);
                //读取消息的长度
                int length = bytes.length;
                //将消息长度写入到字节数组的前4个字节
                byteBuf.writeInt(length);
                //将字节数组写入到缓冲区
                byteBuf.writeBytes(bytes);
            }
    ​
        }
    }
  1. 解码器

对数据进行解码的时候需要注意此次接收到的数据是否齐全

    package cuit.epoch.pymjl.nettydemo.codec;
    
    import cuit.epoch.pymjl.util.KryoUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    import lombok.AllArgsConstructor;
    import lombok.extern.log4j.Log4j2;
    
    import java.util.List;
    
    /**
     * @author Pymjl
     * @version 1.0
     * @date 2022/4/18 22:02
     **/
    @AllArgsConstructor
    @Log4j2
    public class NettyKryoDecoder extends ByteToMessageDecoder {
        private final Class<?> clazz;
        /**
         * Netty传输的数据长度在前四个字节
         */
        private static final int BODY_LENGTH = 4;
    
        /**
         * 解码
         *
         * @param ctx 上下文
         * @param in  字节缓冲
         * @param out 存放解码后的对象
         * @throws Exception 异常
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            //消息的长度必须大于四
            if (in.readableBytes() >= BODY_LENGTH) {
                //标记当前的readIndex的位置,方便后面重置
                in.markReaderIndex();
                //读取数据长度
                int dataLength = in.readInt();
                //判断数据是否齐全
                if (in.readableBytes() >= dataLength) {
                    //读取完整的数据
                    byte[] data = new byte[dataLength];
                    in.readBytes(data);
                    //解码
                    Object obj = KryoUtil.readObjectFromByteArray(data, clazz);
                    out.add(obj);
                    log.info("解码成功:" + obj);
                }
            }
    
        }
    }

2.5.服务端

  1. 先使用Netty初始化服务,让服务端循环监听客户端的请求
    package cuit.epoch.pymjl.nettydemo.server;
    
    import cuit.epoch.pymjl.nettydemo.codec.NettyKryoDecoder;
    import cuit.epoch.pymjl.nettydemo.codec.NettyKryoEncoder;
    import cuit.epoch.pymjl.nettydemo.entity.RpcRequest;
    import cuit.epoch.pymjl.nettydemo.entity.RpcResponse;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.log4j.Log4j2;
    
    /**
     * @author Pymjl
     * @version 1.0
     * @date 2022/4/18 22:23
     **/
    @Log4j2
    public class NettyServer {
        private final int port;
    
        public NettyServer(int port) {
            this.port = port;
        }
    
        public void run() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        // 是否开启 TCP 底层心跳机制
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        //表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) {
                                ch.pipeline().addLast(new NettyKryoDecoder(RpcRequest.class));
                                ch.pipeline().addLast(new NettyKryoEncoder(RpcResponse.class));
                                ch.pipeline().addLast(new NettyServerHandler());
                            }
                        });
    
                // 绑定端口,同步等待绑定成功
                ChannelFuture f = b.bind(port).sync();
                log.info("Netty server start success, port: {}", port);
                // 等待服务端监听端口关闭
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("occur exception when start server:", e);
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
  1. 自定义服务端的Handler,处理业务
    package cuit.epoch.pymjl.nettydemo.server;
    
    import cuit.epoch.pymjl.nettydemo.entity.RpcRequest;
    import cuit.epoch.pymjl.nettydemo.entity.RpcResponse;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;
    import lombok.extern.log4j.Log4j2;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author Pymjl
     * @version 1.0
     * @date 2022/4/18 22:26
     **/
    @Log4j2
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        private static final AtomicInteger atomicInteger = new AtomicInteger(1);
    
        /**
         * 接收客户端发送的请求
         *
         * @param ctx 上下文
         * @param msg 传输的对象
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            try {
                RpcRequest rpcRequest = (RpcRequest) msg;
                log.info("server receive msg: [{}] ,times:[{}]", rpcRequest, atomicInteger.getAndIncrement());
                RpcResponse messageFromServer = RpcResponse.builder().message("message from server").build();
                ChannelFuture f = ctx.writeAndFlush(messageFromServer);
                f.addListener(ChannelFutureListener.CLOSE);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("server catch exception", cause);
            ctx.close();
        }
    
    }

2.6.客户端

  1. 初始化客户端
    package cuit.epoch.pymjl.nettydemo.client;
    
    import cuit.epoch.pymjl.nettydemo.codec.NettyKryoDecoder;
    import cuit.epoch.pymjl.nettydemo.codec.NettyKryoEncoder;
    import cuit.epoch.pymjl.nettydemo.entity.RpcRequest;
    import cuit.epoch.pymjl.nettydemo.entity.RpcResponse;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.util.AttributeKey;
    import lombok.extern.log4j.Log4j2;
    
    /**
     * @author Pymjl
     * @version 1.0
     * @date 2022/4/18 21:50
     **/
    @Log4j2
    public class NettyClient {
        private final String host;
        private final int port;
        private static final Bootstrap b;
    
        /**
         * 初始化Client
         *
         * @param host 服务器地址
         * @param port 服务器端口
         */
        public NettyClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        /**
         * 初始化Bootstrap等相关资源
         */
        static {
            EventLoopGroup group = new NioEventLoopGroup();
            b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    //设置连接的超时时间,超过这个时间则代表连接失败
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyKryoDecoder(RpcResponse.class));
                            socketChannel.pipeline().addLast(new NettyKryoEncoder(RpcRequest.class));
                            socketChannel.pipeline().addLast(new NettyClientHandler());
    
                        }
                    });
        }
    
        /**
         * 发送请求
         *
         * @param rpcRequest 请求对象
         * @return 返回响应对象RpcResponse
         */
        public RpcResponse sendMessage(RpcRequest rpcRequest) {
            try {
                ChannelFuture f = b.connect(host, port).sync();
                log.info("client connect server success ==> {}:{}", host, port);
                Channel futureChannel = f.channel();
                log.info("client start send message");
                if (futureChannel != null) {
                    futureChannel.writeAndFlush(rpcRequest).addListener(channelFuture -> {
                        if (channelFuture.isSuccess()) {
                            log.info("client send message success ==> [{}]", rpcRequest);
                        } else {
                            log.error("send failed cause: ", channelFuture.cause());
                        }
                    });
                }
                //阻塞等待服务器返回结果
                f.channel().closeFuture().sync();
                //获取返回结果
                AttributeKey<RpcResponse> key = AttributeKey.valueOf("response");
                RpcResponse rpcResponse = futureChannel.attr(key).get();
                if (rpcResponse != null) {
                    log.info("RpcResponse is [{}]", rpcResponse);
                    return rpcResponse;
                } else {
                    log.error("RpcResponse is Null");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
  1. 自定义客户端Handler
    package cuit.epoch.pymjl.nettydemo.client;
    
    import cuit.epoch.pymjl.nettydemo.entity.RpcResponse;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.AttributeKey;
    import io.netty.util.ReferenceCountUtil;
    import lombok.extern.log4j.Log4j2;
    
    /**
     * @author Pymjl
     * @version 1.0
     * @date 2022/4/18 22:04
     **/
    @Log4j2
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                RpcResponse response = (RpcResponse) msg;
                log.info("handler client receive response from server, response={}", response.toString());
                //声明一个AttributeKey对象
                AttributeKey<RpcResponse> key = AttributeKey.valueOf("response");
                ctx.channel().attr(key).set(response);
                ctx.channel().close();
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("client caught exception", cause);
            ctx.close();
        }
    }

3.测试

3.1.服务端启动类

    package cuit.epoch.pymjl.nettydemo.server;
    
    /**
     * @author Pymjl
     * @version 1.0
     * @date 2022/4/18 22:48
     **/
    public class ServerMain {
        public static void main(String[] args) {
            new NettyServer(8080).run();
        }
    }

3.2.客户端启动类

    package cuit.epoch.pymjl.nettydemo.client;
    
    import cuit.epoch.pymjl.nettydemo.entity.RpcRequest;
    import cuit.epoch.pymjl.nettydemo.entity.RpcResponse;
    
    /**
     * @author Pymjl
     * @version 1.0
     * @date 2022/4/18 22:48
     **/
    public class ClientMain {
        public static void main(String[] args) {
            RpcRequest rpcRequest = RpcRequest.builder()
                    .interfaceName("interface")
                    .methodName("hello").build();
            NettyClient nettyClient = new NettyClient("127.0.0.1", 8080);
            for (int i = 0; i < 3; i++) {
                nettyClient.sendMessage(rpcRequest);
            }
            RpcResponse rpcResponse = nettyClient.sendMessage(rpcRequest);
            System.out.println(rpcResponse.toString());
        }
    }

3.3.运行项目

  1. 启动服务端

202212302147495612.png

  1. 启动客户端

202212302147511433.png

至此,整个项目就成功运行了

4.小结

至此,Netty使用Kryo序列化对象传输数据的Demo到此为止了,这中间其实还有很多细节的地方我没有多说。比如Kryo线程不安全,需要使用ThreadLocal来保证线程安全,这进而又引出一个问题,ThreadLocal是什么?它为什么能保证线程安全?这些问题我会在后面更新一篇文章详细解释。

除此之外,Netty传输的异步机制Listenner的相关知识点我也没有做详细讲解,以及AttrbuteKey,AttributeKeyMap等,这些知识点目前笔者也是一知半解,在没有熟练掌握这些知识点前我也不敢细说,误人子弟。等我后面详细研究之后,理解通透后再更新相关的讲解文章.

最后,附上我的Log4j2的配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="ERROR" monitorInterval="600">
        <!-- configure.status 为设置日志输出级别,级别如下:OFF 、FATAL 、ERROR、WARN、INFO、DEBUG、TRACE、ALL -->
        <!-- configure.monitorInterval 监控间隔 指log4j2每隔600秒(10分钟),自动监控该配置文件是否有变化,如果变化,则自动根据文件内容重新配置 -->
    
        <Properties>
            <property name="pattern">%d %highlight{%-5level}{ERROR=Bright RED, WARN=Bright Yellow, INFO=Bright Green, DEBUG=Bright Cyan, TRACE=Bright White} %style{[%t]}{bright,magenta} %style{%c{1.}.%M(%L)}{cyan}: %msg%n</property>
            <property name="basePath">/logs</property>
        </Properties>
    
        <!--配置appenders源:日志输出的地址-->
        <Appenders>
    
            <Console name="console" target="SYSTEM_OUT">
                <PatternLayout pattern="${pattern}"/>
            </Console>
    
            <RollingRandomAccessFile name="fileLogger"
                                     fileName="${basePath}/server.log"
                                     filePattern="${basePath}/server-%d{yyyy-MM-dd}.log"
                                     append="true">
                <PatternLayout pattern="${pattern}"/>
    
                <Policies>
                    <TimeBasedTriggeringPolicy interval="1" modulate="true"/>
                    <!--测试日志输出:-->
    <!--                <SizeBasedTriggeringPolicy size="3kb"/>-->
                    <SizeBasedTriggeringPolicy size="100MB"/>
                </Policies>
            </RollingRandomAccessFile>
        </Appenders>
    
        <!--配置logers:级别、使用的输出地-->
        <Loggers>
            <Logger name="cuit.epoch.pymjl"
                    level="debug"
                    additivity="true">
                <appender-ref ref="fileLogger"
                              level="debug"/>
            </Logger>
    
            <Root level="info" additivity="false">
                <appender-ref ref="console"/>
            </Root>
        </Loggers>
    </Configuration>