2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104521711

ByteToMessageDecoder

字节到消息的编码器,因此应该是实现入站操作的,这类解码器处理器都不是共享的,因为需要存还没有解码完的数据,还有各种状态,是独立的,不能进行共享。

202309132207035111.png

重要属性

先看看他的一些属性:

    	//状态码
        private static final byte STATE_INIT = 0;//初始状态
        private static final byte STATE_CALLING_CHILD_DECODE = 1;//正在调用子类解码
        private static final byte STATE_HANDLER_REMOVED_PENDING = 2;//处理器待删除
    
        ByteBuf cumulation;//累加缓冲区
        private Cumulator cumulator = MERGE_CUMULATOR;//默认是合并累加器
        private boolean singleDecode;//是否只解码一次
        private boolean first;//是否是第一次累加缓冲区
     	private boolean firedChannelRead;//自动读取是false的时候,是否要去调用ChannelHandlerContext的read()来设置监听读事件,可能没读完
    
    
    	//状态
        private byte decodeState = STATE_INIT;
        private int discardAfterReads = 16;//读取16个字节后丢弃已读的
        private int numReads;//cumulation读取数据的次数

内部会维护一个状态decodeState ,以便于如果在执行解码的时候处理器上下文被删除了,可以及时响应。
还有一个累加缓冲区,如果有不能拼成一个消息的数据会放入这个缓冲区里,等待下一次继续拼。当然累加缓冲区怎么累加,就需要有累加器,默认是合并累加器MERGE_CUMULATOR

MERGE_CUMULATOR合并累加器

主要是做一般缓冲区的合并,直接将新的缓冲区拷贝到累加缓冲区中。

        public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
            @Override
            public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
                if (!cumulation.isReadable() && in.isContiguous()) {//累计的不可读(比如为空缓冲区),且新的是连续的,不是符合缓冲区,释放老的,返回新的
                    cumulation.release();
                    return in;
                }
                try {
                    final int required = in.readableBytes();
                    if (required > cumulation.maxWritableBytes() ||
                            (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
                            cumulation.isReadOnly()) {//扩容了
    
                        return expandCumulation(alloc, cumulation, in);
                    }
                    cumulation.writeBytes(in, in.readerIndex(), required);//将in写入
                    in.readerIndex(in.writerIndex());//in不可读了
                    return cumulation;
                } finally {
                    in.release();//返回前要释放in
                }
            }
        };

COMPOSITE_CUMULATOR复合累加器

另一个是复合累加器,也就是处理复合缓冲区,默认累加缓冲区也会是复合缓冲区。如果添加进来的缓冲区不可读,那就什么都不做,也就是复合缓冲区的累加方式。

     public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
            @Override
            public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
                if (!cumulation.isReadable()) {//不可读了,直接返回in
                    cumulation.release();
                    return in;
                }
                CompositeByteBuf composite = null;
                try {
                    if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {//累计的是复合缓冲区且无其他引用
                        composite = (CompositeByteBuf) cumulation;
    
                        if (composite.writerIndex() != composite.capacity()) {//更新容量到写索引处
                            composite.capacity(composite.writerIndex());
                        }
                    } else {//如果不是复合缓冲区,就创建一个复合缓冲区把累计的添加进来
                        composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
                    }
                    composite.addFlattenedComponents(true, in);//再添加in
                    in = null;
                    return composite;
                } finally {
                    if (in != null) {//有异常,要释放缓冲区
                        in.release();
                        if (composite != null && composite != cumulation) {//有新的缓冲区申请的话也要释放
                            composite.release();
                        }
                    }
                }
            }
        };

抽象方法decode

其实有一个抽象方法需要子类实现,那就是具体的解码方法,参数in就是累加缓冲区,out可以理解为一个列表,存放解码后的对象。

     protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

channelRead读方法

解码器也是一个处理器,只是在业务处理器前面做解码用,当然也是在读数据的地方做处理啦。CodecOutputList暂时不用管,就当一个列表,存放解码出来的消息就行。其实流程就是将新来的缓冲区
msg加到累加的缓冲区cumulation中,然后返回的又赋值给cumulation,这样就做到了合并了,然后去进行解码,解码的结果放入列表out 中。最后再进行资源的释放,往后传递消息和列表的回收。

      @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof ByteBuf) {//只处理字节缓冲区类型的
                CodecOutputList out = CodecOutputList.newInstance();
                try {
                    first = cumulation == null;
                    cumulation = cumulator.cumulate(ctx.alloc(),
                            first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);//累加
                    callDecode(ctx, cumulation, out);
                } catch (DecoderException e) {
                    throw e;
                } catch (Exception e) {
                    throw new DecoderException(e);
                } finally {
                    if (cumulation != null && !cumulation.isReadable()) {//不为空也不可读,要释放
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++ numReads >= discardAfterReads) {//读取数据的次数大于阈值,则尝试丢弃已读的,避免占着内存
    
                        numReads = 0;
                        discardSomeReadBytes();
                    }
    
                    int size = out.size();
                    firedChannelRead |= out.insertSinceRecycled();//有被添加或者设置,表是有读过了
                    fireChannelRead(ctx, out, size);//尝试传递数据
                    out.recycle();
                }
            } else {
                ctx.fireChannelRead(msg);//其他类型继续传递
            }
        }

callDecode解码

