npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@riktajs/queue

v0.11.5

Published

BullMQ-based job queue integration for Rikta Framework with lifecycle management and event-driven processing

Downloads

69

Readme

@riktajs/queue

BullMQ-based job queue integration for Rikta Framework with lifecycle management, event-driven processing, and optional Bull Board monitoring.

Features

  • 🚀 High Performance - Built on BullMQ for distributed job processing
  • 🎯 Decorator-based API - @Processor, @Process, @OnJobComplete, etc.
  • Full DI Support - @Autowired works in processors for service injection
  • �🔄 Lifecycle Integration - Seamless integration with Rikta's lifecycle hooks
  • 📡 Event System - Queue events emitted via Rikta's EventBus
  • Connection Pooling - Shared Redis connections for optimal performance
  • 📊 Optional Monitoring - Bull Board integration (bring your own dependency)
  • 🛡️ Type-safe - Full TypeScript support with generics and Zod validation
  • Scheduling - Delayed jobs, repeatable jobs, cron patterns

Installation

npm install @riktajs/queue bullmq

Note: ioredis is included as a direct dependency and will be installed automatically.

Optional: Bull Board Monitoring

npm install @bull-board/api @bull-board/fastify

Quick Start

1. Create a Processor

Processors support dependency injection via @Autowired:

import { Autowired } from '@riktajs/core';
import { Processor, Process, OnJobComplete, OnJobFailed } from '@riktajs/queue';
import { Job } from 'bullmq';

interface EmailJobData {
  to: string;
  subject: string;
  body: string;
}

@Processor('email-queue', { concurrency: 5 })
class EmailProcessor {
  // Inject services using @Autowired - fully supported!
  @Autowired(MailerService)
  private mailer!: MailerService;

  @Autowired(LoggerService)
  private logger!: LoggerService;

  @Process('send')
  async handleSendEmail(job: Job<EmailJobData>) {
    this.logger.info(`📧 Sending email to ${job.data.to}`);
    
    // Use injected services
    await this.mailer.send(job.data);
    
    return { sent: true, messageId: `msg-${job.id}` };
  }

  @Process('bulk-send')
  async handleBulkSend(job: Job<{ emails: EmailJobData[] }>) {
    for (const email of job.data.emails) {
      await this.mailer.send(email);
      await job.updateProgress(/* calculate progress */);
    }
    return { sent: job.data.emails.length };
  }

  @OnJobComplete()
  async onComplete(job: Job, result: unknown) {
    this.logger.info(`✅ Job ${job.id} completed:`, result);
  }

  @OnJobFailed()
  async onFailed(job: Job | undefined, error: Error) {
    this.logger.error(`❌ Job ${job?.id} failed:`, error.message);
  }
}

2. Configure the Provider

import { Rikta } from '@riktajs/core';
import { createQueueProvider } from '@riktajs/queue';

// Create and configure provider
const queueProvider = createQueueProvider({
  config: {
    redis: {
      host: process.env.REDIS_HOST || 'localhost',
      port: parseInt(process.env.REDIS_PORT || '6379'),
      password: process.env.REDIS_PASSWORD,
    },
    defaultConcurrency: 3,
    shutdownTimeout: 30000,
  },
});

// Register your processors
queueProvider.registerProcessors(EmailProcessor);

// Bootstrap your app
const app = await Rikta.create();
// Register the provider for lifecycle management

3. Add Jobs from Services

import { Injectable, Autowired } from '@riktajs/core';
import { QueueService } from '@riktajs/queue';

@Injectable()
class NotificationService {
  @Autowired()
  private queueService!: QueueService;

  async sendWelcomeEmail(userEmail: string) {
    // Add a single job
    await this.queueService.addJob('email-queue', 'send', {
      to: userEmail,
      subject: 'Welcome!',
      body: 'Thanks for signing up!',
    });
  }

  async sendDelayedReminder(userEmail: string) {
    // Add a delayed job (sends after 1 hour)
    await this.queueService.addDelayedJob(
      'email-queue',
      'send',
      {
        to: userEmail,
        subject: 'Don\'t forget!',
        body: 'Complete your profile.',
      },
      60 * 60 * 1000 // 1 hour
    );
  }

  async sendDailyDigest() {
    // Add a repeatable job (runs daily at 9am)
    await this.queueService.addRepeatableJob(
      'email-queue',
      'bulk-send',
      { emails: [] }, // Data populated at runtime
      { pattern: '0 9 * * *' } // Cron pattern
    );
  }

