后端与前端消息推送


简介

服务器可以主动向客户端推送信息,客户端即时收到消息,比如微信消息,系统消息,通知消息等等。

方式

短轮询

指在特定的的时间间隔(如每10秒),由浏览器对服务器发出HTTP request,然后由服务器返回最新的数据给客户端的浏览器。浏览器做处理后进行显示。无论后端此时是否有新的消息产生,都会进行响应。

setInterval(() => {
  // 方法请求
  queryMessage().then((res) => {
      if (res.code === 200) {
          this.messageData = res.data
      }
  })
}, 1000);

短轮询实现简单,效果还可以,缺点是由于推送数据并不会频繁变更,无论后端此时是否有新的消息产生,浏览器都会进行请求,服务器接到请求后马上返回响应信息并关闭连接,势必对服务端造成很大的压力,浪费带宽和服务器资源。

  • 优点:后端程序编写比较容易,适于小型应用。
  • 缺点:请求中有大半是无用,浪费带宽和服务器资源。

长轮询

长轮询是对短轮询的一种改进,在尽可能减少对服务器资源的浪费的同时保证消息的相对实时性。如Nacos、Kafka、RocketMQ中都有用到长轮询。

  • 长轮询是指客户端会一直向服务端发起请求,适用与服务端向客户端推送数据使用。长轮询要满足以下几点: 客户端发起请求后,当服务端业务没有数据时,不会立即返回空值,而是hold住连接,等待数据生成后立即返回。

  • 请求在服务端有超时时间,不会一直hold住。当超时后,服务端会返回超时信息,客户端收到返回后会再次发起请求。

  • 每次请求结束后,客户端会再次发起请求。

  • 优点:在无消息的情况下不会频繁的请求,相对短轮询较实时。

  • 缺点:与服务器一直连接会消耗资源

SSE(Server-Sent Events)

SSE(Server-Sent Events)是一种基于HTTP的推送技术,可以实现服务器向客户端主动发送消息。相比WebSocket,SSE更加轻量级、易于使用,并且能够兼容老旧浏览器。

在SSE中,客户端通过EventSource对象与服务器建立连接,并监听来自服务器的事件。当有新的事件发生时,服务器会将数据以文本流的形式发送给客户端,在接收到数据后,客户端可以根据需要进行处理。

  • 优点:
  1. 实现简单:只需一个HTTP连接即可完成推送功能;
  2. 兼容性好:支持大部分现代浏览器和移动设备;
  3. 轻量级:相对于WebSocket而言,最大的优势是便利,服务端不需要其他的类库,开发难度较低,传输开销较小。
  4. SSE和轮询相比它不用处理很多请求,不用每次建立新连接,延迟较低。
  • 缺点:
    如果客户端有很多,那就要保持很多长连接,这会占用服务器大量内存和连接数

SSE 规范:在 html5 的定义中,服务端 SSE,一般需要遵循以下要求:

Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive

代码实现

