消息推送简介
消息实时推送可使得用户及时获取相关的信息,增加工作的处理效率,获得更好的交互体验。消息推送的应用场景举例:
- 新闻消息
- 好友信息
- 待办提醒
- 提交成功提示
工程中集成消息推送模块可以使用第三方现有消息架构,可参考之前文章:# 我悟了!这就是消息推送啊 也可以自研利用底层协议推送,下面是个自己实现的简单推送例子,使用netty-socketio框架,原生开发了消息推送的功能。
netty-socketio
netty-socketio是一个开源的Socket.io服务器端的一个java的实现,它基于Netty框架,可用于服务端推送消息给客户端。说到服务端推送技术,一般会涉及WebSocket,WebSocket是HTML5最新提出的规范,虽然主流浏览器都已经支持,但仍然可能有不兼容的情况,为了兼容所有浏览器,给程序员提供一致的编程体验,SocketIO将WebSocket、AJAX和其它的通信方式全部封装成了统一的通信接口,也就是说,使用SocketIO时不用担心兼容问题,底层会自动选用最佳的通信方式。
github地址:github.com/mrniko/nett…
netty-socketio连接流程图:
服务端
使用springBoot集成netty-socketio,简单示例如下:
引入maven依赖
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.18</version>
</dependency>
推送代码
- 用户连接认证
@Slf4j
@Component
public class NettySocketIOAuthorizationListener implements AuthorizationListener{
@Autowired
private AuthenticationService authenticationService;
@Override
public boolean isAuthorized(HandshakeData handshakeData) {
log.info("SocketIO认证开始,传入参数: {}", JSON.toJSONString(handshakeData));
log.info("SocketIO认证地址: {}", handshakeData.getUrl());
// 获取认证参数
String userId = StringUtils.defaultString(handshakeData.getHttpHeaders().get("user_id"), handshakeData.getSingleUrlParam("user_id"))
boolean authResult = true;
try {
String userName = authenticationService.authenticate(userId);
if(StringUtils.isEmpty(userName)){
log.info("SocketIO认证失败: userId[{}]未通过认证", userId);
authResult = false;
}
} catch (Exception e) {
log.error("SocketIO认证失败: userId[{}]未通过认证,{}", userId, e.getMessage());
authResult = false;
}
log.info("SocketIO认证结束");
return authResult;
}
}
- 连接处理器
/**
* 连接处理器
*/
@Slf4j
@Component
public class ConnectHandler {
private final SocketIONamespace socketIONamespace;
/**
* 自动注入SockerIOServer
* @param socketIOServer
*/
@Autowired
public ConnectHandler(SocketIOServer socketIOServer) {
this.socketIONamespace = socketIOServer.getNamespace("test");
this.socketIONamespace.addConnectListener(onConnect());
this.socketIONamespace.addDisconnectListener(onDisconnect());
socketIOServer.addPingListener(onPing());
}
private PingListener onPing() {
return client -> {
log.info("心跳处理开始");
};
}
/**
* 使用userId建立连线
*/
private void userIdConnect(SocketIOClient socketIOClient, String sessionId, String userId,String roomId){
String userId = connectBean.getUserId();
String userName = connectBean.getUserName();
ConnectContentResponseModel connectContentResponseModel = new ConnectContentResponseModel(userName, connectBean.getImageUrl());
try {
ssaxConnectService.connect(socketIOClient, connectBean);
ConnectResponseModel connectResponseModel = new ConnectResponseModel(200, "建立连线成功", connectContentResponseModel);
if(StringUtils.isNotEmpty(roomId)){
socketIOClient.joinRoom(roomId);
connectResponseModel.setRoomId(roomId);
}
log.info("向客户端[{}]用户[{}]发送连线响应: {}", sessionId, userId, JSONObject.toJSONString(connectResponseModel));
socketIOClient.sendEvent(ConnectResponseModel.EVENT, connectResponseModel);
log.info("open_id[{}]客户端[{}]用户[{}]连线成功: 用户名[{}]", openId, sessionId, userId, userName);
} catch (Exception e) {
ConnectResponseModel connectResponseModel = new ConnectResponseModel(401, "建立连线失败", connectContentResponseModel);
if(StringUtils.isNotEmpty(roomId)){
connectResponseModel.setRoomId(roomId);
}
log.info("向客户端[{}]用户[{}]发送连线响应: {}", sessionId, userId, JSONObject.toJSONString(connectResponseModel));
socketIOClient.sendEvent(ConnectResponseModel.EVENT, connectResponseModel);
log.error("open_id[{}]客户端[{}]用户[{}]连线失败: {}", openId, sessionId, userId, e.getMessage());
}
log.info("SocketIO连线结束");
}
/** 连线倾听器,进行Socketio连线认证 **/
private ConnectListener onConnect() {
return client -> {
log.info("SocketIO连线开始");
HandshakeData handshakeData = client.getHandshakeData();
String sessionId = client.getSessionId().toString();
log.info("客户端[{}]连线[{}],传入参数: {}", sessionId, handshakeData.getUrl(), JSONObject.toJSONString(handshakeData));
// 获取认证参数
String userId = HandshakeDataUtil.getParam(handshakeData, "token_id");
String roomId = HandshakeDataUtil.getParam(handshakeData, "room_id");
userIdConnect(client, sessionId, userId, roomId);
};
}
/** 断线倾听器,记录断线日志 **/
private DisconnectListener onDisconnect() {
return client -> {
log.info("SocketIO断线开始");
HandshakeData handshakeData = client.getHandshakeData();
String sessionId = client.getSessionId().toString();
log.info("客户端[{}]断线[{}],传入参数: {}", sessionId, handshakeData.getUrl(), JSONObject.toJSONString(handshakeData));
client.disconnect();
log.info("SocketIO断线结束");
};
}
}
- 推送消息方法
public void onMessage(MessageModel messageModel) {
Collection<SocketIOClient> socketIOClientCollection = socketIOServer.getNamespace("/"+appId).getRoomOperations("test").getClients();
for(SocketIOClient socketIOClient : socketIOClientCollection){
socketIOClient.sendEvent("event_push", new AckCallback<>() {
@Override
public void onSuccess(Object obj) {
log.info("SocketIOClient onSuccess Status:{}",obj.getStatus());
}
}, messageModel);
}
}
- 推送消息实体
@Getter
@Setter
public class MessageModel implements Serializable {
private static final long serialVersionUID = -1L;
// 推送id
private String pushId;
// 消息id
private String msgId;
// 负载协议
private JSONObject payloadProtocol;
// 负载
private JSONObject payload;
// 消息标题
private String title;
// 消息内容
private String content;
// 图片
private String pic;
// 资源定位符
private String uri;
// 推送类型
private String type;
// options
private JSONObject options;
}
模拟客户端
- 真实客户端可以是web、安卓、IOS,接收到socketIO消息事件后,进行弹窗展示。
- 客户端测试代码
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException {
String url = "http://localhost:9092/test?user_id=112344094&room_id=test";
final Socket socket = IO.socket(url, options);
//网络事件
socket.on(Socket.EVENT_DISCONNECT, arg -> System.out.println("disconnect"));
socket.on(Socket.EVENT_ERROR, arg -> System.out.println("error"));
socket.on(Socket.EVENT_CONNECT, arg -> System.out.println("connected"));
//SIO连接成功事件
socket.on("event_connect", args -> {
System.out.println("服务器:连接成功事件: " + args[0]); //它是org.json.JSONObject类型的
JSONObject roomObject = new JSONObject();
roomObject.put("roomId", "test");
roomObject.put("eventType", "1");
socket.emit("event_room", roomObject);
});
socket.on("event_push", args -> {
System.out.println("服务器:收到event_push: " + args[0]);
});
}
测试发送接收
- 先启动服务端
- 启动客户端,连接到服务端
- 服务端发送消息
- 客户端接收到消息并打印
打印内容如下:
服务器:连接成功事件: event_connect
服务器:收到event_push: {"pushId":"61707444881e","payloadProtocol":{"type":5},"payload":{},"options":{},"msgId":"dd26fa76-14f4-4ccd-202110201643","title":"new test wwww","type":"bar","uri":"","content":"new test wwww"}
End
本文通过springboot引入netty-socketio实现了一个小小的消息推送功能,用的是原生的netty-socketio,为了方便简洁可以使用注解方式。 在测试客户端内,可以接到消息就算成功。 这个示例中没有任何业务逻辑,真实的推送案例还需要更多业务逻辑考虑,长连接处理、消息类型区分、推送失败重试机制等等。