queue-runtime
v0.1.2
Published
A powerful, flexible multi-queue job processing runtime with global worker pool, automatic worker redistribution, and support for multiple queue drivers (Redis, RabbitMQ, AWS SQS, Memory)
Maintainers
Readme
Queue Runtime
A powerful, flexible multi-queue job processing runtime with global worker pool, automatic worker redistribution, and support for multiple queue drivers.
Features
Core Features
- ✅ Multi-queue support - Manage multiple job queues independently
- ✅ Global worker pool - Shared worker pool across all queues with
maxWorkerslimit - ✅ Per-queue concurrency limits - Control concurrency for each queue
- ✅ Worker redistribution - Automatically redistribute idle workers to busy queues
- ✅ Rebalancing on release - Rebalance workers when queues become idle
- ✅ Graceful shutdown - Wait for running jobs to complete before shutdown
Error Handling
- ✅ Retry mechanism - Automatic retry with exponential backoff
- ✅ Dead Letter Queue (DLQ) - Failed jobs after max attempts sent to DLQ
- ✅ Error tracking - Track and report errors with detailed statistics
Queue Drivers
- ✅ Redis - Production-ready Redis driver (default)
- ✅ RabbitMQ - RabbitMQ driver (optional dependency)
- ✅ AWS SQS - AWS SQS driver (optional dependency)
Installation
npm install queue-runtimeOptional Dependencies
For RabbitMQ support:
npm install amqplib @types/amqplibFor AWS SQS support:
npm install @aws-sdk/client-sqsQuick Start
import { RedisDriver, QueueRuntime } from 'queue-runtime';
// Or import drivers separately:
// import { RedisDriver } from 'queue-runtime/drivers/redis';
// Create driver
const driver = new RedisDriver('redis://localhost:6379');
await driver.connect();
// Create runtime
const runtime = new QueueRuntime(driver, {
maxWorkers: 15,
enableRedistributeIdleWorkers: true,
enableRebalanceOnRelease: true,
pollInterval: 50,
});
// Register jobs
runtime.registerJob({
queueName: 'send-email',
concurrency: 8,
maxAttempts: 3,
handler: async (payload) => {
// Your job logic here
console.log('Sending email to:', payload.to);
},
});
// Enqueue jobs
await runtime.enqueue('send-email', { to: '[email protected]' });
// Start processing
runtime.start();Configuration
QueueRuntimeConfig
interface QueueRuntimeConfig {
maxWorkers: number; // Maximum workers across all queues
enableRedistributeIdleWorkers?: boolean; // Auto-redistribute idle workers (default: true)
enableRebalanceOnRelease?: boolean; // Rebalance when queue releases workers (default: true)
pollInterval?: number; // Polling interval in ms (default: 50)
errorHandler?: ErrorHandlerConfig; // Error handling configuration
}JobDefinition
interface JobDefinition {
queueName: string; // Unique queue name
concurrency: number; // Max concurrent jobs for this queue
handler: (payload: any) => Promise<any>; // Job handler function
maxAttempts?: number; // Max retry attempts (default: 3)
retryBackoffMs?: number; // Base retry delay (default: 1000ms)
retryBackoffMultiplier?: number; // Exponential backoff multiplier (default: 2)
maxRetryDelayMs?: number; // Max retry delay (default: 300000ms = 5min)
dlqEnabled?: boolean; // Enable dead letter queue (default: true)
}Queue Drivers
Redis Driver
import { RedisDriver } from 'queue-runtime/drivers/redis';
const driver = new RedisDriver('redis://localhost:6379');
await driver.connect();RabbitMQ Driver
import { RabbitMQDriver } from 'queue-runtime';
// Or: import { RabbitMQDriver } from 'queue-runtime/drivers/rabbitmq';
const driver = new RabbitMQDriver({
url: 'amqp://localhost',
durable: true,
prefetch: 1,
});
await driver.connect();AWS SQS Driver
import { SQSDriver } from 'queue-runtime';
// Or: import { SQSDriver } from 'queue-runtime/drivers/sqs';
const driver = new SQSDriver({
region: 'us-east-1',
accessKeyId: 'your-key',
secretAccessKey: 'your-secret',
});
await driver.connect();Memory Driver (Testing)
import { MemoryDriver } from 'queue-runtime';
// Or: import { MemoryDriver } from 'queue-runtime/drivers/memory';
const driver = new MemoryDriver();
await driver.connect();Statistics & Monitoring
// Get runtime statistics
const stats = runtime.getStats();
console.log(stats);
// {
// maxWorkers: 15,
// globalPool: 5,
// available: 10,
// queues: [...],
// errors: { totalErrors: 0, errorsByQueue: {}, errorsByType: {} }
// }
// Get error reports
const errors = runtime.getErrorReports(10); // Last 10 errorsError Handling
Jobs automatically retry on failure with exponential backoff. After max attempts, jobs are sent to a Dead Letter Queue (DLQ).
runtime.registerJob({
queueName: 'process-order',
concurrency: 5,
maxAttempts: 3,
retryBackoffMs: 1000, // Start with 1 second
retryBackoffMultiplier: 2, // Double each retry: 1s, 2s, 4s
maxRetryDelayMs: 30000, // Max 30 seconds
dlqEnabled: true, // Send to DLQ after max attempts
handler: async (payload) => {
// Your job logic
},
});Worker Redistribution
The runtime automatically redistributes idle workers to busy queues:
- Idle Worker Redistribution: When total concurrency < maxWorkers, idle workers are distributed evenly across queues
- Rebalance on Release: When a queue releases all workers, they're redistributed to other active queues
License
MIT
Requirements
- Node.js >= 18
