npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@message-in-the-middle/rabbitmq

v0.1.2

Published

RabbitMQ integration for message-in-the-middle - Production-ready AMQP messaging with zero boilerplate

Readme

@message-in-the-middle/rabbitmq

⚠️ Work in Progress Is this library production-ready? No. Is this library safe? No. When will it be ready? Soon™ (maybe tomorrow, maybe never). Why is it public? Experiment

message-in-the-middle is to Express.js what your message queue processing is to HTTP request processing. Just as Express provides a middleware pattern for HTTP requests, this library provides a middleware pattern for processing queue messages.

Why This Exists

Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as middlewares.


Production-ready RabbitMQ integration for message-in-the-middle

Eliminate 50-100 lines of RabbitMQ boilerplate with zero dependencies beyond amqplib.

Features

  • RabbitMQPoller - Production-ready message consumption (50+ lines → 5 lines)
  • RabbitMQPublisher - Middleware-aware publishing
  • Auto-reconnection - Automatic reconnection with exponential backoff
  • Multi-consumer - Manage multiple consumers from single poller
  • Graceful shutdown - Wait for in-flight messages before closing
  • Per-consumer events - No if-else chains for consumer-specific logic
  • QoS/Prefetch - Full control over message prefetch
  • TypeScript-first - Full type safety with generics
  • Zero overhead - Thin wrapper over amqplib

Installation

npm install @message-in-the-middle/rabbitmq @message-in-the-middle/core amqplib

Quick Start

Before (Manual Setup - 50+ lines)

// Manual connection, channel management, error handling, etc.
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertQueue('orders', { durable: true });
channel.prefetch(10);

await channel.consume('orders', async (msg) => {
  if (msg) {
    try {
      const content = msg.content.toString();
      const parsed = JSON.parse(content);
      // ... validation, retry logic, error handling
      await processOrder(parsed);
      channel.ack(msg);
    } catch (error) {
      channel.nack(msg, false, true);
    }
  }
});

// Reconnection logic, graceful shutdown, etc. (30+ more lines)

After (With RabbitMQPoller - 5 lines)

import { RabbitMQPoller } from '@message-in-the-middle/rabbitmq';
import { createQueuePipeline } from '@message-in-the-middle/core';

// 1. Create pipeline with all middleware
const ordersManager = createQueuePipeline({
  queueName: 'orders',
  validation: OrderSchema,
  handler: async (ctx) => await processOrder(ctx.message),
  logger: console,
});

// 2. Create poller and start consuming
const poller = new RabbitMQPoller('amqp://localhost', { logger: console });
const ordersConsumer = await poller.start({
  queue: 'orders-queue',
  manager: ordersManager,
  name: 'orders',
  prefetch: 10,
});

// 3. Graceful shutdown
await poller.stopAll();

Result: 50+ lines → 5 lines. Production-ready with auto-reconnection, graceful shutdown, and full middleware support.


API Documentation

RabbitMQPoller

Production-ready message poller with connection management.

Constructor

new RabbitMQPoller(
  connectionConfig: string | Options.Connect,
  options?: RabbitMQPollerOptions
)

Options:

interface RabbitMQPollerOptions {
  logger?: Logger;                    // Logger instance
  defaultPrefetch?: number;           // Default: 10
  defaultNoAck?: boolean;             // Default: false
  reconnectDelayMs?: number;          // Default: 5000
  maxReconnectAttempts?: number;      // Default: Infinity
  heartbeat?: number;                 // Default: 30 seconds
}

start(options) → ConsumerController

Start consuming from a queue.

const consumer = await poller.start({
  queue: 'orders-queue',              // Queue name (required)
  manager: ordersManager,             // Middleware manager (required)
  name: 'orders',                     // Consumer name (required)
  prefetch: 10,                       // QoS prefetch count
  noAck: false,                       // Auto-ack (default: false)
  queueOptions: {                     // Queue assertion options
    durable: true,
    autoDelete: false,
  },
  requeueOnError: true,               // Requeue on error (default: true)
});

Returns: ConsumerController for per-consumer control

stopAll() → Promise

Stop all consumers gracefully and close connection.

await poller.stopAll();

Global Events

