npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@boringnode/queue

v0.4.0

Published

A simple and efficient queue system for Node.js applications

Downloads

1,649

Readme

@boringnode/queue

typescript-image gh-workflow-image npm-image npm-download-image license-image

A simple and efficient queue system for Node.js applications. Built for simplicity and ease of use, @boringnode/queue allows you to dispatch background jobs and process them asynchronously with support for multiple queue adapters.

Installation

npm install @boringnode/queue

Features

  • Multiple Queue Adapters: Redis, Knex (PostgreSQL, MySQL, SQLite), and Sync
  • Type-Safe Jobs: TypeScript classes with typed payloads
  • Delayed Jobs: Schedule jobs to run after a delay
  • Priority Queues: Process high-priority jobs first
  • Bulk Dispatch: Efficiently dispatch thousands of jobs at once
  • Job Grouping: Organize related jobs for monitoring
  • Retry with Backoff: Exponential, linear, or fixed backoff strategies
  • Job Timeout: Fail or retry jobs that exceed a time limit
  • Job History: Retain completed/failed jobs for debugging
  • Scheduled Jobs: Cron or interval-based recurring jobs
  • Auto-Discovery: Automatically register jobs from specified locations

Quick Start

1. Define a Job

import { Job } from '@boringnode/queue'
import type { JobOptions } from '@boringnode/queue/types'

interface SendEmailPayload {
  to: string
}

export default class SendEmailJob extends Job<SendEmailPayload> {
  static options: JobOptions = {
    queue: 'email',
  }

  async execute(): Promise<void> {
    console.log(`Sending email to: ${this.payload.to}`)
  }
}

[!NOTE] The job name defaults to the class name (SendEmailJob). You can override it with name: 'CustomName' in options.

[!WARNING] If you minify your code in production, class names may be mangled. Always specify name explicitly in your job options.

2. Configure the Queue Manager

import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost', port: 6379 }),
  },
  locations: ['./app/jobs/**/*.ts'],
})

3. Dispatch Jobs

// Simple dispatch
await SendEmailJob.dispatch({ to: '[email protected]' })

// With options
await SendEmailJob.dispatch({ to: '[email protected]' })
  .toQueue('high-priority')
  .priority(1)
  .in('5m')

4. Start a Worker

import { Worker } from '@boringnode/queue'

const worker = new Worker(config)
await worker.start(['default', 'email'])

Bulk Dispatch

Efficiently dispatch thousands of jobs in a single batch operation:

const { jobIds } = await SendEmailJob.dispatchMany([
  { to: '[email protected]' },
  { to: '[email protected]' },
  { to: '[email protected]' },
])
  .group('newsletter-jan-2025')
  .toQueue('emails')
  .priority(3)

console.log(`Dispatched ${jobIds.length} jobs`)

This uses Redis MULTI/EXEC or SQL batch insert for optimal performance.

Job Grouping

Organize related jobs together for monitoring and filtering:

// Group newsletter jobs
await SendEmailJob.dispatch({ to: '[email protected]' }).group('newsletter-jan-2025')

// Group with bulk dispatch
await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025')

The groupId is stored with job data and accessible via job.data.groupId.

Job History & Retention

Keep completed and failed jobs for debugging:

export default class ImportantJob extends Job<Payload> {
  static options: JobOptions = {
    // Keep last 1000 completed jobs
    removeOnComplete: { count: 1000 },

    // Keep failed jobs for 7 days
    removeOnFail: { age: '7d' },
  }
}

| Value | Behavior | | --------------------------- | ------------------ | | true (default) | Remove immediately | | false | Keep forever | | { count: n } | Keep last n jobs | | { age: '7d' } | Keep for duration | | { count: 100, age: '1d' } | Both limits apply |

Query job history:

const job = await adapter.getJob('job-id', 'queue-name')
console.log(job.status) // 'completed' | 'failed'
console.log(job.finishedAt) // timestamp
console.log(job.error) // error message (if failed)

Adapters

Redis (recommended for production)

import { redis } from '@boringnode/queue/drivers/redis_adapter'

// With options
const adapter = redis({ host: 'localhost', port: 6379 })

// With existing ioredis instance
import { Redis } from 'ioredis'
const connection = new Redis({ host: 'localhost' })
const adapter = redis(connection)

Knex (PostgreSQL, MySQL, SQLite)

import { knex } from '@boringnode/queue/drivers/knex_adapter'

const adapter = knex({
  client: 'pg',
  connection: { host: 'localhost', database: 'myapp' },
})
// With existing Knex instance
import Knex from 'knex'
const connection = Knex({ client: 'pg', connection: '...' })
const adapter = knex(connection)

// Custom table name
const adapter = knex(config, 'custom_jobs_table')

The Knex adapter requires tables to be created before use. Use QueueSchemaService to create them:

import { QueueSchemaService } from '@boringnode/queue'
import Knex from 'knex'

const connection = Knex({ client: 'pg', connection: '...' })
const schemaService = new QueueSchemaService(connection)

// Create tables with default names
await schemaService.createJobsTable()
await schemaService.createSchedulesTable()

// Or extend with custom columns
await schemaService.createJobsTable('queue_jobs', (table) => {
  table.string('tenant_id', 255).nullable()
})

AdonisJS migration example:

import { BaseSchema } from '@adonisjs/lucid/schema'
import { QueueSchemaService } from '@boringnode/queue'

export default class extends BaseSchema {
  async up() {
    const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
    await schemaService.createJobsTable()
    await schemaService.createSchedulesTable()
  }

  async down() {
    const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
    await schemaService.dropSchedulesTable()
    await schemaService.dropJobsTable()
  }
}

Fake (testing + assertions)

import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost' }),
  },
  locations: ['./app/jobs/**/*.ts'],
})

