有待处理的任务invokeHandlerAddedIfNeeded
触发通道未注册时候添加的处理器的handlerAdded
事件:
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {//只处理一次
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
这方法哪里调用呢,当然是注册完了就调用啦:
callHandlerAddedForAllHandlers
//回调待添加的所有处理器HandlerAdded方法
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {//需要同步,只能执行一次
assert !registered;
// This Channel itself was registered.
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.便于回收
this.pendingHandlerCallbackHead = null;
}
// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
// the EventLoop. 需要在synchronized (this)外,否则在其他线程中的处理器的handlerAdded方法又添加另外一个处理器,会对管道对象进行synchronized加锁,于是就死锁了,卡在这里了
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {//遍历链表执行
task.execute();
task = task.next;
}
}
PendingHandlerCallback待处理任务
其实就是一个链表的结点结构,但是有个抽象的执行方法需要子类实现,因为有添加和删除,执行实现是不一样的:
//待处理任务
private abstract static class PendingHandlerCallback implements Runnable {
final AbstractChannelHandlerContext ctx;
PendingHandlerCallback next;//单链表下一个
PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
this.ctx = ctx;
}
abstract void execute();
}
PendingHandlerAddedTask待处理添加事件任务
可以看到最后处理还是调用callHandlerAdded0
:
//待添加的处理器任务
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}
//给执行器执行的
@Override
public void run() {
callHandlerAdded0(ctx);
}
@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {//为了防止多线程问题,只用单线程
callHandlerAdded0(ctx);//直接触发
} else {
try {
executor.execute(this);//让执行器处理
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
atomicRemoveFromHandlerList(ctx);
ctx.setRemoved();
}
}
}
}
PendingHandlerRemovedTask 待处理删除事件任务
和添加类似,所以就不多说了:
//待删除的任务
private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}
@Override
public void run() {
callHandlerRemoved0(ctx);
}
@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerRemoved0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
" removing handler {}.", executor, ctx.name(), e);
}
// remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
ctx.setRemoved();
}
}
}
}
}
入站事件的一些方法
入站事件有很多个,都是类似的原理:
传递入站事件fireChannelRead(Object msg)
可以看到,都是从head
开始传递事件,我拿一个典型的fireChannelRead(Object msg)
分析:
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
AbstractChannelHandlerContext的invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
调用传入上下文的invokeChannelRead
方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);//看是不是引用计数接口类型,不是就直接返回,是就返回相应的接口类型
EventExecutor executor = next.executor();//获取next的执行器
if (executor.inEventLoop()) {
next.invokeChannelRead(m);//如果执行器线程就是当前线程,就调用管道上下文的处理方法
} else {//否则给executor提交任务
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
AbstractChannelHandlerContext的invokeChannelRead(Object msg)
获取处理器,触发相应事件:
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);//调用通道上下文的channelRead方法
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
head的channelRead
其实他什么都没做,就是向后传递:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);//触发fireChannelRead
}
AbstractChannelHandlerContext的fireChannelRead
这个我前面有讲过,就是获取相应入站MASK_CHANNEL_READ
的处理器上下文,然后调用invokeChannelRead
获取相应的处理器来处理读事件:
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
//获取处理相应事件的入站处理器上下文
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;//寻找下一个能处理相应事件的
} while ((ctx.executionMask & mask) == 0);//是否处理该mask
return ctx;
}
其实只要在处理器里调用 ctx.fireChannelRead
就可以把事件往后传,很灵活,而且你再调用出站的方法,又会往前传递了,出站事件的原理类似,我就不多讲了。
HeadContext的作用
我们继续上一篇,我们知道,在管道的头尾是不一样的处理器上下文和处理器,他们都集中于一个类啦,我们先来看下头上下文:
基本上把我上一篇介绍的接口和上下文抽象类都涵盖了,所以他自身既是上下文,也是出入站处理器。
构造方法
先调用父类的上下文构造方法,然后获得通道的unsafe
对象,后续的一些操作都是他做的,然后设置添加完成,因为他是在管道构造函数中创建的,所以创建了就是添加完成的:
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();//通道的unsafe
setAddComplete();
}
接下去是他的处理器就是自身,然后添加和删除事件都是空实现:
channelRegistered
通道注册事件里会调用一次invokeHandlerAddedIfNeeded
,检查是否有处理器的待添加任务,然后就往后传递了,其实一般情况下,没做什么事,只是做了向后传递:
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
channelUnregistered
通道卸载事件先会进行传递,如果最后通道关闭了就销毁所有的处理器,其实也就是删除每一个处理器上下文,除了头和尾节点外:
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
ctx.fireChannelUnregistered();
// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
destroy();
}
}
channelActive
先往后传递通道激活事件,最后判断是否可以自动读,因为通道注册完了之后就会触发激活,所以就可以开始读取数据了,读取到有连接也是读取事件:
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
readIfIsAutoRead
如果配置了自动读,就会开始去设置监听通道事件:
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
默认是自动读的:
调用通道的read
,还是会调用管道的read
最终调用了尾结点tail
的read
:
其实也就是AbstractChannelHandlerContext
的read
:
@Override
public ChannelHandlerContext read() {
//从尾到头找到一个出站的读,开始初始化的时候next就是head,如果你自定了,没处理好的话,可能后面就读不到数据了
final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {//如果是同一个线程,就开始读
next.invokeRead();
} else {//否则就添加一个任务
Tasks tasks = next.invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
}
executor.execute(tasks.invokeReadTask);
}
return this;
}
最终还是回到head
的read
:
其实这个很重要,如果你自定义的出站处理器的read方法没有处理好,那后面可能就读不到数据了,所以一般还是交给unsafe
去处理读:
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
最后其实就是这段,以前分析过,就是初始化的时候事件循环执行的最后一个任务,其实就是设置相应的监听事件,当然可以监听读写事件:
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
channelReadComplete
刚好一起把这个也说了,等读数据处理完了,会再次去监听:
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.fireChannelReadComplete();
//传递读事件完成后自动读
readIfIsAutoRead();
}
所以这里要注意就是自己自定义的处理器不要随意去覆盖read
方法,因为处理器上下文的read
方法会从尾部开始遍历,找到第一个可以处理read
的处理器,如果是你自定义的,你又没处理好,那就可能再也收不到消息了。
我画了一个head
的read
设置监听的大致情况:
这里要注意,就是无论是接受连接还是读取数据,都是属于读的,只是设置的事件标记不一样。
篇幅有点长了,下一篇继续吧。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。