handlerAdded处理器添加事件
上一篇讲了一些基础的知识,本篇就讲下具体是怎么样的,首先就是添加WebSocketServerProtocolHandler
之后的handlerAdded
事件。因为是WebSocket
协议,肯定需要一些处理器,所以这里就会添加一些处理器,比如第一次的 握手处理器 , UFT8
帧验证器来验证文本帧 ,还要 关闭帧处理器 ,用来响应关闭帧
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
ChannelPipeline cp = ctx.pipeline();
if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
// Add the WebSocketHandshakeHandler before this one.在前面添加一个握手处理器
cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
new WebSocketServerProtocolHandshakeHandler(serverConfig));
}
if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) {
// Add the UFT8 checking before this one.在前面添加帧验证器
cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
new Utf8FrameValidator());
}
if (serverConfig.sendCloseFrame() != null) {//添加关闭帧处理器
cp.addBefore(ctx.name(), WebSocketCloseFrameHandler.class.getName(),
new WebSocketCloseFrameHandler(serverConfig.sendCloseFrame(), serverConfig.forceCloseTimeoutMillis()));
}
}
添加完之后就是这个样子(先不管自定义的处理器):
WebSocketServerProtocolHandshakeHandler的channelRead
之后就是客户端发来HTTP
请求websocket
握手。HTTP
解码出完整消息后就传递到WebSocketServerProtocolHandshakeHandler
了,我们来看看他做了什么。
- 验证协议
url
。 - 验证
GET
的请求升级。 - 替换当前处理器为
forbiddenHttpRequestResponder
。 - 创建握手
WebSocketServerHandshaker
对象,进行握手。 - 启动一个定义任务进行超时回调。
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
final FullHttpRequest req = (FullHttpRequest) msg;
if (isNotWebSocketPath(req)) {//不是websocket路径就不管
ctx.fireChannelRead(msg);
return;
}
try {
if (!GET.equals(req.method())) {//只有GET支持的升级的
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN, ctx.alloc().buffer(0)));
return;
}
//创建握手工厂
final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()),
serverConfig.subprotocols(), serverConfig.decoderConfig());
final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);//创建一个握手处理器
final ChannelPromise localHandshakePromise = handshakePromise;//握手回调
if (handshaker == null) {//不支持的版本
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);//设置处理器
ctx.pipeline().replace(this, "WS403Responder",
WebSocketServerProtocolHandler.forbiddenHttpRequestResponder());//把当前处理器替换掉,变成403
final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
handshakeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {//发送不成功
localHandshakePromise.tryFailure(future.cause());
ctx.fireExceptionCaught(future.cause());
} else {//发送成功
localHandshakePromise.trySuccess();
// 保持兼容性 触发事件
ctx.fireUserEventTriggered(//这个HANDSHAKE_COMPLETE是过时的
WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
ctx.fireUserEventTriggered(//这个是新的
new WebSocketServerProtocolHandler.HandshakeComplete(
req.uri(), req.headers(), handshaker.selectedSubprotocol()));
}
}
});
applyHandshakeTimeout();
}
} finally {
req.release();
}
}
isNotWebSocketPath验证URL
这个主要就是验证URL
是否是WebSocke
的URL
,主要就是判断创建时候传进去的这个"/wc"
:
默认是比较整个字符串,不是比较开头。
private boolean isNotWebSocketPath(FullHttpRequest req) {
String websocketPath = serverConfig.websocketPath();
return serverConfig.checkStartsWith() ? !req.uri().startsWith(websocketPath) : !req.uri().equals(websocketPath);
}
sendHttpResponse发送消息
如果响应的状态码不是200
或者请求不是设置长连接,就关闭通道了。
private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!isKeepAlive(req) || res.status().code() != 200) {//req不支持KeepAlive,或者res状态码不是200就等写完成了关闭通道
f.addListener(ChannelFutureListener.CLOSE);
}
}
ChannelFutureListener CLOSE = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
future.channel().close();
}
};
WebSocketServerHandshakerFactory的newHandshaker创建握手对象
根据请求头信息的sec-websocket-version
来决定要哪个版本的握手对象,一般都是13
,如果都不支持就会返回null
。
public WebSocketServerHandshaker newHandshaker(HttpRequest req) {
//从请求头获取WEBSOCKET版本,根据不同版本,返回不同握手对象
CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);
if (version != null) {
if (version.equals(WebSocketVersion.V13.toHttpHeaderValue())) {
// Version 13 of the wire protocol - RFC 6455 (version 17 of the draft hybi specification).
return new WebSocketServerHandshaker13(
webSocketURL, subprotocols, decoderConfig);
} else if (version.equals(WebSocketVersion.V08.toHttpHeaderValue())) {
// Version 8 of the wire protocol - version 10 of the draft hybi specification.
return new WebSocketServerHandshaker08(
webSocketURL, subprotocols, decoderConfig);
} else if (version.equals(WebSocketVersion.V07.toHttpHeaderValue())) {
// Version 8 of the wire protocol - version 07 of the draft hybi specification.
return new WebSocketServerHandshaker07(
webSocketURL, subprotocols, decoderConfig);
} else {
return null;
}
} else {//没指定版本的情况
// Assume version 00 where version header was not specified
return new WebSocketServerHandshaker00(webSocketURL, subprotocols, decoderConfig);
}
}
forbiddenHttpRequestResponder
这个就是用来创建禁止HTTP
请求的响应器,只要握手对象创建好了,就不需要响应HTTP
了,直接就把当前处理器WebSocketServerProtocolHandler
给替换了。
static ChannelHandler forbiddenHttpRequestResponder() {
return new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
((FullHttpRequest) msg).release();
FullHttpResponse response =
new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer(0));
ctx.channel().writeAndFlush(response);//从通道尾部开始
} else {
ctx.fireChannelRead(msg);
}
}
};
}
替换之后就是这样:
WebSocketServerHandshaker的handshake
握手对象进行握手,其实就是发送响应数据。先会创建一个FullHttpResponse
响应,然后把跟HTTP
相关的聚合,压缩处理器删除,如果有HttpServerCodec
,那就在前面添加websocket
的编解码器,等发送响应成功了把HttpServerCodec
删了。如果是HTTP
编解码器,就把解码器先替换成websocket
的解码器,等发送响应成功了,再把编码器替换成websocket
的编码器。
public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
HttpHeaders responseHeaders, final ChannelPromise promise) {
if (logger.isDebugEnabled()) {
logger.debug("{} WebSocket version {} server handshake", channel, version());
}
FullHttpResponse response = newHandshakeResponse(req, responseHeaders);//创建响应
ChannelPipeline p = channel.pipeline();
if (p.get(HttpObjectAggregator.class) != null) {
p.remove(HttpObjectAggregator.class);//删除聚合
}
if (p.get(HttpContentCompressor.class) != null) {//删除压缩
p.remove(HttpContentCompressor.class);
}
ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);//请求解码器
final String encoderName;
if (ctx == null) {//不存在
// this means the user use an HttpServerCodec
ctx = p.context(HttpServerCodec.class);//HttpServerCodec是否存在
if (ctx == null) {//也不存在,就没办法解码http了,失败了
promise.setFailure(
new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
return promise;
}//在之前添加WebSocket编解码
p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
encoderName = ctx.name();
} else {
p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());//替换HttpRequestDecoder
encoderName = p.context(HttpResponseEncoder.class).name();
p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());//在HttpResponseEncoder之前添加编码器
}//监听发出事件
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
p.remove(encoderName);//成功了就把http的编码器删除了,HttpServerCodec或者HttpResponseEncoder
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
}
发送回调前是这样:
发送回调成功后是这样:
applyHandshakeTimeout
发送可能会等好久,所以就给了个超时的定时任务,默认设置是10
秒,超时了就触发超时事件,然后关闭通道,如果发送回调了,就把定时任务取消。
private void applyHandshakeTimeout() {
final ChannelPromise localHandshakePromise = handshakePromise;
final long handshakeTimeoutMillis = serverConfig.handshakeTimeoutMillis();
if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
return;//完成了就不管了
}
//起一个定时任务
final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
if (!localHandshakePromise.isDone() &&
localHandshakePromise.tryFailure(new WebSocketHandshakeException("handshake timed out"))) {
ctx.flush()//没完成就刷出去,触发超时事件,然后关闭
.fireUserEventTriggered(ServerHandshakeStateEvent.HANDSHAKE_TIMEOUT)
.close();
}
}
}, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
//如果成功了,就把超时任务取消
// Cancel the handshake timeout when handshake is finished.
localHandshakePromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> f) throws Exception {
timeoutFuture.cancel(false);
}
});
}
完成握手后:
至此WebSocketServerProtocolHandshakeHandler
做的事就完成了,后面讲升级后的通信。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。