// Connection lifecycle
poller.on('connection:connected', (connection) => {});
poller.on('connection:error', (error) => {});
poller.on('connection:closed', () => {});
poller.on('connection:reconnecting', (attempt) => {});

// Consumer lifecycle
poller.on('consumer:started', (name, queue) => {});
poller.on('consumer:stopped', (name) => {});
poller.on('consumer:error', (name, error) => {});

// Message processing (cross-consumer)
poller.on('message:received', (name, message) => {});
poller.on('message:processed', (name, message, duration) => {});
poller.on('message:failed', (name, message, error) => {});

ConsumerController

Per-consumer control and events.

Methods

consumer.pause()              // Pause consuming
consumer.resume()             // Resume consuming
await consumer.stop()         // Stop consumer gracefully
consumer.getStatus()          // Get consumer status
consumer.getName()            // Get consumer name
consumer.getQueue()           // Get queue name
consumer.isPaused()           // Check if paused

Per-Consumer Events

// No if-else chains! Each consumer has its own events
ordersConsumer.on('message:processed', (message, duration) => {
  logger.info('Order processed', { duration });
});

ordersConsumer.on('message:failed', (message, error) => {
  alerting.notify('Order processing failed', { error });
});

notificationsConsumer.on('message:processed', (message, duration) => {
  logger.info('Notification sent', { duration });
});

RabbitMQPublisher

Middleware-aware message publishing.

Constructor

new RabbitMQPublisher(
  connectionConfig: string | Options.Connect,
  options?: RabbitMQPublisherOptions
)

Options:

interface RabbitMQPublisherOptions {
  logger?: Logger;
  confirmChannel?: boolean;           // Use confirm channel (default: false)
  defaultExchange?: string;           // Default: '' (direct)
  defaultRoutingKey?: string;
}

use(middleware) → this

Add outbound middleware to pipeline.

publisher
  .use(new StringifyJsonOutboundMiddleware())
  .use(new EncryptOutboundMiddleware(key))
  .use(new MetricsOutboundMiddleware(collector));

publish(message, options) → Promise

Publish message through middleware pipeline.

await publisher.publish(
  { orderId: '123', amount: 99.99 },
  {
    exchange: 'orders',
    routingKey: 'order.created',
    options: { persistent: true },
  }
);

destroy() → Promise

Close connections and cleanup.

await publisher.destroy();

RabbitMQMetadataMiddleware

Extract RabbitMQ-specific metadata.

import { RabbitMQMetadataMiddleware } from '@message-in-the-middle/rabbitmq';

manager.addInboundMiddleware(new RabbitMQMetadataMiddleware());

// Access in handler
const handler = async (ctx) => {
  const rabbitmq = ctx.metadata.rabbitmq; // RabbitMQMessageMetadata
  console.log('Exchange:', rabbitmq.exchange);
  console.log('Routing Key:', rabbitmq.routingKey);
  console.log('Redelivered:', rabbitmq.redelivered);
  console.log('Correlation ID:', rabbitmq.correlationId);
};

Extracted metadata:

  • exchange - Exchange name
  • routingKey - Routing key
  • redelivered - Redelivery flag
  • messageId - Message ID
  • correlationId - Correlation ID
  • timestamp - Timestamp
  • headers - Custom headers
  • deliveryTag - Delivery tag

Complete Example

import { RabbitMQPoller, RabbitMQMetadataMiddleware } from '@message-in-the-middle/rabbitmq';
import { createQueuePipeline } from '@message-in-the-middle/core';
import { z } from 'zod';

// Define schema
const OrderSchema = z.object({
  orderId: z.string().uuid(),
  amount: z.number().positive(),
});

// Create pipeline
const ordersManager = createQueuePipeline({
  queueName: 'orders',
  validation: OrderSchema,
  maxRetries: 3,
  handler: async (ctx) => {
    // Access RabbitMQ metadata
    const rabbitmq = ctx.metadata.rabbitmq;
    console.log('Routing key:', rabbitmq.routingKey);

    // Process order
    await processOrder(ctx.message);
  },
  logger: console,
});

// Add RabbitMQ metadata extraction
ordersManager.addInboundMiddleware(new RabbitMQMetadataMiddleware());

