@bvhoach2393/nest-rabbitmq
v1.0.4
Published
NestJS RabbitMQ library for handling message queues with consumer and producer services
Maintainers
Readme
@bvhoach2393/nest-rabbitmq
A comprehensive NestJS library for RabbitMQ integration that provides easy-to-use producer and consumer services with automatic connection management, message acknowledgment, and error handling.
Features
- 🚀 Easy Integration: Simple setup with NestJS modules
- 🔄 Auto-Reconnection: Automatic connection recovery and error handling
- 📤 Producer Service: Send messages to exchanges or queues
- 📥 Consumer Service: Consume messages with custom handlers
- ⚙️ Environment Configuration: Configuration through environment variables
- 🔒 Type Safety: Full TypeScript support with comprehensive interfaces
- 📊 Health Monitoring: Connection health check methods
- 🎯 Message Patterns: Support for request-reply messaging patterns
- 🧪 Well Tested: Comprehensive unit tests included
Installation
npm install @bvhoach2393/nest-rabbitmqPeer Dependencies
Make sure you have the following peer dependencies installed:
npm install @nestjs/common @nestjs/config @nestjs/core amqplib reflect-metadata rxjs
npm install -D @types/amqplibConfiguration
Environment Variables
Set up the following environment variables in your application:
# RabbitMQ Connection
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USERNAME=guest
RABBITMQ_PASSWORD=guest
# Exchanges, Routing Keys, and Queues (comma-separated, must have equal length)
RABBITMQ_EXCHANGES=user.exchange,order.exchange,notification.exchange
RABBITMQ_ROUTING_KEY=user.created,order.placed,notification.sent
RABBITMQ_QUEUES=user.queue,order.queue,notification.queueImportant: The number of exchanges, routing keys, and queues must be equal. Each exchange will be bound to its corresponding queue using the corresponding routing key.
Usage
1. Import the Module
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { NestRabbitmqModule } from '@bvhoach2393/nest-rabbitmq';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
}),
NestRabbitmqModule.forRoot(), // For global usage
],
})
export class AppModule {}2. Producer Usage
import { Injectable } from '@nestjs/common';
import { NestRabbitmqService } from '@bvhoach2393/nest-rabbitmq';
@Injectable()
export class UserService {
constructor(private readonly rabbitmqService: NestRabbitmqService) {}
async createUser(userData: any) {
// Create user logic...
// Publish message to exchange
await this.rabbitmqService.publish(
{ userId: 123, action: 'created', data: userData },
{
exchange: 'user.exchange',
routingKey: 'user.created',
persistent: true,
timestamp: Date.now(),
}
);
// Or send directly to queue
await this.rabbitmqService.sendToQueue(
'user.queue',
{ userId: 123, action: 'created' },
{
persistent: true,
correlationId: 'unique-id-123',
}
);
}
}3. Consumer Usage
import { Injectable, OnModuleInit } from '@nestjs/common';
import { NestRabbitmqService, RabbitMQMessage } from '@bvhoach2393/nest-rabbitmq';
@Injectable()
export class UserConsumerService implements OnModuleInit {
constructor(private readonly rabbitmqService: NestRabbitmqService) {}
async onModuleInit() {
// Start consuming messages
await this.rabbitmqService.consume(
'user.queue',
this.handleUserMessage.bind(this),
{
noAck: false, // Manual acknowledgment
}
);
}
private async handleUserMessage(message: RabbitMQMessage): Promise<any> {
try {
const data = JSON.parse(message.content.toString());
console.log('Received user message:', data);
// Process the message
await this.processUser(data);
// Return response if replyTo is specified
if (message.properties.replyTo) {
return { status: 'success', userId: data.userId };
}
} catch (error) {
console.error('Error processing user message:', error);
throw error; // Will reject the message
}
}
private async processUser(data: any) {
// Your business logic here
}
}4. Request-Reply Pattern
@Injectable()
export class OrderService {
constructor(private readonly rabbitmqService: NestRabbitmqService) {}
// Producer side - send message with replyTo
async validateOrder(orderData: any) {
await this.rabbitmqService.sendToQueue(
'validation.queue',
orderData,
{
replyTo: 'validation.reply.queue',
correlationId: `order-${Date.now()}`,
}
);
}
// Consumer side - handle and reply
async onModuleInit() {
await this.rabbitmqService.consume(
'validation.queue',
async (message: RabbitMQMessage) => {
const orderData = JSON.parse(message.content.toString());
const isValid = await this.validateOrderData(orderData);
// Return response (will be sent to replyTo queue)
return { valid: isValid, orderId: orderData.id };
}
);
}
}5. Dynamic Exchange and Queue Management
@Injectable()
export class DynamicQueueService {
constructor(private readonly rabbitmqService: NestRabbitmqService) {}
async createTemporaryQueue(name: string) {
// Create exchange
await this.rabbitmqService.createExchange('temp.exchange', 'direct');
// Create queue
await this.rabbitmqService.createQueue(name, {
durable: false,
autoDelete: true,
});
// Bind queue to exchange
await this.rabbitmqService.bindQueue(name, 'temp.exchange', 'temp.key');
}
async cleanupTemporaryQueue(name: string) {
// Unbind and delete
await this.rabbitmqService.unbindQueue(name, 'temp.exchange', 'temp.key');
await this.rabbitmqService.deleteQueue(name);
await this.rabbitmqService.deleteExchange('temp.exchange');
}
}6. Health Check
@Injectable()
export class HealthService {
constructor(private readonly rabbitmqService: NestRabbitmqService) {}
checkRabbitMQHealth(): boolean {
return this.rabbitmqService.isConnectionHealthy();
}
getRabbitMQConfig() {
return this.rabbitmqService.getConfig();
}
}API Reference
NestRabbitmqService
Publishing Methods
publish(message: any, options: PublishOptions): Promise<boolean>sendToQueue(queue: string, message: any, options?: MessageOptions): Promise<boolean>
Consuming Methods
consume(queue: string, handler: MessageHandler, options?: ConsumeOptions): Promise<void>
Management Methods
createExchange(name: string, type?: string, options?: ExchangeOptions): Promise<void>createQueue(name: string, options?: QueueOptions): Promise<void>deleteExchange(name: string): Promise<void>deleteQueue(name: string): Promise<void>bindQueue(queue: string, exchange: string, routingKey: string): Promise<void>unbindQueue(queue: string, exchange: string, routingKey: string): Promise<void>
Utility Methods
isConnectionHealthy(): booleangetConfig(): RabbitMQConfig
Interfaces
interface PublishOptions extends MessageOptions {
exchange: string;
routingKey: string;
mandatory?: boolean;
immediate?: boolean;
}
interface MessageOptions {
persistent?: boolean;
priority?: number;
expiration?: string;
timestamp?: number;
correlationId?: string;
replyTo?: string;
messageId?: string;
headers?: Record<string, any>;
}
interface RabbitMQMessage {
content: Buffer;
fields: any;
properties: any;
}Running Tests
# Unit tests
npm run test
# Test coverage
npm run test:cov
# Watch mode
npm run test:watchError Handling
The library provides comprehensive error handling:
- Connection Errors: Automatic logging and error throwing for connection issues
- Missing Configuration: Clear error messages for missing environment variables
- Array Length Mismatch: Validation for exchanges, routing keys, and queues arrays
- Message Processing Errors: Proper message rejection and error logging
Best Practices
- Environment Validation: Always validate your environment variables before deploying
- Message Acknowledgment: Use manual acknowledgment (
noAck: false) for important messages - Error Handling: Implement proper try-catch blocks in message handlers
- Connection Health: Monitor connection health in production environments
- Resource Cleanup: Clean up unused exchanges and queues to avoid resource leaks
Publishing to NPM
# Build the library
npm run build
# Publish to NPM registry
npm publishContributing
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add some amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.
Support
For support and questions:
- Create an issue on the GitHub repository
- Check the documentation and examples above
- Review the TypeScript interfaces for detailed method signatures
Changelog
See CHANGELOG.md for detailed changes and version history.
