@allwhere/sqs-connector

v1.0.3

Context-aware SQS service for automatic context extraction from incoming messages and context injection for outgoing messages

A NestJS module for interacting with AWS SQS queues with built-in support for request context propagation.

Overview

This package provides two approaches for working with SQS:

  1. Legacy SQS Service - Direct SQS client wrapper for basic queue operations
  2. SQS Transport (NEW) - Full NestJS microservices integration with advanced features

Installation

yarn add @allwhere/sqs-connector @allwhere/audit-logging nestjs-cls

Note: @allwhere/audit-logging and nestjs-cls are peer dependencies required for context propagation features.

Legacy SQS Service

Note: The SQS Service is the original implementation for direct queue operations. For microservices architecture, see SQS Transport below.

Basic Setup

import { Module } from '@nestjs/common';
import { SQSModule } from '@allwhere/sqs-connector';
import { AuditLoggingContextModule, ContextResolverModule } from '@allwhere/audit-logging';

@Module({
  imports: [
    SQSModule.forRoot({
      auditLoggingContextModule: AuditLoggingContextModule,
      contextResolverModule: ContextResolverModule,
      config: {
        queueUrl: 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
        region: 'us-east-2',
        maxNumberOfMessages: 1,
        visibilityTimeout: 60,
        waitTimeSeconds: 0,
      },
    }),
  ],
})
export class AppModule {}

Using the Service

import { Injectable } from '@nestjs/common';
import { SQSService } from '@allwhere/sqs-connector';

@Injectable()
export class MyService {
  constructor(private readonly sqsService: SQSService) {}

  async sendMessage(data: any) {
    await this.sqsService.sendMessage({
      event: 'MY_EVENT',
      recordId: 'unique-id',
      data,
    });
  }

  async processMessages() {
    const messages = await this.sqsService.receiveMessages();
    for (const message of messages) {
      // Handle the message here, then delete it so it is not redelivered
      await this.sqsService.deleteMessage(message);
    }
  }
}

SQS Transport (Recommended)

The SQS Transport provides a full-featured NestJS microservices integration with automatic message handling, context propagation, and graceful shutdown.

Features

  • NestJS Microservices Integration - Works seamlessly with NestJS @MessagePattern decorators
  • Message Transformers - Automatic detection and transformation of S3, SNS, Binary, and standard messages
  • CLS Context Isolation - Each message processed in isolated continuation-local storage context
  • Request Context Propagation - Automatic extraction and restoration of X-Request-Context
  • Heartbeat Management - Visibility timeout extension for long-running handlers
  • Graceful Shutdown - Waits for in-flight messages to complete before shutdown
  • Error Handling - Automatic retry via visibility timeout, DLQ integration
  • TypeScript - Full type safety and IntelliSense support

Quick Start

1. Configure the Transport Module

import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { SqsTransportModule } from '@allwhere/sqs-connector/transport';

@Module({
  imports: [
    SqsTransportModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: (configService: ConfigService) => ({
        queueUrl: configService.get('SQS_QUEUE_URL'),
        region: configService.get('AWS_REGION', 'us-west-2'),
        batchSize: 10,
        waitTimeSeconds: 20,
        visibilityTimeoutSeconds: 300,
        heartbeatIntervalMs: 45000,
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}

2. Bootstrap the Microservice

import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { SqsTransportServer } from '@allwhere/sqs-connector/transport';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  const sqsTransport = app.get(SqsTransportServer);
  app.connectMicroservice<MicroserviceOptions>({
    strategy: sqsTransport,
  });

  await app.startAllMicroservices();
  await app.listen(3000);
}

bootstrap();

3. Create Message Handlers

import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';

@Controller()
export class OrderController {
  @MessagePattern('order.created')
  async handleOrderCreated(data: any) {
    console.log('Order created:', data);
    // Process order...
  }

  @MessagePattern('order.updated')
  async handleOrderUpdated(data: any) {
    console.log('Order updated:', data);
    // Process order update...
  }
}

Architecture

The SQS Transport follows a layered architecture:

SQS Queue → Transport Server → Transformer Factory → Transformers
                ↓
          CLS Context Restoration
                ↓
        NestJS Handler Registry
                ↓
         @MessagePattern Handlers

Components:

  1. SqsTransportServer - Polls SQS queue, manages lifecycle, coordinates processing
  2. TransformerFactory - Auto-detects message type and selects appropriate transformer
  3. Transformers - Convert AWS message formats to standardized internal format
  4. CLS Service - Provides context isolation per message
  5. Handler Registry - Maps patterns to NestJS @MessagePattern handlers

Message Transformers

The transport uses a Strategy Pattern to handle different message formats. Transformers are evaluated in order of precedence:

Built-in Transformers

  1. S3EventTransformer - Handles S3 bucket event notifications
  2. SNSEventTransformer - Unwraps SNS notifications delivered via SQS
  3. BinaryDataTransformer - Handles binary message data
  4. BasicSQSTransformer - Fallback for standard JSON messages (must be last)
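
To make the SNS case concrete, here is a minimal sketch of unwrapping an SNS envelope from an SQS message body. It assumes the standard SNS-to-SQS delivery format without raw message delivery, where the body is JSON with `Type: 'Notification'` and a string `Message` field. The names `SnsEnvelope`, `isSnsEnvelope`, and `unwrapSnsEnvelope` are hypothetical and are not part of this package's API; the real SNSEventTransformer may work differently.

```typescript
// Hypothetical helper, not part of @allwhere/sqs-connector.
interface SnsEnvelope {
  Type: string;
  Message: string;
  TopicArn?: string;
}

// Type guard: does the parsed body look like an SNS notification envelope?
function isSnsEnvelope(value: unknown): value is SnsEnvelope {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return candidate.Type === 'Notification' && typeof candidate.Message === 'string';
}

// Returns the inner payload for SNS envelopes, or the parsed body unchanged otherwise.
function unwrapSnsEnvelope(rawBody: string): unknown {
  const parsed: unknown = JSON.parse(rawBody);
  if (!isSnsEnvelope(parsed)) return parsed;
  try {
    return JSON.parse(parsed.Message);
  } catch {
    // SNS messages are not required to be JSON; fall back to the raw string.
    return parsed.Message;
  }
}
```

Falling back to the raw string keeps plain-text notifications usable instead of rejecting them.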

Transformer Selection

Each transformer implements canHandle(message) to determine if it can process a message. The factory returns the first matching transformer:

// Message with S3 event structure
{
  Records: [{
    eventSource: 'aws:s3',
    s3: {
      bucket: { name: 'my-bucket' },
      object: { key: 'path/to/file.txt' }
    }
  }]
}
// → S3EventTransformer

// Message with SNS notification
{
  Type: 'Notification',
  Message: '{"orderId":"123"}',
  TopicArn: 'arn:aws:sns:...'
}
// → SNSEventTransformer

// Standard message
{
  pattern: 'order.created',
  data: { orderId: '123' }
}
// → BasicSQSTransformer
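
The first-match rule described above can be sketched in a few lines. This is an illustration under stated assumptions, not the package's TransformerFactory: the `RawMessage` and `Transformer` types, the three sample transformers, and `selectTransformer` are all hypothetical stand-ins. It shows why a catch-all fallback has to be registered last.

```typescript
// Illustrative types only; the package's real interfaces may differ.
interface RawMessage {
  Body?: string;
}

interface Transformer {
  readonly name: string;
  canHandle(message: RawMessage): boolean;
}

// Parses the body as a JSON object, or returns undefined if that is not possible.
function parseBody(message: RawMessage): Record<string, unknown> | undefined {
  try {
    const parsed: unknown = JSON.parse(message.Body ?? '');
    return typeof parsed === 'object' && parsed !== null
      ? (parsed as Record<string, unknown>)
      : undefined;
  } catch {
    return undefined;
  }
}

const s3Like: Transformer = {
  name: 's3',
  canHandle: (m) => {
    const records = parseBody(m)?.Records;
    return Array.isArray(records) && records[0]?.eventSource === 'aws:s3';
  },
};

const snsLike: Transformer = {
  name: 'sns',
  canHandle: (m) => parseBody(m)?.Type === 'Notification',
};

// Fallback that accepts everything, which is why it has to come last.
const basicLike: Transformer = { name: 'basic', canHandle: () => true };

// First-match selection: position in the array is the order of precedence.
function selectTransformer(transformers: Transformer[], message: RawMessage): Transformer {
  const match = transformers.find((t) => t.canHandle(message));
  if (!match) throw new Error('No transformer can handle this message');
  return match;
}

const ordered: Transformer[] = [s3Like, snsLike, basicLike];
```

If `basicLike` were placed first, it would shadow every other transformer, because its `canHandle` always returns true.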

Custom Transformers

Create a custom transformer by extending BaseTransformer:

import { Injectable } from '@nestjs/common';
import { Message } from '@aws-sdk/client-sqs';
import { BaseTransformer } from '@allwhere/sqs-connector/transport/transformers';
import { SQSQueueMessage } from '@allwhere/sqs-connector/transport/transformers';

@Injectable()
export class CustomTransformer extends BaseTransformer {
  canHandle(message: Message): boolean {
    const body = this.parseMessageBody(message.Body);
    return body?.source === 'custom-system';
  }

  transform(message: Message): SQSQueueMessage {
    const body = this.parseMessageBody(message.Body);
    const context = this.extractContext(message);

    return {
      messageId: message.MessageId || '',
      receiptHandle: message.ReceiptHandle || '',
      body,
      messageAttributes: message.MessageAttributes || {},
      attributes: message.Attributes || {},
      content: {
        data: body.payload,
        pattern: body.eventType,
        eventName: body.eventType,
        context,
      },
    };
  }
}

Register your custom transformer before BasicSQSTransformer:

{
  provide: TransformerFactory,
  useFactory: (
    custom: CustomTransformer,
    s3: S3EventTransformer,
    sns: SNSEventTransformer,
    binary: BinaryDataTransformer,
    basic: BasicSQSTransformer,
  ) => {
    return new TransformerFactory([custom, s3, sns, binary, basic]);
  },
  inject: [
    CustomTransformer,
    S3EventTransformer,
    SNSEventTransformer,
    BinaryDataTransformer,
    BasicSQSTransformer,
  ],
}

Configuration

All configuration options with defaults:

| Option | Type | Required | Default | Description |
|--------|------|----------|---------|-------------|
| queueUrl | string | Yes | - | Full SQS queue URL |
| region | string | No | AWS SDK default | AWS region |
| batchSize | number | No | 10 | Messages per poll (1-10) |
| waitTimeSeconds | number | No | 20 | Long polling wait time (0-20) |
| visibilityTimeoutSeconds | number | No | 300 | Message visibility timeout (0-43200) |
| heartbeatIntervalMs | number | No | 45000 | Heartbeat interval for long handlers |
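
The ranges and defaults in the table can be checked before the transport starts. The sketch below is one way to do that; `SqsTransportOptions`, `inRange`, and `validateOptions` are hypothetical names, and nothing here confirms that the package performs this validation itself. The final check, that the heartbeat interval is shorter than the visibility timeout, is an assumption about sensible configuration rather than a documented rule.

```typescript
// Field names mirror the configuration table; the validator itself is hypothetical.
interface SqsTransportOptions {
  queueUrl: string;
  region?: string;
  batchSize?: number;
  waitTimeSeconds?: number;
  visibilityTimeoutSeconds?: number;
  heartbeatIntervalMs?: number;
}

function inRange(value: number, min: number, max: number): boolean {
  return Number.isInteger(value) && value >= min && value <= max;
}

// Applies the documented defaults, then returns a list of problems (empty when valid).
function validateOptions(options: SqsTransportOptions): string[] {
  const batchSize = options.batchSize ?? 10;
  const waitTimeSeconds = options.waitTimeSeconds ?? 20;
  const visibilityTimeoutSeconds = options.visibilityTimeoutSeconds ?? 300;
  const heartbeatIntervalMs = options.heartbeatIntervalMs ?? 45000;

  const errors: string[] = [];
  if (!options.queueUrl) errors.push('queueUrl is required');
  if (!inRange(batchSize, 1, 10)) errors.push('batchSize must be 1-10');
  if (!inRange(waitTimeSeconds, 0, 20)) errors.push('waitTimeSeconds must be 0-20');
  if (!inRange(visibilityTimeoutSeconds, 0, 43200)) {
    errors.push('visibilityTimeoutSeconds must be 0-43200');
  }
  // Assumption: a heartbeat only helps if it fires before the timeout lapses.
  if (heartbeatIntervalMs >= visibilityTimeoutSeconds * 1000) {
    errors.push('heartbeatIntervalMs should be shorter than the visibility timeout');
  }
  return errors;
}
```

With the defaults, the heartbeat fires every 45 seconds against a 300-second visibility timeout, so the check passes.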

Environment Variables:

# Required
SQS_QUEUE_URL=https://sqs.us-west-2.amazonaws.com/123456789012/my-queue

# Optional
AWS_REGION=us-west-2
SQS_BATCH_SIZE=10
SQS_WAIT_TIME_SECONDS=20
SQS_VISIBILITY_TIMEOUT_SECONDS=300
SQS_HEARTBEAT_INTERVAL_MS=45000
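
The Quick Start factory reads only SQS_QUEUE_URL and AWS_REGION and hard-codes the remaining values. If the other variables are not read by the package itself (the README does not say), a factory could map them explicitly. The sketch below assumes exactly that; `Env`, `intFromEnv`, and `optionsFromEnv` are hypothetical helpers.

```typescript
type Env = Record<string, string | undefined>;

// Parses an integer variable, falling back to the default when it is unset or empty.
function intFromEnv(env: Env, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw.trim() === '') return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value)) {
    throw new Error(`${key} must be an integer, got "${raw}"`);
  }
  return value;
}

// Builds transport options from the environment variables listed above.
function optionsFromEnv(env: Env) {
  const queueUrl = env['SQS_QUEUE_URL'];
  if (!queueUrl) throw new Error('SQS_QUEUE_URL is required');
  return {
    queueUrl,
    region: env['AWS_REGION'], // undefined leaves the choice to the AWS SDK default
    batchSize: intFromEnv(env, 'SQS_BATCH_SIZE', 10),
    waitTimeSeconds: intFromEnv(env, 'SQS_WAIT_TIME_SECONDS', 20),
    visibilityTimeoutSeconds: intFromEnv(env, 'SQS_VISIBILITY_TIMEOUT_SECONDS', 300),
    heartbeatIntervalMs: intFromEnv(env, 'SQS_HEARTBEAT_INTERVAL_MS', 45000),
  };
}
```

Inside a `forRootAsync` factory this could be called as `optionsFromEnv(process.env)`. Failing fast on a non-numeric value avoids passing `NaN` to the SQS client.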

Context Propagation

The transport automatically propagates request context through message attributes:

Sending Messages with Context:

// Context is automatically added by SQSService
await sqsService.sendMessage({
  event: 'order.created',
  data: { orderId: '123' },
});

The X-Request-Context is stored in message attributes and includes:

  • organizationId - Current organization ID
  • collaboratorPublicId - Current user/collaborator ID
  • source - Request source
  • correlationId - Request correlation ID
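
As an illustration of how such a context could travel in a message attribute, the sketch below serializes the four fields into a single String attribute and reads them back. The JSON encoding, the `RequestContextLike` and `StringAttribute` types, and both helper functions are assumptions for illustration; the README does not specify the wire format the package actually uses.

```typescript
// Field names come from the list above; the JSON encoding is an assumption.
interface RequestContextLike {
  organizationId?: string;
  collaboratorPublicId?: string;
  source?: string;
  correlationId?: string;
}

// Shape of an SQS message attribute that carries a string.
interface StringAttribute {
  DataType: 'String';
  StringValue: string;
}

const CONTEXT_ATTRIBUTE = 'X-Request-Context';

// Producer side: pack the context into one message attribute.
function toAttributes(context: RequestContextLike): Record<string, StringAttribute> {
  return {
    [CONTEXT_ATTRIBUTE]: { DataType: 'String', StringValue: JSON.stringify(context) },
  };
}

// Consumer side: read the context back, tolerating a missing or malformed attribute.
function fromAttributes(
  attributes: Record<string, { StringValue?: string } | undefined>,
): RequestContextLike | undefined {
  const raw = attributes[CONTEXT_ATTRIBUTE]?.StringValue;
  if (!raw) return undefined;
  try {
    return JSON.parse(raw) as RequestContextLike;
  } catch {
    return undefined; // A malformed context should not fail message processing.
  }
}
```

Packing everything into one attribute keeps the message well under the SQS limit of 10 attributes per message.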

Accessing Context in Handlers:

import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { ClsService } from 'nestjs-cls';
import { RequestContext } from '@allwhere/audit-logging';

@Controller()
export class OrderController {
  constructor(private readonly cls: ClsService) {}

  @MessagePattern('order.created')
  async handleOrderCreated(data: any) {
    const context = this.cls.get<RequestContext>('requestContext');
    const orgId = this.cls.get<string>('organizationId');

    console.log('Organization:', orgId);
    // The context may be absent if the producer did not attach one
    console.log('Collaborator:', context?.collaboratorPublicId);

    // Context is automatically available for downstream operations
  }
}

Error Handling

Automatic Retry:

  • On error, the message stays on the queue and becomes visible again once visibilityTimeoutSeconds elapses
  • Configure a redrive policy in AWS to move messages to a dead-letter queue (DLQ) once they exceed the maximum receive count

Message Deletion:

  • Messages deleted automatically on successful handler completion
  • On error, message left on queue for retry
  • No manual deletion required

Error Logging:

  • All errors logged with full context (messageId, pattern, error details)
  • Stack traces included for debugging

Example:

@MessagePattern('order.created')
async handleOrderCreated(data: any) {
  if (!data.orderId) {
    // Error thrown, message will retry after visibility timeout
    throw new Error('Missing orderId');
  }

  // Process successfully
  // Message automatically deleted
}
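
The delete-on-success, leave-on-error behavior described in this section can be sketched as a small processing function. This is not the transport's implementation: `QueueMessage`, `Handler`, and `processMessage` are hypothetical, the delete call is injected so the sketch runs without AWS, and treating an unknown pattern as a failure is an assumption.

```typescript
interface QueueMessage {
  receiptHandle: string;
  pattern: string;
  data: unknown;
}

type Handler = (data: unknown) => Promise<void>;

// Runs the matching handler and deletes the message only if the handler succeeds.
async function processMessage(
  message: QueueMessage,
  handlers: Map<string, Handler>,
  deleteMessage: (receiptHandle: string) => Promise<void>,
): Promise<'deleted' | 'left-for-retry'> {
  const handler = handlers.get(message.pattern);
  try {
    // Assumption: a message with no registered handler is treated as a failure.
    if (!handler) throw new Error(`No handler for pattern "${message.pattern}"`);
    await handler(message.data);
  } catch {
    // No delete call here: SQS makes the message visible again after the
    // visibility timeout, and the queue's redrive policy decides when it
    // moves to the DLQ. A real implementation would also log the error.
    return 'left-for-retry';
  }
  await deleteMessage(message.receiptHandle);
  return 'deleted';
}
```

Because a failed message is redelivered, handlers should be idempotent: the same message may be processed more than once.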

Testing

Unit Testing with Mock Transport:

import { Test } from '@nestjs/testing';
import { SqsTransportServer } from '@allwhere/sqs-connector/transport';
import { OrderController } from './order.controller'; // adjust to your controller's path

describe('OrderController', () => {
  let controller: OrderController;

  beforeEach(async () => {
    const module = await Test.createTestingModule({
      controllers: [OrderController],
      providers: [
        {
          provide: SqsTransportServer,
          useValue: {
            listen: jest.fn(),
            close: jest.fn(),
          },
        },
      ],
    }).compile();

    controller = module.get<OrderController>(OrderController);
  });

  it('should handle order created', async () => {
    await controller.handleOrderCreated({
      orderId: '123',
      status: 'created',
    });

    // Assert expectations
  });
});

Integration Testing:

Use LocalStack or AWS SQS for integration tests. See order-worker service for examples.


Attribute Propagation System

The SQS Connector includes a flexible attribute propagation system that allows automatic extraction and restoration of context through SQS message attributes. This system is built on a provider-based architecture that is extensible and supports multiple context types.

Architecture Overview

The attribute propagation system consists of three main components:

  1. AttributeProvider Interface - Defines the contract for context providers
  2. AttributeProviderRegistry - Manages provider registration and execution
  3. Built-in Providers - Pre-built providers for common context types

AttributeProvider Interface

The AttributeProvider interface defines how context is extracted and processed:

import { MessageAttributeValue } from '@allwhere/sqs-connector';

// Shape of the AttributeProvider interface exported by the package (shown for reference)

export interface AttributeProvider {
  // Unique identifier for this provider
  readonly name: string;

  // Execution priority (higher = executed first)
  readonly priority: number;

  // Extract attributes for outgoing messages
  extractAttributes(): Promise<Record<string, MessageAttributeValue> | undefined>;

  // Process attributes from incoming messages
  processAttributes(attributes: Record<string, MessageAttributeValue>): Promise<void>;

  // Optional cleanup method
  cleanup?(): Promise<void>;
}

Built-in Providers

RequestContextAttributeProvider

Propagates request context (organization ID, collaborator ID, correlation ID) through SQS messages.

Priority: 100 (highest)

Attributes: X-Request-Context

This provider is automatically registered when you import SQSModule. It extracts context from CLS (Continuation-Local Storage) and restores it when processing messages.

// Context is automatically propagated
await sqsService.sendMessage({
  event: 'order.created',
  data: { orderId: '123' },
});

// In the consumer, context is automatically restored
@MessagePattern('order.created')
async handleOrderCreated(data: any) {
  const context = this.cls.get<RequestContext>('requestContext');
  console.log('Organization:', context.organizationId);
}

Creating Custom Providers

You can create custom providers to propagate additional context types:

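import { Injectable, Logger } from '@nestjs/common';
import { AttributeProvider, MessageAttributeValue } from '@allwhere/sqs-connector';
import { FeatureFlagService } from './feature-flag.service'; // your own service; path is illustrative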

@Injectable()
export class FeatureFlagProvider implements AttributeProvider {
  public readonly name = 'FeatureFlagProvider';
  public readonly priority = 25; // Additional-context tier (1-49)

  constructor(
    private readonly featureFlagService: FeatureFlagService,
    private readonly logger: Logger,
  ) {}

  async extractAttributes(): Promise<Record<string, MessageAttributeValue> | undefined> {
    try {
      const flags = await this.featureFlagService.getCurrentFlags();
      if (!flags || Object.keys(flags).length === 0) {
        return undefined;
      }

      return {
        'X-Feature-Flags': {
          DataType: 'String',
          StringValue: JSON.stringify(flags),
        },
      };
    } catch (error) {
      this.logger.error('Failed to extract feature flags', error);
      return undefined;
    }
  }

  async processAttributes(
    attributes: Record<string, MessageAttributeValue>,
  ): Promise<void> {
    try {
      const flagsData = attributes['X-Feature-Flags']?.StringValue;
      if (!flagsData) {
        return;
      }

      const flags = JSON.parse(flagsData);
      await this.featureFlagService.restoreFlags(flags);
      this.logger.debug('Feature flags restored from message');
    } catch (error) {
      this.logger.error('Failed to restore feature flags', error);
    }
  }
}

Registering Custom Providers

Register custom providers by accessing the AttributeProviderRegistry:

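import { Module, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ClsService } from 'nestjs-cls';
import { SQSModule, AttributeProviderRegistry } from '@allwhere/sqs-connector';
import { FeatureFlagProvider } from './feature-flag.provider'; // path is illustrative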

@Module({
  imports: [
    SQSModule.forRootAsync({
      useFactory: (configService: ConfigService, clsService: ClsService) => ({
        queueUrl: configService.get('QUEUE_URL'),
        clsService,
      }),
      inject: [ConfigService, ClsService],
    }),
  ],
  providers: [FeatureFlagProvider],
})
export class AppModule implements OnModuleInit {
  constructor(
    private readonly registry: AttributeProviderRegistry,
    private readonly featureFlagProvider: FeatureFlagProvider,
  ) {}

  onModuleInit() {
    // Register custom provider
    this.registry.register(this.featureFlagProvider);
  }
}

Provider Priority System

Providers are executed in priority order (highest to lowest) during both extraction and processing:

  • 100+ - Critical context (authentication, request context)
  • 50-99 - Observability (tracing, metrics)
  • 1-49 - Additional context (feature flags, custom metadata)

Example execution order:

  1. RequestContextAttributeProvider (priority: 100)
  2. CustomTracingProvider (priority: 75)
  3. FeatureFlagProvider (priority: 25)
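
Ordering by priority amounts to a descending sort. The sketch below shows the idea with a hypothetical `PrioritizedProvider` type and `byPriorityDescending` helper; how the package's registry orders providers internally is not documented here, and the provider names and priorities in the sample are illustrative.

```typescript
interface PrioritizedProvider {
  readonly name: string;
  readonly priority: number;
}

// Sorts a copy so the caller's registration order is left untouched.
function byPriorityDescending<T extends PrioritizedProvider>(providers: readonly T[]): T[] {
  return [...providers].sort((a, b) => b.priority - a.priority);
}

// Registered in arbitrary order, executed highest priority first.
const registered: PrioritizedProvider[] = [
  { name: 'CustomMetadataProvider', priority: 10 },
  { name: 'RequestContextAttributeProvider', priority: 100 },
  { name: 'CustomTracingProvider', priority: 75 },
];

const executionOrder = byPriorityDescending(registered).map((p) => p.name);
```

`Array.prototype.sort` is stable in ES2019 and later, so providers with equal priority keep their registration order in this sketch.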

Error Handling

The attribute propagation system is designed to be fault-tolerant:

  • If one provider fails during extraction, other providers continue
  • If one provider fails during processing, other providers continue
  • All errors are logged with full context
  • Failed providers don't block message publishing or consumption

// Even if FeatureFlagProvider throws an error,
// RequestContextAttributeProvider will still execute
const attributes = await registry.extractAllAttributes();
// Returns merged attributes from all successful providers
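
One way to get this fault tolerance is to wrap each provider call in its own try/catch and merge whatever succeeds. The sketch below is an illustration, not the registry's source: `AttributeValue`, `ExtractingProvider`, and this standalone `extractAllAttributes` function are hypothetical, and the rule that the higher-priority provider wins on a key collision is an assumption.

```typescript
interface AttributeValue {
  DataType: string;
  StringValue?: string;
}

interface ExtractingProvider {
  readonly name: string;
  readonly priority: number;
  extractAttributes(): Promise<Record<string, AttributeValue> | undefined>;
}

// Collects attributes from every provider, skipping any provider that throws.
async function extractAllAttributes(
  providers: readonly ExtractingProvider[],
): Promise<Record<string, AttributeValue>> {
  const merged: Record<string, AttributeValue> = {};
  const ordered = [...providers].sort((a, b) => b.priority - a.priority);

  for (const provider of ordered) {
    try {
      const attributes = await provider.extractAttributes();
      // Assumption: on a key collision, the higher-priority provider wins.
      for (const [key, value] of Object.entries(attributes ?? {})) {
        if (!(key in merged)) merged[key] = value;
      }
    } catch {
      // One failing provider must not block the others; a real registry
      // would log the provider name and the error here.
      continue;
    }
  }
  return merged;
}
```

Isolating failures per provider means a broken optional provider degrades the message's metadata rather than preventing the message from being sent.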

Best Practices

  1. Keep extraction fast - Providers are awaited during message publishing, so slow extraction delays every send
  2. Handle errors gracefully - Return undefined instead of throwing
  3. Use appropriate priorities - Higher priority for critical context
  4. Document your providers - Include JSDoc comments with examples
  5. Test in isolation - Unit test providers independently
  6. Cleanup resources - Implement cleanup() for proper resource management

Contributing

  1. Fork the repository
  2. Create your feature branch
  3. Commit your changes
  4. Push to the branch
  5. Create a new Pull Request

License

Proprietary - Allwhere Inc.