2023-09-13  阅读(34)
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104613277

聚合消息

前面我们讲了, 一个HTTP请求最少也会在HttpRequestDecoder里分成两次往后传递,第一次是消息行和消息头,第二次是消息体,哪怕没有消息体,也会传一个空消息体。如果发送的消息体比较大的话,可能还会分成好几个消息体来处理,往后传递多次,这样使得我们后续的处理器可能要写多个逻辑判断,比较麻烦,那能不能把消息都整合成一个完整的,再往后传递呢,当然可以,用HttpObjectAggregator

先介绍一下一些属性

HTTP有个头属性Except:100-continue用来优化服务器和客户端数据传输的,在要发送比较大的数据的时候,不会直接发送,而是会先征求下服务器意见是否可以继续发送数据,服务器可以允许也可以不允许,都应该响应一下。具体介绍可以参考这篇文章

    	//接受100-continue,响应状态码100
        private static final FullHttpResponse CONTINUE =//Except:100-continue的响应
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE, Unpooled.EMPTY_BUFFER);
                
    	//不接受,响应状态码417 不支持
        private static final FullHttpResponse EXPECTATION_FAILED = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.EXPECTATION_FAILED, Unpooled.EMPTY_BUFFER);
    
    	//不接受,响应状态码413 消息体太大而关闭连接
        private static final FullHttpResponse TOO_LARGE_CLOSE = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER);
    
    	//不接受,响应状态码413 消息体太大,没关闭连接
        private static final FullHttpResponse TOO_LARGE = new DefaultFullHttpResponse(
            HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER);
    
        static {//设定头消息
            EXPECTATION_FAILED.headers().set(CONTENT_LENGTH, 0);
            TOO_LARGE.headers().set(CONTENT_LENGTH, 0);
    
            TOO_LARGE_CLOSE.headers().set(CONTENT_LENGTH, 0);
            TOO_LARGE_CLOSE.headers().set(CONNECTION, HttpHeaderValues.CLOSE);//关闭头信息
        }
    
        private final boolean closeOnExpectationFailed;//如果消息过大是否关闭连接,报异常

结构

看到他有4个泛型,分别对应是聚合HTTP类型的,HTTP通用消息请求行和请求头的,HTTP消息体,HTTP完整通用消息,包括消息体

202309132208104341.png
对应的父类的泛型就是:

202309132208111122.png
这些类型直接会影响到后续的逻辑判断,所以要弄清楚对应的关系。

MessageAggregator

主要的逻辑代码在这里,这个是通用的模板,里面就是模板方法啦,先看下他的一些属性吧,他会把HTTP的消息体都封装成一个缓冲区,加到复合缓冲区里。

      private static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;//最大复合缓冲区组件个数
    
        private final int maxContentLength;//最大消息图长度
        private O currentMessage;//当前消息
        private boolean handlingOversizedMessage;//是否处理过大消息
    
        private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;//累加组件的最大个数
        private ChannelHandlerContext ctx;//处理器上下文
        private ChannelFutureListener continueResponseWriteListener;// 100-continue响应监听器
    
        private boolean aggregating;//是否正在聚合

acceptInboundMessage判断类型

判断是否是泛型I类型,也就是我们HttpObjectAggregator泛型中的HttpObject类型,是才会处理,否则就不处理。然后会判断是否聚合好了,如果没开始聚合就进行聚合,如果还在聚合就继续。

    @Override
        public boolean acceptInboundMessage(Object msg) throws Exception {
    
            if (!super.acceptInboundMessage(msg)) {//是否是泛型I类型,比如HttpObject类型
                return false;
            }
            @SuppressWarnings("unchecked")
            I in = (I) msg;
    
            if (isAggregated(in)) {//是否聚合好了
                return false;
            }
            if (isStartMessage(in)) {//是否是开始聚合
                aggregating = true;//开始聚合
                return true;
            } else if (aggregating && isContentMessage(in)) {//正在内容聚合
                return true;
            }
    
            return false;
        }

decode真正的聚合

