npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@onivoro/server-aws-sqs

v24.33.2

Published

A NestJS module for integrating with AWS SQS (Simple Queue Service) using AWS SDK v3. This library provides a simplified service for message publishing, queue verification, message counting, and batch message processing.

Readme

@onivoro/server-aws-sqs

A NestJS module for integrating with AWS SQS (Simple Queue Service) using AWS SDK v3. This library provides a simplified service for message publishing, queue verification, message counting, and batch message processing.

Installation

This library is part of the Onivoro monorepo and should be used as an internal dependency.

Features

  • Message Publishing: Send JSON messages to SQS queues with error handling
  • Queue Verification: Verify queue existence and check permissions
  • Message Counting: Get approximate number of messages in queue
  • Batch Processing: Process messages in batches with automatic deletion
  • AWS SDK v3: Uses the latest AWS SDK with modern command-based API
  • Environment-Based Configuration: Simple configuration via environment variables

Quick Start

1. Module Configuration

import { ServerAwsSqsModule } from '@onivoro/server-aws-sqs';

@Module({
  imports: [
    ServerAwsSqsModule.configure({
      AWS_REGION: 'us-east-1',
      AWS_SQS_URL: 'https://sqs.us-east-1.amazonaws.com/account-id/queue-name',
      AWS_PROFILE: 'your-aws-profile', // optional
    }),
  ],
})
export class AppModule {}

2. Basic Usage

import { SqsService } from '@onivoro/server-aws-sqs';

@Injectable()
export class MessageService {
  constructor(private sqsService: SqsService) {}

  async sendMessage(data: any) {
    await this.sqsService.publish(data);
  }

  async processMessages(maxIterations: number = 10) {
    await this.sqsService.processMessageBatches(maxIterations);
  }

  async checkQueueHealth() {
    const isHealthy = await this.sqsService.verifyQueue();
    const messageCount = await this.sqsService.getApproximateNumberOfMessages();

    return { isHealthy, messageCount };
  }
}
@Injectable()
export class MessageProcessingService implements MessageHandler<T> {

    async handleMessage(messageBody: T, messageAttributes?: Record<string, any>): Promise<void> {

        const source = messageAttributes?.source || messageBody.source;
        const type = messageAttributes?.type || messageBody.type;
        const data = messageBody.data;

        // things and stuff...
    }
}

Configuration

ServerAwsSqsConfig

The configuration class requires these properties:

export class ServerAwsSqsConfig {
  AWS_PROFILE?: string;      // Optional AWS profile
  AWS_REGION: string;        // AWS region (required)
  AWS_SQS_URL: string;       // Full SQS queue URL (required)
}

Environment Variables

# Required
AWS_REGION=us-east-1
AWS_SQS_URL=https://sqs.us-east-1.amazonaws.com/123456789012/my-queue

# Optional
AWS_PROFILE=default

SqsService API

publish(event: TData)

Publishes a message to the configured SQS queue. The message is JSON-serialized automatically.

// Simple message
await sqsService.publish({ type: 'USER_CREATED', userId: '123' });

// Complex message
await sqsService.publish({
  eventType: 'ORDER_PROCESSED',
  orderId: 'order-456',
  customerData: {
    id: 'customer-789',
    email: '[email protected]'
  },
  timestamp: new Date().toISOString(),
  metadata: {
    source: 'order-service',
    version: '1.0'
  }
});

Error Handling: Errors are caught and logged to console, but the method does not throw.

verifyQueue()

Verifies that the queue exists and the service has proper permissions to access it.

try {
  await sqsService.verifyQueue();
  console.log('Queue is accessible');
} catch (error) {
  // Handle specific error types:
  // - AWS.SimpleQueueService.NonExistentQueue: Queue doesn't exist
  // - AccessDeniedException: Insufficient permissions
  console.error('Queue verification failed:', error);
}

Returns: Promise<boolean> - Returns true on success, throws on failure.

getApproximateNumberOfMessages()

Gets the approximate number of messages currently in the queue.

const messageCount = await sqsService.getApproximateNumberOfMessages();
console.log(`Queue has approximately ${messageCount} messages`);

// Use for monitoring
if (messageCount > 1000) {
  console.warn('Queue backlog is high!');
}

Returns: Promise<number> - The approximate message count.

processMessageBatches(maxIterations: number)

