rabbitmq-with-retry-and-dlq
v1.0.18
RabbitMQ implementation with dynamic retry logic and Dead Letter Queue support
RabbitMQ with Retry and Dead Letter Queue
A production-ready TypeScript RabbitMQ library with automatic retry, exponential backoff, and dead letter queue support.
Features
- ✨ Lazy Loading - Automatic connection on first use
- 🔄 Smart Retry - Exponential/linear backoff with jitter
- 💀 Dead Letter Queue - Auto-routing after max retries
- 🔌 Auto-Reconnection - Handles network issues automatically
- 📦 TypeScript - Full type safety
- 🎯 Exchange Support - Direct, topic, fanout, headers
Installation
npm install rabbitmq-with-retry-and-dlq
Usage
1. Initialize (Required - Call Once with Await)
You must call await initializeRabbitMQ() once in your application setup or helper file:
import { initializeRabbitMQ, isPublisherConnected, isConsumerConnected } from 'rabbitmq-with-retry-and-dlq';
try {
  // Call once at application startup (in your helper/setup file)
  // Use await - this establishes the connection and waits for it to be ready
  await initializeRabbitMQ('amqp://user:pass@localhost:5672');
  // Check connection status (both should be true after await)
  console.log('Publisher connected:', isPublisherConnected()); // true
  console.log('Consumer connected:', isConsumerConnected()); // true
} catch (error) {
  // Connection failed (wrong URL, RabbitMQ not running, timeout, etc.)
  console.error('Failed to connect to RabbitMQ:', error);
  process.exit(1); // Fail fast
}
Error Handling:
- Throws an error if the URL is wrong
- Throws an error if RabbitMQ is not running
- Throws an error if the connection times out (default: 30 seconds)
- Returns only once the connections are established
2. Lazy Loading (Automatic Connection)
After initialization, just use publisher/consumer - connection happens automatically:
import { publisher, consumer } from 'rabbitmq-with-retry-and-dlq';
// No additional setup needed - just use it!
// First call automatically establishes connection
await publisher.publishToQueue({
  queueName: 'orders',
  message: { orderId: 123 }
});
3. Assert Queues (with Retry Config)
Note: assertQueues() is idempotent - it safely skips queues that already exist. You can call it multiple times without errors.
await publisher.assertQueues('orders', {
  durable: true, // Queue survives restart (default: true)
  retryConfig: {
    maxRetries: 5, // Required
    retryDelayMs: 2000, // Base delay (default: 5000ms)
    backoffStrategy: 'exponential', // 'exponential' or 'linear' (default: exponential)
    maxDelayMs: 300000, // Max delay cap (default: 300000 = 5min)
    jitterMs: 1000, // Random jitter 0-1000ms (default: 0)
  },
});
// Assert multiple queues
await publisher.assertQueues(['orders', 'payments', 'notifications'], {
  retryConfig: { maxRetries: 3 }
});
// With exchange
await publisher.assertQueues('order_processing', {
  exchangeName: 'orders_exchange',
  exchangeType: 'direct', // 'direct' | 'topic' | 'fanout' | 'headers'
  routingKey: 'order.created',
  retryConfig: { maxRetries: 3 },
});
4. Publish to Queue
await publisher.publishToQueue({
  queueName: 'orders',
  message: {
    orderId: 'ORDER-123',
    amount: 99.99,
  },
  options: {
    persistent: true, // Survives broker restart (default: true)
    priority: 5, // 0-10 priority (optional)
    expiration: '5000' // TTL in ms (optional)
  },
});
5. Publish to Exchange
await publisher.publishToExchange({
  exchangeName: 'orders_exchange',
  exchangeType: 'direct',
  queueName: 'order_processing',
  routingKey: 'order.created',
  message: { orderId: 'ORDER-456' },
  options: {
    persistent: true,
    priority: 5,
  },
});
6. Consumer
// Set up error handler
consumer.on('error', (errorEvent) => {
  console.error('RabbitMQ Error:', errorEvent);
  if (errorEvent.type === 'DLQ_FAILED') {
    console.error('CRITICAL: Message lost!');
  }
});
// Consume messages
await consumer.consumeQueue({
  queueName: 'orders',
  onMessage: async (message, messageInfo) => {
    console.log('Processing:', message);
    // Your business logic
    await processOrder(message);
    // Success = auto-ack
    // Throw error = retry with backoff
  },
  options: {
    prefetch: 5, // Max unacked messages (default: 5)
    noAck: false, // Auto-ack (default: false)
  },
});
// Consume from exchange
await consumer.consumeQueue({
  queueName: 'order_processing',
  exchangeName: 'orders_exchange',
  routingKey: 'order.created',
  onMessage: async (message) => {
    await processOrder(message);
  },
});
7. Graceful Shutdown (Required!)
import { closeRabbitMQ } from 'rabbitmq-with-retry-and-dlq';
const shutdown = async () => {
  console.log('Shutting down...');
  await closeRabbitMQ();
  process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
Retry Configuration
Retry Config Keys
interface RetryConfig {
  maxRetries: number; // Required: max retry attempts
  retryDelayMs?: number; // Optional: base delay in ms (default: 5000)
  backoffStrategy?: string; // Optional: 'exponential' | 'linear' (default: 'exponential')
  maxDelayMs?: number; // Optional: max delay cap (default: 300000)
  jitterMs?: number; // Optional: random jitter range (default: 0)
}
Exponential Backoff Example
retryConfig: {
  maxRetries: 4,
  retryDelayMs: 1000,
  backoffStrategy: 'exponential'
}
// Delays: 1s → 2s → 4s → 8s → DLQ
Linear Backoff Example
retryConfig: {
  maxRetries: 4,
  retryDelayMs: 5000,
  backoffStrategy: 'linear'
}
// Delays: 5s → 10s → 15s → 20s → DLQ
With Jitter (Prevents Thundering Herd)
retryConfig: {
  maxRetries: 3,
  retryDelayMs: 2000,
  backoffStrategy: 'exponential',
  jitterMs: 1000
}
// Retry 1: 2000ms + random(0-1000ms) = 2000-3000ms
// Retry 2: 4000ms + random(0-1000ms) = 4000-5000ms
// Retry 3: 8000ms + random(0-1000ms) = 8000-9000ms
Complete Example
import express from 'express';
import {
  initializeRabbitMQ,
  closeRabbitMQ,
  publisher,
  consumer,
  isPublisherConnected,
  isConsumerConnected,
  type RabbitMQErrorEvent,
} from 'rabbitmq-with-retry-and-dlq';
const app = express();
app.use(express.json());
async function startApp() {
  try {
    // 1. Initialize RabbitMQ connection (required - use await)
    console.log('Connecting to RabbitMQ...');
    await initializeRabbitMQ('amqp://user:pass@localhost:5672');
    console.log('✓ Publisher connected:', isPublisherConnected()); // true
    console.log('✓ Consumer connected:', isConsumerConnected()); // true
    // 2. Set up error handler
    consumer.on('error', (errorEvent: RabbitMQErrorEvent) => {
      console.error('RabbitMQ Error:', errorEvent);
    });
    // 3. Assert queues
    await publisher.assertQueues('orders', {
      retryConfig: {
        maxRetries: 5,
        retryDelayMs: 2000,
        backoffStrategy: 'exponential',
        jitterMs: 1000,
      },
    });
    console.log('✓ Queues asserted');
    // 4. Start consumer
    await consumer.consumeQueue({
      queueName: 'orders',
      onMessage: async (message) => {
        console.log('Processing order:', message);
        await processOrder(message);
      },
    });
    console.log('✓ Consumer started');
    // 5. Start server
    app.listen(3000, () => {
      console.log('✓ Server running on port 3000');
    });
  } catch (error) {
    console.error('❌ Failed to start app:', error);
    console.error(' Make sure RabbitMQ is running and URL is correct');
    process.exit(1);
  }
}
// API endpoint
app.post('/orders', async (req, res) => {
  try {
    await publisher.publishToQueue({
      queueName: 'orders',
      message: req.body,
    });
    res.json({ success: true });
  } catch (error) {
    console.error('Failed to publish:', error);
    res.status(500).json({ error: 'Failed to publish order' });
  }
});
// Graceful shutdown
const shutdown = async () => {
  console.log('Shutting down...');
  await closeRabbitMQ();
  process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
// Helper function
async function processOrder(order: any) {
  // Your business logic here
  console.log('Order processed:', order.orderId);
}
// Start the app
startApp();
Additional Options
Queue Options
{
  durable: true, // Queue survives restart (default: true)
  exclusive: false, // Exclusive to connection (default: false)
  autoDelete: false, // Auto-delete when unused (default: false)
}
Publish Options
{
  persistent: true, // Message survives restart (default: true)
  priority: 5, // Priority 0-10 (optional)
  expiration: '5000', // TTL in ms (optional)
}
Consumer Options
{
  prefetch: 5, // Max unacked messages (default: 5)
  noAck: false, // Auto-acknowledge (default: false)
  exclusive: false, // Exclusive consumer (default: false)
  durable: true, // Queue survives restart (default: true)
}
Error Event Types
- RETRY_QUEUE_FAILED - Failed to send to retry queue
- DLQ_FAILED - Failed to send to DLQ (CRITICAL!)
- MESSAGE_PROCESSING_FAILED - Processing error
- PUBLISH_FAILED - Publish failed
- CONNECTION_ERROR - Connection error
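One way to act on these event types is to map each one to a severity before logging or alerting. The sketch below is illustrative only: the `severityFor` helper and the severity levels are assumptions, not part of the library. Only the five type names, and the note that `DLQ_FAILED` is critical, come from the list above.

```typescript
// Hypothetical helper: the severity levels are an assumption for illustration.
type Severity = 'critical' | 'error' | 'warning';

function severityFor(type: string): Severity {
  switch (type) {
    case 'DLQ_FAILED':
      // The README marks this type as critical: the message may be lost.
      return 'critical';
    case 'MESSAGE_PROCESSING_FAILED':
      // Assumed to be recoverable, since a failed handler triggers a retry.
      return 'warning';
    case 'RETRY_QUEUE_FAILED':
    case 'PUBLISH_FAILED':
    case 'CONNECTION_ERROR':
      return 'error';
    default:
      // Unknown types are treated as errors rather than silently ignored.
      return 'error';
  }
}
```

It could be called from the error handler shown in the Consumer section, for example `consumer.on('error', (e) => console.error(severityFor(e.type), e))`, alerting only when the result is `'critical'`.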
Queue Management
// Delete queue (includes retry and DLQ by default)
await publisher.deleteQueues('orders');
// Delete multiple queues
await publisher.deleteQueues(['orders', 'payments']);
// Delete only main queue
await publisher.deleteQueues('orders', {
  includeRetry: false,
  includeDLQ: false,
});
Important Notes
Required (Must Implement)
- ⚠️ Must call await initializeRabbitMQ(url) with try/catch - throws error on connection failure
- ⚠️ Must implement graceful shutdown handlers (SIGTERM, SIGINT)
- ⚠️ Must monitor DLQ failures (indicates message loss)
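For the shutdown requirement, a handler can be made safe against repeated signals and against a close call that never returns. This is a minimal sketch under stated assumptions: `createShutdownHandler`, its parameters, and the 10-second limit are hypothetical and not part of the library.

```typescript
// Hypothetical wrapper: runs close() once and forces an exit if it hangs.
function createShutdownHandler(
  close: () => Promise<void>,
  exit: (code: number) => void,
  timeoutMs = 10000, // assumed limit, tune for your deployment
): () => Promise<void> {
  let started = false;
  return async () => {
    if (started) return; // ignore a second SIGINT/SIGTERM
    started = true;
    let exited = false;
    const finish = (code: number) => {
      if (exited) return; // exit at most once
      exited = true;
      clearTimeout(timer);
      exit(code);
    };
    // Force a non-zero exit if close() does not settle in time.
    const timer = setTimeout(() => finish(1), timeoutMs);
    try {
      await close();
      finish(0);
    } catch (error) {
      console.error('Error during shutdown:', error);
      finish(1);
    }
  };
}
```

With the library, this might be wired up as `const shutdown = createShutdownHandler(closeRabbitMQ, (code) => process.exit(code));` followed by the same `process.on('SIGTERM', shutdown)` and `process.on('SIGINT', shutdown)` registrations shown in the Graceful Shutdown section.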
Error Handling
initializeRabbitMQ() throws an error if:
- The RabbitMQ URL is wrong
- RabbitMQ is not running
- The connection times out (default: 30 seconds)
Always wrap the call in try/catch for fail-fast behavior.
Automatic (No Action Needed)
- ✅ Auto-reconnection on connection loss
- ✅ Lazy loading - connects on first use (after initialization)
- ✅ Message durability with persistent: true
- ✅ Queue durability with durable: true
- ✅ Thread-safe initialization
Best Practices
- Call initializeRabbitMQ(url) once in your setup/helper file
- Set retryConfig in assertQueues() - auto-used when publishing
- assertQueues() is idempotent - safe to call multiple times (skips existing queues)
- Use exponential backoff with jitter to prevent thundering herd
- Always set up error event handlers before consuming
- Use graceful shutdown in production
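The delay figures in the Retry Configuration examples can be reproduced with a small calculation, which may help when choosing values for retryDelayMs, maxDelayMs and jitterMs. The sketch below is not the library's implementation: `computeRetryDelay` and `nextStep` are hypothetical names, the attempt number is assumed to be 1-based, and adding jitter after applying the cap is an assumption. The defaults are the ones documented under Retry Config Keys.

```typescript
type BackoffStrategy = 'exponential' | 'linear';

// Local copy of the documented config shape, for illustration only.
interface RetryConfig {
  maxRetries: number;
  retryDelayMs?: number;
  backoffStrategy?: BackoffStrategy;
  maxDelayMs?: number;
  jitterMs?: number;
}

// attempt is 1-based: 1 for the first retry, 2 for the second, and so on.
function computeRetryDelay(
  attempt: number,
  config: RetryConfig,
  random: () => number = Math.random,
): number {
  const base = config.retryDelayMs ?? 5000;
  const strategy = config.backoffStrategy ?? 'exponential';
  const cap = config.maxDelayMs ?? 300000;
  const jitter = config.jitterMs ?? 0;
  // Exponential doubles per attempt; linear grows by one base delay per attempt.
  const raw = strategy === 'exponential' ? base * 2 ** (attempt - 1) : base * attempt;
  // Assumption: the cap applies to the base delay, then jitter is added on top.
  return Math.min(raw, cap) + random() * jitter;
}

type NextStep = { target: 'retry'; delayMs: number } | { target: 'dlq' };

// Decide whether a failed message is retried or sent to the dead letter queue.
function nextStep(
  attempt: number,
  config: RetryConfig,
  random: () => number = Math.random,
): NextStep {
  if (attempt > config.maxRetries) return { target: 'dlq' };
  return { target: 'retry', delayMs: computeRetryDelay(attempt, config, random) };
}
```

Passing a fixed `random` function makes the result deterministic, which is convenient for checking a configuration in a unit test before deploying it.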
Troubleshooting
Error: PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange'
Problem:
Error: Operation failed: QueueDeclare; 406 (PRECONDITION_FAILED) with message
"PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue
'my_queue' in vhost '/': received 'my_exchange' but current is ''"
Cause: The queue already exists in RabbitMQ with different configuration arguments than what your code is trying to set.
Solution: Delete the existing queue and let your code recreate it with the correct configuration.
Option 1 - Using the utility script:
npx ts-node scripts/delete-queue.ts my_queue
Option 2 - Using RabbitMQ Management UI:
- Navigate to http://localhost:15672 (default credentials: guest/guest)
- Go to the "Queues" tab
- Find and delete the problematic queue
Option 3 - Using RabbitMQ CLI:
rabbitmqadmin delete queue name=my_queue
Option 4 - Using this library:
import { publisher } from 'rabbitmq-with-retry-and-dlq';
await publisher.deleteQueues('my_queue', {
  includeRetry: true,
  includeDLQ: true
});
Prevention:
Always use publisher.assertQueues() to create queues with the proper retry configuration before publishing or consuming.
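To make this failure easier to spot in logs, the error text can be parsed for the queue and argument that mismatched. The helper below is a hypothetical sketch, not part of the library; it assumes the error message has the same shape as the PRECONDITION_FAILED example above.

```typescript
interface PreconditionMismatch {
  argument: string; // e.g. 'x-dead-letter-exchange'
  queue: string;
  vhost: string;
}

// Hypothetical helper: extracts details from a 406 PRECONDITION_FAILED message.
// Returns null when the text does not match the expected shape.
function parsePreconditionFailed(message: string): PreconditionMismatch | null {
  // Collapse line breaks so a wrapped message still matches.
  const normalized = message.replace(/\s+/g, ' ');
  const pattern =
    /PRECONDITION_FAILED - inequivalent arg '([^']+)' for queue '([^']+)' in vhost '([^']*)'/;
  const match = pattern.exec(normalized);
  if (!match) return null;
  return { argument: match[1], queue: match[2], vhost: match[3] };
}
```

A caller could catch the error thrown while asserting queues, pass its message through this function, and log a hint to delete the named queue using one of the options above.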
License
MIT
