npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@tomsons/queue-manager

v0.5.0

Published

A powerful and flexible queue manager for handling asynchronous tasks in TypeScript applications. It provides features like concurrency control, task prioritization, automatic retries, cancellation, and progress tracking.

Readme

Queue Manager

A powerful and flexible queue manager for handling asynchronous tasks in TypeScript applications. It provides features like concurrency control, task prioritization, automatic retries, cancellation, and progress tracking.

Features

  • Concurrency Control: Limit the number of tasks that run simultaneously.
  • Task Prioritization: Higher priority tasks are executed first.
  • Automatic Retries: Automatically retries failed tasks with configurable backoff strategies (e.g., exponential backoff).
  • Task Cancellation: Support for cancelling queued and in-progress tasks.
  • Progress Reporting: Real-time updates on task status and progress.
  • Failed Task Queue: Failed tasks are moved to a separate queue for later inspection or reprocessing.

Basic Usage

Here's a simple example of how to use the QueueManager.

import { QueueManager, Task, TaskStatus } from '@tomsons/queue-manager';

// 1. Create a QueueManager instance
const queueManager = new QueueManager({ concurrency: 2 });

// 2. Listen for progress updates
queueManager.onProgress(progress => {
  console.log(`Task ${progress.taskId} is ${progress.status} - ${progress.progress}%`);
  if (progress.status === TaskStatus.FAILED) {
    console.error(`Task ${progress.taskId} failed with error:`, progress.error);
  }
});

// 3. Define a task
const myTask: Task<string> = {
  id: 'task-1',
  priority: 10, // Higher number means higher priority
  execute: async (reportProgress) => {
    console.log('Executing task-1...');
    // Simulate work and report progress
    await new Promise(resolve => setTimeout(resolve, 500));
    reportProgress({ taskId: 'task-1', progress: 50, status: TaskStatus.RUNNING });
    await new Promise(resolve => setTimeout(resolve, 500));
    return 'Task-1 Finished!';
  }
};

// 4. Enqueue the task
queueManager.enqueue(myTask);

// 5. Wait for all tasks to complete
await queueManager.waitForCompletion();
console.log('All tasks have been processed.');

Advanced Usage: Custom Task Class

For more complex scenarios, you can encapsulate task logic within a class that implements the Task interface. This is useful for creating reusable and configurable task types.

Here's an example of a FileUploadTask that uploads a file and reports progress.

import { QueueManager, Task, TaskProgress, TaskStatus } from '@tomsons/queue-manager';
import fetch from 'node-fetch'; // or use browser's fetch

// Custom class implementing the Task interface
class FileUploadTask implements Task<{ url: string }> {
  id: string;
  priority: number;
  
  private file: Buffer;
  private endpoint: string;
  private abortController: AbortController;

  constructor(file: Buffer, fileName: string, endpoint: string) {
    this.id = `upload-${fileName}`;
    this.priority = 5;
    this.file = file;
    this.endpoint = endpoint;
    this.abortController = new AbortController();
  }

  async execute(reportProgress: (progress: TaskProgress) => void): Promise<{ url: string }> {
    // In a real scenario, you would stream the file and calculate progress.
    // This is a simplified example.
    reportProgress({ taskId: this.id, progress: 0, status: TaskStatus.RUNNING });

    const response = await fetch(this.endpoint, {
      method: 'POST',
      body: this.file,
      signal: this.abortController.signal,
    });

    if (!response.ok) {
      throw new Error(`Upload failed with status: ${response.status}`);
    }
    
    reportProgress({ taskId: this.id, progress: 100, status: TaskStatus.RUNNING });
    return response.json() as Promise<{ url: string }>;
  }

  cancel(): void {
    console.log(`Cancelling upload for ${this.id}`);
    this.abortController.abort();
  }
}

// --- Usage ---
const queue = new QueueManager({ concurrency: 1 });

const fileBuffer = Buffer.from('some file content');
const uploadTask = new FileUploadTask(fileBuffer, 'document.pdf', 'https://api.example.com/upload');

queue.enqueue(uploadTask);

// To cancel the task if it's running
// setTimeout(() => queue.cancelAll(), 50); 

API Reference

QueueManager<T>

| Method | Description | | :--- | :--- | | constructor(options: QueueOptions) | Creates a new queue manager instance. | | getFailedTasks(): Task<T>[] | Returns a copy of the tasks that have failed all retry attempts. | | enqueue(task: Task<T>): void | Adds a new task to the processing queue. | | reprocessFailedTasks(): void | Re-queues all tasks from the failed queue for another attempt. | | onProgress(callback): () => void | Registers a listener for task progress updates. Returns a function to unregister the listener. | | clearQueue(cancelRunning?: boolean): void | Clears all pending (non-running) tasks. If cancelRunning is true, it also cancels tasks that are currently in progress. | | waitForCompletion(): Promise<void> | Returns a promise that resolves when the queue is empty and all running tasks are complete. | | cancelAll(): Promise<void> | Cancels all queued and running tasks. |

Task<T> Interface

This interface defines the structure of a task that can be processed by the QueueManager.

| Property | Type | Description | | :--- | :--- | :--- | | id | string | Required. A unique identifier for the task. | | execute | (progress: (p: TaskProgress) => void) => Promise<T> | Required. The function that performs the task's work. It receives a reportProgress callback and must return a Promise. | | priority | number | Optional. The task's priority. Higher numbers are processed first. Defaults to 0. | | maxRetries | number | Optional. Overrides the default maximum number of retries for this specific task. | | retryPolicy | RetryPolicy | Optional. Overrides the default retry policy for this specific task. | | cancel | () => void | Optional. A function that cancels the task's execution, e.g., by aborting an HTTP request. |

Building

Run nx build queue-manager to build the library.

Running unit tests

Run nx test queue-manager to execute the unit tests via Vitest.