2023-09-13  阅读(16)
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104652885

WebSocket13FrameDecoder解码器

我们拿这个解码器版本来讲,其他的原理都差不多的,这个就是我们在握手的时候创建的解码器,我们新来看看结构:

202309132208334971.png
其实主要的操作还是在WebSocket08FrameDecoder里面,你会发现WebSocket13FrameDecoder没什么东西。所以我们主要还是讲WebSocket08FrameDecoder吧。

WebSocket08FrameDecoder解码器

属性

还是老规矩,先了解下一些属性,这个跟HTTP的请求解码器很类型,也是根据状态来执行的,因为毕竟协议还是有个解码的过程的。

    	//读取状态
        enum State {
            READING_FIRST,//第一次读一个字节 FIN, RSV, OPCODE
            READING_SECOND,//解析出MASK, PAYLOAD LEN描述
            READING_SIZE,//解析具体长度PAYLOAD LEN
            MASKING_KEY,//解析掩码
            PAYLOAD,//解析数据
            CORRUPT//帧损坏了
        }
    
    
        private static final byte OPCODE_CONT = 0x0;//连续的frame
        private static final byte OPCODE_TEXT = 0x1;//文本frame
        private static final byte OPCODE_BINARY = 0x2;//二进制frame
        private static final byte OPCODE_CLOSE = 0x8;//关闭帧
        private static final byte OPCODE_PING = 0x9;//ping
        private static final byte OPCODE_PONG = 0xA;//pong

上面的属性可能不太清楚干嘛的,可以看下这篇文章,我觉得写得还不错的,另外我画个简单的图解释下,首先基本的部分是这样的:

202309132208342732.png
然后后面的根据数据长度有不同的情况,第一种就是数据长度在0-125之间,掩码的情况,没掩码的情况就是掩码的4个字节没有了。:

202309132208356663.png
第二种长度数据是126,这个不是表示数据长度就是126,只是说7位不够描述长度,长度要用2个字节描述,后面会直接跟着2个字节的长度数据:

202309132208363724.png
第二种长度数据是127,这个不是表示数据长度就是127,只是说2字节不够描述长度,长度要用8个字节描述,后面会直接跟着8个字节的长度数据:

202309132208379605.png

decode解码

主要还是这个方法,他就是从头开始解析这些数据,所以他分了好几个状态:

  1. READING_FIRST:解析第一个字节,是不是最后一帧,扩展位怎么样,是什么帧类型。
  2. READING_SECOND:解析第二个字节,是否有掩码,数据长度是多少。
  3. READING_SIZE:处理长度,如果是0-125,那好办,如果是126,就要读取后面2个字节的数据,如果是127,就要读取后面8个字节的数据。
  4. MASKING_KEY:如果有掩码就解析出4字节掩码。
  5. PAYLOAD:解析出最后的数据。
  6. CORRUPT:帧数据可能损坏了,可能要关闭连接。

READING_FIRST

用了位操作去解析第一个字节,这里的Opcode实际上就是帧类型,比如0表示持续的帧,1表示文本帧,2表示二进制帧等等。

        case READING_FIRST:
                if (!in.isReadable()) {
                    return;
                }
    
                framePayloadLength = 0;
    
                // FIN, RSV, OPCODE
                byte b = in.readByte();
                frameFinalFlag = (b & 0x80) != 0;//取出FIN,表示是不是一帧的最后一段
                frameRsv = (b & 0x70) >> 4;//取出RSV
                frameOpcode = b & 0x0F;//取出Opcode
    
                if (logger.isTraceEnabled()) {
                    logger.trace("Decoding WebSocket Frame opCode={}", frameOpcode);
                }
    
                state = State.READING_SECOND;

READING_SECOND

