@allwhere/sqs-connector
v1.0.3
Published
Context-aware SQS service for automatic context extraction from incoming messages and context injection for outgoing messages
Maintainers
Keywords
Readme
@allwhere/sqs-connector
A NestJS module for interacting with AWS SQS queues with built-in support for request context propagation.
Overview
This package provides two approaches for working with SQS:
- Legacy SQS Service - Direct SQS client wrapper for basic queue operations
- SQS Transport (NEW) - Full NestJS microservices integration with advanced features
Installation
yarn add @allwhere/sqs-connector @allwhere/audit-logging nestjs-clsNote:
@allwhere/audit-loggingandnestjs-clsare peer dependencies required for context propagation features.
Table of Contents
Legacy SQS Service
Note: The SQS Service is the original implementation for direct queue operations. For microservices architecture, see SQS Transport below.
Basic Setup
import { Module } from '@nestjs/common';
import { SQSModule } from '@allwhere/sqs-connector';
import { AuditLoggingContextModule, ContextResolverModule } from '@allwhere/audit-logging';
@Module({
imports: [
SQSModule.forRoot({
auditLoggingContextModule: AuditLoggingContextModule,
contextResolverModule: ContextResolverModule,
config: {
queueUrl: 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
region: 'us-east-2',
maxNumberOfMessages: 1,
visibilityTimeout: 60,
waitTimeSeconds: 0,
},
}),
],
})
export class AppModule {}Using the Service
import { Injectable } from '@nestjs/common';
import { SQSService } from '@allwhere/sqs-connector';
@Injectable()
export class MyService {
constructor(private readonly sqsService: SQSService) {}
async sendMessage(data: any) {
await this.sqsService.sendMessage({
event: 'MY_EVENT',
recordId: 'unique-id',
data,
});
}
async processMessages() {
const messages = await this.sqsService.receiveMessages();
for (const message of messages) {
await this.sqsService.deleteMessage(message);
}
}
}SQS Transport (Recommended)
The SQS Transport provides a full-featured NestJS microservices integration with automatic message handling, context propagation, and graceful shutdown.
Features
- NestJS Microservices Integration - Works seamlessly with NestJS @MessagePattern decorators
- Message Transformers - Automatic detection and transformation of S3, SNS, Binary, and standard messages
- CLS Context Isolation - Each message processed in isolated continuation-local storage context
- Request Context Propagation - Automatic extraction and restoration of X-Request-Context
- Heartbeat Management - Visibility timeout extension for long-running handlers
- Graceful Shutdown - Waits for in-flight messages to complete before shutdown
- Error Handling - Automatic retry via visibility timeout, DLQ integration
- TypeScript - Full type safety and IntelliSense support
Quick Start
1. Configure the Transport Module
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { SqsTransportModule } from '@allwhere/sqs-connector/transport';
@Module({
imports: [
SqsTransportModule.forRootAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
queueUrl: configService.get('SQS_QUEUE_URL'),
region: configService.get('AWS_REGION', 'us-west-2'),
batchSize: 10,
waitTimeSeconds: 20,
visibilityTimeoutSeconds: 300,
heartbeatIntervalMs: 45000,
}),
inject: [ConfigService],
}),
],
})
export class AppModule {}2. Bootstrap the Microservice
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { SqsTransportServer } from '@allwhere/sqs-connector/transport';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
const sqsTransport = app.get(SqsTransportServer);
app.connectMicroservice<MicroserviceOptions>({
strategy: sqsTransport,
});
await app.startAllMicroservices();
await app.listen(3000);
}
bootstrap();3. Create Message Handlers
import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
@Controller()
export class OrderController {
@MessagePattern('order.created')
async handleOrderCreated(data: any) {
console.log('Order created:', data);
// Process order...
}
@MessagePattern('order.updated')
async handleOrderUpdated(data: any) {
console.log('Order updated:', data);
// Process order update...
}
}Architecture
The SQS Transport follows a layered architecture:
SQS Queue → Transport Server → Transformer Factory → Transformers
↓
CLS Context Restoration
↓
NestJS Handler Registry
↓
@MessagePattern HandlersComponents:
- SqsTransportServer - Polls SQS queue, manages lifecycle, coordinates processing
- TransformerFactory - Auto-detects message type and selects appropriate transformer
- Transformers - Convert AWS message formats to standardized internal format
- CLS Service - Provides context isolation per message
- Handler Registry - Maps patterns to NestJS @MessagePattern handlers
Message Transformers
The transport uses a Strategy Pattern to handle different message formats. Transformers are evaluated in order of precedence:
Built-in Transformers
- S3EventTransformer - Handles S3 bucket event notifications
- SNSEventTransformer - Unwraps SNS notifications delivered via SQS
- BinaryDataTransformer - Handles binary message data
- BasicSQSTransformer - Fallback for standard JSON messages (must be last)
Transformer Selection
Each transformer implements canHandle(message) to determine if it can process a message. The factory returns the first matching transformer:
// Message with S3 event structure
{
Records: [{
eventSource: 'aws:s3',
s3: {
bucket: { name: 'my-bucket' },
object: { key: 'path/to/file.txt' }
}
}]
}
// → S3EventTransformer
// Message with SNS notification
{
Type: 'Notification',
Message: '{"orderId":"123"}',
TopicArn: 'arn:aws:sns:...'
}
// → SNSEventTransformer
// Standard message
{
pattern: 'order.created',
data: { orderId: '123' }
}
// → BasicSQSTransformerCustom Transformers
Create a custom transformer by extending BaseTransformer:
import { Injectable } from '@nestjs/common';
import { Message } from '@aws-sdk/client-sqs';
import { BaseTransformer } from '@allwhere/sqs-connector/transport/transformers';
import { SQSQueueMessage } from '@allwhere/sqs-connector/transport/transformers';
@Injectable()
export class CustomTransformer extends BaseTransformer {
canHandle(message: Message): boolean {
const body = this.parseMessageBody(message.Body);
return body?.source === 'custom-system';
}
transform(message: Message): SQSQueueMessage {
const body = this.parseMessageBody(message.Body);
const context = this.extractContext(message);
return {
messageId: message.MessageId || '',
receiptHandle: message.ReceiptHandle || '',
body,
messageAttributes: message.MessageAttributes || {},
attributes: message.Attributes || {},
content: {
data: body.payload,
pattern: body.eventType,
eventName: body.eventType,
context,
},
};
}
}Register your custom transformer before BasicSQSTransformer:
{
provide: TransformerFactory,
useFactory: (
custom: CustomTransformer,
s3: S3EventTransformer,
sns: SNSEventTransformer,
binary: BinaryDataTransformer,
basic: BasicSQSTransformer,
) => {
return new TransformerFactory([custom, s3, sns, binary, basic]);
},
inject: [
CustomTransformer,
S3EventTransformer,
SNSEventTransformer,
BinaryDataTransformer,
BasicSQSTransformer,
],
}Configuration
All configuration options with defaults:
| Option | Type | Required | Default | Description |
|--------|------|----------|---------|-------------|
| queueUrl | string | Yes | - | Full SQS queue URL |
| region | string | No | AWS SDK default | AWS region |
| batchSize | number | No | 10 | Messages per poll (1-10) |
| waitTimeSeconds | number | No | 20 | Long polling wait time (0-20) |
| visibilityTimeoutSeconds | number | No | 300 | Message visibility timeout (0-43200) |
| heartbeatIntervalMs | number | No | 45000 | Heartbeat interval for long handlers |
Environment Variables:
# Required
SQS_QUEUE_URL=https://sqs.us-west-2.amazonaws.com/123456789012/my-queue
# Optional
AWS_REGION=us-west-2
SQS_BATCH_SIZE=10
SQS_WAIT_TIME_SECONDS=20
SQS_VISIBILITY_TIMEOUT_SECONDS=300
SQS_HEARTBEAT_INTERVAL_MS=45000Context Propagation
The transport automatically propagates request context through message attributes:
Sending Messages with Context:
// Context is automatically added by SQSService
await sqsService.sendMessage({
event: 'order.created',
data: { orderId: '123' },
});The X-Request-Context is stored in message attributes and includes:
organizationId- Current organization IDcollaboratorPublicId- Current user/collaborator IDsource- Request sourcecorrelationId- Request correlation ID
Accessing Context in Handlers:
import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { ClsService } from 'nestjs-cls';
import { RequestContext } from '@allwhere/audit-logging';
@Controller()
export class OrderController {
constructor(private readonly cls: ClsService) {}
@MessagePattern('order.created')
async handleOrderCreated(data: any) {
const context = this.cls.get<RequestContext>('requestContext');
const orgId = this.cls.get<string>('organizationId');
console.log('Organization:', orgId);
console.log('Collaborator:', context.collaboratorPublicId);
// Context is automatically available for downstream operations
}
}Error Handling
Automatic Retry:
- On error, messages remain on queue and become visible after
visibilityTimeoutSeconds - Configure redrive policy in AWS to send to DLQ after max receive count
Message Deletion:
- Messages deleted automatically on successful handler completion
- On error, message left on queue for retry
- No manual deletion required
Error Logging:
- All errors logged with full context (messageId, pattern, error details)
- Stack traces included for debugging
Example:
@MessagePattern('order.created')
async handleOrderCreated(data: any) {
if (!data.orderId) {
// Error thrown, message will retry after visibility timeout
throw new Error('Missing orderId');
}
// Process successfully
// Message automatically deleted
}Testing
Unit Testing with Mock Transport:
import { Test } from '@nestjs/testing';
import { SqsTransportServer } from '@allwhere/sqs-connector/transport';
describe('OrderController', () => {
let controller: OrderController;
beforeEach(async () => {
const module = await Test.createTestingModule({
controllers: [OrderController],
providers: [
{
provide: SqsTransportServer,
useValue: {
listen: jest.fn(),
close: jest.fn(),
},
},
],
}).compile();
controller = module.get<OrderController>(OrderController);
});
it('should handle order created', async () => {
await controller.handleOrderCreated({
orderId: '123',
status: 'created',
});
// Assert expectations
});
});Integration Testing:
Use LocalStack or AWS SQS for integration tests. See order-worker service for examples.
Attribute Propagation System
The SQS Connector includes a flexible attribute propagation system that allows automatic extraction and restoration of context through SQS message attributes. This system is built on a provider-based architecture that is extensible and supports multiple context types.
Architecture Overview
The attribute propagation system consists of three main components:
- AttributeProvider Interface - Defines the contract for context providers
- AttributeProviderRegistry - Manages provider registration and execution
- Built-in Providers - Pre-built providers for common context types
AttributeProvider Interface
The AttributeProvider interface defines how context is extracted and processed:
import { AttributeProvider, MessageAttributeValue } from '@allwhere/sqs-connector';
export interface AttributeProvider {
// Unique identifier for this provider
readonly name: string;
// Execution priority (higher = executed first)
readonly priority: number;
// Extract attributes for outgoing messages
extractAttributes(): Promise<Record<string, MessageAttributeValue> | undefined>;
// Process attributes from incoming messages
processAttributes(attributes: Record<string, MessageAttributeValue>): Promise<void>;
// Optional cleanup method
cleanup?(): Promise<void>;
}Built-in Providers
RequestContextAttributeProvider
Propagates request context (organization ID, collaborator ID, correlation ID) through SQS messages.
Priority: 100 (highest)
Attributes: X-Request-Context
This provider is automatically registered when you import SQSModule. It extracts context from CLS (Continuation-Local Storage) and restores it when processing messages.
// Context is automatically propagated
await sqsService.sendMessage({
event: 'order.created',
data: { orderId: '123' },
});
// In the consumer, context is automatically restored
@MessagePattern('order.created')
async handleOrderCreated(data: any) {
const context = this.cls.get<RequestContext>('requestContext');
console.log('Organization:', context.organizationId);
}Creating Custom Providers
You can create custom providers to propagate additional context types:
import { Injectable, Logger } from '@nestjs/common';
import { AttributeProvider, MessageAttributeValue } from '@allwhere/sqs-connector';
@Injectable()
export class FeatureFlagProvider implements AttributeProvider {
public readonly name = 'FeatureFlagProvider';
public readonly priority = 75; // Medium priority
constructor(
private readonly featureFlagService: FeatureFlagService,
private readonly logger: Logger,
) {}
async extractAttributes(): Promise<Record<string, MessageAttributeValue> | undefined> {
try {
const flags = await this.featureFlagService.getCurrentFlags();
if (!flags || Object.keys(flags).length === 0) {
return undefined;
}
return {
'X-Feature-Flags': {
DataType: 'String',
StringValue: JSON.stringify(flags),
},
};
} catch (error) {
this.logger.error('Failed to extract feature flags', error);
return undefined;
}
}
async processAttributes(
attributes: Record<string, MessageAttributeValue>,
): Promise<void> {
try {
const flagsData = attributes['X-Feature-Flags']?.StringValue;
if (!flagsData) {
return;
}
const flags = JSON.parse(flagsData);
await this.featureFlagService.restoreFlags(flags);
this.logger.debug('Feature flags restored from message');
} catch (error) {
this.logger.error('Failed to restore feature flags', error);
}
}
}Registering Custom Providers
Register custom providers by accessing the AttributeProviderRegistry:
import { Module, OnModuleInit } from '@nestjs/common';
import { SQSModule, AttributeProviderRegistry } from '@allwhere/sqs-connector';
@Module({
imports: [
SQSModule.forRootAsync({
useFactory: (configService: ConfigService, clsService: ClsService) => ({
queueUrl: configService.get('QUEUE_URL'),
clsService,
}),
inject: [ConfigService, ClsService],
}),
],
providers: [FeatureFlagProvider],
})
export class AppModule implements OnModuleInit {
constructor(
private readonly registry: AttributeProviderRegistry,
private readonly featureFlagProvider: FeatureFlagProvider,
) {}
onModuleInit() {
// Register custom provider
this.registry.register(this.featureFlagProvider);
}
}Provider Priority System
Providers are executed in priority order (highest to lowest) during both extraction and processing:
- 100+ - Critical context (authentication, request context)
- 50-99 - Observability (tracing, metrics)
- 1-49 - Additional context (feature flags, custom metadata)
Example execution order:
- RequestContextProvider (priority: 100)
- CustomTracingProvider (priority: 75)
- FeatureFlagProvider (priority: 50)
Error Handling
The attribute propagation system is designed to be fault-tolerant:
- If one provider fails during extraction, other providers continue
- If one provider fails during processing, other providers continue
- All errors are logged with full context
- Failed providers don't block message publishing or consumption
// Even if FeatureFlagProvider throws an error,
// RequestContextProvider will still execute
const attributes = await registry.extractAllAttributes();
// Returns merged attributes from all successful providersBest Practices
- Keep extraction fast - Providers run synchronously during message publishing
- Handle errors gracefully - Return undefined instead of throwing
- Use appropriate priorities - Higher priority for critical context
- Document your providers - Include JSDoc comments with examples
- Test in isolation - Unit test providers independently
- Cleanup resources - Implement cleanup() for proper resource management
Contributing
- Fork the repository
- Create your feature branch
- Commit your changes
- Push to the branch
- Create a new Pull Request
License
Proprietary - Allwhere Inc.
