WebSocket13FrameDecoder解码器
我们拿这个解码器版本来讲,其他的原理都差不多的,这个就是我们在握手的时候创建的解码器,我们新来看看结构:
其实主要的操作还是在WebSocket08FrameDecoder
里面,你会发现WebSocket13FrameDecoder
没什么东西。所以我们主要还是讲WebSocket08FrameDecoder
吧。
WebSocket08FrameDecoder解码器
属性
还是老规矩,先了解下一些属性,这个跟HTTP
的请求解码器很类型,也是根据状态来执行的,因为毕竟协议还是有个解码的过程的。
//读取状态
enum State {
READING_FIRST,//第一次读一个字节 FIN, RSV, OPCODE
READING_SECOND,//解析出MASK, PAYLOAD LEN描述
READING_SIZE,//解析具体长度PAYLOAD LEN
MASKING_KEY,//解析掩码
PAYLOAD,//解析数据
CORRUPT//帧损坏了
}
private static final byte OPCODE_CONT = 0x0;//连续的frame
private static final byte OPCODE_TEXT = 0x1;//文本frame
private static final byte OPCODE_BINARY = 0x2;//二进制frame
private static final byte OPCODE_CLOSE = 0x8;//关闭帧
private static final byte OPCODE_PING = 0x9;//ping
private static final byte OPCODE_PONG = 0xA;//pong
上面的属性可能不太清楚干嘛的,可以看下这篇文章,我觉得写得还不错的,另外我画个简单的图解释下,首先基本的部分是这样的:
然后后面的根据数据长度有不同的情况,第一种就是数据长度在0-125
之间,掩码的情况,没掩码的情况就是掩码的4
个字节没有了。:
第二种长度数据是126
,这个不是表示数据长度就是126
,只是说7
位不够描述长度,长度要用2
个字节描述,后面会直接跟着2
个字节的长度数据:
第二种长度数据是127
,这个不是表示数据长度就是127
,只是说2
字节不够描述长度,长度要用8
个字节描述,后面会直接跟着8
个字节的长度数据:
decode解码
主要还是这个方法,他就是从头开始解析这些数据,所以他分了好几个状态:
READING_FIRST
:解析第一个字节,是不是最后一帧,扩展位怎么样,是什么帧类型。READING_SECOND
:解析第二个字节,是否有掩码,数据长度是多少。READING_SIZE
:处理长度,如果是0-125
,那好办,如果是126
,就要读取后面2
个字节的数据,如果是127
,就要读取后面8
个字节的数据。MASKING_KEY
:如果有掩码就解析出4
字节掩码。PAYLOAD
:解析出最后的数据。CORRUPT
:帧数据可能损坏了,可能要关闭连接。
READING_FIRST
用了位操作去解析第一个字节,这里的Opcode实际上就是帧类型,比如0
表示持续的帧,1
表示文本帧,2
表示二进制帧等等。
case READING_FIRST:
if (!in.isReadable()) {
return;
}
framePayloadLength = 0;
// FIN, RSV, OPCODE
byte b = in.readByte();
frameFinalFlag = (b & 0x80) != 0;//取出FIN,表示是不是一帧的最后一段
frameRsv = (b & 0x70) >> 4;//取出RSV
frameOpcode = b & 0x0F;//取出Opcode
if (logger.isTraceEnabled()) {
logger.trace("Decoding WebSocket Frame opCode={}", frameOpcode);
}
state = State.READING_SECOND;
READING_SECOND
然后读取掩码位,读取长度,进行一些合法性的检查,如果违反协议了,就直接发送关闭帧。
case READING_SECOND:
if (!in.isReadable()) {
return;
}
// MASK, PAYLOAD LEN 1
b = in.readByte();//再读一个字节
frameMasked = (b & 0x80) != 0;//读取掩码,1表示存在,4字节,0不存在
framePayloadLen1 = b & 0x7F;//获取内容长度
if (frameRsv != 0 && !config.allowExtensions()) {//有扩展标志位,但是不允许扩展
protocolViolation(ctx, in, "RSV != 0 and no extension negotiated, RSV:" + frameRsv);
return;
}
if (!config.allowMaskMismatch() && config.expectMaskedFrames() != frameMasked) {//需要掩码加密,但是发来的没进行掩码加密
protocolViolation(ctx, in, "received a frame that is not masked as expected");
return;
}
//控制操作,关闭,ping,pong
if (frameOpcode > 7) { // control frame (have MSB in opcode set)
// control frames MUST NOT be fragmented
if (!frameFinalFlag) {//控制帧不用分段了
protocolViolation(ctx, in, "fragmented control frame");
return;
}
// control frames MUST have payload 125 octets or less
if (framePayloadLen1 > 125) {//长度超过125
protocolViolation(ctx, in, "control frame with payload length > 125 octets");
return;
}
//不为控制帧
// check for reserved control frame opcodes
if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING
|| frameOpcode == OPCODE_PONG)) {
protocolViolation(ctx, in, "control frame using reserved opcode " + frameOpcode);
return;
}
//关闭帧有内容的话,必须是2个字节的无符号整形表示状态码
// close frame : if there is a body, the first two bytes of the
// body MUST be a 2-byte unsigned integer (in network byte
// order) representing a getStatus code
if (frameOpcode == 8 && framePayloadLen1 == 1) {
protocolViolation(ctx, in, "received close control frame with payload len 1");
return;
}
} else { // data frame 数据帧,不是持续,文本,二进制帧的话也违反协议了
// check for reserved data frame opcodes
if (!(frameOpcode == OPCODE_CONT || frameOpcode == OPCODE_TEXT
|| frameOpcode == OPCODE_BINARY)) {
protocolViolation(ctx, in, "data frame using reserved opcode " + frameOpcode);
return;
}
// check opcode vs message fragmentation state 1/2
if (fragmentedFramesCount == 0 && frameOpcode == OPCODE_CONT) {//是持续帧,帧个数为0
protocolViolation(ctx, in, "received continuation data frame outside fragmented message");
return;
}
//帧的端数不为0,但是不是持续帧,也不是ping
// check opcode vs message fragmentation state 2/2
if (fragmentedFramesCount != 0 && frameOpcode != OPCODE_CONT && frameOpcode != OPCODE_PING) {
protocolViolation(ctx, in,
"received non-continuation data frame while inside fragmented message");
return;
}
}
state = State.READING_SIZE;
protocolViolation违反协议
如果发现有违反协议的,直接把数据丢弃,如果通道没关闭,且设置了违反协议就关闭通道的话就发送关闭帧,抛出异常。
private void protocolViolation(ChannelHandlerContext ctx, ByteBuf in, CorruptedWebSocketFrameException ex) {
state = State.CORRUPT;//帧损坏的状态
int readableBytes = in.readableBytes();
if (readableBytes > 0) {
in.skipBytes(readableBytes);//略过,能帮助释放内存
}
if (ctx.channel().isActive() && config.closeOnProtocolViolation()) {//帧坏了就关闭通道
Object closeMessage;
if (receivedClosingHandshake) {
closeMessage = Unpooled.EMPTY_BUFFER;//空帧
} else {
WebSocketCloseStatus closeStatus = ex.closeStatus();
String reasonText = ex.getMessage();
if (reasonText == null) {
reasonText = closeStatus.reasonText();
}
closeMessage = new CloseWebSocketFrame(closeStatus, reasonText);//封装成关闭帧
}
ctx.writeAndFlush(closeMessage).addListener(ChannelFutureListener.CLOSE);//发出去,成功后关闭通道
}
throw ex;//抛出异常
}
READING_SIZE
这个就是我前面说的处理长度的几种情况。
case READING_SIZE://处理长度
//如果是126的话,紧跟着后面需要有两个字节的长度
// Read frame payload length
if (framePayloadLen1 == 126) {
if (in.readableBytes() < 2) {
return;
}
framePayloadLength = in.readUnsignedShort();//读取2次节长度
if (framePayloadLength < 126) {//长度无效
protocolViolation(ctx, in, "invalid data frame length (not using minimal length encoding)");
return;
}
} else if (framePayloadLen1 == 127) {//如果是127,后面需要8个字节
if (in.readableBytes() < 8) {
return;
}
framePayloadLength = in.readLong();
// TODO: check if it's bigger than 0x7FFFFFFFFFFFFFFF, Maybe
// just check if it's negative?
if (framePayloadLength < 65536) {//小于等于2字节的
protocolViolation(ctx, in, "invalid data frame length (not using minimal length encoding)");
return;
}
} else {
framePayloadLength = framePayloadLen1;//0-125的情况
}
//大于最大长度默认65536
if (framePayloadLength > config.maxFramePayloadLength()) {
protocolViolation(ctx, in, WebSocketCloseStatus.MESSAGE_TOO_BIG,
"Max frame length of " + config.maxFramePayloadLength() + " has been exceeded.");
return;
}
if (logger.isTraceEnabled()) {
logger.trace("Decoding WebSocket Frame length={}", framePayloadLength);
}
state = State.MASKING_KEY;
MASKING_KEY
解析出掩码,其实这个掩码加密解密只是用了异或^
。
case MASKING_KEY://解析掩码
if (frameMasked) {//有掩码 4字节的
if (in.readableBytes() < 4) {
return;
}
if (maskingKey == null) {
maskingKey = new byte[4];
}
in.readBytes(maskingKey);
}
state = State.PAYLOAD;
PAYLOAD
有掩码先解码,然后根据不同的Opcode
类型封装成对应的帧数据。
case PAYLOAD://解析数据
if (in.readableBytes() < framePayloadLength) {
return;
}
ByteBuf payloadBuffer = null;
try {
payloadBuffer = readBytes(ctx.alloc(), in, toFrameLength(framePayloadLength));
// Now we have all the data, the next checkpoint must be the next
// frame
state = State.READING_FIRST;//回到初始要解析的状态
// Unmask data if needed
if (frameMasked) {//如果有掩码,要解码
unmask(payloadBuffer);
}
// Processing ping/pong/close frames because they cannot be
// fragmented
if (frameOpcode == OPCODE_PING) {//如果是ping
out.add(new PingWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
payloadBuffer = null;
return;
}
if (frameOpcode == OPCODE_PONG) {//如果是pong
out.add(new PongWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
payloadBuffer = null;
return;
}
if (frameOpcode == OPCODE_CLOSE) {//收到关闭帧,也要回一个关闭帧
receivedClosingHandshake = true;
checkCloseFrameBody(ctx, payloadBuffer);
out.add(new CloseWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
payloadBuffer = null;
return;
}
// Processing for possible fragmented messages for text and binary
// frames
if (frameFinalFlag) {//是最后一帧
// Final frame of the sequence. Apparently ping frames are
// allowed in the middle of a fragmented message
if (frameOpcode != OPCODE_PING) {//允许中间发心跳帧,心跳帧不算,不是心跳帧才要清零
fragmentedFramesCount = 0;
}
} else {
// Increment counter
fragmentedFramesCount++;//帧个数+1,为持续帧
}
// Return the frame
if (frameOpcode == OPCODE_TEXT) {//文本类型
out.add(new TextWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
payloadBuffer = null;
return;
} else if (frameOpcode == OPCODE_BINARY) {//二进制
out.add(new BinaryWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer));
payloadBuffer = null;
return;
} else if (frameOpcode == OPCODE_CONT) {//持续帧
out.add(new ContinuationWebSocketFrame(frameFinalFlag, frameRsv,
payloadBuffer));
payloadBuffer = null;
return;
} else {
throw new UnsupportedOperationException("Cannot decode web socket frame with opcode: "
+ frameOpcode);
}
} finally {
if (payloadBuffer != null) {//没有解析出来要释放
payloadBuffer.release();
}
}
unmask解码
其实就是取出4
字节掩码,封装成一个整数,然后跟数据进行每次8
位的轮询的异或运算解码。
private void unmask(ByteBuf frame) {
int i = frame.readerIndex();
int end = frame.writerIndex();
ByteOrder order = frame.order();
// Remark: & 0xFF is necessary because Java will do signed expansion from
// byte to int which we don't want.
int intMask = ((maskingKey[0] & 0xFF) << 24)
| ((maskingKey[1] & 0xFF) << 16)
| ((maskingKey[2] & 0xFF) << 8)
| (maskingKey[3] & 0xFF);
if (order == ByteOrder.LITTLE_ENDIAN) {
intMask = Integer.reverseBytes(intMask);
}
for (; i + 3 < end; i += 4) {
int unmasked = frame.getInt(i) ^ intMask;
frame.setInt(i, unmasked);
}
for (; i < end; i++) {
frame.setByte(i, frame.getByte(i) ^ maskingKey[i % 4]);
}
}
CORRUPT
一般是有违反协议了,就丢弃了,但是就怕其他问题要读一帧,不然父类处理会出问题。
case CORRUPT://帧坏了
if (in.isReadable()) {
// If we don't keep reading Netty will throw an exception saying
// we can't return null if no bytes read and state not changed.
in.readByte();//要读一下,否则父类会报错
}
return;
问题在于父类ByteToMessageDecoder
的callDecode
中有:
如果没读的话就会报异常啦。不过搜寻能转为CORRUPT
的只有违反协议的方法,那个方法里面还是有略过读数据的,不会导致父类报错,我估计是如果自己手动操作状态为CORRUPT
,可能是要读一下的。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。