然后读取掩码位,读取长度,进行一些合法性的检查,如果违反协议了,就直接发送关闭帧。

     case READING_SECOND:
                if (!in.isReadable()) {
                    return;
                }
                // MASK, PAYLOAD LEN 1
                b = in.readByte();//再读一个字节
                frameMasked = (b & 0x80) != 0;//读取掩码,1表示存在,4字节,0不存在
                framePayloadLen1 = b & 0x7F;//获取内容长度
    
                if (frameRsv != 0 && !config.allowExtensions()) {//有扩展标志位,但是不允许扩展
                    protocolViolation(ctx, in, "RSV != 0 and no extension negotiated, RSV:" + frameRsv);
                    return;
                }
    
                if (!config.allowMaskMismatch() && config.expectMaskedFrames() != frameMasked) {//需要掩码加密,但是发来的没进行掩码加密
                    protocolViolation(ctx, in, "received a frame that is not masked as expected");
                    return;
                }
                //控制操作,关闭,ping,pong
                if (frameOpcode > 7) { // control frame (have MSB in opcode set)
    
                    // control frames MUST NOT be fragmented
                    if (!frameFinalFlag) {//控制帧不用分段了
                        protocolViolation(ctx, in, "fragmented control frame");
                        return;
                    }
    
                    // control frames MUST have payload 125 octets or less
                    if (framePayloadLen1 > 125) {//长度超过125
                        protocolViolation(ctx, in, "control frame with payload length > 125 octets");
                        return;
                    }
                    //不为控制帧
                    // check for reserved control frame opcodes
                    if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING
                          || frameOpcode == OPCODE_PONG)) {
                        protocolViolation(ctx, in, "control frame using reserved opcode " + frameOpcode);
                        return;
                    }
                    //关闭帧有内容的话,必须是2个字节的无符号整形表示状态码
                    // close frame : if there is a body, the first two bytes of the
                    // body MUST be a 2-byte unsigned integer (in network byte
                    // order) representing a getStatus code
                    if (frameOpcode == 8 && framePayloadLen1 == 1) {
                        protocolViolation(ctx, in, "received close control frame with payload len 1");
                        return;
                    }
                } else { // data frame 数据帧,不是持续,文本,二进制帧的话也违反协议了
                    // check for reserved data frame opcodes
                    if (!(frameOpcode == OPCODE_CONT || frameOpcode == OPCODE_TEXT
                          || frameOpcode == OPCODE_BINARY)) {
                        protocolViolation(ctx, in, "data frame using reserved opcode " + frameOpcode);
                        return;
                    }
    
                    // check opcode vs message fragmentation state 1/2
                    if (fragmentedFramesCount == 0 && frameOpcode == OPCODE_CONT) {//是持续帧,帧个数为0
                        protocolViolation(ctx, in, "received continuation data frame outside fragmented message");
                        return;
                    }
                    //帧的端数不为0,但是不是持续帧,也不是ping
                    // check opcode vs message fragmentation state 2/2
                    if (fragmentedFramesCount != 0 && frameOpcode != OPCODE_CONT && frameOpcode != OPCODE_PING) {
                        protocolViolation(ctx, in,
                                          "received non-continuation data frame while inside fragmented message");
                        return;
                    }
                }
    
                state = State.READING_SIZE;

protocolViolation违反协议

如果发现有违反协议的,直接把数据丢弃,如果通道没关闭,且设置了违反协议就关闭通道的话就发送关闭帧,抛出异常。

    private void protocolViolation(ChannelHandlerContext ctx, ByteBuf in, CorruptedWebSocketFrameException ex) {
            state = State.CORRUPT;//帧损坏的状态
            int readableBytes = in.readableBytes();
            if (readableBytes > 0) {
                in.skipBytes(readableBytes);//略过,能帮助释放内存
            }
            if (ctx.channel().isActive() && config.closeOnProtocolViolation()) {//帧坏了就关闭通道
                Object closeMessage;
                if (receivedClosingHandshake) {
                    closeMessage = Unpooled.EMPTY_BUFFER;//空帧
                } else {
                    WebSocketCloseStatus closeStatus = ex.closeStatus();
                    String reasonText = ex.getMessage();
                    if (reasonText == null) {
                        reasonText = closeStatus.reasonText();
                    }
                    closeMessage = new CloseWebSocketFrame(closeStatus, reasonText);//封装成关闭帧
                }
                ctx.writeAndFlush(closeMessage).addListener(ChannelFutureListener.CLOSE);//发出去,成功后关闭通道
            }
            throw ex;//抛出异常
        }

READING_SIZE

这个就是我前面说的处理长度的几种情况。

    case READING_SIZE://处理长度
                //如果是126的话,紧跟着后面需要有两个字节的长度
                // Read frame payload length
                if (framePayloadLen1 == 126) {
                    if (in.readableBytes() < 2) {
                        return;
                    }
                    framePayloadLength = in.readUnsignedShort();//读取2次节长度
                    if (framePayloadLength < 126) {//长度无效
                        protocolViolation(ctx, in, "invalid data frame length (not using minimal length encoding)");
                        return;
                    }
                } else if (framePayloadLen1 == 127) {//如果是127,后面需要8个字节
                    if (in.readableBytes() < 8) {
                        return;
                    }
                    framePayloadLength = in.readLong();
                    // TODO: check if it's bigger than 0x7FFFFFFFFFFFFFFF, Maybe
                    // just check if it's negative?
    
                    if (framePayloadLength < 65536) {//小于等于2字节的
                        protocolViolation(ctx, in, "invalid data frame length (not using minimal length encoding)");
                        return;
                    }
                } else {
                    framePayloadLength = framePayloadLen1;//0-125的情况
                }
                //大于最大长度默认65536
                if (framePayloadLength > config.maxFramePayloadLength()) {
                    protocolViolation(ctx, in, WebSocketCloseStatus.MESSAGE_TOO_BIG,
                        "Max frame length of " + config.maxFramePayloadLength() + " has been exceeded.");
                    return;
                }
    
                if (logger.isTraceEnabled()) {
                    logger.trace("Decoding WebSocket Frame length={}", framePayloadLength);
                }
    
                state = State.MASKING_KEY;

