
fastfast-pubsub

v1.0.11


NestJS AWS Pub/Sub

A NestJS microservice transport for AWS SQS and SNS with pub/sub patterns, featuring message acknowledgment, batch processing, retry logic, and SNS fan-out support.

Features

  • 🚀 NestJS Microservice Transport - Custom transport strategy for AWS SQS/SNS
  • 📨 Message Pattern Support - @MessagePattern and @EventPattern decorators
  • Message Acknowledgment - Manual ack()/nack() and auto-ack based on return values
  • 🔄 Retry Logic - Configurable retry mechanisms for both SQS and SNS
  • 📦 Batch Processing - Handle multiple messages in a single handler
  • 🌐 SNS Fan-out - Support for SNS Topic → SQS fan-out patterns
  • 🔧 Cross-Service Compatibility - Handle messages from Laravel, other NestJS services, or any other producer
  • 📊 Observability - Built-in event system for monitoring
  • 🛡️ Type Safety - Full TypeScript support

Installation

npm install nestjs-aws-pubsub

Quick Start

1. Configure the Microservice

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { PubSubServer } from 'nestjs-aws-pubsub';

async function bootstrap() {
  // Create the transport first so the same instance can be observed below.
  // (A strategy instance is not registered as a provider, so app.get() cannot resolve it.)
  const pubSubServer = new PubSubServer({
    // Consumer configurations
    consumers: [
      {
        name: 'orders-queue',
        queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/orders-queue',
        region: 'us-east-1',
      },
      {
        name: 'notifications-queue',
        queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/notifications-queue',
        region: 'us-east-1',
      },
    ],

    // Producer configurations
    producers: [
      {
        name: 'orders-producer',
        queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/orders-queue',
        region: 'us-east-1',
      },
    ],

    // SNS configuration
    topics: [
      {
        name: 'orders',
        topicArn: 'arn:aws:sns:us-east-1:123456789012:orders-topic',
      },
      {
        name: 'notifications',
        topicArn: 'arn:aws:sns:us-east-1:123456789012:notifications-topic',
      },
    ],
    sns: {
      region: 'us-east-1',
    },

    // Serialization
    serializer: { serialize: (value: any) => value },
    deserializer: { deserialize: (value: any) => value },

    // Environment scoping (optional)
    scopedEnvKey: 'PROD',
  });

  const app = await NestFactory.createMicroservice(AppModule, {
    strategy: pubSubServer,
  });

  // Listen to internal events for observability
  pubSubServer.on('message_received', (message) => {
    console.log('Message received:', message.MessageId);
  });

  pubSubServer.on('message_processed', (message) => {
    console.log('Message processed:', message.MessageId);
  });

  pubSubServer.on('processing_error', () => {
    console.log('Error processing message');
  });

  await app.listen();
}
bootstrap();

2. Create Message Handlers

import { Controller } from '@nestjs/common';
import { MessagePattern, EventPattern } from '@nestjs/microservices';
import { PubSubContext, PubSubMessagePattern } from 'nestjs-aws-pubsub';

@Controller()
export class OrdersController {
  @MessagePattern('order_created')
  async handleOrderCreated(data: any, context: PubSubContext) {
    console.log('Processing order:', data);
    
    // Auto-ack: return true to acknowledge, false to reject
    return true;
  }

  @EventPattern('order_approved')
  async handleOrderApproved(data: any, context: PubSubContext) {
    console.log('Order approved:', data);
    
    // Manual acknowledgment
    await context.ack();
  }

  @PubSubMessagePattern('batch_orders', { batch: true })
  async handleBatchOrders(batch: Array<{ data: any; context: PubSubContext }>) {
    console.log('Processing batch of orders:', batch.length);
    
    for (const { data, context } of batch) {
      // Process each message
      await context.ack();
    }
  }
}
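
The two acknowledgment styles used above (returning a boolean versus calling ack() yourself) can be pictured with a small stand-alone sketch. This is not the library's implementation: `AckContext`, `settle`, `createRecordingContext`, and the rule that a thrown error rejects the message are assumptions made for illustration. Only ack()/nack() and the "return true to acknowledge, false to reject" convention come from this README.

```typescript
// Hypothetical stand-in for PubSubContext: only ack()/nack() are modeled.
interface AckContext {
  ack(): Promise<void>;
  nack(): Promise<void>;
}

type Handler<T> = (data: T, context: AckContext) => Promise<unknown>;

// Assumed rule: `true` acknowledges, `false` or a thrown error rejects,
// and any other return value leaves acknowledgment to the handler itself.
async function settle<T>(
  handler: Handler<T>,
  data: T,
  context: AckContext,
): Promise<'ack' | 'nack' | 'manual'> {
  try {
    const result = await handler(data, context);
    if (result === true) {
      await context.ack();
      return 'ack';
    }
    if (result === false) {
      await context.nack();
      return 'nack';
    }
    return 'manual';
  } catch {
    await context.nack();
    return 'nack';
  }
}

// In-memory context that records calls, for demonstration only.
function createRecordingContext(log: string[]): AckContext {
  return {
    ack: async () => {
      log.push('ack');
    },
    nack: async () => {
      log.push('nack');
    },
  };
}
```

Under these assumptions, a handler that returns nothing and calls context.ack() itself is reported as 'manual', so the message is never acknowledged twice.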

3. Send Messages

import { Injectable } from '@nestjs/common';
import { PubSubClient } from 'nestjs-aws-pubsub';

@Injectable()
export class OrdersService {
  constructor(private readonly pubSubClient: PubSubClient) {}

  async createOrder(orderData: any) {
    // Send to SQS queue
    await this.pubSubClient.sendMessage('order_created', orderData, {
      queueName: 'orders-queue',
    });

    // Send to SNS topic (fan-out to multiple SQS queues)
    await this.pubSubClient.sendMessage('order_approved', orderData, {
      topic: 'orders',
    });

    // Use emit for fire-and-forget
    this.pubSubClient.emit('order_notification', orderData, {
      topic: 'notifications',
    });
  }
}

Configuration Options

PubSubOptions

interface PubSubOptions {
  // Consumer configurations
  consumer?: PubSubConsumerOptions;
  consumers?: PubSubConsumerOptions[];
  
  // Producer configurations
  producer?: PubSubProducerOptions;
  producers?: PubSubProducerOptions[];
  
  // SNS configuration
  topics?: Array<{ name: string; topicArn: string }>;
  sns?: any; // AWS SNS client configuration
  
  // Environment scoping
  scopedEnvKey?: string;
  
  // Serialization
  serializer: Serializer;
  deserializer: Deserializer;
  
  // Logging
  logger?: LoggerService;
  
  // Graceful shutdown
  globalStopOptions?: StopOptions;
}

Consumer Options

interface PubSubConsumerOptions {
  name: string;
  queueUrl: string;
  region?: string;
  credentials?: any;
  stopOptions?: StopOptions;
  // ... other sqs-consumer options
}

Producer Options

interface PubSubProducerOptions {
  name: string;
  queueUrl: string;
  region?: string;
  credentials?: any;
  // ... other sqs-producer options
}

Advanced Features

Batch Processing

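import { Controller } from '@nestjs/common';
import { PubSubContext, PubSubMessagePattern } from 'nestjs-aws-pubsub';

@Controller()
export class BatchController {
  @PubSubMessagePattern('batch_orders', { batch: true })
  async handleBatchOrders(batch: Array<{ data: any; context: PubSubContext }>) {
    // Process multiple messages at once, acknowledging each one after it succeeds
    for (const { data, context } of batch) {
      await this.processOrder(data);
      await context.ack();
    }
  }

  // Placeholder for application-specific order handling (not part of the library)
  private async processOrder(order: any): Promise<void> {
    console.log('Processing order:', order);
  }
}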
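
In the loop above, one failing message stops the rest of the batch from being processed. A possible variation, sketched here with hypothetical stand-in types (`BatchContext`, `BatchItem`, `processBatch` are not part of the library), settles every message independently: successes are acknowledged and failures are rejected without affecting their neighbours. Note that this version processes the batch concurrently, which may or may not suit your workload.

```typescript
// Hypothetical stand-ins for the batch item shape shown in this README.
interface BatchContext {
  ack(): Promise<void>;
  nack(): Promise<void>;
}

interface BatchItem<T> {
  data: T;
  context: BatchContext;
}

// Runs `work` for every item; acks on success, nacks on failure.
async function processBatch<T>(
  batch: BatchItem<T>[],
  work: (data: T) => Promise<void>,
): Promise<{ acked: number; nacked: number }> {
  const outcomes = await Promise.all(
    batch.map(async ({ data, context }) => {
      try {
        await work(data);
        await context.ack();
        return true;
      } catch {
        // Reject only the message that failed.
        await context.nack();
        return false;
      }
    }),
  );

  const acked = outcomes.filter(Boolean).length;
  return { acked, nacked: outcomes.length - acked };
}
```

Inside a batch handler this could be called as `await processBatch(batch, (data) => this.processOrder(data))`, assuming the context objects expose ack() and nack() as listed in the API reference.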

SNS Fan-out Support

The library automatically handles SNS envelope unwrapping for messages sent via SNS Topic → SQS fan-out:

// Messages sent to SNS topics will be automatically unwrapped
@MessagePattern('order_created')
async handleOrderCreated(data: any, context: PubSubContext) {
  // This will work for messages sent directly to SQS
  // AND for messages sent via SNS fan-out
  console.log('Processing order:', data);
  return true;
}
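
To make "envelope unwrapping" concrete: when raw message delivery is disabled, SNS delivers a JSON document to the subscribed queue whose `Type` is "Notification" and whose `Message` field holds the published payload as a string. The sketch below shows one way such a body could be unwrapped. The function names `isSnsEnvelope` and `unwrapSqsBody` are hypothetical, and this is an illustration of the idea rather than the library's own code.

```typescript
// Subset of the JSON document SNS delivers to a subscribed SQS queue.
interface SnsEnvelope {
  Type: 'Notification';
  TopicArn: string;
  Message: string;
}

function isSnsEnvelope(value: unknown): value is SnsEnvelope {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    candidate.Type === 'Notification' &&
    typeof candidate.TopicArn === 'string' &&
    typeof candidate.Message === 'string'
  );
}

// Returns the application payload whether the body came straight from SQS
// or was wrapped by SNS on its way to the queue.
function unwrapSqsBody(body: string): unknown {
  const parsed: unknown = JSON.parse(body);
  if (!isSnsEnvelope(parsed)) return parsed;

  // SNS carries the published payload as a string in `Message`.
  try {
    return JSON.parse(parsed.Message);
  } catch {
    // Not JSON: hand back the plain string.
    return parsed.Message;
  }
}
```

With this shape, a body sent directly to SQS and the same body published through an SNS topic both come out as the same object, which is what lets a single handler serve both paths.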

Cross-Service Compatibility

Handle messages from non-NestJS services (e.g., Laravel):

// Laravel sends: { "pattern": "order_created", "data": {...} }
// NestJS automatically extracts pattern and data
@MessagePattern('order_created')
async handleOrderCreated(data: any, context: PubSubContext) {
  // Works seamlessly with Laravel or any other service
  return true;
}
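
The only contract a non-NestJS producer has to honour is the JSON shape shown in the comment above: a string `pattern` and a `data` payload. The following sketch validates and splits such a body. `Packet` and `toPacket` are hypothetical names used for illustration, and the error handling is an assumption, not the library's documented behaviour.

```typescript
// The message shape shown above: { "pattern": "...", "data": {...} }
interface Packet {
  pattern: string;
  data: unknown;
}

// Parses a raw message body and extracts the routing pattern and payload.
function toPacket(body: string): Packet {
  const parsed: unknown = JSON.parse(body);
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error('Message body must be a JSON object');
  }

  const { pattern, data } = parsed as Record<string, unknown>;
  if (typeof pattern !== 'string' || pattern.length === 0) {
    throw new Error('Message body is missing a string "pattern" field');
  }

  return { pattern, data };
}

// Example body as a Laravel (or any other) producer might send it.
const packet = toPacket('{"pattern":"order_created","data":{"orderId":"123"}}');
console.log(packet.pattern);
```

Any producer that can serialize this shape to JSON, in any language, can therefore target a handler registered for the same pattern.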

Event Observability

// Listen to internal events on the PubSubServer instance used as the transport strategy
pubSubServer.on('message_received', (message) => {
  console.log('Message received:', message);
});

pubSubServer.on('message_processed', (message) => {
  console.log('Message processed:', message);
});

pubSubServer.on('processing_error', () => {
  console.log('Error processing message');
});
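
Beyond logging, the same three events can feed simple counters for a metrics endpoint or health check. The sketch below assumes only that the server exposes an `on(event, listener)` method, as used above. `ObservableServer`, `attachCounters`, and `FakeServer` are hypothetical names, and `FakeServer` exists purely to exercise the example without AWS.

```typescript
// Minimal shape assumed of the server: an `on(event, listener)` method.
interface ObservableServer {
  on(event: string, listener: (...args: unknown[]) => void): unknown;
}

const EVENTS = ['message_received', 'message_processed', 'processing_error'] as const;
type EventName = (typeof EVENTS)[number];

// Registers one listener per event and returns a live tally.
function attachCounters(server: ObservableServer): Record<EventName, number> {
  const counts: Record<EventName, number> = {
    message_received: 0,
    message_processed: 0,
    processing_error: 0,
  };
  for (const name of EVENTS) {
    server.on(name, () => {
      counts[name] += 1;
    });
  }
  return counts;
}

// Tiny in-memory emitter used only to demonstrate the sketch.
class FakeServer implements ObservableServer {
  private listeners = new Map<string, Array<(...args: unknown[]) => void>>();

  on(event: string, listener: (...args: unknown[]) => void): this {
    const list = this.listeners.get(event) ?? [];
    list.push(listener);
    this.listeners.set(event, list);
    return this;
  }

  emit(event: string, ...args: unknown[]): void {
    for (const listener of this.listeners.get(event) ?? []) {
      listener(...args);
    }
  }
}
```

Comparing `message_received` against `message_processed` plus `processing_error` gives a rough view of how many messages are still in flight.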

API Reference

PubSubContext

class PubSubContext {
  getMessage(): any;           // Get raw SQS message
  getPattern(): string;        // Get message pattern
  ack(): Promise<void>;        // Manually acknowledge message
  nack(): Promise<void>;       // Manually reject message
}

PubSubClient

class PubSubClient extends ClientProxy {
  // Send message to SQS or SNS
  sendMessage<T>(
    pattern: string,
    data: T,
    options?: {
      queueName?: string;
      topic?: string;
      topicArn?: string;
      type?: 'sqs' | 'sns';
    }
  ): Promise<void>;

  // Emit event (fire-and-forget)
  emit<TInput>(
    pattern: string,
    data: TInput,
    options?: { /* same as sendMessage */ }
  ): Observable<any>;

  // Dispatch event
  dispatchEvent(packet: any): Promise<any>;
}

PubSubMessagePattern Decorator

function PubSubMessagePattern(
  pattern: string, 
  options?: {
    batch?: boolean;
    retry?: number;
  }
)

Usage from Another Service

import { PubSubClient } from 'nestjs-aws-pubsub';

async function sendMessageExample() {
  const client = new PubSubClient({
    producers: [
      {
        name: 'orders-producer',
        queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/orders-queue',
        region: 'us-east-1',
      },
    ],
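    topics: [
      {
        name: 'orders',
        topicArn: 'arn:aws:sns:us-east-1:123456789012:orders-topic',
      },
      // Needed because the emit() call below targets the 'notifications' topic
      {
        name: 'notifications',
        topicArn: 'arn:aws:sns:us-east-1:123456789012:notifications-topic',
      },
    ],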
    sns: {
      region: 'us-east-1',
    },
    serializer: { serialize: (value: any) => value },
    deserializer: { deserialize: (value: any) => value },
  });

  // Send to SQS
  await client.sendMessage('order_created', { 
    orderId: '123', 
    customerId: '456',
    amount: 99.99 
  }, { 
    queueName: 'orders-queue' 
  });

  // Send to SNS (fan-out)
  await client.sendMessage('order_approved', { 
    orderId: '123' 
  }, { 
    topic: 'orders' 
  });

  // Emit event
  client.emit('order_notification', { 
    orderId: '123',
    status: 'approved' 
  }, { 
    topic: 'notifications' 
  });
}

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the ISC License.