只要判断新的缓冲区in还有可读的,就进行解码,当然最开始消息列表out是空的,所以就进行子类来解码decodeRemovalReentryProtection,解码后看是否真正读取了缓冲区的内容,如果没读,说明不符合子类解码器的要求,就跳出循环了。如果能读取,就判断是否只解码一次,是就跳出,不是就继续读取来解码,解码好的消息会马上传递给后面,并把消息列表清空,当然这里不一定一次解码1个消息,也可能一次很多个。当然每次完成解码或者传递消息后要进行上下文是否被移除的检查,如果被移除了,就不能再进行处理了。

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            try {
                while (in.isReadable()) {//有可读的
                    int outSize = out.size();//数量
    
                    if (outSize > 0) {//有消息解码出来就先传递了
                        fireChannelRead(ctx, out, outSize);//有解码好的数据就传递给后面
                        out.clear();//清空
    
                        if (ctx.isRemoved()) {//上下文被删除了就不处理了
                            break;
                        }
                        outSize = 0;
                    }
                    //继续解码
                    int oldInputLength = in.readableBytes();//还以后多少字节可读
                    decodeRemovalReentryProtection(ctx, in, out);//解码
    
                    if (ctx.isRemoved()) {
                        break;
                    }
    
                    if (outSize == out.size()) {//没有生成新的消息,可能要求不够无法解码出一个消息
                        if (oldInputLength == in.readableBytes()) {//没有读取数据
                            break;
                        } else {
                            continue;
                        }
                    }
    
                    if (oldInputLength == in.readableBytes()) {//解码器没有读数据
                        throw new DecoderException(
                                StringUtil.simpleClassName(getClass()) +
                                        ".decode() did not read anything but decoded a message.");
                    }
    
                    if (isSingleDecode()) {//是否每次只解码一条,就返回
                        break;
                    }
                }
            } catch (DecoderException e) {
                throw e;
            } catch (Exception cause) {
                throw new DecoderException(cause);
            }
        }

fireChannelRead传递消息列表中的消息

这个方法是个用来传递消息列表中的所有消息的,判断消息列表是不是CodecOutputList类型,是的话就调用相应的获取方法getUnsafe来传递,这个获取消息的方法可能是不安全的,因为没做索引的越界检查,可能会越界。如果是一般的列表,就直接调用get方法获得。

    static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
            if (msgs instanceof CodecOutputList) {//如果是CodecOutputList类型的
                fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
            } else {//正常获取对象,传递下去
                for (int i = 0; i < numElements; i++) {
                    ctx.fireChannelRead(msgs.get(i));//传递每一个
                }
            }
        }
    
        // 传递CodecOutputList中的每一个对象
        static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
            for (int i = 0; i < numElements; i ++) {
                ctx.fireChannelRead(msgs.getUnsafe(i));
            }
        }
        
     	Object getUnsafe(int index) {
            return array[index];
        }

decodeRemovalReentryProtection调用子类来解码

主要是调用子类实现的decode方法来解码,最后会考虑处理器是否被删除了,做一些处理。

    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                throws Exception {
            decodeState = STATE_CALLING_CHILD_DECODE;//设置为子类解码
            try {
                decode(ctx, in, out);//调用子类解码
            } finally {
                boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;//是否待删除状态
                decodeState = STATE_INIT;//处理完了设置为初始化
                if (removePending) {//如果有被设置待删除状态,就马上处理
                    fireChannelRead(ctx, out, out.size());//把数据传出去
                    out.clear();//清空
                    handlerRemoved(ctx);//删除
                }
            }
        }

FixedLengthFrameDecoder的decode

举个例子,拿这个固定长的来看看他的解码方法,其实就是调用自定义的解码方法decode,然后把结果放进消息队列out中。具体的解码就是看可读数据是否大于等于固定长,如果是,就进行缓冲区的保留切片,切出固定长的缓冲区,这里为什么要保留切片呢,因为切片是共享原缓冲区的数据的,如果源缓冲区用完了可能被释放,所以需要保留一下,增加引用计数,当然在切片释放的时候,也会释放源缓冲区的。 注意如果没达到解码器要求的,可能不会去读取缓冲区数据。

     @Override
        protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            Object decoded = decode(ctx, in);
            if (decoded != null) {
                out.add(decoded);
            }
        }
        
    	protected Object decode(
                @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            if (in.readableBytes() < frameLength) {//如果可读字节的小于固定长度,什么都不做
                return null;
            } else {
                return in.readRetainedSlice(frameLength);//返回的是切片,会增加in引用计数,防止被回收了
            }
        }

channelReadComplete读完成方法

当数据读取完成的时候,会尝试去丢弃discardSomeReadBytes累加缓冲区的已读信息,虽然可能要进行拷贝消耗点新能,但是放在那里浪费内存,所以就先丢弃了。之后判断是否有读取过缓存区的内容,如果没读到数据(可能没达到解码器要求,不读取数据),且没设置自动去读的,就手动设置一次监听读事件,可能后面还有部分没发过来,发过来了就可以解码拼成一个完整消息了。最后在传递读完成事件。

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            numReads = 0;
            discardSomeReadBytes();
            if (!firedChannelRead && !ctx.channel().config().isAutoRead()) {//如果没有读到数据,且没有自动开启读,就设置读事件
                ctx.read();
            }
            firedChannelRead = false;
            ctx.fireChannelReadComplete();
        }

discardSomeReadBytes丢弃已读数据

如果缓冲区不为空,而且没有别的引用指向他,就丢弃已读的数据。

     protected final void discardSomeReadBytes() {
            if (cumulation != null && !first && cumulation.refCnt() == 1) {//当引用值有1的时候丢弃,否则用户可能有其他用就不能直接丢弃
                cumulation.discardSomeReadBytes();
            }
        }

decodeLast最后解码

在通道失效之前,会进行最后一次解码,以便于取出剩下的数据解码,当然如果没有数据,那等于什么都没做:

    protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.isReadable()) {//如果还能读的话把剩下的解码
                decodeRemovalReentryProtection(ctx, in, out);
            }
        }

其他的一些方法就不多说了,理解了这些其他的都没太大难度了,后面再介绍下他的一些常用子类是怎么实现的。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文