简介
服务器可以主动向客户端推送信息,客户端即时收到消息,比如微信消息,系统消息,通知消息等等。
方式
短轮询
指在特定的的时间间隔(如每10秒),由浏览器对服务器发出HTTP request,然后由服务器返回最新的数据给客户端的浏览器。浏览器做处理后进行显示。无论后端此时是否有新的消息产生,都会进行响应。
setInterval(() => {
// 方法请求
queryMessage().then((res) => {
if (res.code === 200) {
this.messageData = res.data
}
})
}, 1000);
短轮询实现简单,效果还可以,缺点是由于推送数据并不会频繁变更,无论后端此时是否有新的消息产生,浏览器都会进行请求,服务器接到请求后马上返回响应信息并关闭连接,势必对服务端造成很大的压力,浪费带宽和服务器资源。
- 优点:后端程序编写比较容易,适于小型应用。
- 缺点:请求中有大半是无用,浪费带宽和服务器资源。
长轮询
长轮询是对短轮询的一种改进,在尽可能减少对服务器资源的浪费的同时保证消息的相对实时性。如Nacos、Kafka、RocketMQ中都有用到长轮询。
长轮询是指客户端会一直向服务端发起请求,适用与服务端向客户端推送数据使用。长轮询要满足以下几点: 客户端发起请求后,当服务端业务没有数据时,不会立即返回空值,而是hold住连接,等待数据生成后立即返回。
请求在服务端有超时时间,不会一直hold住。当超时后,服务端会返回超时信息,客户端收到返回后会再次发起请求。
每次请求结束后,客户端会再次发起请求。
优点:在无消息的情况下不会频繁的请求,相对短轮询较实时。
缺点:与服务器一直连接会消耗资源
SSE(Server-Sent Events)
SSE(Server-Sent Events)是一种基于HTTP的推送技术,可以实现服务器向客户端主动发送消息。相比WebSocket,SSE更加轻量级、易于使用,并且能够兼容老旧浏览器。
在SSE中,客户端通过EventSource对象与服务器建立连接,并监听来自服务器的事件。当有新的事件发生时,服务器会将数据以文本流的形式发送给客户端,在接收到数据后,客户端可以根据需要进行处理。
- 优点:
- 实现简单:只需一个HTTP连接即可完成推送功能;
- 兼容性好:支持大部分现代浏览器和移动设备;
- 轻量级:相对于WebSocket而言,最大的优势是便利,服务端不需要其他的类库,开发难度较低,传输开销较小。
- SSE和轮询相比它不用处理很多请求,不用每次建立新连接,延迟较低。
- 缺点:
如果客户端有很多,那就要保持很多长连接,这会占用服务器大量内存和连接数
SSE 规范:在 html5 的定义中,服务端 SSE
,一般需要遵循以下要求:
Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive
代码实现
前端 html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SseEmitter</title>
</head>
<body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>
let source = null;
// 用时间戳模拟登录用户
const userId = new Date().getTime();
if (!!window.EventSource) {
// 建立连接
source = new EventSource('http://127.0.0.1:6602/sse/createSseConnect?clientId=39bd662b7942418595c21a1ef0af7fad');
/**
* 连接一旦建立,就会触发open事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
/**
* 如果发生通信错误(比如连接中断),就会触发error事件
* 或者:
* 另一种写法:source.onerror = function (event) {}
*/
source.addEventListener('error', function (e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML("连接关闭");
} else {
console.log(e);
}
}, false);
} else {
setMessageInnerHTML("你的浏览器不支持SSE");
}
// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
window.onbeforeunload = function () {
closeSse();
};
// 关闭Sse连接
function closeSse() {
source.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', 'http://localhost:6602/sse/closeConnect/?clientId=e410d4c1d71c469b8d719de5d39783b7', true);
httpRequest.send();
console.log("close");
}
// 将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
</script>
</html>
后端:
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* SSE长链接
*
* @author longshi
* @date 2021/12/13
*/
@RestController
@RequestMapping("/sse")
@Slf4j
public class SseEmitterController {
/**
* 容器,保存连接,用于输出返回
*/
private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
private static final String CLIENT_ID = "test";
private static final String TASK_RESULT = "t_restult";
/**
* 创建SSE长链接
*
* @param clientId 客户端唯一ID(如果为空,则由后端生成并返回给前端)
* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter
**/
@GetMapping("/createSseConnect")
@CrossOrigin
public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
// 是否需要给客户端推送ID
if (StringUtils.isBlank(clientId)) {
clientId = UUID.randomUUID().toString();
}
// 注册回调
sseEmitter.onCompletion(completionCallBack(clientId));
sseCache.put(clientId, sseEmitter);
log.info("创建新的sse连接,当前用户:{}", clientId);
try {
sseEmitter.send(SseEmitter.event().id(CLIENT_ID).data(clientId));
} catch (IOException e) {
log.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);
throw new RuntimeException("创建连接异常!", e);
}
return sseEmitter;
}
/**
* 关闭SSE连接
*
* @param clientId 客户端ID
**/
@CrossOrigin
@GetMapping("/closeSseConnect")
public Object closeSseConnect(String clientId) {
this.closeSseConnect(clientId);
return "{\"code\":\"200\"}";
}
/**
* 模拟后端推送消息
* @date 2022/12/16 10:52
* @param clientId
* @return java.lang.Object
*/
@GetMapping("/pushMsg")
public Object pushMsg(String clientId) {
JSONObject msg = new JSONObject();
msg.put("msg","this is push msg !");
sendMsgToClient(msg);
return "{\"code\":\"200\"}";
}
// 根据客户端id获取SseEmitter对象
public SseEmitter getSseEmitterByClientId(String clientId) {
return sseCache.get(clientId);
}
// 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息
public void sendMsgToClient(JSONObject msg) {
for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {
sendMsgToClientByClientId(entry.getKey(), msg, entry.getValue());
}
}
/**
* 推送消息到客户端
* 此处做了推送失败后,重试推送机制,可根据自己业务进行修改
*
* @param clientId 客户端ID
* @param msg 推送信息,此处结合具体业务,定义自己的返回值即可
**/
private void sendMsgToClientByClientId(String clientId, JSONObject msg, SseEmitter sseEmitter) {
if (sseEmitter == null) {
log.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:客户端{}未创建长链接,失败消息:{}",
clientId, msg.toString());
return;
}
SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(TASK_RESULT).data(msg, MediaType.APPLICATION_JSON);
try {
sseEmitter.send(sendData);
} catch (IOException e) {
// 推送消息失败,记录错误日志,进行重推
log.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", msg.toString(), e);
boolean isSuccess = true;
// 推送消息失败后,每隔10s推送一次,推送5次
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(10000);
sseEmitter = sseCache.get(clientId);
if (sseEmitter == null) {
log.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
continue;
}
sseEmitter.send(sendData);
} catch (Exception ex) {
log.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex);
continue;
}
log.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, msg.toString());
return;
}
}
}
/**
* 长链接完成后回调接口(即关闭连接时调用)
*
* @param clientId 客户端ID
* @return java.lang.Runnable
**/
private Runnable completionCallBack(String clientId) {
return () -> {
log.info("结束连接:{}", clientId);
removeUser(clientId);
};
}
/**
* 连接超时时调用
*
* @param clientId 客户端ID
* @return java.lang.Runnable
**/
private Runnable timeoutCallBack(String clientId) {
return () -> {
log.info("连接超时:{}", clientId);
removeUser(clientId);
};
}
/**
* 推送消息异常时,回调方法
*
* @param clientId 客户端ID
* @return java.util.function.Consumer<java.lang.Throwable>
* @author re
* @date 2021/12/14
**/
private Consumer<Throwable> errorCallBack(String clientId) {
return throwable -> {
log.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);
// 推送消息失败后,每隔10s推送一次,推送5次
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(10000);
SseEmitter sseEmitter = sseCache.get(clientId);
if (sseEmitter == null) {
log.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);
continue;
}
sseEmitter.send("失败后重新推送");
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
/**
* 移除用户连接
*
* @param clientId 客户端ID
**/
private void removeUser(String clientId) {
sseCache.remove(clientId);
log.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);
}
}
nginx转发配置
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_http_version 1.1;
proxy_set_header Connection;
chunked_transfer_encoding off;
proxy_cache off;
WebSocket
WebSocket Stomp是一种协议,用于在Web浏览器和服务器之间进行实时通信。它结合了WebSocket协议和Stomp(Simple Text Oriented Messaging Protocol)协议的优点。
WebSocket是一种基于TCP的网络协议,允许双向通信。与HTTP不同,它支持长连接,并且可以在客户端和服务器之间传输任意数据类型。
Stomp是一种简单的文本消息传递协议,用于在应用程序之间发送消息。它提供了一个可靠的、异步的、分布式系统中使用消息队列进行通信的方法。
通过将这两个协议结合起来,WebSocket Stomp可以实现高效、可靠、实时地传输数据。它广泛应用于在线聊天、多人游戏等需要快速响应和实时更新数据的场景中。
服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。
具体实现步骤:
- 整合依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
- 配置类
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import java.util.List;
//注解开启使用STOMP协议来传输基于代理(message broker)的消息,这时控制器支持使用@MessageMapping,就像使用@RequestMapping一样
@Configuration
@EnableWebSocketMessageBroker
@Slf4j
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//注册一个STOMP的endpoint,并指定使用SockJS协议
registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//客户端发送消息的请求前缀
registry.setApplicationDestinationPrefixes("/app");
//客户端订阅消息的请求前缀,topic一般用于广播推送,queue用于点对点推送
registry.enableSimpleBroker("/topic", "/user");
//服务端通知客户端的前缀,可以不设置,默认为user
registry.setUserDestinationPrefix("/user");
}
/**
* 拦截器方式2
*
* @param registration
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
//1、判断是否首次连接
if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
//2、判断token
List<String> nativeHeader = accessor.getNativeHeader("Authorization");
if (nativeHeader != null && !nativeHeader.isEmpty()) {
String token = nativeHeader.get(0);
if (StringUtils.isNotBlank(token)) {
log.info(token);
//todo,通过token获取用户信息,下方用loginUser来代
return message;
}
}
return null;
}
//不是首次连接,已经登陆成功
return message;
}
});
}
}
- 后端推送给前端消息
- 订阅点对点、广播
后端推送消息配置类
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
/**
* @MethodName: sendToUser
* @Description: 推送消息给订阅用户
* @Param: [userId]
**/
public void sendToUser(String userId, String message){
//这里可以控制权限等。。。
simpMessagingTemplate.convertAndSendToUser(userId,"/listener", message);
}
/**
* @MethodName: sendBroadCast
* @Description: 发送广播,需要用户事先订阅广播
* @Param: [topic, msg]
* @Return: void
**/
public void sendBroadCast(String topic,String msg){
simpMessagingTemplate.convertAndSend(topic,msg);
}
后端接收前端信息
@RestController
public class StompSocketController {
@Autowired
private SimpMessagingTemplate simpMessagingTemplate; // 发送消息
/**
* @MethodName: subscribeMapping
* @Description: 订阅成功通知
* @Param: [id]
* @Return: void
**/
@SubscribeMapping("/user/{id}/listener")
public void subscribeMapping(@DestinationVariable("id") final long id) {
System.out.println(">>>>>>用户:"+id +",已订阅");
SubscribeMsg param = new SubscribeMsg(id,String.format("用户【%s】已订阅成功", id));
sendToUser(param);
}
/**
* @MethodName: test
* @Description: 接收订阅topic消息
* @Param: [id, msg]
* @Return: void
* @Author: scott
* @Date: 2021/6/30
**/
@MessageMapping(value = "/user/{id}/listener")
public void UserSubListener(@DestinationVariable long id, String msg) {
System.out.println("收到客户端:" +id+",的消息");
SubscribeMsg param = new SubscribeMsg(id,String.format("已收到用户【%s】发送消息【%s】", id,msg));
sendToUser(param);
}
/**
* @MethodName: sendToUser
* @Description: 推送消息给订阅用户
* @Param: [userId]
* @Return: void
**/
private void sendToUser(SubscribeMsg screenChangeMsg){
//这里可以控制权限等。。。
simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),"/listener", JSON.toJSONString(screenChangeMsg));
}
/**
* @ClassName: SubMsg
**/
public static class SubscribeMsg {
private Long userId;
private String msg;
public SubscribeMsg(Long UserId, String msg){
this.userId = UserId;
this.msg = msg;
}
public Long getUserId() {
return userId;
}
public String getMsg() {
return msg;
}
}
}
- 前端代码
- 前端样例
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>websocket stomp</title>
<script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>
<script type="text/javascript">
// 定义全局变量 stomp socket
var stompClient,socket;
$(document).ready(function () {
if (window.WebSocket){
websocketConfig();
} else {
alert("错误","浏览器不支持websocket技术通讯.");
}
});
// websocket 配置
function websocketConfig() {
/*
* 1. 连接url为endpointChat的endpoint,对应后台WebSoccketConfig的配置
* 2. SockJS 所处理的URL 是 "http://" 或 "https://" 模式,而不是 "ws://" or "wss://"
*/
socket = new SockJS("http://127.0.0.1:6602/websocket");
// 通过sock对象监听每个事件节点,非必须,这个必须放在stompClient的方法前面
sockHandle();
// 获取 STOMP 子协议的客户端对象
stompClient = Stomp.over(socket);
/*
* 1. 获取到stomp 子协议后,可以设置心跳连接时间,认证连接,主动断开连接
* 2,连接心跳有的版本的stomp.js 是默认开启的,这里我们不管版本,手工设置
* 3. 心跳是双向的,客户端开启心跳,必须要服务端支持心跳才行
* 4. heartbeat.outgoing 表示客户端给服务端发送心跳的间隔时间
* 5. 客户端接收服务端心跳的间隔时间,如果为0 表示客户端不接收服务端心跳
*/
stompClient.heartbeat.outgoing = 10000;
stompClient.heartbeat.incoming = 1;
/*
* 1. stompClient.connect(headers, connectCallback, errorCallback);
* 2. headers表示客户端的认证信息,多个参数 json格式存,这里简单用的httpsessionID,可以根据业务场景变更
* 这里存的信息,在服务端StompHeaderAccessor 对象调用方法可以取到
* 3. connectCallback 表示连接成功时(服务器响应 CONNECTED 帧)的回调方法;
* errorCallback 表示连接失败时(服务器响应 ERROR 帧)的回调方法,非必须;
*/
var headers = {"Authorization":"123232"};
stompClient.connect(headers,function (frame) {
console.log('Connected:' + frame);
/*
* 1. 订阅服务,订阅地址为服务器Controller 中的地址
* 2. 如果订阅为公告,地址为Controller 中@SendTo 注解地址
* 3. 如果订阅为私信,地址为setUserDestinationPrefix 前缀+@SendToUser注解地址
* 或者setUserDestinationPrefix 前缀 + controller的convertAndSendToUser地址一致
* 4. 这里演示为公告信息,所有订阅了的用户都能接受
*/
stompClient.subscribe("/user/00145/listener",function (message) {
console.log("接收到信息:" + message);
});
/*
* 1. 因为推送为私信,必须带上或者setUserDestinationPrefix前缀 /user
* 2. 演示自己发送给自己,做websocket向服务器请求资源而已,然后服务器你就把资源给我就行了,
* 别的用户就不用你广播推送了,简单点,就是我请求,你就推送给我
*/
stompClient.subscribe('user/00145/listener',function (message) {
var msg = JSON.parse(message.body).msg;
console.log("接收到私信信息SendToUser:" + msg);
alert("接收到私信信息SendToUser:" + msg);
});
/*
* 1. 订阅点对点消息
* 2. 很多博文这里的路径会写成"/user/{accountId}/userTest/callBack”这种,是因为
* @SendToUser发送的代理地址是 /userTest/callBack, 地址将会被转化为 /user/{username}/userTest/callBack
* username,为用户的登录名,也是就是Principal或者本文中的WebSocketUserAuthentication对象getName获取的参数
* 如果在拦截器中配置了认证路径,可以不带参数,不过推荐用带参数的写法
*
*/
stompClient.subscribe('${socketUser}/userTest/callBack',function (message) {
var msg = message.body;
console.log("接收到点对点SendToUser:" + msg);
alert("接收到点对点SendToUser:" + msg);
});
}, function (error) {
console.log('STOMP: ' + error);
//setTimeout(websocketConfig, 10000);
console.log('STOMP: Reconnecting in 10 seconds');
});
}
// 发送公告消息
function sendMsg() {
var msg = $("#message").val();
var data ={"msg":msg};
/**
* 1. 第一个参数 url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数;
* 2. headers 为发送信息的header,json格式,JavaScript 对象,
* 可选参数,可以携带消息头信息,也可以做事务,如果没有,传{}
* 3. body 为发送信息的 body,字符串,可选参数
*/
stompClient.send('${socketPrefix + "/sendChatMsg/" + groupId}',{},JSON.stringify(data));
}
// 发送给自己
function sendMsgOwn() {
var msg = $("#message").val();
var data ={"msg":msg};
/**
* 1. 第一个参数 url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数;
* 2. headers 为发送信息的header,json格式,JavaScript 对象,
* 可选参数,可以携带消息头信息,也可以做事务,如果没有,传{}
* 3. body 为发送信息的 body,字符串,可选参数
*/
stompClient.send("${socketPrefix}/sendChatMsgByOwn",{},JSON.stringify(data));
}
// 发送点对点消息
function sendMsgById() {
var msg = $("#message").val();
var accountId = $("#accountId").val();
var data ={"msg":msg};
/**
* 1. 第一个参数 url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数;
* 2. headers 为发送信息的header,json格式,JavaScript 对象,
* 可选参数,可以携带消息头信息,也可以做事务,如果没有,传{}
* 3. body 为发送信息的 body,字符串,可选参数
* 4. accountId这个参数其实可以通过header传过去,不过因为是restful风格,所以就跟在url上
*/
stompClient.send("${socketPrefix}/sendChatMsgById/" + accountId,{},JSON.stringify(data));
}
// 通过sock对象监听每个事件节点,非必须,这里开启了stomp的websocket 也不会生效了
function sockHandle() {
// 连接成功后的回调函数
socket.onopen = function () {
console.log("------连接成功------");
};
// 监听接受到服务器的消息
socket.onmessage = function (event) {
console.log('-------收到的消息: ' + event.data);
};
// 关闭连接的回调函数
socket.onclose = function (event) {
console.log('--------关闭连接: connection closed.------');
};
// 连接发生错误
socket.onerror = function () {
alert("连接错误", "网络超时或通讯地址错误.");
disconnect();
} ;
}
// 关闭websocket
function disconnect() {
if (socket != null) {
socket.close();
socket = null;
}
}
</script>
</head>
<body>
<div>
<div>我的 </div>
<span>消息</span>
<input type="text" id="message" name="message" />
<input type="button" id="sendMsg" name="sendMsg" value="发送公告" onclick="sendMsg();" />
<input type="button" id="sendMsgOwn" name="sendMsgOwn" value="自己给自己推送" onclick="sendMsgOwn();" />
<br/>
<span>接收人</span>
<input type="text" id="accountId" name="accountId" />
<input type="button" id="sendMsgById" name="sendMsgById" value="点对点消息" onclick="sendMsgById();" />
<br/>
<input type="button" value="断开" onclick="disconnect();" />
</div>
</html>
- Nginx 配置websocket
http 中添加
######websocket start#########
map `$http_upgrade $`connection_upgrade {
default upgrade;
'' close;
}
######websocket
server 中添加
location = '/websocket' {
proxy_pass <http://127.0.0.1:6602>;
proxy_read_timeout 300s;
proxy_http_version 1.1;
proxy_set_header X-Real-IP `$remote_addr;
proxy_set_header X-Forwarded-For $`remote_addr;
proxy_set_header Upgrade `$http_upgrade;
proxy_set_header Connection $`connection_upgrade;
}
- 网关负载均衡
路由配置
server:
port: 6602
spring:
cloud:
gateway:
httpclient:
response-timeout: 180s
connect-timeout: 180000
routes:
- id: websocket-message
uri: lb://message #服务名
predicates:
- Path=/websocket/info/**
filters:
- StripPrefix=0
- id: websocket-api
uri: lb:ws://message
predicates:
- Path=websocket/**
filters:
- StripPrefix=0
#无注册中心,配置静态服务IP健康检查
discovery:
client:
simple:
instances: ## 服务发现实力列表,无注册中心服务实例的地址,新增或删除服务需要手动维护这个列表
message[0]:
uri: http://192.168.1.100:8080
message[1]:
uri: http://192.168.1.101:8080
loadbalancer:
service-discovery:
timeout: 9000
configurations: health-check # 启用health-check
## 全局
health-check:
initial-delay: 3s
clients: ## 服务发现无注册中心服务的实例(实例列表)健康检查
message: # 单个服务指定服务名
health-check:
interval: 30s
initial-delay: 1s
refetch-instances-interval: 5s
path:
message: /actuator/health # 消息服务的监控检查URL
- 分布式情况下消息推送场景
当服务器要通过 WebSocket 发送消息时,该消息首先发送到 STOMP 代理。此 STOMP 代理将传入消息发送到与其连接的所有实例。
这个消息代理例如RabbitMQ、ActiveMQ等
后端主动推送给用户消息,由于负载均衡,后端多个服务,websocket是长连接,连接的应用多个实例中的某一个,所以推送的时候可以结合MQ消息队列的广播模式,推送给消息模块,再由消息模块发送给前端。
链路:
app -> nginx > api gateway(负载均衡) > message service(多个服务)
a.MQ-广播结果信息给消息服务,(不知道具体服务与前端长链接)
b.消息服务(2台)推送消息给前端
c.其中一台已经和前端建立了长链接,推送成功
Netty Socket.IO
Netty Socket.IO 是基于 Netty 框架实现的一个 WebSocket 库,它提供了对 Socket.IO 协议的支持。下面是 Netty Socket.IO 的具体实现:
客户端连接:客户端通过 WebSocket 连接到服务器,建立长连接。
握手协议:握手协议是在客户端和服务器之间进行的一次通信过程,在这个过程中,双方会交换一些信息以确保彼此都能够理解对方所使用的协议版本、编码方式等。
3.事件监听器注册:Netty Socket.IO 提供了多种事件类型,例如连接成功、断开连接、收到消息等。开发者可以通过注册相应的事件监听器来处理这些事件。
数据传输:Netty Socket.IO 支持两种数据传输方式:文本和二进制。开发者可以根据需要选择合适的传输方式。
命名空间管理:Socket.IO 允许创建多个命名空间(namespace),每个命名空间都有自己独立的房间(room)。Netty Socket.IO 提供了命名空间管理功能,允许用户创建、删除或修改命名空间及其相关配置。
房间管理:房间是指在某个命名空间内共享同一个标识符(ID)的客户端集合。Netty Socket.IO 支持动态加入或离开房间,并且可以向房间内的所有客户端广播消息。
跨域支持:Netty Socket.IO 支持跨域访问,允许客户端从不同的域名或 IP 地址连接到服务器。开发者可以通过配置来控制跨域策略。
总之,Netty Socket.IO 是一个功能强大、易于使用的 WebSocket 库,它提供了完整的实现和丰富的功能,为开发者提供了便捷、高效、可靠的数据传输方案。
具体实现步骤如下:
- 创建Spring Boot项目并添加Netty Socket.IO依赖在pom.xml文件中添加以下依赖:
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.17</version>
</dependency>
- 配置Netty Socket.IO服务器创建一个配置类,配置Netty Socket.IO服务器,并将其注入到Spring容器中。
@Configuration
public class NettySocketIOConfig {
@Bean
public SocketIOServer socketIOServer() {
Configuration config = new Configuration();
config.setHostname("localhost");
config.setPort(8080);
return new SocketIOServer(config);
}
}
- 编写Socket.IO事件处理器编写处理Socket.IO事件的代码,并将其注册到服务器上。
@Component
public class ChatEventHandler {
@Autowired
private SocketIOServer server;
@OnConnect
public void onConnect(SocketIOClient client) {
System.out.println("Client connected: " + client.getSessionId());
server.getBroadcastOperations().sendEvent("message", "A new user has joined the chat.");
}
@OnDisconnect
public void onDisconnect(SocketIOClient client) {
System.out.println("Client disconnected: " + client.getSessionId());
server.getBroadcastOperations().sendEvent("message", "A user has left the chat."); }
@OnEvent("chat")
public void onChat(SocketIOClient client, ChatMessage message) {
System.out.println("Received message: " + message.getContent());
server.getBroadcastOperations().sendEvent("chat", message); }
}
- 启动服务器在Spring Boot应用程序的main方法中启动Netty Socket.IO服务器。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
// Start Netty Socket.IO server ApplicationContext context = new AnnotationConfigApplicationContext(NettySocketIOConfig.class);
SocketIOServer server = context.getBean(SocketIOServer.class);
server.start();
}
}
现在可以使用WebSocket或Socket.IO协议来实时通信了。例如,在JavaScript中,你可以使用以下代码连接到服务器并发送消息:
var socket = io.connect('http://localhost:8080');
socket.on('connect', function() {
console.log('Connected to server');
});
socket.emit('chat', { content: 'Hello world!' });
当有新用户加入聊天室或发送消息时,将会触发相应的事件处理器,并向所有客户端广播相关信息。
ReactiveStream
ReactiveStream 是一种标准化的异步流处理 API,旨在解决异步编程中的背压问题。它提供了一组接口和规范,使得不同的库可以互相兼容,并且可以在一个流中使用多个库进行数据处理。
ReactiveStream 的核心概念是 Publisher、Subscriber 和 Subscription。Publisher用于发布数据流,Subscriber 订阅并消费这些数据,Subscription 则负责管理订阅关系和控制背压。
通过 ReactiveStream 规范定义的接口和方法,开发者可以方便地创建自己的 Publisher 和 Subscriber 实现,并与其他遵循该规范的库进行交互。这样就能够实现基于事件驱动、响应式编程模型下高效可靠地异步处理大量数据。
目前已经有多个 Java 平台上常见框架(如 RxJava, Reactor, Akka Streams 等)支持 ReactiveStream 标准。同时也有其他语言平台开始出现对ReactiveStream 的支持(如 Kotlin Coroutines)。
ReactiveStream是一种基于流的异步编程模型,可以用来处理高并发、大数据量的场景。Spring Boot提供了对ReactiveStream的支持,使得开发者可以更加方便地使用这种编程模型。
具体实现如下:
- 引入相关依赖在pom.xml文件中引入以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId>
<version>${reactor.version}</version>
</dependency>
- 编写Controller创建一个RestController类,并添加@RequestMapping注解和@GetMapping注解。
@RestController
public class UserController {
@GetMapping("/users")
public Flux<UserDTO> getUsers() {
// TODO: 实现获取用户列表逻辑 }
}
- 使用Flux返回响应结果在getUsers方法中,我们使用Flux作为返回值类型。Flux表示一个包含多个元素的序列,在本例中就是用户列表。我们可以通过Mono.fromCallable方法从数据库或其他数据源中获取数据,并将其转换成Flux对象。
@GetMapping("/users")
public Flux<UserDTO> getUsers() {
return Mono.fromCallable(() -> userService.getUsers())
.flatMapMany(Flux::fromIterable)
.map(user -> new UserDTO(user.getId(), user.getName()));
}
在上面的代码中,我们首先使用Mono.fromCallable方法获取用户列表,并将其转换成Flux对象。然后通过flatMapMany方法将Flux
- 添加异常处理由于异步编程模型存在很多不确定性,因此需要对可能出现的异常进行处理。Spring Boot提供了@ExceptionHandler注解来捕获全局异常或指定类型的异常。
@RestControllerAdvice
public class GlobalExceptionHandler {
@ExceptionHandler(Exception.class)
public Mono<ResponseEntity<String>> handleException(Exception e) {
log.error("Unexpected error occurred: {}", e.getMessage());
return Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage()));
}
}
在上面的代码中,我们定义了一个全局异常处理器,用于捕获所有未被其他Handler拦截到的异常。当发生未知错误时,该方法会返回500状态码和错误信息。
- 配置Webflux服务器最后,在应用程序启动类中添加@EnableWebFlux注解并配置Webflux服务器:
@SpringBootApplication
@EnableWebFlux
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public HttpServer httpServer() {
return HttpServer.create().port(8080);
}
}
以上就是ReactiveStream Spring Boot具体实现过程。