@bernierllc/queue-manager

v1.2.0

Robust message queue management with multiple backends, job processing, and failure handling

Downloads

127

Robust message queue management with multiple backends, job processing, priority handling, and automatic retry logic.

Installation

npm install @bernierllc/queue-manager

For Redis backend support, install the peer dependency:

npm install @bernierllc/queue-manager ioredis

Usage

Basic Queue Setup

import { QueueManager, JobPriority } from '@bernierllc/queue-manager';

// Create a queue with memory backend
const queue = new QueueManager('email-queue', {
  backend: 'memory',
  concurrency: 5,
  retryPolicy: {
    maxAttempts: 3,
    backoff: { type: 'exponential', delay: 1000, maxDelay: 30000 }
  }
});

// Register a job processor
queue.process('send-email', async (job) => {
  const { to, subject, body } = job.data;
  await sendEmail(to, subject, body);
  return { sent: true };
});

// Add a job to the queue
await queue.add('send-email', {
  to: '[email protected]',
  subject: 'Welcome',
  body: 'Welcome to our service!'
}, {
  priority: JobPriority.HIGH
});

Priority Handling

import { QueueManager, JobPriority } from '@bernierllc/queue-manager';

const queue = new QueueManager('tasks', { backend: 'memory' });

// Add jobs with different priorities
await queue.add('task', { type: 'urgent' }, { priority: JobPriority.CRITICAL });
await queue.add('task', { type: 'important' }, { priority: JobPriority.HIGH });
await queue.add('task', { type: 'routine' }, { priority: JobPriority.NORMAL });
await queue.add('task', { type: 'background' }, { priority: JobPriority.LOW });

// Jobs are processed in priority order (CRITICAL > HIGH > NORMAL > LOW)
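As a rough illustration of the ordering described in the comment above, the sketch below sorts jobs by the numeric `JobPriority` values listed under Types. The `PendingJob` shape and the `orderByPriority` helper are hypothetical names for this example, and the first-in-first-out tie-break within one priority level is an assumption; the package's own scheduler may behave differently.

```typescript
// Numeric values copied from the JobPriority enum in the Types section.
enum JobPriority {
  LOW = 1,
  NORMAL = 5,
  HIGH = 10,
  CRITICAL = 20,
}

// Hypothetical minimal job shape, for this sketch only.
interface PendingJob {
  id: string;
  priority: JobPriority;
}

// Highest priority first. Array.prototype.sort is stable (ES2019+), so jobs
// with equal priority keep their insertion order (assumed FIFO tie-break).
function orderByPriority(jobs: PendingJob[]): PendingJob[] {
  return [...jobs].sort((a, b) => b.priority - a.priority);
}

const orderedJobs = orderByPriority([
  { id: 'routine', priority: JobPriority.NORMAL },
  { id: 'background', priority: JobPriority.LOW },
  { id: 'urgent', priority: JobPriority.CRITICAL },
  { id: 'important', priority: JobPriority.HIGH },
]);

console.log(orderedJobs.map((job) => job.id));
// [ 'urgent', 'important', 'routine', 'background' ]
```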

Delayed Jobs

// Schedule a job to run after 1 hour
await queue.add('reminder', {
  message: 'Time to review'
}, {
  delay: 3600000 // 1 hour in milliseconds
});

// Schedule with custom retry policy
await queue.add('api-call', {
  endpoint: '/users',
  method: 'POST'
}, {
  delay: 5000,
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 2000,
    maxDelay: 60000
  }
});
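To get a feel for what the `backoff` settings above could mean in practice, here is a minimal sketch of an exponential schedule. The doubling formula is an assumption made for illustration. The package delegates retry timing to `@bernierllc/retry-policy`, which may use a different multiplier or add jitter. `backoffDelay` is a hypothetical helper, not part of the package.

```typescript
// Mirrors the backoff fields used in the examples above.
interface BackoffOptions {
  type: 'exponential';
  delay: number;    // base delay in milliseconds
  maxDelay: number; // upper bound in milliseconds
}

// Assumed formula: the base delay doubles on each retry and is capped at
// maxDelay. retryNumber is 1 for the first retry.
function backoffDelay(options: BackoffOptions, retryNumber: number): number {
  const uncapped = options.delay * 2 ** (retryNumber - 1);
  return Math.min(uncapped, options.maxDelay);
}

const retryPolicyExample: BackoffOptions = { type: 'exponential', delay: 2000, maxDelay: 60000 };
const retrySchedule = [1, 2, 3, 4, 5, 6].map((n) => backoffDelay(retryPolicyExample, n));

console.log(retrySchedule);
// [ 2000, 4000, 8000, 16000, 32000, 60000 ]
```

Under this assumption the sixth retry would wait 64 seconds uncapped, so `maxDelay` trims it to 60 seconds.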

Bulk Job Addition

// Add multiple jobs efficiently
const jobs = await queue.addBulk([
  { type: 'process-user', data: { userId: 1 }, options: { priority: JobPriority.HIGH } },
  { type: 'process-user', data: { userId: 2 } },
  { type: 'process-user', data: { userId: 3 } }
]);

console.log(`Added ${jobs.length} jobs`);
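When the job list comes from application data, the specs passed to `addBulk` can be built with a plain `map`. The sketch below assumes the entry shape shown above (`type`, `data`, optional `options`). `toUserJobs` and `chunk` are hypothetical helpers, and whether batching large lists is actually needed depends on the backend, which this README does not specify.

```typescript
// Shape of one addBulk entry, following the example above.
// JobOptions is reduced here to the single field this sketch uses.
interface BulkJobSpec<T> {
  type: string;
  data: T;
  options?: { priority?: number };
}

// Hypothetical helper: one 'process-user' spec per user ID.
function toUserJobs(userIds: number[], priority?: number): BulkJobSpec<{ userId: number }>[] {
  return userIds.map((userId) => ({
    type: 'process-user',
    data: { userId },
    ...(priority !== undefined ? { options: { priority } } : {}),
  }));
}

// Split a long list into batches so each addBulk call stays a manageable size.
function chunk<T>(items: T[], size: number): T[][] {
  if (size < 1) {
    throw new RangeError('size must be at least 1');
  }
  const result: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    result.push(items.slice(i, i + size));
  }
  return result;
}

const userJobSpecs = toUserJobs([1, 2, 3, 4, 5]);
const userJobBatches = chunk(userJobSpecs, 2);

console.log(userJobBatches.length); // 3
```

Each batch could then be passed to `queue.addBulk(batch)` in turn.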

Event Handling

// Listen to queue events
queue.on('job:waiting', (job) => {
  console.log(`Job ${job.id} added to queue`);
});

queue.on('job:active', (job) => {
  console.log(`Processing job ${job.id}`);
});

queue.on('job:completed', (job) => {
  console.log(`Job ${job.id} completed`);
});

queue.on('job:failed', (job) => {
  console.error(`Job ${job.id} failed: ${job.error}`);
});

queue.on('job:progress', (job) => {
  console.log(`Job ${job.id} progress: ${job.progress}%`);
});
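The same events can feed simple in-process metrics. The following sketch tallies lifecycle events on anything that exposes an `on(event, handler)` method. `countEvents`, `JobEventSource`, and `StubSource` are hypothetical names; the stub stands in for a real queue so the example runs on its own, and it is assumed (not confirmed here) that a `QueueManager` instance is compatible with the `JobEventSource` shape.

```typescript
// Minimal slice of the emitter surface used in the examples above.
interface JobEventSource {
  on(event: string, handler: (job: { id: string }) => void): unknown;
}

type LifecycleEvent = 'job:waiting' | 'job:active' | 'job:completed' | 'job:failed';

// Hypothetical helper: count lifecycle events as they arrive.
function countEvents(source: JobEventSource): Record<LifecycleEvent, number> {
  const tallies: Record<LifecycleEvent, number> = {
    'job:waiting': 0,
    'job:active': 0,
    'job:completed': 0,
    'job:failed': 0,
  };
  (Object.keys(tallies) as LifecycleEvent[]).forEach((eventName) => {
    source.on(eventName, () => {
      tallies[eventName] += 1;
    });
  });
  return tallies;
}

// Tiny in-memory stand-in for a queue, used only to exercise the helper.
class StubSource implements JobEventSource {
  private handlers = new Map<string, Array<(job: { id: string }) => void>>();

  on(eventName: string, handler: (job: { id: string }) => void): this {
    const list = this.handlers.get(eventName) ?? [];
    list.push(handler);
    this.handlers.set(eventName, list);
    return this;
  }

  emit(eventName: string, job: { id: string }): void {
    (this.handlers.get(eventName) ?? []).forEach((handler) => handler(job));
  }
}

const stubQueue = new StubSource();
const eventTallies = countEvents(stubQueue);
stubQueue.emit('job:waiting', { id: 'a' });
stubQueue.emit('job:active', { id: 'a' });
stubQueue.emit('job:completed', { id: 'a' });

console.log(eventTallies);
```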

Queue Statistics

// Get queue statistics
const stats = await queue.getStats();
console.log('Queue Stats:', {
  waiting: stats.waiting,
  active: stats.active,
  completed: stats.completed,
  failed: stats.failed,
  delayed: stats.delayed,
  totalProcessed: stats.totalProcessed,
  throughput: stats.throughput
});
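The raw counters can be reduced to a couple of health indicators. This sketch derives a failure rate and a backlog size from the `QueueStats` fields. `summarizeStats` is a hypothetical helper, and the definitions it uses (failures over finished jobs, backlog as waiting plus delayed) are choices made for this example. The README does not state the unit of `throughput`, so it is left out.

```typescript
// Fields copied from the QueueStats interface in the Types section.
interface QueueStats {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  totalProcessed: number;
  throughput: number;
}

// Hypothetical health summary: share of finished jobs that failed, plus the
// number of jobs still waiting to run.
function summarizeStats(stats: QueueStats): { failureRate: number; backlog: number } {
  const finished = stats.completed + stats.failed;
  return {
    failureRate: finished === 0 ? 0 : stats.failed / finished,
    backlog: stats.waiting + stats.delayed,
  };
}

const statsSummary = summarizeStats({
  waiting: 4,
  active: 2,
  completed: 90,
  failed: 10,
  delayed: 6,
  totalProcessed: 100,
  throughput: 12.5,
});

console.log(statsSummary); // { failureRate: 0.1, backlog: 10 }
```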

Job Management

import { JobStatus } from '@bernierllc/queue-manager';

// Get a specific job
const job = await queue.getJob('job-id-123');

// Remove a job
await queue.removeJob('job-id-123');

// Get jobs by status
const waitingJobs = await queue.getJobs(JobStatus.WAITING);
const failedJobs = await queue.getJobs(JobStatus.FAILED);

// Clean up old completed jobs (older than 24 hours)
const removed = await queue.clean(86400000, JobStatus.COMPLETED);
console.log(`Removed ${removed} old jobs`);
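The first argument to `clean` is an age in milliseconds, which is easier to read as a named constant. The sketch below also models the age check itself. It assumes age is measured from the job's `completedAt` timestamp, which this README does not confirm, and `isExpired` is a hypothetical helper rather than part of the package.

```typescript
const HOUR_MS = 60 * 60 * 1000;
const DAY_MS = 24 * HOUR_MS; // 86400000, the value passed to clean() above

// Hypothetical model of the age check: a job qualifies when it finished more
// than maxAge milliseconds before the reference time.
function isExpired(completedAt: Date, maxAge: number, referenceTime: Date): boolean {
  return referenceTime.getTime() - completedAt.getTime() > maxAge;
}

const referenceNow = new Date('2025-01-02T12:00:00Z');

console.log(isExpired(new Date('2025-01-01T11:00:00Z'), DAY_MS, referenceNow)); // true  (25 hours old)
console.log(isExpired(new Date('2025-01-01T13:00:00Z'), DAY_MS, referenceNow)); // false (23 hours old)
```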

Concurrency Control

// Process up to 10 jobs concurrently
const queue = new QueueManager('high-throughput', {
  backend: 'memory',
  concurrency: 10
});

queue.process('task', async (job) => {
  // Job processing logic
  await processTask(job.data);
});
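For readers unfamiliar with what a concurrency limit does, the following standalone sketch runs a list of async tasks with at most `limit` in flight at once. It illustrates the general technique (a fixed pool of workers pulling from a shared index) and is not the package's scheduler; `runWithLimit` is a hypothetical name.

```typescript
// Run tasks with at most `limit` executing at the same time.
// Results are returned in the same order as the input tasks.
async function runWithLimit<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let nextIndex = 0;

  // Each worker repeatedly claims the next unclaimed task until none remain.
  async function worker(): Promise<void> {
    while (nextIndex < tasks.length) {
      const index = nextIndex++;
      results[index] = await tasks[index]();
    }
  }

  const workerCount = Math.max(1, Math.min(limit, tasks.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

const demoTasks = [1, 2, 3].map((n) => async () => n * 10);
runWithLimit(demoTasks, 2).then((results) => console.log(results)); // [ 10, 20, 30 ]
```

Because JavaScript runs this on a single thread, claiming `nextIndex++` needs no locking; the limit only bounds how many awaited operations overlap.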

Manual Queue Control

// Create queue with manual start
const queue = new QueueManager('manual-queue', {
  backend: 'memory',
  autoStart: false // Don't auto-start processing
});

// Add jobs
await queue.add('task', { data: 'test' });

// Start processing when ready
await queue.start();

// Pause processing
await queue.pause();

// Resume processing
await queue.resume();

// Stop processing and close connections
await queue.close();

Error Handling with Dead Letter Queue

import { QueueManager, JobStatus } from '@bernierllc/queue-manager';

const queue = new QueueManager('critical-tasks', {
  backend: 'memory',
  retryPolicy: {
    maxAttempts: 3,
    backoff: { type: 'exponential', delay: 1000, maxDelay: 30000 }
  },
  deadLetterQueue: 'failed-tasks'
});

// Jobs that fail after all retries are moved to the dead letter queue
queue.process('task', async (job) => {
  if (job.data.shouldFail) {
    throw new Error('Task failed');
  }
  return { success: true };
});

// Access dead letter queue for manual inspection
const dlq = new QueueManager('failed-tasks', { backend: 'memory' });
const failedJobs = await dlq.getJobs(JobStatus.FAILED);
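The decision between retrying and dead-lettering can be pictured with the `attempts` and `maxAttempts` fields from the `Job` interface. The rule below is an assumption for illustration (it treats `attempts` as the number of attempts already made, including the one that just failed). The default name helper follows the `'{name}-dlq'` pattern given in the `QueueOptions` comments. Both functions are hypothetical.

```typescript
// Subset of the Job interface from the Types section.
interface AttemptInfo {
  attempts: number;
  maxAttempts: number;
}

type FailureAction = 'retry' | 'dead-letter';

// Assumed rule: retry while the attempts used so far are below maxAttempts.
function actionOnFailure(job: AttemptInfo): FailureAction {
  return job.attempts < job.maxAttempts ? 'retry' : 'dead-letter';
}

// Default DLQ name pattern from the QueueOptions comment: '{name}-dlq'.
function defaultDeadLetterName(queueName: string): string {
  return `${queueName}-dlq`;
}

console.log(actionOnFailure({ attempts: 1, maxAttempts: 3 })); // retry
console.log(actionOnFailure({ attempts: 3, maxAttempts: 3 })); // dead-letter
console.log(defaultDeadLetterName('critical-tasks'));          // critical-tasks-dlq
```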

API Reference

QueueManager

Constructor

constructor(name: string, options: QueueOptions)

Creates a new queue manager instance.

Parameters:

  • name - Unique queue name
  • options - Queue configuration options

Methods

add(type: string, data: T, options?: JobOptions): Promise<Job<T>>

Add a single job to the queue.

Parameters:

  • type - Job type identifier for processor routing
  • data - Job payload data
  • options - Optional job configuration (priority, delay, attempts, etc.)

Returns: Promise resolving to the created job

addBulk(jobs: Array<{type: string, data: T, options?: JobOptions}>): Promise<Job<T>[]>

Add multiple jobs to the queue efficiently.

Parameters:

  • jobs - Array of job specifications

Returns: Promise resolving to array of created jobs

process(type: string, processor: JobProcessor<T>): void

Register a processor for a specific job type.

Parameters:

  • type - Job type to process
  • processor - Async function that processes the job

start(): Promise<void>

Start processing jobs from the queue.

pause(): Promise<void>

Pause job processing (current jobs complete, new jobs wait).

resume(): Promise<void>

Resume job processing after pause.

close(): Promise<void>

Stop processing and close backend connections.

getJob(id: string): Promise<Job | null>

Retrieve a specific job by ID.

Parameters:

  • id - Job ID

Returns: Promise resolving to job or null if not found

getJobs(status: JobStatus): Promise<Job[]>

Get all jobs with a specific status.

Parameters:

  • status - Job status filter (WAITING, ACTIVE, COMPLETED, FAILED, DELAYED)

Returns: Promise resolving to array of jobs

removeJob(id: string): Promise<void>

Remove a job from the queue.

Parameters:

  • id - Job ID to remove

getStats(): Promise<QueueStats>

Get queue statistics.

Returns: Promise resolving to statistics object

clean(maxAge: number, status: JobStatus): Promise<number>

Remove old jobs with a specific status.

Parameters:

  • maxAge - Maximum age in milliseconds
  • status - Job status to clean

Returns: Promise resolving to number of jobs removed

Types

QueueOptions

interface QueueOptions {
  backend: 'memory' | 'redis';
  concurrency?: number;           // Default: 1
  defaultDelay?: number;          // Default: 0
  retryPolicy?: RetryPolicy;      // Default: 3 attempts, exponential backoff
  deadLetterQueue?: string;       // Default: '{name}-dlq'
  jobTimeout?: number;            // Default: 30000ms
  cleanupInterval?: number;       // Default: 3600000ms (1 hour)
  autoStart?: boolean;            // Default: true
}
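The defaults in the comments above can be read as a merge of user options over a base object. The sketch below shows one way that merge might look. `resolveOptions` is a hypothetical function, the `RetryPolicy` and `BackoffStrategy` shapes are inferred from the usage examples, and the default backoff numbers are assumed, since the README only says "3 attempts, exponential backoff".

```typescript
// Shapes inferred from the usage examples in this README.
interface BackoffStrategy {
  type: 'exponential';
  delay: number;
  maxDelay: number;
}

interface RetryPolicy {
  maxAttempts: number;
  backoff: BackoffStrategy;
}

interface QueueOptions {
  backend: 'memory' | 'redis';
  concurrency?: number;
  defaultDelay?: number;
  retryPolicy?: RetryPolicy;
  deadLetterQueue?: string;
  jobTimeout?: number;
  cleanupInterval?: number;
  autoStart?: boolean;
}

// Assumed backoff numbers; only "3 attempts, exponential" is documented.
const assumedDefaultRetry: RetryPolicy = {
  maxAttempts: 3,
  backoff: { type: 'exponential', delay: 1000, maxDelay: 30000 },
};

// Hypothetical merge: documented defaults first, caller's options on top.
function resolveOptions(queueName: string, options: QueueOptions): Required<QueueOptions> {
  return {
    concurrency: 1,
    defaultDelay: 0,
    retryPolicy: assumedDefaultRetry,
    deadLetterQueue: `${queueName}-dlq`,
    jobTimeout: 30000,
    cleanupInterval: 3600000,
    autoStart: true,
    ...options,
  };
}

const resolvedOptions = resolveOptions('email-queue', { backend: 'memory', concurrency: 5 });

console.log(resolvedOptions.deadLetterQueue, resolvedOptions.concurrency); // email-queue-dlq 5
```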

JobOptions

interface JobOptions {
  priority?: JobPriority;
  delay?: number;
  attempts?: number;
  timeout?: number;
  backoff?: BackoffStrategy;
  metadata?: Record<string, any>;
}

JobPriority

enum JobPriority {
  LOW = 1,
  NORMAL = 5,
  HIGH = 10,
  CRITICAL = 20
}

JobStatus

enum JobStatus {
  WAITING = 'waiting',
  ACTIVE = 'active',
  COMPLETED = 'completed',
  FAILED = 'failed',
  DELAYED = 'delayed'
}

Job

interface Job<T = any> {
  id: string;
  type: string;
  data: T;
  priority: JobPriority;
  attempts: number;
  maxAttempts: number;
  delay?: number;
  createdAt: Date;
  processedAt?: Date;
  completedAt?: Date;
  failedAt?: Date;
  error?: string;
  progress?: number;
  metadata?: Record<string, any>;
}
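The generic parameter on `Job<T>` lets a processor declare the payload it expects. The sketch below types a processor against a hypothetical `EmailPayload`. The `JobProcessor` signature is an assumption, since the README names that type without defining it, and the `Job` interface is trimmed to the fields the example reads. The address uses the reserved `example.com` domain as a placeholder.

```typescript
// Trimmed copy of the Job interface above (only the fields this sketch reads).
interface Job<T = any> {
  id: string;
  type: string;
  data: T;
  attempts: number;
}

// Assumed shape of JobProcessor; the README names the type but does not define it.
type JobProcessor<T, R = unknown> = (job: Job<T>) => Promise<R>;

// Hypothetical payload for a 'send-email' job.
interface EmailPayload {
  to: string;
  subject: string;
  body: string;
}

// Typing the processor against EmailPayload lets the compiler check every
// field access on job.data.
const describeEmail: JobProcessor<EmailPayload, string> = async (job) => {
  const { to, subject } = job.data;
  return `${job.id}: "${subject}" -> ${to}`;
};

const sampleEmailJob: Job<EmailPayload> = {
  id: 'job-1',
  type: 'send-email',
  data: { to: 'user@example.com', subject: 'Welcome', body: 'Welcome to our service!' },
  attempts: 0,
};

describeEmail(sampleEmailJob).then((line) => console.log(line));
// job-1: "Welcome" -> user@example.com
```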

QueueStats

interface QueueStats {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  totalProcessed: number;
  throughput: number;
}

Integration Status

Logger Integration

Status: Not applicable

This is a pure core utility package focused on queue management. Logger integration is not required as the package:

  • Uses standard EventEmitter for job lifecycle events
  • Provides comprehensive event system for external logging integration
  • Allows consumers to add logging via event handlers
  • Keeps the package lightweight, with no logging dependency

Applications using this package can integrate logging by listening to queue events:

import { createLogger } from '@bernierllc/logger';

const logger = createLogger({ service: 'queue' });

queue.on('job:completed', (job) => {
  logger.info('Job completed', { jobId: job.id, type: job.type });
});

queue.on('job:failed', (job) => {
  logger.error('Job failed', { jobId: job.id, error: job.error });
});

Docs-Suite Integration

Status: Ready

Documentation format: Markdown with TypeScript examples

  • Complete API reference with type definitions
  • Comprehensive usage examples
  • Integration patterns documented

NeverHub Integration

Status: Not applicable

This is a foundational core utility package that:

  • Provides queue management primitives
  • Has no runtime service dependencies
  • Operates independently without service discovery needs
  • Is designed for embedding in higher-level service packages

NeverHub integration should be implemented in service-layer packages that orchestrate multiple core utilities, not in the atomic core packages themselves. This maintains clean separation of concerns per MECE architecture.

Configuration

This is a pure utility core package with no runtime environment configuration. All configuration is provided programmatically through the constructor options.

Programmatic Configuration

const queue = new QueueManager('my-queue', {
  backend: 'memory',              // or 'redis'
  concurrency: 5,                 // Process 5 jobs concurrently
  defaultDelay: 0,                // Default job delay in ms
  retryPolicy: {
    maxAttempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000,
      maxDelay: 30000
    }
  },
  deadLetterQueue: 'my-queue-dlq',
  jobTimeout: 30000,              // Job timeout in ms
  cleanupInterval: 3600000,       // Cleanup interval in ms
  autoStart: true                 // Auto-start processing
});
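Since the package reads no environment variables itself, an application that wants different settings per environment has to derive them before calling the constructor. The sketch below is one hypothetical way to do that, following the memory-for-development, Redis-for-production split this README describes. The helper names and the concurrency numbers are illustrative only, and Redis connection settings are omitted because this README does not document them.

```typescript
type Backend = 'memory' | 'redis';

// Hypothetical helper: memory backend unless running in production.
function backendFor(nodeEnv: string | undefined): Backend {
  return nodeEnv === 'production' ? 'redis' : 'memory';
}

// Build the subset of constructor options that varies by environment.
function queueOptionsFor(nodeEnv: string | undefined): {
  backend: Backend;
  concurrency: number;
  autoStart: boolean;
} {
  const backend = backendFor(nodeEnv);
  return {
    backend,
    // Illustrative numbers, not recommendations from the package.
    concurrency: backend === 'redis' ? 10 : 2,
    autoStart: true,
  };
}

console.log(queueOptionsFor('production').backend); // redis
console.log(queueOptionsFor(undefined).backend);    // memory
```

An application would typically call `queueOptionsFor(process.env.NODE_ENV)` and pass the result to `new QueueManager(...)`.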

Features

  • Multiple Backends: Memory (development) and Redis (production) support
  • Priority Queues: Process jobs based on priority (CRITICAL > HIGH > NORMAL > LOW)
  • Delayed Jobs: Schedule jobs to run after a specified delay
  • Automatic Retries: Configurable retry logic with exponential backoff using @bernierllc/retry-policy
  • Dead Letter Queue: Failed jobs moved to DLQ after max retry attempts
  • Concurrency Control: Process multiple jobs in parallel with configurable limits
  • Job Lifecycle Events: Comprehensive event system for monitoring
  • Statistics & Monitoring: Real-time queue metrics and throughput tracking
  • Job Management: Get, remove, and clean jobs by status
  • Type-Safe: Full TypeScript support with strict typing
  • Timeout Protection: Configurable job timeouts to prevent stuck jobs
  • Bulk Operations: Efficient bulk job addition
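
To make the Timeout Protection item more concrete, here is a minimal sketch of the general technique: racing the work against a timer with `Promise.race`. It is an illustration under that assumption, not the package's implementation, and `withTimeout` is a hypothetical helper.

```typescript
// Reject if `work` has not settled within `ms` milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new Error(`Job timed out after ${ms}ms`)), ms);
  });

  // Whichever settles first wins; the timer is cleared either way so it
  // does not keep the process alive.
  return Promise.race([work, timeout]).finally(() => {
    if (timer !== undefined) {
      clearTimeout(timer);
    }
  });
}

const quickWork = new Promise<string>((resolve) => setTimeout(() => resolve('done'), 10));
withTimeout(quickWork, 1000).then((value) => console.log(value)); // done
```

Note that this only stops waiting for the work; it does not cancel it. Truly aborting a running job needs cooperation from the job itself.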

Dependencies

  • @bernierllc/retry-policy - Exponential backoff and retry logic

License

Copyright (c) 2025 Bernier LLC. All rights reserved.

This package is licensed under the MIT License.