npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

prod-cons-pqueue

v1.0.0

Published

A producer-consumer pattern implementation using PQueue for efficient data processing

Readme

ProdConsPQueue

基于 PQueue 的高效生产者-消费者模式实现,适用于 Node.js 和现代 JavaScript/TypeScript 环境。

🌟 核心特性

  • 🚀 高性能: 基于 PQueue 实现,支持并发控制和任务调度
  • 🎯 精确控制: 可配置的槽位数量和并发度
  • 🔧 事件驱动: 支持实时状态监控和回调
  • 🛡️ 错误处理: 完善的错误处理和恢复机制
  • 📊 统计信息: 提供详细的运行状态统计
  • 🎨 TypeScript 支持: 完整的类型定义和开发体验
  • ⏱️ 等待机制: 支持等待特定事件和缓冲区清空
  • 🎮 交互式演示: 提供可视化演示工具

📦 安装

npm install prod-cons-pqueue

🚀 快速开始

基础使用

import ProdConsPQueue from 'prod-cons-pqueue';

// 创建生产者-消费者实例
const prodCons = new ProdConsPQueue({
  slotAmount: 10,  // 缓冲区大小
  concurrency: 2   // 消费并发度
});

// 监听状态变化
prodCons.on('free-slot-amount-change', (amount) => {
  console.log(`可用槽位数: ${amount}`);
});

prodCons.on('blocked-state-change', (isBlocked) => {
  console.log(`阻塞状态: ${isBlocked ? '阻塞' : '正常'}`);
});

// 设置消费者
prodCons.consume(async (data) => {
  console.log('消费数据:', data);
  await new Promise(resolve => setTimeout(resolve, 100));
  // 处理数据...
});

// 生产数据
for (let i = 0; i < 5; i++) {
  prodCons.produce(async () => {
    console.log('生产数据:', i);
    await new Promise(resolve => setTimeout(resolve, 50));
    return `item-${i}`;
  });
}

高级用法

import ProdConsPQueue from 'prod-cons-pqueue';

const prodCons = new ProdConsPQueue({ 
  slotAmount: 5,
  concurrency: 3 
});

// 动态调整槽位数量
prodCons.setSlotAmount(20);

// 获取当前状态
console.log(prodCons.getStats());
// {
//   bufferLength: 5,
//   freeSlotAmount: 15,
//   isBlocked: false,
//   pendingJobs: 2,
//   runningJobs: 1,
//   isPaused: false,
//   concurrency: 3,
//   slotAmount: 20
// }

// 等待特定事件
await prodCons.waitForEvent('blocked-state-change', (value) => value === true);

// 等待缓冲区清空
await prodCons.waitForEmpty();

// 等待消费完成
await prodCons.waitForConsumption();

// 控制功能
await prodCons.pause();  // 暂停消费
await prodCons.start();  // 继续消费
await prodCons.clear();  // 清空缓冲区
await prodCons.destroy(); // 销毁实例

📚 API 文档

构造函数

constructor(options: ProdConsOptions = {})

参数:

  • options.slotAmount (number): 缓冲区槽位数量,默认为 10
  • options.concurrency (number): 消费并发度,默认为 1

核心方法

produce(fn: () => Promise<any>): Promise<void>

生产数据并放入缓冲区。

参数:

  • fn: 返回 Promise 的异步函数,用于产生数据

consume(fn: (data: any) => Promise<void>): void

设置消费者函数。

参数:

  • fn: 处理数据的异步函数

setSlotAmount(n: number): void

设置缓冲区槽位数量。

getStats(): Stats

获取当前状态统计信息。

返回值:

interface Stats {
  bufferLength: number;      // 缓冲区长度
  freeSlotAmount: number;    // 可用槽位数
  isBlocked: boolean;       // 是否阻塞
  pendingJobs: number;      // 待处理任务数
  runningJobs: number;      // 正在运行的任务数
  isPaused: boolean;        // 是否暂停
  concurrency: number;      // 并发数
  slotAmount: number;       // 槽位总数
}

等待方法

waitForEmpty(): Promise<void>

等待缓冲区完全清空。

waitForConsumption(): Promise<void>

等待当前所有消费操作完成。

waitForEvent(eventName: string, condition?: (value: any) => boolean): Promise<any>

等待特定事件触发,可选择性地设置条件过滤。

控制方法

pause(): Promise<void>