Processes messages from the queue in batches, automatically deleting successfully processed messages.

// Process up to 10 iterations (batches)
await sqsService.processMessageBatches(10);

// Process fewer batches for quick processing
await sqsService.processMessageBatches(3);

Batch Configuration:

  • MaxNumberOfMessages: 10 messages per batch
  • WaitTimeSeconds: 20 seconds (long polling)
  • VisibilityTimeout: 30 seconds
  • Auto-deletion: Messages are automatically deleted after retrieval

Processing Logic:

  1. Checks message count before each iteration
  2. Receives up to 10 messages with long polling
  3. Parses message bodies as JSON
  4. Logs received messages to console
  5. Automatically deletes all received messages
  6. Continues until max iterations or no messages remain

Usage Examples

Event Publishing Service

@Injectable()
export class EventPublisherService {
  constructor(private sqsService: SqsService) {}

  async publishUserEvent(userId: string, eventType: string, eventData: any) {
    const message = {
      eventType,
      userId,
      data: eventData,
      timestamp: new Date().toISOString(),
      correlationId: `user-${userId}-${Date.now()}`
    };

    await this.sqsService.publish(message);
  }

  async publishOrderEvents(orders: Array<{ id: string; status: string; customerId: string }>) {
    // Publish multiple events
    const publishPromises = orders.map(order =>
      this.sqsService.publish({
        type: 'ORDER_STATUS_CHANGED',
        orderId: order.id,
        customerId: order.customerId,
        newStatus: order.status,
        timestamp: new Date().toISOString()
      })
    );

    await Promise.allSettled(publishPromises);
  }
}

Queue Monitoring Service

@Injectable()
export class QueueMonitoringService {
  constructor(private sqsService: SqsService) {}

  async getQueueHealth() {
    try {
      const [isAccessible, messageCount] = await Promise.all([
        this.sqsService.verifyQueue().then(() => true).catch(() => false),
        this.sqsService.getApproximateNumberOfMessages().catch(() => -1)
      ]);

      return {
        accessible: isAccessible,
        messageCount,
        status: this.determineStatus(messageCount),
        timestamp: new Date().toISOString()
      };
    } catch (error) {
      return {
        accessible: false,
        messageCount: -1,
        status: 'ERROR',
        error: error.message,
        timestamp: new Date().toISOString()
      };
    }
  }

  private determineStatus(messageCount: number): string {
    if (messageCount < 0) return 'ERROR';
    if (messageCount === 0) return 'EMPTY';
    if (messageCount < 100) return 'NORMAL';
    if (messageCount < 1000) return 'BUSY';
    return 'OVERLOADED';
  }

  async startMonitoring(intervalSeconds: number = 60) {
    setInterval(async () => {
      const health = await this.getQueueHealth();
      console.log('Queue Health:', health);

      if (health.status === 'OVERLOADED') {
        console.warn('⚠️ Queue is overloaded! Consider scaling processing.');
      }
    }, intervalSeconds * 1000);
  }
}

Message Processing Worker

@Injectable()
export class MessageProcessorService {
  constructor(private sqsService: SqsService) {}

  async startProcessing(maxBatches: number = 50) {
    console.log(`Starting message processing (max ${maxBatches} batches)...`);

    const startTime = Date.now();
    await this.sqsService.processMessageBatches(maxBatches);
    const duration = Date.now() - startTime;

    console.log(`Processing completed in ${duration}ms`);
  }

  async processUntilEmpty(maxIterations: number = 100) {
    let iteration = 0;
    let messageCount = await this.sqsService.getApproximateNumberOfMessages();

    console.log(`Starting processing: ${messageCount} messages in queue`);

    while (messageCount > 0 && iteration < maxIterations) {
      await this.sqsService.processMessageBatches(1);
      messageCount = await this.sqsService.getApproximateNumberOfMessages();
      iteration++;

      console.log(`Iteration ${iteration}: ${messageCount} messages remaining`);

      // Brief pause between iterations
      await new Promise(resolve => setTimeout(resolve, 1000));
    }

    console.log(`Processing complete. Final count: ${messageCount} messages`);
  }
}

Scheduled Processing Service

@Injectable()
export class ScheduledProcessorService {
  private isProcessing = false;

  constructor(private sqsService: SqsService) {}