  async sendBulkEmails(emails: EmailJobData[]) {
    // Add multiple jobs in bulk
    const jobs = emails.map(email => ({
      name: 'send',
      data: email,
    }));
    
    await this.queueService.addJobs('email-queue', jobs);
  }
}

Configuration

Environment Variables

| Variable | Description | Default | |----------|-------------|---------| | QUEUE_REDIS_HOST | Redis host | localhost | | QUEUE_REDIS_PORT | Redis port | 6379 | | QUEUE_REDIS_PASSWORD | Redis password | - | | QUEUE_REDIS_DB | Redis database number | 0 | | QUEUE_REDIS_USERNAME | Redis username (ACL) | - | | QUEUE_DEFAULT_CONCURRENCY | Default worker concurrency | 1 | | QUEUE_SHUTDOWN_TIMEOUT | Graceful shutdown timeout (ms) | 30000 | | QUEUE_DASHBOARD_PATH | Bull Board path | /admin/queues | | QUEUE_DASHBOARD_ENABLED | Enable Bull Board | false |

Programmatic Configuration

const provider = createQueueProvider({
  config: {
    redis: {
      host: 'redis.example.com',
      port: 6379,
      password: 'secret',
      tls: true,
    },
    defaultConcurrency: 5,
    defaultRateLimiter: {
      max: 100,
      duration: 60000, // 100 jobs per minute
    },
    shutdownTimeout: 60000,
  },
  retryAttempts: 3,
  retryDelay: 5000,
});

Decorators

@Processor(queueName, options?)

Marks a class as a job processor for a specific queue.

@Processor('my-queue', {
  concurrency: 10,
  rateLimiter: { max: 100, duration: 60000 },
})
class MyProcessor { }

@Process(jobName?)

Marks a method as a job handler. If no name is provided, uses the method name.

@Process('send-email')
async handleSendEmail(job: Job) { }

@Process() // Uses 'processOrder' as job name
async processOrder(job: Job) { }

Event Decorators

| Decorator | Event | Signature | |-----------|-------|-----------| | @OnJobComplete() | Job completed | (job: Job, result: unknown) | | @OnJobFailed() | Job failed | (job: Job \| undefined, error: Error) | | @OnJobProgress() | Job progress updated | (job: Job, progress: number \| object) | | @OnJobStalled() | Job stalled | (jobId: string) | | @OnWorkerReady() | Worker ready | () | | @OnWorkerError() | Worker error | (error: Error) |

Dependency Injection in Processors

Processors fully support Rikta's dependency injection. Use @Autowired to inject services:

import { Autowired } from '@riktajs/core';
import { Processor, Process, QueueService, QUEUE_SERVICE } from '@riktajs/queue';
import { Job } from 'bullmq';

@Processor('order-queue')
class OrderProcessor {
  @Autowired(LoggerService)
  private logger!: LoggerService;

  @Autowired(DatabaseService)
  private db!: DatabaseService;

  @Autowired(QUEUE_SERVICE)
  private queueService!: QueueService;

  @Process('process-order')
  async handleOrder(job: Job) {
    this.logger.info(`Processing order ${job.data.orderId}`);
    
    await this.db.saveOrder(job.data);
    
    // Add a follow-up job to another queue
    await this.queueService.addJob('email-queue', 'send', {
      to: job.data.email,
      subject: 'Order Confirmed',
      body: `Order ${job.data.orderId} processed!`,
    });
  }
}

All injected services are resolved through Rikta's DI container, ensuring proper lifecycle management.

Validation with Zod

Use built-in Zod utilities for type-safe job validation:

import { createJobSchema, z, CommonJobSchemas } from '@riktajs/queue';

// Create custom schema
const OrderJobSchema = createJobSchema(z.object({
  orderId: z.string().uuid(),
  items: z.array(z.object({
    productId: z.string(),
    quantity: z.number().positive(),
  })),
  total: z.number().positive(),
}));

// Validate in processor
@Processor('orders')
class OrderProcessor {
  @Process('process')
  async handleOrder(job: Job) {
    const data = OrderJobSchema.validate(job.data);
    // data is now typed as { orderId: string, items: [...], total: number }
  }
}

// Use common schemas
const emailData = CommonJobSchemas.email.parse({
  to: '[email protected]',
  subject: 'Hello',
  body: 'World',
});

Common Job Schemas

  • CommonJobSchemas.email - Email job with to, subject, body, attachments
  • CommonJobSchemas.notification - User notifications
  • CommonJobSchemas.fileProcessing - File operations
  • CommonJobSchemas.webhook - HTTP webhook calls

