@message-in-the-middle/aws

v0.1.3

AWS integration for message-in-the-middle (SQS, SNS, DynamoDB, EventBridge)

⚠️ Work in Progress

  • Is this library production-ready? No.
  • Is this library safe? No.
  • When will it be ready? Soon™ (maybe tomorrow, maybe never).
  • Why is it public? Experiment.

message-in-the-middle is to message queue processing what Express.js is to HTTP request processing. Just as Express provides a middleware pattern for HTTP requests, this library provides a middleware pattern for processing queue messages.

Why This Exists

Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as middlewares.
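To make the middleware idea concrete, here is a minimal, dependency-free sketch of how `(context, next)` functions can be chained. The names `Context`, `Middleware`, and `compose` are hypothetical and chosen for illustration only; the library's internals may look quite different.

```typescript
// Hypothetical types for illustration; not the library's actual API.
interface Context {
  raw: string;
  message?: unknown;
  log: string[];
}

type Next = () => Promise<void>;
type Middleware = (context: Context, next: Next) => Promise<void>;

// Chain middlewares so each one can run code before and after the rest of the chain.
function compose(middlewares: Middleware[]): (context: Context) => Promise<void> {
  return async (context) => {
    let lastIndex = -1;
    const dispatch = async (index: number): Promise<void> => {
      if (index <= lastIndex) throw new Error('next() called multiple times');
      lastIndex = index;
      const middleware = middlewares[index];
      if (!middleware) return; // End of the chain.
      await middleware(context, () => dispatch(index + 1));
    };
    await dispatch(0);
  };
}

// Parse the raw body once so later middlewares see an object.
const parseJson: Middleware = async (context, next) => {
  context.message = JSON.parse(context.raw);
  await next();
};

// Wrap the rest of the chain with "start" and "end" markers.
const logTiming: Middleware = async (context, next) => {
  context.log.push('start');
  await next();
  context.log.push('end');
};

// Business logic placeholder.
const handler: Middleware = async (context, next) => {
  context.log.push(`handled ${JSON.stringify(context.message)}`);
  await next();
};

const run = compose([logTiming, parseJson, handler]);

const exampleContext: Context = { raw: '{"orderId":"123"}', log: [] };
run(exampleContext).then(() => console.log(exampleContext.log));
```

Because `logTiming` awaits `next()`, its "end" entry is recorded only after the parser and the handler have finished, which is the same onion-style ordering Express users will recognize.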


AWS integration for message-in-the-middle - middleware for AWS messaging services (SQS, SNS, DynamoDB, EventBridge).

Features

  • SQS Consumer & Publisher - Receive messages from and send messages to Amazon SQS
  • SNS Publisher - Publish messages to Amazon SNS topics
  • EventBridge Publisher - Publish events to Amazon EventBridge
  • DynamoDB Store - Deduplication store using DynamoDB

Installation

npm install @message-in-the-middle/aws @message-in-the-middle/core
# Install AWS SDK v3 clients (peer dependencies)
npm install @aws-sdk/client-sqs @aws-sdk/client-sns @aws-sdk/client-eventbridge @aws-sdk/client-dynamodb @aws-sdk/lib-dynamodb

Quick Start

SQS Consumer

import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });

const manager = createSQSConsumerPipeline({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue',
  parseJson: true,
  autoDelete: true
});

// Add your business logic
manager.use(async (context, next) => {
  console.log('Processing message:', context.message);
  // Your processing logic here
  await next();
});

// IMPORTANT: Clean up resources on shutdown
process.on('SIGTERM', async () => {
  await manager.destroy();
  process.exit(0);
});

// Process SQS messages
const sqsMessage = {
  Body: JSON.stringify({ orderId: '123' }),
  ReceiptHandle: 'xxx',
  MessageId: 'yyy'
};

await manager.processInbound(sqsMessage.Body, sqsMessage);

SQS Publisher

import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSPublisherPipeline } from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });

const manager = createSQSPublisherPipeline({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
});

// Send a message
await manager.processOutbound({ orderId: '123', status: 'pending' });

SNS Publisher

import { SNSClient } from '@aws-sdk/client-sns';
import { createSNSPublisherPipeline } from '@message-in-the-middle/aws';

const snsClient = new SNSClient({ region: 'us-east-1' });

const manager = createSNSPublisherPipeline({
  client: snsClient,
  topicArn: 'arn:aws:sns:us-east-1:123456789:my-topic',
  subject: 'Order Notification'
});

// Publish a message
await manager.processOutbound({ orderId: '123', event: 'created' });

EventBridge Publisher

import { EventBridgeClient } from '@aws-sdk/client-eventbridge';
import { createEventBridgePublisherPipeline } from '@message-in-the-middle/aws';

const ebClient = new EventBridgeClient({ region: 'us-east-1' });

const manager = createEventBridgePublisherPipeline({
  client: ebClient,
  source: 'myapp.orders',
  detailType: 'Order Event'
});

// Publish an event
await manager.processOutbound({
  eventType: 'created',
  orderId: '123'
});

DynamoDB Deduplication

import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { SQSClient } from '@aws-sdk/client-sqs';
import { createSQSConsumerPipeline } from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });
const dynamoClient = new DynamoDBClient({ region: 'us-east-1' });

const manager = createSQSConsumerPipeline({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  deduplication: {
    dynamoClient: dynamoClient,
    tableName: 'message-deduplication',
    ttlSeconds: 86400 // 24 hours
  }
});
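The semantics of a TTL-based deduplication store can be sketched without any AWS dependency. The in-memory class below is a stand-in for the DynamoDB-backed store, and the `DeduplicationStore` interface and `markIfNew` method are hypothetical names, not the library's actual contract.

```typescript
// Hypothetical interface; the library's actual store contract may differ.
interface DeduplicationStore {
  // Returns true if the id was newly recorded, false if it was already seen.
  markIfNew(id: string, nowMs?: number): boolean;
}

// In-memory stand-in for a DynamoDB-backed store, useful for reasoning and tests.
class InMemoryDedupStore implements DeduplicationStore {
  private readonly expiresAt = new Map<string, number>();
  private readonly ttlSeconds: number;

  constructor(ttlSeconds: number) {
    this.ttlSeconds = ttlSeconds;
  }

  markIfNew(id: string, nowMs: number = Date.now()): boolean {
    const expiry = this.expiresAt.get(id);
    if (expiry !== undefined && expiry > nowMs) {
      return false; // Seen within the TTL window: treat as a duplicate.
    }
    this.expiresAt.set(id, nowMs + this.ttlSeconds * 1000);
    return true;
  }
}

const store = new InMemoryDedupStore(86400); // 24 hours, matching the example above
console.log(store.markIfNew('yyy')); // first delivery
console.log(store.markIfNew('yyy')); // redelivery within the TTL window
```

A message whose id is still inside the TTL window is reported as a duplicate; once the window has passed, the same id is accepted again.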

SQS Poller (Production-Ready Polling)

The SQSPoller provides production-ready SQS message polling with concurrency control, graceful shutdown, and event-driven lifecycle hooks.

Basic Usage

import { SQSClient } from '@aws-sdk/client-sqs';
import { SQSPoller } from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });
const poller = new SQSPoller(sqsClient, { logger: console });

// Start polling - returns QueueController
const ordersQueue = poller.start({
  queueUrl: process.env.ORDERS_QUEUE_URL,
  manager: ordersManager, // assumed: a pipeline created earlier, e.g. with createSQSConsumerPipeline
  name: 'orders',
  concurrency: 10,
  waitTimeSeconds: 20 // Long polling
});

// Per-queue event handlers (eliminates if-else chains!)
ordersQueue.on('message:processed', (message, duration) => {
  console.log(`Order processed in ${duration}ms`, { messageId: message.MessageId });
});

ordersQueue.on('message:failed', (message, error) => {
  console.error('Order processing failed', { messageId: message.MessageId, error });
});

// Graceful shutdown
process.on('SIGTERM', async () => {
  await ordersQueue.stop();
});
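The `concurrency: 10` option above caps how many messages are processed at once. As a rough illustration of what such a cap means, here is a small, self-contained sketch. `processWithConcurrency` is a hypothetical helper, not part of the library, and the poller's real implementation may work differently.

```typescript
// Hypothetical helper: run `worker` over `items` with at most `limit` calls in flight.
async function processWithConcurrency<T>(
  items: T[],
  limit: number,
  worker: (item: T) => Promise<void>,
): Promise<void> {
  let nextIndex = 0;

  // Each lane pulls the next unclaimed item until none are left.
  const runLane = async (): Promise<void> => {
    while (nextIndex < items.length) {
      const item = items[nextIndex++];
      await worker(item);
    }
  };

  // Start up to `limit` lanes; a rejected worker rejects the whole call.
  const laneCount = Math.max(0, Math.min(limit, items.length));
  const lanes = Array.from({ length: laneCount }, () => runLane());
  await Promise.all(lanes);
}

// Usage: process five fake messages, two at a time.
processWithConcurrency(['m1', 'm2', 'm3', 'm4', 'm5'], 2, async (id) => {
  await new Promise<void>((resolve) => setTimeout(resolve, 10));
  console.log('processed', id);
});
```

The lanes share one index, so a slow message delays only its own lane while the others keep draining the batch.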

Two Event Systems: Semantic Separation

The poller provides two complementary event systems:

  1. Global Events (on SQSPoller) - For cross-queue system concerns
  2. Per-Queue Events (on QueueController) - For queue-specific business logic

const poller = new SQSPoller(sqsClient, { logger });

// Global events: System-wide metrics across ALL queues
poller.on('message:processed', (queueName, message, duration) => {
  metrics.timing('sqs.duration', duration, { queue: queueName });
  apm.track({ event: 'sqs.processed', queue: queueName, duration });
});

// Per-queue events: Queue-specific business logic (no if-else chains!)
const ordersQueue = poller.start({ name: 'orders', /* ... */ });
ordersQueue.on('message:processed', async (message, duration) => {
  const order = JSON.parse(message.Body);
  await db.orders.update(order.orderId, { status: 'processed', duration });
  await sendOrderConfirmation(order.customerId);
});

const notificationsQueue = poller.start({ name: 'notifications', /* ... */ });
notificationsQueue.on('message:processed', (message, duration) => {
  logger.info('Notification sent', { messageId: message.MessageId, duration });
});

Benefits:

  • ✅ No more if-else chains for queue-specific handling
  • ✅ Type-safe per-queue events with full IntelliSense
  • ✅ Clean separation of concerns (system vs business logic)
  • ✅ Easy to add new queues (no scattered updates)

For comprehensive documentation, see SQS Poller Guide.
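The split between global and per-queue events can be illustrated with Node's built-in `EventEmitter`. This is only a sketch of the idea: `SketchPoller`, `SketchQueue`, and `reportProcessed` are hypothetical names, and the real `SQSPoller` and `QueueController` may be implemented differently.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical per-queue controller: emits on itself and forwards to the shared hub.
class SketchQueue extends EventEmitter {
  readonly name: string;
  private readonly hub: EventEmitter;

  constructor(name: string, hub: EventEmitter) {
    super();
    this.name = name;
    this.hub = hub;
  }

  // Report one processed message on both channels.
  reportProcessed(messageId: string, durationMs: number): void {
    // Per-queue listeners receive (messageId, duration).
    this.emit('message:processed', messageId, durationMs);
    // Global listeners additionally receive the queue name first.
    this.hub.emit('message:processed', this.name, messageId, durationMs);
  }
}

// Hypothetical poller: the hub for system-wide listeners.
class SketchPoller extends EventEmitter {
  start(name: string): SketchQueue {
    return new SketchQueue(name, this);
  }
}

const sketchPoller = new SketchPoller();
const seen: string[] = [];

sketchPoller.on('message:processed', (queue: string, id: string) => {
  seen.push(`global:${queue}:${id}`);
});

const sketchOrders = sketchPoller.start('orders');
sketchOrders.on('message:processed', (id: string) => {
  seen.push(`orders:${id}`);
});

sketchOrders.reportProcessed('m-1', 12);
console.log(seen); // [ 'orders:m-1', 'global:orders:m-1' ]
```

Listeners attached to one queue never see events from another queue, so no `if (queueName === ...)` branching is needed in business logic, while the hub still observes everything.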

⚠️ Best Practices

CRITICAL: Reuse AWS SDK Client Instances

Always reuse AWS SDK client instances across your application. Creating multiple client instances causes connection pool duplication, leading to memory leaks and resource exhaustion.

✅ CORRECT - Reuse clients:

// Create clients ONCE at application startup
const sqsClient = new SQSClient({ region: 'us-east-1' });
const snsClient = new SNSClient({ region: 'us-east-1' });
const dynamoClient = new DynamoDBClient({ region: 'us-east-1' });

// Reuse the same client instances for all pipelines
const consumerManager = createSQSConsumerPipeline({
  client: sqsClient, // ✅ Reusing client
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/input-queue'
});

const publisherManager = createSQSPublisherPipeline({
  client: sqsClient, // ✅ Same client instance
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/output-queue'
});

const snsManager = createSNSPublisherPipeline({
  client: snsClient, // ✅ Reusing SNS client
  topicArn: 'arn:aws:sns:us-east-1:123:my-topic'
});

❌ WRONG - Creating new clients causes memory leaks:

// ❌ DON'T DO THIS - Creates new connection pool each time!
for (let i = 0; i < 1000; i++) {
  const manager = createSQSConsumerPipeline({
    client: new SQSClient({ region: 'us-east-1' }), // ❌ NEW CLIENT = MEMORY LEAK!
    queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue'
  });
}
// Result: 1000 connection pools = file descriptor exhaustion = crash

❌ WRONG - Creating clients in handlers:

// ❌ DON'T DO THIS
app.post('/process', async (req, res) => {
  // ❌ Creating new client on every request = memory leak
  const sqsClient = new SQSClient({ region: 'us-east-1' });
  const manager = createSQSConsumerPipeline({ client: sqsClient, queueUrl: '...' });
  // ...
});

// ✅ DO THIS INSTEAD - Create client once at startup
const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = createSQSConsumerPipeline({ client: sqsClient, queueUrl: '...' });

app.post('/process', async (req, res) => {
  // ✅ Reuse existing manager/client
  await manager.processInbound(/* ... */);
});

Why This Matters

Each AWS SDK client instance:

  • Maintains its own HTTP connection pool (default: up to 50 connections)
  • Allocates memory for request/response buffers
  • Maintains separate credentials and configuration

Creating 100 clients = up to 5,000 open connections + significant memory overhead.

Recommended Pattern

// config/aws.ts - Create clients once at startup
export const awsClients = {
  sqs: new SQSClient({ region: process.env.AWS_REGION }),
  sns: new SNSClient({ region: process.env.AWS_REGION }),
  dynamodb: new DynamoDBClient({ region: process.env.AWS_REGION }),
  eventbridge: new EventBridgeClient({ region: process.env.AWS_REGION }),
};

// services/queue-processor.ts - Reuse clients
import { awsClients } from '../config/aws';

export const queueManager = createSQSConsumerPipeline({
  client: awsClients.sqs, // ✅ Reuse shared client
  queueUrl: process.env.QUEUE_URL,
});
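If creating every client eagerly at import time is undesirable (for example in tests), a lazy variant still guarantees a single instance. The sketch below uses a hypothetical `lazySingleton` helper and a `FakeClient` stand-in so it runs without AWS dependencies; in real code the factory would construct an SDK client instead.

```typescript
// Hypothetical helper: build a value on first use and return the same instance afterwards.
function lazySingleton<T>(factory: () => T): () => T {
  let instance: T | undefined;
  let created = false;
  return () => {
    if (!created) {
      instance = factory();
      created = true;
    }
    return instance as T;
  };
}

// Stand-in for an SDK client; counts how many times it has been constructed.
class FakeClient {
  static constructed = 0;
  readonly region: string;

  constructor(region: string) {
    this.region = region;
    FakeClient.constructed++;
  }
}

const getClient = lazySingleton(() => new FakeClient('us-east-1'));

// Every caller gets the same instance, so only one connection pool would exist.
const first = getClient();
const second = getClient();
console.log(first === second, FakeClient.constructed); // true 1
```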

Manual Configuration

For more control, you can configure middlewares manually:

import { MessageMiddlewareManager } from '@message-in-the-middle/core';
import { SQSClient } from '@aws-sdk/client-sqs';
import {
  SQSConsumerMiddleware,
  SQSPublisherMiddleware
} from '@message-in-the-middle/aws';

const sqsClient = new SQSClient({ region: 'us-east-1' });
const manager = new MessageMiddlewareManager();

// Add middlewares
manager.useInbound(new SQSConsumerMiddleware({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  autoDelete: true
}));

manager.useOutbound(new SQSPublisherMiddleware({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/output-queue'
}));

DynamoDB Table Setup

For deduplication, create a DynamoDB table with:

  • Partition Key: id (String)
  • TTL Attribute: ttl (Number) - Enable TTL in table settings

Example CloudFormation:

DeduplicationTable:
  Type: AWS::DynamoDB::Table
  Properties:
    TableName: message-deduplication
    AttributeDefinitions:
      - AttributeName: id
        AttributeType: S
    KeySchema:
      - AttributeName: id
        KeyType: HASH
    BillingMode: PAY_PER_REQUEST
    TimeToLiveSpecification:
      Enabled: true
      AttributeName: ttl
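DynamoDB's TTL feature expects the TTL attribute to hold a Unix epoch timestamp in seconds, not milliseconds. The sketch below shows how an item matching the table above could be built. `buildDedupItem` and `DedupItem` are hypothetical names, and how the library actually writes its items is an assumption here.

```typescript
// Hypothetical item shape based on the table definition above: `id` (String), `ttl` (Number).
interface DedupItem {
  id: string;
  ttl: number; // Unix epoch time in seconds, as DynamoDB TTL requires
}

// Build the item for a message that should be remembered for `ttlSeconds`.
function buildDedupItem(
  messageId: string,
  ttlSeconds: number,
  nowMs: number = Date.now(),
): DedupItem {
  return {
    id: messageId,
    ttl: Math.floor(nowMs / 1000) + ttlSeconds,
  };
}

// Example with a fixed clock so the result is reproducible.
console.log(buildDedupItem('yyy', 86400, 1_700_000_000_000));
// { id: 'yyy', ttl: 1700086400 }
```

DynamoDB removes expired items in the background rather than at the exact expiry second, so a reader that needs strict expiry would also compare `ttl` against the current time.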

TypeScript Support

Full TypeScript support with generics:

interface OrderMessage {
  orderId: string;
  status: 'pending' | 'completed';
}

const manager = createSQSConsumerPipeline<OrderMessage>({
  client: sqsClient,
  queueUrl: 'https://sqs.us-east-1.amazonaws.com/123/orders'
});

// Type-safe message processing
manager.use(async (context, next) => {
  const order = context.message; // Type: OrderMessage
  console.log(order.orderId);
  await next();
});
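TypeScript generics are erased at compile time, so a type parameter by itself does not verify that a parsed message really has the declared shape. A runtime type guard can close that gap. The sketch below is self-contained, and `isOrderMessage` is a hypothetical helper, not something the library is known to export.

```typescript
interface OrderMessage {
  orderId: string;
  status: 'pending' | 'completed';
}

// Hypothetical runtime guard: narrows `unknown` to OrderMessage only if the shape matches.
function isOrderMessage(value: unknown): value is OrderMessage {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.orderId === 'string' &&
    (candidate.status === 'pending' || candidate.status === 'completed')
  );
}

// Usage: validate a parsed body before treating it as an OrderMessage.
const parsed: unknown = JSON.parse('{"orderId":"123","status":"pending"}');
if (isOrderMessage(parsed)) {
  console.log(parsed.orderId); // Type: string
} else {
  console.error('Rejected malformed order message');
}
```

A guard like this could run inside a validation middleware placed ahead of the business logic, so handlers only ever see well-formed messages.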

License

MIT