SseEmitter
是 Spring Framework 中用于实现 Server-Sent Events (SSE) 的一个类。SSE 是一种允许服务器向客户端推送实时更新的技术,特别适合需要从服务器到客户端的单向消息传递场景,如股票价格更新、社交媒体的新消息通知等。
Server-Sent Events (SSE) 简介
- 单向通信:SSE 支持从服务器到客户端的单向通信,即服务器可以主动向客户端发送数据,而不需要客户端发起请求。
- 持久连接:一旦建立连接,该连接会保持打开状态,直到客户端或服务器决定关闭它。
- 自动重连:如果连接意外断开,浏览器会自动尝试重新连接。
- 文本数据:SSE 仅支持传输 UTF-8 编码的文本数据,不支持二进制数据。
SseEmitter
类的作用
在 Spring 应用中,SseEmitter
提供了一种简便的方式来实现 SSE 功能。它允许开发者通过 HTTP 响应流式地向客户端发送事件。
主要特性
- 异步处理:
SseEmitter
支持异步处理,这意味着它可以长时间运行而不阻塞主线程。 - 事件驱动:可以通过调用
send()
方法来发送事件给客户端。 - 超时管理:可以设置超时时间,超过这个时间如果没有新的事件发生,连接将被关闭。
- 错误处理:提供了对错误处理的支持,可以通过
onError()
和onCompletion()
回调方法来处理异常和连接关闭的情况。
使用示例
下面是一个简单的例子,展示了如何使用 SseEmitter
来实现 SSE:
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
@RequestMapping("/events")
public class SseController {
private final ExecutorService executorService = Executors.newCachedThreadPool();
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe() {
SseEmitter emitter = new SseEmitter();
// 模拟一个后台任务,定期发送事件
executorService.execute(() -> {
try {
for (int i = 0; i < 10; i++) {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.data("SSE event " + i)
.id(String.valueOf(i))
.name("sse event");
emitter.send(event);
Thread.sleep(1000); // 每秒发送一次
}
emitter.complete();
} catch (IOException | InterruptedException e) {
emitter.completeWithError(e);
}
});
// 设置超时处理
emitter.onTimeout(() -> {
System.out.println("SSE timeout occurred!");
emitter.complete();
});
// 设置错误处理
emitter.onError((e) -> {
System.out.println("SSE error occurred: " + e.getMessage());
emitter.completeWithError(e);
});
return emitter;
}
}
关键点解释
-
创建
SseEmitter
实例:SseEmitter emitter = new SseEmitter();
创建一个新的
SseEmitter
实例,默认情况下没有设置超时时间。你可以通过构造函数参数指定超时时间(以毫秒为单位)。 -
发送事件:
emitter.send(SseEmitter.event().data("SSE event " + i).id(String.valueOf(i)).name("sse event"));
使用
send()
方法发送事件。SseEmitter.event()
返回一个SseEventBuilder
,可以通过它构建事件对象,并设置事件的数据、ID 和名称等属性。 -
超时处理:
emitter.onTimeout(() -> { System.out.println("SSE timeout occurred!"); emitter.complete(); });
当连接超过设定的超时时间且没有新的事件发生时,触发超时处理逻辑。
-
错误处理:
emitter.onError((e) -> { System.out.println("SSE error occurred: " + e.getMessage()); emitter.completeWithError(e); });
当发生错误时,触发错误处理逻辑,并可以选择性地结束连接。
-
完成连接:
emitter.complete();
手动完成连接,表示不会再有更多事件发送。
客户端接收 SSE
在前端,可以使用 JavaScript 来接收 SSE 事件。例如:
const eventSource = new EventSource('/events');
eventSource.onmessage = function(event) {
console.log('New message:', event.data);
};
eventSource.onerror = function(error) {
console.error('Error occurred:', error);
};
总结
SseEmitter
是 Spring 提供的一个用于实现 Server-Sent Events 的类,简化了从服务器向客户端推送实时更新的过程。- 它支持异步事件发送、超时管理和错误处理,非常适合需要实时更新的应用场景,如社交网络的通知系统、实时监控应用等。
- 结合合适的后端逻辑和前端代码,可以轻松实现实时双向通信的一部分功能(尽管 SSE 只支持单向从服务器到客户端的通信)。