通义千问1.5-1.8B-Chat-GPTQ-Int4项目实战:微信小程序智能客服后端开发
通义千问1.5-1.8B-Chat-GPTQ-Int4项目实战微信小程序智能客服后端开发最近在做一个微信小程序项目客户想在里面加个智能客服能自动回答一些常见问题比如产品咨询、订单状态查询这些。一开始想用现成的云服务但考虑到数据隐私和长期成本还是决定自己搭一个。正好手头有台带GPU的服务器就琢磨着把通义千问这个轻量级模型用起来。通义千问1.5-1.8B-Chat这个版本经过GPTQ-Int4量化后模型体积小推理速度也快对硬件要求不高特别适合部署成API服务。今天我就把整个从模型服务封装到微信小程序集成的过程以及中间遇到的一些坑和解决方案跟大家详细聊聊。如果你也想在小程序里加个AI客服或者想了解怎么把大模型变成可调用的后端服务这篇应该能给你一些参考。1. 项目整体思路与准备工作做这个事核心目标就一个让微信小程序能像调用普通接口一样和部署在自家服务器上的通义千问模型对话。听起来简单但拆开来看得搞定好几件事。首先模型得能跑起来并且提供一个标准的HTTP接口。其次微信小程序那边要能安全、稳定地调用这个接口。最后对话不能是“一问一答”就忘得有点记忆力能联系上下文这才像个客服。我用的硬件是一台Ubuntu 20.04的服务器显卡是RTX 306012GB显存这个配置跑1.8B的Int4模型绰绰有余。软件环境方面Python 3.8以上装好CUDA和对应的PyTorch就行。模型我用的是通义千问1.5-1.8B-Chat的GPTQ-Int4量化版。选择它一是因为体积小部署方便二是因为Int4量化在几乎不损失精度的情况下大幅提升了推理速度并降低了显存占用这对响应速度要求高的客服场景很关键。2. 模型服务的封装与启动模型下载下来后不能直接裸奔着用。我们需要把它包装成一个Web服务这里我用的是FastAPI因为它轻量、异步支持好写起来也快。首先安装必要的库pip install fastapi uvicorn transformers torch accelerate接下来是核心的服务端代码。我创建了一个app.py文件from fastapi import FastAPI, HTTPException from pydantic import BaseModel from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer from threading import Thread import uvicorn import torch app FastAPI(titleQwen-Chat-API) # 定义请求和响应的数据格式 class ChatRequest(BaseModel): message: str session_id: str default # 用于区分不同对话会话 max_new_tokens: int 512 temperature: float 0.7 class ChatResponse(BaseModel): response: str session_id: str # 全局加载模型和分词器避免每次请求重复加载 print(正在加载模型和分词器...) MODEL_PATH ./Qwen-1_8B-Chat-GPTQ-Int4 # 替换为你的模型实际路径 tokenizer AutoTokenizer.from_pretrained(MODEL_PATH, trust_remote_codeTrue) model AutoModelForCausalLM.from_pretrained( MODEL_PATH, device_mapauto, # 自动分配模型层到GPU/CPU torch_dtypetorch.float16, trust_remote_codeTrue ) print(模型加载完毕) # 一个简单的内存字典用于存储不同会话的对话历史生产环境请用Redis等 conversation_histories {} def build_prompt(history, new_message): 构建符合通义千问Chat格式的对话Prompt prompt for old_query, old_response in history: prompt f|im_start|user\n{old_query}|im_end|\n|im_start|assistant\n{old_response}|im_end|\n prompt f|im_start|user\n{new_message}|im_end|\n|im_start|assistant\n return prompt app.post(/chat, response_modelChatResponse) async def chat_with_model(request: ChatRequest): try: # 1. 获取或初始化当前会话的历史记录 history conversation_histories.get(request.session_id, []) # 2. 构建完整Prompt full_prompt build_prompt(history, request.message) inputs tokenizer(full_prompt, return_tensorspt).to(model.device) # 3. 生成回复 with torch.no_grad(): outputs model.generate( **inputs, max_new_tokensrequest.max_new_tokens, temperaturerequest.temperature, do_sampleTrue, pad_token_idtokenizer.eos_token_id ) # 4. 解码并提取助理的新回复 full_response tokenizer.decode(outputs[0][inputs[input_ids].shape[1]:], skip_special_tokensTrue) # 5. 更新对话历史限制历史长度避免过长 history.append((request.message, full_response)) if len(history) 5: # 只保留最近5轮对话历史 history history[-5:] conversation_histories[request.session_id] history return ChatResponse(responsefull_response, session_idrequest.session_id) except Exception as e: raise HTTPException(status_code500, detailf模型生成错误: {str(e)}) if __name__ __main__: # 启动服务host0.0.0.0允许外部访问生产环境务必放在Nginx等反向代理后 uvicorn.run(app, host0.0.0.0, port8000)这段代码干了这么几件事定义了一个标准的/chat的POST接口。用session_id来区分不同用户的对话这样每个用户的聊天记录是独立的。用conversation_histories这个字典在内存里存历史对话注意这只是demo服务器重启数据就没了后面会讲怎么优化。按照通义千问Chat模型要求的格式|im_start|和|im_end|标签来拼接对话历史让模型知道上下文。生成回复后把新的问答对存到历史里并限制只保留最近5轮防止Prompt太长。启动服务很简单python app.py服务跑起来后你可以在浏览器访问http://你的服务器IP:8000/docs看到自动生成的API文档并且可以在这里手动测试接口。3. 微信小程序端API调用设计服务端准备好了小程序那头怎么调呢直接调用wx.request当然可以但不够优雅也不好维护。我的做法是封装一个专门的聊天模块。在小程序项目的utils目录下我创建了一个aiChat.js文件// utils/aiChat.js const API_BASE_URL https://你的域名或IP:8000; // 务必使用HTTPS小程序要求。 class AIChatService { constructor(sessionId null) { // 如果没有传入sessionId则生成一个唯一ID用于标识一次完整的对话会话 this.sessionId sessionId || mini_ Date.now() _ Math.random().toString(36).substr(2, 9); this.isGenerating false; } /** * 发送消息给AI客服 * param {string} message 用户输入的消息 * param {function} onSuccess 成功回调 * param {function} onError 失败回调 */ async sendMessage(message, onSuccess, onError) { if (this.isGenerating) { wx.showToast({ title: AI正在思考中..., icon: none }); return; } this.isGenerating true; wx.showLoading({ title: AI思考中..., mask: true }); try { const response await new Promise((resolve, reject) { wx.request({ url: ${API_BASE_URL}/chat, method: POST, data: { message: message, session_id: this.sessionId, max_new_tokens: 256, // 小程序回复不宜过长 temperature: 0.8, // 稍高的温度让回复更活泼 }, header: { content-type: application/json, }, success: (res) { if (res.statusCode 200) { resolve(res.data); } else { reject(new Error(请求失败: ${res.statusCode})); } }, fail: (err) { reject(err); }, }); }); wx.hideLoading(); this.isGenerating false; if (onSuccess typeof onSuccess function) { onSuccess(response.response); // 返回AI的回复文本 } } catch (error) { console.error(AI聊天请求错误:, error); wx.hideLoading(); wx.showToast({ title: 客服暂时走神了请稍后再试, icon: none }); this.isGenerating false; if (onError typeof onError function) { onError(error); } } } // 可选重置当前会话清空历史 resetSession(newSessionId null) { this.sessionId newSessionId || mini_ Date.now() _ Math.random().toString(36).substr(2, 9); } } module.exports AIChatService;然后在你的小程序页面比如chatPage.js里就可以这样用了// pages/chat/chatPage.js const AIChatService require(../../utils/aiChat.js); Page({ data: { messageList: [], // 消息列表 {type: user/ai, content: ...} inputValue: , aiService: null, }, onLoad() { // 初始化AI聊天服务可以传入一个固定的sessionId实现“记住我”功能 const sessionId wx.getStorageSync(ai_chat_session_id) || null; this.setData({ aiService: new AIChatService(sessionId) }); // 保存sessionId下次进入同一会话 if (!sessionId) { wx.setStorageSync(ai_chat_session_id, this.data.aiService.sessionId); } }, onInputChange(e) { this.setData({ inputValue: e.detail.value }); }, sendMessage() { const userMsg this.data.inputValue.trim(); if (!userMsg) return; // 1. 更新UI显示用户消息 const newList this.data.messageList.concat([{ type: user, content: userMsg }]); this.setData({ messageList: newList, inputValue: }); // 2. 调用AI服务 this.data.aiService.sendMessage( userMsg, // 成功回调 (aiResponse) { const updatedList this.data.messageList.concat([{ type: ai, content: aiResponse }]); this.setData({ messageList: updatedList }); // 滚动到底部 wx.pageScrollTo({ selector: #chat-end, duration: 300 }); }, // 失败回调 (error) { console.error(发送失败, error); } ); }, })这样封装的好处很明显业务逻辑清晰调用简单还能方便地控制加载状态、错误提示以及管理对话会话。4. 关键问题对话上下文管理与性能优化刚才的Demo版用内存字典存历史问题很大。一是数据丢二是如果用户多了内存肯定撑不住。在实际项目里我换成了Redis。# 改进后的历史管理部分 (utils/redis_manager.py) import redis import json import hashlib class ConversationManager: def __init__(self, redis_urlredis://localhost:6379, max_history_len5): self.redis_client redis.from_url(redis_url, decode_responsesTrue) self.max_len max_history_len self.key_prefix chat_history: def _get_key(self, session_id): # 对session_id做简单哈希作为Redis key的一部分 return self.key_prefix hashlib.md5(session_id.encode()).hexdigest() def get_history(self, session_id): key self._get_key(session_id) history_json self.redis_client.get(key) if history_json: return json.loads(history_json) return [] def save_history(self, session_id, history): key self._get_key(session_id) # 只保存最近N轮对话 trimmed_history history[-self.max_len:] self.redis_client.setex(key, 3600 * 24, json.dumps(trimmed_history)) # 设置24小时过期 # 然后在FastAPI app中替换掉原来的字典 from utils.redis_manager import ConversationManager conv_manager ConversationManager() app.post(/chat) async def chat_with_model(request: ChatRequest): history conv_manager.get_history(request.session_id) # ... 构建prompt、生成回复 ... history.append((request.message, full_response)) conv_manager.save_history(request.session_id, history) # ...用了Redis之后对话历史能持久化保存并且可以设置过期时间管理起来方便多了。另一个问题是性能。虽然1.8B模型不算大但如果小程序用户突然暴增一个GPU实例可能处理不过来请求。我采取了两个简单的策略异步生成与超时控制使用asyncio和模型本身的流式生成或异步支持避免一个长请求阻塞其他请求。同时在代码里设置合理的超时时间。请求队列与限流在FastAPI前面加一个简单的内存队列比如用asyncio.Queue或者用更专业的celery把生成任务丢到后台队列里处理前端轮询结果。对于公开服务一定要在Nginx或API网关层做限流防止被刷。# 简单的异步处理思路伪代码 from fastapi import BackgroundTasks import asyncio task_queue asyncio.Queue() result_cache {} # 用redis更好 async def worker(): while True: session_id, message await task_queue.get() # ... 调用模型生成 ... result_cache[session_id] generated_text task_queue.task_done() app.post(/chat/async) async def async_chat(request: ChatRequest, background_tasks: BackgroundTasks): task_id request.session_id _ str(time.time()) await task_queue.put((task_id, request.message)) return {task_id: task_id, status: queued} app.get(/chat/result/{task_id}) async def get_result(task_id: str): result result_cache.get(task_id) if result: return {status: completed, response: result} else: return {status: processing}对于小程序来说短轮询每隔几秒查一次结果是可以接受的这样能有效缓解瞬时高并发对模型推理的直接冲击。5. 总结把通义千问这样的模型集成到微信小程序里做客服技术上已经非常可行了。整个过程就像搭积木先把模型用FastAPI包装成一个可靠的HTTP服务处理好对话上下文然后在小程序端用清晰的方式封装调用逻辑最后根据实际访问量考虑用Redis存历史、用队列缓解并发压力。我按照这个思路做的客服模块已经稳定运行了一段时间。对于常见问题比如“你们的服务怎么收费”、“我的订单到哪了”模型都能给出准确、流畅的回答用户体验还不错。当然它毕竟不是万能的对于特别复杂或者需要实时查数据库的问题我们设定了规则让AI引导用户转接人工客服。这个方案的好处是自主可控数据都在自己服务器上心里踏实。成本也主要就是一台带GPU的云服务器如果访问量不大甚至中等配置的显卡都够用。如果你也想试试建议先从简单的内存版本开始跑通整个流程然后再逐步引入Redis、队列这些组件这样迭代起来会更顺畅。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2444295.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!