Event System

Queue events are emitted to Rikta's EventBus:

import { EventBus } from '@riktajs/core';
import { QUEUE_EVENTS } from '@riktajs/queue';

@Injectable()
class MonitoringService {
  constructor(private eventBus: EventBus) {
    // Listen to queue events
    eventBus.on(QUEUE_EVENTS.JOB_COMPLETED, (payload) => {
      console.log(`Job ${payload.jobId} completed in ${payload.queueName}`);
    });

    eventBus.on(QUEUE_EVENTS.JOB_FAILED, (payload) => {
      console.error(`Job ${payload.jobId} failed: ${payload.error}`);
    });
  }
}

Available Events

| Event | Description | |-------|-------------| | queue:job:added | Job added to queue | | queue:job:completed | Job completed successfully | | queue:job:failed | Job failed | | queue:job:progress | Job progress updated | | queue:job:stalled | Job stalled | | queue:job:delayed | Job delayed | | queue:worker:ready | Worker ready | | queue:worker:error | Worker error |

Bull Board Dashboard (Optional)

import { registerBullBoard } from '@riktajs/queue';

// After app is created and queue provider initialized
await registerBullBoard(app.server, {
  queues: queueProvider.getAllQueues(),
  path: '/admin/queues',
  readOnly: false,
  auth: async (request) => {
    // Your authentication logic
    const token = request.headers.authorization;
    return validateAdminToken(token);
  },
});

Note: Bull Board packages must be installed separately:

npm install @bull-board/api @bull-board/fastify

QueueService API

Adding Jobs

// Single job
await queueService.addJob(queueName, jobName, data, options?);

// Multiple jobs (bulk)
await queueService.addJobs(queueName, [{ name, data, options? }]);

// Delayed job
await queueService.addDelayedJob(queueName, jobName, data, delayMs, options?);

// Repeatable job
await queueService.addRepeatableJob(queueName, jobName, data, repeatOptions);

Job Options

await queueService.addJob('queue', 'job', data, {
  attempts: 3,              // Retry attempts
  backoff: {
    type: 'exponential',    // 'fixed' | 'exponential'
    delay: 1000,
  },
  priority: 1,              // Lower = higher priority
  delay: 5000,              // Delay in ms
  deduplicationKey: 'id',   // Prevent duplicates
  removeOnComplete: true,   // Clean up completed jobs
  removeOnFail: false,      // Keep failed jobs for debugging
});

Queue Management

// Get job by ID
const job = await queueService.getJob(queueName, jobId);

// Get queue statistics
const stats = await queueService.getQueueStats(queueName);
// { waiting: 5, active: 2, completed: 100, failed: 3, delayed: 1, paused: 0 }

// Pause/Resume
await queueService.pauseQueue(queueName);
await queueService.resumeQueue(queueName);

// Clear jobs
await queueService.clearQueue(queueName, 'completed');
await queueService.clearQueue(queueName); // Clear all

// Get all queue names
const names = queueService.getQueueNames();

Error Handling

import {
  QueueNotFoundError,
  QueueConnectionError,
  QueueInitializationError,
  JobSchemaValidationError,
} from '@riktajs/queue';

try {
  await queueService.addJob('unknown-queue', 'job', {});
} catch (error) {
  if (error instanceof QueueNotFoundError) {
    console.error('Queue does not exist:', error.message);
  }
}

Best Practices

1. Use Type-Safe Job Data

interface MyJobData {
  userId: string;
  action: 'create' | 'update' | 'delete';
}

@Process('my-job')
async handle(job: Job<MyJobData>) {
  const { userId, action } = job.data; // Fully typed
}

2. Handle Failures Gracefully

@Process('risky-job')
async handle(job: Job) {
  try {
    await this.riskyOperation(job.data);
  } catch (error) {
    // Log for debugging
    console.error('Job failed:', error);
    // Re-throw to trigger retry
    throw error;
  }
}

3. Use Progress Updates for Long Jobs

@Process('long-job')
async handle(job: Job<{ items: string[] }>) {
  const { items } = job.data;
  
  for (let i = 0; i < items.length; i++) {
    await this.processItem(items[i]);
    await job.updateProgress(Math.round((i + 1) / items.length * 100));
  }
}

4. Configure Appropriate Concurrency

// CPU-intensive tasks: lower concurrency
@Processor('image-processing', { concurrency: 2 })

// I/O-bound tasks: higher concurrency
@Processor('api-calls', { concurrency: 20 })

License

MIT