npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@bdky/ky-sse-hook

v1.0.2

Published

ky afterResponse hook for processing SSE streams

Readme

@bdky/ky-sse-hook

npm version bundle size TypeScript License: MIT

English | 简体中文

一个轻量级的 ky afterResponse 钩子,用于处理 Server-Sent Events (SSE) 流式响应。基于 eventsource-parser 实现符合规范的 SSE 解析。

适用于 AI 对话流式输出、实时数据推送,以及任何需要通过 ky 消费 SSE 响应的场景。

特性

  • 无缝 ky 集成 — 直接插入 ky 的 hooks.afterResponse
  • 符合规范的 SSE 解析 — 基于 eventsource-parser v3
  • 回调驱动 APIonData / onCompleted / onAborted / onEvent / onMessage / onReconnectInterval
  • 至多一次完成保证 — 内部守卫确保 onCompleted 至多触发一次
  • 感知中止操作 — 检测 AbortController 信号并路由到 onAborted
  • 完整 TypeScript 支持 — 内置 TypeScript 类型声明

安装

# npm
npm install @bdky/ky-sse-hook

# yarn
yarn add @bdky/ky-sse-hook

# pnpm
pnpm add @bdky/ky-sse-hook

前置依赖: 需要 ky >= 1.0.0,如果尚未安装请单独安装。

同时发布 ESM 和 CJS 格式,附带 TypeScript 类型声明。

快速开始

import ky from 'ky';
import {createHook} from '@bdky/ky-sse-hook';

const hook = createHook({
    onData(message) {
        console.log('收到数据:', message);
    },
    onCompleted(error) {
        if (error) {
            console.error('流失败:', error);
            return;
        }
        console.log('流结束');
    }
});

await ky.post('https://api.example.com/chat/stream', {
    json: {prompt: 'Hello, world!'},
    hooks: {afterResponse: [hook]},
    timeout: false
});

注意: 使用 SSE 流时请设置 timeout: false。长时间运行的流会被 ky 的默认超时中断。

API

createHook(options)

创建一个 ky AfterResponseHook,将响应体作为 SSE 流进行消费。

import type {AfterResponseHook} from 'ky';

const hook: AfterResponseHook = createHook(options);

行为说明:

  • 如果响应非成功response.ok === false)或没有 body,hook 会立即返回,不消费流。
  • 响应体通过 ReadableStream 读取,以 UTF-8 解码后送入 SSE 解析器。
  • hook 会等待流完全消费后才返回,避免 ky 尝试读取已被消费的 body 导致冲突。

CreateHookOptions

| 属性 | 类型 | 必填 | 说明 | |------|------|------|------| | onData | (message: string) => void | 是 | 每个 data 行触发。当单条 SSE 消息包含多个 data: 字段时,按 \n 分割后逐行调用 onData。 | | onCompleted | (error?: Error) => void | 否 | 流结束时调用一次。如果流因错误(非中止)终止,会传入 error 参数。保证至多触发一次。 | | onAborted | () => void | 否 | 通过 AbortController 中止请求时调用。中止时 onCompleted 不会被调用。 | | onEvent | (event: EventSourceMessage) => void | 否 | 收到 data 字段非空的 SSE 事件时,传入完整的 EventSourceMessage。空 data 的事件会被静默跳过。在 onData 之前触发。 | | onMessage | (event: EventSourceMessage) => void | 否 | onEvent 的别名。两者同时提供时都会被调用。空 data 的事件同样被跳过。 | | onReconnectInterval | (value: number) => void | 否 | SSE 流包含 retry: 指令时调用,参数为重连间隔(毫秒)。 |

onDataonEvent / onMessage 的区别

  • onData 接收每个 data: 行的原始字符串内容。如果单条 SSE 消息有多个 data: 字段,每行触发一次 onData
  • onEvent / onMessage 接收完整的 EventSourceMessage 对象,包含 dataeventid 字段。每条 SSE 消息触发一次,在 onData 之前。

EventSourceMessage

eventsource-parser 重新导出。类型结构:

interface EventSourceMessage {
    data: string;
    event?: string;
    id?: string;
}

使用示例

AI 对话流式输出

解析 JSON 编码的 SSE 数据并累积响应:

import ky from 'ky';
import {createHook} from '@bdky/ky-sse-hook';

interface ChatChunk {
    answer: string;
    is_end: boolean;
}

let fullResponse = '';

const hook = createHook({
    onData(data) {
        try {
            const chunk: ChatChunk = JSON.parse(data);
            fullResponse += chunk.answer;
            console.log('当前响应:', fullResponse);
        }
        catch {
            console.error('解析失败:', data);
        }
    },
    onCompleted(error) {
        if (error) {
            console.error('流错误:', error);
            return;
        }
        console.log('最终响应:', fullResponse);
    }
});

await ky.post('https://api.example.com/chat/stream', {
    json: {
        messages: [
            {role: 'user', content: '介绍一下量子计算'}
        ]
    },
    hooks: {afterResponse: [hook]},
    timeout: false
});

中止请求

使用 AbortController 取消流式请求:

import ky from 'ky';
import {createHook} from '@bdky/ky-sse-hook';

