npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@ahoo-wang/fetcher-eventstream

v3.3.9

Published

Server-Sent Events (SSE) support for Fetcher HTTP client with native LLM streaming API support. Enables real-time data streaming and token-by-token LLM response handling.

Readme

@ahoo-wang/fetcher-eventstream

npm version Build Status codecov License npm downloads npm bundle size Ask DeepWiki Storybook

为 Fetcher 提供 text/event-stream 支持,实现服务器发送事件(SSE)功能,用于实时数据流。

🌟 特性

  • 📡 事件流转换:将 text/event-stream 响应转换为 ServerSentEvent 对象的异步生成器
  • 🔌 自动扩展:模块导入时自动扩展 Response 原型,添加事件流方法
  • 📋 SSE 解析:根据规范解析服务器发送事件,包括数据、事件、ID 和重试字段
  • 🔄 流支持:正确处理分块数据和多行事件
  • 💬 注释处理:正确忽略注释行(以 : 开头的行)
  • 🛡️ TypeScript 支持:完整的 TypeScript 类型定义
  • ⚡ 性能优化:高效的解析和流处理,适用于高性能应用
  • 🤖 LLM 流准备就绪: 原生支持来自流行 LLM API(如 OpenAI GPT、Claude 等)的流式响应
  • 🔚 流终止检测:自动流终止检测,实现干净的资源管理和完成处理

🚀 快速开始

安装

# 使用 npm
npm install @ahoo-wang/fetcher-eventstream

# 使用 pnpm
pnpm add @ahoo-wang/fetcher-eventstream

# 使用 yarn
yarn add @ahoo-wang/fetcher-eventstream

模块导入

要使用事件流功能,您需要导入模块以执行其副作用:

import '@ahoo-wang/fetcher-eventstream';

此导入会自动扩展 Response 接口以处理服务器发送事件流:

  • eventStream() - 将带有 text/event-stream 内容类型的响应转换为 ServerSentEventStream
  • jsonEventStream<DATA>() - 将带有 text/event-stream 内容类型的响应转换为 JsonServerSentEventStream<DATA>
  • isEventStream getter - 检查响应是否具有 text/event-stream 内容类型
  • requiredEventStream() - 获取 ServerSentEventStream,如果不可用则抛出错误
  • requiredJsonEventStream<DATA>() - 获取 JsonServerSentEventStream<DATA>,如果不可用则抛出错误

这是 JavaScript/TypeScript 中常见的模式,用于在不修改原始类型定义的情况下扩展现有类型的功能。

集成测试示例:带事件流的 LLM 客户端

以下示例展示了如何创建带事件流支持的 LLM 客户端,类似于 Fetcher 项目中的集成测试。您可以在 integration-test/src/eventstream/llmClient.ts 中找到完整实现。

这个示例演示了如何使用 Fetcher 的流式传输功能与流行的 LLM API(如 OpenAI 的 GPT 模型)进行交互。

import {
  BaseURLCapable,
  ContentTypeValues,
  FetchExchange,
  NamedFetcher,
  REQUEST_BODY_INTERCEPTOR_ORDER,
  RequestInterceptor,
} from '@ahoo-wang/fetcher';
import {
  api,
  autoGeneratedError,
  body,
  post,
  ResultExtractors,
} from '@ahoo-wang/fetcher-decorator';
import '@ahoo-wang/fetcher-eventstream';
import { JsonServerSentEventStream } from '@ahoo-wang/fetcher-eventstream';
import { ChatRequest, ChatResponse } from './types';

export const llmFetcherName = 'llm';

export interface LlmOptions extends BaseURLCapable {
  apiKey: string;
  model?: string;
}

export class LlmRequestInterceptor implements RequestInterceptor {
  readonly name: string = 'LlmRequestInterceptor';
  readonly order: number = REQUEST_BODY_INTERCEPTOR_ORDER - 1;

  constructor(private llmOptions: LlmOptions) {}

  intercept(exchange: FetchExchange): void {
    const chatRequest = exchange.request.body as ChatRequest;
    if (!chatRequest.model) {
      chatRequest.model = this.llmOptions.model;
    }
  }
}

