@mrigind/rabbitmq-with-retry-and-dlq
v1.0.27
Published
RabbitMQ implementation with dynamic retry logic and Dead Letter Queue support
Downloads
4
Maintainers
Readme
RabbitMQ with Retry and Dead Letter Queue
A production-ready TypeScript RabbitMQ library with automatic retry, exponential backoff, and dead letter queue support.
Features
- ✨ Lazy Loading - Automatic connection on first use
- 🔄 Smart Retry - Exponential/linear backoff with jitter
- 💀 Dead Letter Queue - Auto-routing after max retries
- 🔌 Auto-Reconnection - Handles network issues automatically
- 📦 TypeScript - Full type safety
- 🎯 Exchange Support - Direct, topic, fanout, headers
- 🏭 Industry Best Practice - Separation of concerns (Publisher→Exchange, Consumer→Queue)
Installation
npm install rabbitmq-with-retry-and-dlqQuick Start (Best Practice)
import {
initializeRabbitMQ,
publisher,
consumer,
} from 'rabbitmq-with-retry-and-dlq';
// 1. Initialize connection
await initializeRabbitMQ('amqp://localhost:5672');
// 2. Publisher: Assert exchange
await publisher.assertExchange('orders', { exchangeType: 'topic' });
// 3. Consumer: Setup queue with exchange and routing keys
await consumer.setupQueue({
queueName: 'orders-processor',
exchangeName: 'orders',
exchangeType: 'topic',
routingKeys: ['order.created', 'order.updated'],
retryConfig: { maxRetries: 5, retryDelayMs: 2000 },
});
// 4. Consumer: Start consuming
await consumer.consumeQueue({
queueName: 'orders-processor',
onMessage: async (message) => {
await processOrder(message);
},
});
// 5. Publisher: Publish messages
await publisher.publishToExchange({
exchangeName: 'orders',
routingKey: 'order.created',
message: { orderId: 123 },
});Usage
1. Initialize (Required - Call Once with Await)
You must call await initializeRabbitMQ() once in your application setup or helper file:
import {
initializeRabbitMQ,
isPublisherConnected,
isConsumerConnected,
} from 'rabbitmq-with-retry-and-dlq';
try {
// Call once at application startup (in your helper/setup file)
// Use await - this establishes the connection and waits for it to be ready
await initializeRabbitMQ('amqp://user:pass@localhost:5672');
// Check connection status (both should be true after await)
console.log('Publisher connected:', isPublisherConnected()); // true
console.log('Consumer connected:', isConsumerConnected()); // true
} catch (error) {
// Connection failed (wrong URL, RabbitMQ not running, timeout, etc.)
console.error('Failed to connect to RabbitMQ:', error);
process.exit(1); // Fail fast
}Error Handling:
- Throws error if URL is wrong
- Throws error if RabbitMQ is not running
- Throws error if connection timeout (default: 30 seconds)
- Returns only when connections are established
2. Lazy Loading (Automatic Connection)
After initialization, just use publisher/consumer - connection happens automatically:
import { publisher, consumer } from 'rabbitmq-with-retry-and-dlq';
// No additional setup needed - just use it!
// First call automatically establishes connection
await publisher.publishToQueue({
queueName: 'orders',
message: { orderId: 123 },
});Industry Best Practice: Separation of Concerns
This library follows the RabbitMQ best practice pattern:
- Publisher → Owns Exchanges (knows the topic/domain)
- Consumer → Owns Queues and Bindings (decides what to subscribe to)
This is cleaner because:
- Publishers don't need to know about queues
- Consumers can create their own queues and choose their bindings
- Multiple consumers can bind differently to the same exchange
Publisher: Assert Exchange
import { publisher } from 'rabbitmq-with-retry-and-dlq';
// Assert a single exchange
await publisher.assertExchange('orders');
// Assert with exchange type
await publisher.assertExchange('events', { exchangeType: 'topic' });
// Assert multiple exchanges
await publisher.assertExchange(['orders', 'payments', 'notifications']);
// With options
await publisher.assertExchange('orders', {
exchangeType: 'direct', // 'direct' | 'topic' | 'fanout' | 'headers' (default: 'direct')
durable: true, // Survives broker restart (default: true)
});Consumer: Assert Exchanges (Recommended!)
import { consumer } from 'rabbitmq-with-retry-and-dlq';
// IMPORTANT: Assert exchanges on consumer's connection BEFORE binding
// This prevents "NOT_FOUND - no exchange" race condition errors
await consumer.assertExchange('orders', { exchangeType: 'topic' });
await consumer.assertExchange('payments', { exchangeType: 'direct' });
// Or assert multiple exchanges at once
await consumer.assertExchange(['orders', 'payments'], {
exchangeType: 'direct',
});Why? Publisher and consumer use separate connections. Even if publisher creates the exchange, the consumer's connection might not see it immediately. Asserting exchanges on the consumer's connection ensures they exist before binding.
Consumer: Setup Queue (RECOMMENDED - Best Practice)
Use setupQueue() for complete setup in one atomic operation - prevents race conditions and supports multiple routing keys:
// ✅ RECOMMENDED: Complete setup in one call
await consumer.setupQueue({
queueName: 'orders-processor',
exchangeName: 'orders',
exchangeType: 'topic',
routingKeys: ['order.created', 'order.updated', 'order.cancelled'], // Multiple routing keys!
retryConfig: {
maxRetries: 5,
retryDelayMs: 2000,
backoffStrategy: 'exponential',
jitterMs: 1000,
},
});
// Then consume (queue already set up)
await consumer.consumeQueue({
queueName: 'orders-processor',
onMessage: async (message) => {
console.log('Processing order:', message);
await processOrder(message);
},
});What setupQueue() does:
- ✅ Asserts exchange
- ✅ Creates queue with retry/DLQ infrastructure
- ✅ Binds queue to exchange with all routing keys
- ✅ All in ONE atomic operation (no race conditions)
Consumer: Assert Queues (Alternative - More Control)
Use assertQueues() when you need more control or want to add bindings dynamically:
// Assert a single queue
await consumer.assertQueues('orders-worker');
// Assert multiple queues
await consumer.assertQueues(['orders-worker', 'payments-worker']);
// Assert queue with exchange binding and retry/DLQ configuration
await consumer.assertQueues('orders-worker', {
exchangeName: 'orders',
exchangeType: 'direct',
routingKey: 'order.created', // Single routing key
retryConfig: {
maxRetries: 5,
retryDelayMs: 2000,
backoffStrategy: 'exponential',
},
});Consumer: Bind Queue to Exchange (For Additional Bindings)
Use bindQueue() to add more routing key bindings after initial setup:
// IMPORTANT: Call consumer.assertExchange() FIRST to prevent race conditions!
// Bind queue to exchange with routing key
await consumer.bindQueue('orders-worker', 'orders', 'order.created', {
exchangeType: 'topic', // Ensures exchange exists (recommended)
});
// Bind with pattern (for topic exchanges)
await consumer.bindQueue('all-orders', 'events', 'order.*', {
exchangeType: 'topic',
});
// Add multiple bindings to same queue
await consumer.bindQueue('orders-worker', 'orders', 'order.created');
await consumer.bindQueue('orders-worker', 'orders', 'order.updated');
await consumer.bindQueue('orders-worker', 'orders', 'order.cancelled');Complete Best Practice Example (RECOMMENDED)
// === PUBLISHER SIDE ===
// Publisher only knows about exchanges
await publisher.assertExchange('orders', { exchangeType: 'topic' });
await publisher.publishToExchange({
exchangeName: 'orders',
routingKey: 'order.created',
message: { orderId: 123, amount: 99.99 },
});
// === CONSUMER SIDE ===
// Step 1: Setup queue with exchange and multiple routing keys (RECOMMENDED)
await consumer.setupQueue({
queueName: 'orders-processor',
exchangeName: 'orders',
exchangeType: 'topic',
routingKeys: ['order.created', 'order.updated'], // Multiple keys!
retryConfig: { maxRetries: 3, retryDelayMs: 5000 },
});
// Step 2: Start consuming (queue already set up)
await consumer.consumeQueue({
queueName: 'orders-processor',
onMessage: async (message) => {
console.log('Processing order:', message);
await processOrder(message);
},
});Alternative: Step-by-Step Setup
If you need more granular control, use separate methods:
// Step 1: Assert exchanges on consumer's connection (defensive)
await consumer.assertExchange('orders', { exchangeType: 'topic' });
// Step 2: Create queues with retry/DLQ config
await consumer.assertQueues('orders-processor', {
retryConfig: { maxRetries: 3, retryDelayMs: 5000 },
});
// Step 3: Bind queues to exchanges (can add multiple routing keys)
await consumer.bindQueue('orders-processor', 'orders', 'order.created', {
exchangeType: 'topic',
});
await consumer.bindQueue('orders-processor', 'orders', 'order.updated', {
exchangeType: 'topic',
});
// Step 4: Start consuming
await consumer.consumeQueue({
queueName: 'orders-processor',
onMessage: async (message) => {
await processOrder(message);
},
});Direct Queue Publishing (Without Exchange)
For simple point-to-point messaging without exchanges:
Publish to Queue
// Note: Queue should be set up by consumer first using assertQueues() or setupQueue()
await publisher.publishToQueue({
queueName: 'orders',
message: {
orderId: 'ORDER-123',
amount: 99.99,
},
options: {
persistent: true, // Survives broker restart (default: true)
priority: 5, // 0-10 priority (optional)
expiration: '5000', // TTL in ms (optional)
},
// Optional: retryConfig if queue wasn't set up with retry
retryConfig: {
maxRetries: 5,
retryDelayMs: 2000,
},
});Publish to Exchange
// Publisher only needs exchange and routing key (not queue!)
await publisher.publishToExchange({
exchangeName: 'orders_exchange',
exchangeType: 'direct',
routingKey: 'order.created',
message: { orderId: 'ORDER-456' },
options: {
persistent: true,
priority: 5,
},
});Consumer (Best Practice)
// Set up error handler FIRST
consumer.on('error', (errorEvent) => {
console.error('RabbitMQ Error:', errorEvent);
if (errorEvent.type === 'DLQ_FAILED') {
console.error('CRITICAL: Message lost!');
}
});
// Step 1: Setup queue with exchange and routing keys
await consumer.setupQueue({
queueName: 'orders-processor',
exchangeName: 'orders_exchange',
exchangeType: 'direct',
routingKeys: ['order.created', 'order.updated'],
retryConfig: {
maxRetries: 5,
retryDelayMs: 2000,
backoffStrategy: 'exponential',
},
});
// Step 2: Start consuming (queue already set up)
await consumer.consumeQueue({
queueName: 'orders-processor',
onMessage: async (message, messageInfo) => {
console.log('Processing:', message);
// Your business logic
await processOrder(message);
// Success = auto-ack
// Throw error = retry with backoff → DLQ
},
options: {
prefetch: 5, // Max unacked messages (default: 5)
noAck: false, // Auto-ack (default: false)
},
});7. Graceful Shutdown (Required!)
import { closeRabbitMQ } from 'rabbitmq-with-retry-and-dlq';
const shutdown = async () => {
console.log('Shutting down...');
await closeRabbitMQ();
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);Retry Configuration
Retry Config Keys
interface RetryConfig {
maxRetries: number; // Required: max retry attempts
retryDelayMs?: number; // Optional: base delay in ms (default: 5000)
backoffStrategy?: string; // Optional: 'exponential' | 'linear' (default: 'exponential')
maxDelayMs?: number; // Optional: max delay cap (default: 300000)
jitterMs?: number; // Optional: random jitter range (default: 0)
}Exponential Backoff Example
retryConfig: {
maxRetries: 4,
retryDelayMs: 1000,
backoffStrategy: 'exponential'
}
// Delays: 1s → 2s → 4s → 8s → DLQLinear Backoff Example
retryConfig: {
maxRetries: 4,
retryDelayMs: 5000,
backoffStrategy: 'linear'
}
// Delays: 5s → 10s → 15s → 20s → DLQWith Jitter (Prevents Thundering Herd)
retryConfig: {
maxRetries: 3,
retryDelayMs: 2000,
backoffStrategy: 'exponential',
jitterMs: 1000
}
// Retry 1: 2000ms + random(0-1000ms) = 2000-3000ms
// Retry 2: 4000ms + random(0-1000ms) = 4000-5000ms
// Retry 3: 8000ms + random(0-1000ms) = 8000-9000msComplete Example (Best Practice Pattern)
import express from 'express';
import {
initializeRabbitMQ,
closeRabbitMQ,
publisher,
consumer,
isPublisherConnected,
isConsumerConnected,
type RabbitMQErrorEvent,
} from 'rabbitmq-with-retry-and-dlq';
const app = express();
app.use(express.json());
async function startApp() {
try {
// 1. Initialize RabbitMQ connection (required - use await)
console.log('Connecting to RabbitMQ...');
await initializeRabbitMQ('amqp://user:pass@localhost:5672');
console.log('✓ Publisher connected:', isPublisherConnected()); // true
console.log('✓ Consumer connected:', isConsumerConnected()); // true
// 2. Set up error handler
consumer.on('error', (errorEvent: RabbitMQErrorEvent) => {
console.error('RabbitMQ Error:', errorEvent);
});
// 3. Publisher: Assert exchange (publisher owns exchanges)
await publisher.assertExchange('orders', { exchangeType: 'topic' });
console.log('✓ Publisher exchange asserted');
// 4. Consumer: Setup queue with exchange and routing keys (RECOMMENDED)
await consumer.setupQueue({
queueName: 'orders-processor',
exchangeName: 'orders',
exchangeType: 'topic',
routingKeys: ['order.created', 'order.updated'],
retryConfig: {
maxRetries: 5,
retryDelayMs: 2000,
backoffStrategy: 'exponential',
jitterMs: 1000,
},
});
console.log('✓ Queue setup complete');
// 5. Start consumer (queue already set up)
await consumer.consumeQueue({
queueName: 'orders-processor',
onMessage: async (message) => {
console.log('Processing order:', message);
await processOrder(message);
},
});
console.log('✓ Consumer started');
// 8. Start server
app.listen(3000, () => {
console.log('✓ Server running on port 3000');
});
} catch (error) {
console.error('❌ Failed to start app:', error);
console.error(' Make sure RabbitMQ is running and URL is correct');
process.exit(1);
}
}
// API endpoint - Publisher only knows about exchanges (not queues!)
app.post('/orders', async (req, res) => {
try {
await publisher.publishToExchange({
exchangeName: 'orders',
routingKey: 'order.created',
message: req.body,
});
res.json({ success: true });
} catch (error) {
console.error('Failed to publish:', error);
res.status(500).json({ error: 'Failed to publish order' });
}
});
// Graceful shutdown
const shutdown = async () => {
console.log('Shutting down...');
await closeRabbitMQ();
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
// Helper function
async function processOrder(order: any) {
// Your business logic here
console.log('Order processed:', order.orderId);
}
// Start the app
startApp();API Reference
Publisher Methods
| Method | Description |
| --------------------------------- | -------------------------------------------------------------- |
| assertExchange(names, options?) | Create exchange(s) - Publisher owns exchanges |
| publishToExchange(config) | Publish to exchange with routing key |
| publishToQueue(config) | Publish directly to queue (queue should be set up by consumer) |
| publishBatch(configs) | Publish multiple messages |
| close() | Close publisher connection |
Consumer Methods
| Method | Description |
| -------------------------------------------------- | ---------------------------------------------------------------------------------------------------- |
| setupQueue(config) | ⭐ RECOMMENDED - Complete setup: exchange + queue + bindings + retry/DLQ in one atomic operation |
| assertExchange(names, options?) | Assert exchange(s) on consumer connection (defensive) |
| assertQueues(names, options?) | Create queue(s) with optional exchange binding and retry/DLQ config |
| bindQueue(queue, exchange, routingKey, options?) | Bind queue to exchange (for additional routing keys) |
| deleteQueues(names, options?) | Delete queue(s) and related retry/DLQ queues |
| consumeQueue(config) | Start consuming from queue (queue must be set up first) |
| consumeMultipleQueues(config) | Consume from multiple queues |
| consumeWithPattern(exchange, pattern, handler) | Consume with pattern matching (topic) |
| getConsumerStats() | Get consumer statistics |
| close() | Close consumer connection |
Additional Options
Exchange Options (for assertExchange)
{
exchangeType: 'direct', // 'direct' | 'topic' | 'fanout' | 'headers' (default: 'direct')
durable: true, // Exchange survives restart (default: true)
}Queue Options (for assertQueues)
{
durable: true, // Queue survives restart (default: true)
exclusive: false, // Exclusive to connection (default: false)
autoDelete: false, // Auto-delete when unused (default: false)
retryConfig: {...}, // Optional: retry and DLQ configuration
}Bind Queue (consumer.bindQueue)
await consumer.bindQueue(
'queue-name', // Queue to bind
'exchange-name', // Exchange to bind to
'routing.key', // Routing key or pattern (for topic exchanges)
{
// Options (optional)
exchangeType: 'topic', // If provided, asserts exchange before binding (RECOMMENDED)
durable: true, // Exchange durability (default: true)
}
);
// Pattern examples for topic exchanges:
// 'order.*' - matches order.created, order.updated
// 'order.#' - matches order.created, order.created.v2
// '*.created' - matches order.created, payment.createdNote: When publisher and consumer use separate connections, there can be race conditions where the consumer tries to bind before the exchange is fully created. Specifying
exchangeTypein bindQueue ensures the exchange exists on the consumer's connection (idempotent operation).
Publish Options
{
persistent: true, // Message survives restart (default: true)
priority: 5, // Priority 0-10 (optional)
expiration: '5000', // TTL in ms (optional)
}Consumer Options
{
prefetch: 5, // Max unacked messages (default: 5)
noAck: false, // Auto-acknowledge (default: false)
exclusive: false, // Exclusive consumer (default: false)
durable: true, // Queue survives restart (default: true)
}Error Event Types
RETRY_QUEUE_FAILED- Failed to send to retry queueDLQ_FAILED- Failed to send to DLQ (CRITICAL!)MESSAGE_PROCESSING_FAILED- Processing errorPUBLISH_FAILED- Publish failedCONNECTION_ERROR- Connection error
Queue Management
// Delete queue (includes retry and DLQ by default)
await consumer.deleteQueues('orders');
// Delete multiple queues
await consumer.deleteQueues(['orders', 'payments']);
// Delete only main queue
await consumer.deleteQueues('orders', {
includeRetry: false,
includeDLQ: false,
});Important Notes
Required (Must Implement)
- ⚠️ Must call
await initializeRabbitMQ(url)with try/catch - throws error on connection failure - ⚠️ Must implement graceful shutdown handlers (
SIGTERM,SIGINT) - ⚠️ Must monitor DLQ failures (indicates message loss)
Error Handling
initializeRabbitMQ()throws error if:- RabbitMQ URL is wrong
- RabbitMQ is not running
- Connection timeout (default: 30 seconds)
- Always wrap in try/catch for fail-fast behavior
Automatic (No Action Needed)
- ✅ Auto-reconnection on connection loss
- ✅ Lazy loading - connects on first use (after initialization)
- ✅ Message durability with
persistent: true - ✅ Queue durability with
durable: true - ✅ Thread-safe initialization
Best Practices
- Call
initializeRabbitMQ(url)once in your setup/helper file - Use separation of concerns pattern:
- Publisher →
assertExchange()+publishToExchange()(owns exchanges) - Consumer →
setupQueue()orassertQueues()+bindQueue()(owns queues and bindings)
- Publisher →
- Recommended workflow:
- Setup infrastructure at startup:
consumer.setupQueue()orconsumer.assertQueues() - Start consuming:
consumer.consumeQueue()
- Setup infrastructure at startup:
- Set
retryConfiginsetupQueue()orassertQueues()- auto-used when processing assertExchange(),assertQueues(),setupQueue(), andbindQueue()are idempotent - safe to call multiple times- Use exponential backoff with jitter to prevent thundering herd
- Always set up error event handlers before consuming
- Use graceful shutdown in production
Troubleshooting
Error: PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange'
Problem:
Error: Operation failed: QueueDeclare; 406 (PRECONDITION_FAILED) with message
"PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue
'my_queue' in vhost '/': received 'my_exchange' but current is ''"Cause: The queue already exists in RabbitMQ with different configuration arguments than what your code is trying to set.
Solution: Delete the existing queue and let your code recreate it with the correct configuration.
Option 1 - Using the utility script:
npx ts-node scripts/delete-queue.ts my_queueOption 2 - Using RabbitMQ Management UI:
- Navigate to
http://localhost:15672(default credentials: guest/guest) - Go to the "Queues" tab
- Find and delete the problematic queue
Option 3 - Using RabbitMQ CLI:
rabbitmqadmin delete queue name=my_queueOption 4 - Using this library:
import { consumer } from 'rabbitmq-with-retry-and-dlq';
await consumer.deleteQueues('my_queue', {
includeRetry: true,
includeDLQ: true,
});Prevention:
Always use consumer.setupQueue() or consumer.assertQueues() to create queues with the proper retry configuration before consuming.
License
MIT
