前言
在最近的一个需求,又双叒叕需要给项目加入Ai的功能,原本要求的效果是,查询到对应的数据后,完全展示出来,也就是常规的post请求,我大呼:“Easy”,直接开码、测试。紧接着用户使用一段时候后遇到了一个很现实的问题:长时间的等待。我们需要在GPT返回全部数据后,前端才能接受并展示,一旦询问的时间过长,就会让用户等待很久,这时候我们需要将前端的展示效果改为想ChatGPT那样的打字机效果。当然在以往前两年AI刚出来非常爆火的时候,上述效果我也完成过,由于做的是公开网站,没有额外的鉴权和参数处理,当时使用的技术是EventSource。可当我复刻以前的操作时发现好像并不太一样,这又不得不让我重温了一遍SSE的知识
对于SSE,我以前写过一篇关于EventSource/WebSocket/Fetch的介绍文章,可做参考
整理思路
先让我们复习下如何使用EventSource
const eventSource = new EventSource('/api/steam');
// 每次连接开启时调用
evtSource.onopen = function () {
console.log("连接开始启动");
};
// 每次接受数据时调用
evtSource.onmessage = (e) => {
console.log('输入每次接受的数据',e)
};
// 每次连接发生错误时调用
evtSource.onerror = function () {
console.log("连接发生错误");
};
// 关闭(可以在最后的数据传递关闭的标识来进行处理)
eventSource.close()
从上述可知,后端提供返回流式的接口后,表面看上去前端优先选择应该为EventSource,但是实际场景是,我们可能会有大量的参数和Header(token等)需要传递给接口,而且EventSource只支持GET请求,无法直接达到我们需要的效果,那就止步于此了吗?不,经过查阅资料,我大致掌握后述类似的可行性方案
EventSource polyfil
它支持让我们传递Header的参数
import { EventSourcePolyfill } from 'event-source-polyfill';
const eventSource = new EventSourcePolyfill('/api/steam', {
headers: {
'Authorization': 'value'
}
});
// 每次连接开启时调用
evtSource.onopen = function () {
console.log("连接开始启动");
};
// 每次接受数据时调用
evtSource.onmessage = (e) => {
console.log('输入每次接受的数据',e)
};
// 每次连接发生错误时调用
evtSource.onerror = function () {
console.log("连接发生错误");
};
// 关闭(可以在最后的数据传递关闭的标识来进行处理)
eventSource.close()
Fetch
关于它的作用已经在另外文章讲述过了,此处不再赘述,fetch本身不直接支持流式输出,因为fetch 并没有onmessage方法,我们可以fetch用于发起SSE请求,而EventSource用于处理服务器端推送的数据。结合两个API简单实现一个fetchStream方法,直接上示例代码
/**
* 将Uint8Array转换为字符串
* @param {Uint8Array} fileData
* @returns
*/
const Uint8ArrayToString = (fileData) => {
const utf8 = Array.from(fileData)
.map((item) => String.fromCharCode(item))
.join('');
return decodeURIComponent(escape(utf8));
};
/**
* 基于fetch/ReadableStream 自定义封装的fetch请求
* @param {*} url 请求链接
* @param {*} params
* @returns
*/
const fetchStream = (url, params) => {
const { onmessage, onclose, ...otherParams } = params;
const push = async (controller, reader) => {
const { value, done } = await reader.read();
if (done) {
controller.close();
onclose?.();
} else {
onmessage?.(Uint8ArrayToString(value));
controller.enqueue(value);
push(controller, reader);
}
};
return fetch(url, otherParams)
.then((response) => {
const reader = response.body.getReader();
const stream = new ReadableStream({
start(controller) {
push(controller, reader);
},
});
return stream;
})
.then((stream) => new Response(stream, { headers: { 'Content-Type': 'text/html' } }).text());
};
/**
* 使用fetch
*/
fetchStream(`/stream?message=${message}`, {
method: 'GET',
headers: {
accept: 'text/event-stream',
'Content-Type': 'application/json',
authorization:'value',
},
onmessage: (res) => {
console.log(res);
},
});
fetch-event-source
它支持让我们传递Header的参数,同时支持POST请求
import { fetchEventSource } from '@microsoft/fetch-event-source';
const ctrl = new AbortController();
const url = 'http:xxx/api/stream';
fetchEventSource(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer ',
},
body: JSON.stringify({
code: 'A123',
content: '你是谁?'
}),
openWhenHidden: true, // 控制是否在切换页面时中断请求
signal: ctrl?.signal,
onopen: async (response) => {
if (
response.ok &&
response.headers.get('content-type') === 'text/event-stream'
) return;
if (response.status !== 200) {
//错误处理
}
},
onmessage: async (msg) => {
console.log(msg)
// 处理其他情况
},
onclose: () => {
ctrl.abort(); //出错后不要重试
console.log('close');
},
onerror: (err) => {
ctrl.abort();
console.log('err', err);
},
}).catch((err) => {
console.log({ err });
});
踩坑注意
- 框架内部代理无法使用。若使用了自身的框架代理,若没做特殊处理并不会走事件流的形式,而是在数据统一接受完成后一次性返回。因此这里我们直接写入http形式的请求地址
- 不同源时cookie无法携带。因为使用了http形式而不是代理,这就导致了本机调试时是无法携带cookie到服务端,在一些cookie鉴权的场景会导致鉴权失败。这是浏览器的安全策略,我们可以利用谷歌的插件进行非同源的cookie传送