export function createLlmFetcher(options: LlmOptions): NamedFetcher {
  const llmFetcher = new NamedFetcher(llmFetcherName, {
    baseURL: options.baseURL,
    headers: {
      Authorization: `Bearer ${options.apiKey}`,
      'Content-Type': ContentTypeValues.APPLICATION_JSON,
    },
  });
  llmFetcher.interceptors.request.use(new LlmRequestInterceptor(options));
  return llmFetcher;
}

@api('/chat', {
  fetcher: llmFetcherName,
  resultExtractor: ResultExtractors.JsonEventStream,
})
export class LlmClient {
  @post('/completions')
  streamChat(
    @body() body: ChatRequest,
  ): Promise<JsonServerSentEventStream<ChatResponse>> {
    throw autoGeneratedError(body);
  }

  @post('/completions', { resultExtractor: ResultExtractors.Json })
  chat(@body() body: ChatRequest): Promise<ChatResponse> {
    throw autoGeneratedError(body);
  }
}

使用 streamChat 进行实时响应

以下是使用 streamChat 方法从 LLM API 获取实时响应的示例:

import { createLlmFetcher, LlmClient } from './llmClient';

// 使用您的 API 配置初始化 LLM 客户端
const llmFetcher = createLlmFetcher({
  baseURL: 'https://api.openai.com/v1', // OpenAI 示例
  apiKey: process.env.OPENAI_API_KEY || 'your-api-key',
  model: 'gpt-3.5-turbo', // 默认模型
});

// 创建客户端实例
const llmClient = new LlmClient();

// 示例:实时流式传输聊天完成响应
async function streamChatExample() {
  try {
    // 流式传输响应,逐个令牌接收
    const stream = await llmClient.streamChat({
      messages: [
        { role: 'system', content: 'You are a helpful assistant.' },
        { role: 'user', content: 'Explain quantum computing in simple terms.' },
      ],
      model: 'gpt-3.5-turbo', // 如需要可覆盖默认模型
      stream: true, // 启用流式传输
    });

    // 处理流式响应
    let fullResponse = '';
    for await (const event of stream) {
      // 每个事件包含部分响应
      if (event.data) {
        const chunk = event.data;
        const content = chunk.choices[0]?.delta?.content || '';
        fullResponse += content;
        console.log('新令牌:', content);

        // 在令牌到达时实时更新 UI
        updateUI(content);
      }
    }

    console.log('完整响应:', fullResponse);
  } catch (error) {
    console.error('流式聊天错误:', error);
  }
}

// 辅助函数模拟 UI 更新
function updateUI(content: string) {
  // 在实际应用中,这将更新您的 UI
  process.stdout.write(content);
}

基本用法

import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';

const fetcher = new Fetcher({
  baseURL: 'https://api.example.com',
});

// 在响应中使用 eventStream 方法处理 text/event-stream 内容类型
// Response 对象将自动具有 eventStream() 和 jsonEventStream() 方法
const response = await fetcher.get('/events');
for await (const event of response.requiredEventStream()) {
  console.log('收到事件:', event);
}

// 使用 jsonEventStream 方法处理 JSON 数据
const jsonResponse = await fetcher.get('/json-events');
for await (const event of response.requiredJsonEventStream<MyDataType>()) {
  console.log('收到 JSON 事件:', event.data);
}

高级用法与终止检测

import { Fetcher } from '@ahoo-wang/fetcher';
import {
  toJsonServerSentEventStream,
  type TerminateDetector,
} from '@ahoo-wang/fetcher-eventstream';

const fetcher = new Fetcher({
  baseURL: 'https://api.openai.com/v1',
});

// 定义 OpenAI 风格完成的终止检测器
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';

// 获取原始事件流
const response = await fetcher.post('/chat/completions', {
  body: {
    model: 'gpt-3.5-turbo',
    messages: [{ role: 'user', content: '你好!' }],
    stream: true,
  },
});

// 转换为带自动终止的类型化 JSON 流
const jsonStream = toJsonServerSentEventStream<ChatCompletionChunk>(
  response.requiredEventStream(),
  terminateOnDone,
);

