boss组如何将连接分发给worker组
我觉得我们可以先看一个简单的客户端连接到boss
组的接收器分发,然后到客户端发送数据,worker
组处理,大致的了解下,这样讲到细节的地方才好理解,当然也只是大致的流程,很多细节画出来就有点乱了,下面会讲一些,先上图吧:
processSelectedKeys
当我们服务器启动后,就等待着事件,如果有连接事件了,就会去执行processSelectedKeys
方法:
这里的selectedKeys
是做了封装的,类型是SelectedSelectionKeySet
,方便操作。
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();//优化处理
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
processSelectedKeysOptimized
我抽出了部分重要的,他会取出key,然后取出附件ServerSocketChannelImpl
,一起传入处理:
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;//拿出来后设为null一旦通道关闭就可以释放,否则可能内存泄露
final Object a = k.attachment();//从附件中获得通道
...
processSelectedKey(k, (AbstractNioChannel) a);//处理key
,,,
}
}
processSelectedKey
主要的就是找到读或者事件,然后读取数据,这里的unsafe
是io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe
类型,通道创建的时候创建的:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
int readyOps = k.readyOps();
...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();//读取
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
read
private final List<Object> readBuf = new ArrayList<Object>();
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();//缓冲区分配器
allocHandle.reset(config);//重置参数
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);//读取数据到readBuf
...
allocHandle.incMessagesRead(localRead);//总共的消息数+1
} while (allocHandle.continueReading());//是否还要继续读取
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));//遍历传播读事件,参数是NioSocketChannel
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();//传播读完成事件
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
读取连接的数据,封装成NioSocketChannel
,然后进行管道的事件传递:
doReadMessages
接受一个SocketChannel
,封装成NioSocketChannel
然后加入缓冲区里:
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());//进行接收,返回SocketChannel
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));//创建NioSocketChannel
return 1;
}
} catch (Throwable t) {
...
}
return 0;
}
SocketUtils.accept
其实这个最终就是调用了serverSocketChannel.accept()
,只是现在已经有连接了,所以不会阻塞,直接返回一个SocketChannel
了。
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
pipeline.fireChannelRead(readBuf.get(i))
管道传递读事件,参数就是刚封装的NioSocketChannel
,首先调用了通道上下文的invokeChannelRead
方法,传入了头上下文head
,也就是初始化时候的HeadContext
,也就是从头开始传递:
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
AbstractChannelHandlerContext的invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
可以传入上下文对象和消息体,让上下文对象去传递消息体:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);//看msg是不是引用计数接口ReferenceCounted类型,不是就直接返回msg,其实是做资源泄露检测
EventExecutor executor = next.executor();//获取next的执行器,如果为null就是通道的NioEventLoop
if (executor.inEventLoop()) {
next.invokeChannelRead(m);//如果执行器线程就是当前线程,就调用invokeChannelRead,传入NioSocketChannel
} else {//否则给executor提交任务
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
HeadContext的invokeChannelRead(Object msg)
直接的上下文对象传递消息体:
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {//是否已经被添加到管道
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);//调用处理器的channelRead方法
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);//直接传递到下一个可以处理消息的通道上下文
}
}
HeadContext的channelRead(ChannelHandlerContext ctx, Object msg)
直接调用父类的方法,传递到下一个去:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);//直接传递到下一个可以处理消息的通道上下文
}
AbstractChannelHandlerContext的fireChannelRead(final Object msg)
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
AbstractChannelHandlerContext的findContextInbound(int mask)
先看一个标记mask
,不同的上下文可能有不标记,可以理解为响应不同的事件:
定义了出站入站的总mask
:
这个会在通道上下文初始化的时候创建:
最终处理是这个方法,默认入站和出站都是全部的标记:
还有个判断函数,是否要略过,根据反射出的方法和类的标注,声明了Skip
标注,才会去除某个mask
事件:
因此这个方法就会去找下一个能不能处理这个事件的入站上下文,找不到就返回自己:
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);//寻找下一个能处理相应事件的
return ctx;
}
这里就会去找HeadContext
的下一个,也就是我们放进去的含有ServerBootstrapAcceptor
的上下文DefaultChannelHandlerContext
。
又是AbstractChannelHandlerContext的invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
这次的next
是DefaultChannelHandlerContext
。
DefaultChannelHandlerContext的invokeChannelRead(Object msg)
这次我们可以看到处理器就是ServerBootstrapAcceptor
:
ServerBootstrapAcceptor的channelRead(ChannelHandlerContext ctx, Object msg)
这里就是将NioSocketChannel
注册到worker
组里的代码啦:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;//得到NioSocketChannel
child.pipeline().addLast(childHandler);//在得到NioSocketChannel的管道中添加我们自定义的初始化处理器
setChannelOptions(child, childOptions, logger);//设置选项
setAttributes(child, childAttrs);//设置属性
try {//向workerGroupt注册NioSocketChannel并添加完成监听
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
把boss
组接受连接的事基本讲完了,剩下的worker
组干的事我们后面讲吧,完美待续…
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。