池化可以重用ByteBuf实例,高并发下,池化功能更节约内存,减少内存溢出的可能。
网络编程
核心代码
服务端
//服务器端的启动器,负责组装netty组件,启动服务器
new ServerBootstrap()
//多个组,每个组包含Boss和worker(selector+thread)
.group(new NioEventLoopGroup())
//选择channel的具体实现
.channel(NioServerSocketChannel.class)
//决定worker将来能执行哪些操作
.childHandler(
//初始化通道,负责添加别的handler(操作)
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//添加具体的handler
nioSocketChannel.pipeline().addLast(new StringDecoder());//将ByteBuf 转换为字符串
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //自定义handler
//读事件
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
//绑定监听端口
.bind(8282);
客户端(同步去处理结果)
EventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
//connect是异步非阻塞的,main线程发起了调用,真正执行connect是nio线程。所以下一步必须执行sync方法阻塞等待连接建立。
//否则直接跳过阻塞获取到channel,此时还未连接就会出问题。
.connect(new InetSocketAddress("localhost", 8282))
//1.使用sycn方法同步处理结果,阻塞方法,直到连接建立
.sync()
//获取channel连接对象去读写
.channel();
while (true){
Scanner scanner = new Scanner(System.in);
String s = scanner.nextLine();
if(s.equals("q")){
//同理connect方法,channel的close方法也是异步非阻塞的,需要sync同步处理结果,直到连接断开。
ChannelFuture closeFuture = channel.close();
closeFuture.sync();
break;
}else{
channel.writeAndFlush(s);
}
}
//虽然主线程的连接断开了,但是还有nio的线程需要优雅的关闭。
group.shutdownGracefully();
理解
- handler为一道工序,pipeline为流水线,多道工序合在一起就是一个流水线。
- 每次有一个事件发生,pipeLine就会负责发布事件,传播给每个handler,handler对自己感兴趣的事件进行处理。
- eventLoop为处理数据的工人,负责管理多个channel的io操作,并且一旦工人负责了某个channel就要负责到底,即绑定。(能够保证线程安全)
- eventLoop除了io操作还可以进行任务处理,并且有任务队列。
- eventLoop按照pipeline里的handler顺序去处理数据。
ByteBuf
直接内存和堆内存
- 直接内存创建和销毁的代价昂贵,但是读写性能高,适合配合池化功能。
- 直接内存对GC压力小,不受JVM垃圾回收管理,但要注意主动释放。
池化和非池化
- 池化可以重用ByteBuf实例,高并发下,池化功能更节约内存,减少内存溢出的可能。
- 没有池化,每次都要创建新的实例,这种方式对直接内存代价太高,对于堆内存会增加GC压力。
内存回收
- UnpooledHeapByteBuf使用JVM内存,只需要等待GC回收内存。
- UnpooledDirectByteBuf,需要特殊的方法来回收。
- PooledByteBuf采用的池化的机制,需要更复杂的规则来回收。
- 因为pipeline的存在,一般需要将ByteBuf传递给下一个handler,如果在finally中release了,就失去了传递性,所以谁是ByteBuf(如果某个handler对ByteBuf进行了转换也算)最后的使用者,谁来负责release。
零拷贝的体现
- slice方法,划分为多个ByteBuf分开进行读和修改(不能进行写操作,切片后的ByteBuf如果进行写,会影响原有的ByteBuf),但是多个ByteBuf共用的同一片内存,并没有进行数据复制。
- duplicate方法,相当于截取了原始的ByteBuf的全部内容,但是用的是同一块底层内存,只是读写指针式独立的。
- Unpooled.wrappedBuffer(buf1,buf2)将两个buf组合为一个buf。
相比较ByteBuffer的优势
- 池化,可以重用ByteBuf实例,节约内存,减少内存溢出的可能。
- 读写指针分离,不需要切换读写模式。
- 自动扩容。
- 支持链式调用。
- 许多方法体现零拷贝。
双向通信的实现
服务端
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug(msg.toString());
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(msg.toString().getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(buffer);
super.channelRead(ctx, msg);
}
});
}
})
.bind(8282);
客户端
EventLoopGroup group = new NioEventLoopGroup();
ChannelFuture future = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug(msg.toString());
}
});
}
})
.connect(new InetSocketAddress("localhost", 8282));
Channel channel = future.sync().channel();
while(true){
Scanner scanner = new Scanner(System.in);
String s = scanner.nextLine();
if(s.equals("q")){
break;
}else{
channel.writeAndFlush(s);
}
}
ChannelFuture channelFuture = channel.close();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
group.shutdownGracefully();
}
});
粘包半包
原因
- 本质上是因为TCP是流式协议,消息无边界。
- 为了减少传输中的消耗,需要用算法(nagle算法,尽可能多的传输数据)进行处理而导致粘包,因为TCP传输时需要给报文加上若干字节的头部信息,即使是1个字节的信息。
- MSS限制,当发送的数据超过MSS限制后,会将数据切分发送,造成半包。
- 应用层:接收方的ByteBuf设置的太大太小。
滑动窗口
- 设置一个滑动窗口大小,在窗口以内的数据可以不等待服务器响应的结果就可以直接发送。在应答未到达之前,窗口必须停止滑动。接收方也要维护一个窗口,只有落在窗口内的数据才能被接受。
- 滑动窗口起着缓冲区的作用和流量控制的作用。
解决
- 短连接,发送了完整数据后就断开连接。将连接建立到连接断开作为消息边界。能解决粘包但不能解决半包,并且效率低。
- 定长解码器,FixedLengthFrameDecoder(和客户端约定的长度)。会造成很多内存的浪费。
- 分隔符来界定消息的边界,LineBasedFrameDecoder(maxLength),用\n和\r\n定界。DelimiterBasedFrameDecoder自定义分隔符。底层实现还是一个个字节去找分隔符,效率较低。
- 基于长度字段的LTC解码器。LengthFieldBasedFrameDecoder(最大值,长度偏移量,长度字段的长度,跳过多少字节,去掉头部多少字节)。
协议的设计与解析
redis协议
final static byte[] LINE = {13,10};
public static void send(ByteBuf buf,String command){
String[] words = command.split(" ");
buf.writeBytes(("*"+words.length).getBytes());
buf.writeBytes(LINE);
for(String word:words){
buf.writeBytes(("$"+word.length()).getBytes());
buf.writeBytes(LINE);
buf.writeBytes(word.getBytes());
buf.writeBytes(LINE);
}
}
http协议
自定义协议
要素
- 魔数: 用来第一时间判定是否为无效数据包。
- 版本号: 可以支持协议的升级
- 序列化算法: 消息正文到底采用哪种序列化反序列化方式,如json,protobuf。
- 指令类型:登陆,注册,单聊,群聊等业务相关。
- 请求序号:为了双工通信,提供异步能力。
- 正文长度。
- 消息正文:Json,xml,对象流等。