什么是事件流
事件流(Event Stream) 是一种处理和传递事件的方式,通常用于系统中的异步消息传递或实时数据流。在事件驱动架构(Event-Driven Architecture)中,事件流扮演着至关重要的角色。
事件流的概念:
-
事件(Event):
事件是指系统中的某个状态变更或者操作的发生。例如,用户点击按钮、订单创建、传感器检测到的温度变化等都可以视为事件。 -
事件流(Event Stream):
事件流是指一系列有序的事件,这些事件按照时间顺序传输并可能被处理。事件流是一个数据流,它将事件按顺序传递给系统中的不同组件或服务。 -
流的特性:
- 时间序列:事件流通常按照时间顺序传递,也就是说,先发生的事件会先传递。
- 异步性:事件流通常是异步的,意味着事件的生产者和消费者之间不一定同步工作,消费者可以在稍后的时间消费事件。
- 连续性:事件流是一个连续的过程,事件会不断地产生并被处理。
事件流的应用场景:
-
日志流:
在现代分布式系统中,日志通常会以事件流的形式处理和传输。例如,系统的操作、错误或者状态变更会作为事件记录下来,并通过事件流传输到日志收集和分析系统中。 -
实时数据处理:
事件流在实时数据处理场景中非常重要,比如流式处理框架(例如Apache Kafka、Apache Flink等)就是用于处理不断产生的事件流。这些流可以表示实时的交易、用户活动、传感器数据等。 -
消息传递:
在消息队列系统中,消息也可以被视为事件流的一部分。生产者发布消息(事件),消费者接收并处理这些消息。 -
事件驱动架构(EDA):
在事件驱动架构中,系统的各个组件通过事件进行通信。每当一个事件发生时,系统的某个部分会被触发并响应这些事件。
事件流与传统流的不同:
- 传统流:通常是指一系列数据或任务的流转,它通常是在一定的顺序和时间点进行处理。
- 事件流:是一种更加灵活的流动方式,侧重于异步传递、处理和反应的模式。事件流中的每个事件通常是独立的、无状态的,并且具有明确的触发条件。
事件流的优势:
- 解耦:生产者和消费者之间通过事件流进行通信,从而降低了系统组件之间的耦合度。
- 扩展性:可以通过增加事件消费者来横向扩展系统处理能力。
- 实时性:适合处理实时数据流,事件流可以实时反映系统中的变化。
事件流的实现方式
(1) Servlet 编程中的响应对象:HttpServletResponse
HttpServletResponse
是经典的 Servlet 编程中的响应对象,可以用于向客户端实时推送数据。主要适用于简单的事件流场景。
实现方式:
- 使用
HttpServletResponse
来直接写数据到客户端。 - 通常通过长连接保持客户端和服务器之间的连接,以便持续推送事件数据。
示例代码:
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
@WebServlet("/event-stream")
public class EventStreamServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// 设置响应头,告知客户端这是一个事件流
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
// 获取输出流
PrintWriter out = response.getWriter();
// 模拟一个简单的事件流
int endIndex = 100;
while (true) {
endIndex--;
if( endIndex <= 0 ){
break;
}
out.println("data: " + System.currentTimeMillis());
out.println(); // 事件的分隔符
out.flush();
try {
Thread.sleep(1000); // 每秒推送一次事件
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
特点:
- 基本实现:简单地使用
HttpServletResponse
输出流来推送数据。 - 客户端与服务器之间通过长连接保持数据传输。
- 没有事件的流控或背压处理,适用于低负载、简单的应用场景。
客户端代码(HTML + JavaScript):
<!DOCTYPE html>
<html>
<head>
<title>SSE Example</title>
<script>
const eventSource = new EventSource("/sse-stream");
eventSource.onmessage = function(event) {
console.log("Received event: ", event.data);
document.getElementById("output").innerText = event.data;
};
</script>
</head>
<body>
<h1>Server Sent Events Example</h1>
<div id="output"></div>
</body>
</html>
(2) SseEmitter
仅需
spring-boot-starter-web
即可实现基本SSE功能
SSE(Server-Sent Events)是一种基于HTTP的单向通信协议,允许服务器主动推送实时数据到客户端。其核心特点包括轻量级协议、自动重连机制和浏览器原生支持
实现方式:
- 使用
SseEmitter
长连接保持客户端和服务器之间的连接,以便持续推送事件数据。 - 注意事项
-
- 超时设置:SSE连接默认无超时限制,需显式设置SseEmitter超时参数以避免资源泄漏。
-
- 响应格式:确保响应头包含Content-Type: text/event-stream,事件数据需遵循data: \n\n格式。
-
- 连接管理:使用ConcurrentHashMap管理客户端连接,及时清理断开或超时的SseEmitter实例。
-
- 异常处理:捕获IOException并调用completeWithError()释放资源,避免内存泄漏
-
示例代码:
@RestController
public class SseController {
// 使用线程安全集合管理连接
private static final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();
// 建立SSE连接
@GetMapping(value = "/sse/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect(@RequestParam String clientId) {
// 设置30秒超时(根据业务调整)
SseEmitter emitter = new SseEmitter(30_000L);
emitters.put(clientId, emitter);
// 注册连接清理回调
emitter.onCompletion(() -> emitters.remove(clientId));
emitter.onTimeout(() -> emitters.remove(clientId));
emitter.onError(e -> {
log.error("SSE Error: {}", e.getMessage());
emitters.remove(clientId);
});
// 立即发送初始化握手信号
try {
emitter.send(SseEmitter.event()
.name("INIT")
.data("{\"status\": \"CONNECTED\"}")
);
} catch (IOException e) {
emitter.completeWithError(e);
}
return emitter;
}
// 消息推送方法(可从其他服务调用)
public void pushMessage(String clientId, String message) {
SseEmitter emitter = emitters.get(clientId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event()
.id(UUID.randomUUID().toString())
.name("MESSAGE")
.data(message)
);
} catch (IOException e) {
emitter.completeWithError(e);
emitters.remove(clientId);
}
}
}
}
特点:
- SseEmitter: 这是一个用于发送事件流的类。每次调用 emitter.send() 时,都会向客户端推送一个新的事件。
- 线程: 在新线程中模拟事件流的生成,每1秒发送一个事件。你可以根据需要调整事件的生成方式。
- complete() 和 completeWithError(): 这两个方法分别用于表示事件流的正常结束和错误结束。
客户端代码(HTML + JavaScript):
// 建立连接
const eventSource = new EventSource('/sse/connect?clientId=123');
// 监听消息
eventSource.addEventListener('MESSAGE', (e) => {
console.log('Received:', JSON.parse(e.data));
});
// 错误处理
eventSource.onerror = (err) => {
console.error('SSE Error:', err);
eventSource.close(); // 手动关闭连接
};
(3) WebFlux 实现事件流
在使用 Spring WebFlux 构建事件流时,可以通过响应式编程的方式实现高效的事件推送。WebFlux 是 Spring 5 引入的响应式编程模块,主要用于构建异步和非阻塞的应用程序。WebFlux 通过
Mono
和Flux
来表示异步的数据流,可以非常方便地实现事件流推送。
1. WebFlux 实现事件流的核心概念
- Mono: 表示一个单一的异步值,通常用于返回单个对象或空值。
- Flux: 表示多个异步值的流,通常用于返回多个对象。
WebFlux 是基于非阻塞 I/O 的,能够在高并发的环境下进行高效的事件流处理。它允许服务器以流的方式向客户端推送数据,特别适用于实时应用,如实时通知、事件推送等。
2. 实现事件流的步骤
下面是如何通过 WebFlux 实现事件流的一个简单示例。
示例:通过 WebFlux 推送事件流
- 创建 Spring Boot 项目并添加 WebFlux 依赖
首先,确保你有一个 Spring Boot 项目,并且在pom.xml
中包含 WebFlux 依赖:
<dependencies>
<!-- WebFlux 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Spring Boot 启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
</dependencies>
- 创建事件流控制器
在 WebFlux 中,我们可以使用 Flux
来构建一个事件流。Flux
可以通过不同的方式发出多个数据流。在下面的例子中,我们通过每秒钟发出一个新的时间戳来模拟一个事件流。
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.time.Duration;
@RestController
public class EventStreamController {
@GetMapping("/events")
public Flux<String> streamEvents() {
// 使用 Flux.interval 每秒钟发送一次事件
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Event at: " + System.currentTimeMillis());
}
}
解释:
Flux.interval(Duration.ofSeconds(1))
创建一个每秒产生一个事件的流。.map()
方法用来将每个时间戳转换成字符串,表示事件数据。- 这个流会一直持续下去,直到客户端关闭连接。
- 创建客户端来接收事件流
在客户端,您可以使用 JavaScript 的EventSource
来接收服务器推送的事件。以下是一个简单的 HTML 页面,展示了如何接收并显示服务器推送的事件:
<!DOCTYPE html>
<html>
<head>
<title>WebFlux Event Stream</title>
<script>
const eventSource = new EventSource("/events");
eventSource.onmessage = function(event) {
document.getElementById("event-data").innerText = event.data;
};
</script>
</head>
<body>
<h1>WebFlux Event Stream</h1>
<div id="event-data"></div>
</body>
</html>
解释:
EventSource("/events")
会建立一个长连接,接收从/events
路径推送的事件流。- 每次收到事件时,
onmessage
事件处理程序会将事件内容显示在页面上。 - WebFlux 是实现事件流的理想选择,特别是当需要处理高并发、高吞吐量的事件流时。
- 通过
Flux
和Mono
可以简洁地构建异步事件流。 - 客户端可以通过
EventSource
实现与服务器的事件流通信,提供实时的推送体验。
3. 运行和测试
- 启动 Spring Boot 应用。
- 访问
http://localhost:8080/
查看实时事件流。 - 每秒钟,页面会显示当前的时间戳,表示从服务器推送的事件。
4. WebFlux 特点
- 异步和非阻塞: WebFlux 使用响应式编程模型,处理请求时不会阻塞线程,适合高并发场景。
- 低延迟: 通过
Flux
和Mono
,服务器可以高效地处理多个客户端的事件流。 - 高可扩展性: 适合构建大规模的实时应用(如实时数据分析、推送通知、直播系统等)。
- 与传统 Servlet 的区别: WebFlux 与传统的 Servlet API 不同,它支持响应式和非阻塞操作,能够更好地应对高负载和高并发的场景。
实战用法(AI流式问答)
!!!!!更推荐使用WebFlux !!!!
Springboot
以智普AI的免费AI问答接口为例
配置webclient
package com.gt.quality.ai;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import java.time.Duration;
/**
* 万里悲秋常作客,百年多病独登台
*
* @author : makeJava
*/
@Component
public class AiLocalConfig {
HttpClient httpClient = HttpClient.create().
// 设置连接超时时间为60秒
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000).
// 设置响应超时时间为60秒
responseTimeout(Duration.ofMillis(60000)).
doOnConnected(con -> con.addHandlerLast(
// 设置读取超时时间为60秒
new ReadTimeoutHandler(60)
// 设置写入超时时间为60秒
).addHandlerLast(new WriteTimeoutHandler(60)));
/**
* 智普AI对话使用接口的处理器
* 设置为 5MB缓冲区
*
* @param builder Builder
* @return WebClient
*/
@Bean(name = "useDialogue")
public WebClient useDialogue(WebClient.Builder builder) {
return builder
.baseUrl("https://open.bigmodel.cn/api/paas/v4/chat/completions")
// 设置为 5MB
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(5 * 1024 * 1024))
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
}
}
接口使用
/**
* 事件流---式调用问答
*/
@RequestMapping(value = "/zhi_pu_say", method = {RequestMethod.GET, RequestMethod.POST}, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@Operation(summary = "流式问答")
public Flux<String> testSseInvoke(@RequestParam(value = "question", defaultValue = "2025年国家GPD第一季度的详细情况?", required = false) String question) {
log.info("question:{}", question);
return zhiPuAiProxyService.streamInvoke(question);
}
/**
* 事件流---式调用问答
*/
@RequestMapping(value = "/jsonToSay", method = RequestMethod.POST, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@Operation(summary = "流式问答")
public Flux<String> jsonToSay(@RequestBody String question) {
return zhiPuAiProxyService.streamInvoke(question);
}
业务http的流是调用智普AI
@Override
public Flux<String> streamInvoke(String question) {
// Create a map to store the request body
Map<String, Object> body = new HashMap<>();
// Set the model to "glm-4-flash"
body.put("model", "glm-4-flash");
// Create a list to store the questions
List<Map<String, Object>> questionList = getAiRoleAndUserBody(question);
// Add the list of questions to the request body
body.put("messages", questionList);
// Set the request_id to a random UUID
body.put("request_id", UUID.randomUUID().toString());
// Set the do_sample to true
body.put("do_sample", true);
// Set the temperature to 0.95
body.put("temperature", 0.95);
// Set the stream to true
body.put("stream", true);
// Set the max_tokens to 4095
body.put("max_tokens", 4095);
// Create a map to store the response format
Map<String, Object> responseFormat = new HashMap<>();
// Set the type of the response format to "json_object"
responseFormat.put("type", "json_object");
// Add the response format to the request body
body.put("response_format", responseFormat);
// function、retrieval、web_search。
body.put("type", "web_search");
try {
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Type", MediaType.APPLICATION_JSON_VALUE);
headers.set("Accept", MediaType.TEXT_EVENT_STREAM_VALUE);
headers.set("Accept-Encoding", "gzip, deflate, br");
headers.set("Connection", "keep-alive");
headers.set("Authorization", "Bearer " + ZhiPuAIConstant.ZHI_PU_AI_API_KEY);
// 数据库对话分析
return webClient.post()
.headers(httpHeaders -> httpHeaders.putAll(headers))
.bodyValue(JSONUtil.toJsonStr(body))
.retrieve().bodyToFlux(String.class)
.map(s -> s.replaceAll("data:", ""));
} catch (Throwable e) {
log.error(e.getMessage(), e);
return Flux.error(new BusinessSelfException("系统走神了,请稍后再试..."));
}
}
/**
* 获取ai角色和用户信息
* @param question question
* @return
*/
private List<Map<String, Object>> getAiRoleAndUserBody(String question) {
List<Map<String, Object>> questionList = new ArrayList<>();
// Create a map to store the first question
Map<String, Object> questionMap = new HashMap<>();
// Set the role of the first question to "system"
questionMap.put("role", "system");
// Set the content of the first question
questionMap.put("content", "你是一个乐于回答各种问题的小助手,你的任务是提供专业、准确、有洞察力的建议。");
// Add the first question to the list
questionList.add(questionMap);
// Create a map to store the second question
Map<String, Object> questionMap2 = new HashMap<>();
// Set the role of the second question to "user"
questionMap2.put("role", "user");
// Set the content of the second question
questionMap2.put("content", question);
// Add the second question to the list
questionList.add(questionMap2);
return questionList;
}