跟我一起开发商业级IM(3)—— 长连接稳定性之连接及重连

 2023-01-02
原文作者:FreddyChen 原文地址:https://juejin.cn/post/6868939934804213767

欢迎转载,转载请注明出处:juejin.cn/post/686893…

202212302237231631.png

写在前面

贴个Kula高清图镇楼:

202212302237238672.png

在之前的跟我一起开发商业级IM(1)—— 技术选型及协议定义跟我一起开发商业级IM(2)—— 接口定义及封装两篇文章,我们已经了解IMS的技术选型及接口定义与封装,接下来,我们来真正实现连接及重连部分。

一个社交产品,长连接稳定是前提,绝大部分业务逻辑的正常运行都需要稳定的长连接支撑,可谓重中之重。本篇文章将会讲述如何去实现并维护一个稳定的长连接,以及各种异常情况的处理等。阅读完本篇文章,你将会学到连接、重连机制、心跳机制等知识。同时,会在Github上开源相关代码(包含Android客户端/Java服务端、基于TCP/WebSocket),废话不说,我们开始吧。

初始化配置

初始化配置,也就是在应用程序启动并进行IMS初始化时,传入所需配置参数,可根据自己的业务需求自定义。下面我们来看看NettyTCPIMS初始化接口的代码实现( 由于基于NettyWebSocket实现的NettyWebSocketIMS大部分代码及逻辑都与NettyTCPIMS相同,就不单独贴出NettyWebSocketIMS代码了,下面只会讲解WebSocket对比TCP实现所不同的地方 ,有需要完整代码的话可以跳转Github查看):

     /**
      * 初始化
      * @param context
      * @param options               IMS初始化配置
      * @param connectStatusListener IMS连接状态监听
      * @param msgReceivedListener   IMS消息接收监听
      * @return
      */
     @Override
     public boolean init(Context context, IMSOptions options, IMSConnectStatusListener connectStatusListener, IMSMsgReceivedListener msgReceivedListener) {
        if (context == null) {
            Log.d(TAG, "初始化失败:Context is null.");
            initialized = false;
            return false;
        }
    
        if (options == null) {
            Log.d(TAG, "初始化失败:IMSOptions is null.");
            initialized = false;
            return false;
        }
        this.mContext = context;
        this.mIMSOptions = options;
        this.mIMSConnectStatusListener = connectStatusListener;
        this.mIMSMsgReceivedListener = msgReceivedListener;
        executors = new ExecutorServiceFactory();
        // 初始化重连线程池
        executors.initBossLoopGroup();
        // 注册网络连接状态监听
        NetworkManager.getInstance().registerObserver(context, this);
        // 标识ims初始化成功
        initialized = true;
        // 标识ims已打开
        isClosed = false;
        callbackIMSConnectStatus(IMSConnectStatus.Unconnected);
        return true;
    }

如上图,简单讲讲初始化的几个步骤:

  1. 参数
  • context 应用程序上下文,方便IMS获取系统资源并进行一些系统操作等。
  • options IMS初始化所需配置,其中定义了通信实现方式、通信协议、传输协议、连接超时时间、重连延时时间、重连次数、心跳前后台间隔时间、服务器地址等一些支持自定义的参数。
  • connectStatusListener IMS连接状态回调,便于把连接状态反馈到应用层。
  • msgReceivedListener 消息接收回调,便于IMS把接收到的消息回调到应用层( 本篇文章主要讲解连接及重连,所以不涉及消息部分,后续会详细讲解 )。
  1. 创建线程池组
    线程池组分为boss线程池和work线程池,其中boss线程池负责连接及重连部分;work线程池负责心跳部分,均为单线程线程池(因为同时只能有一个线程进行连接或心跳)。至于为什么用线程池,纯属个人习惯,大家也可以分别用一个子线程实现即可。
  2. 注册网络状态监听
    网络变化时,进行IMS重连。

初始Bootstrap

