npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@lesliechueng/stream-fetch-manage

v1.0.8

Published

[English](./README.en.md) | **简体中文**

Readme

English | 简体中文

流式请求工具库使用指南

useAgent 提供的流式请求工具库专为大模型应用场景设计,支持 Server-Sent Events (SSE) 流式 POST 通信,具备消息缓存、顺序保证、全链路错误处理等核心能力,帮助开发者快速实现稳定可靠的流式交互体验。

安装

使用任意包管理器安装:

npm install @lesliechueng/stream-fetch-manage --save-dev
yarn add @lesliechueng/stream-fetch-manage --save-dev
pnpm add @lesliechueng/stream-fetch-manage --save-dev

基础用法

import { StreamFetchClient } from "@lesliechueng/stream-fetch-manage";

interface IContent {
  content: string;
  sequenceNumber: number;
  done?: boolean;
}

const streamFetch = new StreamFetchClient<IContent>(
  {
    baseUrl: "/api/chat",
    headers: {
      "Content-Type": "application/json",
    },
    overErrorTimer: 60 * 1000, // 流式中间超时时间,单位为毫秒
  },
  {
    onMessage: (data: IContent) => {
      console.log("收到消息:", data);
    },
    onClose: (lastData: IContent) => {
      console.log("连接关闭", lastData);
    },
    onServerError: (data: IContent, error: Error) => {
      console.error("服务器错误", error);
    },
    onStreamConnectionError: (data: IContent, error: Error) => {
      console.error("流连接错误:", error);
    },
    onConnectionError: (data: IContent, error: Error) => {
      console.error("连接错误:", error);
    },
    onParseError: (data: IContent, error: Error) => {
      console.error("解析错误:", error);
    },
  }
);

// 开始发起请求,下面是具体的参数
streamFetch.sendStreamRequest({
  // 流式中间请求参数
});

// 暂停请求
streamFetch.disconnect();

高级配置

消息顺序保证与缓存,当需要处理可能乱序到达的流式消息时,可配置消息处理器实现顺序保证。

import { StreamFetchClient } from "@lesliechueng/stream-fetch-manage";

interface IContent {
  content: string;
  sequenceNumber: number;
  done?: boolean;
}

const streamFetch = new StreamFetchClient<IContent>(
  {
    baseUrl: "/api/chat",
    headers: {
      "Content-Type": "application/json",
    },
    overErrorTimer: 60 * 1000, // 流式中间超时时间,单位为毫秒
  },
  {
    onMessage: (data: IContent) => {
      console.log("收到消息:", data);
    },
    onClose: (lastData: IContent) => {
      console.log("连接关闭", lastData);
    },
    onServerError: (data: IContent, error: Error) => {
      console.error("服务器错误", error);
    },
    onStreamConnectionError: (data: IContent, error: Error) => {
      console.error("流连接错误:", error);
    },
    onConnectionError: (data: IContent, error: Error) => {
      console.error("连接错误:", error);
    },
    onParseError: (data: IContent, error: Error) => {
      console.error("解析错误:", error);
    },
  },
  {
    maxCacheSize: 6, // 最大缓存大小,单位为条
    cacheTimeout: 5000, // 缓存超时时间,单位为毫秒
    expectedSeq: 0, // 期望的初始化消息索引值
    handleValidateMessageFormat: (data: IContent) => {
      // 校验消息序号的数据类型
      if (typeof data.sequenceNumber !== "number") {
        throw new Error("Message must have a numeric seq field");
      }
    },
    // 使得消息处理器获取消息序号的索引值
    getIndexValue: (data: IContent) => data.sequenceNumber,
  }
);

核心 API

StreamFetchClient 构造函数

new StreamFetchClient(config, eventHandlers, processorConfig?)

| 参数 | 类型 | 说明 | | --------------- | -------------------------- | ------------------------------------ | | config | IStreamFetchClientConfig | 基础配置 | | eventHandlers | ICurrentEventHandlers | 事件处理函数集合 | | processorConfig | IProcessorConfig | 消息处理器配置(可选,用于消息排序) |

IStreamFetchClientConfig

| 属性 | 类型 | 默认值 | 说明 | | -------------- | ------------------------ | ---------------------------------------- | ------------------- | | baseUrl | string | '' | 流式请求基础 URL | | headers | Record<string, string> | { 'Content-Type': 'application/json' } | 请求头 | | overErrorTimer | number | 60000 | 无消息超时时间 (ms) |

ICurrentEventHandlers

| 方法 | 说明 | | ----------------------- | -------------------- | | onMessage | 收到消息时触发 | | onStreamConnectionError | 连接超时时触发 | | onConnectionError | 连接建立失败时触发 | | onServerError | 服务器返回错误时触发 | | onParseError | 消息解析失败时触发 | | onClose | 连接关闭时触发 |

实例方法

sendStreamRequest

发送流式请求

async sendStreamRequest(
  payload: Record<string, any>,
  eventHandlers?: ICurrentEventHandlers<T> | null,
  config?: IStreamFetchClientConfig
)

| 参数 | 类型 | 说明 | | ------------- | -------------------------- | ---------------------- | | payload | Record<string, any> | 请求体数据 | | eventHandlers | ICurrentEventHandlers | 临时事件处理器(可选) | | config | IStreamFetchClientConfig | 临时配置(可选) |

disconnect

断开当前流式连接并清理资源

disconnect();

消息处理机制

当配置了  processorConfig  时,消息处理流程如下:

  1. 收到消息后先进行格式验证(handleValidateMessageFormat
  2. 提取消息序号(getIndexValue
  3. 如果序号与预期一致,直接处理并更新预期序号
  4. 如果序号大于预期,缓存消息
  5. 处理完当前消息后,自动检查缓存中是否有下一条预期消息

这种机制可以确保即使消息乱序到达,最终也能按正确顺序处理。

注意事项

  1. 务必在组件卸载或不需要流式连接时调用  disconnect()  方法,避免内存泄漏
  2. 根据业务需求合理配置  maxCacheSize  和  cacheTimeout,平衡内存占用和消息可靠性
  3. handleValidateMessageFormat  中应严格验证消息格式,避免处理非法数据
  4. 对于需要长期运行的流式连接,建议实现重连机制