传统手艺实现一个消息推送吧!

 2023-02-07
原文作者:斗笠小熊猫 原文地址:https://juejin.cn/post/7021507447474880526

消息推送简介

消息实时推送可使得用户及时获取相关的信息,增加工作的处理效率,获得更好的交互体验。消息推送的应用场景举例:

  • 新闻消息
  • 好友信息
  • 待办提醒
  • 提交成功提示

工程中集成消息推送模块可以使用第三方现有消息架构,可参考之前文章:# 我悟了!这就是消息推送啊 也可以自研利用底层协议推送,下面是个自己实现的简单推送例子,使用netty-socketio框架,原生开发了消息推送的功能。

netty-socketio

netty-socketio是一个开源的Socket.io服务器端的一个java的实现,它基于Netty框架,可用于服务端推送消息给客户端。说到服务端推送技术,一般会涉及WebSocket,WebSocket是HTML5最新提出的规范,虽然主流浏览器都已经支持,但仍然可能有不兼容的情况,为了兼容所有浏览器,给程序员提供一致的编程体验,SocketIO将WebSocket、AJAX和其它的通信方式全部封装成了统一的通信接口,也就是说,使用SocketIO时不用担心兼容问题,底层会自动选用最佳的通信方式。

github地址:github.com/mrniko/nett…

netty-socketio连接流程图:

202212302204549141.png

服务端

使用springBoot集成netty-socketio,简单示例如下:

引入maven依赖
    <dependency>
         <groupId>com.corundumstudio.socketio</groupId>
         <artifactId>netty-socketio</artifactId>
         <version>1.7.18</version>
    </dependency>
推送代码
  • 用户连接认证
    @Slf4j
    @Component
    public class NettySocketIOAuthorizationListener implements AuthorizationListener{
        @Autowired
        private AuthenticationService authenticationService;
    
        @Override
        public boolean isAuthorized(HandshakeData handshakeData) {
            log.info("SocketIO认证开始,传入参数: {}", JSON.toJSONString(handshakeData));
            log.info("SocketIO认证地址: {}", handshakeData.getUrl());
    
            // 获取认证参数
            String userId = StringUtils.defaultString(handshakeData.getHttpHeaders().get("user_id"), handshakeData.getSingleUrlParam("user_id"))
            boolean authResult = true;
           
            try {
                String userName = authenticationService.authenticate(userId);
                if(StringUtils.isEmpty(userName)){
                    log.info("SocketIO认证失败: userId[{}]未通过认证", userId);
                    authResult = false;
                }
            } catch (Exception e) {
                log.error("SocketIO认证失败: userId[{}]未通过认证,{}", userId, e.getMessage());
                authResult = false;
            }
           
            log.info("SocketIO认证结束");
            return authResult;
        }
    }
  • 连接处理器
    
    /**
     * 连接处理器
     */
    @Slf4j
    @Component
    public class ConnectHandler {
        private final SocketIONamespace socketIONamespace;
    
    
        /**
         * 自动注入SockerIOServer
         * @param socketIOServer
         */
        @Autowired
        public ConnectHandler(SocketIOServer socketIOServer) {
            this.socketIONamespace = socketIOServer.getNamespace("test");
            this.socketIONamespace.addConnectListener(onConnect());
            this.socketIONamespace.addDisconnectListener(onDisconnect());
            socketIOServer.addPingListener(onPing());
        }
    
        private PingListener onPing() {
            return client -> {
                log.info("心跳处理开始");
                
            };
        }
    
       
        /**
         * 使用userId建立连线
         */
        private void userIdConnect(SocketIOClient socketIOClient, String sessionId, String userId,String roomId){
    
            String userId = connectBean.getUserId();
            String userName = connectBean.getUserName();
            ConnectContentResponseModel connectContentResponseModel = new ConnectContentResponseModel(userName, connectBean.getImageUrl());
            try {
                ssaxConnectService.connect(socketIOClient, connectBean);
                ConnectResponseModel connectResponseModel = new ConnectResponseModel(200, "建立连线成功", connectContentResponseModel);
                if(StringUtils.isNotEmpty(roomId)){
                    socketIOClient.joinRoom(roomId);
                    connectResponseModel.setRoomId(roomId);
                }
                log.info("向客户端[{}]用户[{}]发送连线响应: {}", sessionId, userId, JSONObject.toJSONString(connectResponseModel));
                socketIOClient.sendEvent(ConnectResponseModel.EVENT, connectResponseModel);
                log.info("open_id[{}]客户端[{}]用户[{}]连线成功: 用户名[{}]", openId, sessionId, userId, userName);
            } catch (Exception e) {
                ConnectResponseModel connectResponseModel = new ConnectResponseModel(401, "建立连线失败", connectContentResponseModel);
                if(StringUtils.isNotEmpty(roomId)){
                    connectResponseModel.setRoomId(roomId);
                }
                log.info("向客户端[{}]用户[{}]发送连线响应: {}", sessionId, userId, JSONObject.toJSONString(connectResponseModel));
                socketIOClient.sendEvent(ConnectResponseModel.EVENT, connectResponseModel);
                log.error("open_id[{}]客户端[{}]用户[{}]连线失败: {}", openId, sessionId, userId, e.getMessage());
            }
            log.info("SocketIO连线结束");
        }
    
        /** 连线倾听器,进行Socketio连线认证 **/
        private ConnectListener onConnect() {
            return client -> {
                log.info("SocketIO连线开始");
                HandshakeData handshakeData = client.getHandshakeData();
                String sessionId = client.getSessionId().toString();
                log.info("客户端[{}]连线[{}],传入参数: {}", sessionId, handshakeData.getUrl(), JSONObject.toJSONString(handshakeData));
                // 获取认证参数
                String userId = HandshakeDataUtil.getParam(handshakeData, "token_id");
                String roomId = HandshakeDataUtil.getParam(handshakeData, "room_id");
                userIdConnect(client, sessionId, userId, roomId);
            };
        }
    
        /** 断线倾听器,记录断线日志 **/
        private DisconnectListener onDisconnect() {
            return client -> {
                log.info("SocketIO断线开始");
                HandshakeData handshakeData = client.getHandshakeData();
                String sessionId = client.getSessionId().toString();
                log.info("客户端[{}]断线[{}],传入参数: {}", sessionId, handshakeData.getUrl(), JSONObject.toJSONString(handshakeData));
                client.disconnect();
                log.info("SocketIO断线结束");
            };
        }
    }
  • 推送消息方法
    public void onMessage(MessageModel messageModel) {
      Collection<SocketIOClient> socketIOClientCollection = socketIOServer.getNamespace("/"+appId).getRoomOperations("test").getClients();
      
      for(SocketIOClient socketIOClient : socketIOClientCollection){					
          socketIOClient.sendEvent("event_push", new AckCallback<>() {
          	@Override
          	public void onSuccess(Object obj) {
          		log.info("SocketIOClient onSuccess  Status:{}",obj.getStatus());
          	}
          
          }, messageModel);
      }
    }
  • 推送消息实体
    @Getter
    @Setter
    public class MessageModel implements Serializable {
    	private static final long serialVersionUID = -1L;
    
    	// 推送id
    	private String pushId;
    
    	// 消息id
    	private String msgId;
    
    	// 负载协议
    	private JSONObject payloadProtocol;
    
    	// 负载
    	private JSONObject payload;
    
    	// 消息标题
    	private String title;
    
    	// 消息内容
    	private String content;
    
    	// 图片
    	private String pic;
    
    	// 资源定位符
    	private String uri;
    
    	// 推送类型
    	private String type;
    
    	// options
    	private JSONObject options;
    }

