npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

supa-flow-fetch

v1.0.1

Published

Enhanced streaming data fetching tool with SSE support

Readme

supa-flow-fetch

一个功能强大的流式数据获取工具,提供了完整的 SSE (Server-Sent Events) 支持,包括事件处理、数据转换、批处理等高级功能。

特点

  • 🚀 完整的 SSE 协议支持
  • 💪 强大的错误处理和重试机制
  • 🔄 智能的自动重连
  • ❤️ 可靠的心跳检测
  • 📦 灵活的数据缓冲区管理
  • 🛠️ 强大的数据转换和处理
  • 🎯 事件过滤和订阅
  • 📊 批量数据处理
  • 🔍 完整的状态管理
  • 🐛 详细的调试支持

安装

npm install supa-flow-fetch

基本用法

import { SupaFlow } from 'supa-flow-fetch';

// 创建实例
const flow = new SupaFlow('https://api.example.com/stream', {
  retryStrategy: {
    initialRetryDelay: 1000,
    maxRetryDelay: 30000,
    maxRetries: 10
  },
  autoReconnect: true,
  heartbeat: {
    enabled: true,
    interval: 30000,
    timeout: 60000
  },
  bufferSize: 1000,
  debug: true
});


// 监听所有消息,默认为message,后端设置event实现订阅特定事件
flow.on('message', (data) => {
    console.log(data);
});


// 连接到流
try {
  await flow.connect();
} catch (error) {
  if (error.code === 'CONNECTION_ERROR') {
    console.error('连接失败:', error.message);
  }
}

// 获取缓冲区数据
const buffer = flow.getBuffer();

// 关闭连接
flow.close();

高级功能

事件订阅

// 订阅特定事件
flow.on('temperature', (data) => {
  console.log('收到温度数据:', data);
});

// 取消订阅
const handler = (data) => console.log(data);
flow.on('humidity', handler);
flow.off('humidity', handler);

事件过滤

const flow = new SupaFlow('https://api.example.com/stream', {
  eventFilter: {
    // 只处理这些事件
    include: ['temperature', 'humidity'],
    // 忽略这些事件
    exclude: ['heartbeat']
  }
});

数据转换

const flow = new SupaFlow('https://api.example.com/stream', {
  transformers: [
    // 转换时间戳
    (data) => ({
      ...data,
      timestamp: new Date(data.timestamp).toISOString()
    }),
    // 转换温度单位
    (data) => ({
      ...data,
      temperature: data.temperature ? (data.temperature * 9/5 + 32) + '°F' : undefined
    })
  ]
});

批处理

const flow = new SupaFlow('https://api.example.com/stream', {
  batchConfig: {
    enabled: true,
    size: 10,         // 每批数据量
    interval: 5000    // 处理间隔(毫秒)
  }
});

// 处理批量数据
flow.on('batch', (batchData) => {
  console.log('收到批量数据:', batchData);
});

// 手动处理剩余的批处理数据
flow.flushBatchBuffer();

状态管理

import { SupaFlow, ConnectionState } from 'supa-flow-fetch';

const flow = new SupaFlow('https://api.example.com/stream', {
  onStateChange: (state) => {
    switch (state) {
      case ConnectionState.CONNECTING:
        console.log('正在连接...');
        break;
      case ConnectionState.CONNECTED:
        console.log('已连接');
        break;
      case ConnectionState.DISCONNECTED:
        console.log('已断开连接');
        break;
      case ConnectionState.RECONNECTING:
        console.log('正在重连...');
        break;
      case ConnectionState.ERROR:
        console.log('发生错误');
        break;
    }
  }
});

// 获取当前状态
const currentState = flow.getState();

API 文档

SupaFlow 类

构造函数

constructor(url: string, options?: SupaFlowOptions)

选项

interface SupaFlowOptions {
  // 重试策略
  retryStrategy?: {
    initialRetryDelay?: number; // 初始重试延迟(毫秒)
    maxRetryDelay?: number;     // 最大重试延迟(毫秒)
    maxRetries?: number;        // 最大重试次数
  };
  
  // 事件过滤器
  eventFilter?: {
    include?: string[];        // 包含的事件
    exclude?: string[];        // 排除的事件
  };
  
  // 数据转换器
  transformers?: Array<(data: any) => any>;
  
  // 状态变化回调
  onStateChange?: (state: ConnectionState) => void;
  
  // 批处理配置
  batchConfig?: {
    enabled: boolean;          // 是否启用批处理
    size?: number;            // 批处理大小
    interval?: number;        // 处理间隔(毫秒)
  };
  
  // 自动重连
  autoReconnect?: boolean;
  
  // 心跳检测
  heartbeat?: {
    enabled: boolean;         // 是否启用心跳
    interval?: number;        // 心跳间隔(毫秒)
    timeout?: number;         // 超时时间(毫秒)
  };
  
  // 缓冲区大小
  bufferSize?: number;
  
  // 调试模式
  debug?: boolean;
  
  // HTTP 请求配置
  fetchOptions?: RequestInit;
}

