使用SSE解决获取状态不一致问题
- 1. 问题描述
- 2. SSE介绍
- 2.1 SSE 的工作原理
- 2.2 SSE 的事件格式规范
- 2.3 SSE与其他技术对比
- 2.4 SSE 的优缺点
- 3. 实战代码
1. 问题描述
目前做的一个功能是上传多个文件,这个上传文件是整体功能的一部分,文件在上传的过程中需要轮询文件上传的状态的接口,还有要调用整个任务状态的接口,因为我们的Mysql是单例的,有多个大文件上传的时候会出现任务状态不一致的问题,经过调研有WebSocket与SSE两种方式,但是我只需要把信息推给前端,所以使用个更轻量的SSE。
2. SSE介绍
SSE(Server-Sent Events) 是 HTML5 标准中的一项技术,它允许服务器通过单向连接持续地将数据推送给客户端(通常是浏览器)。
- 与传统的 HTTP 请求响应不同,SSE 是一种服务器主动推送数据的机制。
- 与 WebSocket 不同,它是单向通信:服务器 -> 客户端。
SSE 使用标准的 HTTP 协议 和 文本/event-stream 格式,通常用于实时消息推送,如股票行情、社交动态通知、在线状态更新等。
2.1 SSE 的工作原理
- 客户端通过普通的 HTTP 请求发起连接:
GET /events HTTP/1.1
Accept: text/event-stream
- 服务器响应:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
- 事件流
event: message
id: 123
data: 这是要推送的数据
- 客户端自动处理每一条数据,并在连接断开时自动重连。
2.2 SSE 的事件格式规范
每条消息由一组字段组成,字段以冒号:
分隔,每条消息之间由两个换行符 \n\n
分隔:
示例:
id: 1001
event: update
data: {"progress": 50}
2.3 SSE与其他技术对比
2.4 SSE 的优缺点
✅ 优点:
- 使用标准 HTTP 协议,兼容性好;
- 实现简单,开发成本低;
- 支持自动重连机制;
- 支持事件类型、多行数据等;
- 适用于需要实时更新、但无需客户端发送大量消息的场景。
❌ 缺点:
- 只支持单向通信;
- 不支持二进制数据;
- 不支持所有浏览器(如 IE);
- 有并发连接数限制(部分浏览器每个域名默认最大连接数限制);
- 需配置好心跳机制、防止代理服务器中断连接。
3. 实战代码
@GetMapping("/file-upload-status")
@Operation(summary = "文件上传状态SSE")
public SseEmitter fileUploadStatus(@RequestParam("taskCode") String taskCode,
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
log.info("文件上传状态SSE, 请求参数: {}, Last-Event-ID: {}", taskCode, lastEventId);
try {
return dataConfigService.fileUploadStatus(taskCode, lastEventId);
} catch (Exception e) {
log.error("文件上传状态SSE,失败,错误信息:{}", e.getMessage());
throw e;
}
}
/**
* 用于存储每个任务对应的 SseEmitter 实例。
* key 为任务唯一标识 taskCode,value 为与该任务关联的 SseEmitter。
* 使用 ConcurrentHashMap 是为了支持多线程环境下的并发读写,确保线程安全。
* <p>
* 场景:
* - 客户端发起 SSE 连接时,将对应的 SseEmitter 存入此 Map。
* - 后台定时任务或上传进度更新时可以通过 taskCode 拿到对应的 emitter 推送数据。
* - 当连接断开(如超时、关闭、出错)时,从 Map 中移除对应的 emitter,避免内存泄漏。
*/
private final Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();
/**
* 定时任务线程池,用于定期推送每个任务的上传状态到前端客户端(通过 SseEmitter)。
* 使用 Runtime.getRuntime().availableProcessors() 获取当前机器的可用处理器核心数,
* 并以此设置线程池大小,合理利用系统资源。
* <p>
* 特点:
* - ScheduledExecutorService 支持定时或周期性任务调度,适合做周期性状态推送。
* - 使用线程池而不是单线程可以支持多个任务并发状态推送。
* - 可根据系统并发量和任务复杂度调整线程池大小。
*/
private final ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
/**
* 缓存最近一次推送的状态(用于断线续传)
*/
private final Map<String, DataFileUploadStatusVO> lastEventDataMap = new ConcurrentHashMap<>();
@Override
public SseEmi
tter fileUploadStatus(String taskCode, String lastEventId) {
// 检查任务代码是否为空
if (StringUtils.isBlank(taskCode)) {
throw new ServiceException(400, "请求参数不能为空");
}
// 创建 SseEmitter 实例,设置超时时间为 1 小时(单位:毫秒)
SseEmitter emitter = new SseEmitter(60 * 60 * 1000L);
// 将当前任务的 emitter 注册到全局 emitterMap 中,用于后续状态推送
emitterMap.put(taskCode, emitter);
// 定义清理逻辑(连接关闭时自动移除 emitter,避免内存泄漏)
Runnable cleanup = () -> {
emitterMap.remove(taskCode);
lastEventDataMap.remove(taskCode);
};
// 注册回调:客户端主动关闭连接时调用
emitter.onCompletion(cleanup);
// 注册回调:连接超时后自动关闭并清理
emitter.onTimeout(cleanup);
// 注册回调:出现异常时进行日志记录并清理资源
emitter.onError(e -> {
log.error("文件上传状态SSE 错误,taskCode:{}, 错误信息:{}", taskCode, e.getMessage(), e);
cleanup.run();
});
// 尝试断点续传(只在客户端提供 Last-Event-ID 的情况下)
if (StringUtils.isNotBlank(lastEventId)) {
DataFileUploadStatusVO lastData = lastEventDataMap.get(taskCode);
if (lastData != null) {
try {
emitter.send(SseEmitter.event()
.id(lastEventId)
.name("upload-status")
.reconnectTime(5000L)
.data(lastData, MediaType.APPLICATION_JSON));
log.info("文件上传状态SSE,断点续传成功,taskCode:{}, lastEventId:{}, 数据:{}", taskCode, lastEventId,
JSON.toJSONString(lastData));
} catch (IOException e) {
log.error("文件上传状态SSE,断点续传失败,taskCode:{}, 错误:{}", taskCode, e.getMessage(), e);
}
}
}
// 创建一个原子容器用于持有定时任务引用,以便后续取消任务
AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();
// 启动定时任务
ScheduledFuture<?> future = scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
// 从全局映射中获取当前连接的 emitter,若为空说明连接已关闭,停止执行
SseEmitter currentEmitter = emitterMap.get(taskCode);
if (currentEmitter == null) {
return;
}
// 查询文件上传状态
List<DataConfigRespVO> dataConfigList = getDataConfigList(taskCode);
// 查询任务信息
TaskRespVO taskRespVO = taskService.getTask(taskCode);
// 将数据库实体转换为状态对象列表
List<DataFileUploadStatusVO.FileStatus> fileStatusList = dataConfigList.stream()
.map(item -> {
DataFileUploadStatusVO.FileStatus fileStatus = new DataFileUploadStatusVO.FileStatus();
fileStatus.setId(item.getId());
fileStatus.setCode(item.getCode());
fileStatus.setImportMethod(item.getImportMethod());
fileStatus.setFilename(item.getFilename());
fileStatus.setFileUrl(item.getFileUrl());
fileStatus.setSize(item.getSize());
fileStatus.setUploadStatusCode(item.getUploadStatusCode());
return fileStatus;
})
.collect(Collectors.toList());
// 构建完整状态返回对象
DataFileUploadStatusVO statusVO = new DataFileUploadStatusVO();
statusVO.setTaskCode(taskCode);
statusVO.setCurrentTime(TimeUtil.getCurrentTime());
statusVO.setFileStatusList(fileStatusList);
statusVO.setName(taskRespVO.getName());
statusVO.setDataType(taskRespVO.getDataType());
statusVO.setDataTypeStr(taskRespVO.getDataTypeStr());
statusVO.setImportType(taskRespVO.getImportType());
statusVO.setCurrentLinkCode(taskRespVO.getCurrentLinkCode());
statusVO.setCurrentLink(taskRespVO.getCurrentLink());
statusVO.setCurrentLinkStatusCode(taskRespVO.getCurrentLinkStatusCode());
statusVO.setCurrentLinkStatus(taskRespVO.getCurrentLinkStatus());
String eventId = String.valueOf(System.currentTimeMillis());
// 推送状态
try {
// 设置状态
statusVO.setFinished(fileUploadFished(statusVO));
// 保存最后一次推送的数据,用于断线续传
lastEventDataMap.put(taskCode, statusVO);
currentEmitter.send(SseEmitter.event()
// 事件 ID,供断线续传
.id(eventId)
// 事件名
.name("upload-status")
// 告诉客户端:断线后5秒再重连
.reconnectTime(5000L)
// 推送数据为 JSON
.data(statusVO, MediaType.APPLICATION_JSON));
log.info("文件上传状态SSE,当前时间:{},taskCode:{},推送数据:{}", TimeUtil.getCurrentTime(), taskCode,
JSON.toJSONString(statusVO));
} catch (IOException ioException) {
// 客户端断开连接或传输异常,主动清理资源并中止定时任务
log.error("文件上传状态SSE,错误:{},taskCode:{}", ioException.getMessage(), taskCode);
currentEmitter.completeWithError(ioException);
cleanup.run();
futureRef.get().cancel(true);
return;
}
// 判断任务是否完成
if (fileUploadFished(statusVO)) {
// 设置任务完成
statusVO.setFinished(true);
currentEmitter.send(SseEmitter.event()
.name("upload-status")
.data(statusVO, MediaType.APPLICATION_JSON));
// 主动关闭连接
currentEmitter.complete();
// 清理资源并取消定时任务
cleanup.run();
futureRef.get().cancel(true);
log.info("文件上传状态SSE,任务已完成,taskCode:{},当前时间:{},emitterMap:{},大小:{}", taskCode,
TimeUtil.getCurrentTime(), JSON.toJSONString(emitterMap), emitterMap.size());
}
} catch (Exception e) {
log.error("文件上传状态SSE, SSE推送失败,taskCode:{},错误信息:{}", taskCode, e.getMessage(), e);
SseEmitter failedEmitter = emitterMap.get(taskCode);
if (failedEmitter != null) {
failedEmitter.completeWithError(e);
}
cleanup.run();
futureRef.get().cancel(true);
}
// 每2秒执行一次
}, 0, 2, TimeUnit.SECONDS);
// 记录定时任务引用
futureRef.set(future);
// 返回 emitter 给前端,保持连接
return emitter;
}
event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:27","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":1,"currentLinkStatus":"执行中"}
event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:29","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":1,"currentLinkStatus":"执行中"}
event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:31","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":1,"currentLinkStatus":"执行中"}
event:upload-status
data:{"finished":false,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:33","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":1}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":1,"currentLinkStatus":"执行中"}
event:upload-status
data:{"finished":true,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:35","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":2,"currentLinkStatus":"已完成"}
event:upload-status
data:{"finished":true,"taskCode":"366d1b2c760f4afd8b709e64c188a27c","currentTime":"2025-06-06 09:53:35","fileStatusList":[{"id":3922,"code":"64d9c1ab4abc441280abb61bba72a611","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本1.txt","fileUrl":"data-config/file/64d9c1ab4abc441280abb61bba72a611_新闻文本1.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.39 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3923,"code":"33735f65a0b940a996ded37ff4066299","importMethod":{"code":1,"desc":"文件导入"},"filename":"英文文本.txt","fileUrl":"data-config/file/33735f65a0b940a996ded37ff4066299_英文文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"1.77 KB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2},{"id":3924,"code":"e7c06f1da1a34e4fb8ee46eeac6f9f2e","importMethod":{"code":1,"desc":"文件导入"},"filename":"新闻文本.txt","fileUrl":"data-config/file/e7c06f1da1a34e4fb8ee46eeac6f9f2e_新闻文本.txt","tableName":null,"fieldName":null,"createTime":null,"introduction":null,"size":"2.84 MB","includeHeader":null,"csvSeparator":null,"uploadStatusCode":2}],"name":"测试文件上传123","dataType":2,"dataTypeStr":"非结构化数据","importType":"file","currentLinkCode":2,"currentLink":"数据配置","currentLinkStatusCode":2,"currentLinkStatus":"已完成"}