npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@boringnode/queue

v0.5.1

Published

A simple and efficient queue system for Node.js applications

Readme

@boringnode/queue

typescript-image gh-workflow-image npm-image npm-download-image license-image

A simple and efficient queue system for Node.js applications. Built for simplicity and ease of use, @boringnode/queue allows you to dispatch background jobs and process them asynchronously with support for multiple queue adapters.

Installation

npm install @boringnode/queue

Features

  • Multiple Queue Adapters: Redis, Knex (PostgreSQL, MySQL, SQLite), and Sync
  • Type-Safe Jobs: TypeScript classes with typed payloads
  • Delayed Jobs: Schedule jobs to run after a delay
  • Priority Queues: Process high-priority jobs first
  • Bulk Dispatch: Efficiently dispatch thousands of jobs at once
  • Job Grouping: Organize related jobs for monitoring
  • Retry with Backoff: Exponential, linear, or fixed backoff strategies
  • Job Timeout: Fail or retry jobs that exceed a time limit
  • Job History: Retain completed/failed jobs for debugging
  • Scheduled Jobs: Cron or interval-based recurring jobs
  • Auto-Discovery: Automatically register jobs from specified locations

Quick Start

1. Define a Job

import { Job } from '@boringnode/queue'
import type { JobOptions } from '@boringnode/queue/types'

interface SendEmailPayload {
  to: string
}

export default class SendEmailJob extends Job<SendEmailPayload> {
  static options: JobOptions = {
    queue: 'email',
  }

  async execute(): Promise<void> {
    console.log(`Sending email to: ${this.payload.to}`)
  }
}

[!NOTE] The job name defaults to the class name (SendEmailJob). You can override it with name: 'CustomName' in options.

[!WARNING] If you minify your code in production, class names may be mangled. Always specify name explicitly in your job options.

2. Configure the Queue Manager

import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost', port: 6379 }),
  },
  locations: ['./app/jobs/**/*.ts'],
})

3. Dispatch Jobs

// Simple dispatch
await SendEmailJob.dispatch({ to: '[email protected]' })

// With options
await SendEmailJob.dispatch({ to: '[email protected]' })
  .toQueue('high-priority')
  .priority(1)
  .in('5m')

4. Start a Worker

import { Worker } from '@boringnode/queue'

const worker = new Worker(config)
await worker.start(['default', 'email'])

Bulk Dispatch

Efficiently dispatch thousands of jobs in a single batch operation:

const { jobIds } = await SendEmailJob.dispatchMany([
  { to: '[email protected]' },
  { to: '[email protected]' },
  { to: '[email protected]' },
])
  .group('newsletter-jan-2025')
  .toQueue('emails')
  .priority(3)

console.log(`Dispatched ${jobIds.length} jobs`)

This uses Redis MULTI/EXEC or SQL batch insert for optimal performance.

Job Grouping

Organize related jobs together for monitoring and filtering:

// Group newsletter jobs
await SendEmailJob.dispatch({ to: '[email protected]' }).group('newsletter-jan-2025')

// Group with bulk dispatch
await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025')

The groupId is stored with job data and accessible via job.data.groupId.

Job History & Retention

Keep completed and failed jobs for debugging:

export default class ImportantJob extends Job<Payload> {
  static options: JobOptions = {
    // Keep last 1000 completed jobs
    removeOnComplete: { count: 1000 },

    // Keep failed jobs for 7 days
    removeOnFail: { age: '7d' },
  }
}

| Value | Behavior | | --------------------------- | ------------------ | | true (default) | Remove immediately | | false | Keep forever | | { count: n } | Keep last n jobs | | { age: '7d' } | Keep for duration | | { count: 100, age: '1d' } | Both limits apply |

Query job history:

const job = await adapter.getJob('job-id', 'queue-name')
console.log(job.status) // 'completed' | 'failed'
console.log(job.finishedAt) // timestamp
console.log(job.error) // error message (if failed)

Adapters

Redis (recommended for production)

import { redis } from '@boringnode/queue/drivers/redis_adapter'

// With options
const adapter = redis({ host: 'localhost', port: 6379 })

// With existing ioredis instance
import { Redis } from 'ioredis'
const connection = new Redis({ host: 'localhost' })
const adapter = redis(connection)

Knex (PostgreSQL, MySQL, SQLite)

import { knex } from '@boringnode/queue/drivers/knex_adapter'

const adapter = knex({
  client: 'pg',
  connection: { host: 'localhost', database: 'myapp' },
})
// With existing Knex instance
import Knex from 'knex'
const connection = Knex({ client: 'pg', connection: '...' })
const adapter = knex(connection)

// Custom table name
const adapter = knex(config, 'custom_jobs_table')

The Knex adapter requires tables to be created before use. Use QueueSchemaService to create them:

import { QueueSchemaService } from '@boringnode/queue'
import Knex from 'knex'

const connection = Knex({ client: 'pg', connection: '...' })
const schemaService = new QueueSchemaService(connection)

// Create tables with default names
await schemaService.createJobsTable()
await schemaService.createSchedulesTable()

// Or extend with custom columns
await schemaService.createJobsTable('queue_jobs', (table) => {
  table.string('tenant_id', 255).nullable()
})

AdonisJS migration example:

import { BaseSchema } from '@adonisjs/lucid/schema'
import { QueueSchemaService } from '@boringnode/queue'

export default class extends BaseSchema {
  async up() {
    const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
    await schemaService.createJobsTable()
    await schemaService.createSchedulesTable()
  }

  async down() {
    const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
    await schemaService.dropSchedulesTable()
    await schemaService.dropJobsTable()
  }
}

Fake (testing + assertions)

import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost' }),
  },
  locations: ['./app/jobs/**/*.ts'],
})

const adapter = QueueManager.fake()

await SendEmailJob.dispatch({ to: '[email protected]' })

adapter.assertPushed(SendEmailJob)
adapter.assertPushed(SendEmailJob, {
  queue: 'default',
  payload: (payload) => payload.to === '[email protected]',
})
adapter.assertPushedCount(1)

QueueManager.restore()

Sync (for testing)

import { sync } from '@boringnode/queue/drivers/sync_adapter'

const adapter = sync() // Jobs execute immediately

Use the sync adapter for tests and lightweight local development only.

  • await MyJob.dispatch(payload).run() waits for the job to fully finish.
  • Retries are executed inline, not by a background worker.
  • If you configure backoff, the adapter will sleep between attempts.
  • This means the caller can stay blocked for the full retry duration.

Example: with maxRetries: 3 and an exponential backoff of 1s, 2s, 4s, the request or command that dispatched the job can stay busy for about 7 seconds before the job exhausts its retries and runs failed().

Job Options

export default class MyJob extends Job<Payload> {
  static options: JobOptions = {
    queue: 'email', // Queue name (default: 'default')
    priority: 1, // Lower = higher priority (default: 5)
    maxRetries: 3, // Retry attempts before failing
    timeout: '30s', // Max execution time
    failOnTimeout: true, // Fail permanently on timeout (default: retry)
    removeOnComplete: { count: 100 }, // Keep last 100 completed
    removeOnFail: { age: '7d' }, // Keep failed for 7 days
  }
}

Delayed Jobs

await SendEmailJob.dispatch(payload).in('30s') // 30 seconds
await SendEmailJob.dispatch(payload).in('5m') // 5 minutes
await SendEmailJob.dispatch(payload).in('2h') // 2 hours
await SendEmailJob.dispatch(payload).in('1d') // 1 day

Retry & Backoff

import { exponentialBackoff } from '@boringnode/queue'

export default class ReliableJob extends Job<Payload> {
  static options: JobOptions = {
    maxRetries: 5,
    retry: {
      backoff: () =>
        exponentialBackoff({
          baseDelay: '1s',
          maxDelay: '1m',
          multiplier: 2,
          jitter: true,
        }),
    },
  }
}

maxRetries can be defined directly on the job options, and retry.backoff controls the delay between attempts.

With the sync adapter, these delays happen inline in the caller via sleep. If a job fails repeatedly, dispatch().run() will take as long as the total backoff duration. Use a worker-backed adapter when you do not want retries to slow down the request/command that dispatched the job.

import { exponentialBackoff, linearBackoff, fixedBackoff } from '@boringnode/queue'

// Exponential: 1s, 2s, 4s, 8s...
exponentialBackoff({ baseDelay: '1s', maxDelay: '1m', multiplier: 2 })

// Linear: 1s, 2s, 3s, 4s...
linearBackoff({ baseDelay: '1s', maxDelay: '30s', multiplier: 1 })

// Fixed: 5s, 5s, 5s...
fixedBackoff({ baseDelay: '5s', jitter: true })

Job Timeout

export default class LongRunningJob extends Job<Payload> {
  static options: JobOptions = {
    timeout: '30s',
    failOnTimeout: false, // Will retry (default)
  }

  async execute(): Promise<void> {
    for (const item of this.payload.items) {
      // Check abort signal for graceful timeout handling
      if (this.signal?.aborted) {
        throw new Error('Job timed out')
      }
      await this.processItem(item)
    }
  }
}

Job Context

Access execution metadata via this.context:

async execute(): Promise<void> {
  console.log(this.context.jobId)       // Unique job ID
  console.log(this.context.attempt)     // 1, 2, 3...
  console.log(this.context.queue)       // Queue name
  console.log(this.context.priority)    // Priority value
  console.log(this.context.acquiredAt)  // When acquired
  console.log(this.context.stalledCount) // Stall recoveries
}

Scheduled Jobs

Run jobs on a recurring basis:

// Every 10 seconds
await MetricsJob.schedule({ endpoint: '/health' }).every('10s')

// Cron schedule
await CleanupJob.schedule({ days: 30 })
  .id('daily-cleanup')
  .cron('0 0 * * *') // Midnight daily
  .timezone('Europe/Paris')
import { Schedule } from '@boringnode/queue'

// Find and manage
const schedule = await Schedule.find('daily-cleanup')
await schedule.pause()
await schedule.resume()
await schedule.trigger() // Run now
await schedule.delete()

// List schedules
const all = await Schedule.list()
const active = await Schedule.list({ status: 'active' })

Schedule options:

| Method | Description | | ------------------- | --------------------------------- | | .id(string) | Unique identifier | | .every(duration) | Fixed interval ('5s', '1m', '1h') | | .cron(expression) | Cron schedule | | .timezone(tz) | Timezone (default: 'UTC') | | .from(date) | Start boundary | | .to(date) | End boundary | | .limit(n) | Maximum runs |

Dependency Injection

Integrate with IoC containers:

await QueueManager.init({
  // ...
  jobFactory: async (JobClass) => {
    return app.container.make(JobClass)
  },
})
export default class SendEmailJob extends Job<SendEmailPayload> {
  constructor(
    private mailer: MailerService,
    private logger: Logger
  ) {
    super()
  }

  async execute(): Promise<void> {
    this.logger.info(`Sending email to ${this.payload.to}`)
    await this.mailer.send(this.payload)
  }
}

Worker Configuration

const config = {
  worker: {
    concurrency: 5, // Parallel jobs
    idleDelay: '2s', // Poll interval when idle
    timeout: '1m', // Default job timeout
    stalledThreshold: '30s', // When to consider job stalled
    stalledInterval: '30s', // How often to check
    maxStalledCount: 1, // Max recoveries before failing
    gracefulShutdown: true, // Wait for jobs on SIGTERM
  },
}

Logging

import { pino } from 'pino'

await QueueManager.init({
  // ...
  logger: pino(),
})

OpenTelemetry Instrumentation (experimental)

[!WARNING] The OpenTelemetry instrumentation is experimental and its API may change in future releases.

@boringnode/queue ships with built-in OpenTelemetry instrumentation that creates PRODUCER spans for job dispatch and CONSUMER spans for job execution, following OTel messaging semantic conventions.

Quick Setup

import { QueueInstrumentation } from '@boringnode/queue/otel'
import * as boringqueue from '@boringnode/queue'

const instrumentation = new QueueInstrumentation({
  messagingSystem: 'boringqueue', // default
  executionSpanLinkMode: 'link',  // or 'parent'
})

instrumentation.enable()
instrumentation.manuallyRegister(boringqueue)

The instrumentation patches QueueManager.init() to automatically inject its wrappers — no config changes needed in your queue setup.

Span Attributes

The instrumentation uses standard OTel messaging semantic conventions where they map cleanly, plus a few queue-specific custom attributes.

| Attribute | Kind | Description | | ------------------------------- | ------- | ------------------------------------------ | | messaging.system | Semconv | 'boringqueue' (configurable) | | messaging.operation.name | Semconv | 'publish' or 'process' | | messaging.destination.name | Semconv | Queue name | | messaging.message.id | Semconv | Job ID for single-message spans | | messaging.batch.message_count | Semconv | Number of jobs in a batch dispatch | | messaging.message.retry.count | Custom | Retry count (0-based) for a job attempt | | messaging.job.name | Custom | Job class name (e.g. SendEmailJob) | | messaging.job.status | Custom | 'completed', 'failed', or 'retrying' | | messaging.job.group_id | Custom | Queue-specific group identifier | | messaging.job.priority | Custom | Queue-specific job priority | | messaging.job.delay_ms | Custom | Delay before the job becomes available |

Trace Context Propagation

The instrumentation automatically propagates trace context from dispatch to execution:

  • Link mode (default): Each job execution is an independent trace, linked to the dispatch span
  • Parent mode: Job execution is a child of the dispatch span (same trace)

Child spans created inside execute() (DB queries, HTTP calls, etc.) are automatically parented to the job consumer span.

diagnostics_channel

Raw telemetry events are available via diagnostics_channel for custom subscribers:

import { tracingChannels } from '@boringnode/queue'

const { executeChannel } = tracingChannels

executeChannel.subscribe({
  start() {},
  end() {},
  asyncStart() {},
  asyncEnd(message) {
    console.log(`Job ${message.job.name} ${message.status} in ${message.duration}ms`)
  },
  error() {},
})

Benchmarks

Performance comparison with BullMQ (5ms simulated work per job):

| Jobs | Concurrency | @boringnode/queue | BullMQ | Diff | | ---- | ----------- | ----------------- | ------ | ----------- | | 1000 | 5 | 1096ms | 1116ms | 1.8% faster | | 1000 | 10 | 565ms | 579ms | 2.4% faster | | 100K | 10 | 56.2s | 57.5s | 2.1% faster | | 100K | 20 | 29.1s | 29.6s | 1.7% faster |

npm run benchmark -- --realistic