@bvhoach2393/nest-rabbitmq

v1.0.4

NestJS RabbitMQ library for handling message queues with consumer and producer services

A comprehensive NestJS library for RabbitMQ integration that provides easy-to-use producer and consumer services with automatic connection management, message acknowledgment, and error handling.

Features

  • 🚀 Easy Integration: Simple setup with NestJS modules
  • 🔄 Auto-Reconnection: Automatic connection recovery and error handling
  • 📤 Producer Service: Send messages to exchanges or queues
  • 📥 Consumer Service: Consume messages with custom handlers
  • ⚙️ Environment Configuration: Configuration through environment variables
  • 🔒 Type Safety: Full TypeScript support with comprehensive interfaces
  • 📊 Health Monitoring: Connection health check methods
  • 🎯 Message Patterns: Support for request-reply messaging patterns
  • 🧪 Well Tested: Comprehensive unit tests included

Installation

npm install @bvhoach2393/nest-rabbitmq

Peer Dependencies

Make sure you have the following peer dependencies installed:

npm install @nestjs/common @nestjs/config @nestjs/core amqplib reflect-metadata rxjs
npm install -D @types/amqplib

Configuration

Environment Variables

Set up the following environment variables in your application:

# RabbitMQ Connection
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USERNAME=guest
RABBITMQ_PASSWORD=guest

# Exchanges, Routing Keys, and Queues (comma-separated; all three lists must have the same number of entries)
RABBITMQ_EXCHANGES=user.exchange,order.exchange,notification.exchange
RABBITMQ_ROUTING_KEY=user.created,order.placed,notification.sent
RABBITMQ_QUEUES=user.queue,order.queue,notification.queue

Important: The number of exchanges, routing keys, and queues must be equal. Each exchange will be bound to its corresponding queue using the corresponding routing key.
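The pairing rule above can be illustrated with a minimal sketch. It assumes the three variables are split on commas and matched by position; `Binding`, `parseList`, and `parseBindings` are hypothetical names used for illustration and are not part of the library's API, whose internal parsing may differ.

```typescript
// Hypothetical helper types and functions; not part of the library's API.
interface Binding {
  exchange: string;
  routingKey: string;
  queue: string;
}

type Env = Record<string, string | undefined>;

// Split a comma-separated variable into trimmed, non-empty entries.
function parseList(env: Env, name: string): string[] {
  const raw = env[name];
  if (raw === undefined || raw.trim() === '') {
    throw new Error(`Missing environment variable: ${name}`);
  }
  return raw
    .split(',')
    .map((item) => item.trim())
    .filter((item) => item.length > 0);
}

// Pair the i-th exchange, routing key, and queue into one binding.
function parseBindings(env: Env): Binding[] {
  const exchanges = parseList(env, 'RABBITMQ_EXCHANGES');
  const routingKeys = parseList(env, 'RABBITMQ_ROUTING_KEY');
  const queues = parseList(env, 'RABBITMQ_QUEUES');

  if (exchanges.length !== routingKeys.length || exchanges.length !== queues.length) {
    throw new Error(
      `Length mismatch: ${exchanges.length} exchanges, ` +
        `${routingKeys.length} routing keys, ${queues.length} queues`,
    );
  }

  return exchanges.map((exchange, i) => ({
    exchange,
    routingKey: routingKeys[i],
    queue: queues[i],
  }));
}
```

Calling `parseBindings(process.env)` during application startup, before the module is imported, would surface a missing or mismatched variable early.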

Usage

1. Import the Module

import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { NestRabbitmqModule } from '@bvhoach2393/nest-rabbitmq';

@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
    }),
    NestRabbitmqModule.forRoot(), // For global usage
  ],
})
export class AppModule {}

2. Producer Usage

import { Injectable } from '@nestjs/common';
import { NestRabbitmqService } from '@bvhoach2393/nest-rabbitmq';

@Injectable()
export class UserService {
  constructor(private readonly rabbitmqService: NestRabbitmqService) {}

  async createUser(userData: any) {
    // Create user logic...
    
    // Publish message to exchange
    await this.rabbitmqService.publish(
      { userId: 123, action: 'created', data: userData },
      {
        exchange: 'user.exchange',
        routingKey: 'user.created',
        persistent: true,
        timestamp: Date.now(),
      }
    );

    // Or send directly to queue
    await this.rabbitmqService.sendToQueue(
      'user.queue',
      { userId: 123, action: 'created' },
      {
        persistent: true,
        correlationId: 'unique-id-123',
      }
    );
  }
}

3. Consumer Usage

import { Injectable, OnModuleInit } from '@nestjs/common';
import { NestRabbitmqService, RabbitMQMessage } from '@bvhoach2393/nest-rabbitmq';

@Injectable()
export class UserConsumerService implements OnModuleInit {
  constructor(private readonly rabbitmqService: NestRabbitmqService) {}

  async onModuleInit() {
    // Start consuming messages
    await this.rabbitmqService.consume(
      'user.queue',
      this.handleUserMessage.bind(this),
      {
        noAck: false, // Manual acknowledgment
      }
    );
  }

  private async handleUserMessage(message: RabbitMQMessage): Promise<any> {
    try {
      const data = JSON.parse(message.content.toString());
      console.log('Received user message:', data);

      // Process the message
      await this.processUser(data);

      // Return response if replyTo is specified
      if (message.properties.replyTo) {
        return { status: 'success', userId: data.userId };
      }
    } catch (error) {
      console.error('Error processing user message:', error);
      throw error; // Will reject the message
    }
  }

  private async processUser(data: any) {
    // Your business logic here
  }
}

4. Request-Reply Pattern

import { Injectable, OnModuleInit } from '@nestjs/common';
import { NestRabbitmqService, RabbitMQMessage } from '@bvhoach2393/nest-rabbitmq';

@Injectable()
export class OrderService implements OnModuleInit {
  constructor(private readonly rabbitmqService: NestRabbitmqService) {}

  // Producer side - send message with replyTo
  async validateOrder(orderData: any) {
    await this.rabbitmqService.sendToQueue(
      'validation.queue',
      orderData,
      {
        replyTo: 'validation.reply.queue',
        correlationId: `order-${Date.now()}`,
      }
    );
  }

  // Consumer side - handle and reply
  async onModuleInit() {
    await this.rabbitmqService.consume(
      'validation.queue',
      async (message: RabbitMQMessage) => {
        const orderData = JSON.parse(message.content.toString());
        const isValid = await this.validateOrderData(orderData);

        // Return response (will be sent to replyTo queue)
        return { valid: isValid, orderId: orderData.id };
      }
    );
  }

  // Placeholder check: replace with your own validation rules
  private async validateOrderData(orderData: any): Promise<boolean> {
    return orderData != null && orderData.id != null;
  }
}
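The example above does not show how the requester matches a reply arriving on `validation.reply.queue` to the request that triggered it. One common approach, sketched here independently of the library, is to keep a map of pending requests keyed by `correlationId`. `PendingReplies` is a hypothetical helper, not something the package is known to export.

```typescript
// Hypothetical helper: tracks in-flight requests by correlationId.
interface PendingEntry<T> {
  resolve: (value: T) => void;
  timer: ReturnType<typeof setTimeout>;
}

class PendingReplies<T> {
  private readonly pending = new Map<string, PendingEntry<T>>();

  // Register a request; the promise settles when its reply arrives
  // or rejects once the timeout elapses.
  waitFor(correlationId: string, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId);
        reject(new Error(`No reply for ${correlationId} within ${timeoutMs} ms`));
      }, timeoutMs);
      this.pending.set(correlationId, { resolve, timer });
    });
  }

  // Call from the reply-queue consumer. Returns false when the id is
  // unknown, for example a late reply that already timed out.
  resolve(correlationId: string, reply: T): boolean {
    const entry = this.pending.get(correlationId);
    if (entry === undefined) {
      return false;
    }
    clearTimeout(entry.timer);
    this.pending.delete(correlationId);
    entry.resolve(reply);
    return true;
  }

  // Number of requests still waiting for a reply.
  get size(): number {
    return this.pending.size;
  }
}
```

Under these assumptions, the producer would call `waitFor(correlationId, timeoutMs)` right after `sendToQueue`, and a consumer on the reply queue would call `resolve(message.properties.correlationId, parsedBody)` for each incoming message.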

5. Dynamic Exchange and Queue Management

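import { Injectable } from '@nestjs/common';
import { NestRabbitmqService } from '@bvhoach2393/nest-rabbitmq';

@Injectable()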
export class DynamicQueueService {
  constructor(private readonly rabbitmqService: NestRabbitmqService) {}

  async createTemporaryQueue(name: string) {
    // Create exchange
    await this.rabbitmqService.createExchange('temp.exchange', 'direct');
    
    // Create queue
    await this.rabbitmqService.createQueue(name, {
      durable: false,
      autoDelete: true,
    });
    
    // Bind queue to exchange
    await this.rabbitmqService.bindQueue(name, 'temp.exchange', 'temp.key');
  }

  async cleanupTemporaryQueue(name: string) {
    // Unbind and delete
    await this.rabbitmqService.unbindQueue(name, 'temp.exchange', 'temp.key');
    await this.rabbitmqService.deleteQueue(name);
    await this.rabbitmqService.deleteExchange('temp.exchange');
  }
}

6. Health Check

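import { Injectable } from '@nestjs/common';
import { NestRabbitmqService } from '@bvhoach2393/nest-rabbitmq';

@Injectable()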
export class HealthService {
  constructor(private readonly rabbitmqService: NestRabbitmqService) {}

  checkRabbitMQHealth(): boolean {
    return this.rabbitmqService.isConnectionHealthy();
  }

  getRabbitMQConfig() {
    return this.rabbitmqService.getConfig();
  }
}

API Reference

NestRabbitmqService

Publishing Methods

  • publish(message: any, options: PublishOptions): Promise<boolean>
  • sendToQueue(queue: string, message: any, options?: MessageOptions): Promise<boolean>

Consuming Methods

  • consume(queue: string, handler: MessageHandler, options?: ConsumeOptions): Promise<void>

Management Methods

  • createExchange(name: string, type?: string, options?: ExchangeOptions): Promise<void>
  • createQueue(name: string, options?: QueueOptions): Promise<void>
  • deleteExchange(name: string): Promise<void>
  • deleteQueue(name: string): Promise<void>
  • bindQueue(queue: string, exchange: string, routingKey: string): Promise<void>
  • unbindQueue(queue: string, exchange: string, routingKey: string): Promise<void>

Utility Methods

  • isConnectionHealthy(): boolean
  • getConfig(): RabbitMQConfig

Interfaces

interface PublishOptions extends MessageOptions {
  exchange: string;
  routingKey: string;
  mandatory?: boolean;
  immediate?: boolean;
}

interface MessageOptions {
  persistent?: boolean;
  priority?: number;
  expiration?: string;
  timestamp?: number;
  correlationId?: string;
  replyTo?: string;
  messageId?: string;
  headers?: Record<string, any>;
}

interface RabbitMQMessage {
  content: Buffer;
  fields: any;
  properties: any;
}

Running Tests

# Unit tests
npm run test

# Test coverage
npm run test:cov

# Watch mode
npm run test:watch

Error Handling

The library handles the following error cases:

  • Connection Errors: Automatic logging and error throwing for connection issues
  • Missing Configuration: Clear error messages for missing environment variables
  • Array Length Mismatch: Validation for exchanges, routing keys, and queues arrays
  • Message Processing Errors: Proper message rejection and error logging

Best Practices

  1. Environment Validation: Always validate your environment variables before deploying
  2. Message Acknowledgment: Use manual acknowledgment (noAck: false) for important messages
  3. Error Handling: Implement proper try-catch blocks in message handlers
  4. Connection Health: Monitor connection health in production environments
  5. Resource Cleanup: Clean up unused exchanges and queues to avoid resource leaks
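For the connection health practice, a minimal polling sketch is shown below. It assumes only a synchronous boolean check such as `isConnectionHealthy()`; `HealthWatcher` is a hypothetical class written for illustration, and the package itself may offer no equivalent.

```typescript
// Hypothetical helper: reports changes in a boolean health check.
class HealthWatcher {
  private readonly check: () => boolean;
  private readonly onChange: (healthy: boolean) => void;
  private lastHealthy: boolean | undefined = undefined;
  private timer: ReturnType<typeof setInterval> | undefined = undefined;

  constructor(check: () => boolean, onChange: (healthy: boolean) => void) {
    this.check = check;
    this.onChange = onChange;
  }

  // Run the check once and notify only when the state differs
  // from the previous poll (the first poll always notifies).
  poll(): void {
    const healthy = this.check();
    if (healthy !== this.lastHealthy) {
      this.lastHealthy = healthy;
      this.onChange(healthy);
    }
  }

  // Poll on a fixed interval until stop() is called.
  start(intervalMs: number): void {
    if (this.timer === undefined) {
      this.timer = setInterval(() => this.poll(), intervalMs);
    }
  }

  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }
}
```

A service could construct it as `new HealthWatcher(() => this.rabbitmqService.isConnectionHealthy(), (healthy) => { /* log or alert */ })`, call `start()` in `onModuleInit`, and call `stop()` in `onModuleDestroy`. Reporting only state changes keeps logs quiet while the connection is stable.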

Publishing to NPM

# Build the library
npm run build

# Publish to NPM registry
npm publish

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support

For support and questions:

  • Create an issue on the GitHub repository
  • Check the documentation and examples above
  • Review the TypeScript interfaces for detailed method signatures

Changelog

See CHANGELOG.md for detailed changes and version history.