@rudderjs/queue
v4.1.2
Published
Queue job abstractions, registry, and provider factory with built-in `sync` driver and plugin support for `inngest` and `bullmq`.
Readme
@rudderjs/queue
Queue job abstractions, registry, and provider factory with built-in sync driver and plugin support for inngest and bullmq.
Installation
pnpm add @rudderjs/queueSetup
// config/queue.ts
import type { QueueConfig } from '@rudderjs/queue'
export default {
default: Env.get('QUEUE_CONNECTION', 'sync'),
connections: {
sync: { driver: 'sync' },
bullmq: { driver: 'bullmq', host: '127.0.0.1', port: 6379 },
},
} satisfies QueueConfig// bootstrap/providers.ts
import { queue } from '@rudderjs/queue'
import configs from '../config/index.js'
export default [queue(configs.queue)]Defining Jobs
import { Job } from '@rudderjs/queue'
export class SendWelcomeEmail extends Job {
static override queue = 'emails'
static override retries = 5
static override delay = 0
constructor(private readonly userId: string) { super() }
async handle(): Promise<void> {
// send the email
}
async failed(error: unknown): Promise<void> {
// called when all retries are exhausted
console.error('SendWelcomeEmail failed', error)
}
}Dispatching Jobs
// Dispatch with defaults from static properties
await SendWelcomeEmail.dispatch('user-123').send()
// Override queue and delay at call site
await SendWelcomeEmail.dispatch('user-123')
.onQueue('priority')
.delay(5000)
.send()Job Static Properties
| Property | Default | Description |
|-----------|-------------|------------------------------------------|
| queue | 'default' | Queue name to dispatch to. |
| retries | 3 | Retry attempts before calling failed(). |
| delay | 0 | Delay in ms before the job runs. |
DispatchBuilder Methods
| Method | Returns | Description |
|-------------------|-------------------|-----------------------------------|
| delay(ms) | this | Override the job delay (ms). |
| onQueue(name) | this | Override the target queue name. |
| send() | Promise<void> | Dispatch the job. |
Configuration
QueueConfig
interface QueueConfig {
default: string
connections: Record<string, QueueConnectionConfig>
}QueueConnectionConfig
// Sync (built-in — runs jobs immediately in-process)
{ driver: 'sync' }
// BullMQ (requires: pnpm add @rudderjs/queue-bullmq ioredis)
{ driver: 'bullmq', host: '127.0.0.1', port: 6379 }
// Inngest (requires: pnpm add @rudderjs/queue-inngest inngest)
{ driver: 'inngest', eventKey: '...', signingKey: '...' }Built-in Drivers
sync
Runs jobs immediately in the same process. No external dependencies. Good for development and testing.
{ driver: 'sync' }Plugin Drivers
| Driver | Package | Install |
|-----------|----------------------------|--------------------------------------------|
| bullmq | @rudderjs/queue-bullmq | pnpm add @rudderjs/queue-bullmq ioredis |
| inngest | @rudderjs/queue-inngest | pnpm add @rudderjs/queue-inngest inngest |
Rudder Commands
| Command | Description |
|------------------|-------------------------------------------------|
| queue:work | Start a queue worker (BullMQ only). |
| queue:status | Show waiting/active/completed/failed counts. |
| queue:clear | Drain waiting and delayed jobs from a queue. |
| queue:failed | List recently failed jobs. |
| queue:retry | Re-enqueue all failed jobs. |
pnpm rudder queue:work
pnpm rudder queue:work emails,default
pnpm rudder queue:status
pnpm rudder queue:status emails
pnpm rudder queue:clear
pnpm rudder queue:failed
pnpm rudder queue:retryQueueRegistry
import { QueueRegistry } from '@rudderjs/queue'
const adapter = QueueRegistry.get() // QueueAdapter | nullSyncAdapter
Exported for standalone use and testing:
import { SyncAdapter } from '@rudderjs/queue'
const adapter = new SyncAdapter()
await adapter.dispatch(new MyJob())Observers (advanced)
Subscribe to job lifecycle events without depending on @rudderjs/horizon:
import { queueObservers } from '@rudderjs/queue/observers'
const unsubscribe = queueObservers.subscribe((e) => {
if (e.kind === 'job.completed') {
// e.queue, e.jobId, e.name, e.duration, e.completedAt
}
})Event kinds: job.dispatched, job.active, job.completed, job.failed. The registry is a globalThis singleton (mirrors @rudderjs/mcp/observers, @rudderjs/http/observers, @rudderjs/ai/observers). SyncAdapter.dispatch() emits these natively; out-of-process drivers (BullMQ) emit them from the worker process so cross-process subscribers (e.g. @rudderjs/horizon's RedisStorage) see every transition.
Notes
- TTL/delay values are in milliseconds.
syncdriver ignoresdelay— jobs run immediately.SyncAdaptercallsjob.failed()before re-throwing on error.- All commands are registered in
boot()— they appear inpnpm rudder --helpregardless of driver. - Commands that the active driver doesn't support throw an error with a helpful message.