const controller = new AbortController();

const hook = createHook({
    onData(data) {
        console.log('数据块:', data);

        // 收到第一个数据块后中止
        controller.abort();
    },
    onAborted() {
        console.log('请求已被用户中止');
    },
    onCompleted(error) {
        // 中止时不会调用
        if (error) {
            console.error('错误:', error);
            return;
        }
        console.log('完成');
    }
});

await ky.post('https://api.example.com/chat/stream', {
    json: {prompt: '给我讲一个长故事'},
    hooks: {afterResponse: [hook]},
    signal: controller.signal,
    timeout: false
});

完整事件元数据

获取每个 SSE 事件的完整 EventSourceMessage

import ky from 'ky';
import {createHook} from '@bdky/ky-sse-hook';

const hook = createHook({
    onData(data) {
        console.log('数据:', data);
    },
    onEvent(event) {
        console.log('事件类型:', event.event);
        console.log('事件 ID:', event.id);
        console.log('事件数据:', event.data);
    },
    onReconnectInterval(interval) {
        console.log('服务端建议重连间隔:', interval, 'ms');
    }
});

await ky.post('https://api.example.com/events', {
    hooks: {afterResponse: [hook]},
    timeout: false
});

错误处理

同时处理流级别和 HTTP 级别的错误:

import ky, {HTTPError} from 'ky';
import {createHook} from '@bdky/ky-sse-hook';

const hook = createHook({
    onData(data) {
        console.log('数据:', data);
    },
    onCompleted(error) {
        if (error) {
            console.error('流读取失败:', error.message);
            return;
        }
        console.log('流正常完成');
    }
});

try {
    await ky.post('https://api.example.com/stream', {
        json: {prompt: 'Hello'},
        hooks: {afterResponse: [hook]},
        timeout: false
    });
}
catch (error) {
    // ky 对非 2xx 响应抛出 HTTPError
    // hook 会跳过非成功响应,因此 HTTP 错误
    // 在这里处理,而不是在 onCompleted 中
    if (error instanceof HTTPError) {
        console.error('HTTP 错误:', error.response.status);
    }
}

工作原理

ky.post(url, { hooks: { afterResponse: [hook] } })
        │
        ▼
  response.ok && response.body?
        │ 否 → return(跳过)
        │ 是
        ▼
  response.body.getReader()
        │
        ▼
  TextDecoder (UTF-8)
        │
        ▼
  eventsource-parser
        │
        ├─ onEvent → options.onEvent()
        │            options.onMessage()
        │            data.split('\n') → options.onData()(逐行)
        │
        └─ onRetry → options.onReconnectInterval()
        │
        ▼
  流结束?
   ├─ 正常结束    → onCompleted()
   ├─ 被中止      → onAborted()
   └─ 发生错误    → onCompleted(error)
  1. hook 接收 ky 响应,检查是否成功且有可读的 body。
  2. 通过 ReadableStream reader 分块消费 body。
  3. 每个数据块通过 TextDecoderUint8Array 解码为字符串。
  4. 解码后的文本送入 eventsource-parser,解析器发出结构化的 SSE 事件。
  5. 流正常结束时,调用 onCompleted()(不带参数)。
  6. 如果请求通过 AbortController 中止,则调用 onAborted()
  7. 如果读取过程中发生意外错误,调用 onCompleted(error)

浏览器兼容性

| 浏览器 | 最低版本 | |--------|----------| | Chrome | >= 74 | | Firefox | >= 90 | | Safari | >= 14.1 | | Edge | >= 79 | | iOS Safari | >= 14.1 | | Android Chrome | >= 74 |

需要支持 ReadableStreamTextDecoderfetch API。

常见问题

为什么 hook 不抛出异常?

这是设计选择。流读取错误通过 onCompleted(error) 传递,而不是抛出异常。这使调用方能够以回调风格处理错误,与 API 的其余部分保持一致,同时避免流式场景中的未捕获 Promise rejection。

支持 GET 请求吗?

支持。hook 适用于任何 HTTP 方法(GET、POST、PUT 等),只要响应返回 SSE body 即可。它接入 ky 的 afterResponse 钩子,该钩子无论请求方法如何都会触发。

为什么同时有 onEventonMessage

它们是便利别名。两者接收相同的 EventSourceMessage,每个 SSE 事件都会触发两者。你可以选择在代码中感觉更自然的名称。如果两者都提供,两者都会被调用。

非 2xx 响应怎么处理?

hook 检查 response.ok,如果响应不成功则跳过处理。非 2xx 错误由 ky 内置的错误处理机制(如 HTTPError)处理,你可以在 ky 调用外层用标准 try/catch 捕获。

会自动重连吗?

不会。此 hook 处理的是单次 SSE 响应。如果服务端发送了 retry: 指令,该值会转发到 onReconnectInterval,但重连逻辑由使用方自行实现。

为什么要设置 timeout: false

SSE 流是长时间运行的连接。ky 的默认超时(10 秒)会在流完成前中断请求。设置 timeout: false 以禁用流式请求的超时:

await ky.post(url, {
    hooks: {afterResponse: [hook]},
    timeout: false
});

相关项目

许可证

MIT