@igniter-js/adapter-bullmq

v0.3.4

[![NPM Version](https://img.shields.io/npm/v/@igniter-js/adapter-bullmq.svg)](https://www.npmjs.com/package/@igniter-js/adapter-bullmq) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

The official BullMQ adapter for the Igniter.js Queues system. This package provides a production-ready driver for handling background job processing using Redis.

Role in the Ecosystem

This adapter acts as a bridge between the abstract @igniter-js/core Queues system and the powerful BullMQ library. It implements the necessary logic to enqueue, schedule, and process jobs, allowing you to add robust background task capabilities to your Igniter.js application.

Installation

To use this adapter, you need to install it along with its peer dependencies: bullmq and a Redis client like ioredis.

# npm
npm install @igniter-js/adapter-bullmq bullmq ioredis

# yarn
yarn add @igniter-js/adapter-bullmq bullmq ioredis

# pnpm
pnpm add @igniter-js/adapter-bullmq bullmq ioredis

# bun
bun add @igniter-js/adapter-bullmq bullmq ioredis

Basic Usage

The primary export of this package is the createBullMQAdapter factory function. You use this to create a jobs instance, which then provides the tools (.router(), .register(), .merge()) to define your background jobs.

1. Create the Adapter and a Job Router

First, create an instance of the adapter and use it to define a router for a specific group of jobs.

// src/services/jobs.ts
import { createBullMQAdapter } from '@igniter-js/adapter-bullmq';
import { createRedisStoreAdapter } from '@igniter-js/adapter-redis'; // Often shares a Redis connection
import { Redis } from 'ioredis';
import { z } from 'zod';

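// A single Redis client can be used for both Store and Queues.
// Fail fast if the URL is missing instead of passing undefined to ioredis.
const redisUrl = process.env.REDIS_URL;
if (!redisUrl) throw new Error('REDIS_URL is not set');
const redis = new Redis(redisUrl);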
const store = createRedisStoreAdapter({ client: redis });

// 1. Create the BullMQ adapter instance
export const jobs = createBullMQAdapter({
  store, // The adapter requires a store for the Redis connection
  autoStartWorker: {
    concurrency: 5,
  },
});

// 2. Define a router for email-related jobs
const emailJobRouter = jobs.router({
  namespace: 'emails',
  jobs: {
    sendWelcome: jobs.register({
      input: z.object({ email: z.string().email() }),
      handler: async ({ payload, context }) => {
        context.logger.info(`Sending welcome email to ${payload.email}`);
        // Your email sending logic here...
        return { sent: true };
      },
    }),
  },
});

// 3. Merge all routers into a single configuration
export const REGISTERED_JOBS = jobs.merge({
  emails: emailJobRouter,
});

2. Register with the Igniter Builder

Pass the REGISTERED_JOBS object to the .jobs() method in your main igniter.ts file.

// src/igniter.ts
import { Igniter } from '@igniter-js/core';
import { REGISTERED_JOBS } from './services/jobs';

// AppContext is your application's context type, defined elsewhere in your project
export const igniter = Igniter
  .context<AppContext>()
  .jobs(REGISTERED_JOBS)
  .create();

Your background job queue is now configured and ready to use. You can invoke jobs from your actions using igniter.jobs.emails.schedule({ task: 'sendWelcome', ... }).

Worker Control

The adapter provides granular control over workers via the WorkerHandle returned by jobs.worker():

// Start a worker and get a handle
const handle = await jobs.worker({
  queues: ['email-queue'],
  concurrency: 5
});

// Control the worker
await handle.pause();   // Pause processing
await handle.resume();  // Resume processing
await handle.close();   // Gracefully close

// Check status
console.log('Running:', handle.isRunning());
console.log('Paused:', handle.isPaused());

// Get metrics
const metrics = await handle.getMetrics();
console.log(`Processed: ${metrics.processed}, Failed: ${metrics.failed}`);
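
Since handle.close() shuts a worker down gracefully, a common pattern is to call it when the process receives a termination signal. The following is a minimal sketch of that idea, not part of the adapter: createShutdown and ClosableWorker are hypothetical names, and the only thing assumed about the handle is the close() method shown above.

```typescript
// Hypothetical helper, not part of @igniter-js/adapter-bullmq.
// It only assumes each handle exposes close(), as in the example above.
interface ClosableWorker {
  close(): Promise<void>;
}

// Returns a function that closes all workers once, even if called repeatedly.
function createShutdown(handles: ClosableWorker[]): () => Promise<void> {
  let pending: Promise<void> | undefined;
  return () => {
    if (!pending) {
      // First call: close every worker in parallel and remember the promise.
      pending = Promise.all(handles.map((h) => h.close())).then(() => undefined);
    }
    // Later calls (for example, a second signal) reuse the same promise.
    return pending;
  };
}

// Usage sketch (Node.js):
// const shutdown = createShutdown([handle]);
// process.once('SIGTERM', () => void shutdown());
// process.once('SIGINT', () => void shutdown());
```

Making the function idempotent means a repeated signal does not trigger a second round of close() calls.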

Rate Limiting

Control the rate at which jobs are processed to avoid overwhelming external APIs or services. This is particularly useful for:

  • Avoiding API rate limits (e.g., YouTube, Twitter, Stripe)
  • Preventing server overload
  • Throttling resource-intensive operations

Job-Level Rate Limiting

Configure rate limiting directly on individual job definitions:

import { z } from 'zod';

const downloadVideoJob = jobs.register({
  name: 'Download Video',
  input: z.object({ videoId: z.string() }),
  handler: async ({ input, context }) => {
    // Download logic here
    return { downloaded: true };
  },
  // Rate limiting for YouTube downloads to avoid 429 errors
  limiter: {
    max: 1,          // At most 1 job per window
    duration: 30000, // Window length in ms (30 seconds)
  },
});
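
To make the max/duration pair concrete, here is a small sketch that estimates when queued jobs could start under such a limiter. It uses a simplified fixed-window model and ignores job run time and concurrency. BullMQ's real limiter runs in Redis and may differ in detail, and earliestStartTimes is a hypothetical helper, not an adapter API.

```typescript
// Simplified model of a { max, duration } limiter (illustration only).
interface Limiter {
  max: number;      // jobs allowed per window
  duration: number; // window length in milliseconds
}

// For `count` jobs that are all waiting at time 0, return the earliest
// start offset (in ms) of each job under a fixed-window interpretation.
function earliestStartTimes(count: number, limiter: Limiter): number[] {
  if (!Number.isInteger(limiter.max) || limiter.max <= 0) {
    throw new Error('limiter.max must be a positive integer');
  }
  const starts: number[] = [];
  for (let i = 0; i < count; i++) {
    // Every group of `max` jobs occupies one window.
    const windowIndex = Math.floor(i / limiter.max);
    starts.push(windowIndex * limiter.duration);
  }
  return starts;
}

// With max: 1 and duration: 30000, three waiting jobs are spaced 30 s apart.
console.log(earliestStartTimes(3, { max: 1, duration: 30000 })); // [ 0, 30000, 60000 ]
```

Under this model, max: 10 with duration: 60000 would let the first ten waiting jobs start immediately and hold the eleventh until the next minute.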

Router-Level Rate Limiting

Apply rate limiting to all jobs in a router via defaultOptions:

// extractAudioJob and getMetadataJob are assumed to be registered
// with jobs.register() in the same way as downloadVideoJob.
const youtubeRouter = jobs.router({
  namespace: 'youtube',
  jobs: {
    downloadVideo: downloadVideoJob,
    extractAudio: extractAudioJob,
    getMetadata: getMetadataJob,
  },
  defaultOptions: {
    queue: { name: 'youtube-downloads' },
    // All jobs in this router share the same rate limit
    limiter: {
      max: 1,          // At most 1 job per window
      duration: 30000, // Window length in ms (30 seconds)
    },
  },
});

Worker-Level Rate Limiting

Configure rate limiting when starting a worker:

const handle = await jobs.worker({
  queues: ['youtube-downloads'],
  concurrency: 1,
  limiter: {
    max: 10,       // Process maximum 10 jobs
    duration: 60000, // Per minute
  },
});

Auto-Start Worker with Rate Limiting

Configure rate limiting in the adapter options for auto-started workers:

export const jobs = createBullMQAdapter({
  store,
  autoStartWorker: {
    concurrency: 1,
    limiter: {
      max: 1,
      duration: 30000,
    },
  },
});

Limiter Priority

When multiple limiters are configured, they are resolved in this order:

  1. A job-level limiter takes precedence over a router-level one.
  2. The router-level limiter (via defaultOptions) is used if the job doesn't define one.
  3. The worker-level limiter applies globally to all jobs in the queue.

Note: BullMQ applies rate limiting at the worker level, so all jobs in the same queue share the same rate limit. Rate-limited jobs return to the "waiting" state until the limit resets.
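
One way to read the priority list is as a "first defined wins" chain. The sketch below illustrates that reading and is not the adapter's actual code: resolveLimiter and LimiterSources are hypothetical names, and treating the worker-level limiter as the final fallback is an assumption.

```typescript
// Illustration of the priority order; not the adapter's implementation.
interface Limiter {
  max: number;
  duration: number;
}

interface LimiterSources {
  job?: Limiter;    // limiter on the job definition
  router?: Limiter; // limiter from the router's defaultOptions
  worker?: Limiter; // limiter passed when starting the worker
}

// Pick the most specific limiter that is defined (assumed fallback order).
function resolveLimiter(sources: LimiterSources): Limiter | undefined {
  return sources.job ?? sources.router ?? sources.worker;
}

const effective = resolveLimiter({
  router: { max: 1, duration: 30000 },
  worker: { max: 10, duration: 60000 },
});
console.log(effective); // { max: 1, duration: 30000 }
```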

Queue Management

Manage queues directly via the adapter's queues property (jobs.queues in these examples):

// List all queues
const queues = await jobs.queues.list();

// Get specific queue info
const queueInfo = await jobs.queues.get('email-queue');
console.log(`Waiting: ${queueInfo.jobCounts.waiting}`);

// Control queues
await jobs.queues.pause('email-queue');
await jobs.queues.resume('email-queue');

// Clean old jobs
const cleaned = await jobs.queues.clean('email-queue', {
  status: ['completed', 'failed'],
  olderThan: 7 * 24 * 60 * 60 * 1000 // 7 days
});

// Drain waiting jobs
await jobs.queues.drain('email-queue');
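
The olderThan value in the clean example is an age in milliseconds. A small helper can keep that arithmetic readable. This is only a sketch: days and isOlderThan are hypothetical names, and the comparison shows one plausible way such a cutoff could be evaluated, not how the adapter does it.

```typescript
// Hypothetical helpers for readable durations; not part of the adapter.
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Convert a number of days to milliseconds.
function days(n: number): number {
  return n * MS_PER_DAY;
}

// True if something that finished at `finishedAt` (epoch ms) is older
// than `olderThan` ms when observed at `now` (epoch ms).
function isOlderThan(finishedAt: number, olderThan: number, now: number): boolean {
  return now - finishedAt > olderThan;
}

console.log(days(7)); // 604800000, the same value as 7 * 24 * 60 * 60 * 1000
```

With such a helper, the option in the example could be written as olderThan: days(7).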

Job Management

Control individual jobs via the adapter's job property (jobs.job in these examples):

// Get job info
const jobInfo = await jobs.job.get('job-123');
console.log(`Status: ${jobInfo?.status}`);

// Get job state
const state = await jobs.job.getState('job-123');

// Get job logs
const logs = await jobs.job.getLogs('job-123');

// Retry a failed job
await jobs.job.retry('job-123');

// Remove a job
await jobs.job.remove('job-123');

// Promote delayed job to immediate
await jobs.job.promote('job-123');

// Batch operations
await jobs.job.retryMany(['job-1', 'job-2', 'job-3']);
await jobs.job.removeMany(['job-1', 'job-2', 'job-3']);

Management API in Controllers

Access the management APIs directly in your controllers via the ctx.jobs proxy, which exposes them as $queues, $job, and $workers:

const adminController = igniter.controller({
  path: '/admin',
  actions: {
    // Pause queue processing
    pauseQueue: igniter.mutation({
      method: 'POST',
      path: '/queues/:queueName/pause',
      handler: async (ctx) => {
        await ctx.jobs.$queues.pause(ctx.params.queueName);
        return { paused: true };
      }
    }),
    
    // Retry a job
    retryJob: igniter.mutation({
      method: 'POST',
      path: '/jobs/:jobId/retry',
      handler: async (ctx) => {
        await ctx.jobs.$job.retry(ctx.params.jobId);
        return { retried: true };
      }
    }),
    
    // Get queue status
    queueStatus: igniter.query({
      path: '/queues/:queueName',
      handler: async (ctx) => {
        return await ctx.jobs.$queues.get(ctx.params.queueName);
      }
    }),
    
    // List active workers
    listWorkers: igniter.query({
      path: '/workers',
      handler: async (ctx) => {
        const workers = ctx.jobs.$workers;
        return Array.from(workers.entries()).map(([id, w]) => ({
          id,
          queueName: w.queueName,
          isRunning: w.isRunning(),
          isPaused: w.isPaused()
        }));
      }
    })
  }
});

For more detailed guides, please refer to the Official Igniter.js Wiki.

Contributing

Contributions are welcome! Please see the main CONTRIBUTING.md file for details on how to get started.

License

This package is licensed under the MIT License.