这个方法比较长,但是做的事情分那么几个:

  • 如果是开始消息,也就不是请求体,那就开始判断是否有Except:100-continue头信息,有的话根据长度和是否支持来判断是否要返回响应。之后判断如果前面解码失败,就直接整合消息体返回,否则就创建复合缓冲区,如果是消息体的话就添加进去,然后封装成一个完整的消息类型。
  • 如果是消息体了,就加入到复合画冲去里,然后判断是否是最后一个消息体,是的话就进行最后的整合,其实就是设置Content-Length头信息。
        @Override
        protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception {
            assert aggregating;
    
            if (isStartMessage(msg)) {//是否是开始消息
                handlingOversizedMessage = false;//没处理超大信息
                if (currentMessage != null) {//上次的消息没释放
                    currentMessage.release();
                    currentMessage = null;
                    throw new MessageAggregationException();
                }
    
                @SuppressWarnings("unchecked")
                S m = (S) msg;
    
                // 100-continue需要持续响应
               
                Object continueResponse = newContinueResponse(m, maxContentLength, ctx.pipeline());
                if (continueResponse != null) {//有 100-continue响应
                    // Cache the write listener for reuse.
                    ChannelFutureListener listener = continueResponseWriteListener;
                    if (listener == null) {//不存在监听器要创建一个
                        continueResponseWriteListener = listener = new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                if (!future.isSuccess()) {
                                    ctx.fireExceptionCaught(future.cause());
                                }
                            }
                        };
                    }
    
    
                    boolean closeAfterWrite = closeAfterContinueResponse(continueResponse);
                    handlingOversizedMessage = ignoreContentAfterContinueResponse(continueResponse);
                    //这里会直接刷出去,所以HttpResponseEncoder需要放在这个前面,不然写出去没编码过会报错的
                    final ChannelFuture future = ctx.writeAndFlush(continueResponse).addListener(listener);
    
                    if (closeAfterWrite) {
                        future.addListener(ChannelFutureListener.CLOSE);
                        return;
                    }
                    if (handlingOversizedMessage) {
                        return;
                    }
                } else if (isContentLengthInvalid(m, maxContentLength)) {//消息体长度是否超过了
    
                    invokeHandleOversizedMessage(ctx, m);
                    return;
                }
                //解码不成功
                if (m instanceof DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) {
                    O aggregated;
                    if (m instanceof ByteBufHolder) {
                        aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain());
                    } else {
                        aggregated = beginAggregation(m, EMPTY_BUFFER);
                    }
                    finishAggregation0(aggregated);
                    out.add(aggregated);
                    return;
                }
    
                CompositeByteBuf content = ctx.alloc().compositeBuffer(maxCumulationBufferComponents);//创建复合缓冲区
                if (m instanceof ByteBufHolder) {//是内容
                    appendPartialContent(content, ((ByteBufHolder) m).content());
                }
                currentMessage = beginAggregation(m, content);//开始聚合
            } else if (isContentMessage(msg)) {//后面属于消息体聚合
                if (currentMessage == null) {//长度超过最大了,直接丢弃了,不处理了
    
                    return;
                }
                //提取内容
                CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();
    
                @SuppressWarnings("unchecked")
                final C m = (C) msg;
                // 超过最大长度了,处理过大的消息
                if (content.readableBytes() > maxContentLength - m.content().readableBytes()) {
                   
                    @SuppressWarnings("unchecked")
                    S s = (S) currentMessage;
                    invokeHandleOversizedMessage(ctx, s);
                    return;
                }
    
                //添加新的内容到复合缓冲区
                appendPartialContent(content, m.content());
    
    
                aggregate(currentMessage, m);//整合尾部请求头
    
                final boolean last;//是不是最后一次聚合
                if (m instanceof DecoderResultProvider) {//处理解码结果
                    DecoderResult decoderResult = ((DecoderResultProvider) m).decoderResult();
                    if (!decoderResult.isSuccess()) {//没解码成功
                        if (currentMessage instanceof DecoderResultProvider) {
                            ((DecoderResultProvider) currentMessage).setDecoderResult(
                                    DecoderResult.failure(decoderResult.cause()));
                        }
                        last = true;
                    } else {
                        last = isLastContentMessage(m);//是否是最后的内容
                    }
                } else {
                    last = isLastContentMessage(m);
                }
    
                if (last) {//是最后的
                    finishAggregation0(currentMessage);
    
                    // All done
                    out.add(currentMessage);
                    currentMessage = null;
                }
            } else {
                throw new MessageAggregationException();
            }
        }

HttpObjectAggregator的isStartMessage

HTTP来说其实就是判断是否是通用的消息行和消息头信息。

     @Override
        protected boolean isStartMessage(HttpObject msg) throws Exception {
            return msg instanceof HttpMessage;
        }

HttpObjectAggregator的isLastContentMessage

是否是最后的内容。

        @Override
        protected boolean isLastContentMessage(HttpContent msg) throws Exception {
            return msg instanceof LastHttpContent;
        }

HttpObjectAggregator的isAggregated

是否聚合好了。

      @Override
        protected boolean isAggregated(HttpObject msg) throws Exception {
            return msg instanceof FullHttpMessage;
        }

