欢迎转载,转载请注明出处:juejin.cn/post/686893…
写在前面
贴个Kula高清图镇楼:
在之前的跟我一起开发商业级IM(1)—— 技术选型及协议定义和跟我一起开发商业级IM(2)—— 接口定义及封装两篇文章,我们已经了解IMS的技术选型及接口定义与封装,接下来,我们来真正实现连接及重连部分。
一个社交产品,长连接稳定是前提,绝大部分业务逻辑的正常运行都需要稳定的长连接支撑,可谓重中之重。本篇文章将会讲述如何去实现并维护一个稳定的长连接,以及各种异常情况的处理等。阅读完本篇文章,你将会学到连接、重连机制、心跳机制等知识。同时,会在Github上开源相关代码(包含Android客户端/Java服务端、基于TCP/WebSocket),废话不说,我们开始吧。
初始化配置
初始化配置,也就是在应用程序启动并进行IMS
初始化时,传入所需配置参数,可根据自己的业务需求自定义。下面我们来看看NettyTCPIMS
初始化接口的代码实现( 由于基于Netty
和WebSocket
实现的NettyWebSocketIMS
大部分代码及逻辑都与NettyTCPIMS
相同,就不单独贴出NettyWebSocketIMS
代码了,下面只会讲解WebSocket
对比TCP
实现所不同的地方 ,有需要完整代码的话可以跳转Github查看):
/**
* 初始化
* @param context
* @param options IMS初始化配置
* @param connectStatusListener IMS连接状态监听
* @param msgReceivedListener IMS消息接收监听
* @return
*/
@Override
public boolean init(Context context, IMSOptions options, IMSConnectStatusListener connectStatusListener, IMSMsgReceivedListener msgReceivedListener) {
if (context == null) {
Log.d(TAG, "初始化失败:Context is null.");
initialized = false;
return false;
}
if (options == null) {
Log.d(TAG, "初始化失败:IMSOptions is null.");
initialized = false;
return false;
}
this.mContext = context;
this.mIMSOptions = options;
this.mIMSConnectStatusListener = connectStatusListener;
this.mIMSMsgReceivedListener = msgReceivedListener;
executors = new ExecutorServiceFactory();
// 初始化重连线程池
executors.initBossLoopGroup();
// 注册网络连接状态监听
NetworkManager.getInstance().registerObserver(context, this);
// 标识ims初始化成功
initialized = true;
// 标识ims已打开
isClosed = false;
callbackIMSConnectStatus(IMSConnectStatus.Unconnected);
return true;
}
如上图,简单讲讲初始化的几个步骤:
- 参数
- context 应用程序上下文,方便IMS获取系统资源并进行一些系统操作等。
- options IMS初始化所需配置,其中定义了通信实现方式、通信协议、传输协议、连接超时时间、重连延时时间、重连次数、心跳前后台间隔时间、服务器地址等一些支持自定义的参数。
- connectStatusListener IMS连接状态回调,便于把连接状态反馈到应用层。
- msgReceivedListener 消息接收回调,便于IMS把接收到的消息回调到应用层( 本篇文章主要讲解连接及重连,所以不涉及消息部分,后续会详细讲解 )。
- 创建线程池组
线程池组分为boss线程池和work线程池,其中boss线程池负责连接及重连部分;work线程池负责心跳部分,均为单线程线程池(因为同时只能有一个线程进行连接或心跳)。至于为什么用线程池,纯属个人习惯,大家也可以分别用一个子线程实现即可。 - 注册网络状态监听
网络变化时,进行IMS重连。
初始Bootstrap
初始化Bootstrap,可参考Netty ChannelOption并根据实际业务场景进行定制,下面贴出我自己定制的配置:
/**
* 初始化bootstrap
*/
void initBootstrap() {
closeBootstrap();// 初始化前先关闭
NioEventLoopGroup loopGroup = new NioEventLoopGroup(4);
bootstrap = new Bootstrap();
bootstrap.group(loopGroup).channel(NioSocketChannel.class)
// 设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.option(ChannelOption.SO_KEEPALIVE, true)
// 设置禁用nagle算法,如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送。默认为false
.option(ChannelOption.TCP_NODELAY, true)
// 设置TCP发送缓冲区大小(字节数)
.option(ChannelOption.SO_SNDBUF, 32 * 1024)
// 设置TCP接收缓冲区大小(字节数)
.option(ChannelOption.SO_RCVBUF, 32 * 1024)
// 设置连接超时时长,单位:毫秒
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, mIMSOptions.getConnectTimeout())
// 设置初始化ChannelHandler
.handler(new NettyTCPChannelInitializerHandler(this));
}
至于参数的含义,大家可参照官方文档的介绍。
连接
连接也可以认为是重连,执行重连响应逻辑即可:
/**
* 连接
*/
@Override
public void connect() {
if(!initialized) {
Log.w(TAG, "IMS初始化失败,请查看日志");
return;
}
isExecConnect = true;// 标识已执行过连接
this.reconnect(true);
}
所以我们直接看重连部分,也是整篇文章中最核心最复杂的部分 。
重连
因为连接及重连部分代码较多及逻辑较复杂,为了使NettyTCPIMS
代码尽量简洁及逻辑清晰,所以将连接及重连部分代码抽取到NettyTCPReconnectTask
:
public class NettyTCPReconnectTask implements Runnable {
private static final String TAG = NettyTCPReconnectTask.class.getSimpleName();
private NettyTCPIMS ims;
private IMSOptions mIMSOptions;
NettyTCPReconnectTask(NettyTCPIMS ims) {
this.ims = ims;
this.mIMSOptions = ims.getIMSOptions();
}
@Override
public void run() {
try {
// 重连时,释放工作线程组,也就是停止心跳
ims.getExecutors().destroyWorkLoopGroup();
// ims未关闭并且网络可用的情况下,才去连接
while (!ims.isClosed() && ims.isNetworkAvailable()) {
IMSConnectStatus status;
if ((status = connect()) == IMSConnectStatus.Connected) {
ims.callbackIMSConnectStatus(status);
break;// 连接成功,跳出循环
}
if (status == IMSConnectStatus.ConnectFailed
|| status == IMSConnectStatus.ConnectFailed_IMSClosed
|| status == IMSConnectStatus.ConnectFailed_ServerListEmpty
|| status == IMSConnectStatus.ConnectFailed_ServerEmpty
|| status == IMSConnectStatus.ConnectFailed_ServerIllegitimate
|| status == IMSConnectStatus.ConnectFailed_NetworkUnavailable) {
ims.callbackIMSConnectStatus(status);
if(ims.isClosed() || !ims.isNetworkAvailable()) {
return;
}
// 一个服务器地址列表都连接失败后,说明网络情况可能很差,延时指定时间(重连间隔时间*2)再去进行下一个服务器地址的连接
Log.w(TAG, String.format("一个周期连接失败,等待%1$dms后再次尝试重连", mIMSOptions.getReconnectInterval() * 2));
try {
Thread.sleep(mIMSOptions.getReconnectInterval() * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} finally {
// 标识重连任务停止
ims.setReconnecting(false);
}
}
/**
* 连接服务器
* @return
*/
private IMSConnectStatus connect() {
if (ims.isClosed()) return IMSConnectStatus.ConnectFailed_IMSClosed;
ims.initBootstrap();
List<String> serverList = mIMSOptions.getServerList();
if (serverList == null || serverList.isEmpty()) {
return IMSConnectStatus.ConnectFailed_ServerListEmpty;
}
for (int i = 0; i < serverList.size(); i++) {
String server = serverList.get(i);
if (StringUtil.isNullOrEmpty(server)) {
return IMSConnectStatus.ConnectFailed_ServerEmpty;
}
String[] params = null;
try {
params = server.split(" ");
} catch (Exception e) {
e.printStackTrace();
}
if (params == null || params.length < 2) {
return IMSConnectStatus.ConnectFailed_ServerIllegitimate;
}
if(i == 0) {
ims.callbackIMSConnectStatus(IMSConnectStatus.Connecting);
}
// +1是因为首次连接也认为是重连,所以如果重连次数设置为3,则最大连接次数为3+1次
for (int j = 0; j < mIMSOptions.getReconnectCount() + 1; j++) {
if (ims.isClosed()) {
return IMSConnectStatus.ConnectFailed_IMSClosed;
}
if (!ims.isNetworkAvailable()) {
return IMSConnectStatus.ConnectFailed_NetworkUnavailable;
}
Log.d(TAG, String.format("正在进行【%1$s】的第%2$d次连接", server, j + 1));
try {
String host = params[0];
int port = Integer.parseInt(params[1]);
Channel channel = toServer(host, port);
if (channel != null && channel.isOpen() && channel.isActive() && channel.isRegistered() && channel.isWritable()) {
ims.setChannel(channel);
return IMSConnectStatus.Connected;
} else {
if (j == mIMSOptions.getReconnectCount()) {
// 如果当前已达到最大重连次数,并且是最后一个服务器地址,则回调连接失败
if(i == serverList.size() - 1) {
Log.w(TAG, String.format("【%1$s】连接失败", server));
return IMSConnectStatus.ConnectFailed;
}
// 否则,无需回调连接失败,等待一段时间再去进行下一个服务器地址连接即可
// 也就是说,当服务器地址列表里的地址都连接失败,才认为是连接失败
else {
// 一个服务器地址连接失败后,延时指定时间再去进行下一个服务器地址的连接
Log.w(TAG, String.format("【%1$s】连接失败,正在等待进行下一个服务器地址的重连,当前重连延时时长:%2$dms", server, mIMSOptions.getReconnectInterval()));
Log.w(TAG, "=========================================================================================");
Thread.sleep(mIMSOptions.getReconnectInterval());
}
} else {
// 连接失败,则线程休眠(重连间隔时长 / 2 * n) ms
int delayTime = mIMSOptions.getReconnectInterval() + mIMSOptions.getReconnectInterval() / 2 * j;
Log.w(TAG, String.format("【%1$s】连接失败,正在等待重连,当前重连延时时长:%2$dms", server, delayTime));
Thread.sleep(delayTime);
}
}
} catch (InterruptedException e) {
break;// 线程被中断,则强制关闭
}
}
}
return IMSConnectStatus.ConnectFailed;
}
/**
* 真正连接服务器的地方
* @param host
* @param port
* @return
*/
private Channel toServer(String host, int port) {
Channel channel;
try {
channel = ims.getBootstrap().connect(host, port).sync().channel();
} catch (Exception e) {
e.printStackTrace();
channel = null;
}
return channel;
}
}
从以上代码,可以看到主要分为三个方法:
-
run() 重连任务是一个Thread,
run()
方法也就是线程启动时执行的方法,主要是判断IMS是否关闭和网络状态,满足这两个条件就一直循环去连接,连接成功后,回调连接状态并停止线程,否则,一个周期连接失败后(一个连接失败周期,代表从开始连接到所有服务器地址达到最大重连次数),延时一段时间再去尝试重连(大家可能会问为什么要去延时,直接连接不好吗?主要是因为如果连接失败的话,大多数情况下可能是客户端网络环境不好或者是服务端存在问题,延时是为了在下一个时间节点时网络恢复等,避免频繁连接,节约性能),直至连接成功为止。 -
toServer()
toServer()
主要是Netty框架进行TCP长连接的代码,比较简单。 -
connect() 连接及重连的所有逻辑,都放到
connect()
方法中进行。TCP
和WebSocket
的方式有细微的区别,下面主要以TCP
为例,至于WebSocket
的区别,稍后会列出来。
注:ims_kula
SDK中固定的TCP服务器地址的格式为:IP地址 端口号,例:192.168.0.1 8808,大家也可以根据自己的需求来定义格式。connect()
方法大体逻辑如下:- 判断IMS是否已关闭或网络是否不可用,若满足两个条件的其中之一,即返回连接失败状态;
- 判断用户是否设置了服务器地址列表,若未设置,即返回连接失败状态;
- 若以上条件都未满足,也就是IMS未关闭,网络可用,并且服务器地址已设置,则初始化Bootstrap;
- 接着需要两个for循环,外层循环负责遍历服务器地址列表,取出每一个服务器地址;内层循环负责遍历用户设置的最大重连次数,默认为3次,加上连接所需的一次,也就是说在不设置最大重连次数的情况下,
ims_kula
SDK会对每个服务器地址进行 4 次连接。同时,重连间隔时间为 reconnectInterval + reconnectInterval / 2 * n,也就是如果设置重连间隔时间为8000ms,那么第二次重连间隔时间将为12000ms,第三次为16000ms,以此类推; - 获取服务器地址后,对地址进行字符串分割,分别获取host和port;
- 接着调用Netty连接TCP的方法(
toServer(String host, int port)
)进行连接即可。
注:WebSocket
连接方式与TCP
大同小异,唯一的区别就是WebSocket
的服务器地址格式与TCP
不同,ws://IP地址:端口号/websocket,例:ws://192.168.0.1:8808/websocket,所以WebSocket
获取host和port代码如下:
(伪代码,具体代码可见NettyWebSocketReconnectTask
)
URI uri = URI.create(server);
String host = uri.getHost();
int port = uri.getPort();
至于其余连接及重连部分代码,WebSocket
与TCP
是一致的,因为WebSocket
本身就是基于TCP
协议作一层封装。
何时重连及断开连接
首先明确一下, 重连是相对于客户端来说的,服务端不存在主动连接;断开连接是相对于服务端来说的,严格来说是移除响应的Channel。
客户端重连时机:
- 网络切换
- 断开网络连接
- 可感知的服务端异常
- 心跳超时等
服务端断开连接时机:
- 可感知的客户端异常
- 心跳超时
- 同一IP的客户端重复连接等
不知道大家注意到没有,上述的客户端重连时机和服务端断开连接时机,都分别有一个可感知异常。什么是可感知异常?也就是无论客户端还是服务端,在对方断开连接的时候,可以感知到,就是可感知异常。
经测试,在双方建立连接成功的状态下,对于客户端来说,如果服务端手动停止服务,Netty会回调exceptionCaught()
方法,如下:
服务端直接关机或者拔网线时,客户端无法感知,需要利用心跳超时机制进行重连。
同理,对于服务端来说,如果客户端手动杀死进程,Netty会回调channelInactive()
方法,如下:
客户端直接关机或者断网时,服务端无法感知,同样需要利用心跳机制进行断开客户端连接(移除channel)。
注:利用心跳超时机制进行重连及断开连接会在后续文章讲解,本篇文章主要讲解连接及重连,就不在此展开了。
效果展示
考虑到GIF图片体积过大,暂时先把连接超时时间和重连间隔时间适当缩短,下面展示几种情况下的客户端连接变化:
-
正常连接
-
客户端主动断网重连
-
服务端停止服务重连
注:以上GIF图,客户端与服务端建立连接成功时,会显示“消息”字样,否则会显示连接状态。
客户端日志如下:
服务端日志如下:
由于客户端杀死进程及服务端主动停止服务,日志会清空,所以就不贴更详细的日志了,感兴趣的同学可以pull代码自行验证。
写在最后
通过以上代码实现,如果不考虑长连接稳定性的情况下(未加入心跳超时重连逻辑),已经可以进行客户端与服务端消息的收发,本文主要讲解连接及重连模块,所以暂未加入消息收发功能。
在下一篇文章中,将会讲解TCP/WebSocket的拆包与粘包处理,由于Netty已封装了各种不同的消息编解码器,所以如果使用我定义的消息格式,拆包与粘包的处理将会很简单,直接拿来用即可。考虑到大家可能有不同业务协议的需求,所以会加入自定义协议的消息编解码器的实现,敬请期待。
相关代码已提交Github,需要自取:
PS:新开的公众号不能留言,如果大家有不同的意见或建议,可以到掘金上评论或者加到QQ群:1015178804,如果群满人的话,也可以在公众号给我私信,谢谢。
贴上公众号:
FreddyChen
下篇文章见,古德拜~ ~ ~