// 处理带自动终止的流式响应
for await (const event of jsonStream) {
  const content = event.data.choices[0]?.delta?.content;
  if (content) {
    console.log('令牌:', content);
    // 当收到 '[DONE]' 时流会自动终止
  }
}

手动转换

import { toServerSentEventStream } from '@ahoo-wang/fetcher-eventstream';

// 手动转换 Response 对象
const response = await fetch('/events');
const eventStream = toServerSentEventStream(response);

// 从流中读取事件
const reader = eventStream.getReader();
try {
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    console.log('收到事件:', value);
  }
} finally {
  reader.releaseLock();
}

📚 API 参考

模块导入

要使用事件流功能,您需要导入模块以执行其副作用:

import '@ahoo-wang/fetcher-eventstream';

此导入会自动扩展全局 Response 接口以处理服务器发送事件流:

  • eventStream() - 将带有 text/event-stream 内容类型的响应转换为 ServerSentEventStream
  • jsonEventStream<DATA>() - 将带有 text/event-stream 内容类型的响应转换为 JsonServerSentEventStream<DATA>
  • isEventStream getter - 检查响应是否具有 text/event-stream 内容类型
  • requiredEventStream() - 获取 ServerSentEventStream,如果不可用则抛出错误
  • requiredJsonEventStream<DATA>() - 获取 JsonServerSentEventStream<DATA>,如果不可用则抛出错误

这是 JavaScript/TypeScript 中常见的模式,用于在不修改原始类型定义的情况下扩展现有类型的功能。

在集成测试和实际应用中,此导入对于处理事件流至关重要。例如:

import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';

const fetcher = new Fetcher({
  baseURL: 'https://api.example.com',
});

// Response 对象将自动具有 eventStream() 和 jsonEventStream() 方法
const response = await fetcher.get('/events');
// 处理事件流
for await (const event of response.requiredEventStream()) {
  console.log('收到事件:', event);
}

toJsonServerSentEventStream

ServerSentEventStream 转换为 JsonServerSentEventStream<DATA>,用于处理带有 JSON 数据的服务器发送事件。可选支持流终止检测以实现自动流关闭。

签名

function toJsonServerSentEventStream<DATA>(
  serverSentEventStream: ServerSentEventStream,
  terminateDetector?: TerminateDetector,
): JsonServerSentEventStream<DATA>;

参数

  • serverSentEventStream:要转换的 ServerSentEventStream
  • terminateDetector:可选的函数,用于检测何时应该终止流。当提供时,当检测器对某个事件返回 true 时,流将自动关闭。

返回

  • JsonServerSentEventStream<DATA>:带有 JSON 数据的 ServerSentEvent 对象的可读流

示例

// 基本用法,不使用终止检测
const jsonStream = toJsonServerSentEventStream<MyData>(serverSentEventStream);

// 使用终止检测处理 OpenAI 风格的完成
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';
const terminatingStream = toJsonServerSentEventStream<MyData>(
  serverSentEventStream,
  terminateOnDone,
);

// 自定义终止逻辑
const terminateOnError: TerminateDetector = event => {
  return event.event === 'error' || event.data.includes('ERROR');
};
const errorHandlingStream = toJsonServerSentEventStream<MyData>(
  serverSentEventStream,
  terminateOnError,
);

JsonServerSentEvent

定义带有 JSON 数据的服务器发送事件结构的接口。

interface JsonServerSentEvent<DATA> extends Omit<ServerSentEvent, 'data'> {
  data: DATA; // 作为解析 JSON 的事件数据
}

JsonServerSentEventStream

带有 JSON 数据的 ServerSentEvent 对象的可读流的类型别名。

type JsonServerSentEventStream<DATA> = ReadableStream<
  JsonServerSentEvent<DATA>
>;

TerminateDetector

用于检测服务器发送事件流何时应该终止的函数类型。这通常用于 LLM API 发送特殊终止事件来表示响应流结束的情况。

签名

type TerminateDetector = (event: ServerSentEvent) => boolean;

参数

  • event:当前正在处理的 ServerSentEvent

返回

  • boolean:如果应该终止流则返回 true,否则返回 false

