rabbitmq-with-retry-and-dlq

v1.0.18

RabbitMQ implementation with dynamic retry logic and Dead Letter Queue support

Downloads

1,796

RabbitMQ with Retry and Dead Letter Queue

A production-ready TypeScript RabbitMQ library with automatic retry, exponential backoff, and dead letter queue support.

Features

  • Lazy Loading - Automatic connection on first use
  • 🔄 Smart Retry - Exponential/linear backoff with jitter
  • 💀 Dead Letter Queue - Auto-routing after max retries
  • 🔌 Auto-Reconnection - Handles network issues automatically
  • 📦 TypeScript - Full type safety
  • 🎯 Exchange Support - Direct, topic, fanout, headers

Installation

npm install rabbitmq-with-retry-and-dlq

Usage

1. Initialize (Required - Call Once with Await)

You must call await initializeRabbitMQ() once in your application setup or helper file:

import { initializeRabbitMQ, isPublisherConnected, isConsumerConnected } from 'rabbitmq-with-retry-and-dlq';

try {
  // Call once at application startup (in your helper/setup file)
  // Use await - this establishes the connection and waits for it to be ready
  await initializeRabbitMQ('amqp://user:pass@localhost:5672');

  // Check connection status (both should be true after await)
  console.log('Publisher connected:', isPublisherConnected()); // true
  console.log('Consumer connected:', isConsumerConnected());   // true
} catch (error) {
  // Connection failed (wrong URL, RabbitMQ not running, timeout, etc.)
  console.error('Failed to connect to RabbitMQ:', error);
  process.exit(1); // Fail fast
}

Error Handling:

  • Throws an error if the URL is wrong
  • Throws an error if RabbitMQ is not running
  • Throws an error if the connection times out (default: 30 seconds)
  • Returns only once the connections are established

2. Lazy Loading (Automatic Connection)

After initialization, use publisher and consumer directly; they connect automatically on first use:

import { publisher, consumer } from 'rabbitmq-with-retry-and-dlq';

// No additional setup needed - just use it!
// First call automatically establishes connection
await publisher.publishToQueue({
  queueName: 'orders',
  message: { orderId: 123 }
});
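
The lazy behavior described above is commonly built on a memoized connection promise. The following is a minimal sketch of that general pattern, not the library's actual code; `lazyConnection` and the `connect` callback are hypothetical names introduced for illustration.

```typescript
// Hypothetical helper: wraps a connect function so the connection is
// opened on first use and shared by every later (or concurrent) caller.
function lazyConnection<T>(connect: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;

  return () => {
    if (pending === undefined) {
      // First caller starts the connection; a failure clears the cache
      // so that a later call can try again.
      pending = connect().catch((err) => {
        pending = undefined;
        throw err;
      });
    }
    return pending;
  };
}

// Usage: both calls share one underlying connect() invocation.
let connectCalls = 0;
const getConnection = lazyConnection(async () => {
  connectCalls += 1;
  return { id: 'fake-connection' }; // stand-in for a real connection object
});
const first = getConnection();
const second = getConnection();
```

Because the promise is stored before it resolves, callers that arrive while the connection is still being opened wait on the same promise instead of opening a second connection.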

3. Assert Queues (with Retry Config)

Note: assertQueues() is idempotent - it safely skips queues that already exist. You can call it multiple times without errors.

await publisher.assertQueues('orders', {
  durable: true,                    // Queue survives restart (default: true)
  retryConfig: {
    maxRetries: 5,                  // Required
    retryDelayMs: 2000,             // Base delay (default: 5000ms)
    backoffStrategy: 'exponential', // 'exponential' or 'linear' (default: exponential)
    maxDelayMs: 300000,             // Max delay cap (default: 300000 = 5min)
    jitterMs: 1000,                 // Random jitter 0-1000ms (default: 0)
  },
});

// Assert multiple queues
await publisher.assertQueues(['orders', 'payments', 'notifications'], {
  retryConfig: { maxRetries: 3 }
});

// With exchange
await publisher.assertQueues('order_processing', {
  exchangeName: 'orders_exchange',
  exchangeType: 'direct',           // 'direct' | 'topic' | 'fanout' | 'headers'
  routingKey: 'order.created',
  retryConfig: { maxRetries: 3 },
});

4. Publish to Queue

await publisher.publishToQueue({
  queueName: 'orders',
  message: {
    orderId: 'ORDER-123',
    amount: 99.99,
  },
  options: {
    persistent: true,  // Survives broker restart (default: true)
    priority: 5,       // 0-10 priority (optional)
    expiration: '5000' // TTL in ms (optional)
  },
});
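
Since expiration is passed as a string of milliseconds and priority is documented as 0-10, it can help to build the options object through a small validating helper. This is a sketch under those assumptions; `buildPublishOptions`, `PublishOptionsSketch` and `ttlMs` are hypothetical names and not part of the library.

```typescript
// Local shape mirroring the option names shown above (illustrative only).
interface PublishOptionsSketch {
  persistent: boolean;
  priority?: number;
  expiration?: string;
}

// Hypothetical helper: validates the inputs and converts a numeric TTL
// into the string form used by the expiration option.
function buildPublishOptions(
  input: { persistent?: boolean; priority?: number; ttlMs?: number } = {}
): PublishOptionsSketch {
  const options: PublishOptionsSketch = { persistent: input.persistent ?? true };

  if (input.priority !== undefined) {
    const p = input.priority;
    if (Math.floor(p) !== p || p < 0 || p > 10) {
      throw new RangeError('priority must be an integer between 0 and 10');
    }
    options.priority = p;
  }

  if (input.ttlMs !== undefined) {
    if (!isFinite(input.ttlMs) || input.ttlMs < 0) {
      throw new RangeError('ttlMs must be a non-negative number');
    }
    options.expiration = String(Math.floor(input.ttlMs));
  }

  return options;
}

// Usage: produces { persistent: true, priority: 5, expiration: '5000' }
const publishOptions = buildPublishOptions({ priority: 5, ttlMs: 5000 });
```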

5. Publish to Exchange

await publisher.publishToExchange({
  exchangeName: 'orders_exchange',
  exchangeType: 'direct',
  queueName: 'order_processing',
  routingKey: 'order.created',
  message: { orderId: 'ORDER-456' },
  options: {
    persistent: true,
    priority: 5,
  },
});

6. Consumer

// Set up error handler
consumer.on('error', (errorEvent) => {
  console.error('RabbitMQ Error:', errorEvent);
  if (errorEvent.type === 'DLQ_FAILED') {
    console.error('CRITICAL: Message lost!');
  }
});

// Consume messages
await consumer.consumeQueue({
  queueName: 'orders',
  onMessage: async (message, messageInfo) => {
    console.log('Processing:', message);
    
    // Your business logic
    await processOrder(message);
    
    // Success = auto-ack
    // Throw error = retry with backoff
  },
  options: {
    prefetch: 5,      // Max unacked messages (default: 5)
    noAck: false,     // Auto-ack (default: false)
  },
});

// Consume from exchange
await consumer.consumeQueue({
  queueName: 'order_processing',
  exchangeName: 'orders_exchange',
  routingKey: 'order.created',
  onMessage: async (message) => {
    await processOrder(message);
  },
});
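
The ack/retry/DLQ flow described in the comments above can be summarized as a small decision function. This is a sketch inferred from the documented behavior (success acknowledges, a thrown error retries until maxRetries is reached, then the message goes to the DLQ); `decideOutcome` and `Outcome` are hypothetical names, and the library's internals may differ.

```typescript
// Local illustration only; these names are not part of the library's API.
type Outcome = 'ack' | 'retry' | 'dead-letter';

// retriesSoFar is 0 on the first delivery and grows by one per retry.
function decideOutcome(
  handlerThrew: boolean,
  retriesSoFar: number,
  maxRetries: number
): Outcome {
  if (!handlerThrew) {
    return 'ack'; // onMessage resolved: the message is acknowledged
  }
  // onMessage threw: retry while attempts remain, otherwise dead-letter
  return retriesSoFar < maxRetries ? 'retry' : 'dead-letter';
}

// Usage: a message that always fails, with maxRetries = 2
const outcomes = [0, 1, 2].map((retries) => decideOutcome(true, retries, 2));
// outcomes is ['retry', 'retry', 'dead-letter']
```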

7. Graceful Shutdown (Required!)

import { closeRabbitMQ } from 'rabbitmq-with-retry-and-dlq';

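const shutdown = async () => {
  console.log('Shutting down...');
  try {
    // Close publisher and consumer connections before exiting
    await closeRabbitMQ();
    process.exit(0);
  } catch (error) {
    // Exit with a non-zero code if the connections could not be closed cleanly
    console.error('Error while closing RabbitMQ:', error);
    process.exit(1);
  }
};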

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

Retry Configuration

Retry Config Keys

interface RetryConfig {
  maxRetries: number;        // Required: max retry attempts
  retryDelayMs?: number;     // Optional: base delay in ms (default: 5000)
  backoffStrategy?: string;  // Optional: 'exponential' | 'linear' (default: 'exponential')
  maxDelayMs?: number;       // Optional: max delay cap (default: 300000)
  jitterMs?: number;         // Optional: random jitter range (default: 0)
}

Exponential Backoff Example

retryConfig: {
  maxRetries: 4,
  retryDelayMs: 1000,
  backoffStrategy: 'exponential'
}
// Delays: 1s → 2s → 4s → 8s → DLQ

Linear Backoff Example

retryConfig: {
  maxRetries: 4,
  retryDelayMs: 5000,
  backoffStrategy: 'linear'
}
// Delays: 5s → 10s → 15s → 20s → DLQ

With Jitter (Prevents Thundering Herd)

retryConfig: {
  maxRetries: 3,
  retryDelayMs: 2000,
  backoffStrategy: 'exponential',
  jitterMs: 1000
}
// Retry 1: 2000ms + random(0-1000ms) = 2000-3000ms
// Retry 2: 4000ms + random(0-1000ms) = 4000-5000ms
// Retry 3: 8000ms + random(0-1000ms) = 8000-9000ms
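
The delay schedules in the examples above are consistent with the formula sketched below. It is reconstructed from the documented examples and defaults, not taken from the library's source; `computeRetryDelayMs` is a hypothetical name, and applying the cap before adding jitter is an assumption.

```typescript
// Mirrors the documented RetryConfig keys (local copy for the sketch).
interface RetryConfigSketch {
  maxRetries: number;
  retryDelayMs?: number;
  backoffStrategy?: 'exponential' | 'linear';
  maxDelayMs?: number;
  jitterMs?: number;
}

// retryNumber starts at 1 for the first retry. The random source is
// injectable so the function can be tested deterministically.
function computeRetryDelayMs(
  config: RetryConfigSketch,
  retryNumber: number,
  random: () => number = Math.random
): number {
  const base = config.retryDelayMs ?? 5000;      // documented default
  const cap = config.maxDelayMs ?? 300000;       // documented default
  const jitter = config.jitterMs ?? 0;           // documented default
  const strategy = config.backoffStrategy ?? 'exponential';

  const raw =
    strategy === 'linear'
      ? base * retryNumber                       // 5s, 10s, 15s, ...
      : base * Math.pow(2, retryNumber - 1);     // 1s, 2s, 4s, ...

  // Assumption: the cap applies to the backoff term, jitter is added after.
  return Math.min(raw, cap) + Math.floor(random() * jitter);
}

// Usage: reproduces the exponential example (1s, 2s, 4s, 8s)
const exponentialDelays = [1, 2, 3, 4].map((n) =>
  computeRetryDelayMs({ maxRetries: 4, retryDelayMs: 1000, backoffStrategy: 'exponential' }, n)
);
```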

Complete Example

import express from 'express';
import {
  initializeRabbitMQ,
  closeRabbitMQ,
  publisher,
  consumer,
  isPublisherConnected,
  isConsumerConnected,
  type RabbitMQErrorEvent,
} from 'rabbitmq-with-retry-and-dlq';

const app = express();
app.use(express.json());

async function startApp() {
  try {
    // 1. Initialize RabbitMQ connection (required - use await)
    console.log('Connecting to RabbitMQ...');
    await initializeRabbitMQ('amqp://user:pass@localhost:5672');
    
    console.log('✓ Publisher connected:', isPublisherConnected()); // true
    console.log('✓ Consumer connected:', isConsumerConnected());   // true

    // 2. Set up error handler
    consumer.on('error', (errorEvent: RabbitMQErrorEvent) => {
      console.error('RabbitMQ Error:', errorEvent);
    });

    // 3. Assert queues
    await publisher.assertQueues('orders', {
      retryConfig: {
        maxRetries: 5,
        retryDelayMs: 2000,
        backoffStrategy: 'exponential',
        jitterMs: 1000,
      },
    });
    console.log('✓ Queues asserted');

    // 4. Start consumer
    await consumer.consumeQueue({
      queueName: 'orders',
      onMessage: async (message) => {
        console.log('Processing order:', message);
        await processOrder(message);
      },
    });
    console.log('✓ Consumer started');

    // 5. Start server
    app.listen(3000, () => {
      console.log('✓ Server running on port 3000');
    });
  } catch (error) {
    console.error('❌ Failed to start app:', error);
    console.error('   Make sure RabbitMQ is running and URL is correct');
    process.exit(1);
  }
}

// API endpoint
app.post('/orders', async (req, res) => {
  try {
    await publisher.publishToQueue({
      queueName: 'orders',
      message: req.body,
    });
    res.json({ success: true });
  } catch (error) {
    console.error('Failed to publish:', error);
    res.status(500).json({ error: 'Failed to publish order' });
  }
});

// Graceful shutdown
const shutdown = async () => {
  console.log('Shutting down...');
  await closeRabbitMQ();
  process.exit(0);
};

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

// Helper function
async function processOrder(order: any) {
  // Your business logic here
  console.log('Order processed:', order.orderId);
}

// Start the app
startApp();

Additional Options

Queue Options

{
  durable: true,        // Queue survives restart (default: true)
  exclusive: false,     // Exclusive to connection (default: false)
  autoDelete: false,    // Auto-delete when unused (default: false)
}

Publish Options

{
  persistent: true,     // Message survives restart (default: true)
  priority: 5,          // Priority 0-10 (optional)
  expiration: '5000',   // TTL in ms (optional)
}

Consumer Options

{
  prefetch: 5,          // Max unacked messages (default: 5)
  noAck: false,         // Auto-acknowledge (default: false)
  exclusive: false,     // Exclusive consumer (default: false)
  durable: true,        // Queue survives restart (default: true)
}

Error Event Types

  • RETRY_QUEUE_FAILED - Failed to send to retry queue
  • DLQ_FAILED - Failed to send to DLQ (CRITICAL!)
  • MESSAGE_PROCESSING_FAILED - Processing error
  • PUBLISH_FAILED - Publish failed
  • CONNECTION_ERROR - Connection error
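
One way to act on these event types is to map each one to a severity inside the error handler. The sketch below defines its own local union of the names listed above; only DLQ_FAILED being critical comes from this README, and the other severity levels are assumptions to adjust for your own alerting.

```typescript
// Local union of the documented event type names (not imported from the library).
type ErrorEventType =
  | 'RETRY_QUEUE_FAILED'
  | 'DLQ_FAILED'
  | 'MESSAGE_PROCESSING_FAILED'
  | 'PUBLISH_FAILED'
  | 'CONNECTION_ERROR';

type Severity = 'critical' | 'error' | 'warning';

// Hypothetical helper: decides how loudly to report each event type.
function severityFor(type: ErrorEventType): Severity {
  switch (type) {
    case 'DLQ_FAILED':
      return 'critical'; // documented as critical: the message may be lost
    case 'RETRY_QUEUE_FAILED':
    case 'PUBLISH_FAILED':
    case 'CONNECTION_ERROR':
      return 'error';    // assumption: needs attention, message not yet lost
    case 'MESSAGE_PROCESSING_FAILED':
      return 'warning';  // assumption: the retry mechanism will handle it
  }
}

// Usage inside a handler: consumer.on('error', (e) => report(severityFor(e.type), e))
const dlqSeverity = severityFor('DLQ_FAILED');
```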

Queue Management

// Delete queue (includes retry and DLQ by default)
await publisher.deleteQueues('orders');

// Delete multiple queues
await publisher.deleteQueues(['orders', 'payments']);

// Delete only main queue
await publisher.deleteQueues('orders', {
  includeRetry: false,
  includeDLQ: false,
});

Important Notes

Required (Must Implement)

  • ⚠️ Must call await initializeRabbitMQ(url) inside try/catch - it throws an error on connection failure
  • ⚠️ Must implement graceful shutdown handlers (SIGTERM, SIGINT)
  • ⚠️ Must monitor DLQ failures (they indicate message loss)

Error Handling

  • initializeRabbitMQ() throws an error if:
    • The RabbitMQ URL is wrong
    • RabbitMQ is not running
    • The connection times out (default: 30 seconds)
  • Always wrap the call in try/catch for fail-fast behavior

Automatic (No Action Needed)

  • ✅ Auto-reconnection on connection loss
  • ✅ Lazy loading - connects on first use (after initialization)
  • ✅ Message durability with persistent: true
  • ✅ Queue durability with durable: true
  • ✅ Thread-safe initialization

Best Practices

  • Call initializeRabbitMQ(url) once in your setup/helper file
  • Set retryConfig in assertQueues() - it is applied automatically when publishing
  • assertQueues() is idempotent - safe to call multiple times (skips existing queues)
  • Use exponential backoff with jitter to prevent thundering herd
  • Always set up error event handlers before consuming
  • Use graceful shutdown in production

Troubleshooting

Error: PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange'

Problem:

Error: Operation failed: QueueDeclare; 406 (PRECONDITION_FAILED) with message 
"PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 
'my_queue' in vhost '/': received 'my_exchange' but current is ''"

Cause: The queue already exists in RabbitMQ with different configuration arguments than what your code is trying to set.
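
To make the cause concrete: the broker compares the arguments of the existing queue with the ones in the new declaration and rejects the declaration if any differ. The sketch below is a simplified illustration of that comparison, not RabbitMQ's actual equivalence check; `findInequivalentArgs` is a hypothetical helper.

```typescript
type QueueArguments = Record<string, string | number | boolean>;

// Returns the argument names whose values differ between the queue that
// already exists and the declaration the code is attempting.
function findInequivalentArgs(
  current: QueueArguments,
  requested: QueueArguments
): string[] {
  const allKeys = Object.keys(current).concat(Object.keys(requested));
  const uniqueKeys = allKeys.filter((key, index) => allKeys.indexOf(key) === index);
  return uniqueKeys.filter((key) => current[key] !== requested[key]).sort();
}

// Usage: the situation from the error message above, where the existing
// queue has no dead letter exchange but the new declaration sets one.
const mismatched = findInequivalentArgs(
  {},
  { 'x-dead-letter-exchange': 'my_exchange' }
);
// mismatched is ['x-dead-letter-exchange']
```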

Solution: Delete the existing queue and let your code recreate it with the correct configuration.

Option 1 - Using the utility script:

npx ts-node scripts/delete-queue.ts my_queue

Option 2 - Using RabbitMQ Management UI:

  1. Navigate to http://localhost:15672 (default credentials: guest/guest)
  2. Go to the "Queues" tab
  3. Find and delete the problematic queue

Option 3 - Using the rabbitmqadmin CLI (provided by the RabbitMQ management plugin):

rabbitmqadmin delete queue name=my_queue

Option 4 - Using this library:

import { publisher } from 'rabbitmq-with-retry-and-dlq';

await publisher.deleteQueues('my_queue', {
  includeRetry: true,
  includeDLQ: true
});

Prevention: Always use publisher.assertQueues() to create queues with the proper retry configuration before publishing or consuming.


License

MIT