@bdky/ky-sse-hook
v1.0.2
Published
ky afterResponse hook for processing SSE streams
Readme
@bdky/ky-sse-hook
English | 简体中文
一个轻量级的 ky afterResponse 钩子,用于处理 Server-Sent Events (SSE) 流式响应。基于 eventsource-parser 实现符合规范的 SSE 解析。
适用于 AI 对话流式输出、实时数据推送,以及任何需要通过 ky 消费 SSE 响应的场景。
特性
- 无缝 ky 集成 — 直接插入 ky 的
hooks.afterResponse - 符合规范的 SSE 解析 — 基于 eventsource-parser v3
- 回调驱动 API —
onData/onCompleted/onAborted/onEvent/onMessage/onReconnectInterval - 至多一次完成保证 — 内部守卫确保
onCompleted至多触发一次 - 感知中止操作 — 检测
AbortController信号并路由到onAborted - 完整 TypeScript 支持 — 内置 TypeScript 类型声明
安装
# npm
npm install @bdky/ky-sse-hook
# yarn
yarn add @bdky/ky-sse-hook
# pnpm
pnpm add @bdky/ky-sse-hook前置依赖: 需要
ky >= 1.0.0,如果尚未安装请单独安装。
同时发布 ESM 和 CJS 格式,附带 TypeScript 类型声明。
快速开始
import ky from 'ky';
import {createHook} from '@bdky/ky-sse-hook';
const hook = createHook({
onData(message) {
console.log('收到数据:', message);
},
onCompleted(error) {
if (error) {
console.error('流失败:', error);
return;
}
console.log('流结束');
}
});
await ky.post('https://api.example.com/chat/stream', {
json: {prompt: 'Hello, world!'},
hooks: {afterResponse: [hook]},
timeout: false
});注意: 使用 SSE 流时请设置
timeout: false。长时间运行的流会被 ky 的默认超时中断。
API
createHook(options)
创建一个 ky AfterResponseHook,将响应体作为 SSE 流进行消费。
import type {AfterResponseHook} from 'ky';
const hook: AfterResponseHook = createHook(options);行为说明:
- 如果响应非成功(
response.ok === false)或没有 body,hook 会立即返回,不消费流。 - 响应体通过
ReadableStream读取,以 UTF-8 解码后送入 SSE 解析器。 - hook 会等待流完全消费后才返回,避免 ky 尝试读取已被消费的 body 导致冲突。
CreateHookOptions
| 属性 | 类型 | 必填 | 说明 |
|------|------|------|------|
| onData | (message: string) => void | 是 | 每个 data 行触发。当单条 SSE 消息包含多个 data: 字段时,按 \n 分割后逐行调用 onData。 |
| onCompleted | (error?: Error) => void | 否 | 流结束时调用一次。如果流因错误(非中止)终止,会传入 error 参数。保证至多触发一次。 |
| onAborted | () => void | 否 | 通过 AbortController 中止请求时调用。中止时 onCompleted 不会被调用。 |
| onEvent | (event: EventSourceMessage) => void | 否 | 收到 data 字段非空的 SSE 事件时,传入完整的 EventSourceMessage。空 data 的事件会被静默跳过。在 onData 之前触发。 |
| onMessage | (event: EventSourceMessage) => void | 否 | onEvent 的别名。两者同时提供时都会被调用。空 data 的事件同样被跳过。 |
| onReconnectInterval | (value: number) => void | 否 | SSE 流包含 retry: 指令时调用,参数为重连间隔(毫秒)。 |
onData 与 onEvent / onMessage 的区别
onData接收每个data:行的原始字符串内容。如果单条 SSE 消息有多个data:字段,每行触发一次onData。onEvent/onMessage接收完整的EventSourceMessage对象,包含data、event、id字段。每条 SSE 消息触发一次,在onData之前。
EventSourceMessage
从 eventsource-parser 重新导出。类型结构:
interface EventSourceMessage {
data: string;
event?: string;
id?: string;
}使用示例
AI 对话流式输出
解析 JSON 编码的 SSE 数据并累积响应:
import ky from 'ky';
import {createHook} from '@bdky/ky-sse-hook';
interface ChatChunk {
answer: string;
is_end: boolean;
}
let fullResponse = '';
const hook = createHook({
onData(data) {
try {
const chunk: ChatChunk = JSON.parse(data);
fullResponse += chunk.answer;
console.log('当前响应:', fullResponse);
}
catch {
console.error('解析失败:', data);
}
},
onCompleted(error) {
if (error) {
console.error('流错误:', error);
return;
}
console.log('最终响应:', fullResponse);
}
});
await ky.post('https://api.example.com/chat/stream', {
json: {
messages: [
{role: 'user', content: '介绍一下量子计算'}
]
},
hooks: {afterResponse: [hook]},
timeout: false
});中止请求
使用 AbortController 取消流式请求:
import ky from 'ky';
import {createHook} from '@bdky/ky-sse-hook';
const controller = new AbortController();
const hook = createHook({
onData(data) {
console.log('数据块:', data);
// 收到第一个数据块后中止
controller.abort();
},
onAborted() {
console.log('请求已被用户中止');
},
onCompleted(error) {
// 中止时不会调用
if (error) {
console.error('错误:', error);
return;
}
console.log('完成');
}
});
await ky.post('https://api.example.com/chat/stream', {
json: {prompt: '给我讲一个长故事'},
hooks: {afterResponse: [hook]},
signal: controller.signal,
timeout: false
});完整事件元数据
获取每个 SSE 事件的完整 EventSourceMessage:
import ky from 'ky';
import {createHook} from '@bdky/ky-sse-hook';
const hook = createHook({
onData(data) {
console.log('数据:', data);
},
onEvent(event) {
console.log('事件类型:', event.event);
console.log('事件 ID:', event.id);
console.log('事件数据:', event.data);
},
onReconnectInterval(interval) {
console.log('服务端建议重连间隔:', interval, 'ms');
}
});
await ky.post('https://api.example.com/events', {
hooks: {afterResponse: [hook]},
timeout: false
});错误处理
同时处理流级别和 HTTP 级别的错误:
import ky, {HTTPError} from 'ky';
import {createHook} from '@bdky/ky-sse-hook';
const hook = createHook({
onData(data) {
console.log('数据:', data);
},
onCompleted(error) {
if (error) {
console.error('流读取失败:', error.message);
return;
}
console.log('流正常完成');
}
});
try {
await ky.post('https://api.example.com/stream', {
json: {prompt: 'Hello'},
hooks: {afterResponse: [hook]},
timeout: false
});
}
catch (error) {
// ky 对非 2xx 响应抛出 HTTPError
// hook 会跳过非成功响应,因此 HTTP 错误
// 在这里处理,而不是在 onCompleted 中
if (error instanceof HTTPError) {
console.error('HTTP 错误:', error.response.status);
}
}工作原理
ky.post(url, { hooks: { afterResponse: [hook] } })
│
▼
response.ok && response.body?
│ 否 → return(跳过)
│ 是
▼
response.body.getReader()
│
▼
TextDecoder (UTF-8)
│
▼
eventsource-parser
│
├─ onEvent → options.onEvent()
│ options.onMessage()
│ data.split('\n') → options.onData()(逐行)
│
└─ onRetry → options.onReconnectInterval()
│
▼
流结束?
├─ 正常结束 → onCompleted()
├─ 被中止 → onAborted()
└─ 发生错误 → onCompleted(error)- hook 接收 ky 响应,检查是否成功且有可读的 body。
- 通过
ReadableStreamreader 分块消费 body。 - 每个数据块通过
TextDecoder从Uint8Array解码为字符串。 - 解码后的文本送入
eventsource-parser,解析器发出结构化的 SSE 事件。 - 流正常结束时,调用
onCompleted()(不带参数)。 - 如果请求通过
AbortController中止,则调用onAborted()。 - 如果读取过程中发生意外错误,调用
onCompleted(error)。
浏览器兼容性
| 浏览器 | 最低版本 | |--------|----------| | Chrome | >= 74 | | Firefox | >= 90 | | Safari | >= 14.1 | | Edge | >= 79 | | iOS Safari | >= 14.1 | | Android Chrome | >= 74 |
需要支持 ReadableStream、TextDecoder 和 fetch API。
常见问题
为什么 hook 不抛出异常?
这是设计选择。流读取错误通过 onCompleted(error) 传递,而不是抛出异常。这使调用方能够以回调风格处理错误,与 API 的其余部分保持一致,同时避免流式场景中的未捕获 Promise rejection。
支持 GET 请求吗?
支持。hook 适用于任何 HTTP 方法(GET、POST、PUT 等),只要响应返回 SSE body 即可。它接入 ky 的 afterResponse 钩子,该钩子无论请求方法如何都会触发。
为什么同时有 onEvent 和 onMessage?
它们是便利别名。两者接收相同的 EventSourceMessage,每个 SSE 事件都会触发两者。你可以选择在代码中感觉更自然的名称。如果两者都提供,两者都会被调用。
非 2xx 响应怎么处理?
hook 检查 response.ok,如果响应不成功则跳过处理。非 2xx 错误由 ky 内置的错误处理机制(如 HTTPError)处理,你可以在 ky 调用外层用标准 try/catch 捕获。
会自动重连吗?
不会。此 hook 处理的是单次 SSE 响应。如果服务端发送了 retry: 指令,该值会转发到 onReconnectInterval,但重连逻辑由使用方自行实现。
为什么要设置 timeout: false?
SSE 流是长时间运行的连接。ky 的默认超时(10 秒)会在流完成前中断请求。设置 timeout: false 以禁用流式请求的超时:
await ky.post(url, {
hooks: {afterResponse: [hook]},
timeout: false
});相关项目
- ky — 优雅的 HTTP 请求库
- eventsource-parser — 流式 SSE 解析器
- @bdky/aaas-pilot-kit — 内部使用此 hook 的 AI Pilot SDK
许可证
MIT
