@nam088/nestjs-rabbitmq

v1.2.0

A comprehensive RabbitMQ module for NestJS with decorator-based message handling

A comprehensive and production-ready RabbitMQ module for NestJS with decorator-based message handling, inspired by modern NestJS patterns.

Features

  • Decorator-Based API - Use @RabbitSubscribe to handle messages declaratively
  • Multi-Connection Support - Manage multiple RabbitMQ connections
  • Health Checks - Built-in health indicators for monitoring
  • Auto-Discovery - Automatic message handler registration (configurable scan scope)
  • TypeScript First - Full type safety and IntelliSense support
  • Exchange Patterns - Support for direct, topic, fanout exchanges
  • Message Patterns - Pub/Sub, Request/Reply, Work Queues
  • Error Handling - Built-in retry logic and dead letter queues
  • Well Tested - Comprehensive test coverage

Installation

npm install @nam088/nestjs-rabbitmq amqplib amqp-connection-manager
# or
yarn add @nam088/nestjs-rabbitmq amqplib amqp-connection-manager
# or
pnpm add @nam088/nestjs-rabbitmq amqplib amqp-connection-manager

Note: amqplib and amqp-connection-manager are peer dependencies. Install them in your application.

Quick Start

1. Import the Module

import { Module } from '@nestjs/common';
import { RabbitMQModule } from '@nam088/nestjs-rabbitmq';

@Module({
  imports: [
    RabbitMQModule.forRoot({
      uri: 'amqp://localhost:5672',
      connectionName: 'default',
    }),
  ],
})
export class AppModule {}

2. Create a Message Handler

import { Injectable } from '@nestjs/common';
import { RabbitSubscribe } from '@nam088/nestjs-rabbitmq';

@Injectable()
export class UserService {
  @RabbitSubscribe({
    exchange: 'users',
    routingKey: 'user.created',
    queue: 'user-service-queue',
  })
  async handleUserCreated(message: any) {
    console.log('User created:', message);
  }
}

3. Publish Messages

import { Injectable } from '@nestjs/common';
import { InjectRabbitMQ, RabbitMQService } from '@nam088/nestjs-rabbitmq';

@Injectable()
export class NotificationService {
  constructor(
    @InjectRabbitMQ() private readonly rabbitmq: RabbitMQService,
  ) {}

  async notifyUserCreated(userId: string) {
    await this.rabbitmq.publish('users', 'user.created', {
      userId,
      timestamp: new Date(),
    });
  }
}

Configuration

Basic Configuration

RabbitMQModule.forRoot({
  uri: 'amqp://localhost:5672',
  connectionName: 'default',
})

Advanced Configuration

RabbitMQModule.forRoot({
  uri: 'amqp://localhost:5672',
  connectionName: 'default',
  exchanges: [
    {
      name: 'users',
      type: 'topic',
      options: { durable: true },
    },
    {
      name: 'orders',
      type: 'direct',
      options: { durable: true },
    },
  ],
  connectionOptions: {
    heartbeatIntervalInSeconds: 5,
    reconnectTimeInSeconds: 10,
  },
})

Async Configuration

RabbitMQModule.forRootAsync({
  imports: [ConfigModule],
  useFactory: (config: ConfigService) => ({
    uri: config.get('RABBITMQ_URI'),
    connectionName: 'default',
  }),
  inject: [ConfigService],
})
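
With async configuration the URI comes from the environment, so it can be missing or malformed at startup. A minimal sketch of a guard that fails fast, assuming the value is read as an optional string; `resolveRabbitUri` is a hypothetical helper and not part of this package:

```typescript
// Hypothetical helper, not part of @nam088/nestjs-rabbitmq.
// Validates a raw config value before it is handed to the module factory.
function resolveRabbitUri(raw: string | undefined): string {
  if (raw === undefined || raw.trim() === '') {
    throw new Error('RABBITMQ_URI is not set');
  }
  const uri = raw.trim();
  // AMQP URIs use the amqp:// scheme, or amqps:// for TLS.
  if (!/^amqps?:\/\//.test(uri)) {
    throw new Error('RABBITMQ_URI must start with amqp:// or amqps://');
  }
  return uri;
}
```

Inside useFactory this could be used as uri: resolveRabbitUri(config.get<string>('RABBITMQ_URI')), so a bad value surfaces as a clear error at bootstrap.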

Multiple Connections

@Module({
  imports: [
    RabbitMQModule.forRoot({
      uri: 'amqp://localhost:5672',
      connectionName: 'primary',
    }),
    RabbitMQModule.forRoot({
      uri: 'amqp://other-host:5672',
      connectionName: 'secondary',
    }),
  ],
})
export class AppModule {}

// Inject specific connection
@Injectable()
export class MyService {
  constructor(
    @InjectRabbitMQ('primary') private readonly primary: RabbitMQService,
    @InjectRabbitMQ('secondary') private readonly secondary: RabbitMQService,
  ) {}
}
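
Conceptually, each connectionName acts as a lookup key for one connection. The sketch below illustrates that idea with a plain in-memory registry. It is an assumption for illustration only: `ConnectionRegistry` is a hypothetical class, and the package's real provider wiring is not shown in this README.

```typescript
// Hypothetical sketch of name-based connection lookup.
class ConnectionRegistry<T> {
  private readonly connections = new Map<string, T>();

  // Register a connection under a unique name.
  register(name: string, connection: T): void {
    if (this.connections.has(name)) {
      throw new Error(`Connection "${name}" is already registered`);
    }
    this.connections.set(name, connection);
  }

  // Look up a connection; falls back to the name 'default' when none is given.
  get(name: string = 'default'): T {
    const connection = this.connections.get(name);
    if (connection === undefined) {
      throw new Error(`Unknown connection "${name}"`);
    }
    return connection;
  }
}

const registry = new ConnectionRegistry<{ uri: string }>();
registry.register('primary', { uri: 'amqp://localhost:5672' });
registry.register('secondary', { uri: 'amqp://other-host:5672' });
```

Failing loudly on an unknown or duplicate name makes a mistyped connectionName easy to spot.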

Message Patterns

Pub/Sub Pattern

// Publisher
await rabbitmq.publish('events', 'user.updated', { userId: 123 });

// Subscriber
@RabbitSubscribe({
  exchange: 'events',
  routingKey: 'user.*',
  queue: 'analytics-service',
})
async handleUserEvents(message: any) {
  // Handle all user events
}
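
The user.* routing key above relies on topic-exchange matching, where * stands for exactly one dot-separated word and # stands for zero or more words. A minimal sketch of that rule, for illustration only: the broker does the real matching, and `matchesTopic` is a hypothetical helper, not part of this package.

```typescript
// Hypothetical helper that mimics topic-exchange binding rules.
function matchesTopic(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  // match(i, j): does pattern[i..] match key[j..]?
  const match = (i: number, j: number): boolean => {
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // '#' may consume zero or more words.
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    if (j === k.length) return false;
    // '*' matches exactly one word; anything else must match literally.
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}

matchesTopic('user.*', 'user.updated');         // true
matchesTopic('user.*', 'user.profile.updated'); // false: '*' covers one word only
matchesTopic('user.#', 'user.profile.updated'); // true
```

If the analytics handler should also receive nested keys such as user.profile.updated, the binding would need user.# rather than user.*.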

Work Queue Pattern

// Multiple workers sharing the same queue
@RabbitSubscribe({
  queue: 'heavy-tasks',
  queueOptions: { durable: true },
})
async processTask(task: any) {
  // Only one worker will process each task
}
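
By default RabbitMQ hands messages on a shared queue to its consumers in round-robin order, which is why each task reaches only one worker. The sketch below simulates that distribution in memory. It ignores prefetch limits and acknowledgements, which affect the real order, and `dispatchRoundRobin` is a hypothetical name.

```typescript
// Hypothetical simulation of round-robin delivery to competing consumers.
function dispatchRoundRobin<T>(tasks: T[], workerCount: number): T[][] {
  if (!Number.isInteger(workerCount) || workerCount < 1) {
    throw new Error('workerCount must be a positive integer');
  }
  // One inbox per worker.
  const assigned: T[][] = Array.from({ length: workerCount }, () => [] as T[]);
  tasks.forEach((task, index) => {
    // Each task goes to exactly one worker.
    assigned[index % workerCount].push(task);
  });
  return assigned;
}

const inboxes = dispatchRoundRobin(['t1', 't2', 't3', 't4', 't5'], 2);
// inboxes[0] -> ['t1', 't3', 't5'], inboxes[1] -> ['t2', 't4']
```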

RPC Pattern

Note: For the RPC pattern, use the @RabbitRPC decorator. The queue is declared automatically when the handler is registered.

// Request
const result = await rabbitmq.request('calculator-rpc', { a: 10, b: 5, op: 'add' });

// Reply Handler
@RabbitRPC({
  queue: 'calculator-rpc',
  prefetchCount: 1,
})
async handleRPC(@RabbitPayload() data: { a: number; b: number; op: string }) {
  const result = performCalculation(data);
  return result; // Automatically sends reply back to requestor
}

Minimal example (add two numbers)

// Handler
@RabbitRPC({ queue: 'math.add', prefetchCount: 1 })
add(@RabbitPayload() payload: { a: number; b: number }) {
  return { ok: true, sum: Number(payload?.a ?? 0) + Number(payload?.b ?? 0) };
}

// Client
const res = await rabbitmq.request('math.add', { a: 5, b: 7 });
// -> { ok: true, sum: 12 }
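
Request/reply over AMQP is usually built on a correlation id: the client tags each request, and replies are matched back to the waiting caller by that id. The sketch below shows only that matching step, in memory. It is an illustration under that assumption, not this package's implementation, and `ReplyCorrelator` is a hypothetical class.

```typescript
type ReplyCallback = (body: unknown) => void;

// Hypothetical sketch of correlation-id bookkeeping for RPC replies.
class ReplyCorrelator {
  private readonly pending = new Map<string, ReplyCallback>();
  private nextId = 0;

  // Register a callback and return the correlation id to attach to the request.
  track(onReply: ReplyCallback): string {
    this.nextId += 1;
    const correlationId = `req-${this.nextId}`;
    this.pending.set(correlationId, onReply);
    return correlationId;
  }

  // Deliver a reply. Returns false for unknown ids (late or duplicate replies).
  resolve(correlationId: string, body: unknown): boolean {
    const onReply = this.pending.get(correlationId);
    if (onReply === undefined) return false;
    this.pending.delete(correlationId);
    onReply(body);
    return true;
  }
}

const correlator = new ReplyCorrelator();
const received: string[] = [];
const idA = correlator.track((body) => { received.push(`A:${JSON.stringify(body)}`); });
const idB = correlator.track((body) => { received.push(`B:${JSON.stringify(body)}`); });

// Replies may arrive out of order; the id routes each one to the right caller.
correlator.resolve(idB, { sum: 12 });
correlator.resolve(idA, { sum: 3 });
```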

Advanced Decorators

RPC Handler with @RabbitRPC

import { Injectable } from '@nestjs/common';
import { RabbitRPC, RabbitPayload } from '@nam088/nestjs-rabbitmq';

@Injectable()
export class CalculatorService {
  @RabbitRPC({
    queue: 'calculator-rpc',
    noAck: false,
    prefetchCount: 1,
  })
  async calculate(@RabbitPayload() data: { a: number; b: number; op: string }) {
    switch (data.op) {
      case 'add':
        return data.a + data.b;
      case 'multiply':
        return data.a * data.b;
      default:
        throw new Error('Unknown operation');
    }
  }
}

Message Handler with @RabbitHandler

import { Injectable } from '@nestjs/common';
import { RabbitHandler, RabbitPayload, RabbitMessage } from '@nam088/nestjs-rabbitmq';

@Injectable()
export class OrderService {
  @RabbitHandler({
    exchange: 'orders',
    routingKey: 'order.created',
    queue: 'order-processing',
  })
  async handleOrderCreated(
    @RabbitPayload() order: { id: string; amount: number },
    @RabbitMessage('properties') properties: any,
  ) {
    console.log('Order ID:', order.id);
    console.log('Message ID:', properties.messageId);
    
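    // Process order
    await this.processOrder(order);
  }

  // Placeholder for the application's own order-processing logic
  private async processOrder(order: { id: string; amount: number }): Promise<void> {}
}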

Parameter Decorators

import { Injectable } from '@nestjs/common';
import { 
  RabbitSubscribe, 
  RabbitPayload, 
  RabbitMessage,
  RabbitContext 
} from '@nam088/nestjs-rabbitmq';

@Injectable()
export class MessageProcessor {
  // Extract entire payload
  @RabbitSubscribe({ queue: 'user-events' })
  async handleUser(@RabbitPayload() user: { id: string; name: string }) {
    console.log('User:', user);
  }

  // Extract specific field from payload
  @RabbitSubscribe({ queue: 'notifications' })
  async handleNotification(
    @RabbitPayload('userId') userId: string,
    @RabbitPayload('message') message: string,
  ) {
    console.log(`Send ${message} to user ${userId}`);
  }

  // Access full message context
  @RabbitSubscribe({ queue: 'logs' })
  async handleLog(
    @RabbitPayload() data: any,
    @RabbitContext() fullMessage: any,
  ) {
    console.log('Routing Key:', fullMessage.fields.routingKey);
    console.log('Exchange:', fullMessage.fields.exchange);
    console.log('Data:', data);
  }

  // Get message properties
  @RabbitSubscribe({ queue: 'tasks' })
  async handleTask(
    @RabbitPayload() task: any,
    @RabbitMessage('properties') props: any,
  ) {
    console.log('Correlation ID:', props.correlationId);
    console.log('Timestamp:', props.timestamp);
  }
}
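
To make the difference between the parameter decorators concrete, the sketch below mimics what each one selects from a delivered message. It assumes a JSON-encoded body and a message with fields, properties and content, as in the examples above. The helper names are hypothetical and the package's actual decoding is not shown here.

```typescript
// Simplified message shape. amqplib delivers content as a Buffer;
// a string keeps this sketch dependency-free.
interface SketchMessage {
  fields: { routingKey: string; exchange: string };
  properties: Record<string, unknown>;
  content: string;
}

// Roughly what @RabbitPayload(property?) selects: the parsed body or one field of it.
function extractPayload(message: SketchMessage, property?: string): unknown {
  const payload: unknown = JSON.parse(message.content);
  if (property === undefined) return payload;
  if (typeof payload === 'object' && payload !== null) {
    return (payload as Record<string, unknown>)[property];
  }
  return undefined;
}

// Roughly what @RabbitMessage(property?) selects: the whole message or one part of it.
function extractMessagePart(
  message: SketchMessage,
  part?: 'fields' | 'properties' | 'content',
): unknown {
  return part === undefined ? message : message[part];
}

const sample: SketchMessage = {
  fields: { routingKey: 'notify.user', exchange: 'notifications' },
  properties: { correlationId: 'abc-1' },
  content: '{"userId":"u1","message":"hi"}',
};

extractPayload(sample, 'userId');         // 'u1'
extractMessagePart(sample, 'properties'); // { correlationId: 'abc-1' }
```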

Discovery & Performance

For large applications, you can limit the scanning scope to speed up bootstrap instead of scanning the whole app.

Options (RabbitMQModuleOptions)

RabbitMQModule.forRoot({
  uri: 'amqp://localhost:5672',
  // Disable discovery entirely (manual registration only)
  // autoDiscover: false,

  // Limit what to scan
  scanScope: 'all', // 'all' | 'modules' | 'providers' | 'annotated'

  // If using 'modules', specify which modules to scan
  // includeModules: [AppModule, 'PaymentsModule'],

  // If using 'providers', specify which providers to scan (and/or exclude)
  // includeProviders: [ConsumerService, 'BillingConsumerService'],
  // excludeProviders: ['SomeHeavyProvider'],
});

Annotated-only scanning

Use the @RabbitController() class decorator to mark consumer classes. When scanScope: 'annotated' is set, only these classes are scanned.

import { RabbitController, RabbitSubscribe, RabbitRPC, RabbitPayload } from '@nam088/nestjs-rabbitmq';

@RabbitController()
export class ConsumerService {
  @RabbitSubscribe({ queue: 'app.queue' })
  onMessage(msg: any) {}

  @RabbitRPC({ queue: 'app.rpc' })
  onRpc(@RabbitPayload() data: any) { return { ok: true, data }; }
}
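
The scan scopes can be read as a filter over candidate providers. The sketch below models that reading with plain data. It is an assumption about the semantics described above, not the package's discovery code: providers and modules are identified by name only, excludeProviders is applied only in the 'providers' scope, and all names are hypothetical.

```typescript
type ScanScope = 'all' | 'modules' | 'providers' | 'annotated';

interface Candidate {
  name: string;        // provider class name
  moduleName: string;  // owning module
  annotated: boolean;  // true when marked with @RabbitController()
}

interface ScanOptions {
  scanScope: ScanScope;
  includeModules?: string[];
  includeProviders?: string[];
  excludeProviders?: string[];
}

// Hypothetical model of which providers would be scanned for handlers.
function selectProviders(candidates: Candidate[], options: ScanOptions): string[] {
  const keep = (c: Candidate): boolean => {
    switch (options.scanScope) {
      case 'modules':
        return (options.includeModules ?? []).includes(c.moduleName);
      case 'providers': {
        // Assumption: an omitted include list means "everything not excluded".
        const included =
          options.includeProviders === undefined || options.includeProviders.includes(c.name);
        return included && !(options.excludeProviders ?? []).includes(c.name);
      }
      case 'annotated':
        return c.annotated;
      default:
        return true; // 'all'
    }
  };
  return candidates.filter(keep).map((c) => c.name);
}

const candidates: Candidate[] = [
  { name: 'ConsumerService', moduleName: 'AppModule', annotated: true },
  { name: 'BillingConsumerService', moduleName: 'PaymentsModule', annotated: false },
  { name: 'SomeHeavyProvider', moduleName: 'AppModule', annotated: false },
];

selectProviders(candidates, { scanScope: 'annotated' }); // ['ConsumerService']
```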

Multi-Connection with Decorators

@Injectable()
export class MultiConnService {
  @RabbitRPC({
    queue: 'primary-rpc',
    connectionName: 'primary',
  })
  async handlePrimary(@RabbitPayload() data: any) {
    return { status: 'processed', data };
  }

  @RabbitHandler({
    queue: 'secondary-queue',
    connectionName: 'secondary',
    prefetchCount: 5,
  })
  async handleSecondary(@RabbitPayload() message: any) {
    console.log('From secondary connection:', message);
  }
}

Health Checks

import { Controller, Get, Module } from '@nestjs/common';
import { HealthCheck, HealthCheckService, TerminusModule } from '@nestjs/terminus';
import { RabbitMQHealthIndicator } from '@nam088/nestjs-rabbitmq';

// Declared before HealthModule because the module decorator references it
@Controller('health')
export class HealthController {
  constructor(
    private health: HealthCheckService,
    private rabbitmq: RabbitMQHealthIndicator,
  ) {}

  @Get()
  @HealthCheck()
  check() {
    return this.health.check([
      () => this.rabbitmq.isHealthy('default'),
    ]);
  }
}

@Module({
  imports: [TerminusModule],
  controllers: [HealthController],
  providers: [RabbitMQHealthIndicator],
})
export class HealthModule {}
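
Terminus health indicators report their state as an object keyed by the indicator name, with a status of 'up' or 'down'. The sketch below builds a result of that shape from a simple connected flag. It is illustrative only: `toIndicatorResult` is a hypothetical helper, and the exact details this package's indicator returns are not documented here.

```typescript
// Shape modelled on the Terminus indicator result: { [key]: { status, ...details } }.
type IndicatorResult = Record<string, { status: 'up' | 'down'; message?: string }>;

// Hypothetical helper that maps a connection flag to an indicator result.
function toIndicatorResult(key: string, connected: boolean): IndicatorResult {
  const result: IndicatorResult = {};
  result[key] = connected
    ? { status: 'up' }
    : { status: 'down', message: 'RabbitMQ connection is not established' };
  return result;
}

toIndicatorResult('default', true); // { default: { status: 'up' } }
```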

API Reference

RabbitMQModule

  • forRoot(options) - Register with static configuration
  • forRootAsync(options) - Register with async configuration

RabbitMQService

  • publish(exchange, routingKey, message, options?) - Publish a message
  • sendToQueue(queue, message, options?) - Send to queue directly
  • request(queue, message, options?) - RPC request-reply
  • createChannel() - Get the underlying channel
  • getConnection() - Get the connection manager

Decorators

  • @RabbitSubscribe(options) - Subscribe to messages
  • @InjectRabbitMQ(connectionName?) - Inject RabbitMQ service
  • @RabbitRPC(options) - Mark method as RPC handler (request-reply pattern)
  • @RabbitHandler(options) - Generic message handler decorator
  • @RabbitPayload(property?) - Extract payload from message
  • @RabbitMessage(property?) / @RabbitContext(property?) - Get full message context
  • @RabbitController() - Mark class for annotated-only discovery

Examples

Check out the examples directory for complete working examples:

  • Basic Use - Simple pub/sub and RPC (includes math.add)

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

MIT © Nam088