@yakocloud/bullmq-router

v2.1.1

Type-safe file-system-style router for BullMQ. Define jobs as leaf nodes in a nested object, get fully typed push, replace, cancel, exec and more — automatically wired to the right queue and worker.

Why use bullmq-router?

BullMQ is powerful, but as your system grows, managing jobs becomes messy:

  • Job names are just strings → easy to break, hard to refactor
  • Queues, workers, and handlers are scattered across files
  • No clear structure for organizing jobs
  • Deduplication and idempotency require manual effort
  • Replacing or cancelling jobs is non-trivial
  • Testing job logic outside the queue is inconvenient

Installation

npm install @yakocloud/bullmq-router bullmq

bullmq is a peer dependency and must be installed separately.

Quick Start

1. Define jobs

// jobs/email/send.ts
import { defineJob } from '@yakocloud/bullmq-router'

export default defineJob<{ to: string; subject: string }>(async (job) => {
  await sendEmail(job.data)
})()

2. Build the router

Top-level keys define queue names — each key becomes a separate BullMQ queue. Nested keys are just grouping and don't affect the queue name.

// router.ts
import { createRouter } from '@yakocloud/bullmq-router'
import send from './jobs/email/send.js'
import invoice from './jobs/email/invoice.js'

export const router = createRouter({
  email: {       // <-- queue name: "email"
    send,        // job name: "email.send"
    invoice,     // job name: "email.invoice"
  },
})

Best practice: mirror the router structure in the file system. Each folder exports a queueRouter object that composes its children — the file tree becomes the job tree.

// myJobs/email/path/to/send.ts
import { defineJob } from '@yakocloud/bullmq-router'

export default defineJob<{ text: string }>((job) => {
  console.log(job.data.text)
})()

// myJobs/email/path/to/index.ts
import send from './send.js'
export const queueRouter = { send }

// myJobs/email/path/index.ts
import { queueRouter as to } from './to/index.js'
export const queueRouter = { to }

// myJobs/email/index.ts
import { queueRouter as path } from './path/index.js'
export const queueRouter = { path }

// router.ts
import { queueRouter as email } from './myJobs/email/index.js'
export const router = createRouter({
  email, // queue name: "email", job name: "email.path.to.send"
})
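
To make the naming rule concrete, here is a minimal, self-contained sketch of how a nested object maps to queue and job names. It is an illustration only, not the library's implementation: `listJobNames`, `Tree`, and `JobName` are hypothetical names, and leaves are modelled as plain functions.

```typescript
// Hypothetical illustration of the naming rule; not part of @yakocloud/bullmq-router.
type Leaf = () => void
type Tree = { [key: string]: Tree | Leaf }

interface JobName {
  queue: string // first path segment
  name: string  // full dot-separated path
}

function listJobNames(tree: Tree, path: string[] = []): JobName[] {
  const out: JobName[] = []
  for (const [key, value] of Object.entries(tree)) {
    const next = [...path, key]
    if (typeof value === 'function') {
      // Leaf: the first segment names the queue, the whole path names the job.
      out.push({ queue: next[0], name: next.join('.') })
    } else {
      // Branch: nested keys only extend the job name.
      out.push(...listJobNames(value, next))
    }
  }
  return out
}

const names = listJobNames({
  email: { send: () => {}, path: { to: { send: () => {} } } },
})
console.log(names)
// [ { queue: 'email', name: 'email.send' },
//   { queue: 'email', name: 'email.path.to.send' } ]
```

Both jobs land in the same "email" queue; only the job name reflects the nesting depth.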

3. Set up workers and queues

// setup.ts
import { setupBullmqRouter } from '@yakocloud/bullmq-router'
import { router } from './router.js'

const workers = setupBullmqRouter(router, {
  connection: { host: 'localhost', port: 6379 },
})

4. Push jobs

import { router } from './router.js'

await router.email.send.push({ to: 'user@example.com', subject: 'Hello' })

API

defineJob<T>(pop)(options?)

Defines a job leaf node.

| Parameter | Type | Description |
|---|---|---|
| pop | (job, jobName, queue) => Promise<void> | Handler executed when the job is dequeued |
| options.jobIdComponents | (keyof T)[] | Fields used to build a deterministic job ID |

Returns a job definition object with the following methods:

.push(data, options?)

Adds a single job to the queue.

await router.email.send.push({ to: 'user@example.com', subject: 'Hello' })

.pushBulk(items)

Adds multiple jobs in a single batched operation.

await router.email.send.pushBulk([
  { data: { to: 'a@example.com', subject: 'Hi' } },
  { data: { to: 'b@example.com', subject: 'Hey' } },
])

.replace(data, options?)

Replaces an existing job with the same ID, preserving its logs. If no existing job is found, a new one is created.

await router.email.send.replace({ to: 'user@example.com', subject: 'Updated' })

.cancelDelayedJob(data | jobId)

Cancels a delayed job by stamping it with a cancellation time and immediately promoting it. The job handler will skip it automatically.

await router.email.send.cancelDelayedJob({ to: 'user@example.com' })
// or by raw job ID:
await router.email.send.cancelDelayedJob('email.send.user@example-com')

Returns true if the job was found and cancelled, false otherwise.

.getJob(data | jobId)

Fetches a BullMQ Job instance by data or raw job ID. Returns undefined if not found.

const job = await router.email.send.getJob({ to: 'user@example.com' })

.getFullJob(data | jobId)

Fetches a job together with its logs and current state, issuing the lookups in parallel.

const job = await router.email.send.getFullJob({ to: 'user@example.com' })
// job.state  — 'waiting' | 'active' | 'completed' | ...
// job.logs   — string[]
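
The parallel fetch can be pictured with a small sketch. This is an assumption about the general shape, not the library's code: `JobStore` and its three methods are hypothetical placeholders rather than BullMQ's API, and `fetchFullJob` is an illustrative name.

```typescript
// Hypothetical store interface; the method names are placeholders, not BullMQ's API.
interface JobStore {
  getJob(id: string): Promise<{ id: string; data: unknown } | undefined>
  getState(id: string): Promise<string>
  getLogs(id: string): Promise<string[]>
}

async function fetchFullJob(store: JobStore, id: string) {
  // Start all three lookups before awaiting any of them.
  const [job, state, logs] = await Promise.all([
    store.getJob(id),
    store.getState(id),
    store.getLogs(id),
  ])
  // Mirror the "undefined if not found" convention used by getJob.
  return job === undefined ? undefined : { ...job, state, logs }
}
```

Because the lookups run concurrently, the total latency is roughly that of the slowest one rather than the sum of all three.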

.exec(data)

Runs the job handler directly in the current process, bypassing the queue entirely. The call returns a promise, so await it. Useful for testing or one-off manual runs.

await router.email.send.exec({ to: 'user@example.com', subject: 'Test' })

.toString()

Returns the fully qualified dot-separated job name (e.g. email.send). Job definitions are coercible to strings, so they can be used directly in template literals.

console.log(`${router.email.send}`) // "email.send"
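
String coercion of this kind relies on standard JavaScript behaviour: template literals, `String()`, and string concatenation all call an object's `toString()`. A minimal sketch follows, with `jobRef` as a hypothetical stand-in for a job definition.

```typescript
// Hypothetical stand-in for a job definition; only string coercion is modelled.
function jobRef(path: string[]) {
  return {
    toString: (): string => path.join('.'),
  }
}

const send = jobRef(['email', 'send'])

console.log(`${send}`)           // "email.send" (template literal calls toString)
console.log(String(send))        // "email.send"
console.log('job: ' + send)      // "job: email.send"
```

Passing the object straight to `console.log` prints the object itself rather than the name, so coerce it explicitly when logging.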

createRouter<T>(nativeRouter)

Wraps a nested object of job definitions with a Proxy that automatically injects the correct JOB_PATH into every leaf node at access time. No manual path configuration needed.

const router = createRouter({
  email: { send, invoice },
  notifications: { push },
})
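
For readers unfamiliar with the technique, here is a rough sketch of Proxy-based path tracking. It is not the library's implementation: `withPaths`, `BoundJob`, and the `jobPath` field (standing in for the injected JOB_PATH) are hypothetical, and the return type is left loose for brevity.

```typescript
// Hypothetical sketch of Proxy-based path tracking; not the library's implementation.
type Handler = (data: unknown) => void
type Tree = { [key: string]: Tree | Handler }

interface BoundJob {
  jobPath: string // assumption: stands in for the injected JOB_PATH
  run: Handler
}

function withPaths(tree: Tree, path: string[] = []): any {
  return new Proxy(tree, {
    get(target, key) {
      // Ignore symbols and anything that is not an own key of the tree.
      if (typeof key !== 'string') return undefined
      if (!Object.prototype.hasOwnProperty.call(target, key)) return undefined
      const value = target[key]
      const next = [...path, key]
      if (typeof value === 'function') {
        const bound: BoundJob = { jobPath: next.join('.'), run: value }
        return bound // leaf: the path is attached at access time
      }
      return withPaths(value, next) // branch: keep descending
    },
  })
}

const demo = withPaths({ email: { send: () => {}, invoice: () => {} } })
console.log(demo.email.send.jobPath)    // "email.send"
console.log(demo.email.invoice.jobPath) // "email.invoice"
```

The path is derived from the property accesses themselves, which is why no job needs to declare its own name.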

setupBullmqRouter(router, options)

Registers queues and workers for every top-level key in the router. Returns a map of BullMQ Worker instances keyed by queue name, so you can attach additional event listeners or shut workers down individually.

| Option | Type | Description |
|---|---|---|
| router | object | The router created by createRouter |
| connection | ConnectionOptions | Default Redis connection for all queues and workers |
| prefix | string \| undefined | Default queue prefix for all queues and workers |
| queueOptions | Partial<Record<keyof R, QueueOptions>> | Per-queue overrides |
| workerOptions | Partial<Record<keyof R, WorkerOptions>> | Per-worker overrides |
| sandboxOptions | { routerPath: string; workers: (keyof R)[]; execArgv?: string[] } \| undefined | Run selected queues in a sandboxed child process (see below) |

Returns Record<keyof R, Worker> — one BullMQ Worker per top-level queue key.

const workers = setupBullmqRouter(router, { connection })

// Attach extra listeners
workers.email.on('completed', (job) => console.log(job.id, 'done'))

// Graceful shutdown
await Promise.all(Object.values(workers).map(w => w.close()))

Each top-level key in the router is treated as a separate BullMQ queue name.
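
One caveat with `Promise.all` for shutdown is that it rejects as soon as any single `close()` fails, while the other workers may still be closing. A possible variation, sketched below with the hypothetical helper `closeAll`, waits for every worker and reports the ones that failed. The `Closable` interface only assumes a `close()` method returning a promise, which BullMQ's Worker provides.

```typescript
// Minimal structural type: anything with an async close() fits.
interface Closable {
  close(): Promise<void>
}

// Hypothetical helper: closes every worker and returns the names that failed.
async function closeAll(workers: Record<string, Closable>): Promise<string[]> {
  const names = Object.keys(workers)
  // allSettled waits for every close() to finish, even if some reject.
  const results = await Promise.allSettled(names.map((name) => workers[name].close()))
  return names.filter((_, i) => results[i].status === 'rejected')
}

// Usage sketch (assumes `workers` comes from setupBullmqRouter):
// process.once('SIGTERM', async () => {
//   const failed = await closeAll(workers)
//   process.exit(failed.length === 0 ? 0 : 1)
// })
```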

Sandbox workers (sandboxOptions)

By default, every worker runs in the same Node.js process. For CPU-intensive or crash-prone jobs you can move specific queues into BullMQ's sandboxed processor mode — each job executes in an isolated child process.

| Field | Type | Description |
|---|---|---|
| routerPath | string | Absolute path to the file that exports your router as the default export. BullMQ uses this to load the processor in the child process. |
| workers | (keyof R)[] | Queue names that should run sandboxed. All other queues continue running in-process. |
| execArgv | string[] \| undefined | Additional Node.js runtime arguments passed to the child process. |

import path from 'node:path'

const workers = setupBullmqRouter(router, {
  connection: { host: 'localhost', port: 6379 },
  sandboxOptions: {
    // CommonJS:
    routerPath: path.join(__dirname, 'router.js'),
    // ESM alternative (import { fileURLToPath } from 'node:url'):
    // routerPath: fileURLToPath(new URL('./router.js', import.meta.url)),
    workers: ['email', 'pdf'],  // these queues run in child processes
    execArgv: process.env.NODE_ENV === "production" ? undefined : ['--import', 'tsx'],
  },
})

The file at routerPath must export the router as its default export — this is the entry point BullMQ forks for each sandboxed job.


Deterministic Job IDs

When jobIdComponents is set, the router builds a stable job ID from the job path and the specified data fields:

export default defineJob<{ to: string; subject: string }>(async (job) => {
  await sendEmail(job.data)
})({
  jobIdComponents: ["to"]
})
// Resulting ID: "email.send" + "." + "user@example-com" (sanitized to safe characters)

This allows deduplication, replacement, and cancellation by data rather than raw ID.

When jobIdComponents is omitted, BullMQ generates the ID automatically.
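
As a rough sketch of how such an ID could be assembled (not the library's actual code), the helper below joins the job path with the selected field values. `buildJobId` and `sanitize` are hypothetical names, and the sanitization rule is an assumption inferred from the example above: only "." is replaced with "-", whereas the real rule may cover more characters.

```typescript
// Hypothetical sketch; the library's real sanitization rule may differ.
function sanitize(value: unknown): string {
  // Assumption: replace "." with "-" so field values cannot be mistaken
  // for extra path segments (matches the "user@example-com" example).
  return String(value).replace(/\./g, '-')
}

function buildJobId<T extends Record<string, unknown>>(
  jobPath: string,
  data: T,
  components: (keyof T)[],
): string {
  const parts = components.map((field) => sanitize(data[field]))
  return [jobPath, ...parts].join('.')
}

const id = buildJobId('email.send', { to: 'user@example.com', subject: 'Hello' }, ['to'])
console.log(id) // "email.send.user@example-com"
```

Fields that are not listed in the components (here `subject`) do not affect the ID, so two payloads for the same recipient map to the same job.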


Job Cancellation Pattern

Jobs that are delayed can be cancelled without removing them from the queue. The router stamps the job data with __cancelledTime__ and promotes the job to the waiting state. When the worker picks it up, the handler detects the stamp and skips processing — the job completes immediately without side effects.

// Schedule a job with a 10-minute delay
await router.email.send.push(data, { delay: 10 * 60 * 1000 })

// Cancel it before it runs
await router.email.send.cancelDelayedJob(data)
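
The skip-on-cancel check can be sketched as a wrapper around the handler. This is an illustration under stated assumptions rather than the library's code: `skipIfCancelled` and `JobLike` are hypothetical, and the stamp is assumed to be a numeric timestamp stored under `__cancelledTime__`.

```typescript
// Hypothetical sketch of the skip-on-cancel check; not the library's implementation.
interface JobLike<T> {
  // Assumption: the cancellation stamp is a numeric timestamp.
  data: T & { __cancelledTime__?: number }
}

function skipIfCancelled<T>(
  handler: (job: JobLike<T>) => Promise<void>,
): (job: JobLike<T>) => Promise<'skipped' | 'ran'> {
  return async (job) => {
    // A stamped job completes immediately, without running the handler.
    if (job.data.__cancelledTime__ !== undefined) return 'skipped'
    await handler(job)
    return 'ran'
  }
}

const sent: string[] = []
const wrapped = skipIfCancelled<{ to: string }>(async (job) => {
  sent.push(job.data.to)
})

async function demo(): Promise<void> {
  console.log(await wrapped({ data: { to: 'a' } }))                                // "ran"
  console.log(await wrapped({ data: { to: 'b', __cancelledTime__: Date.now() } })) // "skipped"
  console.log(sent)                                                                // [ 'a' ]
}
void demo()
```

Completing the job instead of removing it keeps the job and its history visible in the queue, which is the trade-off this pattern makes.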

License

MIT