@message-in-the-middle/pipe-adapters-sqs
v0.1.2
AWS SQS adapter for message pipeline orchestration
⚠️ Work in Progress
Is this library production-ready? No.
Is this library safe? No.
When will it be ready? Soon™ (maybe tomorrow, maybe never).
Why is it public? Experiment.
message-in-the-middle is to message queue processing what Express.js is to HTTP request processing. Just as Express provides a middleware pattern for HTTP requests, this library provides a middleware pattern for processing queue messages.
Why This Exists
Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as reusable middleware instead.
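To make the middleware idea concrete, here is a minimal, self-contained sketch of composing message-handling steps. The `Context`, `Middleware`, and `compose` names are hypothetical and chosen for illustration; they are not taken from this library's API, and the sketch is kept synchronous for brevity where real queue handlers would usually be async.

```typescript
// Hypothetical types for illustration only; not the library's actual API.
type Context = { raw: string; body?: unknown; log: string[] };
type Next = () => void;
type Middleware = (ctx: Context, next: Next) => void;

// Chain middleware so each one can run code before and after the rest.
function compose(middlewares: Middleware[]): (ctx: Context) => void {
  return (ctx) => {
    const dispatch = (i: number): void => {
      const mw = middlewares[i];
      if (!mw) return; // end of the chain
      mw(ctx, () => dispatch(i + 1));
    };
    dispatch(0);
  };
}

// Parse the raw message body, then hand over to the next step.
const parseJson: Middleware = (ctx, next) => {
  ctx.body = JSON.parse(ctx.raw);
  next();
};

// Record when processing starts and ends around the inner steps.
const logAround: Middleware = (ctx, next) => {
  ctx.log.push('start');
  next();
  ctx.log.push('end');
};

// Final handler: does the actual work and does not call next().
const handle: Middleware = (ctx) => {
  ctx.log.push(`handled ${JSON.stringify(ctx.body)}`);
};

const run = compose([logAround, parseJson, handle]);
```

Calling `run({ raw: '{"orderId":42}', log: [] })` passes the message through logging, parsing, and handling in that order, with the logging step resuming after the handler returns.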
AWS SQS adapter for message pipeline orchestration
Queue adapter implementation for AWS SQS, enabling pipeline orchestration with SQS queues.
Installation
npm install @message-in-the-middle/pipe-core @message-in-the-middle/pipe-adapters-sqs @aws-sdk/client-sqs
Quick Start
import { MessagePipelineOrchestrator } from '@message-in-the-middle/pipe-core';
import { SQSPipeAdapter } from '@message-in-the-middle/pipe-adapters-sqs';
import { SQSClient } from '@aws-sdk/client-sqs';

const sqsClient = new SQSClient({ region: 'us-east-1' });

// processOrder is your own handler for the 'process' stage (not shown here).
const pipeline = new MessagePipelineOrchestrator()
  // Read incoming messages from the orders queue.
  .source('orders', new SQSPipeAdapter({
    client: sqsClient,
    queueUrl: process.env.ORDERS_QUEUE_URL,
  }))
  .pipe('process', processOrder)
  // Publish results to the processed queue.
  .destination('processed', new SQSPipeAdapter({
    client: sqsClient,
    queueUrl: process.env.PROCESSED_QUEUE_URL,
  }))
  .start();
Features
- ✅ Long polling support (0-20 seconds configurable)
- ✅ Batch send (automatically chunks to 10 messages per batch)
- ✅ Message attributes support
- ✅ Visibility timeout configuration
- ✅ Full statistics tracking
- ✅ Graceful shutdown with in-flight message handling
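As an illustration of the batch-send feature above, the following sketch shows one way to split messages into groups of 10, the maximum number of entries SQS accepts in a single SendMessageBatch request. The `chunk` and `toBatchEntries` helpers are hypothetical names for this example, not functions exported by the adapter; only the `Id` and `MessageBody` entry fields come from the SQS API.

```typescript
// SQS accepts at most 10 entries per SendMessageBatch request.
const SQS_MAX_BATCH_SIZE = 10;

// Split a list into consecutive groups of at most `size` items.
function chunk<T>(items: readonly T[], size: number = SQS_MAX_BATCH_SIZE): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Minimal shape of a batch entry; Id must be unique within one batch.
interface BatchEntry {
  Id: string;
  MessageBody: string;
}

// Turn message bodies into per-request entry lists (hypothetical helper).
function toBatchEntries(bodies: readonly string[]): BatchEntry[][] {
  return chunk(bodies).map((batch) =>
    batch.map((body, index) => ({ Id: String(index), MessageBody: body })),
  );
}
```

Each inner array could then be passed as the `Entries` of one `SendMessageBatchCommand` from `@aws-sdk/client-sqs`. How the adapter itself handles partial batch failures is not covered by this sketch.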
API
SQSPipeAdapter Options
interface SQSPipeAdapterOptions {
  client: SQSClient;                     // Required: AWS SQS client
  queueUrl: string;                      // Required: SQS queue URL
  name?: string;                         // Optional: Custom adapter name
  defaultWaitTimeSeconds?: number;       // Default: 20 (long polling)
  defaultVisibilityTimeout?: number;     // Default: 30 seconds
  defaultMaxMessages?: number;           // Default: 10 (AWS max)
  messageAttributeNames?: string[];      // Optional: Specific attributes
  receiveAllMessageAttributes?: boolean; // Default: true
  logger?: Logger;                       // Optional: Logger instance
}
Example with Options
const adapter = new SQSPipeAdapter({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
  name: 'my-queue',
  defaultWaitTimeSeconds: 20,    // Long polling
  defaultVisibilityTimeout: 30,  // 30 seconds visibility
  defaultMaxMessages: 10,        // Fetch 10 messages per poll
  receiveAllMessageAttributes: true,
  logger: console,
});
Examples
See examples/pipelines for complete examples:
- sqs-to-sqs-simple.ts - Basic SQS → SQS pipeline
- cross-queue-sqs-to-rabbitmq.ts - Cross-queue integration
- multi-stage-ecommerce.ts - Multi-stage processing
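Alongside the repository examples, it may help to see how the adapter options listed under SQSPipeAdapter Options line up with the parameters of an SQS ReceiveMessage call. The sketch below is an assumption about that mapping, not the adapter's actual implementation: `toReceiveParams` is a hypothetical helper, the clamping to AWS limits is this example's own choice, and letting an explicit `messageAttributeNames` list take precedence over `receiveAllMessageAttributes` is also assumed.

```typescript
// Subset of the documented adapter options that affect receiving.
interface ReceiveOptions {
  queueUrl: string;
  defaultWaitTimeSeconds?: number;
  defaultVisibilityTimeout?: number;
  defaultMaxMessages?: number;
  messageAttributeNames?: string[];
  receiveAllMessageAttributes?: boolean;
}

// Field names follow the SQS ReceiveMessage request parameters.
interface ReceiveParams {
  QueueUrl: string;
  WaitTimeSeconds: number;
  VisibilityTimeout: number;
  MaxNumberOfMessages: number;
  MessageAttributeNames: string[];
}

const clamp = (value: number, min: number, max: number): number =>
  Math.min(max, Math.max(min, value));

// Hypothetical helper: apply the documented defaults and keep values in AWS limits.
function toReceiveParams(options: ReceiveOptions): ReceiveParams {
  const receiveAll = options.receiveAllMessageAttributes ?? true;
  const explicitNames = options.messageAttributeNames;
  return {
    QueueUrl: options.queueUrl,
    // Long polling: SQS allows 0 to 20 seconds.
    WaitTimeSeconds: clamp(options.defaultWaitTimeSeconds ?? 20, 0, 20),
    // Visibility timeout: SQS allows 0 seconds to 12 hours.
    VisibilityTimeout: clamp(options.defaultVisibilityTimeout ?? 30, 0, 43200),
    // SQS returns at most 10 messages per receive call.
    MaxNumberOfMessages: clamp(options.defaultMaxMessages ?? 10, 1, 10),
    // Assumption: an explicit list wins; otherwise 'All' requests every attribute.
    MessageAttributeNames:
      explicitNames && explicitNames.length > 0 ? explicitNames : receiveAll ? ['All'] : [],
  };
}
```

The resulting object has the shape accepted by `ReceiveMessageCommand` in `@aws-sdk/client-sqs`, so it could be passed to that command directly when experimenting outside the adapter.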
License
MIT
