smart-queue-nestjs
v1.1.0
Published
A powerful yet simple BullMQ integration for NestJS applications
Downloads
174
Maintainers
Readme
smart-queue-nestjs
Author: Nurul Islam Rimon
Repository: https://github.com/nurulislamrimon/smart-queue-nestjs
Contributing: CONTRIBUTING.md
A powerful yet simple BullMQ integration for NestJS applications. Simplify queue management while exposing full BullMQ capabilities.
Features
- Queue Decorator - Register queues automatically with
@Queue("name") - Processor Decorator - Define worker classes with
@Processor("name") - Process Decorator - Handle specific job types with
@Process("handler") - Queue Service - Injectable service for producing jobs
- Typed Jobs - Full TypeScript generics support
- Retry Strategies - Easy retry configuration
- Rate Limiting - Built-in rate limiting support
- Job Events - Listen for completed, failed, progress events
- Queue Metrics - Expose waiting, active, completed, failed, delayed stats
- Error Handling - Worker failures won't crash your application
Installation
npm install smart-queue-nestjs bullmq ioredisCompatibility
| Package Version | NestJS Version | | --------------- | -------------- | | 1.x | 10.x - 11.x |
Quick Start
1. Configure the Module
import { Module } from "@nestjs/common";
import { SmartQueueModule } from "smart-queue-nestjs";
@Module({
imports: [
SmartQueueModule.forRoot({
connection: {
host: "localhost",
port: 6379,
},
}),
],
})
export class AppModule {}2. Create a Producer Service
import { Injectable } from "@nestjs/common";
import { QueueService } from "smart-queue-nestjs";
interface EmailJobData {
to: string;
subject: string;
body: string;
}
@Injectable()
export class EmailService {
constructor(private readonly queue: QueueService) {}
async sendEmail(data: EmailJobData) {
await this.queue.add("email", "send-email", data);
}
}3. Create a Worker
import { Processor, Process, QueueService } from "smart-queue-nestjs";
import { Injectable, Logger } from "@nestjs/common";
interface EmailJobData {
to: string;
subject: string;
body: string;
}
@Processor("email", { concurrency: 5 })
@Injectable()
export class EmailProcessor {
private readonly logger = new Logger(EmailProcessor.name);
constructor(private readonly queue: QueueService) {
// Register event listeners
this.queue.on("email", "completed", (jobId, result) => {
this.logger.log(`Email job ${jobId} completed: ${result}`);
});
this.queue.on("email", "failed", (jobId, error) => {
this.logger.error(`Email job ${jobId} failed: ${error.message}`);
});
}
@Process("send-email")
async handleSendEmail({ id, data }: { id: string; data: EmailJobData }) {
this.logger.log(`Sending email to ${data.to}`);
// Your email sending logic here
return { success: true, messageId: `msg-${id}` };
}
}Advanced Usage
Async Configuration
import { Module } from "@nestjs/common";
import { SmartQueueModule } from "smart-queue-nestjs";
import { ConfigService } from "./config.service";
@Module({
imports: [
SmartQueueModule.forRootAsync({
useFactory: (config: ConfigService) => ({
connection: {
host: config.get("REDIS_HOST"),
port: config.get("REDIS_PORT"),
},
defaultJobOptions: {
attempts: 3,
backoff: {
type: "exponential",
delay: 1000,
},
},
}),
inject: [ConfigService],
}),
],
})
export class AppModule {}Delayed Jobs
await this.queue.delay("email", "send-email", data, 5000); // 5 seconds delayRepeating Jobs (Cron)
await this.queue.repeat(
"notifications",
"daily-digest",
{ userId: "123" },
"0 9 * * *", // Every day at 9 AM
);Rate Limiting
@Processor("rate-limited-queue", {
limiter: {
max: 10, // Max 10 jobs
duration: 1000, // Per 1 second
},
})
@Injectable()
export class RateLimitedProcessor {}Retry Strategy
this.queue.worker(
"my-queue",
async ({ id, data }) => {
// Your job logic
},
{
retryStrategy: {
attempts: 5,
backoff: {
type: "exponential",
delay: 1000,
},
},
},
);Queue Metrics
const metrics = await this.queue.getMetrics("email");
console.log(metrics);
// {
// waiting: 10,
// active: 2,
// completed: 100,
// failed: 5,
// delayed: 3,
// paused: 0
// }Remove Jobs
await this.queue.remove("email", "job-id-123");TypeScript Generics
The library supports full TypeScript generics for type-safe job payloads:
interface UserNotification {
userId: string;
message: string;
}
// Producer
await this.queue.add<UserNotification>('notifications', 'push', {
userId: '123',
message: 'Hello!',
});
// Worker
@Process('push')
async handlePushNotification({
id,
data
}: {
id: string;
data: UserNotification
}) {
// TypeScript knows data.userId and data.message
}Job Options
All BullMQ job options are supported:
await this.queue.add("queue-name", "job-name", data, {
jobId: "unique-id",
priority: 10,
delay: 1000,
attempts: 3,
backoff: { type: "exponential", delay: 1000 },
timeout: 30000,
removeOnComplete: true,
removeOnFail: 100,
});Error Handling
Worker failures are handled gracefully and won't crash your application:
@Processor("safe-queue")
@Injectable()
export class SafeProcessor {
@Process("risky-operation")
async handleRiskyOperation({ id, data }: { id: string; data: any }) {
try {
// Risky operation
} catch (error) {
// Error is logged, job is marked as failed
// Application keeps running
throw error; // Optionally rethrow for retry logic
}
}
}API Reference
QueueService Methods
| Method | Description |
| ----------------------------------- | --------------------------- |
| add(queue, name, data, options) | Add a new job to the queue |
| delay(queue, name, data, delay) | Add a delayed job |
| repeat(queue, name, data, cron) | Add a repeating job (cron) |
| remove(queue, jobId) | Remove a job by ID |
| getJob(queue, jobId) | Get a job by ID |
| getMetrics(queue) | Get queue statistics |
| pause(queue) | Pause the queue |
| resume(queue) | Resume the queue |
| drain(queue) | Drain all waiting jobs |
| clean(queue, grace, status) | Clean completed/failed jobs |
| worker(queue, processor, options) | Create a worker |
| on(queue, event, handler) | Register event listener |
License
MIT