MASKING_KEY

解析出掩码,其实这个掩码加密解密只是用了异或^

    case MASKING_KEY://解析掩码
                if (frameMasked) {//有掩码 4字节的
                    if (in.readableBytes() < 4) {
                        return;
                    }
                    if (maskingKey == null) {
                        maskingKey = new byte[4];
                    }
                    in.readBytes(maskingKey);
                }
                state = State.PAYLOAD;

PAYLOAD

有掩码先解码,然后根据不同的Opcode类型封装成对应的帧数据。

       case PAYLOAD://解析数据
                if (in.readableBytes() < framePayloadLength) {
                    return;
                }
    
                ByteBuf payloadBuffer = null;
                try {
                    payloadBuffer = readBytes(ctx.alloc(), in, toFrameLength(framePayloadLength));
    
                    // Now we have all the data, the next checkpoint must be the next
                    // frame
                    state = State.READING_FIRST;//回到初始要解析的状态
    
                    // Unmask data if needed
                    if (frameMasked) {//如果有掩码,要解码
                        unmask(payloadBuffer);
                    }
    
                    // Processing ping/pong/close frames because they cannot be
                    // fragmented
                    if (frameOpcode == OPCODE_PING) {//如果是ping
                        out.add(new PingWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                        payloadBuffer = null;
                        return;
                    }
                    if (frameOpcode == OPCODE_PONG) {//如果是pong
                        out.add(new PongWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                        payloadBuffer = null;
                        return;
                    }
                    if (frameOpcode == OPCODE_CLOSE) {//收到关闭帧,也要回一个关闭帧
                        receivedClosingHandshake = true;
                        checkCloseFrameBody(ctx, payloadBuffer);
                        out.add(new CloseWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                        payloadBuffer = null;
                        return;
                    }
    
                    // Processing for possible fragmented messages for text and binary
                    // frames
                    if (frameFinalFlag) {//是最后一帧
                        // Final frame of the sequence. Apparently ping frames are
                        // allowed in the middle of a fragmented message
                        if (frameOpcode != OPCODE_PING) {//允许中间发心跳帧,心跳帧不算,不是心跳帧才要清零
                            fragmentedFramesCount = 0;
                        }
                    } else {
                        // Increment counter
                        fragmentedFramesCount++;//帧个数+1,为持续帧
                    }
    
                    // Return the frame
                    if (frameOpcode == OPCODE_TEXT) {//文本类型
                        out.add(new TextWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                        payloadBuffer = null;
                        return;
                    } else if (frameOpcode == OPCODE_BINARY) {//二进制
                        out.add(new BinaryWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
                        payloadBuffer = null;
                        return;
                    } else if (frameOpcode == OPCODE_CONT) {//持续帧
                        out.add(new ContinuationWebSocketFrame(frameFinalFlag, frameRsv,
                                                               payloadBuffer));
                        payloadBuffer = null;
                        return;
                    } else {
                        throw new UnsupportedOperationException("Cannot decode web socket frame with opcode: "
                                                                + frameOpcode);
                    }
                } finally {
                    if (payloadBuffer != null) {//没有解析出来要释放
                        payloadBuffer.release();
                    }
                }

unmask解码

其实就是取出4字节掩码,封装成一个整数,然后跟数据进行每次8位的轮询的异或运算解码。

    private void unmask(ByteBuf frame) {
            int i = frame.readerIndex();
            int end = frame.writerIndex();
    
            ByteOrder order = frame.order();
    
            // Remark: & 0xFF is necessary because Java will do signed expansion from
            // byte to int which we don't want.
            int intMask = ((maskingKey[0] & 0xFF) << 24)
                        | ((maskingKey[1] & 0xFF) << 16)
                        | ((maskingKey[2] & 0xFF) << 8)
                        | (maskingKey[3] & 0xFF);
    
    
            if (order == ByteOrder.LITTLE_ENDIAN) {
                intMask = Integer.reverseBytes(intMask);
            }
    
            for (; i + 3 < end; i += 4) {
                int unmasked = frame.getInt(i) ^ intMask;
                frame.setInt(i, unmasked);
            }
            for (; i < end; i++) {
                frame.setByte(i, frame.getByte(i) ^ maskingKey[i % 4]);
            }
        }

CORRUPT

一般是有违反协议了,就丢弃了,但是就怕其他问题要读一帧,不然父类处理会出问题。

     case CORRUPT://帧坏了
                if (in.isReadable()) {
                    // If we don't keep reading Netty will throw an exception saying
                    // we can't return null if no bytes read and state not changed.
                    in.readByte();//要读一下,否则父类会报错
                }
                return;

问题在于父类ByteToMessageDecodercallDecode中有:

202309132208386306.png
如果没读的话就会报异常啦。不过搜寻能转为CORRUPT的只有违反协议的方法,那个方法里面还是有略过读数据的,不会导致父类报错,我估计是如果自己手动操作状态为CORRUPT,可能是要读一下的。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文