LangGraph 重构个人知识库问答系统(稳定 + 可扩展版)
用 LangGraph 把之前的 RAG 系统重构为模块化、可扩展、带持久化、带错误处理的生产级架构。核心设计思想是节点解耦、状态清晰、流程灵活、易于扩展。一、系统架构设计可扩展核心1. 核心流程图结构用户提问 → 检索文档 → 生成回答 → 反思检查可选→ 结束 ↓失败 ↓失败 重试/兜底 备用模型2. 模块化节点设计方便扩展节点职责可扩展方向retrieve从向量库检索文档多路召回、重排序、元数据过滤generate基于检索结果生成回答多模型切换、Prompt 模板切换reflect反思检查回答质量可选多维度校验、人工介入fallback兜底节点失败时调用返回固定回复、搜索网络3. 状态设计清晰 可扩展class RAGState(TypedDict): messages: Annotated[Sequence[BaseMessage], add_messages] # 对话历史 question: str # 用户原始问题 context: str # 检索到的文档上下文 answer: str # 生成的回答 loop_count: int # 防死循环计数 error: str | None # 错误信息 user_id: str # 用户ID绑定长期记忆二、完整代码实现直接复制运行1. 安装依赖pip install -U langgraph langchain langchain-zhipu langchain-community chromadb python-dotenv pydantic-settings2. 完整代码import os from typing import TypedDict, Sequence, Literal, Annotated from dotenv import load_dotenv from langchain_core.messages import BaseMessage, HumanMessage, AIMessage from langchain_core.tools import tool from langchain_core.prompts import ChatPromptTemplate from langchain_zhipu import ChatZhipuAI from langchain_community.vectorstores import Chroma from langchain_community.embeddings import HuggingFaceBgeEmbeddings from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader from langgraph.constants import START, END from langgraph.graph import StateGraph, add_messages from langgraph.checkpoint.memory import MemorySaver from pydantic_settings import BaseSettings # 1. 配置管理Pydantic Settings避免硬编码 class Settings(BaseSettings): ZHIPU_API_KEY: str LLM_MODEL: str glm-4.6 EMBEDDING_MODEL: str BAAI/bge-small-zh-v1.5 CHROMA_DB_DIR: str ./chroma_db DOCS_DIR: str ./docs CHUNK_SIZE: int 1000 CHUNK_OVERLAP: int 200 RETRIEVE_TOP_K: int 3 LLM_TIMEOUT: int 15 MAX_LOOP_COUNT: int 3 class Config: env_file .env settings Settings() # 2. 状态定义清晰可扩展 class RAGState(TypedDict): messages: Annotated[Sequence[BaseMessage], add_messages] question: str context: str answer: str loop_count: int error: str | None user_id: str # 3. 向量库初始化模块化方便切换 def init_vector_store(): 初始化或加载向量库 # 初始化嵌入模型 embeddings HuggingFaceBgeEmbeddings( model_namesettings.EMBEDDING_MODEL, model_kwargs{device: cpu}, encode_kwargs{normalize_embeddings: True} ) # 如果向量库已存在直接加载 if os.path.exists(settings.CHROMA_DB_DIR) and len(os.listdir(settings.CHROMA_DB_DIR)) 0: print(✅ 加载已有向量库) return Chroma( persist_directorysettings.CHROMA_DB_DIR, embedding_functionembeddings ) # 否则构建新向量库 print( 构建新向量库) os.makedirs(settings.DOCS_DIR, exist_okTrue) # 加载文档 loader DirectoryLoader( settings.DOCS_DIR, glob*.pdf, loader_clsPyPDFLoader, show_progressTrue ) documents loader.load() # 分割文档 text_splitter RecursiveCharacterTextSplitter( chunk_sizesettings.CHUNK_SIZE, chunk_overlapsettings.CHUNK_OVERLAP, separators[\n\n, \n, 。, , , , ] ) chunks text_splitter.split_documents(documents) # 构建向量库 vector_store Chroma.from_documents( documentschunks, embeddingembeddings, persist_directorysettings.CHROMA_DB_DIR ) vector_store.persist() print(✅ 向量库构建完成) return vector_store # 全局向量库实例 vector_store init_vector_store() # 4. LLM 初始化主模型备用模型 # 主 LLM llm ChatZhipuAI( api_keysettings.ZHIPU_API_KEY, modelsettings.LLM_MODEL, temperature0, timeoutsettings.LLM_TIMEOUT, max_retries0 ) # 备用 LLM更轻量、更稳定 backup_llm ChatZhipuAI( api_keysettings.ZHIPU_API_KEY, modelglm-4-flash, temperature0, timeout10 ) # RAG Prompt 模板模块化方便修改 RAG_PROMPT ChatPromptTemplate.from_template( 你是专业的文档问答助手必须严格遵守以下规则 1. 仅使用下方【文档上下文】中的内容回答用户问题绝对禁止编造。 2. 如果文档上下文中没有相关内容直接回复「抱歉文档中没有相关内容。」 3. 回答简洁、准确、条理清晰。 【文档上下文】 {context} 【用户问题】 {question} ) # 5. 节点定义解耦错误处理可扩展 def retrieve_node(state: RAGState): 检索节点从向量库检索相关文档 print(\n-- 执行节点文档检索 --) question state[question] error None context try: # 检索 Top-K 文档 docs vector_store.similarity_search(question, ksettings.RETRIEVE_TOP_K) context \n\n.join([f第{i1}段{doc.page_content} for i, doc in enumerate(docs)]) print(f✅ 检索到 {len(docs)} 段文档) except Exception as e: print(f❌ 检索失败{str(e)}) error str(e) context 文档检索失败请稍后再试。 return { context: context, loop_count: state[loop_count] 1, error: error } def generate_node(state: RAGState): 生成节点基于检索结果生成回答 print(\n-- 执行节点生成回答 --) question state[question] context state[context] error None answer try: # 构造 Prompt 并调用 LLM prompt RAG_PROMPT.format(contextcontext, questionquestion) response llm.invoke(prompt) answer response.content print(✅ 回答生成完成) except Exception as e: print(f❌ 生成失败{str(e)}) error str(e) answer 抱歉回答生成失败请稍后再试。 return { answer: answer, messages: [AIMessage(contentanswer)], loop_count: state[loop_count] 1, error: error } def backup_generate_node(state: RAGState): 备用生成节点主节点失败时调用 print(\n-- 切换到备用生成节点 --) question state[question] context state[context] error None answer try: prompt RAG_PROMPT.format(contextcontext, questionquestion) response backup_llm.invoke(prompt) answer response.content except Exception as e: print(f❌ 备用生成也失败{str(e)}) error str(e) answer 非常抱歉服务暂时不可用请稍后再试。 return { answer: answer, messages: [AIMessage(contentanswer)], loop_count: state[loop_count] 1, error: error } # 6. 条件路由灵活决策 def should_continue(state: RAGState) - Literal[retrieve, generate, backup_generate, end]: 条件判断决定下一步走哪个节点 # 优先处理错误 if state.get(error): if state[loop_count] 2: if 检索 in state[error]: return retrieve # 检索失败重试检索 else: return backup_generate # 生成失败走备用 else: return end # 失败次数过多结束 # 正常流程 if not state.get(context): return retrieve # 还没检索先检索 elif not state.get(answer): return generate # 还没生成先生成 else: print(→ 结束流程完成) return end # 7. 构建图模块化可扩展 def build_rag_graph(): builder StateGraph(RAGState) # 添加节点 builder.add_node(retrieve, retrieve_node) builder.add_node(generate, generate_node) builder.add_node(backup_generate, backup_generate_node) # 边开始 → 条件判断 builder.add_edge(START, retrieve) # 条件边根据状态决定下一步 builder.add_conditional_edges( retrieve, should_continue, { retrieve: retrieve, generate: generate, backup_generate: backup_generate, end: END } ) builder.add_conditional_edges( generate, should_continue, { backup_generate: backup_generate, end: END } ) builder.add_conditional_edges( backup_generate, should_continue, { end: END } ) # 编译图带持久化 checkpointer MemorySaver() return builder.compile(checkpointercheckpointer) # 全局 RAG Agent 实例 rag_agent build_rag_graph() # 8. 测试运行会话恢复错误处理 if __name__ __main__: print( LangGraph 重构版 RAG 知识库问答系统 ) # 配置同一个 thread_id 可恢复会话 config { configurable: { thread_id: rag_session_001, user_id: user_001 } } # 第一轮对话 print(\n--- 第一轮对话 ---) initial_state { messages: [], question: 什么是RAG技术, context: , answer: , loop_count: 0, error: None, user_id: user_001 } result1 rag_agent.invoke(initial_state, configconfig) print(f第一轮答案{result1[answer]}) # 第二轮对话恢复会话 print(\n--- 第二轮对话恢复会话---) result2 rag_agent.invoke( { messages: [HumanMessage(contentRAG有什么优势)], question: RAG有什么优势, loop_count: 0 }, configconfig ) print(f第二轮答案{result2[answer]}三、系统优势稳定 可扩展1. 稳定性错误处理每个节点都有try-except失败时返回友好提示备用方案主 LLM 失败时自动切换到更轻量的备用模型防死循环loop_count限制最大循环次数超时控制LLM 调用设置超时防止卡死会话持久化用MemorySaver实现会话恢复重启不丢2. 可扩展性节点解耦检索、生成、反思等节点独立方便修改和替换模块化设计向量库、LLM、Prompt 都封装成独立模块方便切换条件边灵活可以轻松添加新节点和新的条件分支状态清晰State 定义明确方便添加新字段如检索分数、重排序结果四、扩展建议让系统更强大1. 加多路召回和重排序# 在 retrieve_node 中添加 def retrieve_node(state: RAGState): # ... 原有检索逻辑 ... # 多路召回同时用语义检索和关键词检索 # 重排序用 BGE-Rerank 对召回结果二次精排 ...2. 加反思检查节点def reflect_node(state: RAGState): 反思节点检查回答质量 # 调用 LLM 检查回答是否完整、准确 # 如果不合格回到生成节点重新生成 ...3. 加工具调用搜索网络tool def web_search(query: str) - str: 搜索网络获取最新信息 # 调用搜索引擎 API ... # 在图中添加工具节点 builder.add_node(tool_executor, ToolNode([web_search]))4. 切换到 Redis 持久化from langgraph.checkpoint.redis import RedisSaver # 替换 MemorySaver checkpointer RedisSaver.from_url(redis://localhost:6379/0)五、使用说明把你的 PDF 文档放入docs目录运行代码第一次会自动构建向量库后续运行会直接加载已有向量库用同一个thread_id可以恢复之前的会话
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2592833.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!