@pawells/nestjs-nats

v2.0.2

NestJS NATS pub/sub integration module

NestJS NATS Module

NestJS module for NATS pub/sub integration with automatic subscriber discovery, request-reply patterns, and JetStream support.

Installation

yarn add @pawells/nestjs-nats @nats-io/transport-node @nats-io/jetstream

Requirements

  • Node.js: >= 22.0.0
  • NestJS: >= 10.0.0
  • @nats-io/transport-node: >= 3.0.0
  • @nats-io/jetstream: >= 3.0.0

Peer Dependencies

{
  "@nats-io/jetstream": ">=3.0.0",
  "@nats-io/transport-node": ">=3.0.0",
  "@nestjs/common": ">=10.0.0",
  "@nestjs/core": ">=10.0.0"
}

Quick Start

Module Setup

import { Module } from '@nestjs/common';
import { NatsModule } from '@pawells/nestjs-nats';

@Module({
  imports: [
    NatsModule.ForRoot({
      servers: 'nats://localhost:4222',
    }, true), // isGlobal defaults to false; pass true to register globally
  ],
})
export class AppModule {}

Using NatsService

import { Injectable } from '@nestjs/common';
import { NatsService } from '@pawells/nestjs-nats';

@Injectable()
export class OrderService {
  constructor(private readonly natsService: NatsService) {}

  publishOrder(order: Order): void {
    this.natsService.PublishJson('orders.created', order);
  }

  async getUser(userId: string): Promise<User> {
    return this.natsService.RequestJson<{ id: string }, User>(
      'users.get',
      { id: userId },
    );
  }

  subscribeToOrders(): void {
    this.natsService.Subscribe('orders.*', (msg) => {
      console.log('Order received:', msg.json());
    });
  }
}

Automatic Subscriber Discovery with @Subscribe

The NatsSubscriberRegistry automatically discovers and registers handler methods decorated with @Subscribe:

import { Injectable } from '@nestjs/common';
import { Subscribe } from '@pawells/nestjs-nats';
import type { Msg } from '@nats-io/transport-node';

@Injectable()
export class OrderHandler {
  @Subscribe('orders.created')
  async onOrderCreated(msg: Msg): Promise<void> {
    const order = msg.json<Order>();
    console.log('Order created:', order);
    // Handle order creation
  }

  @Subscribe('orders.updated')
  async onOrderUpdated(msg: Msg): Promise<void> {
    const order = msg.json<Order>();
    console.log('Order updated:', order);
  }

  @Subscribe('tasks.process', 'worker-pool')
  async processTask(msg: Msg): Promise<void> {
    const task = msg.json<Task>();
    console.log('Processing task in worker pool:', task);
    // Multiple instances share the queue group for load balancing
  }
}

Async Configuration

Using a Factory Function

import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { NatsModule } from '@pawells/nestjs-nats';

@Module({
  imports: [
    ConfigModule.forRoot(),
    NatsModule.ForRootAsync(
      {
        imports: [ConfigModule],
        inject: [ConfigService],
        useFactory: async (configService: ConfigService) => ({
          servers: configService.get('NATS_SERVERS') || 'nats://localhost:4222',
          user: configService.get('NATS_USER'),
          pass: configService.get('NATS_PASS'),
        }),
      },
      true, // isGlobal
    ),
  ],
})
export class AppModule {}

Using a Class-Based Factory

import { Injectable, Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { INatsOptionsFactory, NatsModule, TNatsModuleOptions } from '@pawells/nestjs-nats';

@Injectable()
export class NatsConfigService implements INatsOptionsFactory {
  constructor(private configService: ConfigService) {}

  async createNatsOptions(): Promise<TNatsModuleOptions> {
    return {
      servers: this.configService.get('NATS_SERVERS') || 'nats://localhost:4222',
      user: this.configService.get('NATS_USER'),
      pass: this.configService.get('NATS_PASS'),
    };
  }
}

@Module({
  imports: [
    ConfigModule.forRoot(),
    NatsModule.ForRootAsync(
      {
        useClass: NatsConfigService,
      },
      true, // isGlobal
    ),
  ],
})
export class AppModule {}

Reuse Existing Factory

@Module({
  imports: [
    ConfigModule.forRoot(),
    NatsModule.ForRootAsync(
      {
        useExisting: NatsConfigService,
      },
      true, // isGlobal
    ),
  ],
})
export class AppModule {}

Configuration Options

The TNatsModuleOptions type is an alias for the NATS client's NodeConnectionOptions. Key options include:

| Option | Type | Description | Default |
|--------|------|-------------|---------|
| servers | string \| string[] | NATS server URL(s) | 'nats://localhost:4222' |
| user | string | Username for authentication | (optional) |
| pass | string | Password for authentication | (optional) |
| token | string | Token for authentication | (optional) |
| timeout | number | Connection timeout in milliseconds | 5000 |
| reconnect | boolean \| number | Enable auto-reconnect or max attempts | true |
| pingInterval | number | Ping interval in milliseconds | 120000 |
| maxReconnectAttempts | number | Maximum reconnection attempts | 60 |

Note: Sensitive fields (user, pass, token, authenticator) are automatically stripped from the publicly injectable NATS_MODULE_OPTIONS token for security.
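
One way to picture the stripping step is a helper that copies the options object and leaves out the sensitive keys. This is only a sketch: `stripSensitiveOptions`, `ConnectionOptionsLike`, and `SENSITIVE_KEYS` are hypothetical names, not part of this package's API, and the module's actual implementation may differ.

```typescript
// Hypothetical shape standing in for the NATS connection options.
interface ConnectionOptionsLike {
  servers?: string | string[];
  user?: string;
  pass?: string;
  token?: string;
  authenticator?: unknown;
  [key: string]: unknown;
}

// The fields the note above lists as sensitive.
const SENSITIVE_KEYS = ['user', 'pass', 'token', 'authenticator'] as const;

// Return a shallow copy of the options without the sensitive fields.
function stripSensitiveOptions(options: ConnectionOptionsLike): ConnectionOptionsLike {
  const safe: ConnectionOptionsLike = { ...options };
  for (const key of SENSITIVE_KEYS) {
    delete safe[key];
  }
  return safe;
}

const publicOptions = stripSensitiveOptions({
  servers: 'nats://localhost:4222',
  user: 'app',
  pass: 'secret',
  timeout: 5000,
});
console.log(publicOptions); // servers and timeout remain; user and pass are gone
```

The original object is left untouched, so the full options can still be used for the connection while only the copy is exposed.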

Key Features

Core Publishing

// Publish raw string/binary message
natsService.Publish('orders.created', 'raw message');

// Publish JSON-serialized data
natsService.PublishJson('orders.updated', { id: 1, status: 'completed' });

Subscription with Message Handler

// Subscribe with automatic handler invocation
const sub = natsService.Subscribe('orders.*', (msg) => {
  const order = msg.json();
  console.log('Order:', order);
});

// Unsubscribe when done
sub.unsubscribe();

Request-Reply Pattern

// Send request and wait for reply
const reply = await natsService.Request('users.get', JSON.stringify({ id: 123 }));
const user = reply.json<User>();

// JSON request and response
const typedUser = await natsService.RequestJson<{ id: number }, User>(
  'users.get',
  { id: 123 },
);

JetStream Integration

// Get JetStream client for persistent messaging
const js = natsService.Jetstream();
await js.publish('orders', 'message');

// Get JetStream manager for administration
const jsm = await natsService.JetstreamManager();
const streams = await jsm.streams.list();

Health Checks

// Check connection status
if (natsService.IsConnected()) {
  console.log('NATS is connected');
}

// Get raw connection for advanced usage
const conn = natsService.GetConnection();

Automatic Subscriber Discovery

The NatsSubscriberRegistry service automatically:

  1. Scans all NestJS providers and controllers after module initialization
  2. Finds methods decorated with @Subscribe(subject, [queue])
  3. Registers them as subscription handlers via NatsService.Subscribe()
  4. Logs registration with subject and optional queue group

Note: Due to NestJS dependency ordering, the NatsService connects during module init before NatsSubscriberRegistry registers handlers. This ordering is guaranteed and requires no manual configuration.
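
The four steps above can be sketched without NestJS or decorators. This is a simplified model, not the package's implementation: `markSubscriber`, `discoverSubscribers`, and the `SUBSCRIBERS` map are hypothetical stand-ins for the @Subscribe metadata and the registry's provider scan.

```typescript
type Handler = (msg: unknown) => void | Promise<void>;

interface SubscriberMeta {
  method: string;
  subject: string;
  queue?: string;
}

// Hypothetical metadata store: class prototype -> marked methods.
const SUBSCRIBERS = new Map<object, SubscriberMeta[]>();

// Stand-in for what a @Subscribe(subject, queue) decorator would record.
function markSubscriber(proto: object, method: string, subject: string, queue?: string): void {
  const list = SUBSCRIBERS.get(proto) ?? [];
  list.push({ method, subject, queue });
  SUBSCRIBERS.set(proto, list);
}

// Steps 1-3: scan instances, find marked methods, return bound handlers.
function discoverSubscribers(instances: object[]): Array<SubscriberMeta & { handler: Handler }> {
  const found: Array<SubscriberMeta & { handler: Handler }> = [];
  for (const instance of instances) {
    const metas = SUBSCRIBERS.get(Object.getPrototypeOf(instance)) ?? [];
    for (const meta of metas) {
      const fn = (instance as Record<string, unknown>)[meta.method];
      if (typeof fn === 'function') {
        found.push({ ...meta, handler: fn.bind(instance) as Handler });
      }
    }
  }
  return found;
}

class OrderHandler {
  seen: unknown[] = [];
  onOrderCreated(msg: unknown): void {
    this.seen.push(msg);
  }
}
markSubscriber(OrderHandler.prototype, 'onOrderCreated', 'orders.created');

const orderHandler = new OrderHandler();
const registrations = discoverSubscribers([orderHandler]);
for (const r of registrations) {
  // Step 4: log each registration with its subject and optional queue group.
  console.log(`registered ${r.method} on ${r.subject}` + (r.queue ? ` (queue ${r.queue})` : ''));
}
```

In the real module, each discovered handler would be passed to NatsService.Subscribe() rather than only collected in an array.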

Handler Binding

Handlers are automatically bound to their class instance, so 'this' inside a handler always refers to that instance and injected dependencies remain accessible:

@Injectable()
export class OrderHandler {
  private readonly orderService: OrderService;

  constructor(orderService: OrderService) {
    this.orderService = orderService;
  }

  @Subscribe('orders.created')
  async onOrderCreated(msg: Msg): Promise<void> {
    // 'this' refers to OrderHandler instance
    await this.orderService.processOrder(msg.json());
  }
}
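
Why binding matters can be seen in plain TypeScript, independent of this package. In the sketch below (the `Counter` class is purely illustrative), a method passed around as a bare function loses its instance, while a bound copy keeps it.

```typescript
class Counter {
  count = 0;

  increment(): void {
    this.count += 1;
  }
}

const counter = new Counter();

// Detached reference: 'this' is undefined when it is called (class code is strict).
const detached = counter.increment;
let detachedFailed = false;
try {
  detached();
} catch {
  detachedFailed = true;
}

// Bound reference: 'this' is always the Counter instance.
const bound = counter.increment.bind(counter);
bound();

console.log(detachedFailed, counter.count);
```

A registry that stored the detached reference would hit the same failure on the first message, which is why handlers are bound before they are registered.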

Error Handling

Handler errors are logged and do not crash the subscription:

@Subscribe('orders.created')
async onOrderCreated(msg: Msg): Promise<void> {
  // If this throws, the error is logged with context
  // The subscription continues listening for the next message
  await processOrder(msg.json());
}

Subscription errors (iterator closure, etc.) are logged at debug level.
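
The behaviour described above amounts to wrapping each handler call in a try/catch inside the message loop. A minimal sketch, assuming nothing about the package's internals (`consume` and the in-memory message array are hypothetical stand-ins for a real subscription):

```typescript
type MessageHandler = (payload: string) => Promise<void>;

// Deliver every message to the handler; log failures instead of stopping.
async function consume(messages: string[], handler: MessageHandler): Promise<string[]> {
  const errors: string[] = [];
  for (const payload of messages) {
    try {
      await handler(payload);
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err);
      errors.push(reason);
      console.log(`[error] handler failed for "${payload}": ${reason}`);
      // Fall through to the next message; the loop is not interrupted.
    }
  }
  return errors;
}

const processed: string[] = [];
const done = consume(['a', 'bad', 'c'], async (payload) => {
  if (payload === 'bad') {
    throw new Error('cannot process');
  }
  processed.push(payload);
});
```

Because the catch sits inside the loop, the failure on the second message does not prevent the third from being handled.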

Reconnection Behavior

The NATS client library automatically handles reconnection and re-establishes subscriptions. The NatsService monitors connection status and logs:

  • disconnect: When connection is temporarily lost
  • reconnecting: When attempting to reconnect
  • reconnect: When reconnection succeeds
  • error: When an async error occurs (with error details)
  • ldm: When the server enters lame duck mode

Security

  • Credentials Sanitization: Sensitive fields (user, pass, token, authenticator) are stripped from the public NATS_MODULE_OPTIONS token
  • Async Connection: Module waits for successful connection before becoming available
  • Graceful Shutdown: Connection is properly drained on application shutdown

Common Patterns

Conditional Subscription

@Injectable()
export class OrderHandler {
  @Subscribe('orders.created')
  async onOrderCreated(msg: Msg): Promise<void> {
    const order = msg.json<Order>();
    if (order.total > 1000) {
      // Handle high-value orders
      await this.handleHighValueOrder(order);
    }
  }
}

Batch Processing with Queue Groups

@Injectable()
export class BatchProcessor {
  @Subscribe('tasks.batch', 'batch-workers')
  async processBatch(msg: Msg): Promise<void> {
    const batch = msg.json<Batch>();
    // Multiple instances process batches concurrently
    // Load is distributed across the 'batch-workers' queue group
    await this.processBatchItems(batch);
  }
}
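
Queue-group delivery can be modelled in a few lines. In NATS, a message on a subject goes to every plain subscriber but to only one member of each queue group. The sketch below is an in-memory model, not the package's code: `MiniBus` is a hypothetical class, and it picks group members round-robin to stay deterministic, whereas a real NATS server chooses the member itself.

```typescript
type Listener = (data: string) => void;

class MiniBus {
  private plain: Listener[] = [];
  private groups = new Map<string, { members: Listener[]; next: number }>();

  // Without a queue name the listener sees every message.
  subscribe(listener: Listener, queue?: string): void {
    if (queue === undefined) {
      this.plain.push(listener);
      return;
    }
    const group = this.groups.get(queue) ?? { members: [], next: 0 };
    group.members.push(listener);
    this.groups.set(queue, group);
  }

  publish(data: string): void {
    this.plain.forEach((listener) => listener(data));
    // Exactly one member per queue group receives the message.
    this.groups.forEach((group) => {
      const member = group.members[group.next % group.members.length];
      group.next += 1;
      member(data);
    });
  }
}

const bus = new MiniBus();
const workerA: string[] = [];
const workerB: string[] = [];
const audit: string[] = [];

bus.subscribe((d) => workerA.push(d), 'batch-workers');
bus.subscribe((d) => workerB.push(d), 'batch-workers');
bus.subscribe((d) => audit.push(d)); // no queue group: receives everything

['t1', 't2', 't3', 't4'].forEach((t) => bus.publish(t));
console.log(workerA, workerB, audit);
```

Each task reaches exactly one of the two workers, while the subscriber without a queue group sees all four.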

Multi-Subject Subscriptions

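import { Injectable, OnModuleInit } from '@nestjs/common';
import { NatsService } from '@pawells/nestjs-nats';
import type { Msg } from '@nats-io/transport-node';

@Injectable()
export class EventHandler implements OnModuleInit {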
  constructor(private natsService: NatsService) {}

  onModuleInit(): void {
    // Subscribe to multiple subjects manually
    this.natsService.Subscribe('orders.*', msg => this.handleOrderEvent(msg));
    this.natsService.Subscribe('users.*', msg => this.handleUserEvent(msg));
    this.natsService.Subscribe('notifications.>', msg => this.handleNotification(msg));
  }

  private handleOrderEvent(msg: Msg): void { /* ... */ }
  private handleUserEvent(msg: Msg): void { /* ... */ }
  private handleNotification(msg: Msg): void { /* ... */ }
}
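
The subjects above rely on NATS wildcards: `*` matches exactly one dot-separated token, while `>` matches one or more trailing tokens and must be the last token. The helper below is an illustrative sketch of those rules (`subjectMatches` is a hypothetical name, not an export of this package or of the NATS client):

```typescript
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split('.');
  const s = subject.split('.');
  for (let i = 0; i < p.length; i++) {
    if (p[i] === '>') {
      // '>' must be last and needs at least one remaining token.
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) {
      return false; // subject ran out of tokens
    }
    if (p[i] !== '*' && p[i] !== s[i]) {
      return false; // literal token mismatch
    }
  }
  return p.length === s.length;
}

console.log(subjectMatches('orders.*', 'orders.created')); // true
console.log(subjectMatches('orders.*', 'orders.eu.created')); // false
console.log(subjectMatches('notifications.>', 'notifications.email.sent')); // true
console.log(subjectMatches('notifications.>', 'notifications')); // false
```

So 'orders.*' covers orders.created and orders.updated but not deeper subjects, while 'notifications.>' covers everything below notifications.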

License

MIT