2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104651753

handlerAdded处理器添加事件

上一篇讲了一些基础的知识,本篇就讲下具体是怎么样的,首先就是添加WebSocketServerProtocolHandler之后的handlerAdded事件。因为是WebSocket协议,肯定需要一些处理器,所以这里就会添加一些处理器,比如第一次的 握手处理器UFT8帧验证器来验证文本帧 ,还要 关闭帧处理器 ,用来响应关闭帧

     @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            ChannelPipeline cp = ctx.pipeline();
            if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
                // Add the WebSocketHandshakeHandler before this one.在前面添加一个握手处理器
                cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
                        new WebSocketServerProtocolHandshakeHandler(serverConfig));
            }
            if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) {
                // Add the UFT8 checking before this one.在前面添加帧验证器
                cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
                        new Utf8FrameValidator());
            }
            if (serverConfig.sendCloseFrame() != null) {//添加关闭帧处理器
                cp.addBefore(ctx.name(), WebSocketCloseFrameHandler.class.getName(),
                    new WebSocketCloseFrameHandler(serverConfig.sendCloseFrame(), serverConfig.forceCloseTimeoutMillis()));
            }
        }

添加完之后就是这个样子(先不管自定义的处理器):

202309132208261221.png

WebSocketServerProtocolHandshakeHandler的channelRead

之后就是客户端发来HTTP请求websocket握手。HTTP解码出完整消息后就传递到WebSocketServerProtocolHandshakeHandler了,我们来看看他做了什么。

  • 验证协议url
  • 验证GET的请求升级。
  • 替换当前处理器为forbiddenHttpRequestResponder
  • 创建握手WebSocketServerHandshaker 对象,进行握手。
  • 启动一个定义任务进行超时回调。
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            final FullHttpRequest req = (FullHttpRequest) msg;
            if (isNotWebSocketPath(req)) {//不是websocket路径就不管
                ctx.fireChannelRead(msg);
                return;
            }
    
            try {
                if (!GET.equals(req.method())) {//只有GET支持的升级的
                    sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN, ctx.alloc().buffer(0)));
                    return;
                }
                //创建握手工厂
                final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                        getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()),
                        serverConfig.subprotocols(), serverConfig.decoderConfig());
                final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);//创建一个握手处理器
                final ChannelPromise localHandshakePromise = handshakePromise;//握手回调
                if (handshaker == null) {//不支持的版本
                    WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
                } else {
    
                    WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);//设置处理器
                    ctx.pipeline().replace(this, "WS403Responder",
                            WebSocketServerProtocolHandler.forbiddenHttpRequestResponder());//把当前处理器替换掉,变成403
    
                    final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
                    handshakeFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {//发送不成功
                                localHandshakePromise.tryFailure(future.cause());
                                ctx.fireExceptionCaught(future.cause());
                            } else {//发送成功
                                localHandshakePromise.trySuccess();
                                //  保持兼容性 触发事件
                                ctx.fireUserEventTriggered(//这个HANDSHAKE_COMPLETE是过时的
                                        WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
                                ctx.fireUserEventTriggered(//这个是新的
                                        new WebSocketServerProtocolHandler.HandshakeComplete(
                                                req.uri(), req.headers(), handshaker.selectedSubprotocol()));
                            }
                        }
                    });
                    applyHandshakeTimeout();
                }
            } finally {
                req.release();
            }
        }

isNotWebSocketPath验证URL

这个主要就是验证URL是否是WebSockeURL,主要就是判断创建时候传进去的这个"/wc"

202309132208267382.png
默认是比较整个字符串,不是比较开头。

    private boolean isNotWebSocketPath(FullHttpRequest req) {
            String websocketPath = serverConfig.websocketPath();
            return serverConfig.checkStartsWith() ? !req.uri().startsWith(websocketPath) : !req.uri().equals(websocketPath);
        }

sendHttpResponse发送消息

如果响应的状态码不是200或者请求不是设置长连接,就关闭通道了。

        private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
            ChannelFuture f = ctx.channel().writeAndFlush(res);
            if (!isKeepAlive(req) || res.status().code() != 200) {//req不支持KeepAlive,或者res状态码不是200就等写完成了关闭通道
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }
    
     	ChannelFutureListener CLOSE = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                future.channel().close();
            }
        };

WebSocketServerHandshakerFactory的newHandshaker创建握手对象

根据请求头信息的sec-websocket-version来决定要哪个版本的握手对象,一般都是13,如果都不支持就会返回null

