摘要
自chatGPT诞生以来,前端页面的流式输出,已经非常常见了。但凡涉及到大模型、AI智能体开发的项目,都离不开前端的流式输出。
本文所采用的环境
node.js v18.20.1
koa 3.0
正文
传统的、标准的、典型的SSE
SSE(Server-Sent Events)是一种用于在浏览器中从服务器接收数据的技术。它允许服务器向浏览器推送数据,而不需要浏览器主动请求数据。
服务端关键
- 响应方法 get
- 响应头设置 Content-Type: text/event-stream
koa 代码
import Router from "koa-router";
import { PassThrough } from "node:stream";
const demo = new Router();
demo.get("/sse", async (ctx) => {
ctx.set({
"Content-Type": "text/event-stream;charset=utf-8",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
let stream = new PassThrough();
ctx.status = 200;
ctx.body = stream;
let str = "大家好,这里展示SSE流式输出的效果。";
sse(str, 0, stream);
});
function sse(str, i, stream) {
if (i === str.length) return stream.end();
stream.write("data: " + JSON.stringify({ value: str[i] }));
stream.write("\n\n");
setTimeout(() => {
sse(str, i + 1, stream);
}, Math.random() * 1000);
}
服务端注意事项
- SSE 消息体 每条消息以
\n\n
结束 - 允许自定义消息体,具体可参考EventSource
前端关键
- 使用EventSource客户端
- EventSource仅支持GET请求
- 服务端消息体要严格按照SSE规范输出
- 监听onmessage事件或者服务端自定义事件
前端代码
const source = new EventSource("http://localhost:3000/demo/sse");
source.onmessage = (e) => {
console.log(e.data);
};
前端注意事项
- 单向通信:SSE是一种单向通信协议,即服务器向客户端推送数据,如需双向通信,需要使用WebSocket。
- 自动重连:EventSource内置了自动重连功能,当连接断开时,会自动重新连接。在上面的例子中,当服务端
stream.end()
时,EventSource会自动重新连接。所以应当在合适的时机,前端主动断开连接,防止无限重连(source.close()
)。
总结
标准的EventSource实现简单,但仅支持GET请求,无法传输大量数据;不支持自定义Header,也无法实现鉴权。因此,使用场景有限,一般用来实现简单聊天应用、单纯从服务端推送数据到客户端的场景。
为了满足更多的需求,前端开始花式整活~
非标准SSE方式1-服务端按标准的SSE输出,浏览器端使用fetch
浏览器端代码
fetch("http://localhost:3000/demo/sse").then(async (response) => {
if (response.ok) {
let decoder = new TextDecoder("utf-8", { stream: true });
let reader = response.body?.getReader();
while (true) {
let { done, value } = await reader.read();
if (done) break;
let data = decoder.decode(value);
console.log("data:", data);
}
}
});
注意事项
- value 原始值是一个
ArrayBuffer
,需要使用TextDecoder
进行解码。
非标准SSE方式2-服务端按标准的SSE输出,浏览器端使用普通的XMLHttpRequest
浏览器端代码
let xhr = new XMLHttpRequest();
xhr.open("GET", "http://localhost:3000/demo/sse", true);
xhr.onprogress = (e) => {
console.log(xhr.responseText);
};
xhr.send();
同学们尝试一下,可以发现,这种方式返回的数据是历次消息体拼接后的结果。熟悉XMLHttpRequest的同学应该知道,这其实跟文件上传监听上传进度一样的道理,得到的是实时的进度,这样是不是就能理解为何的返回数据是历次消息体拼接后的结果了?
注意事项
- XMLHttpRequest并没有真正建立长连接,所以当消息超长时,会面临超时问题。所以,该方式实际是一种“伪流式传输”
- 由于返回的是历次消息体拼接后的结果,如需处理单个消息体,需要自己实现。
非标准SSE方式3-服务端按POST方式输出SSE,浏览器端使用(fetch/XMLHttpRequest)接收
此种方式前端代码和方式1、方式2一致,服务端改成POST方式输出SSE,实际效果一致,仅需把GET换成POST,可以支持前端大量数据的传输,可以传输大文件,不在赘述。
非标准SSE方式4-服务端按chunked方式输出(分块传输),浏览器端使用fetch接收
服务端代码
import Router from "koa-router";
import { PassThrough } from "node:stream";
const demo = new Router();
demo.post("/chunked", async (ctx) => {
ctx.set({
"Content-Type": "text/plain;charset=utf-8",
"Transfer-Encoding": "chunked",
Connection: "keep-alive",
});
let stream = new PassThrough();
ctx.status = 200;
ctx.body = stream;
let str = "大家好,这里展示chunked流式输出的效果。";
chunked(str, 0, stream);
});
function chunked(str, i, res) {
if (i === str.length) return res.end();
res.write(str[i] + "\r\n");
setTimeout(() => {
chunked(str, i + 1, res);
}, Math.random() * 1000);
}
前端代码
fetch("http://localhost:3000/demo/chunked", {
method: "POST",
}).then(async (response) => {
if (response.ok) {
let decoder = new TextDecoder("utf-8", { stream: true });
let reader = response.body?.getReader();
while (true) {
let { done, value } = await reader.read();
if (done) break;
let data = decoder.decode(value);
console.log("data:", data);
}
}
});
注意事项
- chunked方式,服务端返回的响应头中,需要设置Transfer-Encoding: chunked,表示返回的是分块传输。
- 实际效果发现与SSE一样
- 仔细观察可发现,SSE中会默认设置响应头Transfer-Encoding: chunked
结尾
想要实现流式传输,最佳方案如下:
服务端按照不必严格按照SSE消息体方式传输,GET/POST均可,只需设置如下响应头即可。
Content-Type: text/event-stream
Connection: keep-alive
Cache-Control: no-cache
Transfer-Encoding: chunked
浏览器端使用fetch接收