前端 html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SseEmitter</title>
</head>
<body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>
    let source = null;

    // 用时间戳模拟登录用户
    const userId = new Date().getTime();

    if (!!window.EventSource) {

        // 建立连接
        source = new EventSource('http://127.0.0.1:6602/sse/createSseConnect?clientId=39bd662b7942418595c21a1ef0af7fad');

        /**
         * 连接一旦建立,就会触发open事件
         * 另一种写法:source.onopen = function (event) {}
         */
        source.addEventListener('open', function (e) {
            setMessageInnerHTML("建立连接。。。");
        }, false);

        /**
         * 客户端收到服务器发来的数据
         * 另一种写法:source.onmessage = function (event) {}
         */
        source.addEventListener('message', function (e) {
            setMessageInnerHTML(e.data);
        });


        /**
         * 如果发生通信错误(比如连接中断),就会触发error事件
         * 或者:
         * 另一种写法:source.onerror = function (event) {}
         */
        source.addEventListener('error', function (e) {
            if (e.readyState === EventSource.CLOSED) {
                setMessageInnerHTML("连接关闭");
            } else {
                console.log(e);
            }
        }, false);

    } else {
        setMessageInnerHTML("你的浏览器不支持SSE");
    }

    // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
    window.onbeforeunload = function () {
        closeSse();
    };

    // 关闭Sse连接
    function closeSse() {
        source.close();
        const httpRequest = new XMLHttpRequest();
        httpRequest.open('GET', 'http://localhost:6602/sse/closeConnect/?clientId=e410d4c1d71c469b8d719de5d39783b7', true);
        httpRequest.send();
        console.log("close");
    }

    // 将消息显示在网页上
    function setMessageInnerHTML(innerHTML) {
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
</script>
</html>

后端:


import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * SSE长链接
 *
 * @author longshi
 * @date 2021/12/13
 */
@RestController
@RequestMapping("/sse")
@Slf4j
public class SseEmitterController {

    /**
     * 容器,保存连接,用于输出返回
     */
    private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();

    private static final String CLIENT_ID = "test";
    private static final String TASK_RESULT = "t_restult";

    /**
     * 创建SSE长链接
     *
     * @param clientId 客户端唯一ID(如果为空,则由后端生成并返回给前端)
     * @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter
     **/
    @GetMapping("/createSseConnect")
    @CrossOrigin
    public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {
        // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L);
        // 是否需要给客户端推送ID
        if (StringUtils.isBlank(clientId)) {
            clientId = UUID.randomUUID().toString();
        }
        // 注册回调
        sseEmitter.onCompletion(completionCallBack(clientId));
        sseCache.put(clientId, sseEmitter);
        log.info("创建新的sse连接,当前用户:{}", clientId);

        try {
            sseEmitter.send(SseEmitter.event().id(CLIENT_ID).data(clientId));
        } catch (IOException e) {
            log.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);
            throw new RuntimeException("创建连接异常!", e);
        }
        return sseEmitter;
    }

    /**
     * 关闭SSE连接
     *
     * @param clientId 客户端ID
     **/
    @CrossOrigin
    @GetMapping("/closeSseConnect")
    public Object closeSseConnect(String clientId) {
        this.closeSseConnect(clientId);
        return "{\"code\":\"200\"}";
    }

    /**
     *  模拟后端推送消息
     * @date 2022/12/16 10:52
     * @param clientId
     * @return java.lang.Object
     */
    @GetMapping("/pushMsg")
    public Object pushMsg(String clientId) {
        JSONObject msg = new JSONObject();
        msg.put("msg","this is push msg !");
        sendMsgToClient(msg);
        return "{\"code\":\"200\"}";
    }



    // 根据客户端id获取SseEmitter对象

    public SseEmitter getSseEmitterByClientId(String clientId) {
        return sseCache.get(clientId);
    }

    // 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息

    public void sendMsgToClient(JSONObject msg) {

        for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {
            sendMsgToClientByClientId(entry.getKey(), msg, entry.getValue());
        }
    }

    /**
     * 推送消息到客户端
     * 此处做了推送失败后,重试推送机制,可根据自己业务进行修改
     *
     * @param clientId               客户端ID
     * @param msg 推送信息,此处结合具体业务,定义自己的返回值即可
     **/
    private void sendMsgToClientByClientId(String clientId, JSONObject msg, SseEmitter sseEmitter) {
        if (sseEmitter == null) {
            log.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:客户端{}未创建长链接,失败消息:{}",
                    clientId, msg.toString());
            return;
        }

        SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(TASK_RESULT).data(msg, MediaType.APPLICATION_JSON);
        try {
            sseEmitter.send(sendData);
        } catch (IOException e) {
            // 推送消息失败,记录错误日志,进行重推
            log.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", msg.toString(), e);
            boolean isSuccess = true;
            // 推送消息失败后,每隔10s推送一次,推送5次
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(10000);
                    sseEmitter = sseCache.get(clientId);
                    if (sseEmitter == null) {
                        log.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
                        continue;
                    }
                    sseEmitter.send(sendData);
                } catch (Exception ex) {
                    log.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex);
                    continue;
                }
                log.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, msg.toString());
                return;
            }
        }
    }

    /**
     * 长链接完成后回调接口(即关闭连接时调用)
     *
     * @param clientId 客户端ID
     * @return java.lang.Runnable
     **/
    private Runnable completionCallBack(String clientId) {
        return () -> {
            log.info("结束连接:{}", clientId);
            removeUser(clientId);
        };
    }

    /**
     * 连接超时时调用
     *
     * @param clientId 客户端ID
     * @return java.lang.Runnable
     **/
    private Runnable timeoutCallBack(String clientId) {
        return () -> {
            log.info("连接超时:{}", clientId);
            removeUser(clientId);
        };
    }

    /**
     * 推送消息异常时,回调方法
     *
     * @param clientId 客户端ID
     * @return java.util.function.Consumer<java.lang.Throwable>
     * @author re
     * @date 2021/12/14
     **/
    private Consumer<Throwable> errorCallBack(String clientId) {
        return throwable -> {
            log.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);

            // 推送消息失败后,每隔10s推送一次,推送5次
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(10000);
                    SseEmitter sseEmitter = sseCache.get(clientId);
                    if (sseEmitter == null) {
                        log.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);
                        continue;
                    }
                    sseEmitter.send("失败后重新推送");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
    }

    /**
     * 移除用户连接
     *
     * @param clientId 客户端ID
     **/
    private void removeUser(String clientId) {
        sseCache.remove(clientId);
        log.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);
    }


}

