About the Author
I am Zhaixing, a developer focused on cloud computing and AI. In this post I share hands-on experience with the DeepSeek model family on Huawei Cloud's MaaS platform, in the hope of helping other developers quickly get to grips with Huawei Cloud's AI services.
Table of Contents
About the Author
1. Introduction
2. Technology Choices and Architecture
2.1 Technology Stack
2.2 System Architecture
2.3 Core Workflow
3. Environment Setup and Deployment
3.1 Huawei Cloud Flexus Configuration
3.2 Base Environment Installation
3.3 Python Dependencies
4. Core Feature Implementation
4.1 Document Preprocessing Module
4.2 Vector Database Module
4.3 DeepSeek-V3 Integration Module
4.4 QA Engine Core Module
5. Web Service Interfaces
5.1 FastAPI Backend
5.2 Streamlit Frontend
6. Containerized Deployment
6.1 Docker Configuration
6.2 Docker Compose Configuration
7. System Optimization and Monitoring
7.1 Performance Optimization
7.2 Monitoring and Logging
8. Testing and Validation
8.1 Unit Tests
8.2 Performance Tests
9. Deployment Guide
9.1 Production Deployment
9.2 Nginx Configuration
10. Summary and Outlook
10.1 Project Summary
10.2 Results in Practice
10.3 Future Improvements
10.4 Key Techniques Recap
1. Introduction
As AI technology matures, enterprises increasingly need intelligent knowledge management and question-answering systems. Traditional knowledge management suffers from slow retrieval, scattered knowledge, and stale content. A knowledge-base QA bot built on a large language model addresses these pain points and delivers fast, accurate knowledge services.
This article walks through building a complete enterprise knowledge-base QA bot on a Huawei Cloud Flexus server with the DeepSeek-V3 large language model. By the end of this hands-on project you will know how to:
- Design the architecture of an enterprise-grade knowledge-base QA system
- Deploy and configure access to the DeepSeek-V3 model
- Store and retrieve the knowledge base as vectors
- Build a user-friendly QA interface
- Tune the system for performance and accuracy
2. Technology Choices and Architecture
2.1 Technology Stack
We build the QA bot on the following stack:
- Cloud platform: Huawei Cloud Flexus server
- Large language model: DeepSeek-V3
- Vector database: ChromaDB
- Text embeddings: Sentence-Transformers
- Web frameworks: FastAPI + Streamlit
- Data processing: Python + pandas
- Containerization: Docker
2.2 System Architecture
The system is modular, built around the core components shown below:
Figure 1: Architecture of the enterprise knowledge-base QA bot
2.3 Core Workflow
The system follows a retrieval-augmented generation (RAG) loop: documents are chunked and embedded into the vector store ahead of time; at query time the question is embedded, the most similar chunks are retrieved, and DeepSeek-V3 generates an answer grounded in those chunks, as the sketch below illustrates.
Figure 2: System workflow diagram
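In pseudocode, the query-time loop looks roughly like this (a sketch only; embed, vector_store, build_prompt, and deepseek are hypothetical stand-ins for the real implementations in Section 4):
def answer(question: str) -> str:
    # 1. Embed the question with the same model used for the documents
    q_vec = embed(question)
    # 2. Retrieve the top-k most similar chunks from the vector store
    chunks = vector_store.search(q_vec, top_k=5)
    # 3. Build a prompt that grounds the LLM in the retrieved chunks
    prompt = build_prompt(question, chunks)
    # 4. Have DeepSeek-V3 generate the final, source-grounded answer
    return deepseek.chat(prompt)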
3. Environment Setup and Deployment
3.1 Huawei Cloud Flexus Configuration
First, create a Flexus server instance on Huawei Cloud:
# Recommended specification
# CPU: 8 cores
# Memory: 32 GB
# Storage: 500 GB SSD
# OS: Ubuntu 22.04 LTS
3.2 Base Environment Installation
Install the required software on the server:
#!/bin/bash
# Update system packages
sudo apt update && sudo apt upgrade -y
# Install Python 3.11 (pip is provided by python3-pip; there is no
# python3.11-pip package)
sudo apt install python3.11 python3.11-venv python3-pip -y
# Install Docker
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
# Install Docker Compose
sudo curl -L "https://github.com/docker/compose/releases/download/v2.20.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
# Create the project directory
mkdir -p /opt/enterprise-qa-bot
cd /opt/enterprise-qa-bot
3.3 Python Dependencies
Create a virtual environment and install the dependencies:
# requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
streamlit==1.28.1
chromadb==0.4.15
sentence-transformers==2.2.2
openai==1.3.6
pandas==2.1.3
numpy==1.24.3
pydantic==2.5.0
python-multipart==0.0.6
aiofiles==23.2.1
langchain==0.0.340
langchain-community==0.0.1
python-docx==0.8.11
PyPDF2==3.0.1
tiktoken==0.5.1
# Create the virtual environment
python3.11 -m venv venv
source venv/bin/activate
# Install the dependencies
pip install -r requirements.txt
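Before moving on, a quick smoke test (a throwaway snippet, not part of the project) confirms the heavyweight packages import cleanly inside the virtual environment:
# check_env.py - import smoke test for the pinned dependencies
import chromadb
import sentence_transformers
import fastapi
import streamlit

print("chromadb:", chromadb.__version__)
print("sentence-transformers:", sentence_transformers.__version__)
print("fastapi:", fastapi.__version__)
print("streamlit:", streamlit.__version__)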
4. Core Feature Implementation
4.1 Document Preprocessing Module
We start with the document preprocessing module, which handles enterprise documents in a variety of formats:
# document_processor.py
from typing import List, Dict
from pathlib import Path
import PyPDF2
from docx import Document
import tiktoken

class DocumentProcessor:
    """
    Document preprocessor.
    Handles PDF, Word, and plain-text documents.
    """
    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        """
        Initialize the document processor.
        Args:
            chunk_size: chunk size in tokens
            chunk_overlap: overlap between consecutive chunks, in tokens
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.encoding = tiktoken.get_encoding("cl100k_base")
    def extract_text_from_pdf(self, file_path: str) -> str:
        """
        Extract text from a PDF file.
        Args:
            file_path: path to the PDF file
        Returns:
            the extracted text
        """
        text = ""
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page in pdf_reader.pages:
                    text += page.extract_text() + "\n"
        except Exception as e:
            print(f"Error processing PDF file {file_path}: {str(e)}")
        return text

    def extract_text_from_docx(self, file_path: str) -> str:
        """
        Extract text from a Word document.
        Args:
            file_path: path to the Word document
        Returns:
            the extracted text
        """
        text = ""
        try:
            doc = Document(file_path)
            for paragraph in doc.paragraphs:
                text += paragraph.text + "\n"
        except Exception as e:
            print(f"Error processing Word document {file_path}: {str(e)}")
        return text

    def extract_text_from_txt(self, file_path: str, encoding: str = 'utf-8') -> str:
        """
        Read a plain-text file.
        Args:
            file_path: path to the text file
            encoding: encoding to try first
        Returns:
            the file content, or an empty string if every encoding fails
        """
        try:
            with open(file_path, 'r', encoding=encoding) as file:
                return file.read()
        except UnicodeDecodeError:
            # Fall back to other common encodings
            for enc in ['gbk', 'gb2312', 'latin-1']:
                try:
                    with open(file_path, 'r', encoding=enc) as file:
                        return file.read()
                except UnicodeDecodeError:
                    continue
        except Exception as e:
            print(f"Error processing text file {file_path}: {str(e)}")
        # Every attempt failed: return an empty string instead of None
        return ""
    def split_text_into_chunks(self, text: str, metadata: Dict = None) -> List[Dict]:
        """
        Split text into overlapping chunks.
        Args:
            text: the input text
            metadata: document metadata
        Returns:
            a list of chunk dicts
        """
        if not text.strip():
            return []
        metadata = metadata or {}
        # Tokenize with tiktoken so chunk sizes are measured in tokens
        tokens = self.encoding.encode(text)
        chunks = []
        start = 0
        chunk_id = 0
        while start < len(tokens):
            # End position of the current chunk
            end = min(start + self.chunk_size, len(tokens))
            # Decode the current chunk back to text
            chunk_tokens = tokens[start:end]
            chunk_text = self.encoding.decode(chunk_tokens)
            # Assemble the chunk record
            chunks.append({
                'id': f"{metadata.get('filename', 'unknown')}_{chunk_id}",
                'text': chunk_text.strip(),
                'metadata': {
                    **metadata,
                    'chunk_id': chunk_id,
                    'start_token': start,
                    'end_token': end
                }
            })
            # Stop at the end of the text; stepping back by the overlap
            # here would otherwise re-produce the last chunk forever
            if end == len(tokens):
                break
            # Advance to the next chunk, keeping the configured overlap
            start = end - self.chunk_overlap
            chunk_id += 1
        return chunks
    def process_documents(self, document_dir: str) -> List[Dict]:
        """
        Process every document under a directory.
        Args:
            document_dir: path to the document directory
        Returns:
            the list of chunks produced from all documents
        """
        all_chunks = []
        # Supported file formats
        supported_extensions = {'.pdf', '.docx', '.txt', '.md'}
        # Walk the document directory recursively
        for file_path in Path(document_dir).rglob('*'):
            if file_path.suffix.lower() in supported_extensions:
                print(f"Processing file: {file_path}")
                # Extract text according to the file format
                if file_path.suffix.lower() == '.pdf':
                    text = self.extract_text_from_pdf(str(file_path))
                elif file_path.suffix.lower() == '.docx':
                    text = self.extract_text_from_docx(str(file_path))
                else:  # .txt, .md
                    text = self.extract_text_from_txt(str(file_path))
                # Prepare metadata
                metadata = {
                    'filename': file_path.name,
                    'filepath': str(file_path),
                    'file_type': file_path.suffix.lower(),
                    'file_size': file_path.stat().st_size
                }
                # Split the text and collect the chunks
                chunks = self.split_text_into_chunks(text, metadata)
                all_chunks.extend(chunks)
                print(f"Finished {file_path.name}: {len(chunks)} chunks")
        print(f"Processed {len(all_chunks)} chunks in total")
        return all_chunks
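A short usage example (the ./docs directory and its contents are placeholders):
# Example usage of DocumentProcessor
from document_processor import DocumentProcessor

processor = DocumentProcessor(chunk_size=500, chunk_overlap=50)
chunks = processor.process_documents("./docs")
if chunks:
    first = chunks[0]
    print(first['id'])                    # e.g. "handbook.pdf_0"
    print(first['metadata']['filename'])  # source file of the chunk
    print(first['text'][:100])            # first 100 characters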
4.2 Vector Database Module
Next, the vector database module, which stores and retrieves document vectors:
# vector_store.py
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
from typing import List, Dict

class VectorStore:
    """
    Vector database manager.
    Stores and retrieves document vectors with ChromaDB.
    """
    def __init__(self,
                 persist_directory: str = "./chroma_db",
                 collection_name: str = "enterprise_knowledge",
                 embedding_model: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):
        """
        Initialize the vector database.
        Args:
            persist_directory: directory for database persistence
            collection_name: collection name
            embedding_model: name of the embedding model
        """
        self.persist_directory = persist_directory
        self.collection_name = collection_name
        # Initialize the ChromaDB client
        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(allow_reset=True)
        )
        # Load the embedding model
        print(f"Loading embedding model: {embedding_model}")
        self.embedding_model = SentenceTransformer(embedding_model)
        print("Embedding model loaded")
        # Get or create the collection; use cosine distance so that
        # 1 - distance in search results is a true cosine similarity
        # (ChromaDB defaults to L2 otherwise)
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={
                "description": "Enterprise knowledge base vector store",
                "hnsw:space": "cosine"
            }
        )
    def add_documents(self, documents: List[Dict]) -> None:
        """
        Add documents to the vector database.
        Args:
            documents: list of documents, each with id, text, and metadata
        """
        if not documents:
            print("No documents to add")
            return
        # Pull out the fields
        texts = [doc['text'] for doc in documents]
        ids = [doc['id'] for doc in documents]
        metadatas = [doc.get('metadata', {}) for doc in documents]
        print(f"Generating embeddings for {len(texts)} documents...")
        # Generate the embedding vectors
        embeddings = self.embedding_model.encode(
            texts,
            show_progress_bar=True,
            normalize_embeddings=True
        ).tolist()
        print("Embeddings generated; writing to the database...")
        # Add to ChromaDB in batches
        batch_size = 100
        for i in range(0, len(documents), batch_size):
            batch_ids = ids[i:i+batch_size]
            batch_embeddings = embeddings[i:i+batch_size]
            batch_texts = texts[i:i+batch_size]
            batch_metadatas = metadatas[i:i+batch_size]
            self.collection.add(
                ids=batch_ids,
                embeddings=batch_embeddings,
                documents=batch_texts,
                metadatas=batch_metadatas
            )
            print(f"Stored {min(i+batch_size, len(documents))}/{len(documents)} documents")
        print("All documents added to the vector database")
    def search_similar_documents(self,
                                 query: str,
                                 top_k: int = 5,
                                 similarity_threshold: float = 0.7) -> List[Dict]:
        """
        Retrieve similar documents.
        Args:
            query: the query text
            top_k: number of documents to return
            similarity_threshold: minimum similarity to keep
        Returns:
            a list of similar documents
        """
        # Embed the query
        query_embedding = self.embedding_model.encode([query], normalize_embeddings=True)[0].tolist()
        # Search ChromaDB
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            include=['documents', 'metadatas', 'distances']
        )
        # Format the results
        similar_docs = []
        for i in range(len(results['ids'][0])):
            distance = results['distances'][0][i]
            # With the collection configured for cosine distance above,
            # 1 - distance is the cosine similarity
            similarity = 1 - distance
            # Drop documents below the similarity threshold
            if similarity >= similarity_threshold:
                similar_docs.append({
                    'id': results['ids'][0][i],
                    'text': results['documents'][0][i],
                    'metadata': results['metadatas'][0][i],
                    'similarity': similarity
                })
        return similar_docs
    def get_collection_stats(self) -> Dict:
        """
        Collection statistics.
        Returns:
            a statistics dict
        """
        count = self.collection.count()
        return {
            'total_documents': count,
            'collection_name': self.collection_name,
            'persist_directory': self.persist_directory
        }

    def clear_collection(self) -> None:
        """Delete and recreate the collection."""
        self.client.delete_collection(self.collection_name)
        self.collection = self.client.create_collection(
            name=self.collection_name,
            metadata={
                "description": "Enterprise knowledge base vector store",
                "hnsw:space": "cosine"
            }
        )
        print("Collection cleared")
4.3 DeepSeek-V3 Integration Module
Now the integration with the DeepSeek-V3 model:
# deepseek_client.py
import openai
from typing import List, Dict, Optional
import time

class DeepSeekClient:
    """
    DeepSeek-V3 model client.
    Wraps the interaction with the DeepSeek API.
    """
    def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
        """
        Initialize the DeepSeek client.
        Args:
            api_key: DeepSeek API key
            base_url: API base URL
        """
        # The DeepSeek API is OpenAI-compatible, so the openai SDK works directly
        self.client = openai.OpenAI(
            api_key=api_key,
            base_url=base_url
        )
        # Model configuration
        self.model_name = "deepseek-chat"
        self.max_tokens = 2000
        self.temperature = 0.1
    def generate_answer(self,
                        question: str,
                        context_documents: List[Dict],
                        system_prompt: Optional[str] = None) -> Dict:
        """
        Generate an answer grounded in the context documents.
        Args:
            question: the user's question
            context_documents: relevant context documents
            system_prompt: system prompt template
        Returns:
            the generated answer plus metadata
        """
        # Build the context block
        context = self._build_context(context_documents)
        # Default system prompt
        if system_prompt is None:
            system_prompt = """You are a professional enterprise knowledge-base QA assistant. Answer the user's question based on the context provided.
Requirements:
1. Answer only from the provided context; do not add unrelated information.
2. If the context contains no relevant information, say so explicitly.
3. Keep answers accurate, concise, and well organized.
4. Cite the specific source documents where possible.
5. Answer in Chinese.
Context:
{context}
"""
        # Assemble the messages
        messages = [
            {
                "role": "system",
                "content": system_prompt.format(context=context)
            },
            {
                "role": "user",
                "content": f"Question: {question}"
            }
        ]
        try:
            # Call the DeepSeek API
            start_time = time.time()
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=messages,
                max_tokens=self.max_tokens,
                temperature=self.temperature,
                stream=False
            )
            response_time = time.time() - start_time
            # Extract the answer
            answer = response.choices[0].message.content
            # Build the result payload
            return {
                'answer': answer,
                'question': question,
                'context_documents': context_documents,
                'response_time': response_time,
                'model_info': {
                    'model': self.model_name,
                    'temperature': self.temperature,
                    'max_tokens': self.max_tokens
                },
                'usage': {
                    'prompt_tokens': response.usage.prompt_tokens,
                    'completion_tokens': response.usage.completion_tokens,
                    'total_tokens': response.usage.total_tokens
                }
            }
        except Exception as e:
            print(f"Error calling the DeepSeek API: {str(e)}")
            return {
                'answer': "Sorry, an error occurred while generating the answer. Please try again later.",
                'error': str(e),
                'question': question
            }
    def _build_context(self, documents: List[Dict]) -> str:
        """
        Build the context string.
        Args:
            documents: list of documents
        Returns:
            the formatted context string
        """
        if not documents:
            return "No relevant context available."
        context_parts = []
        for i, doc in enumerate(documents, 1):
            # Pull out the document fields
            text = doc.get('text', '')
            metadata = doc.get('metadata', {})
            similarity = doc.get('similarity', 0)
            # Format the document entry
            context_parts.append(f"""
Document {i} (similarity: {similarity:.3f}):
Source: {metadata.get('filename', 'unknown')}
Content: {text}
---
""")
        return "\n".join(context_parts)
    def stream_generate_answer(self,
                               question: str,
                               context_documents: List[Dict],
                               system_prompt: Optional[str] = None):
        """
        Generate an answer as a stream (for live display).
        Args:
            question: the user's question
            context_documents: relevant context documents
            system_prompt: system prompt template
        Yields:
            generated text fragments
        """
        # Build the context and messages (same as generate_answer)
        context = self._build_context(context_documents)
        if system_prompt is None:
            system_prompt = """You are a professional enterprise knowledge-base QA assistant. Answer the user's question based on the context provided.
Requirements:
1. Answer only from the provided context; do not add unrelated information.
2. If the context contains no relevant information, say so explicitly.
3. Keep answers accurate, concise, and well organized.
4. Cite the specific source documents where possible.
5. Answer in Chinese.
Context:
{context}
"""
        messages = [
            {
                "role": "system",
                "content": system_prompt.format(context=context)
            },
            {
                "role": "user",
                "content": f"Question: {question}"
            }
        ]
        try:
            # Streaming API call
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=messages,
                max_tokens=self.max_tokens,
                temperature=self.temperature,
                stream=True
            )
            for chunk in response:
                if chunk.choices[0].delta.content is not None:
                    yield chunk.choices[0].delta.content
        except Exception as e:
            yield f"Error: {str(e)}"
4.4 QA Engine Core Module
Finally, the QA engine that ties the components together:
# qa_engine.py
from typing import Dict
from vector_store import VectorStore
from deepseek_client import DeepSeekClient
from document_processor import DocumentProcessor
import time

class QAEngine:
    """
    QA engine core.
    Combines document processing, vector retrieval, and answer generation.
    """
    def __init__(self,
                 deepseek_api_key: str,
                 vector_store_config: Dict = None,
                 retrieval_config: Dict = None):
        """
        Initialize the QA engine.
        Args:
            deepseek_api_key: DeepSeek API key
            vector_store_config: vector database configuration
            retrieval_config: retrieval configuration
        """
        # Initialize the vector database
        vs_config = vector_store_config or {}
        self.vector_store = VectorStore(**vs_config)
        # Initialize the DeepSeek client
        self.deepseek_client = DeepSeekClient(api_key=deepseek_api_key)
        # Initialize the document processor
        self.doc_processor = DocumentProcessor()
        # Retrieval configuration
        self.retrieval_config = retrieval_config or {
            'top_k': 5,
            'similarity_threshold': 0.7
        }
        # QA history
        self.qa_history = []
    def add_knowledge_base(self, documents_dir: str) -> Dict:
        """
        Ingest knowledge-base documents.
        Args:
            documents_dir: path to the document directory
        Returns:
            processing statistics
        """
        print(f"Processing knowledge-base directory: {documents_dir}")
        start_time = time.time()
        # Process the documents
        document_chunks = self.doc_processor.process_documents(documents_dir)
        if not document_chunks:
            return {
                'success': False,
                'message': 'No processable documents found',
                'processed_docs': 0,
                'processing_time': 0
            }
        # Add to the vector database
        self.vector_store.add_documents(document_chunks)
        processing_time = time.time() - start_time
        return {
            'success': True,
            'message': 'Knowledge base updated',
            'processed_docs': len(document_chunks),
            'processing_time': processing_time
        }
    def ask_question(self, question: str, include_sources: bool = True) -> Dict:
        """
        Answer a question.
        Args:
            question: the user's question
            include_sources: whether to include source information
        Returns:
            the QA result
        """
        start_time = time.time()
        # Retrieve relevant documents
        print(f"Retrieving documents for: {question}")
        relevant_docs = self.vector_store.search_similar_documents(
            query=question,
            **self.retrieval_config
        )
        if not relevant_docs:
            result = {
                'question': question,
                'answer': 'Sorry, I could not find anything in the knowledge base related to your question. Try rephrasing it, or check that the knowledge base covers this topic.',
                'sources': [],
                'retrieval_time': time.time() - start_time,
                'relevant_docs_count': 0
            }
        else:
            # Generate the answer
            print(f"Found {len(relevant_docs)} relevant documents; generating answer...")
            generation_result = self.deepseek_client.generate_answer(
                question=question,
                context_documents=relevant_docs
            )
            # Prepare source information
            sources = []
            if include_sources:
                for doc in relevant_docs:
                    sources.append({
                        'filename': doc['metadata'].get('filename', 'unknown'),
                        'similarity': doc['similarity'],
                        'text_preview': doc['text'][:200] + '...' if len(doc['text']) > 200 else doc['text']
                    })
            result = {
                'question': question,
                'answer': generation_result.get('answer', 'An error occurred while generating the answer'),
                'sources': sources,
                'retrieval_time': time.time() - start_time,
                'response_time': generation_result.get('response_time', 0),
                'relevant_docs_count': len(relevant_docs),
                'model_usage': generation_result.get('usage', {}),
                'error': generation_result.get('error')
            }
        # Record the exchange in the QA history
        self.qa_history.append({
            'timestamp': time.time(),
            'question': question,
            'answer': result['answer'],
            'docs_count': result['relevant_docs_count']
        })
        return result
    def stream_ask_question(self, question: str):
        """
        Streaming QA (for live display).
        Args:
            question: the user's question
        Yields:
            answer fragments
        """
        # Retrieve relevant documents
        relevant_docs = self.vector_store.search_similar_documents(
            query=question,
            **self.retrieval_config
        )
        if not relevant_docs:
            yield "Sorry, I could not find anything in the knowledge base related to your question."
            return
        # Stream the generated answer
        for chunk in self.deepseek_client.stream_generate_answer(
            question=question,
            context_documents=relevant_docs
        ):
            yield chunk

    def get_statistics(self) -> Dict:
        """
        System statistics.
        Returns:
            a statistics dict
        """
        vector_stats = self.vector_store.get_collection_stats()
        return {
            'knowledge_base': vector_stats,
            'qa_sessions': len(self.qa_history),
            'recent_questions': [qa['question'] for qa in self.qa_history[-10:]]
        }

    def clear_knowledge_base(self) -> None:
        """Clear the knowledge base."""
        self.vector_store.clear_collection()
        print("Knowledge base cleared")

    def export_qa_history(self, filename: str = None) -> str:
        """
        Export the QA history.
        Args:
            filename: output file name
        Returns:
            path of the exported file
        """
        if filename is None:
            filename = f"qa_history_{int(time.time())}.json"
        import json
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.qa_history, f, ensure_ascii=False, indent=2)
        return filename
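Wiring it all together (the API key and document directory are placeholders; ingestion triggers embedding, so the first run takes a while):
# Example: end-to-end use of QAEngine
from qa_engine import QAEngine

engine = QAEngine(deepseek_api_key="sk-...")
print(engine.add_knowledge_base("./docs"))  # {'success': True, 'processed_docs': ..., ...}

# Blocking Q&A
result = engine.ask_question("What is the reimbursement process?")
print(result['answer'])

# Streaming Q&A, e.g. for a console demo
for piece in engine.stream_ask_question("What is the reimbursement process?"):
    print(piece, end="", flush=True)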
5. Web Service Interfaces
5.1 FastAPI Backend
# main.py
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
import os
import tempfile
import shutil
from qa_engine import QAEngine

app = FastAPI(title="Enterprise Knowledge Base QA API", version="1.0.0")

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize the QA engine
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
if not DEEPSEEK_API_KEY:
    raise ValueError("Please set the DEEPSEEK_API_KEY environment variable")
qa_engine = QAEngine(deepseek_api_key=DEEPSEEK_API_KEY)

# Request models
class QuestionRequest(BaseModel):
    question: str
    include_sources: bool = True

class KnowledgeBaseRequest(BaseModel):
    documents_dir: str

# Response model
class QuestionResponse(BaseModel):
    question: str
    answer: str
    sources: List[dict] = []
    retrieval_time: float
    response_time: float = 0
    relevant_docs_count: int
    error: Optional[str] = None
@app.get("/")
async def root():
"""根路径"""
return {"message": "企业知识库问答API服务正在运行"}
@app.post("/ask", response_model=QuestionResponse)
async def ask_question(request: QuestionRequest):
"""
问答接口
"""
try:
result = qa_engine.ask_question(
question=request.question,
include_sources=request.include_sources
)
return QuestionResponse(**result)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/knowledge-base/add")
async def add_knowledge_base(request: KnowledgeBaseRequest):
"""
添加知识库
"""
try:
if not os.path.exists(request.documents_dir):
raise HTTPException(status_code=404, detail="文档目录不存在")
result = qa_engine.add_knowledge_base(request.documents_dir)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/knowledge-base/upload")
async def upload_documents(files: List[UploadFile] = File(...)):
"""
上传文档到知识库
"""
try:
# 创建临时目录
temp_dir = tempfile.mkdtemp()
# 保存上传的文件
for file in files:
file_path = os.path.join(temp_dir, file.filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# 处理文档
result = qa_engine.add_knowledge_base(temp_dir)
# 清理临时目录
shutil.rmtree(temp_dir)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/statistics")
async def get_statistics():
"""
获取系统统计信息
"""
try:
return qa_engine.get_statistics()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/knowledge-base")
async def clear_knowledge_base():
"""
清空知识库
"""
try:
qa_engine.clear_knowledge_base()
return {"message": "知识库已清空"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
"""
健康检查
"""
return {"status": "healthy", "service": "enterprise-qa-bot"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
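With the service running (uvicorn main:app), the /ask endpoint can be exercised from any HTTP client; a small example with requests (the question is a placeholder):
# Example client call against the running API
import requests

resp = requests.post(
    "http://localhost:8000/ask",
    json={"question": "What is the onboarding checklist?", "include_sources": True},
    timeout=60,
)
resp.raise_for_status()
data = resp.json()
print(data["answer"])
for src in data["sources"]:
    print(f"- {src['filename']} (similarity {src['similarity']:.3f})")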
5.2 Streamlit Frontend
# streamlit_app.py
import streamlit as st
import requests
from typing import List, Dict

# Page configuration
st.set_page_config(
    page_title="Enterprise Knowledge Base QA Bot",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded"
)

# API configuration
API_BASE_URL = "http://localhost:8000"

def init_session_state():
    """Initialize session state."""
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    if 'knowledge_base_stats' not in st.session_state:
        st.session_state.knowledge_base_stats = {}

def call_api(endpoint: str, method: str = "GET", data: dict = None, files: list = None):
    """Call the backend API."""
    url = f"{API_BASE_URL}{endpoint}"
    try:
        if method == "GET":
            response = requests.get(url)
        elif method == "POST":
            if files:
                response = requests.post(url, files=files)
            else:
                response = requests.post(url, json=data)
        elif method == "DELETE":
            response = requests.delete(url)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        st.error(f"API call failed: {str(e)}")
        return None

def display_chat_message(role: str, content: str, sources: List[Dict] = None):
    """Render a chat message."""
    with st.chat_message(role):
        st.write(content)
        # Show source information
        if sources and role == "assistant":
            with st.expander("📚 Sources", expanded=False):
                for i, source in enumerate(sources, 1):
                    st.write(f"**Source {i}**: {source['filename']}")
                    st.write(f"**Similarity**: {source['similarity']:.3f}")
                    st.write(f"**Preview**: {source['text_preview']}")
                    st.write("---")
def main():
    """Entry point."""
    init_session_state()
    # Title
    st.title("🤖 Enterprise Knowledge Base QA Bot")
    st.caption("An intelligent QA system built on DeepSeek-V3")
    # Sidebar: knowledge-base management
    with st.sidebar:
        st.header("📁 Knowledge Base")
        # Document upload
        st.subheader("Upload documents")
        uploaded_files = st.file_uploader(
            "Choose document files",
            type=['pdf', 'docx', 'txt', 'md'],
            accept_multiple_files=True,
            help="PDF, Word, plain text, and Markdown are supported"
        )
        if st.button("Upload and process", disabled=not uploaded_files):
            with st.spinner("Processing documents..."):
                # requests expects multipart files as a list of
                # (field_name, (filename, bytes)) tuples
                files = [("files", (f.name, f.getvalue())) for f in uploaded_files]
                result = call_api("/knowledge-base/upload", "POST", files=files)
                if result and result.get('success'):
                    st.success(f"Processed {result['processed_docs']} document chunks")
                    st.info(f"Processing time: {result['processing_time']:.2f} s")
                else:
                    st.error("Document processing failed")
        # Knowledge-base statistics
        st.subheader("Statistics")
        if st.button("Refresh statistics"):
            stats = call_api("/statistics")
            if stats:
                st.session_state.knowledge_base_stats = stats
        if st.session_state.knowledge_base_stats:
            stats = st.session_state.knowledge_base_stats
            kb_stats = stats.get('knowledge_base', {})
            st.metric("Total documents", kb_stats.get('total_documents', 0))
            st.metric("QA sessions", stats.get('qa_sessions', 0))
            # Recent questions
            recent_questions = stats.get('recent_questions', [])
            if recent_questions:
                st.write("**Recent questions:**")
                for q in recent_questions[-5:]:
                    st.write(f"• {q}")
        # Danger zone: render the confirmation checkbox before the button;
        # a checkbox nested inside a button branch would be reset by the
        # rerun the button click triggers and could never be confirmed
        st.subheader("Danger zone")
        confirm_clear = st.checkbox("I confirm clearing the knowledge base")
        if st.button("🗑️ Clear knowledge base", type="secondary", disabled=not confirm_clear):
            result = call_api("/knowledge-base", "DELETE")
            if result:
                st.success("Knowledge base cleared")
                st.session_state.chat_history = []
    # Main area: chat
    st.header("💬 Q&A")
    # Render chat history
    for message in st.session_state.chat_history:
        display_chat_message(
            message["role"],
            message["content"],
            message.get("sources")
        )
    # Question input
    if question := st.chat_input("Ask a question..."):
        # Show the user's question
        display_chat_message("user", question)
        st.session_state.chat_history.append({
            "role": "user",
            "content": question
        })
        # Generate the answer
        with st.chat_message("assistant"):
            with st.spinner("Thinking..."):
                # Call the QA API
                response = call_api("/ask", "POST", {
                    "question": question,
                    "include_sources": True
                })
            if response:
                answer = response.get('answer', 'An error occurred while generating the answer')
                sources = response.get('sources', [])
                # Show the answer
                st.write(answer)
                # Show timing information
                col1, col2, col3 = st.columns(3)
                with col1:
                    st.metric("Retrieval time", f"{response.get('retrieval_time', 0):.2f}s")
                with col2:
                    st.metric("Generation time", f"{response.get('response_time', 0):.2f}s")
                with col3:
                    st.metric("Relevant documents", response.get('relevant_docs_count', 0))
                # Show the sources
                if sources:
                    with st.expander("📚 Sources", expanded=False):
                        for i, source in enumerate(sources, 1):
                            st.write(f"**Source {i}**: {source['filename']}")
                            st.write(f"**Similarity**: {source['similarity']:.3f}")
                            st.write(f"**Preview**: {source['text_preview']}")
                            st.write("---")
                # Persist to chat history
                st.session_state.chat_history.append({
                    "role": "assistant",
                    "content": answer,
                    "sources": sources
                })
            else:
                error_msg = "Sorry, the service is temporarily unavailable. Please try again later."
                st.error(error_msg)
                st.session_state.chat_history.append({
                    "role": "assistant",
                    "content": error_msg
                })
    # Footer
    st.markdown("---")
    st.markdown(
        "💡 **Tips**: "
        "1. Upload your documents to the knowledge base first. "
        "2. Then ask questions about their content. "
        "3. The system automatically retrieves the most relevant passages to answer each question."
    )

if __name__ == "__main__":
    main()
6. Containerized Deployment
6.1 Docker Configuration
Create a Dockerfile for the containerized deployment:
# Dockerfile
FROM python:3.11-slim

# Working directory
WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency file
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create the required directories
RUN mkdir -p /app/chroma_db /app/uploads /app/logs

# Expose the API and UI ports
EXPOSE 8000 8501

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Startup script
COPY start.sh /start.sh
RUN chmod +x /start.sh
CMD ["/start.sh"]
The startup script:
#!/bin/bash
# start.sh
echo "Starting the enterprise knowledge base QA bot..."

# Start the FastAPI backend
echo "Starting the API service..."
uvicorn main:app --host 0.0.0.0 --port 8000 &

# Give the API time to come up
sleep 10

# Start the Streamlit frontend
echo "Starting the frontend..."
streamlit run streamlit_app.py --server.port 8501 --server.address 0.0.0.0 &

# Keep the container running
wait
6.2 Docker Compose Configuration
# docker-compose.yml
version: '3.8'
services:
  qa-bot:
    build: .
    container_name: enterprise-qa-bot
    ports:
      - "8000:8000"   # API service
      - "8501:8501"   # Streamlit UI
    environment:
      - DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY}
    volumes:
      - ./data:/app/data
      - ./chroma_db:/app/chroma_db
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  nginx:
    image: nginx:alpine
    container_name: qa-bot-nginx
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - qa-bot
    restart: unless-stopped
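After docker-compose up -d, a small poller (a convenience script, not part of the stack; localhost:8000 assumed) can wait for the API to pass its health check before anything else talks to it:
# wait_healthy.py - poll the health endpoint until the stack is up
import time
import requests

URL = "http://localhost:8000/health"
for _ in range(30):  # up to ~60 seconds
    try:
        if requests.get(URL, timeout=2).status_code == 200:
            print("API is healthy")
            break
    except requests.exceptions.RequestException:
        pass
    time.sleep(2)
else:
    raise SystemExit("API did not become healthy in time")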
7. System Optimization and Monitoring
7.1 Performance Optimization
To improve system performance, we can apply the following optimization strategies:
# performance_optimizer.py
import functools
import time
from typing import Dict, List
import threading
from concurrent.futures import ThreadPoolExecutor
from qa_engine import QAEngine

class PerformanceOptimizer:
    """
    Performance optimizer.
    Provides caching, batching, and concurrent processing.
    """
    def __init__(self):
        self.cache = {}
        self.cache_lock = threading.Lock()
        self.max_cache_size = 1000
        self.cache_ttl = 3600  # 1 hour

    def cache_result(self, ttl: int = None):
        """
        Result-caching decorator.
        Args:
            ttl: cache time-to-live in seconds
        """
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                # Build the cache key from the call signature
                cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"
                with self.cache_lock:
                    # Check the cache
                    if cache_key in self.cache:
                        cached_result, timestamp = self.cache[cache_key]
                        cache_age = time.time() - timestamp
                        if cache_age < (ttl or self.cache_ttl):
                            print(f"Cache hit: {func.__name__}")
                            return cached_result
                        else:
                            # Entry expired; evict it
                            del self.cache[cache_key]
                # Execute the function
                result = func(*args, **kwargs)
                with self.cache_lock:
                    # Evict expired entries if the cache is full
                    if len(self.cache) >= self.max_cache_size:
                        self._cleanup_cache()
                    # Store the result
                    self.cache[cache_key] = (result, time.time())
                return result
            return wrapper
        return decorator

    def _cleanup_cache(self):
        """Evict expired cache entries."""
        current_time = time.time()
        expired_keys = [
            key for key, (_, timestamp) in self.cache.items()
            if current_time - timestamp > self.cache_ttl
        ]
        for key in expired_keys:
            del self.cache[key]
        print(f"Evicted {len(expired_keys)} expired cache entries")
# Apply the optimizations
optimizer = PerformanceOptimizer()

class OptimizedQAEngine(QAEngine):
    """QA engine with caching and batching."""

    @optimizer.cache_result(ttl=1800)  # 30-minute cache
    def ask_question(self, question: str, include_sources: bool = True) -> Dict:
        """Cached QA method."""
        return super().ask_question(question, include_sources)

    def batch_ask_questions(self, questions: List[str]) -> List[Dict]:
        """Answer several questions concurrently."""
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = [
                executor.submit(self.ask_question, q)
                for q in questions
            ]
            return [future.result() for future in futures]
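A quick demonstration (the API key is a placeholder; note the cache key includes the engine instance, so hits happen for repeated questions on the same instance):
# Example: cached and batched question answering
engine = OptimizedQAEngine(deepseek_api_key="sk-...")
engine.ask_question("What is the travel policy?")  # cache miss: full RAG pass
engine.ask_question("What is the travel policy?")  # cache hit within 30 minutes

# Fan several questions out across worker threads
answers = engine.batch_ask_questions([
    "What is the travel policy?",
    "How do I reset my password?",
])
print([a['answer'][:60] for a in answers])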
7.2 Monitoring and Logging
# monitoring.py
import logging
import time
import psutil
import threading
from typing import Dict
from datetime import datetime
import json

class SystemMonitor:
    """
    System monitor.
    Tracks API performance and resource usage.
    """
    def __init__(self, log_file: str = "system_monitor.log"):
        self.log_file = log_file
        self.setup_logging()
        self.metrics = {
            'api_calls': 0,
            'successful_queries': 0,
            'failed_queries': 0,
            'total_response_time': 0,
            'avg_response_time': 0,
            'cache_hits': 0,
            'cache_misses': 0
        }
        self.metrics_lock = threading.Lock()
        # Start the background monitoring thread
        self.monitor_thread = threading.Thread(target=self._monitor_system, daemon=True)
        self.monitor_thread.start()

    def setup_logging(self):
        """Configure logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.log_file, encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger('SystemMonitor')

    def log_api_call(self, endpoint: str, response_time: float, success: bool):
        """Record an API call."""
        with self.metrics_lock:
            self.metrics['api_calls'] += 1
            self.metrics['total_response_time'] += response_time
            self.metrics['avg_response_time'] = (
                self.metrics['total_response_time'] / self.metrics['api_calls']
            )
            if success:
                self.metrics['successful_queries'] += 1
            else:
                self.metrics['failed_queries'] += 1
        self.logger.info(
            f"API call - endpoint: {endpoint}, "
            f"response time: {response_time:.3f}s, "
            f"success: {success}"
        )

    def log_cache_event(self, event_type: str):
        """Record a cache event."""
        with self.metrics_lock:
            if event_type == 'hit':
                self.metrics['cache_hits'] += 1
            elif event_type == 'miss':
                self.metrics['cache_misses'] += 1

    def get_system_metrics(self) -> Dict:
        """Collect system metrics."""
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        with self.metrics_lock:
            metrics = self.metrics.copy()
        return {
            'timestamp': datetime.now().isoformat(),
            'system': {
                'cpu_percent': cpu_percent,
                'memory_percent': memory.percent,
                'memory_used_gb': memory.used / (1024**3),
                'disk_percent': disk.percent,
                'disk_free_gb': disk.free / (1024**3)
            },
            'application': metrics
        }

    def _monitor_system(self):
        """Background monitoring loop."""
        while True:
            try:
                metrics = self.get_system_metrics()
                # Warn on resource pressure
                if metrics['system']['cpu_percent'] > 80:
                    self.logger.warning(f"High CPU usage: {metrics['system']['cpu_percent']:.1f}%")
                if metrics['system']['memory_percent'] > 85:
                    self.logger.warning(f"High memory usage: {metrics['system']['memory_percent']:.1f}%")
                # Log full metrics every 5 minutes
                self.logger.info(f"System metrics: {json.dumps(metrics, ensure_ascii=False, indent=2)}")
                time.sleep(300)  # 5 minutes
            except Exception as e:
                self.logger.error(f"Monitoring error: {str(e)}")
                time.sleep(60)

# Global monitor instance
monitor = SystemMonitor()
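One way to feed the monitor from the API layer is a FastAPI HTTP middleware (a sketch added here for illustration; app and monitor are the objects defined in main.py and monitoring.py):
# In main.py - record latency and outcome of every request
import time
from monitoring import monitor

@app.middleware("http")
async def track_requests(request, call_next):
    start = time.time()
    response = await call_next(request)
    monitor.log_api_call(
        endpoint=request.url.path,
        response_time=time.time() - start,
        success=response.status_code < 400,
    )
    return response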
8. Testing and Validation
8.1 Unit Tests
# test_qa_engine.py
import unittest
import tempfile
import os
import shutil
from document_processor import DocumentProcessor
from vector_store import VectorStore

class TestQAEngine(unittest.TestCase):
    """Tests for the QA engine components."""

    def setUp(self):
        """Create a temporary test document."""
        self.temp_dir = tempfile.mkdtemp()
        test_doc_path = os.path.join(self.temp_dir, "test_doc.txt")
        with open(test_doc_path, 'w', encoding='utf-8') as f:
            f.write("""
Artificial intelligence (AI) is a branch of computer science that seeks to
understand the nature of intelligence and to build machines that can respond
in ways similar to human intelligence. Research in the field covers robotics,
speech recognition, image recognition, natural language processing, and
expert systems.
""")

    def tearDown(self):
        """Clean up the temporary directory."""
        shutil.rmtree(self.temp_dir)

    def test_document_processing(self):
        """Document processing produces chunks with text and metadata."""
        processor = DocumentProcessor()
        chunks = processor.process_documents(self.temp_dir)
        self.assertGreater(len(chunks), 0)
        self.assertIn('text', chunks[0])
        self.assertIn('metadata', chunks[0])

    def test_vector_store(self):
        """Documents added to the vector store can be retrieved."""
        vector_store = VectorStore(persist_directory=tempfile.mkdtemp())
        # Test document
        test_docs = [
            {
                'id': 'doc1',
                'text': 'Artificial intelligence is a branch of computer science',
                'metadata': {'source': 'test'}
            }
        ]
        # Add the document
        vector_store.add_documents(test_docs)
        # Search for it (lowered threshold keeps the test robust)
        results = vector_store.search_similar_documents(
            "artificial intelligence", top_k=1, similarity_threshold=0.3)
        self.assertGreater(len(results), 0)
        self.assertIn('Artificial intelligence', results[0]['text'])

if __name__ == '__main__':
    unittest.main()
8.2 Performance Tests
# performance_test.py
import time
import statistics
from concurrent.futures import ThreadPoolExecutor
import requests

class PerformanceTest:
    """Load tester for the QA API."""

    def __init__(self, api_base_url: str = "http://localhost:8000"):
        self.api_base_url = api_base_url
        self.test_questions = [
            "What is artificial intelligence?",
            "What are the main approaches to machine learning?",
            "How does deep learning differ from machine learning?",
            "Which techniques does natural language processing include?",
            "What are the application areas of computer vision?"
        ]

    def test_single_request(self, question: str) -> float:
        """Measure the response time of a single request."""
        start_time = time.time()
        try:
            response = requests.post(
                f"{self.api_base_url}/ask",
                json={"question": question, "include_sources": True},
                timeout=30
            )
            response.raise_for_status()
            return time.time() - start_time
        except Exception as e:
            print(f"Request failed: {str(e)}")
            return -1

    def test_concurrent_requests(self, num_threads: int = 5, num_requests: int = 20):
        """Measure performance under concurrent load."""
        print(f"Starting concurrency test - {num_threads} threads, {num_requests} requests")
        questions = (self.test_questions * (num_requests // len(self.test_questions) + 1))[:num_requests]
        response_times = []
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = [executor.submit(self.test_single_request, q) for q in questions]
            for future in futures:
                response_time = future.result()
                if response_time > 0:
                    response_times.append(response_time)
        if response_times:
            avg_time = statistics.mean(response_times)
            median_time = statistics.median(response_times)
            p95_time = sorted(response_times)[int(len(response_times) * 0.95)]
            print("Performance test results:")
            print(f"- Successful requests: {len(response_times)}/{num_requests}")
            print(f"- Mean response time: {avg_time:.3f}s")
            print(f"- Median response time: {median_time:.3f}s")
            print(f"- 95th percentile: {p95_time:.3f}s")
            print(f"- Fastest response: {min(response_times):.3f}s")
            print(f"- Slowest response: {max(response_times):.3f}s")
        else:
            print("All requests failed")

if __name__ == "__main__":
    tester = PerformanceTest()
    tester.test_concurrent_requests(num_threads=3, num_requests=15)
9. Deployment Guide
9.1 Production Deployment
#!/bin/bash
# deploy.sh - production deployment script
echo "Deploying the enterprise knowledge base QA bot..."

# 1. Create the project directory
PROJECT_DIR="/opt/enterprise-qa-bot"
sudo mkdir -p $PROJECT_DIR
cd $PROJECT_DIR

# 2. Clone the code or copy the files
# git clone https://github.com/your-repo/enterprise-qa-bot.git .

# 3. Set environment variables
cat > .env << EOF
DEEPSEEK_API_KEY=your_deepseek_api_key_here
ENVIRONMENT=production
LOG_LEVEL=INFO
EOF

# 4. Build the Docker image
echo "Building the Docker image..."
docker-compose build

# 5. Start the services
echo "Starting the services..."
docker-compose up -d

# 6. Wait for the services to come up
echo "Waiting for the services to start..."
sleep 30

# 7. Health checks
echo "Running health checks..."
if curl -f http://localhost:8000/health; then
    echo "✅ API service is up"
else
    echo "❌ API service failed to start"
    exit 1
fi

if curl -f http://localhost:8501; then
    echo "✅ Frontend is up"
else
    echo "❌ Frontend failed to start"
    exit 1
fi

echo "🎉 Deployment complete!"
echo "API endpoint: http://your-server:8000"
echo "Frontend: http://your-server:8501"
9.2 Nginx Configuration
# nginx.conf
events {
    worker_connections 1024;
}

http {
    upstream api_backend {
        server qa-bot:8000;
    }

    upstream frontend_backend {
        server qa-bot:8501;
    }

    server {
        listen 80;
        server_name your-domain.com;

        # API proxy
        location /api/ {
            proxy_pass http://api_backend/;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            # Timeouts
            proxy_connect_timeout 60s;
            proxy_send_timeout 60s;
            proxy_read_timeout 60s;
        }

        # Frontend proxy
        location / {
            proxy_pass http://frontend_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            # WebSocket support (required by Streamlit)
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }

        # Static asset caching; assets must still be proxied to the
        # frontend, since a bare expires block would serve nothing here
        location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg)$ {
            proxy_pass http://frontend_backend;
            expires 1y;
            add_header Cache-Control "public, immutable";
        }
    }
}
10. Summary and Outlook
10.1 Project Summary
Over the course of this project we built a working enterprise knowledge-base QA bot on DeepSeek-V3. The system has the following characteristics:
Technical strengths:
- Capable language model: DeepSeek-V3 offers strong Chinese understanding and generation
- Efficient vector retrieval: Sentence-Transformers plus ChromaDB deliver accurate semantic search
- Modular architecture: a clean code structure that is easy to maintain and extend
- Containerized deployment: Docker support simplifies environment management and scaling
Features:
- Multi-format document support: PDF, Word, plain text, and Markdown
- Token-aware chunking: text is split into chunks by token count, with overlap
- Real-time QA: streaming output for a responsive user experience
- Source attribution: every answer cites its source documents, improving trust
Performance work:
- Result caching: avoids repeated computation and speeds up responses
- Concurrency: multi-threaded processing raises system throughput
- Monitoring and alerting: a monitoring layer helps keep the system stable
10.2 Results in Practice
In our own testing the system performed well; the figure below summarizes the measured performance metrics.
Figure 3: System performance metrics
10.3 Future Improvements
- Multimodal support: understand images, tables, and other non-text content
- Knowledge graph integration: combine with knowledge graphs for more structured knowledge services
- Personalized recommendations: suggest questions based on each user's history
- Online learning: ingest new knowledge continuously to keep improving answer quality
- Multilingual support: extend coverage to more languages
10.4 Key Techniques Recap
The project brought together the following techniques:
- Huawei Cloud Flexus: reliable cloud infrastructure
- DeepSeek-V3: strong natural language understanding and generation
- Vector database: efficient semantic retrieval
- RAG architecture: retrieval combined with generation for grounded answers
- Service-oriented design: a modular, extensible system structure
Working through the project end to end, from requirements and architecture through implementation, deployment, and operations, is a solid foundation for building more sophisticated AI applications.