# @bernierllc/queue-manager

v1.2.0
Robust message queue management with multiple backends, job processing, priority handling, and automatic retry logic.
## Installation

```bash
npm install @bernierllc/queue-manager
```

For Redis backend support, install the peer dependency:

```bash
npm install @bernierllc/queue-manager ioredis
```

## Usage
### Basic Queue Setup

```ts
import { QueueManager, JobPriority } from '@bernierllc/queue-manager';

// Create a queue with a memory backend
const queue = new QueueManager('email-queue', {
  backend: 'memory',
  concurrency: 5,
  retryPolicy: {
    maxAttempts: 3,
    backoff: { type: 'exponential', delay: 1000, maxDelay: 30000 }
  }
});

// Register a job processor
queue.process('send-email', async (job) => {
  const { to, subject, body } = job.data;
  await sendEmail(to, subject, body);
  return { sent: true };
});

// Add a job to the queue
await queue.add('send-email', {
  to: '[email protected]',
  subject: 'Welcome',
  body: 'Welcome to our service!'
}, {
  priority: JobPriority.HIGH
});
```

### Priority Handling
```ts
import { QueueManager, JobPriority } from '@bernierllc/queue-manager';

const queue = new QueueManager('tasks', { backend: 'memory' });

// Add jobs with different priorities
await queue.add('task', { type: 'urgent' }, { priority: JobPriority.CRITICAL });
await queue.add('task', { type: 'important' }, { priority: JobPriority.HIGH });
await queue.add('task', { type: 'routine' }, { priority: JobPriority.NORMAL });
await queue.add('task', { type: 'background' }, { priority: JobPriority.LOW });

// Jobs are processed in priority order (CRITICAL > HIGH > NORMAL > LOW)
```

### Delayed Jobs
```ts
// Schedule a job to run after 1 hour
await queue.add('reminder', {
  message: 'Time to review'
}, {
  delay: 3600000 // 1 hour in milliseconds
});

// Schedule with a custom retry policy
await queue.add('api-call', {
  endpoint: '/users',
  method: 'POST'
}, {
  delay: 5000,
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 2000,
    maxDelay: 60000
  }
});
```

### Bulk Job Addition
```ts
// Add multiple jobs efficiently
const jobs = await queue.addBulk([
  { type: 'process-user', data: { userId: 1 }, options: { priority: JobPriority.HIGH } },
  { type: 'process-user', data: { userId: 2 } },
  { type: 'process-user', data: { userId: 3 } }
]);

console.log(`Added ${jobs.length} jobs`);
```

### Event Handling
```ts
// Listen to queue events
queue.on('job:waiting', (job) => {
  console.log(`Job ${job.id} added to queue`);
});

queue.on('job:active', (job) => {
  console.log(`Processing job ${job.id}`);
});

queue.on('job:completed', (job) => {
  console.log(`Job ${job.id} completed`);
});

queue.on('job:failed', (job) => {
  console.error(`Job ${job.id} failed: ${job.error}`);
});

queue.on('job:progress', (job) => {
  console.log(`Job ${job.id} progress: ${job.progress}%`);
});
```

### Queue Statistics
```ts
// Get queue statistics
const stats = await queue.getStats();

console.log('Queue Stats:', {
  waiting: stats.waiting,
  active: stats.active,
  completed: stats.completed,
  failed: stats.failed,
  delayed: stats.delayed,
  totalProcessed: stats.totalProcessed,
  throughput: stats.throughput
});
```

### Job Management
```ts
// Get a specific job
const job = await queue.getJob('job-id-123');

// Remove a job
await queue.removeJob('job-id-123');

// Get jobs by status
const waitingJobs = await queue.getJobs(JobStatus.WAITING);
const failedJobs = await queue.getJobs(JobStatus.FAILED);

// Clean up completed jobs older than 24 hours
const removed = await queue.clean(86400000, JobStatus.COMPLETED);
console.log(`Removed ${removed} old jobs`);
```

### Concurrency Control
```ts
// Process up to 10 jobs concurrently
const queue = new QueueManager('high-throughput', {
  backend: 'memory',
  concurrency: 10
});

queue.process('task', async (job) => {
  // Job processing logic
  await processTask(job.data);
});
```

### Manual Queue Control
```ts
// Create a queue with manual start
const queue = new QueueManager('manual-queue', {
  backend: 'memory',
  autoStart: false // Don't auto-start processing
});

// Add jobs
await queue.add('task', { data: 'test' });

// Start processing when ready
await queue.start();

// Pause processing
await queue.pause();

// Resume processing
await queue.resume();

// Stop processing and close connections
await queue.close();
```

### Error Handling with Dead Letter Queue
```ts
const queue = new QueueManager('critical-tasks', {
  backend: 'memory',
  retryPolicy: {
    maxAttempts: 3,
    backoff: { type: 'exponential', delay: 1000, maxDelay: 30000 }
  },
  deadLetterQueue: 'failed-tasks'
});

// Jobs that fail after all retries are moved to the dead letter queue
queue.process('task', async (job) => {
  if (job.data.shouldFail) {
    throw new Error('Task failed');
  }
  return { success: true };
});

// Access the dead letter queue for manual inspection
const dlq = new QueueManager('failed-tasks', { backend: 'memory' });
const failedJobs = await dlq.getJobs(JobStatus.FAILED);
```

## API Reference
### QueueManager

#### Constructor

```ts
constructor(name: string, options: QueueOptions)
```

Creates a new queue manager instance.

Parameters:
- `name` - Unique queue name
- `options` - Queue configuration options

#### Methods

##### `add(type: string, data: T, options?: JobOptions): Promise<Job<T>>`

Add a single job to the queue.

Parameters:
- `type` - Job type identifier for processor routing
- `data` - Job payload data
- `options` - Optional job configuration (priority, delay, attempts, etc.)

Returns: Promise resolving to the created job
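`add()` is generic over the payload type `T`, which flows into the returned `Job<T>`. As a self-contained illustration, the stand-in `Job` shape below is copied (in abbreviated form) from the Types section, and `EmailPayload` is a hypothetical payload type, not part of the package:

```typescript
// Abbreviated stand-in for the package's Job<T> shape (see Types section).
interface Job<T = any> {
  id: string;
  type: string;
  data: T;
  attempts: number;
  maxAttempts: number;
  createdAt: Date;
}

// Hypothetical payload type for illustration only.
interface EmailPayload {
  to: string;
  subject: string;
}

// Roughly what `await queue.add('send-email', payload)` is expected to
// resolve to: a Job<EmailPayload> whose `data` field is fully typed.
const job: Job<EmailPayload> = {
  id: 'job-1',
  type: 'send-email',
  data: { to: 'user@example.com', subject: 'Welcome' },
  attempts: 0,
  maxAttempts: 3,
  createdAt: new Date()
};

console.log(job.data.subject); // `data.subject` is typed as string
```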
##### `addBulk(jobs: Array<{type: string, data: T, options?: JobOptions}>): Promise<Job<T>[]>`

Add multiple jobs to the queue efficiently.

Parameters:
- `jobs` - Array of job specifications

Returns: Promise resolving to array of created jobs
##### `process(type: string, processor: JobProcessor<T>): void`

Register a processor for a specific job type.

Parameters:
- `type` - Job type to process
- `processor` - Async function that processes the job
##### `start(): Promise<void>`

Start processing jobs from the queue.

##### `pause(): Promise<void>`

Pause job processing (current jobs complete, new jobs wait).

##### `resume(): Promise<void>`

Resume job processing after pause.

##### `close(): Promise<void>`

Stop processing and close backend connections.
##### `getJob(id: string): Promise<Job | null>`

Retrieve a specific job by ID.

Parameters:
- `id` - Job ID

Returns: Promise resolving to the job, or null if not found
##### `getJobs(status: JobStatus): Promise<Job[]>`

Get all jobs with a specific status.

Parameters:
- `status` - Job status filter (WAITING, ACTIVE, COMPLETED, FAILED, DELAYED)

Returns: Promise resolving to array of jobs
##### `removeJob(id: string): Promise<void>`

Remove a job from the queue.

Parameters:
- `id` - Job ID to remove
##### `getStats(): Promise<QueueStats>`

Get queue statistics.

Returns: Promise resolving to statistics object
##### `clean(maxAge: number, status: JobStatus): Promise<number>`

Remove old jobs with a specific status.

Parameters:
- `maxAge` - Maximum age in milliseconds
- `status` - Job status to clean

Returns: Promise resolving to number of jobs removed
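The `maxAge` cutoff amounts to a timestamp filter. A minimal self-contained sketch of that filtering logic (not the package's actual implementation; `CompletedJob` and `cleanOlderThan` are hypothetical names used only here):

```typescript
// Hypothetical sketch of the age filter clean() implies: jobs whose
// completion timestamp is older than `maxAge` milliseconds are dropped.
interface CompletedJob {
  id: string;
  completedAt: Date;
}

function cleanOlderThan(
  jobs: CompletedJob[],
  maxAge: number,
  now: number = Date.now()
): CompletedJob[] {
  const cutoff = now - maxAge;
  // Keep only jobs completed within the last `maxAge` milliseconds
  return jobs.filter((job) => job.completedAt.getTime() >= cutoff);
}

const now = Date.now();
const jobs: CompletedJob[] = [
  { id: 'old', completedAt: new Date(now - 2 * 86400000) }, // 2 days ago
  { id: 'recent', completedAt: new Date(now - 3600000) }    // 1 hour ago
];

// 24-hour cutoff, matching `queue.clean(86400000, JobStatus.COMPLETED)`
const kept = cleanOlderThan(jobs, 86400000, now);
console.log(kept.map((j) => j.id)); // [ 'recent' ]
```

The real `clean()` returns the number of jobs removed rather than the survivors; the sketch only illustrates the cutoff arithmetic.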
## Types

### QueueOptions

```ts
interface QueueOptions {
  backend: 'memory' | 'redis';
  concurrency?: number;       // Default: 1
  defaultDelay?: number;      // Default: 0
  retryPolicy?: RetryPolicy;  // Default: 3 attempts, exponential backoff
  deadLetterQueue?: string;   // Default: '{name}-dlq'
  jobTimeout?: number;        // Default: 30000ms
  cleanupInterval?: number;   // Default: 3600000ms (1 hour)
  autoStart?: boolean;        // Default: true
}
```

### JobOptions
```ts
interface JobOptions {
  priority?: JobPriority;
  delay?: number;
  attempts?: number;
  timeout?: number;
  backoff?: BackoffStrategy;
  metadata?: Record<string, any>;
}
```

### JobPriority
```ts
enum JobPriority {
  LOW = 1,
  NORMAL = 5,
  HIGH = 10,
  CRITICAL = 20
}
```

### JobStatus
```ts
enum JobStatus {
  WAITING = 'waiting',
  ACTIVE = 'active',
  COMPLETED = 'completed',
  FAILED = 'failed',
  DELAYED = 'delayed'
}
```

### Job
```ts
interface Job<T = any> {
  id: string;
  type: string;
  data: T;
  priority: JobPriority;
  attempts: number;
  maxAttempts: number;
  delay?: number;
  createdAt: Date;
  processedAt?: Date;
  completedAt?: Date;
  failedAt?: Date;
  error?: string;
  progress?: number;
  metadata?: Record<string, any>;
}
```

### QueueStats
```ts
interface QueueStats {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  totalProcessed: number;
  throughput: number;
}
```

## Integration Status
### Logger Integration
Status: Not applicable
This is a pure core utility package focused on queue management. Logger integration is not required as the package:
- Uses standard EventEmitter for job lifecycle events
- Provides comprehensive event system for external logging integration
- Allows consumers to add logging via event handlers
- Keeps the package lightweight, with no logging dependency of its own
Applications using this package can integrate logging by listening to queue events:
```ts
import { createLogger } from '@bernierllc/logger';

const logger = createLogger({ service: 'queue' });

queue.on('job:completed', (job) => {
  logger.info('Job completed', { jobId: job.id, type: job.type });
});

queue.on('job:failed', (job) => {
  logger.error('Job failed', { jobId: job.id, error: job.error });
});
```

### Docs-Suite Integration
Status: Ready
Documentation format: Markdown with TypeScript examples
- Complete API reference with type definitions
- Comprehensive usage examples
- Integration patterns documented
### NeverHub Integration
Status: Not applicable
This is a foundational core utility package that:
- Provides queue management primitives
- Has no runtime service dependencies
- Operates independently without service discovery needs
- Designed for embedding in higher-level service packages
NeverHub integration should be implemented in service-layer packages that orchestrate multiple core utilities, not in the atomic core packages themselves. This maintains clean separation of concerns per MECE architecture.
## Configuration
This is a pure utility core package with no runtime environment configuration. All configuration is provided programmatically through the constructor options.
### Programmatic Configuration

```ts
const queue = new QueueManager('my-queue', {
  backend: 'memory',        // or 'redis'
  concurrency: 5,           // Process 5 jobs concurrently
  defaultDelay: 0,          // Default job delay in ms
  retryPolicy: {
    maxAttempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000,
      maxDelay: 30000
    }
  },
  deadLetterQueue: 'my-queue-dlq',
  jobTimeout: 30000,        // Job timeout in ms
  cleanupInterval: 3600000, // Cleanup interval in ms
  autoStart: true           // Auto-start processing
});
```

## Features
- Multiple Backends: Memory (development) and Redis (production) support
- Priority Queues: Process jobs based on priority (CRITICAL > HIGH > NORMAL > LOW)
- Delayed Jobs: Schedule jobs to run after a specified delay
- Automatic Retries: Configurable retry logic with exponential backoff using @bernierllc/retry-policy
- Dead Letter Queue: Failed jobs moved to DLQ after max retry attempts
- Concurrency Control: Process multiple jobs in parallel with configurable limits
- Job Lifecycle Events: Comprehensive event system for monitoring
- Statistics & Monitoring: Real-time queue metrics and throughput tracking
- Job Management: Get, remove, and clean jobs by status
- Type-Safe: Full TypeScript support with strict typing
- Timeout Protection: Configurable job timeouts to prevent stuck jobs
- Bulk Operations: Efficient bulk job addition
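The exponential backoff listed above is commonly computed by doubling the base delay per attempt up to a cap. A self-contained sketch, assuming the conventional `delay * 2^(attempt - 1)` formula (the exact formula used by `@bernierllc/retry-policy` may differ):

```typescript
// Hedged sketch of capped exponential backoff; not the package's
// actual implementation.
function backoffDelay(attempt: number, baseDelay: number, maxDelay: number): number {
  // attempt is 1-based: attempt 1 waits baseDelay, attempt 2 waits 2x, ...
  return Math.min(baseDelay * 2 ** (attempt - 1), maxDelay);
}

// With retryPolicy { delay: 1000, maxDelay: 30000 }, retry waits grow:
const delays = [1, 2, 3, 4, 5, 6].map((n) => backoffDelay(n, 1000, 30000));
console.log(delays); // [ 1000, 2000, 4000, 8000, 16000, 30000 ]
```

The cap (`maxDelay`) keeps the wait bounded once the doubled delay would exceed it, as in the final attempt above.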
## Dependencies

- `@bernierllc/retry-policy` - Exponential backoff and retry logic
## See Also
- @bernierllc/retry-policy - Retry strategies used by this package
- @bernierllc/message-queue - Alternative message queue implementation
## License
Copyright (c) 2025 Bernier LLC. All rights reserved.
This package is licensed under the MIT License.