nginx转发配置

proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_http_version 1.1;

proxy_set_header Connection;
chunked_transfer_encoding off;
proxy_cache off;

WebSocket

WebSocket Stomp是一种协议,用于在Web浏览器和服务器之间进行实时通信。它结合了WebSocket协议和Stomp(Simple Text Oriented Messaging Protocol)协议的优点。

WebSocket是一种基于TCP的网络协议,允许双向通信。与HTTP不同,它支持长连接,并且可以在客户端和服务器之间传输任意数据类型。

Stomp是一种简单的文本消息传递协议,用于在应用程序之间发送消息。它提供了一个可靠的、异步的、分布式系统中使用消息队列进行通信的方法。

通过将这两个协议结合起来,WebSocket Stomp可以实现高效、可靠、实时地传输数据。它广泛应用于在线聊天、多人游戏等需要快速响应和实时更新数据的场景中。

服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。

Websocket Stomp

具体实现步骤:

  1. 整合依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
  1. 配置类

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

import java.util.List;

//注解开启使用STOMP协议来传输基于代理(message broker)的消息,这时控制器支持使用@MessageMapping,就像使用@RequestMapping一样
@Configuration
@EnableWebSocketMessageBroker
@Slf4j
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //注册一个STOMP的endpoint,并指定使用SockJS协议
        registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //客户端发送消息的请求前缀
        registry.setApplicationDestinationPrefixes("/app");
        //客户端订阅消息的请求前缀,topic一般用于广播推送,queue用于点对点推送
        registry.enableSimpleBroker("/topic", "/user");
        //服务端通知客户端的前缀,可以不设置,默认为user
        registry.setUserDestinationPrefix("/user");
    }
    /**
     * 拦截器方式2
     *
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {

                StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                //1、判断是否首次连接
                if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
                    //2、判断token
                    List<String> nativeHeader = accessor.getNativeHeader("Authorization");
                    if (nativeHeader != null && !nativeHeader.isEmpty()) {
                        String token = nativeHeader.get(0);
                        if (StringUtils.isNotBlank(token)) {
                            log.info(token);
                            //todo,通过token获取用户信息,下方用loginUser来代

                            return message;
                        }
                    }
                    return null;
                }
                //不是首次连接,已经登陆成功
                return message;
            }
        });
    }
}
  1. 后端推送给前端消息
    • 订阅点对点、广播

后端推送消息配置类


@Autowired
   private SimpMessagingTemplate simpMessagingTemplate;
   
   /**
   * @MethodName: sendToUser
    * @Description: 推送消息给订阅用户
    * @Param: [userId]
   **/
   public void sendToUser(String userId, String message){
       //这里可以控制权限等。。。
       simpMessagingTemplate.convertAndSendToUser(userId,"/listener", message);
   }

   /**
   * @MethodName: sendBroadCast
    * @Description: 发送广播,需要用户事先订阅广播
    * @Param: [topic, msg]
    * @Return: void
   **/
   public void sendBroadCast(String topic,String msg){
       simpMessagingTemplate.convertAndSend(topic,msg);
   }
   

