目录
- 项目背景介绍
- sanic-web
- Dify\_service handle\_think\_tag报错NoneType
- 问题描述
- debug
- Dify调用不成功,一直转圈圈
- 问题描述
- debug
- 前端markdown格式只显示前5页
- 问题描述
- debug
- 1. 修改代码
- 2.重新构建1.1.3镜像
- 3.更新sanic-web/docker/docker-compose.yaml
- 4. 重新部署
- Dify超时60秒,服务器报错
- 问题描述
- debug
项目背景介绍
sanic-web
项目地址:https://github.com/apconw/sanic-web
一个轻量级、支持全链路且易于二次开发的大模型应用项目(Large Model Data Assistant) 支持DeepSeek/Qwen2.5等大模型 基于 Dify 、Ollama&Vllm、Sanic 和 Text2SQL 📊 等技术构建的一站式大模型应用开发项目,采用 Vue3、TypeScript 和 Vite 5 打造现代UI。它支持通过 ECharts 📈 实现基于大模型的数据图形化问答,具备处理 CSV 文件 📂 表格问答的能力。同时,能方便对接第三方开源 RAG 系统 检索系统 🌐等,以支持广泛的通用知识问答。
这个项目可以作为text2sql的经典案例,通过自然语言来访问业务数据库,最终使用echarts图表可视化展示分析数据。使用了独立开发的web页面,对于前端小伙伴来说也比较友好,这完全可以作为一个AI智能助手的Demo实现。
Dify_service handle_think_tag报错NoneType
问题描述
debug
修改services/dify_service.py/handle_think_tag代码,如下:
@staticmethod
async def handle_think_tag(answer):
"""
处理<think>标签内的内容
:param answer
"""
"""
处理<think>标签内的内容,或JSON格式的thoughts字段
:param answer
"""
think_content = ""
remaining_content = answer
# 会遇到answer可能不能解析到,先尝试解析为JSON
try:
data = json.loads(answer)
if isinstance(data, dict) and "thoughts" in data:
think_content = data["thoughts"]
remaining_content = answer
return think_content, remaining_content
except Exception:
pass
# 再尝试正则提取<think>标签
match = re.search(r"<think>(.*?)</think>", answer, re.DOTALL)
if match:
think_content = match.group(1)
remaining_content = re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()
return think_content, remaining_content
# 如果都没有,返回空
return "", answer
Dify调用不成功,一直转圈圈
问题描述
debug
检查本地dify的端口号,修改sanic-web/docker/docker-compose.yaml中dify端口号到本地端口号,比如原端口号是18000,修改成80。
chat-service:
image: apconw/sanic-web:1.1.2
container_name: sanic-web
environment:
- ENV=test
- DIFY_SERVER_URL=http://host.docker.internal:80
- DIFY_DATABASE_QA_API_KEY=app-AXDUw8TtcY7N6TMGHkPaC4VF
- MINIO_ENDPOINT=host.docker.internal:19000
- MINIO_ACCESS_KEY=sIR5eeDkiwoo779yNJbw
- MiNIO_SECRET_KEY=MreuQ3aC1ymHJeo3QfzSg7aPz7PqlxeOw39nZUdE
ports:
- "8088:8088"
extra_hosts:
- "host.docker.internal:host-gateway"
前端markdown格式只显示前5页
问题描述
debug
1. 修改代码
修改web/src/components/MarkdownPreview/MarkdownTable.vue第39行-40行代码,将:data="pagedTableData"改为:data=“tableData”,并移除:pagination="pagination"属性:
<template>
<div style="background-color: #ffffff">
<n-card
title="表格"
embedded
bordered
:content-style="{ 'background-color': '#ffffff' }"
:header-style="{
color: '#26244c',
height: '10px',
'background-color': '#f0effe',
'text-align': 'left',
'font-size': '14px',
'font-family': 'PMingLiU'
}"
:footer-style="{
color: '#666',
'background-color': '#ffffff',
'text-align': 'left',
'font-size': '14px',
'font-family': 'PMingLiU'
}"
>
<div
style="
display: flex;
justify-content: space-between;
margin-bottom: 10px;
"
></div>
<n-data-table
style="
height: 550px;
width: 850px;
margin: 0px 10px;
background-color: #ffffff;
"
:columns="columns"
:data="tableData"
:max-height="550"
virtual-scroll
virtual-scroll-x
:scroll-x="scrollX"
:min-row-height="minRowHeight"
:height-for-row="heightForRow"
virtual-scroll-header
:header-height="48"
/>
<template #footer>
数据来源: 大模型生成的数据, 以上信息仅供参考
</template>
</n-card>
</div>
</template>
2.重新构建1.1.3镜像
# 进入web的docker目录
cd docker
# 查看原始Dockerfile ,这步也可以省略
cat Dockerfile
# 使用原始Dockerfile构建新镜像
docker build -t apconw/chat-vue3-mvp:1.1.3 -f Dockerfile ..
3.更新sanic-web/docker/docker-compose.yaml
services:
chat-web:
image: apconw/chat-vue3-mvp:1.1.3 # 更新为新版本
container_name: chat-vue3-mvp
ports:
- "8081:80"
extra_hosts:
- "host.docker.internal:host-gateway"
depends_on:
- chat-service
4. 重新部署
docker-compose down
docker-compose up -d
Dify超时60秒,服务器报错
问题描述
2025/05/09 08:16:19 [error] 20#20: *1 upstream timed out (110: Operation timed out) while reading response header from upstream, client: 192.168.65.1, server: localhost, request: “POST /sanic/dify/get_answer HTTP/1.1”, upstream: “http://192.168.65.254:8088/dify/get_answer”, host: “localhost:8081”, referrer: “http://localhost:8081/chat”
192.168.65.1 - - [09/May/2025:08:16:19 +0000] “POST /sanic/dify/get_answer HTTP/1.1” 504 497 “http://localhost:8081/chat” “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36” “-”
192.168.65.1 - - [09/May/2025:08:16:19 +0000] “POST /sanic/dify/get_dify_suggested HTTP/1.1” 200 63 “http://localhost:8081/chat” “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36” “-”
debug
修改dify_service.py中的DiFyRequest,添加心跳机制:
- 添加心跳机制:
- 新增send_heartbeat方法发送SSE心跳
- 在处理请求过程中每10秒发送一次心跳
- 在开始、结束和错误处理时也发送心跳
- 增强错误处理:
- 添加send_error_message方法,统一错误信息发送
- 完善异常处理,确保错误信息能发送到客户端
- 增加超时时间:
- 将aiohttp的超时设置从2分钟增加到5分钟
- 确保连接正确关闭:
- 在finally块中确保发送最后的心跳消息
- 确保调用res_end方法正确关闭连接
class DiFyRequest:
"""
DiFy操作服务类
"""
def __init__(self):
pass
async def exec_query(self, res):
"""
执行查询并处理流式响应
"""
try:
# 获取请求体内容 从res流对象获取request-body
req_body_content = res.request.body
# 将字节流解码为字符串
body_str = req_body_content.decode("utf-8")
req_obj = json.loads(body_str)
logging.info(f"query param: {body_str}")
# str(uuid.uuid4())
chat_id = req_obj.get("chat_id")
qa_type = req_obj.get("qa_type")
# 使用正则表达式移除所有空白字符(包括空格、制表符、换行符等)
query = req_obj.get("query")
cleaned_query = re.sub(r"\s+", "", query)
# 获取登录用户信息
token = res.request.headers.get("Authorization")
if not token:
raise MyException(SysCodeEnum.c_401)
if token.startswith("Bearer "):
token = token.split(" ")[1]
# 封装问答上下文信息
qa_context = QaContext(token, cleaned_query, chat_id)
# 判断请求类别
app_key = self._get_authorization_token(qa_type)
# 构建请求参数
dify_service_url, body_params, headers = self._build_request(chat_id, cleaned_query, app_key, qa_type)
# 收集流式输出结果
t02_answer_data = []
# 收集业务数据流式输出结果
t04_answer_data = {}
# 发送初始连接消息
await self.send_heartbeat(res, "开始处理请求")
# 心跳计时器
last_heartbeat = time.time()
async with aiohttp.ClientSession(read_bufsize=1024 * 16) as session:
async with session.post(
dify_service_url,
headers=headers,
json=body_params,
timeout=aiohttp.ClientTimeout(total=60 * 5), # 增加到5分钟超时
) as response:
logging.info(f"dify response status: {response.status}")
if response.status == 200:
data_type = ""
bus_data = ""
while True:
# 发送心跳保持连接
current_time = time.time()
if current_time - last_heartbeat > 10: # 每10秒发送一次心跳
await self.send_heartbeat(res, "处理中...")
last_heartbeat = current_time
reader = response.content
reader._high_water = 10 * 1024 * 1024 # 设置为10MB
chunk = await reader.readline()
if not chunk:
# 发送最后的心跳
await self.send_heartbeat(res, "读取数据完成")
break
str_chunk = chunk.decode("utf-8")
# 处理数据块
if str_chunk.startswith("data"):
# 更新最后心跳时间
last_heartbeat = time.time()
str_data = str_chunk[5:]
data_json = json.loads(str_data)
event_name = data_json.get("event")
conversation_id = data_json.get("conversation_id")
message_id = data_json.get("message_id")
task_id = data_json.get("task_id")
# 处理消息事件...
# 这里保留原有的事件处理逻辑
if DiFyCodeEnum.MESSAGE.value[0] == event_name:
answer = data_json.get("answer")
if answer and answer.startswith("dify_"):
event_list = answer.split("_")
if event_list[1] == "0":
# 输出开始
data_type = event_list[2]
if data_type == DataTypeEnum.ANSWER.value[0]:
await self.send_message(
res,
answer,
{"data": {"messageType": "begin"}, "dataType": data_type},
)
elif event_list[1] == "1":
# 输出结束
data_type = event_list[2]
if data_type == DataTypeEnum.ANSWER.value[0]:
await self.send_message(
res,
answer,
{"data": {"messageType": "end"}, "dataType": data_type},
)
# 输出业务数据
elif bus_data and data_type == DataTypeEnum.BUS_DATA.value[0]:
res_data = process(json.loads(bus_data)["data"])
await self.send_message(
res,
answer,
{"data": res_data, "dataType": data_type},
)
t04_answer_data = {"data": res_data, "dataType": data_type}
data_type = ""
elif len(data_type) > 0:
# 这里输出 t02之间的内容
if data_type == DataTypeEnum.ANSWER.value[0]:
await self.send_message(
res,
answer,
{"data": {"messageType": "continue", "content": answer}, "dataType": data_type},
)
t02_answer_data.append(answer)
# 这里设置业务数据
if data_type == DataTypeEnum.BUS_DATA.value[0]:
bus_data = answer
elif DiFyCodeEnum.MESSAGE_ERROR.value[0] == event_name:
# 输出异常情况日志
error_msg = data_json.get("message")
logging.error(f"Error 调用dify失败错误信息: {data_json}")
await res.write(
"data:"
+ json.dumps(
{
"data": {"messageType": "error", "content": "调用失败请查看dify日志,错误信息: " + error_msg},
"dataType": DataTypeEnum.ANSWER.value[0],
},
ensure_ascii=False,
)
+ "\n\n"
)
elif DiFyCodeEnum.MESSAGE_END.value[0] == event_name:
t02_message_json = {
"data": {"messageType": "continue", "content": "".join(t02_answer_data)},
"dataType": DataTypeEnum.ANSWER.value[0],
}
print(t02_message_json)
if t02_message_json:
await self._save_message(t02_message_json, qa_context, conversation_id, message_id, task_id, qa_type)
if t04_answer_data:
await self._save_message(t04_answer_data, qa_context, conversation_id, message_id, task_id, qa_type)
t02_answer_data = []
t04_answer_data = {}
except Exception as e:
logging.error(f"Error during get_answer: {e}")
traceback.print_exception(e)
# 发送错误信息
await self.send_error_message(res, f"处理请求出错: {str(e)}")
return {"error": str(e)} # 返回错误信息作为字典
finally:
# 确保连接正确关闭
await self.send_heartbeat(res, "请求处理完成")
await self.res_end(res)
async def send_heartbeat(self, res, message="心跳"):
"""
发送心跳信息保持连接活跃
"""
try:
await res.write(
f"data:{json.dumps({'heartbeat': True, 'message': message}, ensure_ascii=False)}\n\n"
)
except Exception as e:
logging.error(f"发送心跳失败: {e}")
async def send_error_message(self, res, error_message):
"""
发送错误信息
"""
try:
await res.write(
"data:"
+ json.dumps(
{
"data": {"messageType": "error", "content": error_message},
"dataType": DataTypeEnum.ANSWER.value[0],
},
ensure_ascii=False,
)
+ "\n\n"
)
except Exception as e:
logging.error(f"发送错误信息失败: {e}")
@staticmethod
async def handle_think_tag(answer):
"""
处理<think>标签内的内容
:param answer
"""
"""
处理<think>标签内的内容,或JSON格式的thoughts字段
:param answer
"""
think_content = ""
remaining_content = answer
# 会遇到answer可能不能解析到,先尝试解析为JSON
try:
data = json.loads(answer)
if isinstance(data, dict) and "thoughts" in data:
think_content = data["thoughts"]
remaining_content = answer
return think_content, remaining_content
except Exception:
pass
# 再尝试正则提取<think>标签
match = re.search(r"<think>(.*?)</think>", answer, re.DOTALL)
if match:
think_content = match.group(1)
remaining_content = re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()
return think_content, remaining_content
# 如果都没有,返回空
return "", answer
@staticmethod
async def _save_message(message, qa_context, conversation_id, message_id, task_id, qa_type):
"""
保存消息记录并发送SSE数据
:param message:
:param qa_context:
:param conversation_id:
:param message_id:
:param task_id:
:param qa_type:
:return:
"""
# 保存用户问答记录 1.保存用户问题 2.保存用户答案 t02 和 t04
if "content" in message["data"]:
await add_question_record(
qa_context.token, conversation_id, message_id, task_id, qa_context.chat_id, qa_context.question, message, "", qa_type
)
elif message["dataType"] == DataTypeEnum.BUS_DATA.value[0]:
await add_question_record(
qa_context.token, conversation_id, message_id, task_id, qa_context.chat_id, qa_context.question, "", message, qa_type
)
async def send_message(self, response, answer, message):
"""
SSE 格式发送数据,每一行以 data: 开头
"""
if answer.lstrip().startswith("<think>"):
# 处理deepseek模型思考过程样式
think_content, remaining_content = await self.handle_think_tag(answer)
# 发送<think>标签内的内容
message = {
"data": {"messageType": "continue", "content": "> " + think_content.replace("\n", "") + "\n\n" + remaining_content},
"dataType": "t02",
}
await response.write("data:" + json.dumps(message, ensure_ascii=False) + "\n\n")
else:
await response.write("data:" + json.dumps(message, ensure_ascii=False) + "\n\n")
@staticmethod
async def res_begin(res, chat_id):
"""
:param res:
:param chat_id:
:return:
"""
await res.write(
"data:"
+ json.dumps(
{
"data": {"id": chat_id},
"dataType": DataTypeEnum.TASK_ID.value[0],
}
)
+ "\n\n"
)
@staticmethod
async def res_end(res):
"""
:param res:
:return:
"""
await res.write(
"data:"
+ json.dumps(
{
"data": "DONE",
"dataType": DataTypeEnum.STREAM_END.value[0],
}
)
+ "\n\n"
)
@staticmethod
def _build_request(chat_id, query, app_key, qa_type):
"""
构建请求参数
:param chat_id: 对话id
:param app_key: api key
:param query: 用户问题
:param qa_type: 问答类型
:return:
"""
# 通用问答时,使用上次会话id 实现多轮对话效果
conversation_id = ""
if qa_type == DiFyAppEnum.COMMON_QA.value[0]:
qa_record = query_user_qa_record(chat_id)
if qa_record and len(qa_record) > 0:
conversation_id = qa_record[0]["conversation_id"]
body_params = {
"query": query,
"inputs": {"qa_type": qa_type},
"response_mode": "streaming",
"conversation_id": conversation_id,
"user": "abc-123",
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {app_key}",
}
dify_service_url = DiFyRestApi.build_url(DiFyRestApi.DIFY_REST_CHAT)
return dify_service_url, body_params, headers
@staticmethod
def _get_authorization_token(qa_type: str):
"""
根据请求类别获取api/token
固定走一个dify流
app-IzudxfuN8uO2bvuCpUHpWhvH master分支默认的数据问答key
:param qa_type
:return:
"""
# 遍历枚举成员并检查第一个元素是否与测试字符串匹配
for member in DiFyAppEnum:
if member.value[0] == qa_type:
return os.getenv("DIFY_DATABASE_QA_API_KEY")
else:
raise ValueError(f"问答类型 '{qa_type}' 不支持")