以下思路来自黑马netty教程
JDK的nio服务端启动
因为netty底层使用的是nio,所以先回顾nio代码的流程,然后再看netty的启动的流程。 JDK的NIO代码启动主要步骤
- 创建
ServerSocketChannel
,channel配置为非阻塞,并且绑定端口 - 把channel关注的accept事件注册到select
- 循环调用select,拿到就绪的key
- 如果是accept事件,建立
SocketChannel
,并且把这个chanel关注的读写事件 - 如果是可写或者可写事件,通过key拿到channel进行相应的读写操作。
public class WriteServer {
public static void main(String[] args) throws IOException {
// 1. channel配置为非阻塞,并且绑定端口
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
// 2. 把channel关注的accept时间注册到select
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
// 3. 拿到就绪的key
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
// 4. 如果是accept事件,建立socketChannel,并且把这个chanel关注的读写事件注册到select
} else if (key.isWritable() || key.isReadable()) {
// 5. 如果是可写或者可写事件,通过key拿到channel进行相应的读写操作。
}
}
}
}
}
使用netty的主干代码
//1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open();
//2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();
//3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
//4 启动 nio boss 线程执行接下来的操作
//5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
//6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor
//7 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));
//8 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
netty的启动流程
服务端启动demo
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
}
});
serverSocketChannel = serverBootstrap.bind(port).sync().channel();
serverSocketChannel.closeFuture().sync();
} catch (InterruptedException e) {
} finally {
}
NioServerSocketChannel
netty的NioServerSocketChannel
封装了JDK的ServerSocketChannel
,NioServerSocketChannel底层还是会open这个JDK的channel。serverBootstrap.channel(NioServerSocketChannel.class)
指定了bootStrap类中的channelFactory 返回NioServerSocketChannel这个类型的channel。
bind()方法 开始启动
主要的三个步骤
- 初始化channel
- 注册channel
- 执行dobind0回调
// 主干代码
final ChannelFuture initAndRegister() {
Channel channel = null;
channel = channelFactory.newChannel();
// 初始化channel
init(channel);
// 注册channel到select
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
初始化channel
channelFactory.newChannel()
通过channelFactory会返回NioServerSocketChannel
init()中向流水线添加了一个channel初始化处理器,这个处理器只会被执行一次,那么什么时候被执行?这个放到后面讲
channel.pipeline().addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
注册channel
channel首先会进行一次线程切换。当前运行的是主线程,但是注册和初始化channel操作是由eventGroup中的nio线程来执行。返回的是一个promise对象用来异步拿到结果。 eventLoop.inEventLoop() 是判断当前线程是不是nio线程,显然不是,那么主线程就会封装一个任务提交给nio线程来执行。
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
接下来继续看 register0(promise)
。JavaChannel() 拿到 NioServerSocketChannel
创建的JDK ServerSocketChannel。把这个原生channel注册到eventLoop的select上,绑定附件是NioServerSocketChannel,之后可以通过附件拿到这个channel,并且这个附件应该是唯一的。
注册完毕之后通过pipeline.invokeHandlerAddedIfNeeded();
调用init中注册的初始化处理器,向nioChannel中添加了一个handler,在accept事件发生后执行这个handler建立连接。ServerBootstrapAcceptor
继承自chnnelInboundHandlerAdapter
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
执行doBind0回调
前面提到过注册和初始化操作是主线程提交一个任务由nio线程异步执行,提交之后返回一个promise对象。在初始化channel完成、注册channel完成之后,nio线程通过safeSetSuccess();向promise中设置一个成功值,使得该promise的回调被执行。
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
回调中执行doBind0()方法,调用链最后就是调用原生channel进行端口绑定
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
在执行完doBind方法后,向select注册channel的accept事件,这里看上去是read事件呢,其实做完位运算得到的是16对应accept事件。
然后到accept时间侦听到连接之后,会执行ServerBootstrapAcceptor
这个handler这个处理器。
至此启动流程结束
总结
总结一下启动流程
- 初始化channel
- 把这个channel注册到select上,这个注册关注的事件是0
- 前面两步是异步操作,当执行完之后执行回调
- 在回调方法中 绑定指定的端口
- 注册accept事件
- 当由accept事件之后调用ServerBootstrapAcceptor handler建立连接