distrib-job-queue
v1.0.0
A fault-tolerant distributed job queue with pluggable backends for Redis, RabbitMQ, or file system
Distributed Job Queue
A fault-tolerant distributed job queue system with pluggable backends for Node.js applications.
Features
- Pluggable Backends: Support for Redis, RabbitMQ, and file-based storage
- Fault Tolerance: Automatic retry with configurable backoff strategies
- Priority Queuing: Jobs can be assigned different priority levels
- Delayed Jobs: Schedule jobs to run at a later time
- Concurrency Control: Configure the number of concurrent jobs
- Middleware Support: Extensible with custom middleware for logging, metrics, rate limiting, etc.
- TypeScript Support: Built with TypeScript for type safety
Installation
npm install distrib-job-queue
Quick Start
import { Queue, Worker, JobPriority } from 'distrib-job-queue';
async function main() {
  // Create a queue with Redis backend
  const queue = new Queue({
    name: 'email-queue',
    backend: 'redis',
    backendOptions: {
      url: 'redis://localhost:6379'
    }
  });
  // Connect to the queue
  await queue.connect();
  // Create a worker
  const worker = queue.createWorker({ concurrency: 5 });
  // Register job processor
  worker.register('send-email', async (job) => {
    const { to, subject, body } = job.data;
    console.log(`Sending email to ${to} with subject "${subject}"`);
    // Your email sending logic here
    return { sent: true, timestamp: new Date() };
  });
  // Start worker
  worker.start();
  // Add a job to the queue
  await queue.add('send-email', {
    to: '[email protected]',
    subject: 'Welcome to our service',
    body: 'Thank you for signing up!'
  }, {
    priority: JobPriority.HIGH,
    retries: 3
  });
}
main().catch(console.error);
Usage
Creating a Queue
import { Queue } from 'distrib-job-queue';
// Redis backend
const redisQueue = new Queue({
  name: 'my-queue',
  backend: 'redis',
  backendOptions: {
    url: 'redis://localhost:6379'
  }
});
// RabbitMQ backend
const rabbitmqQueue = new Queue({
  name: 'my-queue',
  backend: 'rabbitmq',
  backendOptions: {
    url: 'amqp://guest:guest@localhost:5672'
  }
});
// File-based backend
const fileQueue = new Queue({
  name: 'my-queue',
  backend: 'file',
  backendOptions: {
    directory: './queue-data'
  }
});
// Connect to the queue
await redisQueue.connect();
Adding Jobs
import { JobPriority } from 'distrib-job-queue';
// Simple job
await queue.add('job-name', { foo: 'bar' });
// With options
await queue.add('job-name', { foo: 'bar' }, {
  priority: JobPriority.HIGH,
  retries: 5,
  delay: 60000, // 1 minute delay
  backoff: {
    type: 'exponential',
    delay: 1000,
    maxDelay: 30000,
    jitter: true
  },
  timeout: 10000, // 10 seconds timeout
  removeOnComplete: true,
  removeOnFail: false
});
Creating a Worker
const worker = queue.createWorker({
  concurrency: 5,
  pollInterval: 1000,
  timeout: 30000,
  autostart: true
});
// Register job processors
worker.register('job-name', async (job) => {
  // Process the job
  console.log(`Processing job ${job.id}`);
  console.log('Job data:', job.data);
  // Return a result
  return { processed: true };
});
// Start the worker if not auto-started
worker.start();
// Stop the worker
await worker.stop();
Using Middleware
import { createLoggerMiddleware, createRateLimiterMiddleware, createMetricsMiddleware } from 'distrib-job-queue';
// Add middleware to the queue
queue.use(createLoggerMiddleware({ level: 'debug' }));
queue.use(createRateLimiterMiddleware({
  limit: 100,
  window: 60000 // 100 jobs per minute
}));
// Add middleware to the worker
worker.use(createMetricsMiddleware());
// Custom middleware
queue.use(async (ctx, next) => {
  const { job } = ctx;
  console.log(`Processing job ${job.id}`);
  // Add tracing ID to the job data
  job.data.tracingId = 'trace-' + Date.now();
  // Call next middleware
  await next();
  console.log(`Job ${job.id} completed`);
});
Handling Events
// Queue events
queue.on('ready', () => console.log('Queue is ready'));
queue.on('job added', (job) => console.log(`Job ${job.id} added`));
queue.on('error', (err) => console.error('Queue error:', err));
// Worker events
worker.on('process', (job) => console.log(`Processing job ${job.id}`));
worker.on('completed', (job) => console.log(`Job ${job.id} completed with result:`, job.result));
worker.on('failed', (job, err) => console.error(`Job ${job.id} failed:`, err));
worker.on('error', (err) => console.error('Worker error:', err));
API Reference
Queue
Constructor
new Queue(options: QueueOptions)
Options:
- name: Queue name (required)
- backend: Backend type: 'redis', 'rabbitmq', or 'file' (required)
- backendOptions: Backend-specific options (required)
Methods
- connect(): Connect to the backend
- disconnect(): Disconnect from the backend
- add(name: string, data: any, options?: JobOptions): Add a job to the queue
- getJob(id: string): Get a job by ID
- removeJob(id: string): Remove a job by ID
- count(): Count jobs in the queue
- clear(): Clear the queue
- use(middleware: MiddlewareFunction): Add middleware
- createWorker(options?: WorkerOptions): Create a worker for this queue
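The `options` argument of `add` accepts the `backoff` settings shown under Adding Jobs (`type`, `delay`, `maxDelay`, `jitter`). This README does not say how those fields become a wait time, so the following is only a sketch of one common interpretation: the delay doubles on each retry, is capped at `maxDelay`, and is optionally randomized. `BackoffOptions`, `computeBackoffDelay`, the `'fixed'` type, and the "full jitter" formula are assumptions made for illustration, not part of the library's documented API.

```typescript
// Hypothetical shape mirroring the `backoff` option from the Adding Jobs example.
interface BackoffOptions {
  type: 'fixed' | 'exponential'; // 'fixed' is an assumed alternative
  delay: number;                 // base delay in ms
  maxDelay?: number;             // upper bound in ms
  jitter?: boolean;              // randomize the wait to avoid retry bursts
}

// Returns the wait before retry number `attempt` (1-based).
// `random` is injectable so the result can be made deterministic.
function computeBackoffDelay(
  opts: BackoffOptions,
  attempt: number,
  random: () => number = Math.random
): number {
  const raw = opts.type === 'exponential'
    ? opts.delay * 2 ** (attempt - 1)
    : opts.delay;
  const capped = Math.min(raw, opts.maxDelay ?? Infinity);
  // Assumed "full jitter": pick a value uniformly in [0, capped).
  return opts.jitter ? Math.floor(random() * capped) : capped;
}

const backoff: BackoffOptions = { type: 'exponential', delay: 1000, maxDelay: 30000 };
const delays = [1, 2, 3, 4, 5, 6, 7].map((n) => computeBackoffDelay(backoff, n));
console.log(delays); // [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

With `jitter: true` the same schedule becomes an upper bound rather than an exact wait, which spreads out retries from many jobs that failed at the same moment.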
Worker
Constructor
queue.createWorker(options?: WorkerOptions)
Options:
- concurrency: Maximum number of concurrent jobs (default: 1)
- pollInterval: Poll interval in ms (default: 1000)
- timeout: Default job timeout in ms (default: 30000)
- autostart: Start processing automatically (default: true)
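To make the `concurrency` option concrete, here is a minimal sketch of how a worker could cap the number of jobs in flight: it starts at most `concurrency` "lanes", and each lane pulls the next item as soon as its previous one finishes. `runWithConcurrency` is a hypothetical helper written for this illustration; the library's actual scheduling may differ.

```typescript
// Processes `items` with at most `concurrency` processors running at once.
// Results are returned in input order.
async function runWithConcurrency<T, R>(
  items: T[],
  concurrency: number,
  processor: (item: T) => Promise<R>
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let nextIndex = 0;
  // Each lane repeatedly claims the next unprocessed index.
  const lane = async (): Promise<void> => {
    while (nextIndex < items.length) {
      const i = nextIndex++;
      results[i] = await processor(items[i]);
    }
  };
  const laneCount = Math.min(concurrency, items.length);
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}

// Demo: track how many processors are active at the same time.
let active = 0;
let peak = 0;
const done = runWithConcurrency([1, 2, 3, 4, 5, 6], 2, async (n) => {
  active++;
  peak = Math.max(peak, active);
  await new Promise<void>((resolve) => setTimeout(resolve, 5));
  active--;
  return n * 2;
});
done.then((results) => console.log(results, 'peak concurrency:', peak));
```

Because JavaScript runs these callbacks on a single thread, claiming an index with `nextIndex++` needs no locking.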
Methods
- register(jobName: string, processor: JobProcessor): Register a job processor
- start(): Start processing jobs
- stop(): Stop processing jobs
- use(middleware: MiddlewareFunction): Add middleware
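Both `queue.use` and `worker.use` take a function of the form `(ctx, next)`, as in the custom middleware example under Using Middleware. How the library chains these functions is not documented here. A plausible model is "onion" composition, where each middleware runs its code before `await next()`, then the rest of the chain, then its code after. The sketch below assumes that model; `compose`, `Middleware`, and `DemoContext` are hypothetical names, not library exports.

```typescript
type Next = () => Promise<void>;
type Middleware<C> = (ctx: C, next: Next) => Promise<void>;

// Builds one function that runs the middlewares in order and finally the handler.
function compose<C>(
  middlewares: Middleware<C>[],
  handler: (ctx: C) => Promise<void>
): (ctx: C) => Promise<void> {
  return (ctx) => {
    const dispatch = (i: number): Promise<void> => {
      if (i === middlewares.length) return handler(ctx);
      let called = false;
      return middlewares[i](ctx, () => {
        // Guard against a middleware invoking next() more than once.
        if (called) return Promise.reject(new Error('next() called twice'));
        called = true;
        return dispatch(i + 1);
      });
    };
    return dispatch(0);
  };
}

interface DemoContext {
  job: { id: string; data: Record<string, unknown> };
  log: string[];
}

const run = compose<DemoContext>(
  [
    async (ctx, next) => {
      ctx.log.push('outer:before');
      await next();
      ctx.log.push('outer:after');
    },
    async (ctx, next) => {
      ctx.job.data.tracingId = 'trace-1';
      ctx.log.push('inner:before');
      await next();
      ctx.log.push('inner:after');
    },
  ],
  async (ctx) => {
    ctx.log.push(`process:${ctx.job.id}`);
  }
);

const demoCtx: DemoContext = { job: { id: 'job-1', data: {} }, log: [] };
const finished = run(demoCtx).then(() => demoCtx.log);
finished.then((log) => console.log(log.join(' > ')));
// outer:before > inner:before > process:job-1 > inner:after > outer:after
```

Under this model, a middleware that never calls `next()` short-circuits the chain, which is one way a rate limiter could reject a job.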
Job
Properties
- id: Unique job ID
- name: Job name
- data: Job data
- status: Job status (PENDING, ACTIVE, COMPLETED, FAILED, RETRYING, DELAYED)
- priority: Job priority (LOW, NORMAL, HIGH, CRITICAL)
- attempts: Number of attempts
- maxRetries: Maximum number of retries
- delay: Delay in ms
- timeout: Timeout in ms
- createdAt: Creation timestamp
- startedAt: Start timestamp
- completedAt: Completion timestamp
- failedAt: Failure timestamp
- result: Job result
- error: Job error
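As a rough illustration of how `priority`, `status`, `delay`, and `createdAt` could interact when a worker picks its next job, the sketch below models a subset of the properties above and selects the highest-priority job that is ready to run. The field types, the numeric ranks, the FIFO tie-break, and the rule that a delayed job becomes ready at `createdAt + delay` are all assumptions; `JobSketch`, `compareJobs`, and `nextJob` are hypothetical names.

```typescript
// Assumed ranking of the documented priority levels.
const PRIORITY_RANK = { LOW: 0, NORMAL: 1, HIGH: 2, CRITICAL: 3 } as const;
type Priority = keyof typeof PRIORITY_RANK;
type Status = 'PENDING' | 'ACTIVE' | 'COMPLETED' | 'FAILED' | 'RETRYING' | 'DELAYED';

// Subset of the Job properties, with assumed types (timestamps in ms).
interface JobSketch<T = unknown> {
  id: string;
  name: string;
  data: T;
  status: Status;
  priority: Priority;
  delay: number;
  createdAt: number;
}

// Higher priority first; equal priorities run oldest first.
function compareJobs(a: JobSketch, b: JobSketch): number {
  const byPriority = PRIORITY_RANK[b.priority] - PRIORITY_RANK[a.priority];
  return byPriority !== 0 ? byPriority : a.createdAt - b.createdAt;
}

// A job is ready if it is pending, or delayed and its delay has elapsed.
function nextJob(jobs: JobSketch[], now: number): JobSketch | undefined {
  return jobs
    .filter((j) =>
      j.status === 'PENDING' ||
      (j.status === 'DELAYED' && j.createdAt + j.delay <= now))
    .sort(compareJobs)[0];
}

const sampleJobs: JobSketch[] = [
  { id: 'a', name: 'report', data: {}, status: 'PENDING', priority: 'LOW', delay: 0, createdAt: 1 },
  { id: 'b', name: 'email', data: {}, status: 'PENDING', priority: 'HIGH', delay: 0, createdAt: 3 },
  { id: 'c', name: 'email', data: {}, status: 'PENDING', priority: 'HIGH', delay: 0, createdAt: 2 },
  { id: 'd', name: 'alert', data: {}, status: 'DELAYED', priority: 'CRITICAL', delay: 1000, createdAt: 0 },
];
console.log(nextJob(sampleJobs, 500)?.id);  // 'c'
console.log(nextJob(sampleJobs, 1000)?.id); // 'd'
```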
Middleware
Built-in Middleware
Logger Middleware
import { createLoggerMiddleware } from 'distrib-job-queue';
queue.use(createLoggerMiddleware({
  level: 'debug', // 'debug', 'info', 'warn', 'error'
  logger: console // Custom logger
}));
Rate Limiter Middleware
import { createRateLimiterMiddleware } from 'distrib-job-queue';
queue.use(createRateLimiterMiddleware({
  limit: 100, // Max jobs
  window: 60000, // Time window in ms
  queueing: true // Queue excess jobs or reject
}));
Metrics Middleware
import { createMetricsMiddleware, metrics } from 'distrib-job-queue';
worker.use(createMetricsMiddleware());
// Get metrics
console.log(metrics.getMetrics());
// Listen for metric events
metrics.on('job:processed', ({ jobType }) => {
  console.log(`Job processed: ${jobType}`);
});
License
MIT
