@mrigind/rabbitmq-with-retry-and-dlq

v1.0.27

RabbitMQ implementation with dynamic retry logic and Dead Letter Queue support

Downloads

4

RabbitMQ with Retry and Dead Letter Queue

A production-ready TypeScript RabbitMQ library with automatic retry, exponential backoff, and dead letter queue support.

Features

  • Lazy Loading - Automatic connection on first use
  • 🔄 Smart Retry - Exponential/linear backoff with jitter
  • 💀 Dead Letter Queue - Auto-routing after max retries
  • 🔌 Auto-Reconnection - Handles network issues automatically
  • 📦 TypeScript - Full type safety
  • 🎯 Exchange Support - Direct, topic, fanout, headers
  • 🏭 Industry Best Practice - Separation of concerns (Publisher→Exchange, Consumer→Queue)

Installation

npm install rabbitmq-with-retry-and-dlq

Quick Start (Best Practice)

import {
  initializeRabbitMQ,
  publisher,
  consumer,
} from 'rabbitmq-with-retry-and-dlq';

// 1. Initialize connection
await initializeRabbitMQ('amqp://localhost:5672');

// 2. Publisher: Assert exchange
await publisher.assertExchange('orders', { exchangeType: 'topic' });

// 3. Consumer: Setup queue with exchange and routing keys
await consumer.setupQueue({
  queueName: 'orders-processor',
  exchangeName: 'orders',
  exchangeType: 'topic',
  routingKeys: ['order.created', 'order.updated'],
  retryConfig: { maxRetries: 5, retryDelayMs: 2000 },
});

// 4. Consumer: Start consuming
await consumer.consumeQueue({
  queueName: 'orders-processor',
  onMessage: async (message) => {
    await processOrder(message);
  },
});

// 5. Publisher: Publish messages
await publisher.publishToExchange({
  exchangeName: 'orders',
  routingKey: 'order.created',
  message: { orderId: 123 },
});

Usage

1. Initialize (Required - Call Once with Await)

You must call await initializeRabbitMQ() once in your application setup or helper file:

import {
  initializeRabbitMQ,
  isPublisherConnected,
  isConsumerConnected,
} from 'rabbitmq-with-retry-and-dlq';

try {
  // Call once at application startup (in your helper/setup file)
  // Use await - this establishes the connection and waits for it to be ready
  await initializeRabbitMQ('amqp://user:pass@localhost:5672');

  // Check connection status (both should be true after await)
  console.log('Publisher connected:', isPublisherConnected()); // true
  console.log('Consumer connected:', isConsumerConnected()); // true
} catch (error) {
  // Connection failed (wrong URL, RabbitMQ not running, timeout, etc.)
  console.error('Failed to connect to RabbitMQ:', error);
  process.exit(1); // Fail fast
}

Error Handling:

  • Throws an error if the URL is wrong
  • Throws an error if RabbitMQ is not running
  • Throws an error if the connection times out (default: 30 seconds)
  • Resolves only once the connections are established

2. Lazy Loading (Automatic Connection)

After initialization, just use publisher/consumer - connection happens automatically:

import { publisher, consumer } from 'rabbitmq-with-retry-and-dlq';

// No additional setup needed - just use it!
// First call automatically establishes connection
await publisher.publishToQueue({
  queueName: 'orders',
  message: { orderId: 123 },
});

Industry Best Practice: Separation of Concerns

This library follows the RabbitMQ best practice pattern:

  • Publisher → Owns Exchanges (knows the topic/domain)
  • Consumer → Owns Queues and Bindings (decides what to subscribe to)

This is cleaner because:

  1. Publishers don't need to know about queues
  2. Consumers can create their own queues and choose their bindings
  3. Multiple consumers can bind differently to the same exchange
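
To make the ownership split concrete, here is a minimal in-memory sketch. It is neither this library nor RabbitMQ itself: `ToyExchange`, `bind`, and `publish` are hypothetical names, and only direct-exchange routing is modelled. The publisher supplies a routing key and nothing else, while each consumer declares its own queue and chooses its own bindings.

```typescript
// Toy in-memory model of a direct exchange. Hypothetical names throughout;
// this only illustrates who owns what and is not how the library works.
class ToyExchange {
  // routing key -> names of the queues bound with that key
  private bindings = new Map<string, Set<string>>();
  // queue name -> messages delivered so far
  readonly queues = new Map<string, unknown[]>();

  // Consumer side: declare a queue (if needed) and bind it to a routing key.
  bind(queueName: string, routingKey: string): void {
    if (!this.queues.has(queueName)) {
      this.queues.set(queueName, []);
    }
    const bound = this.bindings.get(routingKey) ?? new Set<string>();
    bound.add(queueName);
    this.bindings.set(routingKey, bound);
  }

  // Publisher side: knows only the routing key, never the queue names.
  // Returns how many queues received a copy of the message.
  publish(routingKey: string, message: unknown): number {
    const bound = this.bindings.get(routingKey) ?? new Set<string>();
    bound.forEach((queueName) => {
      this.queues.get(queueName)!.push(message);
    });
    return bound.size;
  }
}

const orders = new ToyExchange();

// Two consumers bind differently to the same exchange.
orders.bind('orders-processor', 'order.created');
orders.bind('orders-processor', 'order.updated');
orders.bind('audit-log', 'order.created');

// The publisher never mentions a queue.
const createdCopies = orders.publish('order.created', { orderId: 123 });
const updatedCopies = orders.publish('order.updated', { orderId: 123 });
```

In this model, adding a third consumer requires no change on the publishing side, which is the point of the pattern.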

Publisher: Assert Exchange

import { publisher } from 'rabbitmq-with-retry-and-dlq';

// Assert a single exchange
await publisher.assertExchange('orders');

// Assert with exchange type
await publisher.assertExchange('events', { exchangeType: 'topic' });

// Assert multiple exchanges
await publisher.assertExchange(['orders', 'payments', 'notifications']);

// With options
await publisher.assertExchange('orders', {
  exchangeType: 'direct', // 'direct' | 'topic' | 'fanout' | 'headers' (default: 'direct')
  durable: true, // Survives broker restart (default: true)
});

Consumer: Assert Exchanges (Recommended!)

import { consumer } from 'rabbitmq-with-retry-and-dlq';

// IMPORTANT: Assert exchanges on consumer's connection BEFORE binding
// This prevents "NOT_FOUND - no exchange" race condition errors
await consumer.assertExchange('orders', { exchangeType: 'topic' });
await consumer.assertExchange('payments', { exchangeType: 'direct' });

// Or assert multiple exchanges at once
await consumer.assertExchange(['orders', 'payments'], {
  exchangeType: 'direct',
});

Why? The publisher and consumer use separate connections. Even if the publisher has already created the exchange, the consumer's connection might not see it immediately. Asserting exchanges on the consumer's connection ensures they exist before binding.

Consumer: Setup Queue (RECOMMENDED - Best Practice)

Use setupQueue() for complete setup in one atomic operation - prevents race conditions and supports multiple routing keys:

// ✅ RECOMMENDED: Complete setup in one call
await consumer.setupQueue({
  queueName: 'orders-processor',
  exchangeName: 'orders',
  exchangeType: 'topic',
  routingKeys: ['order.created', 'order.updated', 'order.cancelled'], // Multiple routing keys!
  retryConfig: {
    maxRetries: 5,
    retryDelayMs: 2000,
    backoffStrategy: 'exponential',
    jitterMs: 1000,
  },
});

// Then consume (queue already set up)
await consumer.consumeQueue({
  queueName: 'orders-processor',
  onMessage: async (message) => {
    console.log('Processing order:', message);
    await processOrder(message);
  },
});

What setupQueue() does:

  1. ✅ Asserts exchange
  2. ✅ Creates queue with retry/DLQ infrastructure
  3. ✅ Binds queue to exchange with all routing keys
  4. ✅ All in ONE atomic operation (no race conditions)

Consumer: Assert Queues (Alternative - More Control)

Use assertQueues() when you need more control or want to add bindings dynamically:

// Assert a single queue
await consumer.assertQueues('orders-worker');

// Assert multiple queues
await consumer.assertQueues(['orders-worker', 'payments-worker']);

// Assert queue with exchange binding and retry/DLQ configuration
await consumer.assertQueues('orders-worker', {
  exchangeName: 'orders',
  exchangeType: 'direct',
  routingKey: 'order.created', // Single routing key
  retryConfig: {
    maxRetries: 5,
    retryDelayMs: 2000,
    backoffStrategy: 'exponential',
  },
});

Consumer: Bind Queue to Exchange (For Additional Bindings)

Use bindQueue() to add more routing key bindings after initial setup:

// IMPORTANT: Call consumer.assertExchange() FIRST to prevent race conditions!

// Bind queue to exchange with routing key
await consumer.bindQueue('orders-worker', 'orders', 'order.created', {
  exchangeType: 'topic', // Ensures exchange exists (recommended)
});

// Bind with pattern (for topic exchanges)
await consumer.bindQueue('all-orders', 'events', 'order.*', {
  exchangeType: 'topic',
});

// Add multiple bindings to same queue
await consumer.bindQueue('orders-worker', 'orders', 'order.created');
await consumer.bindQueue('orders-worker', 'orders', 'order.updated');
await consumer.bindQueue('orders-worker', 'orders', 'order.cancelled');

Complete Best Practice Example (RECOMMENDED)

// === PUBLISHER SIDE ===
// Publisher only knows about exchanges
await publisher.assertExchange('orders', { exchangeType: 'topic' });

await publisher.publishToExchange({
  exchangeName: 'orders',
  routingKey: 'order.created',
  message: { orderId: 123, amount: 99.99 },
});

// === CONSUMER SIDE ===
// Step 1: Setup queue with exchange and multiple routing keys (RECOMMENDED)
await consumer.setupQueue({
  queueName: 'orders-processor',
  exchangeName: 'orders',
  exchangeType: 'topic',
  routingKeys: ['order.created', 'order.updated'], // Multiple keys!
  retryConfig: { maxRetries: 3, retryDelayMs: 5000 },
});

// Step 2: Start consuming (queue already set up)
await consumer.consumeQueue({
  queueName: 'orders-processor',
  onMessage: async (message) => {
    console.log('Processing order:', message);
    await processOrder(message);
  },
});

Alternative: Step-by-Step Setup

If you need more granular control, use separate methods:

// Step 1: Assert exchanges on consumer's connection (defensive)
await consumer.assertExchange('orders', { exchangeType: 'topic' });

// Step 2: Create queues with retry/DLQ config
await consumer.assertQueues('orders-processor', {
  retryConfig: { maxRetries: 3, retryDelayMs: 5000 },
});

// Step 3: Bind queues to exchanges (can add multiple routing keys)
await consumer.bindQueue('orders-processor', 'orders', 'order.created', {
  exchangeType: 'topic',
});
await consumer.bindQueue('orders-processor', 'orders', 'order.updated', {
  exchangeType: 'topic',
});

// Step 4: Start consuming
await consumer.consumeQueue({
  queueName: 'orders-processor',
  onMessage: async (message) => {
    await processOrder(message);
  },
});

Direct Queue Publishing (Without Exchange)

For simple point-to-point messaging without exchanges:

Publish to Queue

// Note: Queue should be set up by consumer first using assertQueues() or setupQueue()

await publisher.publishToQueue({
  queueName: 'orders',
  message: {
    orderId: 'ORDER-123',
    amount: 99.99,
  },
  options: {
    persistent: true, // Survives broker restart (default: true)
    priority: 5, // 0-10 priority (optional)
    expiration: '5000', // TTL in ms (optional)
  },
  // Optional: retryConfig if queue wasn't set up with retry
  retryConfig: {
    maxRetries: 5,
    retryDelayMs: 2000,
  },
});

Publish to Exchange

// Publisher only needs exchange and routing key (not queue!)
await publisher.publishToExchange({
  exchangeName: 'orders_exchange',
  exchangeType: 'direct',
  routingKey: 'order.created',
  message: { orderId: 'ORDER-456' },
  options: {
    persistent: true,
    priority: 5,
  },
});

Consumer (Best Practice)

// Set up error handler FIRST
consumer.on('error', (errorEvent) => {
  console.error('RabbitMQ Error:', errorEvent);
  if (errorEvent.type === 'DLQ_FAILED') {
    console.error('CRITICAL: Message lost!');
  }
});

// Step 1: Setup queue with exchange and routing keys
await consumer.setupQueue({
  queueName: 'orders-processor',
  exchangeName: 'orders_exchange',
  exchangeType: 'direct',
  routingKeys: ['order.created', 'order.updated'],
  retryConfig: {
    maxRetries: 5,
    retryDelayMs: 2000,
    backoffStrategy: 'exponential',
  },
});

// Step 2: Start consuming (queue already set up)
await consumer.consumeQueue({
  queueName: 'orders-processor',
  onMessage: async (message, messageInfo) => {
    console.log('Processing:', message);

    // Your business logic
    await processOrder(message);

    // Success = auto-ack
    // Throw error = retry with backoff → DLQ
  },
  options: {
    prefetch: 5, // Max unacked messages (default: 5)
    noAck: false, // Auto-ack (default: false)
  },
});

3. Graceful Shutdown (Required!)

import { closeRabbitMQ } from 'rabbitmq-with-retry-and-dlq';

const shutdown = async () => {
  console.log('Shutting down...');
  await closeRabbitMQ();
  process.exit(0);
};

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
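
Because SIGTERM and SIGINT share one handler, that handler can run twice if both signals arrive. The README does not say whether calling closeRabbitMQ() repeatedly is safe, so one defensive option is to memoize the shutdown promise. The sketch below assumes that a single close call is what you want; `createShutdownOnce` is a hypothetical helper, not part of the library, and `fakeClose` stands in for closeRabbitMQ() so the example runs without a broker.

```typescript
// Hypothetical helper: wraps an async close function so that repeated calls
// share one in-flight promise instead of closing twice.
function createShutdownOnce(close: () => Promise<void>): () => Promise<void> {
  let pending: Promise<void> | null = null;
  return () => {
    if (pending === null) {
      pending = close();
    }
    return pending;
  };
}

// Stand-in for closeRabbitMQ() so the sketch runs without a broker.
let closeCalls = 0;
const fakeClose = async (): Promise<void> => {
  closeCalls += 1;
};

const shutdownOnce = createShutdownOnce(fakeClose);
const first = shutdownOnce();
const second = shutdownOnce(); // e.g. SIGINT arriving right after SIGTERM
```

In a real handler you would pass closeRabbitMQ to the helper and call process.exit(0) once the returned promise resolves.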

Retry Configuration

Retry Config Keys

interface RetryConfig {
  maxRetries: number; // Required: max retry attempts
  retryDelayMs?: number; // Optional: base delay in ms (default: 5000)
  backoffStrategy?: string; // Optional: 'exponential' | 'linear' (default: 'exponential')
  maxDelayMs?: number; // Optional: max delay cap (default: 300000)
  jitterMs?: number; // Optional: random jitter range (default: 0)
}

Exponential Backoff Example

retryConfig: {
  maxRetries: 4,
  retryDelayMs: 1000,
  backoffStrategy: 'exponential'
}
// Delays: 1s → 2s → 4s → 8s → DLQ

Linear Backoff Example

retryConfig: {
  maxRetries: 4,
  retryDelayMs: 5000,
  backoffStrategy: 'linear'
}
// Delays: 5s → 10s → 15s → 20s → DLQ

With Jitter (Prevents Thundering Herd)

retryConfig: {
  maxRetries: 3,
  retryDelayMs: 2000,
  backoffStrategy: 'exponential',
  jitterMs: 1000
}
// Retry 1: 2000ms + random(0-1000ms) = 2000-3000ms
// Retry 2: 4000ms + random(0-1000ms) = 4000-5000ms
// Retry 3: 8000ms + random(0-1000ms) = 8000-9000ms
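
The delay sequences in the three examples above are consistent with a simple formula, sketched below. This is an inference from the documented numbers and defaults, not the library's actual code: `computeRetryDelayMs` and `RetryDelayConfig` are hypothetical names, and the choice to apply the `maxDelayMs` cap before adding jitter is an assumption.

```typescript
// Mirrors the optional RetryConfig fields documented above; the defaults
// follow the interface comments. The formula itself is an assumption.
interface RetryDelayConfig {
  retryDelayMs?: number;
  backoffStrategy?: 'exponential' | 'linear';
  maxDelayMs?: number;
  jitterMs?: number;
}

// attempt is 1-based: 1 for the first retry, 2 for the second, and so on.
// random is injectable so the jitter can be made deterministic in tests.
function computeRetryDelayMs(
  attempt: number,
  config: RetryDelayConfig = {},
  random: () => number = Math.random,
): number {
  const base = config.retryDelayMs ?? 5000;
  const strategy = config.backoffStrategy ?? 'exponential';
  const maxDelay = config.maxDelayMs ?? 300000;
  const jitter = config.jitterMs ?? 0;

  const raw =
    strategy === 'exponential'
      ? base * Math.pow(2, attempt - 1) // base, 2x base, 4x base, ...
      : base * attempt; // base, 2x base, 3x base, ...

  // Cap the backoff first, then add jitter on top (ordering is an assumption).
  return Math.min(raw, maxDelay) + Math.floor(random() * jitter);
}

const exponentialDelays = [1, 2, 3, 4].map((attempt) =>
  computeRetryDelayMs(attempt, { retryDelayMs: 1000, backoffStrategy: 'exponential' }),
); // [1000, 2000, 4000, 8000]

const linearDelays = [1, 2, 3, 4].map((attempt) =>
  computeRetryDelayMs(attempt, { retryDelayMs: 5000, backoffStrategy: 'linear' }),
); // [5000, 10000, 15000, 20000]
```

Without a cap, exponential backoff grows quickly: with a 1000 ms base, the tenth retry would wait 512 seconds, which is why the sketch clamps to `maxDelayMs`.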

Complete Example (Best Practice Pattern)

import express from 'express';
import {
  initializeRabbitMQ,
  closeRabbitMQ,
  publisher,
  consumer,
  isPublisherConnected,
  isConsumerConnected,
  type RabbitMQErrorEvent,
} from 'rabbitmq-with-retry-and-dlq';

const app = express();
app.use(express.json());

async function startApp() {
  try {
    // 1. Initialize RabbitMQ connection (required - use await)
    console.log('Connecting to RabbitMQ...');
    await initializeRabbitMQ('amqp://user:pass@localhost:5672');

    console.log('✓ Publisher connected:', isPublisherConnected()); // true
    console.log('✓ Consumer connected:', isConsumerConnected()); // true

    // 2. Set up error handler
    consumer.on('error', (errorEvent: RabbitMQErrorEvent) => {
      console.error('RabbitMQ Error:', errorEvent);
    });

    // 3. Publisher: Assert exchange (publisher owns exchanges)
    await publisher.assertExchange('orders', { exchangeType: 'topic' });
    console.log('✓ Publisher exchange asserted');

    // 4. Consumer: Setup queue with exchange and routing keys (RECOMMENDED)
    await consumer.setupQueue({
      queueName: 'orders-processor',
      exchangeName: 'orders',
      exchangeType: 'topic',
      routingKeys: ['order.created', 'order.updated'],
      retryConfig: {
        maxRetries: 5,
        retryDelayMs: 2000,
        backoffStrategy: 'exponential',
        jitterMs: 1000,
      },
    });
    console.log('✓ Queue setup complete');

    // 5. Start consumer (queue already set up)
    await consumer.consumeQueue({
      queueName: 'orders-processor',
      onMessage: async (message) => {
        console.log('Processing order:', message);
        await processOrder(message);
      },
    });
    console.log('✓ Consumer started');

    // 6. Start server
    app.listen(3000, () => {
      console.log('✓ Server running on port 3000');
    });
  } catch (error) {
    console.error('❌ Failed to start app:', error);
    console.error('   Make sure RabbitMQ is running and URL is correct');
    process.exit(1);
  }
}

// API endpoint - Publisher only knows about exchanges (not queues!)
app.post('/orders', async (req, res) => {
  try {
    await publisher.publishToExchange({
      exchangeName: 'orders',
      routingKey: 'order.created',
      message: req.body,
    });
    res.json({ success: true });
  } catch (error) {
    console.error('Failed to publish:', error);
    res.status(500).json({ error: 'Failed to publish order' });
  }
});

// Graceful shutdown
const shutdown = async () => {
  console.log('Shutting down...');
  await closeRabbitMQ();
  process.exit(0);
};

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

// Helper function
async function processOrder(order: any) {
  // Your business logic here
  console.log('Order processed:', order.orderId);
}

// Start the app
startApp();

API Reference

Publisher Methods

| Method | Description |
| --- | --- |
| assertExchange(names, options?) | Create exchange(s) - Publisher owns exchanges |
| publishToExchange(config) | Publish to exchange with routing key |
| publishToQueue(config) | Publish directly to queue (queue should be set up by consumer) |
| publishBatch(configs) | Publish multiple messages |
| close() | Close publisher connection |

Consumer Methods

| Method | Description |
| --- | --- |
| setupQueue(config) | ⭐ RECOMMENDED - Complete setup: exchange + queue + bindings + retry/DLQ in one atomic operation |
| assertExchange(names, options?) | Assert exchange(s) on consumer connection (defensive) |
| assertQueues(names, options?) | Create queue(s) with optional exchange binding and retry/DLQ config |
| bindQueue(queue, exchange, routingKey, options?) | Bind queue to exchange (for additional routing keys) |
| deleteQueues(names, options?) | Delete queue(s) and related retry/DLQ queues |
| consumeQueue(config) | Start consuming from queue (queue must be set up first) |
| consumeMultipleQueues(config) | Consume from multiple queues |
| consumeWithPattern(exchange, pattern, handler) | Consume with pattern matching (topic) |
| getConsumerStats() | Get consumer statistics |
| close() | Close consumer connection |


Additional Options

Exchange Options (for assertExchange)

{
  exchangeType: 'direct',  // 'direct' | 'topic' | 'fanout' | 'headers' (default: 'direct')
  durable: true,           // Exchange survives restart (default: true)
}

Queue Options (for assertQueues)

{
  durable: true,        // Queue survives restart (default: true)
  exclusive: false,     // Exclusive to connection (default: false)
  autoDelete: false,    // Auto-delete when unused (default: false)
  retryConfig: {...},   // Optional: retry and DLQ configuration
}

Bind Queue (consumer.bindQueue)

await consumer.bindQueue(
  'queue-name', // Queue to bind
  'exchange-name', // Exchange to bind to
  'routing.key', // Routing key or pattern (for topic exchanges)
  {
    // Options (optional)
    exchangeType: 'topic', // If provided, asserts exchange before binding (RECOMMENDED)
    durable: true, // Exchange durability (default: true)
  }
);

// Pattern examples for topic exchanges:
// 'order.*'         - matches order.created, order.updated
// 'order.#'         - matches order.created, order.created.v2
// '*.created'       - matches order.created, payment.created

Note: When publisher and consumer use separate connections, there can be race conditions where the consumer tries to bind before the exchange is fully created. Specifying exchangeType in bindQueue ensures the exchange exists on the consumer's connection (idempotent operation).
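
For the topic patterns listed above, it helps to know how the wildcards behave: in AMQP topic exchanges, `*` stands for exactly one dot-separated word and `#` for zero or more words. The matching is done by the broker, not by this library. The function below is only an illustrative sketch for checking a pattern against a routing key locally, and `matchesTopic` is a hypothetical name.

```typescript
// Returns true when routingKey matches an AMQP-style topic binding pattern.
// '*' stands for exactly one dot-separated word, '#' for zero or more words.
function matchesTopic(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  const match = (pi: number, ki: number): boolean => {
    if (pi === p.length) {
      return ki === k.length; // pattern exhausted: the key must be too
    }
    if (p[pi] === '#') {
      // '#' absorbs zero words (advance the pattern) or one more word (advance the key)
      return match(pi + 1, ki) || (ki < k.length && match(pi, ki + 1));
    }
    if (ki === k.length) {
      return false; // words left in the pattern but none left in the key
    }
    return (p[pi] === '*' || p[pi] === k[ki]) && match(pi + 1, ki + 1);
  };

  return match(0, 0);
}

const starMatchesOneWord = matchesTopic('order.*', 'order.created'); // true
const starRejectsTwoWords = matchesTopic('order.*', 'order.created.v2'); // false
const hashMatchesManyWords = matchesTopic('order.#', 'order.created.v2'); // true
const hashMatchesZeroWords = matchesTopic('order.#', 'order'); // true
```

Note the difference between the first two results: a queue bound with `order.*` will not receive `order.created.v2`, whereas one bound with `order.#` will.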

Publish Options

{
  persistent: true,     // Message survives restart (default: true)
  priority: 5,          // Priority 0-10 (optional)
  expiration: '5000',   // TTL in ms (optional)
}

Consumer Options

{
  prefetch: 5,          // Max unacked messages (default: 5)
  noAck: false,         // Auto-acknowledge (default: false)
  exclusive: false,     // Exclusive consumer (default: false)
  durable: true,        // Queue survives restart (default: true)
}

Error Event Types

  • RETRY_QUEUE_FAILED - Failed to send to retry queue
  • DLQ_FAILED - Failed to send to DLQ (CRITICAL!)
  • MESSAGE_PROCESSING_FAILED - Processing error
  • PUBLISH_FAILED - Publish failed
  • CONNECTION_ERROR - Connection error
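
Since only DLQ_FAILED is flagged as critical above, an error handler will usually want to treat the event types differently. The sketch below shows one way to map them to log severities. Apart from DLQ_FAILED being critical, the mapping is an assumption to adjust to your own alerting needs, and `RabbitErrorType`, `Severity`, and `severityFor` are hypothetical names rather than library exports.

```typescript
// The five event type strings come from the list above; everything else
// in this sketch is hypothetical.
type RabbitErrorType =
  | 'RETRY_QUEUE_FAILED'
  | 'DLQ_FAILED'
  | 'MESSAGE_PROCESSING_FAILED'
  | 'PUBLISH_FAILED'
  | 'CONNECTION_ERROR';

type Severity = 'critical' | 'error' | 'warning';

// Maps an error event type to a severity. Only the DLQ_FAILED mapping is
// taken from the README; the rest is an assumed starting point.
function severityFor(type: RabbitErrorType): Severity {
  switch (type) {
    case 'DLQ_FAILED':
      return 'critical'; // the message could not be parked anywhere
    case 'RETRY_QUEUE_FAILED':
    case 'PUBLISH_FAILED':
    case 'CONNECTION_ERROR':
      return 'error';
    case 'MESSAGE_PROCESSING_FAILED':
      return 'warning'; // assumed to be followed by a retry
  }
}

const dlqSeverity = severityFor('DLQ_FAILED');
const processingSeverity = severityFor('MESSAGE_PROCESSING_FAILED');
```

Inside a consumer.on('error', ...) handler, the `type` field of the event (as used in the consumer example earlier) would be the argument, assuming it carries one of these five strings.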

Queue Management

// Delete queue (includes retry and DLQ by default)
await consumer.deleteQueues('orders');

// Delete multiple queues
await consumer.deleteQueues(['orders', 'payments']);

// Delete only main queue
await consumer.deleteQueues('orders', {
  includeRetry: false,
  includeDLQ: false,
});

Important Notes

Required (Must Implement)

  • ⚠️ Must call await initializeRabbitMQ(url) with try/catch - throws error on connection failure
  • ⚠️ Must implement graceful shutdown handlers (SIGTERM, SIGINT)
  • ⚠️ Must monitor DLQ failures (indicates message loss)

Error Handling

  • initializeRabbitMQ() throws error if:
    • RabbitMQ URL is wrong
    • RabbitMQ is not running
    • Connection timeout (default: 30 seconds)
  • Always wrap in try/catch for fail-fast behavior

Automatic (No Action Needed)

  • ✅ Auto-reconnection on connection loss
  • ✅ Lazy loading - connects on first use (after initialization)
  • ✅ Message durability with persistent: true
  • ✅ Queue durability with durable: true
  • ✅ Thread-safe initialization

Best Practices

  • Call initializeRabbitMQ(url) once in your setup/helper file
  • Use separation of concerns pattern:
    • Publisher → assertExchange() + publishToExchange() (owns exchanges)
    • Consumer → setupQueue() or assertQueues() + bindQueue() (owns queues and bindings)
  • Recommended workflow:
    1. Setup infrastructure at startup: consumer.setupQueue() or consumer.assertQueues()
    2. Start consuming: consumer.consumeQueue()
  • Set retryConfig in setupQueue() or assertQueues() - auto-used when processing
  • assertExchange(), assertQueues(), setupQueue(), and bindQueue() are idempotent - safe to call multiple times
  • Use exponential backoff with jitter to prevent thundering herd
  • Always set up error event handlers before consuming
  • Use graceful shutdown in production

Troubleshooting

Error: PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange'

Problem:

Error: Operation failed: QueueDeclare; 406 (PRECONDITION_FAILED) with message
"PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue
'my_queue' in vhost '/': received 'my_exchange' but current is ''"

Cause: The queue already exists in RabbitMQ with different configuration arguments than what your code is trying to set.

Solution: Delete the existing queue and let your code recreate it with the correct configuration.

Option 1 - Using the utility script:

npx ts-node scripts/delete-queue.ts my_queue

Option 2 - Using RabbitMQ Management UI:

  1. Navigate to http://localhost:15672 (default credentials: guest/guest)
  2. Go to the "Queues" tab
  3. Find and delete the problematic queue

Option 3 - Using RabbitMQ CLI:

rabbitmqadmin delete queue name=my_queue

Option 4 - Using this library:

import { consumer } from 'rabbitmq-with-retry-and-dlq';

await consumer.deleteQueues('my_queue', {
  includeRetry: true,
  includeDLQ: true,
});

Prevention: Always use consumer.setupQueue() or consumer.assertQueues() to create queues with the proper retry configuration before consuming.


License

MIT