@yakocloud/bullmq-router
v2.1.1
Type-safe file-system-style router for BullMQ. Define jobs as leaf nodes in a nested object, get fully typed push, replace, cancel, exec and more — automatically wired to the right queue and worker.
Why use bullmq-router?
BullMQ is powerful, but as your system grows, managing jobs becomes messy:
- Job names are just strings → easy to break, hard to refactor
- Queues, workers, and handlers are scattered across files
- No clear structure for organizing jobs
- Deduplication and idempotency require manual effort
- Replacing or cancelling jobs is non-trivial
- Testing job logic outside the queue is inconvenient
Installation
npm install @yakocloud/bullmq-router bullmq
bullmq is a peer dependency and must be installed separately.
Quick Start
1. Define jobs
// jobs/email/send.ts
import { defineJob } from '@yakocloud/bullmq-router'
export default defineJob<{ to: string; subject: string }>(async (job) => {
  await sendEmail(job.data)
})()
2. Build the router
Top-level keys define queue names — each key becomes a separate BullMQ queue. Nested keys are just grouping and don't affect the queue name.
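To make the naming rule concrete, here is a minimal, self-contained sketch that walks a nested object and reports the queue and job name for each leaf. It does not use the library: `listJobs`, `Leaf`, `Tree` and `JobEntry` are hypothetical names, and leaves are modelled as plain functions purely for illustration.

```typescript
// Hypothetical stand-ins: a leaf is any function, a group is a plain object.
type Leaf = (...args: unknown[]) => unknown
type Tree = { [key: string]: Tree | Leaf }

interface JobEntry {
  queue: string   // always the top-level key
  jobName: string // full dot-separated path to the leaf
}

// Walk the tree depth-first and collect one entry per leaf.
function listJobs(tree: Tree, path: string[] = []): JobEntry[] {
  const entries: JobEntry[] = []
  for (const [key, node] of Object.entries(tree)) {
    const nextPath = [...path, key]
    if (typeof node === 'function') {
      entries.push({ queue: nextPath[0], jobName: nextPath.join('.') })
    } else {
      entries.push(...listJobs(node, nextPath))
    }
  }
  return entries
}

const noop: Leaf = () => undefined
const jobs = listJobs({
  email: { send: noop, path: { to: { send: noop } } },
  notifications: { push: noop },
})
console.log(jobs)
```

However deep a leaf sits, its queue is the first path segment, so `email.send` and `email.path.to.send` both land in the `email` queue.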
// router.ts
import { createRouter } from '@yakocloud/bullmq-router'
import send from './jobs/email/send.js'
import invoice from './jobs/email/invoice.js'
export const router = createRouter({
  email: { // <-- queue name: "email"
    send, // job name: "email.send"
    invoice, // job name: "email.invoice"
  },
})
Best practice: mirror the router structure in the file system. Each folder exports a queueRouter object that composes its children, so the file tree becomes the job tree.
// myJobs/email/path/to/send.ts
export default defineJob<{ text: string }>((job) => {
console.log(job.data.text)
})()
// myJobs/email/path/to/index.ts
import send from './send.js'
export const queueRouter = { send }
// myJobs/email/path/index.ts
import { queueRouter as to } from './to/index.js'
export const queueRouter = { to }
// myJobs/email/index.ts
import { queueRouter as path } from './path/index.js'
export const queueRouter = { path }
// router.ts
import { queueRouter as email } from './myJobs/email/index.js'
export const router = createRouter({
  email, // queue name: "email", job name: "email.path.to.send"
})
3. Set up workers and queues
// setup.ts
import { setupBullmqRouter } from '@yakocloud/bullmq-router'
import { router } from './router.js'
const workers = setupBullmqRouter(router, {
  connection: { host: 'localhost', port: 6379 },
})
4. Push jobs
import { router } from './router.js'
await router.email.send.push({ to: '[email protected]', subject: 'Hello' })
API
defineJob<T>(pop)(options?)
Defines a job leaf node.
| Parameter | Type | Description |
|---|---|---|
| pop | (job, jobName, queue) => Promise<void> | Handler executed when the job is dequeued |
| options.jobIdComponents | (keyof T)[] | Fields used to build a deterministic job ID |
Returns a job definition object with the following methods:
.push(data, options?)
Adds a single job to the queue.
await router.email.send.push({ to: '[email protected]', subject: 'Hello' })
.pushBulk(items)
Adds multiple jobs in a single batched operation.
await router.email.send.pushBulk([
  { data: { to: '[email protected]', subject: 'Hi' } },
  { data: { to: '[email protected]', subject: 'Hey' } },
])
.replace(data, options?)
Replaces an existing job with the same ID, preserving its logs. If no existing job is found, a new one is created.
await router.email.send.replace({ to: '[email protected]', subject: 'Updated' })
.cancelDelayedJob(data | jobId)
Cancels a delayed job by stamping it with a cancellation time and immediately promoting it. The job handler will skip it automatically.
await router.email.send.cancelDelayedJob({ to: '[email protected]' })
// or by raw job ID:
await router.email.send.cancelDelayedJob('[email protected]')
Returns true if the job was found and cancelled, false otherwise.
.getJob(data | jobId)
Fetches a BullMQ Job instance by data or raw job ID. Returns undefined if not found.
const job = await router.email.send.getJob({ to: '[email protected]' })
.getFullJob(data | jobId)
Fetches a job along with its logs and current state in a single parallel request.
const job = await router.email.send.getFullJob({ to: '[email protected]' })
// job.state: 'waiting' | 'active' | 'completed' | ...
// job.logs: string[]
.exec(data)
Executes the job handler directly in the calling process, bypassing the queue entirely. Useful for testing or one-off manual runs.
await router.email.send.exec({ to: '[email protected]', subject: 'Test' })
.toString()
Returns the fully qualified dot-separated job name (e.g. email.send). Job definitions are coercible to strings, so they can be used directly in template literals.
console.log(`${router.email.send}`) // "email.send"
createRouter<T>(nativeRouter)
Wraps a nested object of job definitions with a Proxy that automatically injects the correct JOB_PATH into every leaf node at access time. No manual path configuration needed.
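The sketch below illustrates the general technique of injecting a path through a Proxy at access time. It is not the library's implementation: `LeafSketch`, `createRouterSketch` and the `jobPath` field are hypothetical stand-ins, and the real job definition object carries more than a path.

```typescript
// Hypothetical leaf: stores the injected path and exposes it via toString().
class LeafSketch {
  jobPath: string | undefined
  toString(): string {
    return this.jobPath ?? '<unrouted>'
  }
}

type RouterTree = { [key: string]: RouterTree | LeafSketch }

// Wrap the tree in a Proxy that tracks the keys used to reach each node.
function createRouterSketch<T extends RouterTree>(tree: T, path: string[] = []): T {
  return new Proxy(tree, {
    get(target, key, receiver) {
      const value = Reflect.get(target, key, receiver)
      if (typeof key !== 'string') return value
      const nextPath = [...path, key]
      if (value instanceof LeafSketch) {
        // Inject the path lazily, at the moment the leaf is accessed.
        value.jobPath = nextPath.join('.')
        return value
      }
      if (typeof value === 'object' && value !== null) {
        // Groups are wrapped again so deeper accesses extend the path.
        return createRouterSketch(value as RouterTree, nextPath)
      }
      return value
    },
  })
}

const send = new LeafSketch()
const sketchRouter = createRouterSketch({ email: { send } })
console.log(`${sketchRouter.email.send}`) // "email.send"
```

Because the path is computed from the keys used to reach the leaf, the job file itself never needs to know where it is mounted. The same `toString()` hook is what makes a leaf usable in a template literal.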
const router = createRouter({
  email: { send, invoice },
  notifications: { push },
})
setupBullmqRouter(router, options)
Registers queues and workers for every top-level key in the router. Returns a map of BullMQ Worker instances keyed by queue name, so you can attach additional event listeners or shut workers down individually.
| Option | Type | Description |
|---|---|---|
| router | object | The router created by createRouter |
| connection | ConnectionOptions | Default Redis connection for all queues and workers |
| prefix | string \| undefined | Default queue prefix for all queues and workers |
| queueOptions | Partial<Record<keyof R, QueueOptions>> | Per-queue overrides |
| workerOptions | Partial<Record<keyof R, WorkerOptions>> | Per-worker overrides |
| sandboxOptions | { routerPath: string; workers: (keyof R)[]; execArgv?: string[] } \| undefined | Run selected queues in a sandboxed child process (see below) |
Returns Record<keyof R, Worker> — one BullMQ Worker per top-level queue key.
const workers = setupBullmqRouter(router, { connection })
// Attach extra listeners
workers.email.on('completed', (job) => console.log(job.id, 'done'))
// Graceful shutdown
await Promise.all(Object.values(workers).map(w => w.close()))
Each top-level key in the router is treated as a separate BullMQ queue name.
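One possible way to wire the graceful shutdown to process signals is sketched below. It assumes only that each worker exposes an async `close()` method, as BullMQ's `Worker` does. `Closable`, `closeAll` and `installShutdownHook` are hypothetical helper names and not part of this package.

```typescript
// Minimal structural type: anything with an async close(), such as a BullMQ Worker.
interface Closable {
  close(): Promise<void>
}

// Close every worker and return the names of those that failed to close.
// Each close() is awaited independently so one failure does not hide the others.
async function closeAll(workers: Record<string, Closable>): Promise<string[]> {
  const outcomes = await Promise.all(
    Object.keys(workers).map(async (name) => {
      try {
        await workers[name].close()
        return null
      } catch {
        return name
      }
    }),
  )
  return outcomes.filter((name): name is string => name !== null)
}

// Hypothetical wiring: close all workers once when the process is asked to stop.
function installShutdownHook(workers: Record<string, Closable>): void {
  const shutdown = async (): Promise<void> => {
    const failed = await closeAll(workers)
    process.exitCode = failed.length === 0 ? 0 : 1
  }
  process.once('SIGTERM', shutdown)
  process.once('SIGINT', shutdown)
}

// Usage (assumption: `workers` is the map returned by setupBullmqRouter):
// installShutdownHook(workers)
```

Collecting failures instead of rejecting on the first one keeps a single stuck worker from preventing the others from closing cleanly.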
Sandbox workers (sandboxOptions)
By default, every worker runs in the same Node.js process. For CPU-intensive or crash-prone jobs you can move specific queues into BullMQ's sandboxed processor mode — each job executes in an isolated child process.
| Field | Type | Description |
|---|---|---|
| routerPath | string | Absolute path to the file that exports your router as the default export. BullMQ uses this to load the processor in the child process. |
| workers | (keyof R)[] | Queue names that should run sandboxed. All other queues continue running in-process. |
| execArgv | string[] \| undefined | Additional Node.js runtime arguments passed to the sandboxed child process. |
import path from 'node:path' // needed for path.join below
const workers = setupBullmqRouter(router, {
  connection: { host: 'localhost', port: 6379 },
  sandboxOptions: {
    routerPath: path.join(__dirname, 'router.js'),
    // or
    // routerPath: new URL('./router.js', import.meta.url).pathname,
    workers: ['email', 'pdf'], // these queues run in child processes
    execArgv: process.env.NODE_ENV === "production" ? undefined : ['--import', 'tsx'],
  },
})
The file at routerPath must export the router as its default export; this is the entry point BullMQ forks for each sandboxed job.
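In ES modules, where `__dirname` is unavailable, `URL.pathname` keeps percent-encoding (a space stays `%20`) and yields a leading slash before the drive letter on Windows. A more robust way to get an absolute path is Node's `fileURLToPath`. The sketch below assumes a hypothetical helper named `resolveRouterPath`. It takes the module URL as an argument so it can be run with a literal POSIX-style URL; in real code you would pass `import.meta.url`.

```typescript
import { fileURLToPath } from 'node:url'

// Resolve a file next to the given module URL to an absolute file-system path.
function resolveRouterPath(moduleUrl: string, relative = './router.js'): string {
  return fileURLToPath(new URL(relative, moduleUrl))
}

// In an ES module: resolveRouterPath(import.meta.url)
// The literal URLs below assume a POSIX file system.
const sketchRouterPath = resolveRouterPath('file:///app/src/setup.js')
console.log(sketchRouterPath) // "/app/src/router.js" on POSIX

// Percent-encoded characters are decoded, unlike with URL.pathname.
console.log(resolveRouterPath('file:///app/my%20dir/setup.js')) // "/app/my dir/router.js" on POSIX
```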
Deterministic Job IDs
When jobIdComponents is set, the router builds a stable job ID from the job path and the specified data fields:
export default defineJob<{ to: string; subject: string }>(async (job) => {
  await sendEmail(job.data)
})({
  jobIdComponents: ["to"]
})
// Resulting ID: "email.send" + "." + "user@example-com" (sanitized to safe characters)
This allows deduplication, replacement, and cancellation by data rather than raw ID.
When jobIdComponents is omitted, BullMQ generates the ID automatically.
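As a rough illustration of how such an ID could be derived, the sketch below joins the job path with sanitized field values. The exact sanitization rules of the library are not documented here. The character set in `UNSAFE` is an assumption chosen only to reproduce the example ID shown in this section, and `buildJobId` and `sanitize` are hypothetical names.

```typescript
// Assumption: every character outside this set is replaced with "-".
const UNSAFE = /[^A-Za-z0-9@_-]/g

function sanitize(value: unknown): string {
  return String(value).replace(UNSAFE, '-')
}

// Build a stable ID from the job path plus the selected data fields, in order.
function buildJobId<T extends object>(jobPath: string, data: T, components: (keyof T)[]): string {
  const parts = components.map((key) => sanitize(data[key]))
  return [jobPath, ...parts].join('.')
}

const idA = buildJobId('email.send', { to: 'user@example.com', subject: 'Hello' }, ['to'])
const idB = buildJobId('email.send', { to: 'user@example.com', subject: 'Updated' }, ['to'])
console.log(idA)         // "email.send.user@example-com"
console.log(idA === idB) // true: fields outside jobIdComponents do not affect the ID
```

Since only the listed fields feed the ID, two payloads that differ elsewhere map to the same job, which is what makes replace and cancel-by-data possible.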
Job Cancellation Pattern
Jobs that are delayed can be cancelled without removing them from the queue. The router stamps the job data with __cancelledTime__ and promotes the job to the waiting state. When the worker picks it up, the handler detects the stamp and skips processing — the job completes immediately without side effects.
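The skip-on-stamp behaviour can be pictured as a thin wrapper around the handler. The sketch below is illustrative only: `JobLike`, `Handler` and `skipIfCancelled` are hypothetical, the stamp is assumed to be a numeric timestamp, and the library applies its own check internally.

```typescript
// Hypothetical minimal job shape; a real BullMQ Job carries much more.
interface JobLike<T> {
  data: T & { __cancelledTime__?: number } // assumption: stamp is a timestamp
}

type Handler<T> = (job: JobLike<T>) => Promise<void>

// Wrap a handler so that stamped jobs complete without running it.
function skipIfCancelled<T>(handler: Handler<T>): Handler<T> {
  return async (job) => {
    if (job.data.__cancelledTime__ !== undefined) return
    await handler(job)
  }
}

const sent: string[] = []
const sendEmailSketch = skipIfCancelled<{ to: string }>(async (job) => {
  sent.push(job.data.to)
})

async function demo(): Promise<string[]> {
  await sendEmailSketch({ data: { to: 'a@example.com' } })
  await sendEmailSketch({ data: { to: 'b@example.com', __cancelledTime__: Date.now() } })
  return sent
}

demo().then((result) => console.log(result)) // the stamped job never reaches the handler
```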
// Schedule a job with a 10-minute delay
await router.email.send.push(data, { delay: 10 * 60 * 1000 })
// Cancel it before it runs
await router.email.send.cancelDelayedJob(data)
License
MIT