后端接收前端信息


@RestController
public class StompSocketController {
 @Autowired
  private SimpMessagingTemplate simpMessagingTemplate; // 发送消息

   /**
     * @MethodName: subscribeMapping
     * @Description: 订阅成功通知
     * @Param: [id]
     * @Return: void
     **/
    @SubscribeMapping("/user/{id}/listener")
    public void subscribeMapping(@DestinationVariable("id") final long id) {
        System.out.println(">>>>>>用户:"+id +",已订阅");
        SubscribeMsg param = new SubscribeMsg(id,String.format("用户【%s】已订阅成功", id));
        sendToUser(param);
    }

 /**
    * @MethodName: test
     * @Description: 接收订阅topic消息
     * @Param: [id, msg]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    @MessageMapping(value = "/user/{id}/listener")
    public void UserSubListener(@DestinationVariable long  id, String msg) {
        System.out.println("收到客户端:" +id+",的消息");
        SubscribeMsg param = new SubscribeMsg(id,String.format("已收到用户【%s】发送消息【%s】", id,msg));
        sendToUser(param);
    }
 /**
    * @MethodName: sendToUser
     * @Description: 推送消息给订阅用户
     * @Param: [userId]
     * @Return: void
    **/
    private void sendToUser(SubscribeMsg screenChangeMsg){
        //这里可以控制权限等。。。
        simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),"/listener", JSON.toJSONString(screenChangeMsg));
    }


 /**
     * @ClassName: SubMsg
    **/
    public static class SubscribeMsg {
        private Long userId;
        private String msg;
        public SubscribeMsg(Long UserId, String msg){
            this.userId = UserId;
            this.msg = msg;
        }
        public Long getUserId() {
            return userId;
        }
        public String getMsg() {
            return msg;
        }
    }

}
  1. 前端代码
  • 前端样例
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>websocket stomp</title>
  <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>

  <script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
    <script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
    <script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>

    <script type="text/javascript">

      // 定义全局变量 stomp socket
      var stompClient,socket;

      $(document).ready(function () {
        if (window.WebSocket){
          websocketConfig();
        } else {
          alert("错误","浏览器不支持websocket技术通讯.");
        }
      });

      // websocket 配置
      function websocketConfig() {
        /*
         * 1. 连接url为endpointChat的endpoint,对应后台WebSoccketConfig的配置
         * 2. SockJS 所处理的URL 是 "http://" 或 "https://" 模式,而不是 "ws://" or  "wss://"
         */
        socket = new SockJS("http://127.0.0.1:6602/websocket");

        // 通过sock对象监听每个事件节点,非必须,这个必须放在stompClient的方法前面
        sockHandle();

        // 获取 STOMP 子协议的客户端对象
        stompClient = Stomp.over(socket);

        /*
         * 1. 获取到stomp 子协议后,可以设置心跳连接时间,认证连接,主动断开连接
         * 2,连接心跳有的版本的stomp.js 是默认开启的,这里我们不管版本,手工设置
         * 3. 心跳是双向的,客户端开启心跳,必须要服务端支持心跳才行
         * 4. heartbeat.outgoing 表示客户端给服务端发送心跳的间隔时间
         * 5. 客户端接收服务端心跳的间隔时间,如果为0 表示客户端不接收服务端心跳
         */
        stompClient.heartbeat.outgoing = 10000;
        stompClient.heartbeat.incoming = 1;

        /*
         * 1. stompClient.connect(headers, connectCallback, errorCallback);
         * 2. headers表示客户端的认证信息,多个参数 json格式存,这里简单用的httpsessionID,可以根据业务场景变更
         *    这里存的信息,在服务端StompHeaderAccessor 对象调用方法可以取到
         * 3. connectCallback 表示连接成功时(服务器响应 CONNECTED 帧)的回调方法;
         *    errorCallback 表示连接失败时(服务器响应 ERROR 帧)的回调方法,非必须;
         */
        var headers = {"Authorization":"123232"};

        stompClient.connect(headers,function (frame) {

          console.log('Connected:' + frame);

          /*
           * 1. 订阅服务,订阅地址为服务器Controller 中的地址
           * 2. 如果订阅为公告,地址为Controller 中@SendTo 注解地址
           * 3. 如果订阅为私信,地址为setUserDestinationPrefix 前缀+@SendToUser注解地址
           *    或者setUserDestinationPrefix 前缀 + controller的convertAndSendToUser地址一致
           * 4. 这里演示为公告信息,所有订阅了的用户都能接受
           */
          stompClient.subscribe("/user/00145/listener",function (message) {

            console.log("接收到信息:" + message);

          });

          /*
           * 1. 因为推送为私信,必须带上或者setUserDestinationPrefix前缀 /user
           * 2. 演示自己发送给自己,做websocket向服务器请求资源而已,然后服务器你就把资源给我就行了,
           *    别的用户就不用你广播推送了,简单点,就是我请求,你就推送给我
           */
          stompClient.subscribe('user/00145/listener',function (message) {
            var msg = JSON.parse(message.body).msg;
            console.log("接收到私信信息SendToUser:" + msg);
            alert("接收到私信信息SendToUser:" + msg);
          });

          /*
           * 1. 订阅点对点消息
           * 2. 很多博文这里的路径会写成"/user/{accountId}/userTest/callBack”这种,是因为
           *    @SendToUser发送的代理地址是 /userTest/callBack, 地址将会被转化为 /user/{username}/userTest/callBack
           *    username,为用户的登录名,也是就是Principal或者本文中的WebSocketUserAuthentication对象getName获取的参数
           *    如果在拦截器中配置了认证路径,可以不带参数,不过推荐用带参数的写法
           *
           */
          stompClient.subscribe('${socketUser}/userTest/callBack',function (message) {

            var msg  = message.body;
            console.log("接收到点对点SendToUser:" + msg);
            alert("接收到点对点SendToUser:" + msg);
          });

        }, function (error) {
          console.log('STOMP: ' + error);
          //setTimeout(websocketConfig, 10000);
          console.log('STOMP: Reconnecting in 10 seconds');
        });
      }

      // 发送公告消息
      function sendMsg() {
        var msg = $("#message").val();
        var data ={"msg":msg};

        /**
         *  1. 第一个参数 url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数;
         *  2. headers 为发送信息的header,json格式,JavaScript 对象,
         *     可选参数,可以携带消息头信息,也可以做事务,如果没有,传{}
         *  3. body 为发送信息的 body,字符串,可选参数
         */
        stompClient.send('${socketPrefix + "/sendChatMsg/" + groupId}',{},JSON.stringify(data));
      }

      // 发送给自己
      function sendMsgOwn() {
        var msg = $("#message").val();
        var data ={"msg":msg};

        /**
         *  1. 第一个参数 url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数;
         *  2. headers 为发送信息的header,json格式,JavaScript 对象,
         *     可选参数,可以携带消息头信息,也可以做事务,如果没有,传{}
         *  3. body 为发送信息的 body,字符串,可选参数
         */
        stompClient.send("${socketPrefix}/sendChatMsgByOwn",{},JSON.stringify(data));
      }

      // 发送点对点消息
      function sendMsgById() {
        var msg = $("#message").val();
        var accountId = $("#accountId").val();
        var data ={"msg":msg};

        /**
         *  1. 第一个参数 url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数;
         *  2. headers 为发送信息的header,json格式,JavaScript 对象,
         *     可选参数,可以携带消息头信息,也可以做事务,如果没有,传{}
         *  3. body 为发送信息的 body,字符串,可选参数
         *  4. accountId这个参数其实可以通过header传过去,不过因为是restful风格,所以就跟在url上
         */
        stompClient.send("${socketPrefix}/sendChatMsgById/" + accountId,{},JSON.stringify(data));
      }

      // 通过sock对象监听每个事件节点,非必须,这里开启了stomp的websocket 也不会生效了
      function sockHandle() {

        // 连接成功后的回调函数
        socket.onopen = function () {
          console.log("------连接成功------");
        };

        // 监听接受到服务器的消息
        socket.onmessage = function (event) {
          console.log('-------收到的消息: ' + event.data);
        };

        // 关闭连接的回调函数
        socket.onclose = function (event) {
          console.log('--------关闭连接: connection closed.------');
        };

        // 连接发生错误
        socket.onerror = function () {
          alert("连接错误", "网络超时或通讯地址错误.");
          disconnect();
        } ;
      }

      // 关闭websocket
      function disconnect() {
        if (socket != null) {
          socket.close();
          socket = null;
        }
      }

    </script>
  </head>
