前端流式输出

摘要

自chatGPT诞生以来,前端页面的流式输出,已经非常常见了。但凡涉及到大模型、AI智能体开发的项目,都离不开前端的流式输出。

本文所采用的环境

node.js v18.20.1

koa 3.0

正文

传统的、标准的、典型的SSE

SSE(Server-Sent Events)是一种用于在浏览器中从服务器接收数据的技术。它允许服务器向浏览器推送数据,而不需要浏览器主动请求数据。

服务端关键

  • 响应方法 get
  • 响应头设置 Content-Type: text/event-stream

koa 代码

import Router from "koa-router";
import { PassThrough } from "node:stream";

const demo = new Router();
demo.get("/sse", async (ctx) => {
    ctx.set({
        "Content-Type": "text/event-stream;charset=utf-8",
        "Cache-Control": "no-cache",
        Connection: "keep-alive",
    });

    let stream = new PassThrough();
    ctx.status = 200;
    ctx.body = stream;
    let str = "大家好,这里展示SSE流式输出的效果。";
    sse(str, 0, stream);
});

function sse(str, i, stream) {
    if (i === str.length) return stream.end();
    stream.write("data: " + JSON.stringify({ value: str[i] }));
    stream.write("\n\n");
    setTimeout(() => {
        sse(str, i + 1, stream);
    }, Math.random() * 1000);
}

服务端注意事项

  • SSE 消息体 每条消息以\n\n结束
  • 允许自定义消息体,具体可参考EventSource

前端关键

  • 使用EventSource客户端
  • EventSource仅支持GET请求
  • 服务端消息体要严格按照SSE规范输出
  • 监听onmessage事件或者服务端自定义事件

前端代码

const source = new EventSource("http://localhost:3000/demo/sse");
source.onmessage = (e) => {
    console.log(e.data);
};

前端注意事项

  • 单向通信:SSE是一种单向通信协议,即服务器向客户端推送数据,如需双向通信,需要使用WebSocket。
  • 自动重连:EventSource内置了自动重连功能,当连接断开时,会自动重新连接。在上面的例子中,当服务端stream.end()时,EventSource会自动重新连接。所以应当在合适的时机,前端主动断开连接,防止无限重连(source.close())。

总结

标准的EventSource实现简单,但仅支持GET请求,无法传输大量数据;不支持自定义Header,也无法实现鉴权。因此,使用场景有限,一般用来实现简单聊天应用、单纯从服务端推送数据到客户端的场景。

为了满足更多的需求,前端开始花式整活~

非标准SSE方式1-服务端按标准的SSE输出,浏览器端使用fetch

浏览器端代码

fetch("http://localhost:3000/demo/sse").then(async (response) => {
    if (response.ok) {
        let decoder = new TextDecoder("utf-8", { stream: true });
        let reader = response.body?.getReader();
        while (true) {
            let { done, value } = await reader.read();
            if (done) break;
            let data = decoder.decode(value);
            console.log("data:", data);
        }
    }
});

注意事项

  • value 原始值是一个ArrayBuffer,需要使用TextDecoder进行解码。

非标准SSE方式2-服务端按标准的SSE输出,浏览器端使用普通的XMLHttpRequest

浏览器端代码

let xhr = new XMLHttpRequest();
xhr.open("GET", "http://localhost:3000/demo/sse", true);
xhr.onprogress = (e) => {
    console.log(xhr.responseText);
};
xhr.send();

同学们尝试一下,可以发现,这种方式返回的数据是历次消息体拼接后的结果。熟悉XMLHttpRequest的同学应该知道,这其实跟文件上传监听上传进度一样的道理,得到的是实时的进度,这样是不是就能理解为何的返回数据是历次消息体拼接后的结果了?

注意事项

  • XMLHttpRequest并没有真正建立长连接,所以当消息超长时,会面临超时问题。所以,该方式实际是一种“伪流式传输
  • 由于返回的是历次消息体拼接后的结果,如需处理单个消息体,需要自己实现。

非标准SSE方式3-服务端按POST方式输出SSE,浏览器端使用(fetch/XMLHttpRequest)接收

此种方式前端代码和方式1、方式2一致,服务端改成POST方式输出SSE,实际效果一致,仅需把GET换成POST,可以支持前端大量数据的传输,可以传输大文件,不在赘述。

非标准SSE方式4-服务端按chunked方式输出(分块传输),浏览器端使用fetch接收

服务端代码

import Router from "koa-router";
import { PassThrough } from "node:stream";

const demo = new Router();

demo.post("/chunked", async (ctx) => {
    ctx.set({
        "Content-Type": "text/plain;charset=utf-8",
        "Transfer-Encoding": "chunked",
        Connection: "keep-alive",
    });
    let stream = new PassThrough();
    ctx.status = 200;
    ctx.body = stream;
    let str = "大家好,这里展示chunked流式输出的效果。";
    chunked(str, 0, stream);
});

function chunked(str, i, res) {
    if (i === str.length) return res.end();
    res.write(str[i] + "\r\n");
    setTimeout(() => {
        chunked(str, i + 1, res);
    }, Math.random() * 1000);
}

前端代码

fetch("http://localhost:3000/demo/chunked", {
    method: "POST",
}).then(async (response) => {
    if (response.ok) {
        let decoder = new TextDecoder("utf-8", { stream: true });
        let reader = response.body?.getReader();
        while (true) {
            let { done, value } = await reader.read();
            if (done) break;
            let data = decoder.decode(value);
            console.log("data:", data);
        }
    }
});

注意事项

  • chunked方式,服务端返回的响应头中,需要设置Transfer-Encoding: chunked,表示返回的是分块传输。
  • 实际效果发现与SSE一样
  • 仔细观察可发现,SSE中会默认设置响应头Transfer-Encoding: chunked

结尾

想要实现流式传输,最佳方案如下:

  • 服务端按照不必严格按照SSE消息体方式传输,GET/POST均可,只需设置如下响应头即可。

    Content-Type: text/event-stream

    Connection: keep-alive

    Cache-Control: no-cache

    Transfer-Encoding: chunked

  • 浏览器端使用fetch接收