Netty 异步高性能通信框架
互联网行业:RPC框架大量引入Netty,Dubbo 中默认使用Netty做通信框架,大型网络游戏,地图服务器,在大数据领域(AVRO实现数据文件共享)默认采用Netty做跨界点通信,Netty Service 对Netty二次封装...
1. IO模型
1.1 BIO 模型
特点:每建立一个连接就会创建一个线程,没有连接就会阻塞等待
package com.zhj.test.bio;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author zhj
*/
public class BIOServer {
public static void main(String[] args) throws IOException {
// 线程池机制
// 思路
// 1. 创建一个线程
// 2. 如果有客户端连接,就创建一个线程,与之通信(单独写一个方法)
ExecutorService executorService = Executors.newCachedThreadPool();
// 创建ServerSocket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器程序启动!");
while (true) {
// 监听,等待客户端连接
System.out.println("等待连接!!!");
final Socket socket = serverSocket.accept();
System.out.println("连接一个客户端(socket)!");
// 创建一个线程与之通讯
executorService.execute(new Runnable() {
@Override
public void run() {
// 可以与客户端通讯
handler(socket);
}
});
}
}
/**
* 与客户端通讯
*/
public static void handler(Socket socket) {
byte[] bytes = new byte[1024];
try {
InputStream inputStream = socket.getInputStream();
// 循环读取客户端读取的数据
while (true) {
System.out.println("等待输入数据!!!");
int read = inputStream.read(bytes);
if (read != -1) {
System.out.println(Thread.currentThread().getName() + " : " + Thread.currentThread().getId());
System.out.println("接收:" + new String(bytes, 0, read));
} else {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
System.out.println("关闭与客户端的连接!");
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
1.2 NIO
NIO 全称java non-blocking IO 是指JDK提供的新的API。从JDK1.4开始,Java提供了一系列改进输入输出的新特性,被统称NIO(New IO),是同步非阻塞的。
三大核心部分:Channel(通道),Buffer缓存区),Selector(选择题)
NIO是面向缓冲区,或者面向块编程的,数据读到一个它稍后处理的缓冲区,需要时可在缓冲区前后移动,这就增加了它处理过程中的灵活性,使他可以提供非阻塞式的高伸缩性网络。
特点:
- 非阻塞 不需要线程一直等待,有别的任务线程也可以去执行
- 一个线程可以处理多个连接,当大量请求到服务器,不需要每个连接开一个线程
HTTP2.0采用多路复用技术,同一个连接处理多个请求。
三大核心组件的关系
- 每个Channel都会对应一个Buffer
- Selector对应一个线程,一个线程对应多个Channel连接
- 该图反应了三个Channel 注册到改Selector 程序
- 程序切换到那个Channel是由事件决定的,Event就是一个重要的概念
- Selector会根据不同的时间再各个通道上切换
- Buffer就是一个内容块,底层是与一个数组的
- 数据的读取写入是通过Buffer,这个与BIO有本质区别,BIO要么是输入流,要么是输出流,不能是双向的,NIO的Buffer是可以读也可以写的,需要flip方法切换
- Channel是双向的,可以返回底层操作系统的情况,Linux底层的操作系统就是双向的
1.2.1 Buffer缓冲区的使用
- Capacity 容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变
- Limit 表示缓冲区当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的
- Position 位置,下一个要读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写操作做准备
- Mark 标记
package com.zhj.test.bio;
import java.nio.IntBuffer;
/**
* @author zhj
*/
public class BasicBuffer {
public static void main(String[] args) {
// 举例说明Buffer 的使用
// 创建一个Buffer
IntBuffer intBuffer = IntBuffer.allocate(5);
// 向Buffer 存数据
for (int i = 0; i < intBuffer.capacity(); i++) {
intBuffer.put(i * 2);
}
// 从Buffer 读取数据
// 将Buffer转换,读写切换
/*
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
*/
intBuffer.flip();
// 设置读取位置
intBuffer.position(2);
// 设置读取结束位置
intBuffer.limit(4);
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
}
}
public class NIOByteBufferPutGet {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.putInt(100);
buffer.putLong(9L);
buffer.putChar('强');
buffer.putShort((short) 4);
buffer.flip();
System.out.println(buffer.getInt());
System.out.println(buffer.getLong());
System.out.println(buffer.getChar());
System.out.println(buffer.getShort());
}
}
public class ReadOnlyBuffer {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(64);
for (int i = 0; i < 64; i++) {
buffer.put((byte) i);
}
buffer.flip();
ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
System.out.println(readOnlyBuffer.getClass());
while (readOnlyBuffer.hasRemaining()) {
System.out.println(readOnlyBuffer.get());
}
// 只读不能放数据
// readOnlyBuffer.put((byte) 1);
}
}
/**
* MappedByteBuffer 说明
* 1. 可以让文件直接在内存(堆外内存)修改,操作系统不需要拷贝一次
* @author zhj
*/
public class MappedByteBufferTest {
public static void main(String[] args) throws Exception {
File file1 = new File("E:\\data_file\\log1.txt");
File file2 = new File("E:\\data_file\\log2.txt");
RandomAccessFile randomAccessFile = new RandomAccessFile(file1, "rw");
FileChannel fileChannel = randomAccessFile.getChannel();
/**
* 参数(1读写模式,2起始位置,3映射到内存大小)
*/
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE,0,5);
mappedByteBuffer.put(0, (byte) 'H');
mappedByteBuffer.put(3, (byte) '9');
randomAccessFile.close();
System.out.println("修改成功~");
}
}
1.2.2 Channel通道的使用
基本介绍
1)NIO的通道类似与流,但区别如下
- 通道可以同时进行读写,而流只能进行读或者写
- 通道可以实现异步读写数据
- 通道可以从缓冲区读取数据,也可以写数据到缓冲区
2)BIO中的stream 是单向的,如FileinputStream对象只能进行读取数据的操作,而NIO中的通道是双向的,可以读,也可以写
3)Channel 在NIO中是一个接口
4)常用的Channel类有 FileChannel、DatagramChannel、ServerSocketChannel 和SocketChannel
5)FileChannel用于文件的数据读写,DatagramChannel 用于UDP的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写
FileChannel 类
- read 将通道数据读取到缓冲区中
- write 把缓冲区的数据写到通道
- transferFrom() 从目标通道中复制数据到当前通道
- transferTo() 把数据从当前通道复制给目标通道
1.3 NIO与BIO的比较
- BIO是以流的方式处理的,而NIO以块的方式处理数据,块IO的效率比流IO的高很多
- BIO是阻塞的,NIO是非阻塞的
- BIO基于字节流和字符流进行操作,而NIO基于Channel通道和Buffer缓冲区进行操作,数据总是从通道读到缓冲区中,或者从缓冲区写入到通道中。Sellector选择器用于监听多个通道的事件比如连接请求,数据到达等,因此使用单个线程就可以监听多个客户端通道
4 AIO 了解
- JDK 7 引入Asynchronous I/O ,即AIO.在进行I/O编程中,常用到两种模式;Reactor 和 Proactor。Java的NIO就是Reactor,当有事件触发时,服务器端得到通知,进行相应处理
- AIO即NIO 2.0 ,叫异步不阻塞IO.AIO引入异步通道的概念,采用了proactor模式,简化了程序的编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用
- 目前AIO没有被广泛应用,Netty也是基于NIO,而不是AIO
BIO | NIO | AIO | |
---|---|---|---|
IO模型 | 同步阻塞 | 同步非阻塞(多路复用) | 异步非阻塞 |
编程难度 | 简单 | 复杂 | 负载 |
可靠性 | 差 | 好 | 好 |
吞吐量 | 低 | 高 | 高 |
5 Netty 概述
异步的基于事件驱动的网络应用的框架,用于快速开发高性能,高可靠的网络IO程序
原生NIO存在的问题
- NIO的类库和API繁杂,使用麻烦:需要熟练掌握Selector、ServerSocketChannel,SocketChannel、ByteBuffer等。
- 需要具备其他的额外技能:要熟悉Java多线程编程,因为NIO编程涉及到 Reactor模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。一
- 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
- JDK NIO的Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector空轮询,最终导致CPU 100%。直到JDK 1.7版本该问题仍旧存在,没有被根本解决。
Netty 的优点
Netty对JDK自带的NIO的API进行了封装,解决了上述问题。
- 设计优雅:适用于各种传输类型的统一API阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型-单线程,一个或多个线程池
- 使用方便:详细记录的Javadoc,用户指南和示例;没有其他依赖项,JDK 5 (Netty3.x或6 (Netty 4.x)就足够了。
- 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
- 安全:完整的SSL/TLS和StartTLS支持。
- 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的Bug可以被及时修复,同时,更多的新功能会被加入
Netty版本说明
- netty版本分为netty3.x和netty4.x、netty5.x
- 因为Netty5出现重大bug,已经被官网废弃了,目前推荐使用的是Netty4.x的稳定版本
- 目前在官网可下载的版本netty3.x netty4.0.x和netty4.1.x4)
- 在本套课程中,我们讲解Netty4.1.x版本
- netty下载地址:bintray.com/netty/downl…
6 Netty 线程模型
6.1 线程模型
传统阻塞I/O服务模型 和 Reactor模式(单Reactor单线程、单Reactor多线程、主从Reactor多线程)
Netty基于主从Reactor多线程模型
传统IO模型
缺点
- 当并发数很大,就会创建大量的线程,占用很大系统资源
- 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费
Reactor(反应器模式,分发者模式,通知者模式)
针对传统阻塞I/o服务模型的2个缺点,解决方案:
- 基于I/O复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理
- 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
Reactor模式中核心组成:
- Reactor: Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对I0O事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;
- Handlers:处理程序执行I/O事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。
单Reactor单线程
单Reactor多线程
方案说明
- Reactor对象通过select监控客户端请求事件,收到事件后,通过dispatch进行分发
- 如果建立连接请求,则右Acceptor通过accept处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件
- 如果不是连接请求,则由reactor分发调用连接对 应的handler来处理
- handler只负责响应事件,不做具体的业务处理,通过read读取数据后,会分发给后面的worker线程池的某个线程处理业务
- worker线程池会分配独立线程完成真正的业务,并将结果返回给handler
- handler收到响应后,通过send将结果返回给client
主从Reactor多线程
方案优缺点说明:
- 优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
- 优点:父线程与子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据。
- 缺点:编程复杂度较高
结合实例:这种模型在许多项目中广泛使用,包括Nginx主从Reactor多进程模型,Memcached主从多线程,Netty主从多线程模型的支持
Netty模型
6.2 Netty简单案例
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
</dependencies>
// 服务端
public class SimpleNettyServer {
public static void main(String[] args) throws InterruptedException {
// 创建两个线程组 BossGroup 和 WorkerGroup
// BossGroup 只处理连接请求 WorkerGroup 处理与客户端的业务处理
// 两个线程组都是无限循环
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(32);
// 创建服务器端启动对象,配置参数
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 链式编程
serverBootstrap.group(bossGroup,workerGroup) // 设置线程组
.channel(NioServerSocketChannel.class) // 设置NIO通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true) // 设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
// 创建通道初始化对象
// 给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new SimpleNettyServerHandler());
}
}); // 给workerGroup管道设置处理器
System.out.println("服务器初始化完毕!!!");
// 启动服务器,并绑定端口,并且同步处理
ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
// 对关闭通道进行监听 (异步模型)
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
// 服务端处理器
public class SimpleNettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据
* @param ctx 上下文对象,含有管道pipeline,通道channel,地址
* @param msg 客户端发送的数据 默认是Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " + Thread.currentThread().getName());
System.out.println("server ctx = " + ctx);
// 将msg转为一个ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址为:" + ctx.channel().remoteAddress());
// 比如 这有一个非常耗时的业务 需要异步执行
// 解决方案1 用户自定义普通任务 taskQueue
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵2",CharsetUtil.UTF_8));
} catch (InterruptedException e) {
System.out.println("服务端发生异常了!!!");
}
}
});
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
// 15 秒
Thread.sleep(10*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵3",CharsetUtil.UTF_8));
} catch (InterruptedException e) {
System.out.println("服务端发生异常了!!!");
}
}
});
// 解决方案2 用户自定义定时任务 scheduleTaskQueue
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
// 20 秒
Thread.sleep(5*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~ 喵4",CharsetUtil.UTF_8));
} catch (InterruptedException e) {
System.out.println("服务端发生异常了!!!");
}
}
},5, TimeUnit.SECONDS);
// 解决方案3 非当前Reactor 线程调用Channel的各种方法
System.out.println("Go to...");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// write + flush 将数据写入缓冲并刷新
// 发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 发生异常,关闭通道
ctx.close();
}
}
// 客户端
public class SimpleNettyClient {
public static void main(String[] args) throws InterruptedException {
// 客户端需要一个事件循环组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
// 创建客户端启动对象
try {
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new SimpleNettyClientHandler());
}
});
System.out.println("客户端启动完成!!!");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
// 客户端处理器
public class SimpleNettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 通道就绪就可以发送消息
System.out.println("client active ctx = " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务器!",CharsetUtil.UTF_8));
}
/**
* 通道有读取数据时,会触发
* @param ctx 上下文对象,含有管道pipeline,通道channel,地址
* @param msg 客户端发送的数据 默认是Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("client read ctx = " + ctx);
// 将msg转为一个ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器端地址为:" + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
// 发生异常,关闭通道
ctx.close();
}
}
7 Netty异步模型
基本介绍
- 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
- Netty中的I/O操作是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture。
- 调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
- Netty的异步模型是建立在future和callback的之上的。callback就是回调。重点说Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个Future,后续可以通过Future去监控方法fun的处理过程(即:Future-Listener机制)
8 Netty入门实例 Http服务
public class TestHttpServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestHttpServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class TestHttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 向管道加入处理器
// 得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
// 加入一个netty 提供的httpServerCodec => [coder - decoder]
// HttpServerCodec 说明
// 1.HttpServerCodec 是netty提供的处理http的编码解码器
pipeline.addLast("MyHttpServerCodeC", new HttpServerCodec());
// 2.增加自定义handler
pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
}
}
/**
* 说明
* 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
* 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject
* @author zhj
*/
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
// 读取客户端数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;
URI uri = new URI(httpRequest.uri());
if ("/favicon.ico".equals(uri.getPath())) {
System.out.println("请求了favicon.ico资源 不做处理!!!");
return;
}
// 每次请求都会产生新的
System.out.println("pipeline hashcode" + ctx.pipeline().hashCode());
System.out.println("TestHttpServerHandler hashcode" + this.hashCode());
System.out.println("msg 类型 : " + msg.getClass());
System.out.println("客户端地址 : " + ctx.channel().remoteAddress());
// 回复信息给浏览器 [http协议]
ByteBuf content = Unpooled.copiedBuffer("Hello,服务器", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
// 将构建好 response 返回
ctx.writeAndFlush(response);
}
}
}
9 Netty 核心模块
Bootstrap、ServerBootstrap
-
Bootstrap意思是引导,一个Netty应用通常由一个 Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中 Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类
-
常见的方法有 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),该方法用于服务器端,用来设置两个EventLoop public B group(EventLoopGroup group),该方法用于客户端,用来设置一个EventLoopGrouppublic B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现public B option(ChannelOption option, Tvalue),用来给ServerChannel添加配置 public ServerBootstrap childOption(ChannelOption childOption, Tvalue),用来给接收到的通道添加配置
public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler) WorkerGroup
public ServerBootstrap Handler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler) BossGroup public ChannelFuture bind(int inetPort),该方法用于服务器端,用来设置占用的端口号 public ChannelFuture connect(String inetHost, int inetPort),该方法用于客户端,用来连接服务器端
Future、ChannelFuture
- Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和—ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
- 常见的方法有Channel channel(),返回当前正在进行IO操作的通道ChannelFutyre sync(),等待异步操作执行完毕
Channel
-
Netty 网络通信的组件,能够用于执行网络I/o操作。
-
通过Channel可获得当前网络连接的通道的状态
-
通过Channel可获得网络连接的配置参数(例如接收缓冲区大小)
-
Channel提供异步的网络I/O操作(如建立连接,读写,绑定端口),异步调用意味着任何I/O调用都将立即返回,并且不保证在调用结束时所请求的I/O操作已完成
-
调用立即返回一个 ChannelFuture实例,通过注册监听器到ChannelFuture上,可以I/O操作成功、失败或取消时回调通知调用方
-
支持关联I/O操作与对应的处理程序
-
不同协议、不同的阻塞类型的连接都有不同的 Channel类型与之对应
常用的Channel类型:
- NioSocketChannel,异步的客户端 TCP Socket 连接。
- NioServerSocketChannel,异步的服务器端TCP Socket连接。
- NioDatagramChannel,异步的UDP连接。
- NioSctpChannel,异步的客户端 Sctp连接。
- NioSctpServerChannel,异步的Sctp服务器端连接,这些通道涵盖了UDP和TCP网络IO以及文件IO。
Selector
- Netty基于Selector对象实现I/O多路复用,通过Selector一个线程可以监听多个连接的 Channel事件。
- 当向一个Selector中注册Channel后,Selector内部的机制就可以自动不断地查询(Select) 这些注册的Channel是否有己就绪的I/O事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个Channel
10 Netty 群聊
// 服务端
public class GroupChatServer {
private int port;
public GroupChatServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 加入解码器
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new GroupChatServerHandler());
}
});
ChannelFuture cf = serverBootstrap.bind(port).sync();
System.out.println("【服务器】启动完成~");
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new GroupChatServer(7000).run();
}
}
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
// 定义一个channelGroup 管理所有的channel
// GlobalEventExecutor.INSTANCE 全局的事件执行器,是一个单列
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
// 私聊
// private static Map<String, Channel> channelMap = new HashMap<>();
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 连接建立,第一个被执行 将 channel 加入 channelGroup
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 将客户加入聊天的信息推送其他客户端
// 不需要自己遍历
channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + " 加入聊天室。" + sdf.format(new Date()) + "\n");
channelGroup.add(channel);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 可以自动执行 channelGroup.remove(channel);
channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + " 离开聊天室。" + sdf.format(new Date()) + "\n");
}
// channel 处于活动状态
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("【客户端】" + ctx.channel().remoteAddress() + " 上线了。\n");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("【客户端】" + ctx.channel().remoteAddress() + " 跑路了。\n");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
Channel channel = ctx.channel();
channelGroup.forEach(ch -> {
if (channel != ch) {
ch.writeAndFlush("【客户端】" + channel.remoteAddress() + " : " + s + "\n");
} else {
// 回显
ch.writeAndFlush("【我】" + " : " + s + "\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("【服务端】 开小差了~");
ctx.close();
}
}
// 客户端
public class GroupChatClient {
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(bossGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 加入解码器
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture cf = bootstrap.connect(host, port).sync();
Channel channel = cf.channel();
System.out.println("【客户端】" + channel.localAddress() + " 启动完成~");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
channel.writeAndFlush(msg + "\r\n");
}
} finally {
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new GroupChatClient("127.0.0.1", 7000).run();
}
}
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(s.trim());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("【客户端】 开小差了~");
ctx.close();
}
}
11 Netty 实现Websocket
改变http协议的状态码为101,升级成为ws协议实现全双工长连接通信
服务端
package com.zhj.test.netty.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* @author zhj
*/
public class WebSocketServer {
public static void main(String[] args) throws InterruptedException {
// 创建两个线程组 BossGroup 和 WorkerGroup
// BossGroup 只处理连接请求 WorkerGroup 处理与客户端的业务处理
// 两个线程组都是无限循环
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 创建服务器端启动对象,配置参数
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 链式编程
serverBootstrap.group(bossGroup,workerGroup) // 设置线程组
.channel(NioServerSocketChannel.class) // 设置NIO通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true) // 设置保持活动连接状态
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
// 创建通道初始化对象
// 给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 基于Http协议的,添加Http编解码器
pipeline.addLast(new HttpServerCodec());
// 以块的方式写,添加ChunkedWriteHandler
pipeline.addLast(new ChunkedWriteHandler());
// http数据在传输是分段的 HttpObjectAggregator 可以将多段聚合
pipeline.addLast(new HttpObjectAggregator(8192));
// websocket 数据以帧(frame)形式传递
// websocket 有六个子类
// 浏览器请求时 ws://localhost:7000/xxx 表示请求的uri
// websocket 核心功能是将http协议升级为ws 保持长连接
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
// 自定义handler ,处理业务逻辑
pipeline.addLast(new WebSocketHandler());
}
}); // 给workerGroup管道设置处理器
System.out.println("服务器初始化完毕!!!");
// 启动服务器,并绑定端口,并且同步处理
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
// 对关闭通道进行监听 (异步模型)
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.zhj.test.netty.websocket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
/**
* 这里TextWebSocketFrame 类型,表示一个文本帧(frame)
* @author zhj
*/
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服务器接收消息:" + msg.text());
// 回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器响应时间:" + LocalDateTime.now() + msg.text()));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// id 表示唯一的一个值
System.out.println("handler added 被调用 " + ctx.channel().id().asLongText());
System.out.println("handler added 被调用 " + ctx.channel().id().asShortText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler removed 被调用 " + ctx.channel().id().asLongText());
System.out.println("handler removed 被调用 " + ctx.channel().id().asShortText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生 " + cause.getMessage());
ctx.close();
}
}
客户端
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>websocket</title>
</head>
<body>
<script>
var socket;
// 判断当前浏览器是否支持websocket编程
if (window.WebSocket) {
// go on
socket = new WebSocket("ws://localhost:7000/hello")
// ev收到服务器端发送的消息
socket.onmessage = function (ev) {
var rt = document.getElementById('responseText')
rt.value = rt.value + "\n" + ev.data
}
// 相当于连接开启
socket.onopen = function (ev) {
var rt = document.getElementById('responseText')
rt.value = '连接开启~'
}
socket.onclose = function (ev) {
var rt = document.getElementById('responseText')
rt.value = rt.value + '\n' + '连接关闭~'
}
} else {
alert("您的浏览器太菜了,不支持websocket")
}
function send(message) {
// 发送消息到服务器
if (!window.socket) {
return
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
}
}
</script>
<form onsubmit="return false">
<textarea name="message" style="height: 300px;width: 300px" ></textarea>
<input type="button" value="发送消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height: 300px;width: 300px"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
12 Protobuf 序列化数据
Protobuf基本介绍
- Protobuf是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或RPC[远程过程调用remote procedure call ]数据交换格式。 自前很多公司http+json ---> tcp+protobuf
- 参考文档:developers.google.com/protocol-bu… 语言指南
- Protobuf是以 message的方式来管理数据的.
- 支持跨平台、跨语言,即客户端和服务器端可以是不同的语言编写的(支持目前绝 大多数语言,例如C++、C#、Java、python等)
- 高性能,高可靠性
- 使用propobuf编译能自动生成代码,Protobuf是将类定义使用.proto文件进行描述。说明,在idea中编写.proto文件时,会自动提示是否下载.ptotot编写插件.可以让语法高亮。
- 然后通过protoc.exe编译器根据.proto自动生成.java文件
- protobuf使用 user.proto -> protoc.exe ->user.java 编码 传递二进制 服务端解码
protoc.exe --java_out=. Student.proto
syntax = "proto3"; // 版本
option java_outer_classname = "StudentPOJO"; // 生成的外部类名 同时也是文件名
// protobuf 使用message 管理数据
message Student { // 内部类 真正发送的POJO对象 1表示属性序号
int32 id = 1;
string name = 2;
}
syntax = "proto3"; // 版本
option optimize_for = SPEED; // 加快解析
option java_package = "com.zhj.test.netty.codec2"; // 指生成到哪个包下
option java_outer_classname = "DataInfo"; // 生成的外部类名 同时也是文件名
// protobuf 使用message 管理数据 其他的message
message MyMessage {
// 定义一个枚举
enum DataType {
StudentType = 0; // 在proto3 要求enum 编号从0开始
workerType = 1;
}
// 用data_type来标识传的是哪一个枚举类型
DataType data_type = 1;
// 表示每次枚举类型只能出现其中的一个,节省空间
oneof dataBody {
Student student = 2;
Worker worker = 3;
}
}
message Student { // 内部类 真正发送的POJO对象 1表示属性序号
int32 id = 1;
string name = 2;
}
message Worker {
string name = 1;
int32 age = 2;
}
13 Netty 编解码
入站先解码,再执行自己的业务处理器,出站先执行自己的业务处理器,再编码
解码器-ReplayingDecoder
- public abstract class ReplayingDecoder extends ByteToMessageDecoder
- ReplayingDecoder扩展了ByteToMessageDecoder类,使用这个类,我们不必调用readableBytes()方法。参数T指定了用户状态管理的类型,其中Void代表不需要状态管理
- ReplayingDecoder使用方便,但它也有一些局限性:并不是所有的ByteBuf操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedoperationException。ReplayingDecoder在某些情况下可能稍慢于ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢
其它解码器
- LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(In或者Irln)作为分隔符来解析数据。
- DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
- HttpObjectDecoder:一个HTTP数据的解码器
- LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。
14 TCP粘包和拆包基本介绍
- TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有—一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为 面向流的通信是无消息保护边界的
- 由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘
传输对象(协议包)
package com.zhj.test.netty.protocoltcp;
import java.util.Arrays;
/**
* 协议包
* @author zhj
*/
public class MessageProtocol {
private int len;
private byte[] content;
public MessageProtocol() {
}
public MessageProtocol(int len, byte[] content) {
this.len = len;
this.content = content;
}
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
@Override
public String toString() {
return "MessageProtocol{" +
"len=" + len +
", content=" + Arrays.toString(content) +
'}';
}
}
编解码器
package com.zhj.test.netty.protocoltcp;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
/**
* @author zhj
*/
public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
System.out.println("MyMessageDecoder decode 方法被调用");
int length = byteBuf.readInt();
byte[] content = new byte[length];
byteBuf.readBytes(content);
MessageProtocol messageProtocol = new MessageProtocol(length, content);
list.add(messageProtocol);
}
}
package com.zhj.test.netty.protocoltcp;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* @author zhj
*/
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol, ByteBuf byteBuf) throws Exception {
System.out.println("MyMessageEncoder encoder方法被调用");
byteBuf.writeInt(messageProtocol.getLen());
byteBuf.writeBytes(messageProtocol.getContent());
}
}
服务端
package com.zhj.test.netty.protocoltcp;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author zhj
*/
public class TcpClient {
public static void main(String[] args) throws InterruptedException {
// 客户端需要一个事件循环组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
// 创建客户端启动对象
try {
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new TcpClientInitializer());
System.out.println("客户端启动完成!!!");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
package com.zhj.test.netty.protocoltcp;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
* @author zhj
*/
public class TcpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 向管道加入处理器
// 得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MyMessageDecoder());
pipeline.addLast(new MyMessageEncoder());
// 2.增加自定义handler
pipeline.addLast("MyTcpServerHandler", new TcpServerHandler());
}
}
package com.zhj.test.netty.protocoltcp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
* 说明
* 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
* 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject
* @author zhj
*/
public class TcpServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
// System.out.println(new String(msg.array(), CharsetUtil.UTF_8));
System.out.println("服务端接收的数据 " + new String(msg.getContent(), CharsetUtil.UTF_8));
System.out.println("服务端接收的数据长度 " + msg.getLen());
System.out.println("服务器接收到的消息量:" + (++this.count));
String responseContent = UUID.randomUUID().toString();
int len = responseContent.getBytes(StandardCharsets.UTF_8).length;
MessageProtocol messageProtocol = new MessageProtocol(len, responseContent.getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(messageProtocol);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("服务器异常:" + cause.getMessage());
ctx.close();
}
}
客户端
package com.zhj.test.netty.protocoltcp;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author zhj
*/
public class TcpClient {
public static void main(String[] args) throws InterruptedException {
// 客户端需要一个事件循环组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
// 创建客户端启动对象
try {
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new TcpClientInitializer());
System.out.println("客户端启动完成!!!");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
package com.zhj.test.netty.protocoltcp;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
* @author zhj
*/
public class TcpClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 向管道加入处理器
// 得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MyMessageEncoder());
pipeline.addLast(new MyMessageDecoder());
// 2.增加自定义handler
pipeline.addLast("MyTestTcpClientHandler", new TcpClientHandler());
}
}
package com.zhj.test.netty.protocoltcp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import java.nio.charset.StandardCharsets;
/**
* 说明
* 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
* 2. httpObject 客户端和服务端相互通讯的数据封装成HttpObject
* @author zhj
*/
public class TcpClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 使用客户端发送10条数据 hello server
for (int i = 0; i < 60; i++) {
String mes = "今天真带劲。。。";
byte[] content = mes.getBytes(StandardCharsets.UTF_8);
int length = mes.getBytes(StandardCharsets.UTF_8).length;
MessageProtocol messageProtocol = new MessageProtocol(length, content);
ctx.writeAndFlush(messageProtocol);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
System.out.println("客户端接收的数据 " + new String(msg.getContent(), CharsetUtil.UTF_8));
System.out.println("客户端接收的数据长度 " + msg.getLen());
System.out.println("客户端接收到的消息量:" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常信息:" + cause.getMessage());
ctx.close();
}
}
15 RPC 调用流程
RPC 基本介绍
- RPC (Remote Procedure Call) —远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
- 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样
- 常见的RPC框架:Dubbo、google 的 gRPC、Go语言的rpxc、Apache的thrift,Spring旗下的Spring Cloud
自己实现dubbo RPC(基于Netty)需求说明
- dubbo底层使用了Netty作为网络通讯框架,要求用Netty实现一个简单的RPC框
- 模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用Netty 4.x
设计说明
- 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
- 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
- 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用Netty请求提供者返回数据
代码实现: