@boringnode/queue
v0.4.0
Published
A simple and efficient queue system for Node.js applications
Downloads
1,649
Readme
@boringnode/queue
A simple and efficient queue system for Node.js applications. Built for simplicity and ease of use, @boringnode/queue allows you to dispatch background jobs and process them asynchronously with support for multiple queue adapters.
Installation
npm install @boringnode/queueFeatures
- Multiple Queue Adapters: Redis, Knex (PostgreSQL, MySQL, SQLite), and Sync
- Type-Safe Jobs: TypeScript classes with typed payloads
- Delayed Jobs: Schedule jobs to run after a delay
- Priority Queues: Process high-priority jobs first
- Bulk Dispatch: Efficiently dispatch thousands of jobs at once
- Job Grouping: Organize related jobs for monitoring
- Retry with Backoff: Exponential, linear, or fixed backoff strategies
- Job Timeout: Fail or retry jobs that exceed a time limit
- Job History: Retain completed/failed jobs for debugging
- Scheduled Jobs: Cron or interval-based recurring jobs
- Auto-Discovery: Automatically register jobs from specified locations
Quick Start
1. Define a Job
import { Job } from '@boringnode/queue'
import type { JobOptions } from '@boringnode/queue/types'
interface SendEmailPayload {
to: string
}
export default class SendEmailJob extends Job<SendEmailPayload> {
static options: JobOptions = {
queue: 'email',
}
async execute(): Promise<void> {
console.log(`Sending email to: ${this.payload.to}`)
}
}[!NOTE] The job name defaults to the class name (
SendEmailJob). You can override it withname: 'CustomName'in options.
[!WARNING] If you minify your code in production, class names may be mangled. Always specify
nameexplicitly in your job options.
2. Configure the Queue Manager
import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'
await QueueManager.init({
default: 'redis',
adapters: {
redis: redis({ host: 'localhost', port: 6379 }),
},
locations: ['./app/jobs/**/*.ts'],
})3. Dispatch Jobs
// Simple dispatch
await SendEmailJob.dispatch({ to: '[email protected]' })
// With options
await SendEmailJob.dispatch({ to: '[email protected]' })
.toQueue('high-priority')
.priority(1)
.in('5m')4. Start a Worker
import { Worker } from '@boringnode/queue'
const worker = new Worker(config)
await worker.start(['default', 'email'])Bulk Dispatch
Efficiently dispatch thousands of jobs in a single batch operation:
const { jobIds } = await SendEmailJob.dispatchMany([
{ to: '[email protected]' },
{ to: '[email protected]' },
{ to: '[email protected]' },
])
.group('newsletter-jan-2025')
.toQueue('emails')
.priority(3)
console.log(`Dispatched ${jobIds.length} jobs`)This uses Redis MULTI/EXEC or SQL batch insert for optimal performance.
Job Grouping
Organize related jobs together for monitoring and filtering:
// Group newsletter jobs
await SendEmailJob.dispatch({ to: '[email protected]' }).group('newsletter-jan-2025')
// Group with bulk dispatch
await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025')The groupId is stored with job data and accessible via job.data.groupId.
Job History & Retention
Keep completed and failed jobs for debugging:
export default class ImportantJob extends Job<Payload> {
static options: JobOptions = {
// Keep last 1000 completed jobs
removeOnComplete: { count: 1000 },
// Keep failed jobs for 7 days
removeOnFail: { age: '7d' },
}
}| Value | Behavior |
| --------------------------- | ------------------ |
| true (default) | Remove immediately |
| false | Keep forever |
| { count: n } | Keep last n jobs |
| { age: '7d' } | Keep for duration |
| { count: 100, age: '1d' } | Both limits apply |
Query job history:
const job = await adapter.getJob('job-id', 'queue-name')
console.log(job.status) // 'completed' | 'failed'
console.log(job.finishedAt) // timestamp
console.log(job.error) // error message (if failed)Adapters
Redis (recommended for production)
import { redis } from '@boringnode/queue/drivers/redis_adapter'
// With options
const adapter = redis({ host: 'localhost', port: 6379 })
// With existing ioredis instance
import { Redis } from 'ioredis'
const connection = new Redis({ host: 'localhost' })
const adapter = redis(connection)Knex (PostgreSQL, MySQL, SQLite)
import { knex } from '@boringnode/queue/drivers/knex_adapter'
const adapter = knex({
client: 'pg',
connection: { host: 'localhost', database: 'myapp' },
})// With existing Knex instance
import Knex from 'knex'
const connection = Knex({ client: 'pg', connection: '...' })
const adapter = knex(connection)
// Custom table name
const adapter = knex(config, 'custom_jobs_table')The Knex adapter requires tables to be created before use. Use QueueSchemaService to create them:
import { QueueSchemaService } from '@boringnode/queue'
import Knex from 'knex'
const connection = Knex({ client: 'pg', connection: '...' })
const schemaService = new QueueSchemaService(connection)
// Create tables with default names
await schemaService.createJobsTable()
await schemaService.createSchedulesTable()
// Or extend with custom columns
await schemaService.createJobsTable('queue_jobs', (table) => {
table.string('tenant_id', 255).nullable()
})AdonisJS migration example:
import { BaseSchema } from '@adonisjs/lucid/schema'
import { QueueSchemaService } from '@boringnode/queue'
export default class extends BaseSchema {
async up() {
const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
await schemaService.createJobsTable()
await schemaService.createSchedulesTable()
}
async down() {
const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
await schemaService.dropSchedulesTable()
await schemaService.dropJobsTable()
}
}Fake (testing + assertions)
import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'
await QueueManager.init({
default: 'redis',
adapters: {
redis: redis({ host: 'localhost' }),
},
locations: ['./app/jobs/**/*.ts'],
})
const adapter = QueueManager.fake()
await SendEmailJob.dispatch({ to: '[email protected]' })
adapter.assertPushed(SendEmailJob)
adapter.assertPushed(SendEmailJob, {
queue: 'default',
payload: (payload) => payload.to === '[email protected]',
})
adapter.assertPushedCount(1)
QueueManager.restore()Sync (for testing)
import { sync } from '@boringnode/queue/drivers/sync_adapter'
const adapter = sync() // Jobs execute immediatelyJob Options
export default class MyJob extends Job<Payload> {
static options: JobOptions = {
queue: 'email', // Queue name (default: 'default')
priority: 1, // Lower = higher priority (default: 5)
maxRetries: 3, // Retry attempts before failing
timeout: '30s', // Max execution time
failOnTimeout: true, // Fail permanently on timeout (default: retry)
removeOnComplete: { count: 100 }, // Keep last 100 completed
removeOnFail: { age: '7d' }, // Keep failed for 7 days
}
}Delayed Jobs
await SendEmailJob.dispatch(payload).in('30s') // 30 seconds
await SendEmailJob.dispatch(payload).in('5m') // 5 minutes
await SendEmailJob.dispatch(payload).in('2h') // 2 hours
await SendEmailJob.dispatch(payload).in('1d') // 1 dayRetry & Backoff
import { exponentialBackoff } from '@boringnode/queue'
export default class ReliableJob extends Job<Payload> {
static options: JobOptions = {
maxRetries: 5,
retry: {
backoff: () =>
exponentialBackoff({
baseDelay: '1s',
maxDelay: '1m',
multiplier: 2,
jitter: true,
}),
},
}
}import { exponentialBackoff, linearBackoff, fixedBackoff } from '@boringnode/queue'
// Exponential: 1s, 2s, 4s, 8s...
exponentialBackoff({ baseDelay: '1s', maxDelay: '1m', multiplier: 2 })
// Linear: 1s, 2s, 3s, 4s...
linearBackoff({ baseDelay: '1s', maxDelay: '30s', multiplier: 1 })
// Fixed: 5s, 5s, 5s...
fixedBackoff({ baseDelay: '5s', jitter: true })Job Timeout
export default class LongRunningJob extends Job<Payload> {
static options: JobOptions = {
timeout: '30s',
failOnTimeout: false, // Will retry (default)
}
async execute(): Promise<void> {
for (const item of this.payload.items) {
// Check abort signal for graceful timeout handling
if (this.signal?.aborted) {
throw new Error('Job timed out')
}
await this.processItem(item)
}
}
}Job Context
Access execution metadata via this.context:
async execute(): Promise<void> {
console.log(this.context.jobId) // Unique job ID
console.log(this.context.attempt) // 1, 2, 3...
console.log(this.context.queue) // Queue name
console.log(this.context.priority) // Priority value
console.log(this.context.acquiredAt) // When acquired
console.log(this.context.stalledCount) // Stall recoveries
}Scheduled Jobs
Run jobs on a recurring basis:
// Every 10 seconds
await MetricsJob.schedule({ endpoint: '/health' }).every('10s')
// Cron schedule
await CleanupJob.schedule({ days: 30 })
.id('daily-cleanup')
.cron('0 0 * * *') // Midnight daily
.timezone('Europe/Paris')import { Schedule } from '@boringnode/queue'
// Find and manage
const schedule = await Schedule.find('daily-cleanup')
await schedule.pause()
await schedule.resume()
await schedule.trigger() // Run now
await schedule.delete()
// List schedules
const all = await Schedule.list()
const active = await Schedule.list({ status: 'active' })Schedule options:
| Method | Description |
| ------------------- | --------------------------------- |
| .id(string) | Unique identifier |
| .every(duration) | Fixed interval ('5s', '1m', '1h') |
| .cron(expression) | Cron schedule |
| .timezone(tz) | Timezone (default: 'UTC') |
| .from(date) | Start boundary |
| .to(date) | End boundary |
| .limit(n) | Maximum runs |
Dependency Injection
Integrate with IoC containers:
await QueueManager.init({
// ...
jobFactory: async (JobClass) => {
return app.container.make(JobClass)
},
})export default class SendEmailJob extends Job<SendEmailPayload> {
constructor(
private mailer: MailerService,
private logger: Logger
) {
super()
}
async execute(): Promise<void> {
this.logger.info(`Sending email to ${this.payload.to}`)
await this.mailer.send(this.payload)
}
}Worker Configuration
const config = {
worker: {
concurrency: 5, // Parallel jobs
idleDelay: '2s', // Poll interval when idle
timeout: '1m', // Default job timeout
stalledThreshold: '30s', // When to consider job stalled
stalledInterval: '30s', // How often to check
maxStalledCount: 1, // Max recoveries before failing
gracefulShutdown: true, // Wait for jobs on SIGTERM
},
}Logging
import { pino } from 'pino'
await QueueManager.init({
// ...
logger: pino(),
})Benchmarks
Performance comparison with BullMQ (5ms simulated work per job):
| Jobs | Concurrency | @boringnode/queue | BullMQ | Diff | | ---- | ----------- | ----------------- | ------ | ----------- | | 1000 | 5 | 1096ms | 1116ms | 1.8% faster | | 1000 | 10 | 565ms | 579ms | 2.4% faster | | 100K | 10 | 56.2s | 57.5s | 2.1% faster | | 100K | 20 | 29.1s | 29.6s | 1.7% faster |
npm run benchmark -- --realistic