前言
本想从Bootstrap开始,跟着netty的启动流程一路分析下去,但是netty的系统的确复杂,在阅读了整个流程后,决定从ChannelPipeline开始,ChannelPipeline的特点明显而方法不多,是整个netty的大动脉。
正文
netty示例代码
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpResponseEncoder());
ch.pipeline().addLast(new 业务逻辑())
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = server.bind(8080).sync();
f.channel().closeFuture().sync();
注意initChannel这个方法,我们调用了addLast的方法,给ChannelPipeline添加了解码器,一般在最后会添加一个自定义业务逻辑。
按照我们对这个方法的预期设想,ChannelPipeline内部应该是链表组成,我们不断添加节点,当请求到来的时候,会再每一层调用相应的方法直到尾部
ChannelPipeline接口
ChannelPipeline继承关系
ChannelInboundInvoker和ChannelOutboundInvoker分别代表了入站事件和出站事件,非常明显的是作为服务器或者客户端要去交互,必须得有消息进入和发出;而*Iterable<Entry<String, ChannelHandler>>*表示了ChannelPipeline是一个可迭代的类,内部的链表节点是ChannelHandler
ChannelInboundInvoker
public interface ChannelInboundInvoker {
/**
* Channel注册到EventLoop上调用
* 调用这个方法会链式调用ChannelInboundHandler的channelRegistered方法
*/
ChannelInboundInvoker fireChannelRegistered();
/**
* Channel取消注册注册到EventLoop上调用
* 调用这个方法会链式调用ChannelInboundHandler的channelUnregistered方法
*/
ChannelInboundInvoker fireChannelUnregistered();
/**
* Channel处于激活状态,代表连接已经建立完成
* 调用这个方法会链式调用ChannelInboundHandler的channelActive
*/
ChannelInboundInvoker fireChannelActive();
/**
* fireChannelActive的反义
*/
ChannelInboundInvoker fireChannelInactive();
/**
* 异常相关 不重要
*/
ChannelInboundInvoker fireExceptionCaught(Throwable cause);
ChannelInboundInvoker fireUserEventTriggered(Object event);
/**
* Channel接收到了消息
* 调用这个方法会链式调用ChannelInboundHandler的channelRead
*/
ChannelInboundInvoker fireChannelRead(Object msg);
/**
* 阅读完毕,同上
*/
ChannelInboundInvoker fireChannelReadComplete();
ChannelInboundInvoker fireChannelWritabilityChanged();
}
根据我写的注释,可以看到ChannelInboundInvoker会去调用ChannelInboundHandler里面的方法,可以猜到ChannelPipeline内部链表有两种类型: Inbound和Outbound ,而Invoker接口方法的含义是一个入口,会链式调用相同类型的节点
ChannelOutboundInvoker
ChannelOutboundInvoker里面的方法我们很熟悉了,其实就传统IO的方法
In/Out总结
看一下netty官方的注释
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
根据官方的注释以及这两个接口的方法可以总结以下几点:
- Inbound和Outbound的调用方向是不同的
- Inbound是一个通知事件,表示某一件事情已经就绪
- Outbound是一个请求事件,对应了传统I/O的请求
通过Inbound方法,可以总结出Channel的几个生命周期
- channelRegistered Channel已经注册到一个EventLoop
- channelActive Channel是活跃的,可以收发消息了
- channelInactive Channel是非活跃的
- channelUnregistered Channel已经创建,但是没有注册
按照给出的顺序,这几个状态是以该顺序循环转换的,而ChannelPipeline收到状态变化请求的时候,会链式调用Inbound类型的节点,可以自定义对该状态感兴趣的代码来处理。
DefaultChannelPipeline
了解到了ChannelPipeline接口方法的含义,看一下实现类DefaultChannelPipeline的细节
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
用AbstractChannelHandlerContext类型的head和tail来表示链表的头节点和尾节点,而head和tail却又不是标准的AbstractChannelHandlerContext实现类,而是内部实现的HeadContext和TailContext
HeadContext
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
HeadContext既实现了ChannelOutboundHandler又实现了ChannelInboundHandler
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
ctx.fireChannelInactive();
}
HeadContext作为一个Outbound,将实现交给unsafe处理,可以把unsafe理解为java底层I/O交互方式;作为一个Inbound,仅仅是把这个请求传递下去。
TailContext
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
@Override
public ChannelHandler handler() {
return this;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) { }
@Override
public void channelUnregistered(ChannelHandlerContext ctx) { }
@Override
public void channelActive(ChannelHandlerContext ctx) {
onUnhandledInboundChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
onUnhandledInboundChannelInactive();
}
tail作为尾节点,只继承了ChannelInboundHandler,而且里面的方法都是空方法。
有HeadContext和TailContext内部可以看出来:
Inbound是由head到tail,head用于传递,tail进行收尾不负责任何实现
OutBound是由tail到head,head交给unsafe处理
DefaultChannelPipeline方法
看一下最常用的addLast
public final ChannelPipeline addLast(ChannelHandler handler) {
return addLast(null, handler);
}
@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
ObjectUtil.checkNotNull(handlers, "handlers");
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
可以看出来addLast可以指定group和name
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//检查是否有重复的
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
流程如下:
1、检查重复性
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
同一个ChannelHandler用add来标记是否添加,一般不支持重复添加,可以添加@Sharable注解来支持
public boolean isSharable() {
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
2、根据handler创建一个AbstractChannelHandlerContext
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
3、调用addLast0将节点添加到链表中
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
4、触发添加成功的回调callHandlerAdded0,如果这个管道还没有注册,就延迟回调;如果不是当前的eventLoop,开辟一个新线程回调
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.callHandlerAdded();
} catch (Throwable t) {
boolean removed = false;
try {
atomicRemoveFromHandlerList(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}
final void callHandlerAdded() throws Exception {
//在调用handlerAdded之前,必须先调用setAddComplete。否则,如果handlerAdded方法生成
// 任何管道事件ctx.handler()都会错过它们,因为状态不允许这样做。
if (setAddComplete()) {
handler().handlerAdded(this);
}
}
调用自定义的handlerAdded方法,如果其中抛出了异常,会将这个节点给删除了,并且触发删除回调
private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
删除节点
看一下fire开头的方法,前面提到过,例如fireChannelActive是用于触发Channel生命周期事件,而生命周期的开始是HeadContext传递。看一下fireChannelActive实现细节可以知道其他几个fire的细节了
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
调用invokeChannelActive,传入的参数是head,表示从head开始
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
调用head的invokeChannelActive方法,这个方法是AbstractChannelHandlerContext的公有方法
private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelActive();
}
}
调用head的channelActive方法
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
传入的ctx是head自身,所以又回到了AbstractChannelHandlerContext的 fireChannelActive
@Override
public ChannelHandlerContext fireChannelActive() {
invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
return this;
}
//找到Inbound
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
注意findContextInbound,这个方法就是找到下一个Inbound节点,不断用next遍历整个链表节点来找到属于Inbound的,然后又回到invokeChannelActive来调用channelActive
Inbound是不断next找到下一个,而Outbound就是从tail出发,不断的prev遍历直到head,实现方法大同小异
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
注意到新建AbstractChannelHandlerContext的代码中,ChannelPipeline将自己放入的了context中,这表示我们可以自定义ChannelHandler,并且调用ChannelPipeline去动态删除Handler
比如需要一次性handler,可以在自定义方法里面使用后后用remove掉自己
ChannelInitializer细节
在使用ServerBootstrap的时候,添加自定义handler的时候是new了一个ChannelInitializer类,并重写了initChannel方法,但实现很简单
protected abstract void initChannel(C ch) throws Exception;
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (initChannel(ctx)) {
ctx.pipeline().fireChannelRegistered();
removeState(ctx);
} else {
ctx.fireChannelRegistered();
}
}
ChannelInitializer是一个Inbound添加到head后面,head将注册事件传到下一个Inbound的时候会调用initChannel方法
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) {
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
initChannel会调用我们重写的initChannel(C ch)方法,initMap是为了防止重复调用,当initChannel调用完成后,finally处会将自身remove掉,这也是上面提到了动态删除。
总结
DefaultChannelPipeline内部由于AbstractChannelHandlerContext的链表组成,而链表又分为ChannelOutboundHandler和ChannelInboundHandler,根据不同的事件找到不同的bound链式调用方法,这也是ChannelPipeline的作用。