方法

  • connect(): 连接到流
  • close(): 关闭连接
  • on(event: string, handler: (data: any) => void): 订阅事件
  • off(event: string, handler: (data: any) => void): 取消订阅
  • getState(): 获取当前连接状态
  • getBuffer(): 获取数据缓冲区
  • getBatchBuffer(): 获取批处理缓冲区
  • flushBatchBuffer(): 手动处理批处理缓冲区
  • clearBuffer(): 清空数据缓冲区

连接状态

enum ConnectionState {
  CONNECTING = 'CONNECTING',
  CONNECTED = 'CONNECTED',
  DISCONNECTED = 'DISCONNECTED',
  RECONNECTING = 'RECONNECTING',
  ERROR = 'ERROR'
}

错误类型

class SupaFlowError extends Error {
  code: string;       // 错误代码
  details?: any;      // 错误详情
}

错误代码:

  • CONNECTION_ERROR: 连接错误
  • MAX_RETRIES_EXCEEDED: 超过最大重试次数
  • UNKNOWN_ERROR: 未知错误

最佳实践

  1. 始终处理错误:
try {
  await flow.connect();
} catch (error) {
  if (error.code === 'CONNECTION_ERROR') {
    // 处理连接错误
  } else if (error.code === 'MAX_RETRIES_EXCEEDED') {
    // 处理重试失败
  }
}
  1. 使用数据转换器保持代码整洁:
const flow = new SupaFlow(url, {
  transformers: [
    // 数据清理
    data => ({...data, timestamp: new Date(data.timestamp)}),
    // 数据格式化
    data => ({...data, formattedValue: `${data.value}${data.unit}`})
  ]
});
  1. 合理使用批处理:
// 对于高频数据,使用批处理提高性能
const flow = new SupaFlow(url, {
  batchConfig: {
    enabled: true,
    size: 100,        // 根据数据量调整
    interval: 1000    // 根据实时性要求调整
  }
});
  1. 监控连接状态:
flow.on('state', (state) => {
  if (state === ConnectionState.ERROR) {
    // 记录错误日志
  } else if (state === ConnectionState.DISCONNECTED) {
    // 通知用户连接断开
  }
});

HTTP 请求配置

SupaFlow 支持通过 fetchOptions 配置项来自定义 HTTP 请求的所有参数。这些参数遵循标准的 RequestInit 接口:

const flow = new SupaFlow('https://api.example.com/stream', {
  fetchOptions: {
    // HTTP 方法
    method: 'POST',
    
    // 请求头
    headers: {
      'Authorization': 'Bearer your-token',
      'Content-Type': 'application/json'
    },
    
    // 请求体
    body: JSON.stringify({ channel: 'sensors' }),
    
    // 凭证处理
    credentials: 'include',  // 'omit' | 'same-origin' | 'include'
    
    // 缓存策略
    cache: 'no-cache',      // 'default' | 'no-store' | 'reload' | 'no-cache' | 'force-cache' | 'only-if-cached'
    
    // 请求模式
    mode: 'cors',           // 'cors' | 'no-cors' | 'same-origin'
    
    // 重定向处理
    redirect: 'follow',     // 'follow' | 'error' | 'manual'
    
    // 引用策略
    referrerPolicy: 'no-referrer'
  }
});

常见使用场景

  1. 发送认证信息:
const flow = new SupaFlow(url, {
  fetchOptions: {
    headers: {
      'Authorization': 'Bearer your-token',
      'X-API-Key': 'your-api-key'
    }
  }
});
  1. 发送表单数据:
const formData = new FormData();
formData.append('channel', 'sensors');

const flow = new SupaFlow(url, {
  fetchOptions: {
    method: 'POST',
    body: formData
  }
});
  1. 设置请求超时:
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 5000);

const flow = new SupaFlow(url, {
  fetchOptions: {
    signal: controller.signal
  }
});
  1. 处理 CORS:
const flow = new SupaFlow(url, {
  fetchOptions: {
    mode: 'cors',
    credentials: 'include',
    headers: {
      'Access-Control-Allow-Origin': '*'
    }
  }
});
  1. 自定义缓存策略:
const flow = new SupaFlow(url, {
  fetchOptions: {
    cache: 'no-cache',
    headers: {
      'Cache-Control': 'no-cache',
      'Pragma': 'no-cache'
    }
  }
});

RequestInit 完整配置项

| 配置项 | 类型 | 说明 | |--------|------|------| | method | string | HTTP 请求方法 (GET, POST, etc.) | | headers | Headers | Record<string, string> | 请求头 | | body | BodyInit | 请求体 | | mode | RequestMode | 请求模式 | | credentials | RequestCredentials | 凭证策略 | | cache | RequestCache | 缓存策略 | | redirect | RequestRedirect | 重定向策略 | | referrer | string | 请求来源 | | referrerPolicy | ReferrerPolicy | 引用策略 | | integrity | string | 完整性校验 | | keepalive | boolean | 保持连接 | | signal | AbortSignal | 中止信号 | | window | null | 请求上下文 |

示例

完整的示例代码可以在 examples 目录中找到:

许可证

MIT