AI 的“打字机效果”到底怎么实现?从我的聊天项目说起
从项目中学习 NDJSON 流式协议本文基于 X-Chat 项目中的 AI 流式回复链路整理。项目由 Vue3 前端、Spring Boot 主后端、FastAPI AI 服务组成。本文重点讲清楚一个问题Python AI 服务如何一边生成答案一边把内容传给 Java 后端再实时显示到前端页面上1. 先看项目背景X-Chat 是一个即时通讯项目同时接入了一个独立的 AI 服务。它的整体架构大致是Vue3 前端 - Spring Boot 主后端 - 用户鉴权 / 会话管理 / 消息落库 / WebSocket 推送 - HTTP 调用 FastAPI AI 服务 /chat/stream - 意图判断 - RAG 检索 / Agent 调用 - NDJSON 流式返回也就是说前端不会直接访问 Python AI 服务。前端只和 Java 后端通信Java 后端再根据业务权限、会话、群聊等信息去调用 Python。这样设计有两个好处业务权限更安全群成员校验、聊天会话校验都在 Java 后端完成避免前端绕过 Java 直接访问 AI 服务。AI 服务更独立Python 只专心处理 RAG、Agent、模型调用和流式输出Java 继续负责主业务。在这个项目里AI 流式回复的核心协议就是NDJSON。2. NDJSON 是什么NDJSON 全称是Newline Delimited JSON可以翻译成“换行分隔的 JSON”。它的格式非常简单{type:delta,delta:你好} {type:delta,delta:我是} {type:delta,delta: X-Chat AI} {type:done,content:你好我是 X-Chat AI}每一行都是一个完整的 JSON 对象行与行之间用换行符\n分隔。这和普通 JSON 最大的区别是普通 JSON 通常要等整个数据结构完整之后才能解析比如[ {type:delta,delta:你好}, {type:delta,delta:我是}, {type:done,content:你好我是 X-Chat AI} ]这种数组形式必须等最后的]出现整体才是合法 JSON。对于 AI 回复来说这就不够友好因为模型可能要几秒钟才生成完整答案。而 NDJSON 是“一行一个事件”。服务端每生成一点内容就可以立刻发一行客户端读到一行就可以立刻处理一行。这样用户看到的就是类似 ChatGPT 的“打字机效果”。3. 为什么这个项目适合用 NDJSONAI 回复天然是流式的。用户问一句话模型不是一次性吐出完整答案而是不断生成 token 或文本片段。项目希望前端可以边生成边展示而不是等模型完全答完才显示。因此Python AI 服务不能只返回一个普通 JSON{content:完整回答}因为这种方式必须等完整回答生成完。更适合的方式是{type:delta,delta:第一小段} {type:delta,delta:第二小段} {type:delta,delta:第三小段} {type:done,content:完整回答}这正是 NDJSON 的优势结构简单每行独立可解析适合 HTTP 长连接Java、Python、JavaScript 都很好处理不需要前后端直接建立 AI 专用 WebSocket。4. 项目里的完整调用链路在 X-Chat 中一次 AI 回复大致经历下面几步用户发送消息给 X-Chat AI - Vue 调用 Java 后端 /api/chat/sendMessage - Java 判断这是发给 AI 机器人的消息 - 用户消息先落库并推送 - 事务提交后Java 异步调用 Python /chat/stream - Python 一行一行返回 NDJSON 事件 - Java 用 OkHttp 按行读取 - Java 把事件转换成 WebSocket 消息 - Vue 根据 messageId 更新同一条 AI 消息这里要注意一个关键点NDJSON 只存在于 Python 到 Java 这一段。Java 到前端不是 NDJSON而是 WebSocket 事件。也就是说Python FastAPI --NDJSON over HTTP-- Java Spring Boot --WebSocket-- Vue 前端5. Python 端如何输出 NDJSON对应文件X-RAG Agent/api.pyPython 的流式接口是app.post(/chat/stream) def chat_stream(request: ChatStreamRequest): return StreamingResponse( stream_xchat_answer(request), media_typeapplication/x-ndjson; charsetutf-8, )这里有两个重点。第一个重点是StreamingResponse。普通接口一般是先算完结果再一次性返回。但StreamingResponse可以接收一个生成器。生成器每yield一段内容FastAPI 就可以把这一段内容写回 HTTP 响应里。第二个重点是media_typemedia_typeapplication/x-ndjson; charsetutf-8这表示接口返回的是 NDJSON并且使用 UTF-8 编码。项目里还有一个非常关键的函数def encode_event(payload: dict) - str: return json.dumps(payload, ensure_asciiFalse) \n这个函数做了两件事把 Python 字典转成 JSON 字符串在末尾加上\n。其中\n就是 NDJSON 的核心。没有这个换行Java 后端就没法稳定地按行读取一个个事件。6. Python 端流式事件有哪些这个项目中常见的 NDJSON 事件有五类。6.1 delta增量文本{type:delta,delta:回答片段}delta表示 AI 新生成的一小段文本。Java 收到后会立刻推给前端前端把它追加到当前 AI 消息后面。6.2 done回答完成{type:done,content:完整回答}done表示本次 AI 回复结束。它通常会带上完整内容方便 Java 后端最终落库。6.3 error错误事件{type:error,message:错误原因}如果 Python 侧模型调用、知识库检索或其他逻辑出错就可以通过error告诉 Java。Java 再把占位消息更新成用户可读的错误提示。6.4 tool_call工具开始执行{ type: tool_call, tool_name: rag_summarize, display_name: 知识库检索, status: start, message: 正在检索知识库 }这个事件表示 AI 准备调用某个工具例如知识库检索、意图判断等。6.5 tool_result工具执行结果{ type: tool_result, tool_name: rag_summarize, display_name: 知识库检索, status: success, summary: 命中 3 条知识库片段 }这个事件表示工具执行完成并返回一个摘要。前端可以把它展示成“执行轨迹”让用户知道 AI 不是凭空回答而是先进行了检索或判断。7. Python 端普通 Agent 回复是怎么流式输出的项目里的普通 Agent 流式输出逻辑可以简化成这样def stream_agent(prompt: str): full_text [] try: for delta in get_agent().execute_stream(prompt): if not delta: continue full_text.append(delta) yield encode_event({type: delta, delta: delta}) yield encode_event({type: done, content: .join(full_text)}) except Exception as exc: yield encode_event({type: error, message: str(exc)})这段代码的思路非常清楚模型每生成一小段deltaPython 就马上yield一个delta事件同时用full_text累计完整答案生成结束后再发一个done事件如果异常则发error事件。换成更直观的例子假设模型生成了你好我是 X-Chat AI。Python 可能实际返回{type:delta,delta:你好} {type:delta,delta:我是} {type:delta,delta: X-Chat AI。} {type:done,content:你好我是 X-Chat AI。}8. Python 端RAG 问答为什么还有 tool_call 和 tool_resultX-Chat 不只是普通 AI 聊天它还支持知识库问答。比如用户问项目里的 Netty WebSocket 是怎么推送消息的这类问题可能需要先查知识库再组织答案。项目中的 RAG 链路大致是意图判断 - 知识库检索 - 命中片段 - 生成回答 - 返回来源和完整答案因此Python 不只返回文本还会返回工具事件。例如{type:tool_call,tool_name:intent_router,display_name:意图判断,status:start,message:正在判断应该使用哪类能力} {type:tool_result,tool_name:intent_router,display_name:意图判断,status:success,summary:识别为知识库问答} {type:tool_call,tool_name:rag_summarize,display_name:知识库检索,status:start,message:正在检索知识库} {type:tool_result,tool_name:rag_summarize,display_name:知识库检索,status:success,summary:命中 3 条知识库片段} {type:delta,delta:根据知识库资料Netty WebSocket 在项目中负责...} {type:done,content:根据知识库资料Netty WebSocket 在项目中负责...}这样前端就可以展示类似正在判断意图... 识别为知识库问答 正在检索知识库... 命中 3 条知识库片段 AI 正在回答...对于用户来说这种过程展示会更可信也更容易理解 AI 为什么这么回答。9. Java 端如何按行读取 NDJSON对应文件src/main/java/com/xchat/backend/ai/AiFastApiClient.javaJava 后端通过 OkHttp 调用 PythonRequest request new Request.Builder() .url(buildUrl(/chat/stream)) .post(RequestBody.create(json, JSON)) .build(); try (Response response client.newCall(request).execute()) { ResponseBody body response.body(); readNdjson(body.source(), listener); }真正处理 NDJSON 的逻辑可以简化成这样private void readNdjson(BufferedSource source, StreamListener listener) throws IOException { String line; while ((line source.readUtf8Line()) ! null) { if (line.isBlank()) { continue; } JsonNode node objectMapper.readTree(line); String type node.path(type).asText(); if (delta.equals(type)) { listener.onDelta(node.path(delta).asText()); } else if (tool_call.equals(type)) { listener.onToolCall(node); } else if (tool_result.equals(type)) { listener.onToolResult(node); } else if (done.equals(type)) { listener.onDone(node.path(content).asText()); } else if (error.equals(type)) { throw new IOException(node.path(message).asText(AI service error)); } } }这段代码就是 NDJSON 协议在 Java 端的核心。它的处理步骤是readUtf8Line()读取一行空行跳过用ObjectMapper把这一行解析成 JSON根据type分发给不同回调每读到一个delta就立即通知上层。这里有一个容易忽略的点Java 不是等 Python 整个响应结束后才处理而是 Python 发一行Java 就可以读一行。这正是流式效果成立的关键。10. Java 端为什么还要把 NDJSON 转成 WebSocket因为前端没有直接连接 Python 服务。前端连接的是 Java 后端的 WebSocket。所以 Java 收到 Python 的 NDJSON 事件后需要转换成前端能识别的 WebSocket 事件。对应文件src/main/java/com/xchat/backend/ai/AiChatService.java核心逻辑可以简化成aiFastApiClient.streamChat(request, new AiFastApiClient.StreamListener() { Override public void onDelta(String delta) { contentBuilder.append(delta); pushAiDelta(userId, replyMessage.getMessageId(), delta); } Override public void onDone(String content) { doneContent[0] content; } Override public void onToolCall(JsonNode event) { pushAiToolEvent(userId, replyMessage.getMessageId(), ai_tool_call, event); } Override public void onToolResult(JsonNode event) { pushAiToolEvent(userId, replyMessage.getMessageId(), ai_tool_result, event); } });这里可以看到映射关系Python NDJSON 事件Java 回调前端 WebSocket 事件deltaonDeltaai_stream_deltadoneonDoneai_stream_doneerror抛异常后处理ai_stream_errortool_callonToolCallai_tool_calltool_resultonToolResultai_tool_result也就是说Java 后端在这里扮演了一个“协议转换器”的角色NDJSON 事件 - Java 回调 - WebSocket 事件11. Java 端为什么要先创建一条 AI 占位消息在真正开始读取 Python 流之前Java 会先创建一条 AI 回复消息。此时这条消息可能还是空内容它的作用是给前端一个稳定的messageId。然后 Java 先推送{type:ai_stream_start,data:{messageId:123,messageContent:}}前端收到后先在聊天窗口里显示一个空的 AI 气泡。后面每次收到deltaJava 都带着同一个messageId推给前端{type:ai_stream_delta,data:{messageId:123,delta:你好}} {type:ai_stream_delta,data:{messageId:123,delta:我是}} {type:ai_stream_delta,data:{messageId:123,delta: X-Chat AI}}前端根据messageId123找到同一条消息并把内容不断追加上去。这样做有一个非常重要的好处多个 delta 不会变成多条聊天气泡而是合并成同一条 AI 消息。如果没有占位消息和messageId前端可能会显示成这样AI你好 AI我是 AI X-Chat AI这是错误体验。正确体验应该是AI你好我是 X-Chat AI12. 前端如何把 delta 追加到同一条消息对应文件frontend/src/api/websocket.js frontend/src/stores/chatStore.js前端收到ai_stream_start时把空消息加入消息列表if (data.type ai_stream_start) { chatStore.addMessage(data.data) }收到ai_stream_delta时不新增消息而是追加内容if (data.type ai_stream_delta) { chatStore.appendAiDelta(data.data.messageId, data.data.delta) }appendAiDelta的核心逻辑是appendAiDelta(messageId, delta) { if (!delta) return const target this.findMessageById(messageId) if (!target) return target.messageContent (target.messageContent || ) delta target.status 0 }通过这段代码理解流式 UI根据messageId找到那条 AI 占位消息把新来的delta拼到messageContent后面Vue 响应式数据变化页面自动刷新用户就看到了“文字逐渐出现”的效果。13. 一次完整示例假设用户问知识库中 Netty 是什么13.1 Vue 调用 Java前端先调用普通发送消息接口POST /api/chat/sendMessage此时前端并不直接请求 Python。13.2 Java 识别这是 AI 消息Java 发现接收人是固定 AI 联系人于是先保存用户消息推送用户消息等事务提交后异步启动 AI 回复。这样即使 AI 服务慢也不会阻塞用户发送消息。13.3 Java 请求 PythonJava 向 Python/chat/stream发送请求体大致如下{ mode: chat, user_id: 10001, message: 知识库中 Netty 是什么, context_messages: [], scope_type: global, scope_id: }这些字段的含义是字段含义modeAI 工作模式例如普通聊天、上下文问答、聊天总结user_id当前用户 IDmessage用户问题context_messages最近聊天上下文scope_type知识库范围例如全局或群聊scope_id具体群 ID或全局为空13.4 Python 返回 NDJSONPython 可能返回{type:tool_call,tool_name:intent_router,display_name:意图判断,status:start,message:正在判断应该使用哪类能力} {type:tool_result,tool_name:intent_router,display_name:意图判断,status:success,summary:识别为知识库问答} {type:tool_call,tool_name:rag_summarize,display_name:知识库检索,status:start,message:正在检索知识库} {type:tool_result,tool_name:rag_summarize,display_name:知识库检索,status:success,summary:命中 3 条知识库片段} {type:delta,delta:根据知识库资料Netty 是一个高性能网络通信框架...} {type:done,content:根据知识库资料Netty 是一个高性能网络通信框架...}13.5 Java 转成 WebSocketJava 再推给前端{type:ai_stream_start,data:{messageId:123,messageContent:}} {type:ai_tool_call,data:{messageId:123,displayName:意图判断,status:start}} {type:ai_tool_result,data:{messageId:123,displayName:意图判断,summary:识别为知识库问答}} {type:ai_tool_call,data:{messageId:123,displayName:知识库检索,status:start}} {type:ai_tool_result,data:{messageId:123,displayName:知识库检索,summary:命中 3 条知识库片段}} {type:ai_stream_delta,data:{messageId:123,delta:根据知识库资料Netty 是一个高性能网络通信框架...}} {type:ai_stream_done,data:{messageId:123,messageContent:根据知识库资料Netty 是一个高性能网络通信框架...}}13.6 Vue 更新页面前端最终做三件事ai_stream_start创建空 AI 气泡ai_stream_delta不断追加文本ai_stream_done用后端最终保存的消息覆盖本地临时消息。最终用户看到的就是一条逐步生成的 AI 回复。14. 这里为什么不用普通 HTTP JSON普通 HTTP JSON 当然也能实现 AI 回复但用户体验会差一些。普通 JSON 的流程是用户提问 - 后端等待 AI 生成完整答案 - 一次性返回完整 JSON - 前端显示答案这意味着用户在等待过程中什么也看不到。NDJSON 的流程是用户提问 - 后端收到第一段内容 - 前端显示第一段 - 后端收到第二段内容 - 前端追加第二段 - ... - 最终完成这会让用户感觉系统更快、更流畅。15. 那为什么不用 SSESSE也就是 Server-Sent Events也常用于服务端向客户端推送流式数据。它的格式通常是data: {delta:你好} data: {delta:我是}SSE 很适合浏览器直接接收服务端流式事件。但在这个项目里前端并不直接请求 Python而是 Java 后端作为中间层Python - Java - VueJava 只需要简单地按行读取 Python 的事件然后再转 WebSocket。NDJSON 在这种“服务端到服务端”的场景里非常轻量处理起来也很直接。所以这个项目的选择是合理的Python 到 Java用 NDJSON简单、轻量、易解析Java 到 Vue用已有 Netty WebSocket和聊天系统实时推送能力复用。16. 那为什么不让 Python 直接 WebSocket 推给前端表面上看Python 直接 WebSocket 给前端也可以实现流式回复。但这样会带来几个问题前端要同时连接 Java 和 Python 两套服务Python 需要理解用户身份、群权限、会话权限消息落库和最终状态同步会变复杂AI 服务会和业务系统强耦合。X-Chat 的设计是让 Java 作为统一入口前端只信任 Java Java 再调用 Python Python 只负责 AI 能力这种方式更适合业务型项目。17. 项目里的一个细节事务提交后再调用 AIJava 后端并不是一收到用户消息就立刻调用 AI而是在用户消息落库并且事务提交后才启动 AI 回复。这个设计很重要。如果事务还没提交AI 就开始回复可能出现一种尴尬情况用户消息写库失败了 但 AI 回复已经生成并推给前端了这样聊天记录就会出现不一致。所以项目中使用了类似这样的逻辑TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { Override public void afterCommit() { aiChatService.scheduleChatReply(userId, messageContent); } });意思是只有数据库事务真正提交成功后才启动 AI 回复。18. 项目里的另一个细节异常也要更新占位消息流式系统里很容易出现一种问题前端已经创建了 AI 占位气泡但后端调用 AI 失败了。如果不处理异常用户就会看到一条永远空白或一直加载中的消息。项目里的做法是先创建 AI 占位消息如果 Python 正常返回就不断追加 delta如果 Python 抛错就把占位消息更新成错误提示再通过ai_stream_error推给前端。这说明流式系统不能只考虑“成功路径”还必须考虑Python 服务未启动模型接口失败RAG 检索失败HTTP 流中断前端没有找到对应 messageId。19. 一个可以继续优化的点sources 事件Python 的 RAG 链路中会返回sources事件用于携带知识库来源信息例如文档名、片段标题等。不过 Java 当前主要处理delta / done / error / tool_call / tool_result对于未知事件会忽略。因此如果想进一步优化前端展示可以扩展一条链路Python sources - Java onSources - WebSocket ai_sources - 前端挂到同一条 message 的 sources 字段 - 页面展示引用来源这样用户不仅能看到“命中 3 条知识库片段”还能看到具体引用来自哪些文档。这是一个很适合作为后续功能增强的小任务。20. 项目重点学到了什么通过这个项目可以把 NDJSON 流式协议理解成一句话服务端把多个 JSON 事件按行输出客户端按行读取并立即处理从而实现流式效果。在 X-Chat 中它的落地方式是PythonStreamingResponse yield 每行 JSON \n JavaOkHttp readUtf8Line ObjectMapper listener 回调 前端WebSocket messageId 追加 delta 到同一条消息整个链路最核心的不是某一个框架而是这几个设计思想一行一个事件每条 JSON 都能独立解析事件类型分发通过type区分 delta、done、error、tool_call、tool_result后端协议转换Java 把 Python 的 NDJSON 转成前端 WebSocket同一消息聚合前端通过messageId把多个增量合成一条 AI 回复最终状态落库done 后使用完整内容保存保证刷新后聊天记录一致异常兜底流式过程中失败也要更新前端状态。21. 总结NDJSON 并不复杂它甚至可以说是最简单的流式协议之一JSON 换行但放到真实项目里它能解决一个很实际的问题AI 回复生成慢用户不想等到最后才看到结果。X-Chat 项目中Python FastAPI 使用StreamingResponse一行一行返回 NDJSONJava 后端使用 OkHttp 按行读取并把事件转换成 WebSocketVue 前端根据messageId把增量文本追加到同一条消息里。这套设计既保持了 AI 服务和业务系统的解耦又实现了自然的流式聊天体验。最后用一张简化图收尾用户提问 ↓ Vue 发送消息 ↓ Spring Boot 落库 异步调用 AI ↓ FastAPI 返回 NDJSON {type:delta,delta:...}\n {type:done,content:...}\n ↓ Spring Boot 按行读取并转 WebSocket ↓ Vue 按 messageId 追加文本 ↓ 用户看到流式 AI 回复核心模型生成一点发送一行读取一行更新一次。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2596387.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!