// Create poller
const poller = new RabbitMQPoller('amqp://localhost', {
  logger: console,
  defaultPrefetch: 10,
});

// Start consumer
const ordersConsumer = await poller.start({
  queue: 'orders-queue',
  manager: ordersManager,
  name: 'orders',
  queueOptions: { durable: true },
});

// Per-consumer events
ordersConsumer.on('message:processed', (message, duration) => {
  console.log(`✅ Processed in ${duration}ms`);
});

// Graceful shutdown
process.on('SIGTERM', async () => {
  await poller.stopAll();
  await ordersManager.destroy();
});

Multi-Consumer Example

Manage multiple consumers from a single poller:

const poller = new RabbitMQPoller('amqp://localhost', { logger: console });

// Start multiple consumers
const ordersConsumer = await poller.start({
  queue: 'orders',
  manager: ordersManager,
  name: 'orders',
  prefetch: 10,
});

const notificationsConsumer = await poller.start({
  queue: 'notifications',
  manager: notificationsManager,
  name: 'notifications',
  prefetch: 20,
});

const analyticsConsumer = await poller.start({
  queue: 'analytics',
  manager: analyticsManager,
  name: 'analytics',
  prefetch: 50,
});

// Per-consumer events (no if-else chains!)
ordersConsumer.on('message:processed', (msg, duration) => {
  metrics.timing('orders.duration', duration);
});

notificationsConsumer.on('message:processed', (msg, duration) => {
  metrics.timing('notifications.duration', duration);
});

// Stop all at once
await poller.stopAll();

Publishing Example

import { RabbitMQPublisher } from '@message-in-the-middle/rabbitmq';
import { StringifyJsonOutboundMiddleware } from '@message-in-the-middle/core';

const publisher = new RabbitMQPublisher('amqp://localhost', {
  confirmChannel: true,
  logger: console,
});

// Add middlewares
publisher.use(new StringifyJsonOutboundMiddleware());

// Publish with middleware processing
await publisher.publish(
  { orderId: '123', amount: 99.99 },
  {
    exchange: 'orders',
    routingKey: 'order.created',
    options: {
      persistent: true,
      headers: { source: 'api' },
    },
  }
);

// Cleanup
await publisher.destroy();

Comparison

vs Manual RabbitMQ

| Feature | Manual | @message-in-the-middle/rabbitmq | |---------|--------|-------------------------------| | Setup code | 50-100 lines | 5 lines | | Connection management | Manual | Automatic | | Reconnection | Manual (30+ lines) | Automatic | | Graceful shutdown | Manual (20+ lines) | Built-in | | Multi-consumer | Duplicate code | Single poller | | Middleware | None | Full support | | Type safety | Basic | Full generics |

vs SQS Package

Both packages follow the same API pattern for consistency:

// SQS
const sqsPoller = new SQSPoller(sqsClient, { logger });
sqsPoller.start({ queueUrl, manager, name });

// RabbitMQ (same pattern!)
const rabbitPoller = new RabbitMQPoller('amqp://localhost', { logger });
rabbitPoller.start({ queue, manager, name });

Best Practices

1. Use Per-Consumer Events

// ✅ Good - Per-consumer events
ordersConsumer.on('message:failed', async (msg, error) => {
  await notifyTeam('Orders queue failing', { error });
});

// ❌ Bad - Global events with if-else
poller.on('message:failed', (name, msg, error) => {
  if (name === 'orders') {
    await notifyTeam('Orders queue failing', { error });
  }
});

2. Graceful Shutdown

// Always implement graceful shutdown
process.on('SIGTERM', async () => {
  await poller.stopAll();
  await manager.destroy();
  process.exit(0);
});

3. Queue-Native Features

Use RabbitMQ's native features for infrastructure concerns:

// ✅ Use RabbitMQ DLX for dead letters
await channel.assertQueue('orders', {
  arguments: {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'failed.orders',
  },
});

// ✅ Use prefetch for rate limiting
poller.start({ queue: 'orders', prefetch: 10 });

// ✅ Use middleware for message processing
manager
  .use(parseJson())
  .use(validate(schema))
  .use(retry({ maxRetries: 3 }));

Examples


Links


License

MIT