示例

// OpenAI 风格的终止(常见模式)
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';

// 基于事件的终止
const terminateOnComplete: TerminateDetector = event => event.event === 'done';

// 具有多个条件的自定义终止
const terminateOnFinish: TerminateDetector = event => {
  return (
    event.event === 'done' ||
    event.event === 'error' ||
    event.data === '[DONE]' ||
    event.data.includes('TERMINATE')
  );
};

// 与 toJsonServerSentEventStream 一起使用
const stream = toJsonServerSentEventStream<MyData>(
  serverSentEventStream,
  terminateOnDone,
);

常见用例

  • LLM 流式传输:检测来自 OpenAI、Claude 或其他 LLM API 的完成标记,如 [DONE]
  • 错误处理:在收到错误事件时终止流
  • 自定义协议:实现应用程序特定的终止逻辑
  • 资源管理:在满足特定条件时自动关闭流

toServerSentEventStream

将带有 text/event-stream 主体的 Response 对象转换为 ServerSentEvent 对象的可读流。

签名

function toServerSentEventStream(response: Response): ServerSentEventStream;

参数

  • response:带有 text/event-stream 内容类型的 HTTP 响应

返回

  • ServerSentEventStream:ServerSentEvent 对象的可读流

ServerSentEvent

定义服务器发送事件结构的接口。

interface ServerSentEvent {
  data: string; // 事件数据(必需)
  event?: string; // 事件类型(可选,默认为 'message')
  id?: string; // 事件 ID(可选)
  retry?: number; // 以毫秒为单位的重试超时(可选)
}

ServerSentEventStream

ServerSentEvent 对象的可读流的类型别名。

type ServerSentEventStream = ReadableStream<ServerSentEvent>;

🛠️ 示例

实时通知

import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';

const fetcher = new Fetcher({
  baseURL: 'https://api.example.com',
});

// 监听实时通知
const response = await fetcher.get('/notifications');
for await (const event of response.requiredEventStream()) {
  switch (event.event) {
    case 'message':
      showNotification('消息', event.data);
      break;
    case 'alert':
      showAlert('警报', event.data);
      break;
    case 'update':
      handleUpdate(JSON.parse(event.data));
      break;
    default:
      console.log('未知事件:', event);
  }
}

进度更新

import { Fetcher } from '@ahoo-wang/fetcher';

const fetcher = new Fetcher({
  baseURL: 'https://api.example.com',
});

// 跟踪长时间运行的任务进度
const response = await fetcher.get('/tasks/123/progress');
for await (const event of response.requiredEventStream()) {
  if (event.event === 'progress') {
    const progress = JSON.parse(event.data);
    updateProgressBar(progress.percentage);
  } else if (event.event === 'complete') {
    showCompletionMessage(event.data);
    break;
  }
}

聊天应用

import { Fetcher } from '@ahoo-wang/fetcher';

const fetcher = new Fetcher({
  baseURL: 'https://chat-api.example.com',
});

// 实时聊天消息
const response = await fetcher.get('/rooms/123/messages');
for await (const event of response.requiredEventStream()) {
  if (event.event === 'message') {
    const message = JSON.parse(event.data);
    displayMessage(message);
  } else if (event.event === 'user-joined') {
    showUserJoined(event.data);
  } else if (event.event === 'user-left') {
    showUserLeft(event.data);
  }
}

🧪 测试

# 运行测试
pnpm test

# 运行带覆盖率的测试
pnpm test -- --coverage

测试套件包括:

  • 事件流转换测试
  • 边缘情况处理(格式错误的事件、分块数据等)
  • 大事件流的性能测试

📋 服务器发送事件规范兼容性

此包完全实现了 服务器发送事件规范

  • 数据字段:支持多行数据字段
  • 事件字段:自定义事件类型
  • ID 字段:最后事件 ID 跟踪
  • 重试字段:自动重连超时
  • 注释行:忽略以 : 开头的行
  • 事件分发:正确的事件分发,默认事件类型为 'message'

🤝 贡献

欢迎贡献!请查看 贡献指南 了解更多详情。

📄 许可证

本项目采用 Apache-2.0 许可证