初始化Bootstrap,可参考Netty ChannelOption并根据实际业务场景进行定制,下面贴出我自己定制的配置:

    /**
     * 初始化bootstrap
     */
    void initBootstrap() {
        closeBootstrap();// 初始化前先关闭
        NioEventLoopGroup loopGroup = new NioEventLoopGroup(4);
        bootstrap = new Bootstrap();
        bootstrap.group(loopGroup).channel(NioSocketChannel.class)
                // 设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                .option(ChannelOption.SO_KEEPALIVE, true)
                // 设置禁用nagle算法,如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送。默认为false
                .option(ChannelOption.TCP_NODELAY, true)
                // 设置TCP发送缓冲区大小(字节数)
                .option(ChannelOption.SO_SNDBUF, 32 * 1024)
                // 设置TCP接收缓冲区大小(字节数)
                .option(ChannelOption.SO_RCVBUF, 32 * 1024)
                // 设置连接超时时长,单位:毫秒
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, mIMSOptions.getConnectTimeout())
                // 设置初始化ChannelHandler
                .handler(new NettyTCPChannelInitializerHandler(this));
    }

至于参数的含义,大家可参照官方文档的介绍。

连接

连接也可以认为是重连,执行重连响应逻辑即可:

    /**
     * 连接
     */
    @Override
    public void connect() {
        if(!initialized) {
            Log.w(TAG, "IMS初始化失败,请查看日志");
            return;
        }
        isExecConnect = true;// 标识已执行过连接
        this.reconnect(true);
    }

所以我们直接看重连部分,也是整篇文章中最核心最复杂的部分

重连