暂停消费操作。

start(): Promise<void>

继续消费操作。

clear(): Promise<void>

清空缓冲区。

destroy(): Promise<void>

销毁实例,释放资源。

事件监听

事件类型

  • 'free-slot-amount-change': 空闲槽位数量变化
  • 'blocked-state-change': 阻塞状态变化
  • 'destroy': 实例销毁

on(event: string, callback: (value: any) => void): void

添加事件监听器。

off(event: string, callback: (value: any) => void): void

移除事件监听器。

🎮 交互式演示

项目包含一个基于 Vue.js 的交互式演示,可视化展示生产者-消费者队列的工作原理:

# 启动演示
npm run preview

演示功能:

  • 实时可视化任务执行状态
  • 动态调整并发数和缓冲区大小
  • 观察队列状态变化
  • 批量生产和消费操作

🎯 使用场景

1. 异步任务处理

const taskQueue = new ProdConsPQueue({ 
  slotAmount: 20, 
  concurrency: 5 
});

// 处理文件上传任务
taskQueue.consume(async (file) => {
  await uploadFile(file);
});

// 提交上传任务
for (const file of files) {
  taskQueue.produce(async () => file);
}

2. 数据批处理

const dataProcessor = new ProdConsPQueue({ 
  slotAmount: 50,
  concurrency: 10 
});

// 批量处理数据
dataProcessor.consume(async (dataBatch) => {
  const results = await processDataBatch(dataBatch);
  return results;
});

// 模拟数据流生成
async function* dataStream() {
  for (let i = 0; i < 1000; i++) {
    yield { id: i, data: generateRandomData() };
  }
}

// 生产数据
for await (const data of dataStream()) {
  await dataProcessor.produce(async () => data);
}

3. API 限流控制

const rateLimitedQueue = new ProdConsPQueue({ 
  slotAmount: 100,
  concurrency: 1 
});

// 限流API调用
rateLimitedQueue.consume(async (request) => {
  const response = await apiCall(request);
  return response;
});

// 高频请求
for (let i = 0; i < 1000; i++) {
  rateLimitedQueue.produce(async () => ({
    id: i,
    data: heavyComputation()
  }));
}

4. 事件驱动流程控制

const eventProcessor = new ProdConsPQueue({ 
  slotAmount: 10,
  concurrency: 3 
});

// 等待特定条件的事件
eventProcessor.on('blocked-state-change', (isBlocked) => {
  console.log(`队列状态: ${isBlocked ? '阻塞' : '运行中'}`);
});

// 设置消费者
eventProcessor.consume(async (data) => {
  console.log('处理数据:', data);
  await process(data);
});

// 生产数据并等待处理完成
await eventProcessor.produce(async () => generateData());
await eventProcessor.waitForEmpty(); // 等待所有数据处理完成

🔧 开发和测试

本地开发

# 安装依赖
npm install

# 开发模式
npm run dev

# 运行测试
npm run test

# 运行测试覆盖率
npm run test:coverage

# 运行测试UI界面
npm run test:ui

# 代码检查
npm run lint

# 构建项目
npm run build

# 启动演示
npm run preview

发布到 npm

# 构建项目
npm run build

# 运行测试
npm run test

# 发布
npm publish

⚠️ 注意事项

  1. 内存管理: 建议在不需要时调用 destroy() 方法来释放资源
  2. 错误处理: 消费函数中的错误会被捕获并记录,不会影响队列的正常运行
  3. 并发控制: 合理设置 concurrency 参数以平衡性能和资源使用
  4. 生产速度: 当生产速度远大于消费速度时,缓冲区会被填满并进入阻塞状态
  5. 资源清理: 在应用退出前,确保调用 destroy() 方法
  6. 事件等待: 使用 waitForEvent 可以实现更精确的流程控制

📄 许可证

MIT License

🤝 贡献

欢迎提交 Issue 和 Pull Request!

📝 更新日志

1.0.0

  • 初始发布
  • 支持基本的生产者-消费者模式
  • 事件驱动架构
  • 完整的 TypeScript 支持
  • 全面的测试覆盖
  • 新增等待机制:waitForEmptywaitForConsumptionwaitForEvent
  • 新增控制功能:pausestartclear
  • 新增统计信息功能
  • 完善的错误处理和资源管理
  • 新增交互式演示界面

生产者-消费者队列,让异步任务处理更简单! 🚀