@ahoo-wang/fetcher-eventstream
v3.3.9
Published
Server-Sent Events (SSE) support for Fetcher HTTP client with native LLM streaming API support. Enables real-time data streaming and token-by-token LLM response handling.
Maintainers
Readme
@ahoo-wang/fetcher-eventstream
为 Fetcher 提供 text/event-stream 支持,实现服务器发送事件(SSE)功能,用于实时数据流。
🌟 特性
- 📡 事件流转换:将
text/event-stream响应转换为ServerSentEvent对象的异步生成器 - 🔌 自动扩展:模块导入时自动扩展
Response原型,添加事件流方法 - 📋 SSE 解析:根据规范解析服务器发送事件,包括数据、事件、ID 和重试字段
- 🔄 流支持:正确处理分块数据和多行事件
- 💬 注释处理:正确忽略注释行(以
:开头的行) - 🛡️ TypeScript 支持:完整的 TypeScript 类型定义
- ⚡ 性能优化:高效的解析和流处理,适用于高性能应用
- 🤖 LLM 流准备就绪: 原生支持来自流行 LLM API(如 OpenAI GPT、Claude 等)的流式响应
- 🔚 流终止检测:自动流终止检测,实现干净的资源管理和完成处理
🚀 快速开始
安装
# 使用 npm
npm install @ahoo-wang/fetcher-eventstream
# 使用 pnpm
pnpm add @ahoo-wang/fetcher-eventstream
# 使用 yarn
yarn add @ahoo-wang/fetcher-eventstream模块导入
要使用事件流功能,您需要导入模块以执行其副作用:
import '@ahoo-wang/fetcher-eventstream';此导入会自动扩展 Response 接口以处理服务器发送事件流:
eventStream()- 将带有text/event-stream内容类型的响应转换为ServerSentEventStreamjsonEventStream<DATA>()- 将带有text/event-stream内容类型的响应转换为JsonServerSentEventStream<DATA>isEventStreamgetter - 检查响应是否具有text/event-stream内容类型requiredEventStream()- 获取ServerSentEventStream,如果不可用则抛出错误requiredJsonEventStream<DATA>()- 获取JsonServerSentEventStream<DATA>,如果不可用则抛出错误
这是 JavaScript/TypeScript 中常见的模式,用于在不修改原始类型定义的情况下扩展现有类型的功能。
集成测试示例:带事件流的 LLM 客户端
以下示例展示了如何创建带事件流支持的 LLM 客户端,类似于 Fetcher 项目中的集成测试。您可以在 integration-test/src/eventstream/llmClient.ts 中找到完整实现。
这个示例演示了如何使用 Fetcher 的流式传输功能与流行的 LLM API(如 OpenAI 的 GPT 模型)进行交互。
import {
BaseURLCapable,
ContentTypeValues,
FetchExchange,
NamedFetcher,
REQUEST_BODY_INTERCEPTOR_ORDER,
RequestInterceptor,
} from '@ahoo-wang/fetcher';
import {
api,
autoGeneratedError,
body,
post,
ResultExtractors,
} from '@ahoo-wang/fetcher-decorator';
import '@ahoo-wang/fetcher-eventstream';
import { JsonServerSentEventStream } from '@ahoo-wang/fetcher-eventstream';
import { ChatRequest, ChatResponse } from './types';
export const llmFetcherName = 'llm';
export interface LlmOptions extends BaseURLCapable {
apiKey: string;
model?: string;
}
export class LlmRequestInterceptor implements RequestInterceptor {
readonly name: string = 'LlmRequestInterceptor';
readonly order: number = REQUEST_BODY_INTERCEPTOR_ORDER - 1;
constructor(private llmOptions: LlmOptions) {}
intercept(exchange: FetchExchange): void {
const chatRequest = exchange.request.body as ChatRequest;
if (!chatRequest.model) {
chatRequest.model = this.llmOptions.model;
}
}
}
export function createLlmFetcher(options: LlmOptions): NamedFetcher {
const llmFetcher = new NamedFetcher(llmFetcherName, {
baseURL: options.baseURL,
headers: {
Authorization: `Bearer ${options.apiKey}`,
'Content-Type': ContentTypeValues.APPLICATION_JSON,
},
});
llmFetcher.interceptors.request.use(new LlmRequestInterceptor(options));
return llmFetcher;
}
@api('/chat', {
fetcher: llmFetcherName,
resultExtractor: ResultExtractors.JsonEventStream,
})
export class LlmClient {
@post('/completions')
streamChat(
@body() body: ChatRequest,
): Promise<JsonServerSentEventStream<ChatResponse>> {
throw autoGeneratedError(body);
}
@post('/completions', { resultExtractor: ResultExtractors.Json })
chat(@body() body: ChatRequest): Promise<ChatResponse> {
throw autoGeneratedError(body);
}
}使用 streamChat 进行实时响应
以下是使用 streamChat 方法从 LLM API 获取实时响应的示例:
import { createLlmFetcher, LlmClient } from './llmClient';
// 使用您的 API 配置初始化 LLM 客户端
const llmFetcher = createLlmFetcher({
baseURL: 'https://api.openai.com/v1', // OpenAI 示例
apiKey: process.env.OPENAI_API_KEY || 'your-api-key',
model: 'gpt-3.5-turbo', // 默认模型
});
// 创建客户端实例
const llmClient = new LlmClient();
// 示例:实时流式传输聊天完成响应
async function streamChatExample() {
try {
// 流式传输响应,逐个令牌接收
const stream = await llmClient.streamChat({
messages: [
{ role: 'system', content: 'You are a helpful assistant.' },
{ role: 'user', content: 'Explain quantum computing in simple terms.' },
],
model: 'gpt-3.5-turbo', // 如需要可覆盖默认模型
stream: true, // 启用流式传输
});
// 处理流式响应
let fullResponse = '';
for await (const event of stream) {
// 每个事件包含部分响应
if (event.data) {
const chunk = event.data;
const content = chunk.choices[0]?.delta?.content || '';
fullResponse += content;
console.log('新令牌:', content);
// 在令牌到达时实时更新 UI
updateUI(content);
}
}
console.log('完整响应:', fullResponse);
} catch (error) {
console.error('流式聊天错误:', error);
}
}
// 辅助函数模拟 UI 更新
function updateUI(content: string) {
// 在实际应用中,这将更新您的 UI
process.stdout.write(content);
}基本用法
import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';
const fetcher = new Fetcher({
baseURL: 'https://api.example.com',
});
// 在响应中使用 eventStream 方法处理 text/event-stream 内容类型
// Response 对象将自动具有 eventStream() 和 jsonEventStream() 方法
const response = await fetcher.get('/events');
for await (const event of response.requiredEventStream()) {
console.log('收到事件:', event);
}
// 使用 jsonEventStream 方法处理 JSON 数据
const jsonResponse = await fetcher.get('/json-events');
for await (const event of response.requiredJsonEventStream<MyDataType>()) {
console.log('收到 JSON 事件:', event.data);
}高级用法与终止检测
import { Fetcher } from '@ahoo-wang/fetcher';
import {
toJsonServerSentEventStream,
type TerminateDetector,
} from '@ahoo-wang/fetcher-eventstream';
const fetcher = new Fetcher({
baseURL: 'https://api.openai.com/v1',
});
// 定义 OpenAI 风格完成的终止检测器
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';
// 获取原始事件流
const response = await fetcher.post('/chat/completions', {
body: {
model: 'gpt-3.5-turbo',
messages: [{ role: 'user', content: '你好!' }],
stream: true,
},
});
// 转换为带自动终止的类型化 JSON 流
const jsonStream = toJsonServerSentEventStream<ChatCompletionChunk>(
response.requiredEventStream(),
terminateOnDone,
);
// 处理带自动终止的流式响应
for await (const event of jsonStream) {
const content = event.data.choices[0]?.delta?.content;
if (content) {
console.log('令牌:', content);
// 当收到 '[DONE]' 时流会自动终止
}
}手动转换
import { toServerSentEventStream } from '@ahoo-wang/fetcher-eventstream';
// 手动转换 Response 对象
const response = await fetch('/events');
const eventStream = toServerSentEventStream(response);
// 从流中读取事件
const reader = eventStream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log('收到事件:', value);
}
} finally {
reader.releaseLock();
}📚 API 参考
模块导入
要使用事件流功能,您需要导入模块以执行其副作用:
import '@ahoo-wang/fetcher-eventstream';此导入会自动扩展全局 Response 接口以处理服务器发送事件流:
eventStream()- 将带有text/event-stream内容类型的响应转换为ServerSentEventStreamjsonEventStream<DATA>()- 将带有text/event-stream内容类型的响应转换为JsonServerSentEventStream<DATA>isEventStreamgetter - 检查响应是否具有text/event-stream内容类型requiredEventStream()- 获取ServerSentEventStream,如果不可用则抛出错误requiredJsonEventStream<DATA>()- 获取JsonServerSentEventStream<DATA>,如果不可用则抛出错误
这是 JavaScript/TypeScript 中常见的模式,用于在不修改原始类型定义的情况下扩展现有类型的功能。
在集成测试和实际应用中,此导入对于处理事件流至关重要。例如:
import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';
const fetcher = new Fetcher({
baseURL: 'https://api.example.com',
});
// Response 对象将自动具有 eventStream() 和 jsonEventStream() 方法
const response = await fetcher.get('/events');
// 处理事件流
for await (const event of response.requiredEventStream()) {
console.log('收到事件:', event);
}toJsonServerSentEventStream
将 ServerSentEventStream 转换为 JsonServerSentEventStream<DATA>,用于处理带有 JSON 数据的服务器发送事件。可选支持流终止检测以实现自动流关闭。
签名
function toJsonServerSentEventStream<DATA>(
serverSentEventStream: ServerSentEventStream,
terminateDetector?: TerminateDetector,
): JsonServerSentEventStream<DATA>;参数
serverSentEventStream:要转换的 ServerSentEventStreamterminateDetector:可选的函数,用于检测何时应该终止流。当提供时,当检测器对某个事件返回true时,流将自动关闭。
返回
JsonServerSentEventStream<DATA>:带有 JSON 数据的 ServerSentEvent 对象的可读流
示例
// 基本用法,不使用终止检测
const jsonStream = toJsonServerSentEventStream<MyData>(serverSentEventStream);
// 使用终止检测处理 OpenAI 风格的完成
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';
const terminatingStream = toJsonServerSentEventStream<MyData>(
serverSentEventStream,
terminateOnDone,
);
// 自定义终止逻辑
const terminateOnError: TerminateDetector = event => {
return event.event === 'error' || event.data.includes('ERROR');
};
const errorHandlingStream = toJsonServerSentEventStream<MyData>(
serverSentEventStream,
terminateOnError,
);JsonServerSentEvent
定义带有 JSON 数据的服务器发送事件结构的接口。
interface JsonServerSentEvent<DATA> extends Omit<ServerSentEvent, 'data'> {
data: DATA; // 作为解析 JSON 的事件数据
}JsonServerSentEventStream
带有 JSON 数据的 ServerSentEvent 对象的可读流的类型别名。
type JsonServerSentEventStream<DATA> = ReadableStream<
JsonServerSentEvent<DATA>
>;TerminateDetector
用于检测服务器发送事件流何时应该终止的函数类型。这通常用于 LLM API 发送特殊终止事件来表示响应流结束的情况。
签名
type TerminateDetector = (event: ServerSentEvent) => boolean;参数
event:当前正在处理的ServerSentEvent
返回
boolean:如果应该终止流则返回true,否则返回false
示例
// OpenAI 风格的终止(常见模式)
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';
// 基于事件的终止
const terminateOnComplete: TerminateDetector = event => event.event === 'done';
// 具有多个条件的自定义终止
const terminateOnFinish: TerminateDetector = event => {
return (
event.event === 'done' ||
event.event === 'error' ||
event.data === '[DONE]' ||
event.data.includes('TERMINATE')
);
};
// 与 toJsonServerSentEventStream 一起使用
const stream = toJsonServerSentEventStream<MyData>(
serverSentEventStream,
terminateOnDone,
);常见用例
- LLM 流式传输:检测来自 OpenAI、Claude 或其他 LLM API 的完成标记,如
[DONE] - 错误处理:在收到错误事件时终止流
- 自定义协议:实现应用程序特定的终止逻辑
- 资源管理:在满足特定条件时自动关闭流
toServerSentEventStream
将带有 text/event-stream 主体的 Response 对象转换为 ServerSentEvent 对象的可读流。
签名
function toServerSentEventStream(response: Response): ServerSentEventStream;参数
response:带有text/event-stream内容类型的 HTTP 响应
返回
ServerSentEventStream:ServerSentEvent 对象的可读流
ServerSentEvent
定义服务器发送事件结构的接口。
interface ServerSentEvent {
data: string; // 事件数据(必需)
event?: string; // 事件类型(可选,默认为 'message')
id?: string; // 事件 ID(可选)
retry?: number; // 以毫秒为单位的重试超时(可选)
}ServerSentEventStream
ServerSentEvent 对象的可读流的类型别名。
type ServerSentEventStream = ReadableStream<ServerSentEvent>;🛠️ 示例
实时通知
import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';
const fetcher = new Fetcher({
baseURL: 'https://api.example.com',
});
// 监听实时通知
const response = await fetcher.get('/notifications');
for await (const event of response.requiredEventStream()) {
switch (event.event) {
case 'message':
showNotification('消息', event.data);
break;
case 'alert':
showAlert('警报', event.data);
break;
case 'update':
handleUpdate(JSON.parse(event.data));
break;
default:
console.log('未知事件:', event);
}
}进度更新
import { Fetcher } from '@ahoo-wang/fetcher';
const fetcher = new Fetcher({
baseURL: 'https://api.example.com',
});
// 跟踪长时间运行的任务进度
const response = await fetcher.get('/tasks/123/progress');
for await (const event of response.requiredEventStream()) {
if (event.event === 'progress') {
const progress = JSON.parse(event.data);
updateProgressBar(progress.percentage);
} else if (event.event === 'complete') {
showCompletionMessage(event.data);
break;
}
}聊天应用
import { Fetcher } from '@ahoo-wang/fetcher';
const fetcher = new Fetcher({
baseURL: 'https://chat-api.example.com',
});
// 实时聊天消息
const response = await fetcher.get('/rooms/123/messages');
for await (const event of response.requiredEventStream()) {
if (event.event === 'message') {
const message = JSON.parse(event.data);
displayMessage(message);
} else if (event.event === 'user-joined') {
showUserJoined(event.data);
} else if (event.event === 'user-left') {
showUserLeft(event.data);
}
}🧪 测试
# 运行测试
pnpm test
# 运行带覆盖率的测试
pnpm test -- --coverage测试套件包括:
- 事件流转换测试
- 边缘情况处理(格式错误的事件、分块数据等)
- 大事件流的性能测试
📋 服务器发送事件规范兼容性
此包完全实现了 服务器发送事件规范:
- 数据字段:支持多行数据字段
- 事件字段:自定义事件类型
- ID 字段:最后事件 ID 跟踪
- 重试字段:自动重连超时
- 注释行:忽略以
:开头的行 - 事件分发:正确的事件分发,默认事件类型为 'message'
🤝 贡献
欢迎贡献!请查看 贡献指南 了解更多详情。
📄 许可证
本项目采用 Apache-2.0 许可证。