因为连接及重连部分代码较多及逻辑较复杂,为了使NettyTCPIMS代码尽量简洁及逻辑清晰,所以将连接及重连部分代码抽取到NettyTCPReconnectTask

    public class NettyTCPReconnectTask implements Runnable {
    
        private static final String TAG = NettyTCPReconnectTask.class.getSimpleName();
        private NettyTCPIMS ims;
        private IMSOptions mIMSOptions;
    
        NettyTCPReconnectTask(NettyTCPIMS ims) {
            this.ims = ims;
            this.mIMSOptions = ims.getIMSOptions();
        }
    
        @Override
        public void run() {
            try {
                // 重连时,释放工作线程组,也就是停止心跳
                ims.getExecutors().destroyWorkLoopGroup();
    
                // ims未关闭并且网络可用的情况下,才去连接
                while (!ims.isClosed() && ims.isNetworkAvailable()) {
                    IMSConnectStatus status;
                    if ((status = connect()) == IMSConnectStatus.Connected) {
                        ims.callbackIMSConnectStatus(status);
                        break;// 连接成功,跳出循环
                    }
    
                    if (status == IMSConnectStatus.ConnectFailed
                            || status == IMSConnectStatus.ConnectFailed_IMSClosed
                            || status == IMSConnectStatus.ConnectFailed_ServerListEmpty
                            || status == IMSConnectStatus.ConnectFailed_ServerEmpty
                            || status == IMSConnectStatus.ConnectFailed_ServerIllegitimate
                            || status == IMSConnectStatus.ConnectFailed_NetworkUnavailable) {
                        ims.callbackIMSConnectStatus(status);
    
                        if(ims.isClosed() || !ims.isNetworkAvailable()) {
                            return;
                        }
                        // 一个服务器地址列表都连接失败后,说明网络情况可能很差,延时指定时间(重连间隔时间*2)再去进行下一个服务器地址的连接
                        Log.w(TAG, String.format("一个周期连接失败,等待%1$dms后再次尝试重连", mIMSOptions.getReconnectInterval() * 2));
                        try {
                            Thread.sleep(mIMSOptions.getReconnectInterval() * 2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            } finally {
                // 标识重连任务停止
                ims.setReconnecting(false);
            }
        }
    
        /**
         * 连接服务器
         * @return
         */
        private IMSConnectStatus connect() {
            if (ims.isClosed()) return IMSConnectStatus.ConnectFailed_IMSClosed;
            ims.initBootstrap();
            List<String> serverList = mIMSOptions.getServerList();
            if (serverList == null || serverList.isEmpty()) {
                return IMSConnectStatus.ConnectFailed_ServerListEmpty;
            }
    
            for (int i = 0; i < serverList.size(); i++) {
                String server = serverList.get(i);
                if (StringUtil.isNullOrEmpty(server)) {
                    return IMSConnectStatus.ConnectFailed_ServerEmpty;
                }
    
                String[] params = null;
                try {
                    params = server.split(" ");
                } catch (Exception e) {
                    e.printStackTrace();
                }
                if (params == null || params.length < 2) {
                    return IMSConnectStatus.ConnectFailed_ServerIllegitimate;
                }
    
                if(i == 0) {
                    ims.callbackIMSConnectStatus(IMSConnectStatus.Connecting);
                }
    
                // +1是因为首次连接也认为是重连,所以如果重连次数设置为3,则最大连接次数为3+1次
                for (int j = 0; j < mIMSOptions.getReconnectCount() + 1; j++) {
                    if (ims.isClosed()) {
                        return IMSConnectStatus.ConnectFailed_IMSClosed;
                    }
                    if (!ims.isNetworkAvailable()) {
                        return IMSConnectStatus.ConnectFailed_NetworkUnavailable;
                    }
    
                    Log.d(TAG, String.format("正在进行【%1$s】的第%2$d次连接", server, j + 1));
                    try {
                        String host = params[0];
                        int port = Integer.parseInt(params[1]);
                        Channel channel = toServer(host, port);
                        if (channel != null && channel.isOpen() && channel.isActive() && channel.isRegistered() && channel.isWritable()) {
                            ims.setChannel(channel);
                            return IMSConnectStatus.Connected;
                        } else {
                            if (j == mIMSOptions.getReconnectCount()) {
                                // 如果当前已达到最大重连次数,并且是最后一个服务器地址,则回调连接失败
                                if(i == serverList.size() - 1) {
                                    Log.w(TAG, String.format("【%1$s】连接失败", server));
                                    return IMSConnectStatus.ConnectFailed;
                                }
                                // 否则,无需回调连接失败,等待一段时间再去进行下一个服务器地址连接即可
                                // 也就是说,当服务器地址列表里的地址都连接失败,才认为是连接失败
                                else {
                                    // 一个服务器地址连接失败后,延时指定时间再去进行下一个服务器地址的连接
                                    Log.w(TAG, String.format("【%1$s】连接失败,正在等待进行下一个服务器地址的重连,当前重连延时时长:%2$dms", server, mIMSOptions.getReconnectInterval()));
                                    Log.w(TAG, "=========================================================================================");
                                    Thread.sleep(mIMSOptions.getReconnectInterval());
                                }
                            } else {
                                // 连接失败,则线程休眠(重连间隔时长 / 2 * n) ms
                                int delayTime = mIMSOptions.getReconnectInterval() + mIMSOptions.getReconnectInterval() / 2 * j;
                                Log.w(TAG, String.format("【%1$s】连接失败,正在等待重连,当前重连延时时长:%2$dms", server, delayTime));
                                Thread.sleep(delayTime);
                            }
                        }
                    } catch (InterruptedException e) {
                        break;// 线程被中断,则强制关闭
                    }
                }
            }
    
            return IMSConnectStatus.ConnectFailed;
        }
    
        /**
         * 真正连接服务器的地方
         * @param host
         * @param port
         * @return
         */
        private Channel toServer(String host, int port) {
            Channel channel;
            try {
                channel = ims.getBootstrap().connect(host, port).sync().channel();
            } catch (Exception e) {
                e.printStackTrace();
                channel = null;
            }
    
            return channel;
        }
    }

从以上代码,可以看到主要分为三个方法:

  • run() 重连任务是一个Thread,run()方法也就是线程启动时执行的方法,主要是判断IMS是否关闭和网络状态,满足这两个条件就一直循环去连接,连接成功后,回调连接状态并停止线程,否则,一个周期连接失败后(一个连接失败周期,代表从开始连接到所有服务器地址达到最大重连次数),延时一段时间再去尝试重连(大家可能会问为什么要去延时,直接连接不好吗?主要是因为如果连接失败的话,大多数情况下可能是客户端网络环境不好或者是服务端存在问题,延时是为了在下一个时间节点时网络恢复等,避免频繁连接,节约性能),直至连接成功为止。

  • toServer() toServer()主要是Netty框架进行TCP长连接的代码,比较简单。

  • connect() 连接及重连的所有逻辑,都放到connect()方法中进行。TCPWebSocket的方式有细微的区别,下面主要以TCP为例,至于WebSocket的区别,稍后会列出来。
    注:ims_kulaSDK中固定的TCP服务器地址的格式为:IP地址 端口号,例:192.168.0.1 8808,大家也可以根据自己的需求来定义格式。

    connect()方法大体逻辑如下:

    • 判断IMS是否已关闭或网络是否不可用,若满足两个条件的其中之一,即返回连接失败状态;
    • 判断用户是否设置了服务器地址列表,若未设置,即返回连接失败状态;
    • 若以上条件都未满足,也就是IMS未关闭,网络可用,并且服务器地址已设置,则初始化Bootstrap;
    • 接着需要两个for循环,外层循环负责遍历服务器地址列表,取出每一个服务器地址;内层循环负责遍历用户设置的最大重连次数,默认为3次,加上连接所需的一次,也就是说在不设置最大重连次数的情况下,ims_kulaSDK会对每个服务器地址进行 4 次连接。同时,重连间隔时间为 reconnectInterval + reconnectInterval / 2 * n,也就是如果设置重连间隔时间为8000ms,那么第二次重连间隔时间将为12000ms,第三次为16000ms,以此类推;
    • 获取服务器地址后,对地址进行字符串分割,分别获取host和port;
    • 接着调用Netty连接TCP的方法(toServer(String host, int port))进行连接即可。

注:WebSocket连接方式与TCP大同小异,唯一的区别就是WebSocket的服务器地址格式与TCP不同,ws://IP地址:端口号/websocket,例:ws://192.168.0.1:8808/websocket,所以WebSocket获取host和port代码如下:
(伪代码,具体代码可见NettyWebSocketReconnectTask

    URI uri = URI.create(server);
    String host = uri.getHost();
    int port = uri.getPort();

至于其余连接及重连部分代码,WebSocketTCP是一致的,因为WebSocket本身就是基于TCP协议作一层封装。

何时重连及断开连接

首先明确一下, 重连是相对于客户端来说的,服务端不存在主动连接;断开连接是相对于服务端来说的,严格来说是移除响应的Channel。

客户端重连时机:

  • 网络切换
  • 断开网络连接
  • 可感知的服务端异常
  • 心跳超时等

服务端断开连接时机:

  • 可感知的客户端异常
  • 心跳超时
  • 同一IP的客户端重复连接等

不知道大家注意到没有,上述的客户端重连时机和服务端断开连接时机,都分别有一个可感知异常。什么是可感知异常?也就是无论客户端还是服务端,在对方断开连接的时候,可以感知到,就是可感知异常。

经测试,在双方建立连接成功的状态下,对于客户端来说,如果服务端手动停止服务,Netty会回调exceptionCaught()方法,如下:

202212302237289243.png
服务端直接关机或者拔网线时,客户端无法感知,需要利用心跳超时机制进行重连。

同理,对于服务端来说,如果客户端手动杀死进程,Netty会回调channelInactive()方法,如下:

202212302237299824.png
客户端直接关机或者断网时,服务端无法感知,同样需要利用心跳机制进行断开客户端连接(移除channel)。

注:利用心跳超时机制进行重连及断开连接会在后续文章讲解,本篇文章主要讲解连接及重连,就不在此展开了。

效果展示

考虑到GIF图片体积过大,暂时先把连接超时时间和重连间隔时间适当缩短,下面展示几种情况下的客户端连接变化:

  • 正常连接

    202212302237315455.png

  • 客户端主动断网重连

    202212302237331546.png

  • 服务端停止服务重连

    202212302237354367.png

注:以上GIF图,客户端与服务端建立连接成功时,会显示“消息”字样,否则会显示连接状态。

客户端日志如下:

202212302237368858.png

服务端日志如下:

202212302237376979.png

由于客户端杀死进程及服务端主动停止服务,日志会清空,所以就不贴更详细的日志了,感兴趣的同学可以pull代码自行验证。

写在最后

通过以上代码实现,如果不考虑长连接稳定性的情况下(未加入心跳超时重连逻辑),已经可以进行客户端与服务端消息的收发,本文主要讲解连接及重连模块,所以暂未加入消息收发功能。

在下一篇文章中,将会讲解TCP/WebSocket的拆包与粘包处理,由于Netty已封装了各种不同的消息编解码器,所以如果使用我定义的消息格式,拆包与粘包的处理将会很简单,直接拿来用即可。考虑到大家可能有不同业务协议的需求,所以会加入自定义协议的消息编解码器的实现,敬请期待。

相关代码已提交Github,需要自取:

PS:新开的公众号不能留言,如果大家有不同的意见或建议,可以到掘金上评论或者加到QQ群:1015178804,如果群满人的话,也可以在公众号给我私信,谢谢。
贴上公众号:
FreddyChen

2022123022373842310.png

下篇文章见,古德拜~ ~ ~