一、背景
项目中消息推送,简单的有短轮询、长轮询,还有SSE(Server-Sent Events)、以及最强大复杂的WebSocket。
至于技术选型,SSE和WebSocket区别,网上有很多,我也不整理了,大佬的链接
《网页端IM通信技术快速入门:短轮询、长轮询、SSE、WebSocket》。
其实实现很简单,写这篇文章的目的,主要是将处理过程中的一些问题,记录解决方案。
二、后端实现
其实这里网上也是很多demo,我简写一点demo
1、引入依赖
这里需要用的下面依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
2、sse工具类
package com.asiainfo.common.utils.sse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* Des: sse工具类
* Author: SiQiangMing 2025/5/28 15:50
*/
@Slf4j
public class SseEmitterUtil {
/**
* 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面
*/
private final static Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 用户创建sse链接
* Author: SiQiangMing 2025/5/28 17:21
* @param userId: 用户id
* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter
*/
public static SseEmitter connect(Long userId) {
// 设置超时时间,0表示不过期。默认30S,超时时间未完成会抛出异常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
// 注册回调
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId, sseEmitter);
log.info("----------------------------创建新的 SSE 连接,当前用户 {}, 连接总数 {}", userId
, sseEmitterMap.size());
return sseEmitter;
}
/**
* 给制定用户发送消息
* Author: SiQiangMing 2025/5/28 17:21
* @param userId: 用户id
* @param sseMessage: 消息
* @return void
*/
public static void sendMessage(Long userId, String sseMessage) {
if (sseEmitterMap.containsKey(userId)) {
try {
sseEmitterMap.get(userId).send(sseMessage);
log.info("----------------------------用户 {} 推送消息 {}", userId, sseMessage);
} catch (IOException e) {
log.error("----------------------------用户 {} 推送消息异常", userId);
disconnect(userId);
}
} else {
log.error("----------------------------消息推送 用户 {} 不存在,链接总数 {}"
, userId, sseEmitterMap.size());
}
}
/**
* 判断用户是否存在sse链接
* Author: SiQiangMing 2025/5/29 10:02
* @param userId:
* @return boolean
*/
public static boolean checkSseExist(Long userId) {
if (userId == null) {
return false;
}
return sseEmitterMap.containsKey(userId);
}
/**
* 群发所有人,这里用来测试异常的排除链接
* Author: SiQiangMing 2025/5/28 17:20
* @param message: 消息
* @return void
*/
public static void batchSendMessage(String message) {
sseEmitterMap.forEach((k, v) -> {
try {
v.send(message, MediaType.APPLICATION_JSON);
} catch (IOException e) {
log.error("----------------------------用户 {} 推送异常", k);
disconnect(k);
}
});
}
/**
* 移除用户连接
* Author: SiQiangMing 2025/5/28 17:20
* @param userId: 用户id
* @return void
*/
public static void disconnect(Long userId) {
if (sseEmitterMap.containsKey(userId)) {
sseEmitterMap.get(userId).complete();
sseEmitterMap.remove(userId);
log.info("----------------------------移除用户 {}, 剩余连接 {}", userId, sseEmitterMap.size());
} else {
log.error("-----------------------------移除用户 {} 已被移除,剩余连接 {}", userId, sseEmitterMap.size());
}
}
/**
* 结束回调
* Author: SiQiangMing 2025/5/28 17:19
* @param userId: 用户id
* @return java.lang.Runnable
*/
private static Runnable completionCallBack(Long userId) {
return () -> {
log.info("----------------------------用户 {} 结束连接", userId);
};
}
/**
* 超时回调
* Author: SiQiangMing 2025/5/28 17:20
* @param userId: 用户id
* @return java.lang.Runnable
*/
private static Runnable timeoutCallBack(Long userId) {
return () -> {
log.error("----------------------------用户 {} 连接超时", userId);
disconnect(userId);
};
}
/**
* 异常回调
* Author: SiQiangMing 2025/5/28 17:20
* @param userId: 用户id
* @return java.util.function.Consumer<java.lang.Throwable>
*/
private static Consumer<Throwable> errorCallBack(Long userId) {
return throwable -> {
log.error("----------------------------用户 {} 连接异常", userId);
disconnect(userId);
};
}
}
三、前端
前端创建链接,请求后端的创建接口,注意页面销毁的时候,关闭sse链接
mounted() {
// sse链接
createEventSource() {
// new
this.eventSource = new EventSource("后端的创建sse路径");
// 收到消息
this.eventSource.onmessage = (e) => {
// 消息处理 e.data
};
// 异常处理
this.eventSource.onerror = (error) => {
console.error(error);
};
},
},
unmounted() {
// 组件销毁时关闭 SSE 连接
if (this.eventSource) {
this.eventSource.close();
}
},
3、创建sse
/**
* 处理客户端的连接请求
* Author: SiQiangMing 2025/5/26 16:06
* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter
*/
@GetMapping(value = "/xxx", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter xxx() {
// 返回 SseEmitter 给客户端
Long userId = SecurityUtils.getUserId();
SseEmitter sseEmitter = SseEmitterUtil.connect(userId);
// 可以直接带初始化信息返回
SseEmitterUtil.sendMessage(userId, "消息");
return sseEmitter;
}
四、nginx配置
在这里遇到了一些问题,记录下解决方案。
1、在idea开发工具都正常,部署到生产环境后,sse后端能推送,前端没有收到消息。排查浏览器网络请求,nginx日志发现,客户端主动关闭了链接。
在nginx.conf中增加配置
location /精准匹配sse创建路径 {
add_header 'Cache-Control' 'no-cache'; //不缓存数据,每次请求时都会从服务器获取最新的内容
proxy_set_header Connection ''; // 的作用是清除或覆盖 Connection头
proxy_http_version 1.1;
proxy_set_header Host $host; //确保后端服务器接收到的 Host 值与客户端原始请求的 Host 一致,或符合后端服务器的预期。
proxy_pass http://xxx:port/sse创建路径;
}
2、SSE链接一分钟请求一次,频繁创建。
在之前的配置中追加配置
location /精准匹配sse创建路径 {
proxy_connect_timeout 3600s; // 解决1分钟重连,
proxy_send_timeout 3600s; // 解决1分钟重连,
proxy_read_timeout 3600s; // 解决1分钟重连,
add_header 'Cache-Control' 'no-cache'; //不缓存数据,每次请求时都会从服务器获取最新的内容
proxy_set_header Connection ''; // 的作用是清除或覆盖 Connection头
proxy_http_version 1.1;
proxy_set_header Host $host; //确保后端服务器接收到的 Host 值与客户端原始请求的 Host 一致,或符合后端服务器的预期。
proxy_pass http://xxx:port/sse创建路径;
}
3、正常情况,链接保持了40分钟,还正常
4、并发数问题
因为这里使用的http,所以版本是HTTP/1.1,同一个端口并发sse,只有6个,有两种解决方案,后期使用HTTP/2.0,默认100并发,满足要求。