npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

smart-task-queue

v1.0.1

Published

智能任务队列系统,支持自动并发控制、任务去重、指数退避重试和实时性能监控

Downloads

10

Readme

SmartTaskQueue

一个高性能、功能丰富的智能任务队列管理库,专为Node.js应用设计,具备自动扩缩容、并发控制、任务去重和失败重试等特性。

特性

  • 智能并发控制:自动根据负载调整并发数量,避免资源过载
  • 任务去重:基于任务ID自动去重,防止重复处理
  • 失败自动重试:内置指数退避算法,优雅处理临时错误
  • 事件驱动:完整的事件系统,方便监控和扩展
  • 性能监控:内置吞吐量计算和队列状态监控
  • 队列溢出保护:防止内存溢出,自动拒绝过多任务
  • 可扩展性:简单的继承模型,易于定制业务逻辑

安装

npm install smart-task-queue
# 或使用 yarn
yarn add smart-task-queue
# 或使用 pnpm
pnpm add smart-task-queue

基本用法

创建自定义队列

import SmartTaskQueue from 'smart-task-queue';

// 创建自定义队列类
class MyQueue extends SmartTaskQueue {
  // 必须实现此方法来处理任务
  async processTask(payload) {
    // 实现您的业务逻辑
    console.log(`处理任务: ${payload.id}`);
    return { processed: true, data: payload };
  }
}

// 实例化队列
const queue = new MyQueue({
  maxConcurrency: 100,  // 最大并发数
  minConcurrency: 10,   // 最小并发数
  taskExpireTime: 60000 // 任务去重过期时间(毫秒)
});

// 监听事件
queue.on('success', ({ taskId, result }) => {
  console.log(`任务 ${taskId} 成功完成:`, result);
});

queue.on('failure', ({ taskId, error }) => {
  console.error(`任务 ${taskId} 失败:`, error);
});

添加任务

// 添加简单任务
async function runTask() {
  try {
    const result = await queue.add(
      'unique-task-id',      // 任务唯一ID
      () => fetchData(),     // 任务函数
      3                      // 重试次数(可选,默认3次)
    );
    console.log('任务结果:', result);
  } catch (error) {
    console.error('任务最终失败:', error);
  }
}

// 批量添加任务
async function processBatch(items) {
  const promises = items.map((item, index) => {
    return queue.add(
      `task-${item.id}`,
      () => processItem(item),
      2
    );
  });
  
  // 等待所有任务完成
  const results = await Promise.allSettled(promises);
  console.log(`完成: ${results.filter(r => r.status === 'fulfilled').length}`);
  console.log(`失败: ${results.filter(r => r.status === 'rejected').length}`);
}

使用事件系统

// 监听队列状态变化
queue.on('monitor', (status) => {
  console.log('队列状态:', status);
  // 例如: 记录指标到监控系统
});

// 监听并发变化
queue.on('concurrencyChange', (newValue) => {
  console.log(`并发数已调整为: ${newValue}`);
});

// 监听任务重复
queue.on('duplicate', (taskId) => {
  console.log(`任务 ${taskId} 被去重机制拦截`);
});

// 监听队列溢出
queue.on('reject', ({ taskId, error }) => {
  console.error(`任务 ${taskId} 被拒绝: ${error.message}`);
});

API 文档

构造函数选项

const queue = new MyQueue({
  maxConcurrency: 100,    // 最大并发数
  minConcurrency: 10,     // 最小并发数
  maxQueue: 10000,        // 最大队列长度
  monitorInterval: 5000,  // 监控间隔(毫秒)
  taskExpireTime: 60000,  // 任务去重过期时间(毫秒)
  autoStart: true         // 是否自动启动队列
});

方法

| 方法 | 描述 | |------|------| | add(taskId, taskFn, retries = 3) | 添加任务到队列 | | start() | 启动队列监控 | | stop() | 停止队列监控 | | setConcurrency(newConcurrency) | 动态调整并发数 | | getStatus() | 获取当前队列状态 | | processTask(payload) | 需子类实现的任务处理方法 |

事件

| 事件 | 参数 | 描述 | |------|------|------| | success | { taskId, result } | 任务成功完成 | | failure | { taskId, error } | 任务失败 | | duplicate | taskId | 检测到重复任务 | | reject | { taskId, error } | 任务被拒绝(队列溢出) | | monitor | statusObject | 队列状态更新 | | concurrencyChange | newValue | 并发数变化 | | taskSuccess | { id, result } | 子类任务处理成功 | | taskError | { id, error } | 子类任务处理失败 |

高级用法

自定义任务处理逻辑

class DatabaseQueue extends SmartTaskQueue {
  constructor(options = {}) {
    super(options);
    this.db = options.database;
  }
  
  async processTask(payload) {
    const { operation, data } = payload;
    
    switch (operation) {
      case 'insert':
        return await this.db.insert(data);
      case 'update':
        return await this.db.update(data.id, data);
      case 'delete':
        return await this.db.delete(data.id);
      default:
        throw new Error(`未知操作: ${operation}`);
    }
  }
}

// 使用
const dbQueue = new DatabaseQueue({
  database: myDatabaseConnection,
  maxConcurrency: 50
});

// 发送任务
dbQueue.emit('task', {
  id: 'insert-user-123',
  operation: 'insert',
  data: { name: '张三', email: '[email protected]' }
});

动态调整并发数

// 根据系统负载调整并发数
function adjustConcurrency() {
  const cpuUsage = getCPUUsage(); // 假设的函数
  
  if (cpuUsage > 80) {
    // CPU负载高,降低并发
    queue.setConcurrency(20);
  } else if (cpuUsage < 30) {
    // CPU负载低,提高并发
    queue.setConcurrency(150);
  }
}

// 每10秒检查一次
setInterval(adjustConcurrency, 10000);

性能优化建议

  1. 合理设置并发数:根据任务类型和系统资源设置合适的maxConcurrencyminConcurrency
  2. 任务函数优化:确保任务函数内部高效,避免不必要的阻塞操作
  3. 适当的去重时间:根据业务需求设置合适的taskExpireTime,过长会占用更多内存
  4. 监控队列状态:定期检查getStatus()返回的指标,特别是吞吐量和失败率
  5. 合理使用重试:对于可重试的临时错误设置重试,对于永久性错误避免无意义重试

许可证

MIT