<body>
<div>
  <div>我的 </div>
  <span>消息</span>
  <input type="text" id="message" name="message" />
  <input type="button" id="sendMsg" name="sendMsg" value="发送公告" onclick="sendMsg();" />
  <input type="button" id="sendMsgOwn" name="sendMsgOwn" value="自己给自己推送" onclick="sendMsgOwn();" />
  <br/>
  <span>接收人</span>
  <input type="text" id="accountId" name="accountId" />
  <input type="button" id="sendMsgById" name="sendMsgById" value="点对点消息" onclick="sendMsgById();" />
  <br/>
  <input type="button" value="断开" onclick="disconnect();" />
</div>

</html>
  1. Nginx 配置websocket

http 中添加

######websocket start#########
map `$http_upgrade $`connection_upgrade {
default upgrade;
''   close;
}

######websocket

server 中添加
location = '/websocket' {
proxy_pass <http://127.0.0.1:6602>;
proxy_read_timeout 300s;
proxy_http_version 1.1;
proxy_set_header  X-Real-IP `$remote_addr;
     proxy_set_header  X-Forwarded-For $`remote_addr;
proxy_set_header Upgrade `$http_upgrade;
     proxy_set_header Connection $`connection_upgrade;
}
  1. 网关负载均衡
    路由配置
