prod-cons-pqueue
v1.0.0
Published
A producer-consumer pattern implementation using PQueue for efficient data processing
Maintainers
Readme
ProdConsPQueue
基于 PQueue 的高效生产者-消费者模式实现,适用于 Node.js 和现代 JavaScript/TypeScript 环境。
🌟 核心特性
- 🚀 高性能: 基于 PQueue 实现,支持并发控制和任务调度
- 🎯 精确控制: 可配置的槽位数量和并发度
- 🔧 事件驱动: 支持实时状态监控和回调
- 🛡️ 错误处理: 完善的错误处理和恢复机制
- 📊 统计信息: 提供详细的运行状态统计
- 🎨 TypeScript 支持: 完整的类型定义和开发体验
- ⏱️ 等待机制: 支持等待特定事件和缓冲区清空
- 🎮 交互式演示: 提供可视化演示工具
📦 安装
npm install prod-cons-pqueue🚀 快速开始
基础使用
import ProdConsPQueue from 'prod-cons-pqueue';
// 创建生产者-消费者实例
const prodCons = new ProdConsPQueue({
slotAmount: 10, // 缓冲区大小
concurrency: 2 // 消费并发度
});
// 监听状态变化
prodCons.on('free-slot-amount-change', (amount) => {
console.log(`可用槽位数: ${amount}`);
});
prodCons.on('blocked-state-change', (isBlocked) => {
console.log(`阻塞状态: ${isBlocked ? '阻塞' : '正常'}`);
});
// 设置消费者
prodCons.consume(async (data) => {
console.log('消费数据:', data);
await new Promise(resolve => setTimeout(resolve, 100));
// 处理数据...
});
// 生产数据
for (let i = 0; i < 5; i++) {
prodCons.produce(async () => {
console.log('生产数据:', i);
await new Promise(resolve => setTimeout(resolve, 50));
return `item-${i}`;
});
}高级用法
import ProdConsPQueue from 'prod-cons-pqueue';
const prodCons = new ProdConsPQueue({
slotAmount: 5,
concurrency: 3
});
// 动态调整槽位数量
prodCons.setSlotAmount(20);
// 获取当前状态
console.log(prodCons.getStats());
// {
// bufferLength: 5,
// freeSlotAmount: 15,
// isBlocked: false,
// pendingJobs: 2,
// runningJobs: 1,
// isPaused: false,
// concurrency: 3,
// slotAmount: 20
// }
// 等待特定事件
await prodCons.waitForEvent('blocked-state-change', (value) => value === true);
// 等待缓冲区清空
await prodCons.waitForEmpty();
// 等待消费完成
await prodCons.waitForConsumption();
// 控制功能
await prodCons.pause(); // 暂停消费
await prodCons.start(); // 继续消费
await prodCons.clear(); // 清空缓冲区
await prodCons.destroy(); // 销毁实例📚 API 文档
构造函数
constructor(options: ProdConsOptions = {})参数:
options.slotAmount(number): 缓冲区槽位数量,默认为 10options.concurrency(number): 消费并发度,默认为 1
核心方法
produce(fn: () => Promise<any>): Promise<void>
生产数据并放入缓冲区。
参数:
fn: 返回 Promise 的异步函数,用于产生数据
consume(fn: (data: any) => Promise<void>): void
设置消费者函数。
参数:
fn: 处理数据的异步函数
setSlotAmount(n: number): void
设置缓冲区槽位数量。
getStats(): Stats
获取当前状态统计信息。
返回值:
interface Stats {
bufferLength: number; // 缓冲区长度
freeSlotAmount: number; // 可用槽位数
isBlocked: boolean; // 是否阻塞
pendingJobs: number; // 待处理任务数
runningJobs: number; // 正在运行的任务数
isPaused: boolean; // 是否暂停
concurrency: number; // 并发数
slotAmount: number; // 槽位总数
}等待方法
waitForEmpty(): Promise<void>
等待缓冲区完全清空。
waitForConsumption(): Promise<void>
等待当前所有消费操作完成。
waitForEvent(eventName: string, condition?: (value: any) => boolean): Promise<any>
等待特定事件触发,可选择性地设置条件过滤。
控制方法
pause(): Promise<void>
暂停消费操作。
start(): Promise<void>
继续消费操作。
clear(): Promise<void>
清空缓冲区。
destroy(): Promise<void>
销毁实例,释放资源。
事件监听
事件类型
'free-slot-amount-change': 空闲槽位数量变化'blocked-state-change': 阻塞状态变化'destroy': 实例销毁
on(event: string, callback: (value: any) => void): void
添加事件监听器。
off(event: string, callback: (value: any) => void): void
移除事件监听器。
🎮 交互式演示
项目包含一个基于 Vue.js 的交互式演示,可视化展示生产者-消费者队列的工作原理:
# 启动演示
npm run preview演示功能:
- 实时可视化任务执行状态
- 动态调整并发数和缓冲区大小
- 观察队列状态变化
- 批量生产和消费操作
🎯 使用场景
1. 异步任务处理
const taskQueue = new ProdConsPQueue({
slotAmount: 20,
concurrency: 5
});
// 处理文件上传任务
taskQueue.consume(async (file) => {
await uploadFile(file);
});
// 提交上传任务
for (const file of files) {
taskQueue.produce(async () => file);
}2. 数据批处理
const dataProcessor = new ProdConsPQueue({
slotAmount: 50,
concurrency: 10
});
// 批量处理数据
dataProcessor.consume(async (dataBatch) => {
const results = await processDataBatch(dataBatch);
return results;
});
// 模拟数据流生成
async function* dataStream() {
for (let i = 0; i < 1000; i++) {
yield { id: i, data: generateRandomData() };
}
}
// 生产数据
for await (const data of dataStream()) {
await dataProcessor.produce(async () => data);
}3. API 限流控制
const rateLimitedQueue = new ProdConsPQueue({
slotAmount: 100,
concurrency: 1
});
// 限流API调用
rateLimitedQueue.consume(async (request) => {
const response = await apiCall(request);
return response;
});
// 高频请求
for (let i = 0; i < 1000; i++) {
rateLimitedQueue.produce(async () => ({
id: i,
data: heavyComputation()
}));
}4. 事件驱动流程控制
const eventProcessor = new ProdConsPQueue({
slotAmount: 10,
concurrency: 3
});
// 等待特定条件的事件
eventProcessor.on('blocked-state-change', (isBlocked) => {
console.log(`队列状态: ${isBlocked ? '阻塞' : '运行中'}`);
});
// 设置消费者
eventProcessor.consume(async (data) => {
console.log('处理数据:', data);
await process(data);
});
// 生产数据并等待处理完成
await eventProcessor.produce(async () => generateData());
await eventProcessor.waitForEmpty(); // 等待所有数据处理完成🔧 开发和测试
本地开发
# 安装依赖
npm install
# 开发模式
npm run dev
# 运行测试
npm run test
# 运行测试覆盖率
npm run test:coverage
# 运行测试UI界面
npm run test:ui
# 代码检查
npm run lint
# 构建项目
npm run build
# 启动演示
npm run preview发布到 npm
# 构建项目
npm run build
# 运行测试
npm run test
# 发布
npm publish⚠️ 注意事项
- 内存管理: 建议在不需要时调用
destroy()方法来释放资源 - 错误处理: 消费函数中的错误会被捕获并记录,不会影响队列的正常运行
- 并发控制: 合理设置
concurrency参数以平衡性能和资源使用 - 生产速度: 当生产速度远大于消费速度时,缓冲区会被填满并进入阻塞状态
- 资源清理: 在应用退出前,确保调用
destroy()方法 - 事件等待: 使用
waitForEvent可以实现更精确的流程控制
📄 许可证
MIT License
🤝 贡献
欢迎提交 Issue 和 Pull Request!
📝 更新日志
1.0.0
- 初始发布
- 支持基本的生产者-消费者模式
- 事件驱动架构
- 完整的 TypeScript 支持
- 全面的测试覆盖
- 新增等待机制:
waitForEmpty、waitForConsumption、waitForEvent - 新增控制功能:
pause、start、clear - 新增统计信息功能
- 完善的错误处理和资源管理
- 新增交互式演示界面
生产者-消费者队列,让异步任务处理更简单! 🚀
