Netty对Keepalive和idel的检测与支持

 2023-01-21
原文作者:大胖鱼鱼 原文地址:https://juejin.cn/post/7030239170551349285

1:设置的Keepalive是怎么生效的

1.1:如何设置

这是EchoServer里面提供的 两种设置Keepalive方法的选项

![202212302204185191.png][]

1.2 如何生效的

    //1:我们来看下ChildOption是如何设置选项的
    public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
        ObjectUtil.checkNotNull(childOption, "childOption");
        synchronized (childOptions) {
            if (value == null) {
                childOptions.remove(childOption);
            } else {
                //和客户端做链接的childOption
                childOptions.put(childOption, value);
            }
        }
        return this;
    }
    
    //2:看一下chileOptions (取关键字了)
    在ServerBootStrap.java 
    
    51行设置了一个Option选项Map
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
    
    140行转为一个Entry
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    
    152行在链接后的一个处理程序将currentChildOptions 作为参数传递
    ch.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            pipeline.addLast(new ServerBootstrapAcceptor(
                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
    
    //3:查看ServerBootstrapAcceptor(是内部类)的Read方法
    
    关键的一行 设置Option
    setChannelOptions(child, childOptions, logger);
    
    static void setChannelOptions(
            Channel channel, Map.Entry<ChannelOption<?>, Object>[] options, InternalLogger logger) {
        for (Map.Entry<ChannelOption<?>, Object> e: options) {
            setChannelOption(channel, e.getKey(), e.getValue(), logger);
        }
    }
    
    这个是里面的调用链 
    private static void setChannelOption(
            Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
        try {
            if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
                logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
            }
        } catch (Throwable t) {
            logger.warn(
                    "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
        }
    }
    
    在下异步的时候,我们选用NioWebSocketChannelOption 因为前面设置的时候是使用的这种方式 
    
    这里我们可以看到 有两种设置方式,那么我们接下来就可以看一下
    
    @Override
    public <T> boolean setOption(ChannelOption<T> option, T value) {
        if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
            return NioChannelOption.setOption(jdkChannel(), (NioChannelOption<T>) option, value);
        }
        return super.setOption(option, value);
    }

2:两种设置Keepalive的方式有什么区别

2.1:NioChannelOption.setOption设置方式

    @SuppressJava6Requirement(reason = "Usage guarded by java version check")
    static <T> boolean setOption(Channel jdkChannel, NioChannelOption<T> option, T value) {
        java.nio.channels.NetworkChannel channel = (java.nio.channels.NetworkChannel) jdkChannel;
        if (!channel.supportedOptions().contains(option.option)) {
            return false;
        }
        //因为jdk有bug 所以当选项为IP_TOP的时候直接返回false 不支持
        if (channel instanceof ServerSocketChannel && option.option == java.net.StandardSocketOptions.IP_TOS) {
            // Skip IP_TOS as a workaround for a JDK bug:
            // See https://mail.openjdk.java.net/pipermail/nio-dev/2018-August/005365.html
            return false;
        }
        try {
            //调用的是jdk的方法 设置到Channel里面了
            channel.setOption(option.option, value);
            return true;
        } catch (IOException e) {
            throw new ChannelException(e);
        }
    }

2.2:父类构造器也就是default方式

    //这个就是默认实现方式 这种方式显然没有上面对应的jdk
    //的实现方式比较好
    @Override
    public <T> boolean setOption(ChannelOption<T> option, T value) {
        validate(option, value);
    
        if (option == SO_RCVBUF) {
            setReceiveBufferSize((Integer) value);
        } else if (option == SO_SNDBUF) {
            setSendBufferSize((Integer) value);
        } else if (option == TCP_NODELAY) {
            setTcpNoDelay((Boolean) value);
        }
        
        else if (option == SO_KEEPALIVE) {
            setKeepAlive((Boolean) value);
        }
        
        else if (option == SO_REUSEADDR) {
            setReuseAddress((Boolean) value);
        } else if (option == SO_LINGER) {
            setSoLinger((Integer) value);
        } else if (option == IP_TOS) {
            setTrafficClass((Integer) value);
        } else if (option == ALLOW_HALF_CLOSURE) {
            setAllowHalfClosure((Boolean) value);
        } else {
            return super.setOption(option, value);
        }
    
        return true;
    }

2.3 哪种方式比较好

肉眼可见,还是使用NIO的设置方式比较好,因为直接调用了jdk的方法,不用那么多的if else的判断效率还是比 default方式好很多的
注意:这里还有一个oio的方式 不做讲解

3:idle检测类包(io.netty.handler.timeout)的功能浏览

手写描述这几个类的和类的功能:

3.1 IdleState

    该类定义了三个状态 
    读空闲: READER_IDLE
    写空闲:WRITER_IDLE
    全部空闲:ALL_IDLE

3.2 IdleStateEvent

    这里定义了六种状态时对IdleState的状态升级 标识了
    是否是第一次的读或者写 
    public static final FIRST_READER_IDLE_STATE_EVENT =
            new DefaultIdleStateEvent(IdleState.READER_IDLE, true);
    public static final IdleStateEvent READER_IDLE_STATE_EVENT =
            new DefaultIdleStateEvent(IdleState.READER_IDLE, false);
    public static final IdleStateEvent FIRST_WRITER_IDLE_STATE_EVENT =
            new DefaultIdleStateEvent(IdleState.WRITER_IDLE, true);
    public static final IdleStateEvent WRITER_IDLE_STATE_EVENT =
            new DefaultIdleStateEvent(IdleState.WRITER_IDLE, false);
    public static final IdleStateEvent FIRST_ALL_IDLE_STATE_EVENT =
            new DefaultIdleStateEvent(IdleState.ALL_IDLE, true);
    public static final IdleStateEvent ALL_IDLE_STATE_EVENT =
            new DefaultIdleStateEvent(IdleState.ALL_IDLE, false);

3.3:IdleStateHandler

里面有四个内部类分别是

      AbstractIdleTask<br> 
         传入了:Channel的上下文 集成了Runnable
         判断当先小消息通道是否有打开 
         实现了run方法  调用了接下来ide三个具体实现
      
      ReadIdleTimeOut<br>
          判断当前是否应该有Idle
          如果是空闲了,那么就弄一个定时任务去执行Idle检测
          如果没有检测到Idle那就重置Idle监测时间
          
      WriterIdlerTimeout<br>
          和读取有一个地方有区别
          Read是判断是否是空闲 而Write是判断是否是
          读取完毕
    //和读Idle不同的地方
    if (hasOutputChanged(ctx, first)) {
    return;
    }
/**
* 正常情况下,flase,即写空闲的判断中的写是指写成功,但实际上有这几种情况
* (1)写了,但是缓存区满了,写不出去
* (2)写了,一个大"数据",写一直在动,但是没有完成
* 所以这个参数,判断是否有写的意图,而不是判断是否写成功
**/
      
  AllIdleTimeoutTask<br>
  这个和Read意思相似

#### 3.4ReadTimeOut和ReadTimeOutException ####
当判定通道不活跃时,触发此开关,然后将其关闭,
并抛出ReadTimeOutException
如果活跃就重新计算定时任务

#### 3.5WriteTimeOut和WriteTimeOutException ####
    大致流程和ReadTimeOut一样 但是 
    这个不是算是否WriteIdle 是判断当前是否是读状态
    或者写状态 上边的IdleHandler有贴出代码 

//和读Idle不同的地方
if (hasOutputChanged(ctx, first)) {
    return;
}

### 4:idle检测的原理(读写) ###

执行流程: 其实就是在初始化Handler的时候,添加一个事件处理器 检测是否有读写 如果没有那么就关闭这个Channel 如果有,那么每次检测都会重置时间.

其实我在想:如果使用分布式的方式做这个执行计划

可不可以:使用Reids的滑动窗口来进行定时任务的增加和删除 有兴趣的可以了解一下(或者等我出下一篇)


[202212302204185191.png]: http://image.skjava.com/article/common/2022/12/202212302204185191.png