模拟客户端

  • 真实客户端可以是web、安卓、IOS,接收到socketIO消息事件后,进行弹窗展示。
  • 客户端测试代码
    public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException {
       String url = "http://localhost:9092/test?user_id=112344094&room_id=test";
       final Socket socket = IO.socket(url, options);
    
       //网络事件
       socket.on(Socket.EVENT_DISCONNECT, arg -> System.out.println("disconnect"));
       socket.on(Socket.EVENT_ERROR, arg -> System.out.println("error"));
       socket.on(Socket.EVENT_CONNECT, arg -> System.out.println("connected"));
    
    
       //SIO连接成功事件
       socket.on("event_connect", args -> {
           System.out.println("服务器:连接成功事件: " + args[0]); //它是org.json.JSONObject类型的
           JSONObject roomObject = new JSONObject();
           roomObject.put("roomId", "test");
           roomObject.put("eventType", "1");
           socket.emit("event_room", roomObject);
       });
    
    
       socket.on("event_push", args -> {
           System.out.println("服务器:收到event_push: " + args[0]);
       });
    }

测试发送接收

  • 先启动服务端
  • 启动客户端,连接到服务端
  • 服务端发送消息
  • 客户端接收到消息并打印

打印内容如下:

    服务器:连接成功事件: event_connect
    服务器:收到event_push: {"pushId":"61707444881e","payloadProtocol":{"type":5},"payload":{},"options":{},"msgId":"dd26fa76-14f4-4ccd-202110201643","title":"new test wwww","type":"bar","uri":"","content":"new test wwww"}

End

本文通过springboot引入netty-socketio实现了一个小小的消息推送功能,用的是原生的netty-socketio,为了方便简洁可以使用注解方式。 在测试客户端内,可以接到消息就算成功。 这个示例中没有任何业务逻辑,真实的推送案例还需要更多业务逻辑考虑,长连接处理、消息类型区分、推送失败重试机制等等。

参考