nestjs-sqs-transporter
v0.4.1
Published
NestJS SQS Custom Transporter following official microservice patterns with S3 large message support, DLQ handling, and OpenTelemetry observability
Maintainers
Readme
Tested with: AWS SQS and LocalStack.
Table of Contents
- Features
- Why nestjs-sqs-transporter?
- Installation
- Quick Start
- Advanced Features
- Testing
- API Reference
- Contributing
- License
Features
@EventPatterndecorator (fire-and-forget)- S3 large message support (>256KB)
- FIFO queue support with flexible configuration
- OpenTelemetry observability (optional)
- Testing utilities (MockServerSqs, MockClientSqs)
Why nestjs-sqs-transporter?
- Official NestJS patterns — Uses
@EventPatternandClientProxyjust like other NestJS transporters - S3 large message support — Automatically offloads payloads >256KB to S3, no manual handling
- FIFO queue support — Full support with dynamic
messageGroupIdanddeduplicationIdfunctions - OpenTelemetry ready — Optional tracing and metrics for production observability
- Testing utilities —
MockClientSqsfor easy unit testing without AWS - Flexible routing — Custom
patternKeyfor integration with external systems
Installation
npm i nestjs-sqs-transporterFor S3 large message support:
npm i @aws-sdk/client-s3Quick Start
Register the transporter
// main.ts
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { SQSClient } from '@aws-sdk/client-sqs';
import { ServerSqs } from 'nestjs-sqs-transporter';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.connectMicroservice<MicroserviceOptions>({
strategy: new ServerSqs({
sqs: new SQSClient({ region: 'us-east-1' }),
consumerOptions: {
queueUrl: process.env.SQS_QUEUE_URL,
},
}),
});
await app.startAllMicroservices();
await app.listen(3000);
}
bootstrap();Handle messages
import { Controller } from '@nestjs/common';
import { EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { SqsContext } from 'nestjs-sqs-transporter';
@Controller()
export class MessageController {
@EventPattern('ORDER_CREATED')
async handleOrderCreated(
@Payload() data: OrderDto,
@Ctx() context: SqsContext,
) {
console.log(`Processing message ${context.getMessageId()}`);
// Process the order...
}
}Send messages
// app.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule } from '@nestjs/microservices';
import { SQSClient } from '@aws-sdk/client-sqs';
import { ClientSqs } from 'nestjs-sqs-transporter';
@Module({
imports: [
ClientsModule.register([
{
name: 'SQS_SERVICE',
customClass: ClientSqs,
options: {
sqs: new SQSClient({ region: 'us-east-1' }),
queueUrl: process.env.SQS_QUEUE_URL,
},
},
]),
],
})
export class AppModule {}// order.service.ts
import { Injectable, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
@Injectable()
export class OrderService {
constructor(@Inject('SQS_SERVICE') private sqsClient: ClientProxy) {}
async createOrder(order: OrderDto) {
this.sqsClient.emit('ORDER_CREATED', order);
}
}Advanced Features
Custom Pattern Key
By default, messages use pattern as the field name for event routing. If your system uses type instead:
// Server - read pattern from 'type' field
new ServerSqs({
sqs: new SQSClient({ region: 'us-east-1' }),
consumerOptions: { queueUrl: '...' },
patternKey: 'type', // Reads from { type: 'ORDER_CREATED', data: {...} }
});
// Client - serialize pattern as 'type' field
{
customClass: ClientSqs,
options: {
sqs: new SQSClient({ region: 'us-east-1' }),
queueUrl: '...',
patternKey: 'type', // Sends as { type: 'ORDER_CREATED', data: {...} }
},
}S3 Large Messages
For messages larger than 256KB:
import { S3Client } from '@aws-sdk/client-s3';
// Server
new ServerSqs({
sqs: new SQSClient({ region: 'us-east-1' }),
consumerOptions: { queueUrl: '...' },
s3LargeMessage: {
enabled: true,
s3Client: new S3Client({ region: 'us-east-1' }),
bucket: 'my-large-messages-bucket',
// Optional: customize the pointer key (default: '__s3pointer')
pointerKey: 's3Pointer', // Results in { s3Pointer: { bucket, key } }
},
});
// Client
{
customClass: ClientSqs,
options: {
sqs: new SQSClient({ region: 'us-east-1' }),
queueUrl: '...',
s3LargeMessage: {
enabled: true,
s3Client: new S3Client({ region: 'us-east-1' }),
bucket: 'my-large-messages-bucket',
pointerKey: 's3Pointer', // Must match server config
},
},
}FIFO Queues
{
customClass: ClientSqs,
options: {
sqs: new SQSClient({ region: 'us-east-1' }),
queueUrl: 'https://sqs.../my-queue.fifo',
fifo: {
enabled: true,
messageGroupId: (pattern, data) => data.customerId,
deduplicationId: (pattern, data) => `${pattern}-${data.orderId}`,
},
},
}Observability
new ServerSqs({
// ...
observability: {
tracing: true,
metrics: true,
logging: { level: 'debug' },
},
});Testing
import { MockClientSqs } from 'nestjs-sqs-transporter';
describe('OrderService', () => {
let mockClient: MockClientSqs;
beforeEach(() => {
mockClient = new MockClientSqs();
});
it('should emit order created event', async () => {
const service = new OrderService(mockClient);
await service.createOrder({ id: '123' });
const event = mockClient.expectEventEmitted('ORDER_CREATED');
expect(event.data.id).toBe('123');
});
});API Reference
ServerSqs Options
| Option | Type | Required | Description |
|--------|------|----------|-------------|
| sqs | SQSClient | Yes | AWS SQS client |
| consumerOptions.queueUrl | string | Yes | Queue URL |
| consumerOptions.waitTimeSeconds | number | No | Long poll wait (default: 20) |
| consumerOptions.batchSize | number | No | Messages per poll (default: 10) |
| patternKey | string | No | Field name for pattern (default: pattern) |
| s3LargeMessage | object | No | S3 offloading config |
| observability | object | No | Tracing/metrics config |
ClientSqs Options
| Option | Type | Required | Description |
|--------|------|----------|-------------|
| sqs | SQSClient | Yes | AWS SQS client |
| queueUrl | string | Yes | Target queue URL |
| patternKey | string | No | Field name for pattern (default: pattern) |
| fifo | object | No | FIFO queue config |
| s3LargeMessage | object | No | S3 offloading config |
| observability | object | No | Tracing/metrics config |
SqsContext Methods
| Method | Description |
|--------|-------------|
| getMessage() | Original SQS message |
| getMessageId() | SQS message ID |
| getPattern() | Message pattern |
| getReceiptHandle() | Receipt handle |
| getApproximateReceiveCount() | Receive count |
Contributing
We welcome contributions! Please see CONTRIBUTING.md for guidelines.
License
MIT