const adapter = QueueManager.fake()

await SendEmailJob.dispatch({ to: '[email protected]' })

adapter.assertPushed(SendEmailJob)
adapter.assertPushed(SendEmailJob, {
  queue: 'default',
  payload: (payload) => payload.to === '[email protected]',
})
adapter.assertPushedCount(1)

QueueManager.restore()

Sync (for testing)

import { sync } from '@boringnode/queue/drivers/sync_adapter'

const adapter = sync() // Jobs execute immediately

Job Options

export default class MyJob extends Job<Payload> {
  static options: JobOptions = {
    queue: 'email', // Queue name (default: 'default')
    priority: 1, // Lower = higher priority (default: 5)
    maxRetries: 3, // Retry attempts before failing
    timeout: '30s', // Max execution time
    failOnTimeout: true, // Fail permanently on timeout (default: retry)
    removeOnComplete: { count: 100 }, // Keep last 100 completed
    removeOnFail: { age: '7d' }, // Keep failed for 7 days
  }
}

Delayed Jobs

await SendEmailJob.dispatch(payload).in('30s') // 30 seconds
await SendEmailJob.dispatch(payload).in('5m') // 5 minutes
await SendEmailJob.dispatch(payload).in('2h') // 2 hours
await SendEmailJob.dispatch(payload).in('1d') // 1 day

Retry & Backoff

import { exponentialBackoff } from '@boringnode/queue'

export default class ReliableJob extends Job<Payload> {
  static options: JobOptions = {
    maxRetries: 5,
    retry: {
      backoff: () =>
        exponentialBackoff({
          baseDelay: '1s',
          maxDelay: '1m',
          multiplier: 2,
          jitter: true,
        }),
    },
  }
}
import { exponentialBackoff, linearBackoff, fixedBackoff } from '@boringnode/queue'

// Exponential: 1s, 2s, 4s, 8s...
exponentialBackoff({ baseDelay: '1s', maxDelay: '1m', multiplier: 2 })

// Linear: 1s, 2s, 3s, 4s...
linearBackoff({ baseDelay: '1s', maxDelay: '30s', multiplier: 1 })

// Fixed: 5s, 5s, 5s...
fixedBackoff({ baseDelay: '5s', jitter: true })

Job Timeout

export default class LongRunningJob extends Job<Payload> {
  static options: JobOptions = {
    timeout: '30s',
    failOnTimeout: false, // Will retry (default)
  }

  async execute(): Promise<void> {
    for (const item of this.payload.items) {
      // Check abort signal for graceful timeout handling
      if (this.signal?.aborted) {
        throw new Error('Job timed out')
      }
      await this.processItem(item)
    }
  }
}

Job Context

Access execution metadata via this.context:

async execute(): Promise<void> {
  console.log(this.context.jobId)       // Unique job ID
  console.log(this.context.attempt)     // 1, 2, 3...
  console.log(this.context.queue)       // Queue name
  console.log(this.context.priority)    // Priority value
  console.log(this.context.acquiredAt)  // When acquired
  console.log(this.context.stalledCount) // Stall recoveries
}

Scheduled Jobs

Run jobs on a recurring basis:

// Every 10 seconds
await MetricsJob.schedule({ endpoint: '/health' }).every('10s')

// Cron schedule
await CleanupJob.schedule({ days: 30 })
  .id('daily-cleanup')
  .cron('0 0 * * *') // Midnight daily
  .timezone('Europe/Paris')
import { Schedule } from '@boringnode/queue'

// Find and manage
const schedule = await Schedule.find('daily-cleanup')
await schedule.pause()
await schedule.resume()
await schedule.trigger() // Run now
await schedule.delete()

// List schedules
const all = await Schedule.list()
const active = await Schedule.list({ status: 'active' })

Schedule options:

| Method | Description | | ------------------- | --------------------------------- | | .id(string) | Unique identifier | | .every(duration) | Fixed interval ('5s', '1m', '1h') | | .cron(expression) | Cron schedule | | .timezone(tz) | Timezone (default: 'UTC') | | .from(date) | Start boundary | | .to(date) | End boundary | | .limit(n) | Maximum runs |

Dependency Injection

Integrate with IoC containers:

await QueueManager.init({
  // ...
  jobFactory: async (JobClass) => {
    return app.container.make(JobClass)
  },
})
export default class SendEmailJob extends Job<SendEmailPayload> {
  constructor(
    private mailer: MailerService,
    private logger: Logger
  ) {
    super()
  }

  async execute(): Promise<void> {
    this.logger.info(`Sending email to ${this.payload.to}`)
    await this.mailer.send(this.payload)
  }
}

Worker Configuration

const config = {
  worker: {
    concurrency: 5, // Parallel jobs
    idleDelay: '2s', // Poll interval when idle
    timeout: '1m', // Default job timeout
    stalledThreshold: '30s', // When to consider job stalled
    stalledInterval: '30s', // How often to check
    maxStalledCount: 1, // Max recoveries before failing
    gracefulShutdown: true, // Wait for jobs on SIGTERM
  },
}

Logging

import { pino } from 'pino'

await QueueManager.init({
  // ...
  logger: pino(),
})

Benchmarks

Performance comparison with BullMQ (5ms simulated work per job):

| Jobs | Concurrency | @boringnode/queue | BullMQ | Diff | | ---- | ----------- | ----------------- | ------ | ----------- | | 1000 | 5 | 1096ms | 1116ms | 1.8% faster | | 1000 | 10 | 565ms | 579ms | 2.4% faster | | 100K | 10 | 56.2s | 57.5s | 2.1% faster | | 100K | 20 | 29.1s | 29.6s | 1.7% faster |

npm run benchmark -- --realistic