202309132208272503.png

    public WebSocketServerHandshaker newHandshaker(HttpRequest req) {
            //从请求头获取WEBSOCKET版本,根据不同版本,返回不同握手对象
            CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);
            if (version != null) {
                if (version.equals(WebSocketVersion.V13.toHttpHeaderValue())) {
                    // Version 13 of the wire protocol - RFC 6455 (version 17 of the draft hybi specification).
                    return new WebSocketServerHandshaker13(
                            webSocketURL, subprotocols, decoderConfig);
                } else if (version.equals(WebSocketVersion.V08.toHttpHeaderValue())) {
                    // Version 8 of the wire protocol - version 10 of the draft hybi specification.
                    return new WebSocketServerHandshaker08(
                            webSocketURL, subprotocols, decoderConfig);
                } else if (version.equals(WebSocketVersion.V07.toHttpHeaderValue())) {
                    // Version 8 of the wire protocol - version 07 of the draft hybi specification.
                    return new WebSocketServerHandshaker07(
                            webSocketURL, subprotocols, decoderConfig);
                } else {
                    return null;
                }
            } else {//没指定版本的情况
                // Assume version 00 where version header was not specified
                return new WebSocketServerHandshaker00(webSocketURL, subprotocols, decoderConfig);
            }
        }

forbiddenHttpRequestResponder

这个就是用来创建禁止HTTP请求的响应器,只要握手对象创建好了,就不需要响应HTTP了,直接就把当前处理器WebSocketServerProtocolHandler给替换了。

    static ChannelHandler forbiddenHttpRequestResponder() {
            return new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    if (msg instanceof FullHttpRequest) {
                        ((FullHttpRequest) msg).release();
                        FullHttpResponse response =
                                new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer(0));
                        ctx.channel().writeAndFlush(response);//从通道尾部开始
                    } else {
                        ctx.fireChannelRead(msg);
                    }
                }
            };
        }

替换之后就是这样:

202309132208281174.png

WebSocketServerHandshaker的handshake

握手对象进行握手,其实就是发送响应数据。先会创建一个FullHttpResponse 响应,然后把跟HTTP相关的聚合,压缩处理器删除,如果有HttpServerCodec,那就在前面添加websocket的编解码器,等发送响应成功了把HttpServerCodec删了。如果是HTTP编解码器,就把解码器先替换成websocket的解码器,等发送响应成功了,再把编码器替换成websocket的编码器。

    public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
                                                HttpHeaders responseHeaders, final ChannelPromise promise) {
    
            if (logger.isDebugEnabled()) {
                logger.debug("{} WebSocket version {} server handshake", channel, version());
            }
            FullHttpResponse response = newHandshakeResponse(req, responseHeaders);//创建响应
            ChannelPipeline p = channel.pipeline();
            if (p.get(HttpObjectAggregator.class) != null) {
                p.remove(HttpObjectAggregator.class);//删除聚合
            }
            if (p.get(HttpContentCompressor.class) != null) {//删除压缩
                p.remove(HttpContentCompressor.class);
            }
            ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);//请求解码器
            final String encoderName;
            if (ctx == null) {//不存在
                // this means the user use an HttpServerCodec
                ctx = p.context(HttpServerCodec.class);//HttpServerCodec是否存在
                if (ctx == null) {//也不存在,就没办法解码http了,失败了
                    promise.setFailure(
                            new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
                    return promise;
                }//在之前添加WebSocket编解码
                p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
                p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
                encoderName = ctx.name();
            } else {
                p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());//替换HttpRequestDecoder
    
                encoderName = p.context(HttpResponseEncoder.class).name();
                p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());//在HttpResponseEncoder之前添加编码器
            }//监听发出事件
            channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        ChannelPipeline p = future.channel().pipeline();
                        p.remove(encoderName);//成功了就把http的编码器删除了,HttpServerCodec或者HttpResponseEncoder
                        promise.setSuccess();
                    } else {
                        promise.setFailure(future.cause());
                    }
                }
            });
            return promise;
        }

发送回调前是这样:

202309132208285415.png
发送回调成功后是这样:

202309132208298216.png

applyHandshakeTimeout

发送可能会等好久,所以就给了个超时的定时任务,默认设置是10秒,超时了就触发超时事件,然后关闭通道,如果发送回调了,就把定时任务取消。

    private void applyHandshakeTimeout() {
            final ChannelPromise localHandshakePromise = handshakePromise;
            final long handshakeTimeoutMillis = serverConfig.handshakeTimeoutMillis();
            if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
                return;//完成了就不管了
            }
            //起一个定时任务
            final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
                @Override
                public void run() {
                    if (!localHandshakePromise.isDone() &&
                            localHandshakePromise.tryFailure(new WebSocketHandshakeException("handshake timed out"))) {
                        ctx.flush()//没完成就刷出去,触发超时事件,然后关闭
                           .fireUserEventTriggered(ServerHandshakeStateEvent.HANDSHAKE_TIMEOUT)
                           .close();
                    }
                }
            }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
            //如果成功了,就把超时任务取消
            // Cancel the handshake timeout when handshake is finished.
            localHandshakePromise.addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> f) throws Exception {
                    timeoutFuture.cancel(false);
                }
            });
        }

完成握手后:

202309132208302147.png
至此WebSocketServerProtocolHandshakeHandler做的事就完成了,后面讲升级后的通信。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文