@message-in-the-middle/rabbitmq
v0.1.2
Published
RabbitMQ integration for message-in-the-middle - Production-ready AMQP messaging with zero boilerplate
Maintainers
Readme
@message-in-the-middle/rabbitmq
⚠️ Work in Progress Is this library production-ready? No. Is this library safe? No. When will it be ready? Soon™ (maybe tomorrow, maybe never). Why is it public? Experiment
message-in-the-middle is to Express.js what your message queue processing is to HTTP request processing. Just as Express provides a middleware pattern for HTTP requests, this library provides a middleware pattern for processing queue messages.
Why This Exists
Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as middlewares.
Production-ready RabbitMQ integration for message-in-the-middle
Eliminate 50-100 lines of RabbitMQ boilerplate with zero dependencies beyond amqplib.
Features
- ✅ RabbitMQPoller - Production-ready message consumption (50+ lines → 5 lines)
- ✅ RabbitMQPublisher - Middleware-aware publishing
- ✅ Auto-reconnection - Automatic reconnection with exponential backoff
- ✅ Multi-consumer - Manage multiple consumers from single poller
- ✅ Graceful shutdown - Wait for in-flight messages before closing
- ✅ Per-consumer events - No if-else chains for consumer-specific logic
- ✅ QoS/Prefetch - Full control over message prefetch
- ✅ TypeScript-first - Full type safety with generics
- ✅ Zero overhead - Thin wrapper over amqplib
Installation
npm install @message-in-the-middle/rabbitmq @message-in-the-middle/core amqplibQuick Start
Before (Manual Setup - 50+ lines)
// Manual connection, channel management, error handling, etc.
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertQueue('orders', { durable: true });
channel.prefetch(10);
await channel.consume('orders', async (msg) => {
if (msg) {
try {
const content = msg.content.toString();
const parsed = JSON.parse(content);
// ... validation, retry logic, error handling
await processOrder(parsed);
channel.ack(msg);
} catch (error) {
channel.nack(msg, false, true);
}
}
});
// Reconnection logic, graceful shutdown, etc. (30+ more lines)After (With RabbitMQPoller - 5 lines)
import { RabbitMQPoller } from '@message-in-the-middle/rabbitmq';
import { createQueuePipeline } from '@message-in-the-middle/core';
// 1. Create pipeline with all middleware
const ordersManager = createQueuePipeline({
queueName: 'orders',
validation: OrderSchema,
handler: async (ctx) => await processOrder(ctx.message),
logger: console,
});
// 2. Create poller and start consuming
const poller = new RabbitMQPoller('amqp://localhost', { logger: console });
const ordersConsumer = await poller.start({
queue: 'orders-queue',
manager: ordersManager,
name: 'orders',
prefetch: 10,
});
// 3. Graceful shutdown
await poller.stopAll();Result: 50+ lines → 5 lines. Production-ready with auto-reconnection, graceful shutdown, and full middleware support.
API Documentation
RabbitMQPoller
Production-ready message poller with connection management.
Constructor
new RabbitMQPoller(
connectionConfig: string | Options.Connect,
options?: RabbitMQPollerOptions
)Options:
interface RabbitMQPollerOptions {
logger?: Logger; // Logger instance
defaultPrefetch?: number; // Default: 10
defaultNoAck?: boolean; // Default: false
reconnectDelayMs?: number; // Default: 5000
maxReconnectAttempts?: number; // Default: Infinity
heartbeat?: number; // Default: 30 seconds
}start(options) → ConsumerController
Start consuming from a queue.
const consumer = await poller.start({
queue: 'orders-queue', // Queue name (required)
manager: ordersManager, // Middleware manager (required)
name: 'orders', // Consumer name (required)
prefetch: 10, // QoS prefetch count
noAck: false, // Auto-ack (default: false)
queueOptions: { // Queue assertion options
durable: true,
autoDelete: false,
},
requeueOnError: true, // Requeue on error (default: true)
});Returns: ConsumerController for per-consumer control
stopAll() → Promise
Stop all consumers gracefully and close connection.
await poller.stopAll();Global Events
// Connection lifecycle
poller.on('connection:connected', (connection) => {});
poller.on('connection:error', (error) => {});
poller.on('connection:closed', () => {});
poller.on('connection:reconnecting', (attempt) => {});
// Consumer lifecycle
poller.on('consumer:started', (name, queue) => {});
poller.on('consumer:stopped', (name) => {});
poller.on('consumer:error', (name, error) => {});
// Message processing (cross-consumer)
poller.on('message:received', (name, message) => {});
poller.on('message:processed', (name, message, duration) => {});
poller.on('message:failed', (name, message, error) => {});ConsumerController
Per-consumer control and events.
Methods
consumer.pause() // Pause consuming
consumer.resume() // Resume consuming
await consumer.stop() // Stop consumer gracefully
consumer.getStatus() // Get consumer status
consumer.getName() // Get consumer name
consumer.getQueue() // Get queue name
consumer.isPaused() // Check if pausedPer-Consumer Events
// No if-else chains! Each consumer has its own events
ordersConsumer.on('message:processed', (message, duration) => {
logger.info('Order processed', { duration });
});
ordersConsumer.on('message:failed', (message, error) => {
alerting.notify('Order processing failed', { error });
});
notificationsConsumer.on('message:processed', (message, duration) => {
logger.info('Notification sent', { duration });
});RabbitMQPublisher
Middleware-aware message publishing.
Constructor
new RabbitMQPublisher(
connectionConfig: string | Options.Connect,
options?: RabbitMQPublisherOptions
)Options:
interface RabbitMQPublisherOptions {
logger?: Logger;
confirmChannel?: boolean; // Use confirm channel (default: false)
defaultExchange?: string; // Default: '' (direct)
defaultRoutingKey?: string;
}use(middleware) → this
Add outbound middleware to pipeline.
publisher
.use(new StringifyJsonOutboundMiddleware())
.use(new EncryptOutboundMiddleware(key))
.use(new MetricsOutboundMiddleware(collector));publish(message, options) → Promise
Publish message through middleware pipeline.
await publisher.publish(
{ orderId: '123', amount: 99.99 },
{
exchange: 'orders',
routingKey: 'order.created',
options: { persistent: true },
}
);destroy() → Promise
Close connections and cleanup.
await publisher.destroy();RabbitMQMetadataMiddleware
Extract RabbitMQ-specific metadata.
import { RabbitMQMetadataMiddleware } from '@message-in-the-middle/rabbitmq';
manager.addInboundMiddleware(new RabbitMQMetadataMiddleware());
// Access in handler
const handler = async (ctx) => {
const rabbitmq = ctx.metadata.rabbitmq; // RabbitMQMessageMetadata
console.log('Exchange:', rabbitmq.exchange);
console.log('Routing Key:', rabbitmq.routingKey);
console.log('Redelivered:', rabbitmq.redelivered);
console.log('Correlation ID:', rabbitmq.correlationId);
};Extracted metadata:
exchange- Exchange nameroutingKey- Routing keyredelivered- Redelivery flagmessageId- Message IDcorrelationId- Correlation IDtimestamp- Timestampheaders- Custom headersdeliveryTag- Delivery tag
Complete Example
import { RabbitMQPoller, RabbitMQMetadataMiddleware } from '@message-in-the-middle/rabbitmq';
import { createQueuePipeline } from '@message-in-the-middle/core';
import { z } from 'zod';
// Define schema
const OrderSchema = z.object({
orderId: z.string().uuid(),
amount: z.number().positive(),
});
// Create pipeline
const ordersManager = createQueuePipeline({
queueName: 'orders',
validation: OrderSchema,
maxRetries: 3,
handler: async (ctx) => {
// Access RabbitMQ metadata
const rabbitmq = ctx.metadata.rabbitmq;
console.log('Routing key:', rabbitmq.routingKey);
// Process order
await processOrder(ctx.message);
},
logger: console,
});
// Add RabbitMQ metadata extraction
ordersManager.addInboundMiddleware(new RabbitMQMetadataMiddleware());
// Create poller
const poller = new RabbitMQPoller('amqp://localhost', {
logger: console,
defaultPrefetch: 10,
});
// Start consumer
const ordersConsumer = await poller.start({
queue: 'orders-queue',
manager: ordersManager,
name: 'orders',
queueOptions: { durable: true },
});
// Per-consumer events
ordersConsumer.on('message:processed', (message, duration) => {
console.log(`✅ Processed in ${duration}ms`);
});
// Graceful shutdown
process.on('SIGTERM', async () => {
await poller.stopAll();
await ordersManager.destroy();
});Multi-Consumer Example
Manage multiple consumers from a single poller:
const poller = new RabbitMQPoller('amqp://localhost', { logger: console });
// Start multiple consumers
const ordersConsumer = await poller.start({
queue: 'orders',
manager: ordersManager,
name: 'orders',
prefetch: 10,
});
const notificationsConsumer = await poller.start({
queue: 'notifications',
manager: notificationsManager,
name: 'notifications',
prefetch: 20,
});
const analyticsConsumer = await poller.start({
queue: 'analytics',
manager: analyticsManager,
name: 'analytics',
prefetch: 50,
});
// Per-consumer events (no if-else chains!)
ordersConsumer.on('message:processed', (msg, duration) => {
metrics.timing('orders.duration', duration);
});
notificationsConsumer.on('message:processed', (msg, duration) => {
metrics.timing('notifications.duration', duration);
});
// Stop all at once
await poller.stopAll();Publishing Example
import { RabbitMQPublisher } from '@message-in-the-middle/rabbitmq';
import { StringifyJsonOutboundMiddleware } from '@message-in-the-middle/core';
const publisher = new RabbitMQPublisher('amqp://localhost', {
confirmChannel: true,
logger: console,
});
// Add middlewares
publisher.use(new StringifyJsonOutboundMiddleware());
// Publish with middleware processing
await publisher.publish(
{ orderId: '123', amount: 99.99 },
{
exchange: 'orders',
routingKey: 'order.created',
options: {
persistent: true,
headers: { source: 'api' },
},
}
);
// Cleanup
await publisher.destroy();Comparison
vs Manual RabbitMQ
| Feature | Manual | @message-in-the-middle/rabbitmq | |---------|--------|-------------------------------| | Setup code | 50-100 lines | 5 lines | | Connection management | Manual | Automatic | | Reconnection | Manual (30+ lines) | Automatic | | Graceful shutdown | Manual (20+ lines) | Built-in | | Multi-consumer | Duplicate code | Single poller | | Middleware | None | Full support | | Type safety | Basic | Full generics |
vs SQS Package
Both packages follow the same API pattern for consistency:
// SQS
const sqsPoller = new SQSPoller(sqsClient, { logger });
sqsPoller.start({ queueUrl, manager, name });
// RabbitMQ (same pattern!)
const rabbitPoller = new RabbitMQPoller('amqp://localhost', { logger });
rabbitPoller.start({ queue, manager, name });Best Practices
1. Use Per-Consumer Events
// ✅ Good - Per-consumer events
ordersConsumer.on('message:failed', async (msg, error) => {
await notifyTeam('Orders queue failing', { error });
});
// ❌ Bad - Global events with if-else
poller.on('message:failed', (name, msg, error) => {
if (name === 'orders') {
await notifyTeam('Orders queue failing', { error });
}
});2. Graceful Shutdown
// Always implement graceful shutdown
process.on('SIGTERM', async () => {
await poller.stopAll();
await manager.destroy();
process.exit(0);
});3. Queue-Native Features
Use RabbitMQ's native features for infrastructure concerns:
// ✅ Use RabbitMQ DLX for dead letters
await channel.assertQueue('orders', {
arguments: {
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'failed.orders',
},
});
// ✅ Use prefetch for rate limiting
poller.start({ queue: 'orders', prefetch: 10 });
// ✅ Use middleware for message processing
manager
.use(parseJson())
.use(validate(schema))
.use(retry({ maxRetries: 3 }));Examples
Links
License
MIT