HttpObjectAggregator的newContinueResponse

如果需要100-continue响应的话,要把100-continue头设置去掉,不往后传播了。

     @Override
        protected Object newContinueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {
            Object response = continueResponse(start, maxContentLength, pipeline);
           
            if (response != null) {
                start.headers().remove(EXPECT);//如果有100-continue响应,就不用再传播下去了
            }
            return response;
        }

HttpObjectAggregator的continueResponse

这个就是上面说的根据是否支持100-continue,是否长度超过限制等进行响应。

    private static Object continueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {
            if (HttpUtil.isUnsupportedExpectation(start)) {//不支持Expect头
               
                pipeline.fireUserEventTriggered(HttpExpectationFailedEvent.INSTANCE);
                return EXPECTATION_FAILED.retainedDuplicate();
            } else if (HttpUtil.is100ContinueExpected(start)) {//支持100-continue请求
               
                if (getContentLength(start, -1L) <= maxContentLength) {
                    return CONTINUE.retainedDuplicate();//继续
                }
                pipeline.fireUserEventTriggered(HttpExpectationFailedEvent.INSTANCE);
                return TOO_LARGE.retainedDuplicate();//消息体太大
            }
    
            return null;
        }

HttpObjectAggregator的closeAfterContinueResponse

是否不支持100-continue后把连接断开。

     @Override
        protected boolean closeAfterContinueResponse(Object msg) {
            return closeOnExpectationFailed && ignoreContentAfterContinueResponse(msg);
        }

HttpObjectAggregator的ignoreContentAfterContinueResponse

如果直接给他报400的话就要断开了,后面的内容就不忽略了。

      @Override
        protected boolean ignoreContentAfterContinueResponse(Object msg) {
            if (msg instanceof HttpResponse) {
                final HttpResponse httpResponse = (HttpResponse) msg;
                return httpResponse.status().codeClass().equals(HttpStatusClass.CLIENT_ERROR);
            }
            return false;
        }

HttpObjectAggregator的beginAggregation

开始聚合就是创建一个聚合的类,根据不同情况创建请求还是响应的完整类型。

     @Override
        protected FullHttpMessage beginAggregation(HttpMessage start, ByteBuf content) throws Exception {
            assert !(start instanceof FullHttpMessage);
    
            HttpUtil.setTransferEncodingChunked(start, false);
    
            AggregatedFullHttpMessage ret;
            if (start instanceof HttpRequest) {
                ret = new AggregatedFullHttpRequest((HttpRequest) start, content, null);//聚合请求
            } else if (start instanceof HttpResponse) {
                ret = new AggregatedFullHttpResponse((HttpResponse) start, content, null);//聚合响应
            } else {
                throw new Error();
            }
            return ret;
        }

appendPartialContent

这个就是将内容添加到复合缓冲区里。

     private static void appendPartialContent(CompositeByteBuf content, ByteBuf partialContent) {
            if (partialContent.isReadable()) {//可读的话就加进去
                content.addComponent(true, partialContent.retain());
            }
        }

HttpObjectAggregator的aggregate

这个就是整合尾部的头信息,因为chunk协议可能会有尾部头信息的。

     @Override
        protected void aggregate(FullHttpMessage aggregated, HttpContent content) throws Exception {
            if (content instanceof LastHttpContent) {//如果是最后的尾部内容就整合尾部头信息
                // Merge trailing headers into the message.
                ((AggregatedFullHttpMessage) aggregated).setTrailingHeaders(((LastHttpContent) content).trailingHeaders());
            }
        }

finishAggregation0

完成聚合,标志位也设置为false了,最后再坚持一遍头信息。

        private void finishAggregation0(O aggregated) throws Exception {
            aggregating = false;
            finishAggregation(aggregated);
        }

finishAggregation

最后检查下,如果没设置Content-Length头的话要设置。

    @Override
        protected void finishAggregation(FullHttpMessage aggregated) throws Exception {
            if (!HttpUtil.isContentLengthSet(aggregated)) {//没设置Content-Length头的话要设置
                aggregated.headers().set(
                        CONTENT_LENGTH,
                        String.valueOf(aggregated.content().readableBytes()));
            }
        }

基本上所有的方法都讲了,其实说白了,就是把先到的包保存下来,等最后接收完了一起传递给后面的。其他的一些异常什么的就不说了,自己看看就好了。最后要注意用的时候,这个放到HttpResponseEncoder后面,否则他出站的错误消息不经过HttpResponseEncoder响应解码器,底层传输是不支持的:

202309132208116913.png

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文