server:
port: 6602
spring:
cloud:
gateway:
httpclient:
response-timeout: 180s
connect-timeout: 180000
routes:
- id: websocket-message
uri: lb://message #服务名
predicates:
- Path=/websocket/info/**
filters:
- StripPrefix=0
- id: websocket-api
uri: lb:ws://message
predicates:
- Path=websocket/**
filters:
- StripPrefix=0

    #无注册中心,配置静态服务IP健康检查
    discovery:
      client:
        simple:
          instances:  ## 服务发现实力列表,无注册中心服务实例的地址,新增或删除服务需要手动维护这个列表
            message[0]:
              uri: http://192.168.1.100:8080
            message[1]:
              uri: http://192.168.1.101:8080

    loadbalancer:
      service-discovery:
        timeout: 9000
      configurations: health-check # 启用health-check
      ## 全局
      health-check:
        initial-delay: 3s
      clients:  ## 服务发现无注册中心服务的实例(实例列表)健康检查
        message: # 单个服务指定服务名
          health-check:
            interval: 30s
            initial-delay: 1s
            refetch-instances-interval: 5s
            path:
              message: /actuator/health # 消息服务的监控检查URL
  1. 分布式情况下消息推送场景

当服务器要通过 WebSocket 发送消息时,该消息首先发送到 STOMP 代理。此 STOMP 代理将传入消息发送到与其连接的所有实例。
这个消息代理例如RabbitMQ、ActiveMQ等

后端主动推送给用户消息,由于负载均衡,后端多个服务,websocket是长连接,连接的应用多个实例中的某一个,所以推送的时候可以结合MQ消息队列的广播模式,推送给消息模块,再由消息模块发送给前端。

链路:
app -> nginx > api gateway(负载均衡) > message service(多个服务)

a.MQ-广播结果信息给消息服务,(不知道具体服务与前端长链接)
b.消息服务(2台)推送消息给前端
c.其中一台已经和前端建立了长链接,推送成功

Netty Socket.IO

Netty Socket.IO 是基于 Netty 框架实现的一个 WebSocket 库,它提供了对 Socket.IO 协议的支持。下面是 Netty Socket.IO 的具体实现:

  1. 客户端连接:客户端通过 WebSocket 连接到服务器,建立长连接。

  2. 握手协议:握手协议是在客户端和服务器之间进行的一次通信过程,在这个过程中,双方会交换一些信息以确保彼此都能够理解对方所使用的协议版本、编码方式等。

3.事件监听器注册:Netty Socket.IO 提供了多种事件类型,例如连接成功、断开连接、收到消息等。开发者可以通过注册相应的事件监听器来处理这些事件。

  1. 数据传输:Netty Socket.IO 支持两种数据传输方式:文本和二进制。开发者可以根据需要选择合适的传输方式。

  2. 命名空间管理:Socket.IO 允许创建多个命名空间(namespace),每个命名空间都有自己独立的房间(room)。Netty Socket.IO 提供了命名空间管理功能,允许用户创建、删除或修改命名空间及其相关配置。

  3. 房间管理:房间是指在某个命名空间内共享同一个标识符(ID)的客户端集合。Netty Socket.IO 支持动态加入或离开房间,并且可以向房间内的所有客户端广播消息。

  4. 跨域支持:Netty Socket.IO 支持跨域访问,允许客户端从不同的域名或 IP 地址连接到服务器。开发者可以通过配置来控制跨域策略。

总之,Netty Socket.IO 是一个功能强大、易于使用的 WebSocket 库,它提供了完整的实现和丰富的功能,为开发者提供了便捷、高效、可靠的数据传输方案。

具体实现步骤如下:

  1. 创建Spring Boot项目并添加Netty Socket.IO依赖在pom.xml文件中添加以下依赖:
<dependency>
 <groupId>com.corundumstudio.socketio</groupId>
 <artifactId>netty-socketio</artifactId>
 <version>1.7.17</version>
</dependency>
  1. 配置Netty Socket.IO服务器创建一个配置类,配置Netty Socket.IO服务器,并将其注入到Spring容器中。
@Configuration
public class NettySocketIOConfig {

  @Bean 
  public SocketIOServer socketIOServer() {
      Configuration config = new Configuration();
      config.setHostname("localhost");
      config.setPort(8080);

      return new SocketIOServer(config);
  }
}
  1. 编写Socket.IO事件处理器编写处理Socket.IO事件的代码,并将其注册到服务器上。
@Component
public class ChatEventHandler {
 @Autowired 
 private SocketIOServer server;

 @OnConnect 
 public void onConnect(SocketIOClient client) { 
    System.out.println("Client connected: " + client.getSessionId()); 
    server.getBroadcastOperations().sendEvent("message", "A new user has joined the chat.");
   }

 @OnDisconnect 
 public void onDisconnect(SocketIOClient client) { 
  System.out.println("Client disconnected: " + client.getSessionId()); 
  server.getBroadcastOperations().sendEvent("message", "A user has left the chat."); }

 @OnEvent("chat")
  public void onChat(SocketIOClient client, ChatMessage message) { 
    System.out.println("Received message: " + message.getContent()); 
    server.getBroadcastOperations().sendEvent("chat", message); }
}
  1. 启动服务器在Spring Boot应用程序的main方法中启动Netty Socket.IO服务器。

@SpringBootApplication
public class Application {
 public static void main(String[] args) {
      SpringApplication.run(Application.class, args);
      // Start Netty Socket.IO server ApplicationContext context = new AnnotationConfigApplicationContext(NettySocketIOConfig.class);
      SocketIOServer server = context.getBean(SocketIOServer.class);
      server.start();
 }
}

现在可以使用WebSocket或Socket.IO协议来实时通信了。例如,在JavaScript中,你可以使用以下代码连接到服务器并发送消息:


var socket = io.connect('http://localhost:8080');
socket.on('connect', function() {
 console.log('Connected to server');
});

socket.emit('chat', { content: 'Hello world!' });

当有新用户加入聊天室或发送消息时,将会触发相应的事件处理器,并向所有客户端广播相关信息。

ReactiveStream

ReactiveStream 是一种标准化的异步流处理 API,旨在解决异步编程中的背压问题。它提供了一组接口和规范,使得不同的库可以互相兼容,并且可以在一个流中使用多个库进行数据处理。

ReactiveStream 的核心概念是 Publisher、Subscriber 和 Subscription。Publisher用于发布数据流,Subscriber 订阅并消费这些数据,Subscription 则负责管理订阅关系和控制背压。

通过 ReactiveStream 规范定义的接口和方法,开发者可以方便地创建自己的 Publisher 和 Subscriber 实现,并与其他遵循该规范的库进行交互。这样就能够实现基于事件驱动、响应式编程模型下高效可靠地异步处理大量数据。

目前已经有多个 Java 平台上常见框架(如 RxJava, Reactor, Akka Streams 等)支持 ReactiveStream 标准。同时也有其他语言平台开始出现对ReactiveStream 的支持(如 Kotlin Coroutines)。

ReactiveStream是一种基于流的异步编程模型,可以用来处理高并发、大数据量的场景。Spring Boot提供了对ReactiveStream的支持,使得开发者可以更加方便地使用这种编程模型。

具体实现如下:

  1. 引入相关依赖在pom.xml文件中引入以下依赖:
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
 <groupId>io.projectreactor</groupId>
 <artifactId>reactor-core</artifactId>
 <version>${reactor.version}</version>
</dependency>

<dependency>
 <groupId>io.projectreactor.addons</groupId>
 <artifactId>reactor-extra</artifactId>
 <version>${reactor.version}</version>
 </dependency>
  1. 编写Controller创建一个RestController类,并添加@RequestMapping注解和@GetMapping注解。
@RestController
public class UserController {

 @GetMapping("/users")
 public Flux<UserDTO> getUsers() {
 // TODO: 实现获取用户列表逻辑 }

}
  1. 使用Flux返回响应结果在getUsers方法中,我们使用Flux作为返回值类型。Flux表示一个包含多个元素的序列,在本例中就是用户列表。我们可以通过Mono.fromCallable方法从数据库或其他数据源中获取数据,并将其转换成Flux对象。
@GetMapping("/users")
public Flux<UserDTO> getUsers() {
 return Mono.fromCallable(() -> userService.getUsers())
 .flatMapMany(Flux::fromIterable)
 .map(user -> new UserDTO(user.getId(), user.getName()));
}

在上面的代码中,我们首先使用Mono.fromCallable方法获取用户列表,并将其转换成Flux对象。然后通过flatMapMany方法将Flux对象扁平化为一个包含多个UserDTO元素的Flux流。

  1. 添加异常处理由于异步编程模型存在很多不确定性,因此需要对可能出现的异常进行处理。Spring Boot提供了@ExceptionHandler注解来捕获全局异常或指定类型的异常。
@RestControllerAdvice
public class GlobalExceptionHandler {
 @ExceptionHandler(Exception.class)
 public Mono<ResponseEntity<String>> handleException(Exception e) {
 log.error("Unexpected error occurred: {}", e.getMessage());
 return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage()));
 }

}

在上面的代码中,我们定义了一个全局异常处理器,用于捕获所有未被其他Handler拦截到的异常。当发生未知错误时,该方法会返回500状态码和错误信息。

  1. 配置Webflux服务器最后,在应用程序启动类中添加@EnableWebFlux注解并配置Webflux服务器:
@SpringBootApplication
@EnableWebFlux
public class Application {

 public static void main(String[] args) {
 SpringApplication.run(Application.class, args);
 }

 @Bean 
 public HttpServer httpServer() {
 return HttpServer.create().port(8080);
 }
}

以上就是ReactiveStream Spring Boot具体实现过程。


文章作者: weilongshi
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 weilongshi !
  目录