  @Cron('*/30 * * * * *') // Every 30 seconds
  async processMessages() {
    if (this.isProcessing) {
      console.log('Processing already in progress, skipping...');
      return;
    }

    this.isProcessing = true;

    try {
      const messageCount = await this.sqsService.getApproximateNumberOfMessages();

      if (messageCount > 0) {
        console.log(`Processing ${messageCount} messages...`);
        await this.sqsService.processMessageBatches(5); // Process up to 5 batches
      }
    } catch (error) {
      console.error('Scheduled processing failed:', error);
    } finally {
      this.isProcessing = false;
    }
  }
}

Error Handling

The service handles errors gracefully:

// Publishing - errors are logged but not thrown
await sqsService.publish({ data: 'test' }); // Won't throw even if queue is inaccessible

// Queue verification - throws on error for proper error handling
try {
  await sqsService.verifyQueue();
} catch (error) {
  if (error.name === 'AWS.SimpleQueueService.NonExistentQueue') {
    console.error('Queue does not exist');
  } else if (error.name === 'AccessDeniedException') {
    console.error('Insufficient permissions');
  }
}

// Message counting - throws on error
try {
  const count = await sqsService.getApproximateNumberOfMessages();
} catch (error) {
  console.error('Failed to get message count:', error);
}

// Batch processing - errors logged per batch, doesn't stop processing
await sqsService.processMessageBatches(10); // Continues even if some batches fail

Best Practices

1. Message Structure

Use consistent message structures for better processing:

interface StandardMessage {
  type: string;
  data: any;
  timestamp: string;
  correlationId?: string;
  source?: string;
  version?: string;
}

await sqsService.publish({
  type: 'USER_REGISTRATION',
  data: { userId: '123', email: '[email protected]' },
  timestamp: new Date().toISOString(),
  correlationId: 'reg-123',
  source: 'auth-service',
  version: '1.0'
});

2. Batch Processing Considerations

  • The service processes messages but doesn't provide custom message handling
  • Messages are automatically deleted after being received
  • Use moderate maxIterations values to avoid infinite processing
  • Monitor message counts before and after processing

3. Queue Monitoring

// Regular health checks
const health = await sqsService.verifyQueue().catch(() => false);
const count = await sqsService.getApproximateNumberOfMessages().catch(() => 0);

if (!health) {
  console.error('Queue health check failed');
}

if (count > threshold) {
  console.warn(`Queue backlog: ${count} messages`);
}

Testing

import { Test, TestingModule } from '@nestjs/testing';
import { ServerAwsSqsModule, SqsService } from '@onivoro/server-aws-sqs';

describe('SqsService', () => {
  let service: SqsService;

  beforeEach(async () => {
    const module: TestingModule = await Test.createTestingModule({
      imports: [
        ServerAwsSqsModule.configure({
          AWS_REGION: 'us-east-1',
          AWS_SQS_URL: 'https://sqs.us-east-1.amazonaws.com/123456789012/test-queue',
          AWS_PROFILE: 'test'
        }),
      ],
    }).compile();

    service = module.get<SqsService>(SqsService);
  });

  it('should be defined', () => {
    expect(service).toBeDefined();
  });

  it('should publish message without throwing', async () => {
    const testData = { type: 'TEST_EVENT', data: 'test' };
    await expect(service.publish(testData)).resolves.not.toThrow();
  });

  it('should get message count', async () => {
    const count = await service.getApproximateNumberOfMessages();
    expect(typeof count).toBe('number');
    expect(count).toBeGreaterThanOrEqual(0);
  });
});

API Reference

Configuration Classes

  • ServerAwsSqsConfig: Configuration class requiring AWS_REGION and AWS_SQS_URL

Modules

  • ServerAwsSqsModule: NestJS module with configure() method for setup

Services

  • SqsService: Main service with four methods:
    • publish<TData>(event: TData): Promise<void>
    • verifyQueue(): Promise<boolean>
    • getApproximateNumberOfMessages(): Promise<number>
    • processMessageBatches(maxIterations: number): Promise<void>

AWS SDK Dependencies

  • Uses @aws-sdk/client-sqs v3
  • Commands used: SendMessageCommand, GetQueueAttributesCommand, ReceiveMessageCommand, DeleteMessageBatchCommand

License

This library is licensed under the MIT License. See the LICENSE file in this package for details.