distrib-job-queue

v1.0.0

A fault-tolerant distributed job queue with pluggable backends for Redis, RabbitMQ, or file system

Distributed Job Queue

A fault-tolerant distributed job queue system with pluggable backends for Node.js applications.

Features

  • Pluggable Backends: Support for Redis, RabbitMQ, and file-based storage
  • Fault Tolerance: Automatic retry with configurable backoff strategies
  • Priority Queuing: Jobs can be assigned different priority levels
  • Delayed Jobs: Schedule jobs to run at a later time
  • Concurrency Control: Configure the number of concurrent jobs
  • Middleware Support: Extensible with custom middleware for logging, metrics, rate limiting, etc.
  • TypeScript Support: Built with TypeScript for type safety

Installation

npm install distrib-job-queue

Quick Start

import { Queue, JobPriority } from 'distrib-job-queue';

async function main() {
  // Create a queue with Redis backend
  const queue = new Queue({
    name: 'email-queue',
    backend: 'redis',
    backendOptions: {
      url: 'redis://localhost:6379'
    }
  });

  // Connect to the queue
  await queue.connect();

  // Create a worker
  const worker = queue.createWorker({ concurrency: 5 });

  // Register job processor
  worker.register('send-email', async (job) => {
    const { to, subject, body } = job.data;
    console.log(`Sending email to ${to} with subject "${subject}"`);
    
    // Your email sending logic here
    
    return { sent: true, timestamp: new Date() };
  });

  // Start worker
  worker.start();

  // Add a job to the queue
  await queue.add('send-email', {
    to: 'user@example.com', // placeholder recipient address
    subject: 'Welcome to our service',
    body: 'Thank you for signing up!'
  }, {
    priority: JobPriority.HIGH,
    retries: 3
  });
}

main().catch(console.error);

Usage

Creating a Queue

import { Queue } from 'distrib-job-queue';

// Redis backend
const redisQueue = new Queue({
  name: 'my-queue',
  backend: 'redis',
  backendOptions: {
    url: 'redis://localhost:6379'
  }
});

// RabbitMQ backend
const rabbitmqQueue = new Queue({
  name: 'my-queue',
  backend: 'rabbitmq',
  backendOptions: {
    url: 'amqp://guest:guest@localhost:5672'
  }
});

// File-based backend
const fileQueue = new Queue({
  name: 'my-queue',
  backend: 'file',
  backendOptions: {
    directory: './queue-data'
  }
});

// Connect to the queue
await redisQueue.connect();

Adding Jobs

import { JobPriority } from 'distrib-job-queue';

// Simple job
await queue.add('job-name', { foo: 'bar' });

// With options
await queue.add('job-name', { foo: 'bar' }, {
  priority: JobPriority.HIGH,
  retries: 5,
  delay: 60000, // 1 minute delay
  backoff: {
    type: 'exponential',
    delay: 1000,
    maxDelay: 30000,
    jitter: true
  },
  timeout: 10000, // 10 seconds timeout
  removeOnComplete: true,
  removeOnFail: false
});
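
The README does not say how the `backoff` options turn into a retry delay. The sketch below shows one common reading: double the base `delay` on each attempt, cap it at `maxDelay`, and optionally apply "full jitter". The `BackoffOptions` interface, the `'fixed'` type, and `computeBackoffDelay` are hypothetical names for illustration, not part of the package's API.

```typescript
// Hypothetical shape mirroring the `backoff` option shown above.
// Only 'exponential' appears in the README; 'fixed' is an assumption.
interface BackoffOptions {
  type: 'fixed' | 'exponential';
  delay: number;     // base delay in ms
  maxDelay?: number; // upper bound in ms
  jitter?: boolean;  // randomize the delay to spread out retries
}

// Returns the wait in ms before retry number `attempt` (1-based).
// `random` is injectable so the function can be checked deterministically.
function computeBackoffDelay(
  opts: BackoffOptions,
  attempt: number,
  random: () => number = Math.random
): number {
  const base =
    opts.type === 'exponential' ? opts.delay * 2 ** (attempt - 1) : opts.delay;
  const capped = Math.min(base, opts.maxDelay ?? Infinity);
  // "Full jitter": pick uniformly in [0, capped).
  return opts.jitter ? Math.floor(random() * capped) : capped;
}

const backoff: BackoffOptions = { type: 'exponential', delay: 1000, maxDelay: 30000 };
const delays = [1, 2, 3, 4, 5, 6, 7].map((n) => computeBackoffDelay(backoff, n));
console.log(delays); // [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

With jitter enabled, workers that fail at the same moment retry at different times, which avoids hammering a backend that is still recovering.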

Creating a Worker

const worker = queue.createWorker({
  concurrency: 5,
  pollInterval: 1000,
  timeout: 30000,
  autostart: true
});

// Register job processors
worker.register('job-name', async (job) => {
  // Process the job
  console.log(`Processing job ${job.id}`);
  console.log('Job data:', job.data);
  
  // Return a result
  return { processed: true };
});

// Start the worker manually (only needed when autostart is false)
worker.start();

// Stop the worker
await worker.stop();
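
To make the `concurrency` option concrete, here is a minimal, package-independent sketch of a bounded worker pool: at most `concurrency` tasks are in flight at once. `runWithConcurrency` is a hypothetical helper, and the worker's real scheduling may differ.

```typescript
// Runs `tasks` with at most `concurrency` in flight at once and
// resolves with the results in the original task order.
async function runWithConcurrency<T>(
  tasks: Array<() => Promise<T>>,
  concurrency: number
): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let nextIndex = 0;

  // Each runner claims the next unclaimed task until none remain.
  async function runner(): Promise<void> {
    while (nextIndex < tasks.length) {
      const index = nextIndex++;
      results[index] = await tasks[index]();
    }
  }

  const runnerCount = Math.max(1, Math.min(concurrency, tasks.length));
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));
  return results;
}

// Usage: ten short tasks, five at a time, tracking the peak in-flight count.
let inFlight = 0;
let peak = 0;
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

const demoTasks = Array.from({ length: 10 }, (_, i) => async () => {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await sleep(5);
  inFlight--;
  return i * 2;
});

runWithConcurrency(demoTasks, 5).then((results) => {
  console.log(results, 'peak concurrency:', peak); // peak is 5
});
```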

Using Middleware

import { createLoggerMiddleware, createRateLimiterMiddleware, createMetricsMiddleware } from 'distrib-job-queue';

// Add middleware to the queue
queue.use(createLoggerMiddleware({ level: 'debug' }));
queue.use(createRateLimiterMiddleware({
  limit: 100,
  window: 60000 // 100 jobs per minute
}));

// Add middleware to the worker
worker.use(createMetricsMiddleware());

// Custom middleware
queue.use(async (ctx, next) => {
  const { job } = ctx;
  console.log(`Processing job ${job.id}`);
  
  // Add tracing ID to the job data
  job.data.tracingId = 'trace-' + Date.now();
  
  // Call next middleware
  await next();
  
  console.log(`Job ${job.id} completed`);
});
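
The `(ctx, next)` signature follows the familiar "onion" model: each middleware runs code before and after the rest of the chain. The sketch below shows how such a chain can be composed. `compose`, `Middleware`, and `Next` are hypothetical names used for illustration, and the package's internal implementation may differ.

```typescript
type Next = () => Promise<void>;
type Middleware<C> = (ctx: C, next: Next) => Promise<void> | void;

// Composes middleware so that each one wraps the rest of the chain,
// with `handler` running innermost.
function compose<C>(
  middleware: Middleware<C>[],
  handler: (ctx: C) => Promise<void>
): (ctx: C) => Promise<void> {
  return async (ctx) => {
    let lastIndex = -1;
    const dispatch = async (i: number): Promise<void> => {
      // Guard against a middleware calling next() more than once.
      if (i <= lastIndex) throw new Error('next() called multiple times');
      lastIndex = i;
      if (i === middleware.length) return handler(ctx);
      await middleware[i](ctx, () => dispatch(i + 1));
    };
    await dispatch(0);
  };
}

// Usage: record the order in which each layer runs.
const trace: string[] = [];
const runJob = compose<{ jobId: string }>(
  [
    async (ctx, next) => {
      trace.push(`a:before ${ctx.jobId}`);
      await next();
      trace.push('a:after');
    },
    async (_ctx, next) => {
      trace.push('b:before');
      await next();
      trace.push('b:after');
    },
  ],
  async () => {
    trace.push('handler');
  }
);

runJob({ jobId: 'job-1' }).then(() => console.log(trace.join(' -> ')));
// a:before job-1 -> b:before -> handler -> b:after -> a:after
```

A middleware that never calls `next()` short-circuits the chain, which is how a rate limiter or validator could reject a job before it reaches the processor.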

Handling Events

// Queue events
queue.on('ready', () => console.log('Queue is ready'));
queue.on('job added', (job) => console.log(`Job ${job.id} added`));
queue.on('error', (err) => console.error('Queue error:', err));

// Worker events
worker.on('process', (job) => console.log(`Processing job ${job.id}`));
worker.on('completed', (job) => console.log(`Job ${job.id} completed with result:`, job.result));
worker.on('failed', (job, err) => console.error(`Job ${job.id} failed:`, err));
worker.on('error', (err) => console.error('Worker error:', err));

API Reference

Queue

Constructor

new Queue(options: QueueOptions)

Options:

  • name: Queue name (required)
  • backend: Backend type, one of 'redis', 'rabbitmq', or 'file' (required)
  • backendOptions: Backend-specific options (required)

Methods

  • connect(): Connect to the backend
  • disconnect(): Disconnect from the backend
  • add(name: string, data: any, options?: JobOptions): Add a job to the queue
  • getJob(id: string): Get a job by ID
  • removeJob(id: string): Remove a job by ID
  • count(): Count jobs in the queue
  • clear(): Clear the queue
  • use(middleware: MiddlewareFunction): Add middleware
  • createWorker(options?: WorkerOptions): Create a worker for this queue

Worker

Constructor

queue.createWorker(options?: WorkerOptions)

Options:

  • concurrency: Maximum number of concurrent jobs (default: 1)
  • pollInterval: Poll interval in ms (default: 1000)
  • timeout: Default job timeout in ms (default: 30000)
  • autostart: Start processing automatically (default: true)

Methods

  • register(jobName: string, processor: JobProcessor): Register a job processor
  • start(): Start processing jobs
  • stop(): Stop processing jobs
  • use(middleware: MiddlewareFunction): Add middleware
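
As an illustration of what a per-job `timeout` can mean in practice, the sketch below races a job's promise against a timer. `withTimeout` and `JobTimeoutError` are hypothetical names; the README does not document how the worker enforces timeouts or whether it cancels the underlying work.

```typescript
class JobTimeoutError extends Error {
  constructor(ms: number) {
    super(`Job timed out after ${ms} ms`);
    this.name = 'JobTimeoutError';
  }
}

// Rejects with JobTimeoutError if `work` does not settle within `ms`.
// The underlying work is not cancelled; only the wait is abandoned.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new JobTimeoutError(ms)), ms);
  });
  // Clear the timer either way so it does not keep the process alive.
  return Promise.race([work, timeout]).finally(() => clearTimeout(timer));
}

// Usage: a 50 ms job with a 10 ms budget is reported as timed out.
const slowJob = new Promise<string>((resolve) => setTimeout(() => resolve('done'), 50));
withTimeout(slowJob, 10).catch((err) => console.log(err.name)); // JobTimeoutError
```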

Job

Properties

  • id: Unique job ID
  • name: Job name
  • data: Job data
  • status: Job status (PENDING, ACTIVE, COMPLETED, FAILED, RETRYING, DELAYED)
  • priority: Job priority (LOW, NORMAL, HIGH, CRITICAL)
  • attempts: Number of attempts
  • maxRetries: Maximum number of retries
  • delay: Delay before the job is run, in ms
  • timeout: Maximum processing time, in ms
  • createdAt: Creation timestamp
  • startedAt: Start timestamp
  • completedAt: Completion timestamp
  • failedAt: Failure timestamp
  • result: Job result
  • error: Job error
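
The `priority` and `createdAt` properties are enough to define a dequeue order. The sketch below assumes that higher priorities run first and that ties are broken first-in, first-out. The numeric ranks and the `QueuedJob` type are hypothetical, and the package's backends may order jobs differently.

```typescript
// Hypothetical numeric ranks for the priority levels listed above.
const PRIORITY_RANK = { LOW: 0, NORMAL: 1, HIGH: 2, CRITICAL: 3 } as const;
type PriorityName = keyof typeof PRIORITY_RANK;

interface QueuedJob {
  id: string;
  priority: PriorityName;
  createdAt: Date;
}

// Higher priority first; ties broken by earlier creation time (FIFO).
function compareJobs(a: QueuedJob, b: QueuedJob): number {
  return (
    PRIORITY_RANK[b.priority] - PRIORITY_RANK[a.priority] ||
    a.createdAt.getTime() - b.createdAt.getTime()
  );
}

const pendingJobs: QueuedJob[] = [
  { id: 'a', priority: 'NORMAL', createdAt: new Date(1000) },
  { id: 'b', priority: 'CRITICAL', createdAt: new Date(3000) },
  { id: 'c', priority: 'NORMAL', createdAt: new Date(500) },
  { id: 'd', priority: 'HIGH', createdAt: new Date(2000) },
];

const runOrder = [...pendingJobs].sort(compareJobs).map((job) => job.id);
console.log(runOrder); // [ 'b', 'd', 'c', 'a' ]
```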

Middleware

Built-in Middleware

Logger Middleware

import { createLoggerMiddleware } from 'distrib-job-queue';

queue.use(createLoggerMiddleware({
  level: 'debug', // 'debug', 'info', 'warn', 'error'
  logger: console // Custom logger
}));

Rate Limiter Middleware

import { createRateLimiterMiddleware } from 'distrib-job-queue';

queue.use(createRateLimiterMiddleware({
  limit: 100, // Max jobs
  window: 60000, // Time window in ms
  queueing: true // Queue excess jobs (true) or reject them (false)
}));
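
To show how `limit` and `window` interact, here is a small sliding-window limiter that allows at most `limit` calls in any `windowMs` span. `SlidingWindowLimiter` is a hypothetical class written for this example. The README does not say which algorithm the built-in middleware uses, so treat this as one possible reading.

```typescript
// Sliding-window limiter: allows at most `limit` calls in any `windowMs` span.
class SlidingWindowLimiter {
  private timestamps: number[] = [];
  private limit: number;
  private windowMs: number;
  private now: () => number;

  // `now` is injectable so the limiter can be exercised with a fake clock.
  constructor(limit: number, windowMs: number, now: () => number = Date.now) {
    this.limit = limit;
    this.windowMs = windowMs;
    this.now = now;
  }

  // Returns true and records the call if it fits in the window, else false.
  tryAcquire(): boolean {
    const t = this.now();
    // Drop timestamps that have aged out of the window.
    while (this.timestamps.length > 0 && this.timestamps[0] <= t - this.windowMs) {
      this.timestamps.shift();
    }
    if (this.timestamps.length >= this.limit) return false;
    this.timestamps.push(t);
    return true;
  }
}

// Usage with a fake clock: two calls per 1000 ms.
let fakeClock = 0;
const limiter = new SlidingWindowLimiter(2, 1000, () => fakeClock);
console.log(limiter.tryAcquire()); // true
console.log(limiter.tryAcquire()); // true
console.log(limiter.tryAcquire()); // false (limit reached)
fakeClock = 1000;
console.log(limiter.tryAcquire()); // true (earlier calls have aged out)
```

When a call is refused, a limiter configured to queue would hold the job until a slot frees up, while one configured to reject would fail it immediately.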

Metrics Middleware

import { createMetricsMiddleware, metrics } from 'distrib-job-queue';

worker.use(createMetricsMiddleware());

// Get metrics
console.log(metrics.getMetrics());

// Listen for metric events
metrics.on('job:processed', ({ jobType }) => {
  console.log(`Job processed: ${jobType}`);
});

License

MIT