前言
taro.request是可以实现sse长连接的,但是呢其中有俩大坑,找了许多资料也没解决,后续解决办法也与后端商量改用WebSocket来实现。
代码实现
SSEManager.js:
import { getAccessToken } from "../xx/xx";
import { TextDecoder } from 'text-encoding';
import Taro from "@tarojs/taro";
const regexp = /\[DONE\]/; // 规定结束标志
const decoder = new TextDecoder("utf-8");
/**
* 外部需要限制SSE连接,无法获取时,自行处理
**/
export function getSSEConnection(
connectId,
sessionId,
messageEmitter,
closeHandler
) {
console.log("[SSE] getSSEConnection", connectId, sessionId);
let token = getAccessToken();
const requestTask = Taro.request({
url:'http://xxxx:xx/chatConnect',
method: "GET",
data: { id: connectId },
responseType: "arraybuffer", // 响应的数据类型
header: {
Authorization: token,
Accept: "text/event-stream",
},
enableChunked: true, // 关键配置 开启后数据将以分块形式传输
timeout:6 * 1000, // 该配置在sse中没用
fail: (err) => {
console.log("err", err);
if (err.errMsg.includes("timeout")) {
closeHandler('timeout');
console.log('回答超时了',connectId, sessionId);
} else {
closeHandler('fail');
console.log('回答出错了',connectId, sessionId);
}
}
});
// 接收到新的chunk时触发
requestTask.onChunkReceived((chunk) => {
const responseText = decoder.decode(chunk.data);
let plain = responseText.replace(/^data:/gm, "");
plain = plain
.replace(/\n{2,}/g, "\n")
.replace(/^\n+|\n+$/g, "")
.trim();
messageEmitter(sessionId, plain);
if (regexp.test(responseText)) {
console.log('回答完成',connectId, sessionId);
closeHandler('complete');
}
});
return requestTask
}
使用
import { getSSEConnection } from '../sse/SSEManager.js';
import mitt from 'mitt';
const eventBus = mitt();
const sessionId = ref () // 会话id
let chat
const chatSendEvent = async () => {
eventBus.on(sessionId.value, (data) => {
// 对获取的数据块处理
console.log( data);
chat += data
});
// 获取SSE链接
getSSEConnectionEvent()
}
const getSSEConnectionEvent = async () => {
const globalId = nanoid(64); // 链接id 请求参数
requestTaskCleanup.value = getSSEConnection(
globalId,
sessionId.value,
(key, value) => { eventBus.emit(key, value); },
async (why) => {
console.log("[event] close!", why);
if (why == 'complete') {}
// TODO 超时
else if (why == 'timeout') {}
else if (why == 'abort') {}
else {}
eventBus.all.clear();
}
)
// 发送提示词
sendStreamEvent(globalId)
}
以上就是整个过程使用,其中的数据处理根据业务来定。现在来讲讲其中的坑:
坑
超时无法捕获:
在开启分块传输后,timeout配置就不生效了,超时也无法捕获了。
解决:
自己设置定时器,在超时后断开连接,requestTask.abort(); 但是这个方法是无效的,无法断开。
参考以下文章,可知该问题是一直没得到解决的。 微信开放社区https://developers.weixin.qq.com/community/develop/doc/000caa13bd8fb080ed7d318fb57800
在前端无法主动断开连接的情况下,前端只能实现不接收后端返回的数据块,表面上看起来像是断开了连接。
实现代码如下(SSEManager.js):
在超时的时候 通过一个变量状态来控制是否继续接收返回的数据块。当然也可以通过与后端设置一样的超时时间 通过后端的报错 来直接处理。
import { getAccessToken } from "../xx/xx";
import { TextDecoder } from 'text-encoding';
import Taro from "@tarojs/taro";
const regexp = /\[DONE\]/;
const decoder = new TextDecoder("utf-8");
/**
* 外部需要限制SSE连接,无法获取时,自行处理
*
* **/
export function getSSEConnection(
connectId,
sessionId,
messageEmitter,
closeHandler
) {
console.log("[SSE] getSSEConnection", connectId, sessionId);
let token = getAccessToken();
let shouldIgnoreData = false; // 新增 控制是否接收数据块 为true 不接收
// 新增 设置超时定时器
const timeoutTimer = setTimeout(() => {
if (!shouldIgnoreData) {
closeHandler('timeout');
cleanup();
}
}, 182 * 1000 ); // 保底报错结束 这里保底是指超时时间与后端设置一样的
const requestTask = Taro.request({
url: `http://xxxx:xx/chatConnect`,
method: "GET",
data: { id: connectId },
responseType: "arraybuffer",
header: {
Authorization: token,
Accept: "text/event-stream",
},
enableChunked: true,
});
// 新增 资源清理函数
const cleanup = () => {
if (shouldIgnoreData) return;
shouldIgnoreData = true; // 暂解决超时处理 忽略后续数据
clearTimeout(timeoutTimer);
// 终止请求--------TODO未终止
// 取消监听获取数据块 与使用 shouldIgnoreData 变量来控制数据的接收是一样的 可选其一种方式
requestTask.offChunkReceived(messageEmitter);
requestTask.abort(); // 不生效
};
// 新增 请求报错处理 返回函数
requestTask.onHeadersReceived((res) => {
console.log("SSE request onHeadersReceived:", res);
if (shouldIgnoreData) return; // 终止后 忽略数据
clearTimeout(timeoutTimer);
if (res.statusCode !== 200) {
closeHandler('fail');
cleanup();
}
});
requestTask.onChunkReceived((chunk) => {
if (shouldIgnoreData) return; // 终止后忽略数据
const responseText = decoder.decode(chunk.data);
let plain = responseText.replace(/^data:/gm, "");
plain = plain
.replace(/\n{2,}/g, "\n")
.replace(/^\n+|\n+$/g, "")
.trim();
messageEmitter(sessionId, plain);
if (regexp.test(responseText)) {
console.log('回答完成');
closeHandler('complete');
cleanup();
}
});
// 返回函数 主动不接数据
const f = () => {
closeHandler('abort')
cleanup();
}
return f
}
请求无法暂停:
由于requestTask.abort(); // 不生效 ,前端若想处理也可以通过以上行为 不接收返回数据
来实现,改变视觉效果。
总结
以上都是前端来实现的一种方式,在开发过程中以上问题也可以通过和后端配合来解决。
使用taro.request 和 wx.request 实现sse长连接 都是一样的坑(taro.request是基于wx.request封装的),建议改用WebSocket (自己也与后端商议改用了)。
有更多解决办法,欢迎评论区分享