当你构建用于数据流传输的功能时,比如人工智能聊天响应或实时通知功能,实际的网络环境往往并不像 `fetch` 这个API所显示的那样理想。
连接会中断,代理服务器会缓存响应内容,移动网络也可能会在数据传输过程中从WiFi切换到蜂窝网络。如果你的流处理代码没有考虑到这些情况,用户就会看到响应突然停止,既没有任何错误提示,也无法恢复数据传输。
在本文中,你将使用一个名为 Ore 的开源TypeScript库,作为实际案例来学习如何构建能够适应真实网络环境的流处理客户端:该库支持自动重试机制,遵循官方的Server-Sent Events解析规范,并且可以与React及React Server Components轻松集成。
通过学习本文的内容,你将会明白异步生成器、`Fetch API` 以及Server-Sent Events规范是如何协同工作的,从而构建出比简单的 `fetch` 和 `response.body.getReader()` 循环更加可靠的流处理系统。
目录
先决条件
要顺利完成学习内容,你需要具备以下条件:
-
对TypeScript有基本的了解
-
熟悉`fetch`、`ReadableStream`以及`async`/`await`这些概念
-
具备React的基础知识(尤其是与本文相关的内容部分)
你将学到什么
-
如何使用异步生成器从`fetch`响应中获取原始文本或字节数据
-
如何手动逐字段解析Server-Sent Events格式的数据
-
如何利用“最后事件ID”实现自动重连功能,从而避免丢失任何数据
-
如何使用指数退步策略来处理重试请求
-
如何将流处理客户端与React的状态管理机制及React Server Components结合使用
什么是服务器发送的事件?
服务器发送的事件(SSE)是一种Web标准,它允许通过单个HTTP连接从服务器向客户端进行单向数据流传输。与WebSocket不同,SSE使用的是普通的HTTP协议,因此它可以通过现有的基础设施如负载均衡器和代理服务器来正常工作,而无需进行任何特殊配置。
SSE响应在网络传输中的格式如下:
event: update
id: 42
data: {"status": "processing"}
event: update
id: 43
data: {"status": "complete"}
每个事件之间都会用空行分隔。data字段用于携带数据内容,event用于标识事件的类型,而id则帮助客户端确定自己在数据流中的位置,从而便于重新连接。
浏览器内置了EventSource API来处理这类事件,但该API存在一些限制:不支持自定义请求头,无法发送POST请求,而且不同浏览器的重新连接行为也并不一致。对于那些比较复杂的应用场景,通常需要自行解析这些数据流。
为什么需要开发自定义的流式传输客户端?
在许多流式传输的场景中,比如AI聊天系统的响应机制,并没有使用SSE规范。这些数据实际上只是以原始文本的形式依次传送到客户端而已。然而,在一些其他场景中,比如实时通知功能,SPE提供的数据结构确实能够带来很多便利:例如带有名称的事件、用于恢复数据流的标识符,以及由服务器控制的重试机制。
为了应对这两种情况,我们提供了两个独立的函数:
-
stream()用于处理原始文本或字节流,完全不考虑数据的格式问题 -
streamSSE()用于解析符合SPE规范的数据流
这两个函数都是异步生成器,因此在使用方式上是完全相同的:
for await (const chunk of stream("https://api.example.com/chat")) {
console.log(chunk);
}
如何使用异步生成器来传输原始数据块?
最简单的应用场景就是传输原始文本。这种方式在处理AI聊天系统的响应或日志数据时非常有用,因为这些数据并没有特定的结构,只是按顺序传来的字节序列而已。
下面是stream()函数的核心实现:
export async function* stream(
url: string,
options?: StreamOptions
): AsyncGenerator<string | Uint8Array, void, unknown>> {
const { headers, retries = 3, signal, decode = true } = options || {};
let retryCount = 0;
while (retryCount <= retries) {
try {
const response = await fetch(url, { method: "GET", headers, signal });
if (!response.body) {
throw new Error("响应体为空");
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
yield decode ? decoder.decode(value, { stream: true }) : value;
}
} finally {
reader.releaseLock();
}
return;
} catch (error: any) {
if (signal?.aborted) throw error;
retryCount++;
if (retryCount > retries) {
throw new Error(`最大重试次数已达到。最后一次错误信息:${error.message}`);
}
await new Promise((r) => setTimeout(r, 1000 * retryCount));
}
}
}
有几项设计决策值得特别提及。
该函数是一个异步生成器(`async function*`),因此调用者可以使用`for await…of`来简化代码,而无需手动管理读取器和循环。这就是直接提供原始的`ReadableStream`与提供易于使用的接口之间的区别。
无论循环是通过`break`提前结束还是因为异常而终止,`finally`块都会释放读取器锁。忽略这一点是导致数据流泄漏的常见原因。
重试机制仅会捕获来自`fetch`调用和读取循环的错误。如果失败是由于`AbortSignal`引起的,系统会立即重新抛出异常,而不会进行重试——因为对于故意引发的取消操作来说,重试是没有意义的。
如何手动解析服务器发送的事件
SSE规范采用了一种简单的文本格式,但要正确解析这些数据,就需要处理一些边缘情况:例如跨多行分布的事件、以冒号开头的注释行、没有值的字段,以及数据块末尾不完整的行。
以下是`streamSSE()`函数内部的核心状态机逻辑:
let buffer = "";
let currentEvent: Partial = { data: "", event: null, id: null };
let hasData = false;
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split(/\r\n|\r|\n/);
buffer = lines.pop() || ""; // 保留最后不完整的行,以备后续处理
for (const line of lines) {
if (line === "") {
if (hasData) {
const event: SSEEvent = {
id: currentEvent.id ?? lastEventId,
event: currentEvent.event ?? null,
data: currentEvent.data!.endsWith("\n")
? currentEvent.data!.slice(0, -1)
: currentEvent.data!,
retry: currentEvent.retry,
};
if (event.id) lastEventId = event.id;
yield event;
currentEvent = { data: "", event: null, id: null };
hasData = false;
}
continue;
}
if (line.startsWith(":")) continue; // 注释行,忽略
const colonIndex = line.indexOf(":");
const field = colonIndex === -1 ? line : line.slice(0, colonIndex);
let valueStr = colonIndex === -1 ? "" : line.slice(colonIndex + 1);
if (valueStr.startsWith(" ")) valueStr = valueStr.slice(1);
switch (field) {
case "data":
currentEvent.data += valueStr + "\n";
hasData = true;
break;
case "event":
currentEvent.event = valueStr;
break;
case "id":
if (valueStr.indexOf("\0") === -1) currentEvent.id = valueStr;
break;
case "retry":
const retry = parseInt(valueStr, 10);
if (!isNaN(retry)) retryInterval = retry;
break;
}
}
}
网络传输的数据块并不遵循行边界的规定。一次 `read()` 调用可能会在某行的中间位置结束,因此那一行中剩余的、可能尚未被完全读取的内容会被保存在 `buffer` 中,并会在后续的数据处理过程中被添加到下一块数据中,而不会被立即进行处理。如果采用简单的 `response.text()` 方法来获取数据内容,然后再对字符串进行分割处理,那么就很容易犯这种错误。
空行用于标记事件的结束。SSE事件并没有固定长度的头部信息,规范中明确指出空行用于划分事件的边界,因此解析器只有在检测到空行时才会认为一个事件已经结束。
根据规范,如果id字段中包含空字节,那么这个字段将会被直接忽略。这是一个很细节的问题,但几乎没有任何手动实现的方案能够一次性就处理正确。
如何利用最后一个事件的ID来实现重新连接
正是这一特性使得SSE相比普通的fetch流具有明显的优势:在连接中断后,它可以自动恢复传输而不会丢失已经接收到的数据。
let lastEventId: string | null = null;
while (retryCount <= retries) {
const headers = { ...customHeaders };
if (lastEventId) {
(headers as any)["Last-Event-ID"] = lastEventId;
}
const response = await fetch(url, { method: "GET", headers, signal });
// ... 读取并解析接收到的事件,同时更新lastEventId的值
}
每当有一个包含id字段的事件被接收到时,lastEventId的值就会更新。如果连接中断后客户端重新连接,它会在请求头中包含Last-Event-ID字段。一个合规的服务器会利用这个字段从正确的位置继续传输数据,而无需重新开始传输或跳过之前的内容。
不过,这一机制只有当服务器真正遵守这一规范时才能发挥作用。因此,这实际上是客户端与服务器之间的一种约定,而不是客户端单方面能够确保的事情。但客户端必须正确地跟踪并发送这个字段,这样才能使这种约定得以生效。
如何使用退避策略来处理重试请求
无论是stream()还是streamSSE()》,在遇到错误时都会尝试重新连接,但具体的处理方式会根据错误的原因而有所不同。
stream()采用简单的线性退避策略,其退避时间与重试次数成正比:
await new Promise((resolve) => setTimeout(resolve, 1000 * retryCount));
streamSSE()则会优先使用服务器在规范中指定的retry字段来决定退避时间;如果没有提供这个字段,它会使用默认值:
let retryInterval = 1000;
// ... 如果服务器提供了retry字段,就会使用该值
await new Promise((r) => setTimeout(r, retryInterval));
在实际应用中,让服务器来决定退避时间是非常重要的。因为当服务器负载较重时,它可以让客户端适当延长退避时间,而这正是SSE规范所鼓励的合作行为。
在这两个函数中,任何被中止的AbortSignal都会立即终止重试循环。将故意取消操作视为可重试的错误是一种常见的错误,解决办法就是在决定是否重新尝试之前检查signal?.aborted的值。
如何将这种方法与React结合使用
由于这两个函数都是异步生成器,因此要将它们与React的状态管理机制结合在一起,只需循环处理每个数据块,并在每次处理完一个数据块后调用setState即可:
function ChatComponent() {
const [messages, setMessages] = useState("");
useEffect(() => {
const controller = new AbortController();
(async () => {
try {
for await (const chunk of stream("/api/chat", { signal: controller.signal})) {
setMessages((prev) => prev + chunk);
}
} catch (err: any) {
if (err.name !== "AbortError") console.error(err);
}
})();
return () => controller.abort();
}, []);
return {messages};
}
这里,调用controllerabort()的清理函数起到了关键作用。如果没有这个函数,当用户离开该组件时,如果数据流仍在运行,那么获取数据的操作会继续在后台进行,从而导致已卸载的组件仍然会被更新。
如何将这种方法与React服务器组件结合使用
由于这些生成器是逐个产生数据值的,因此你也可以直接从异步迭代器中触发递归的Suspense效果,每当有新的数据块到达时,就立即将其发送给客户端:
async function StreamViewer({ iterator }: { iterator: AsyncIterator }) {
const { value, done } = await iterator.next();
if (done) return null;
return (
{value}
);
}
export default function Page() {
const dataStream = stream("https://api.example.com/stream");
const iterator = dataStream[Symbol.asyncIterator]();
return (
);
}
每次递归调用都会等待下一个数据块的到来,然后为剩余的部分渲染相应的Suspense元素。React会逐个将HTML片段发送给客户端,而不会等到所有数据都接收完毕才进行渲染。
结论
一个可靠的流式传输系统需要能够处理多种情况:连接可能会中断,数据块可能会被分割成多行传递,而且还需要区分取消操作和失败情况。
Ore采用的方法基于以下几项核心理念:
-
将流式数据源定义为异步生成器,这样使用者就可以使用
for await...of语法来处理这些数据。 -
手动逐字段解析SSE数据,严格遵守规范中规定的换行符分割规则,并将未完成解析的字符串分块存储起来。
-
跟踪
Last-Event-ID信息,以便在连接中断后能够重新建立连接,而无需从头开始重新传输数据。 -
将重试机制与取消操作视为不同的处理流程来处理。
-
在核心设计上保持中立性,同时为React和React服务器组件提供简洁的集成接口。
正是这种组合使得某些在演示环境中能够正常运行的流媒体客户端,能够在实际的网络环境下依然保持稳定的性能。您可以在github.com/glamboyosa/ore处查看完整的源代码。




