npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@slzsid/rabbitmq-multinode-connector

v0.3.2

Published

Enterprise-grade RabbitMQ client with connection pooling, circuit breaker, cluster support, and comprehensive monitoring

Readme

RabbitMQ Multi-Node Connector

npm version TypeScript Node License: MIT

A robust, production-ready RabbitMQ client for Node.js and TypeScript with advanced features including connection pooling, circuit breaker pattern, cluster failover, and comprehensive monitoring.

Table of Contents

Features

  • Automatic Reconnection with exponential backoff
  • Connection Pooling for optimal resource utilization
  • Circuit Breaker Pattern for fault tolerance
  • Cluster Support with intelligent failover
  • Real-time Metrics and health monitoring
  • SSL/TLS Support for secure connections
  • Message Batching for improved throughput
  • Channel Recovery with automatic healing
  • Comprehensive Logging with structured data
  • Full TypeScript Support with type definitions
  • Event-Driven Architecture with comprehensive events

Installation

# Using npm
npm install @slzsid/rabbitmq-multinode-connector

# Using yarn
yarn add @slzsid/rabbitmq-multinode-connector

# Using pnpm
pnpm add @slzsid/rabbitmq-multinode-connector

Requirements:

  • Node.js >= 18.0.0
  • TypeScript >= 5.0 (optional, for TypeScript projects)

Quick Start

Basic Usage

import RabbitMQClient from '@slzsid/rabbitmq-multinode-connector';

// Create a client instance
const client = new RabbitMQClient({
  urls: ['amqp://localhost:5672'],
  connectionName: 'my-app-connection',
  prefetchCount: 10,
});

// Connect to RabbitMQ
await client.connect();

// Publish a message
await client.publish('my-exchange', 'routing.key', Buffer.from('Hello World'));

// Consume messages (auto-acknowledgment on success)
const consumerTag = await client.consume('my-queue', async (msg) => {
  if (msg) {
    console.log('Received:', msg.content.toString());
    // No need to manually ack - done automatically on success
    // Throwing an error will auto-nack and requeue the message
  }
});

// Graceful shutdown
process.on('SIGINT', async () => {
  await client.gracefulShutdown();
  process.exit(0);
});

CommonJS

const RabbitMQClient = require('@slzsid/rabbitmq-multinode-connector').default;

// Same usage as above

Configuration

Basic Configuration

interface RabbitMQOptions {
  url?: string | Options.Connect;
  heartbeat?: number; // Connection heartbeat (1-60 seconds)
  connectionName?: string; // For debugging/monitoring
  prefetchCount?: number; // Messages to prefetch per channel
  prefetchGlobal?: boolean; // Global prefetch setting
  reconnectDelay?: number; // Base reconnection delay (ms)
  maxReconnectAttempts?: number; // Max reconnection attempts (-1 = infinite)
  exponentialBackoff?: boolean; // Use exponential backoff
  connectionTimeout?: number; // Connection timeout (ms)
  vhost?: string; // Virtual host
}

Advanced Configuration

const client = new RabbitMQClient({
  // Cluster configuration
  urls: [
    'amqp://node1.rabbitmq.local:5672',
    'amqp://node2.rabbitmq.local:5672',
    'amqp://node3.rabbitmq.local:5672',
  ],
  failoverStrategy: 'round-robin', // or 'random'

  // Connection pooling
  poolConfig: {
    maxChannels: 20,
    acquireTimeout: 5000,
  },

  // Circuit breaker
  circuitBreaker: {
    failureThreshold: 5,
    resetTimeout: 30000,
  },

  // Message batching
  batchConfig: {
    size: 100,
    timeoutMs: 1000,
  },

  // SSL/TLS
  ssl: {
    enabled: true,
    validate: true,
    ca: ['path/to/ca.pem'],
    cert: 'path/to/cert.pem',
    key: 'path/to/key.pem',
  },

  // Cluster options
  clusterOptions: {
    retryConnectTimeout: 5000,
    nodeRecoveryInterval: 30000,
    shuffleNodes: true,
    priorityNodes: ['amqp://primary.rabbitmq.local:5672'],
  },

  // Channel recovery
  channelOptions: {
    maxRetries: 3,
    retryDelay: 1000,
    autoRecovery: true,
  },
});

Core Algorithms

1. Exponential Backoff Reconnection

The client implements an intelligent reconnection strategy with exponential backoff and jitter:

private calculateReconnectDelay(): number {
  const baseDelay = this.options.reconnectDelay || 1000;
  const maxDelay = 60000; // 1 minute max

  if (!this.options.exponentialBackoff) {
    return baseDelay;
  }

  // Exponential backoff: delay = baseDelay * 2^attempts
  const exponentialDelay = Math.min(
    baseDelay * Math.pow(2, this.reconnectAttempts),
    maxDelay
  );

  // Add jitter (±20%) to prevent thundering herd
  const jitter = exponentialDelay * 0.2 * (Math.random() * 2 - 1);
  return Math.max(baseDelay, Math.min(exponentialDelay + jitter, maxDelay));
}

Algorithm Benefits:

  • Prevents overwhelming the broker during outages
  • Reduces connection storms in distributed systems
  • Adaptive delay based on failure frequency

2. Circuit Breaker Pattern

Protects against cascading failures by monitoring error rates:

// Circuit breaker states: CLOSED → OPEN → HALF_OPEN → CLOSED
private circuitBreaker = {
  failures: 0,
  isOpen: false,
  lastFailure: null as Date | null,
};

// Opens circuit when failure threshold is reached
private handleConnectionError(error: unknown, reject: (error: Error) => void): void {
  this.circuitBreaker.failures++;
  this.circuitBreaker.isOpen =
    this.circuitBreaker.failures >= (this.options.circuitBreaker?.failureThreshold ?? 5);
  this.circuitBreaker.lastFailure = new Date();
}

States:

  • CLOSED: Normal operation, requests pass through
  • OPEN: Failures exceed threshold, requests fail fast
  • HALF_OPEN: Testing if service has recovered

3. Channel Pool Management

Efficient channel reuse with automatic cleanup:

interface ChannelPool {
  channels: (Channel | ConfirmChannel)[];
  maxChannels: number;
  inUse: Set<Channel | ConfirmChannel>;
}

public async getChannel(): Promise<Channel | ConfirmChannel> {
  // 1. Try to get available channel from pool
  const availableChannel = this.channelPool.channels.find(
    (ch) => !this.channelPool.inUse.has(ch) && this.isChannelOpen(ch)
  );

  if (availableChannel) {
    this.channelPool.inUse.add(availableChannel);
    return availableChannel;
  }

  // 2. Create new channel if under limit
  if (this.channelPool.channels.length < this.channelPool.maxChannels) {
    const newChannel = await this.connection.createConfirmChannel();
    this.channelPool.channels.push(newChannel);
    this.channelPool.inUse.add(newChannel);
    return newChannel;
  }

  // 3. Wait for channel to become available
  return this.waitForAvailableChannel();
}

4. Cluster Failover Strategy

Intelligent node selection with health tracking:

private getSortedNodes(nodes: string[]): string[] {
  const { failoverStrategy, clusterOptions } = this.options;

  // 1. Priority nodes first
  const priorityNodes = clusterOptions?.priorityNodes || [];
  let sortedNodes = [
    ...priorityNodes.filter(node => nodes.includes(node)),
    ...nodes.filter(node => !priorityNodes.includes(node))
  ];

  // 2. Apply failover strategy
  if (failoverStrategy === 'random') {
    sortedNodes = sortedNodes.sort(() => Math.random() - 0.5);
  } else if (failoverStrategy === 'round-robin') {
    const rotateAmount = this.currentUrlIndex % sortedNodes.length;
    sortedNodes = [
      ...sortedNodes.slice(rotateAmount),
      ...sortedNodes.slice(0, rotateAmount)
    ];
    this.currentUrlIndex++;
  }

  return sortedNodes;
}

API Reference

Connection Management

connect(): Promise<void>

Establishes connection to RabbitMQ with automatic retry logic.

await client.connect();

close(): Promise<void>

Closes all channels and connection immediately.

await client.close();

gracefulShutdown(): Promise<void>

Performs graceful shutdown waiting for in-flight messages.

await client.gracefulShutdown();

Message Operations

publish(exchange, routingKey, content, options): Promise<void>

Publishes a single message with confirmation.

await client.publish('user-events', 'user.created', Buffer.from(JSON.stringify({ userId: 123 })), {
  persistent: true,
  timestamp: Date.now(),
});

publishBatch(messages): Promise<void>

Publishes multiple messages in a single operation.

const messages = [
  {
    exchange: 'events',
    routingKey: 'user.created',
    content: Buffer.from('{"userId": 1}'),
    options: { persistent: true },
  },
  {
    exchange: 'events',
    routingKey: 'user.updated',
    content: Buffer.from('{"userId": 2}'),
    options: { persistent: true },
  },
];

await client.publishBatch(messages);

consume(queue, onMessage, options): Promise<string>

Consumes messages from a queue. Supports both auto and manual acknowledgment modes.

// Auto-acknowledgment mode (default)
const consumerTag = await client.consume(
  'user-processing-queue',
  async (msg) => {
    if (msg) {
      const data = JSON.parse(msg.content.toString());
      await processUser(data);
      // Message auto-acked on success, auto-nacked on error
    }
  },
  { timeout: 30000 },
);

// Manual acknowledgment mode
await client.consume(
  'payment-queue',
  async (msg, actions) => {
    if (msg && actions) {
      try {
        const payment = JSON.parse(msg.content.toString());
        await processPayment(payment);
        await actions.ack(); // Manually acknowledge
      } catch (error) {
        if (isRetryable(error)) {
          await actions.nack(true); // Requeue for retry
        } else {
          await actions.reject(false); // Send to DLQ
        }
      }
    }
  },
  { manualAck: true },
);

sendToQueue(queue, content, options): Promise<void>

Sends a message directly to a queue (bypassing exchanges).

await client.sendToQueue('direct-queue', Buffer.from(JSON.stringify({ id: 1 })), {
  persistent: true,
  priority: 5,
});

get(queue, options): Promise<Message | false>

Pulls a single message from a queue (synchronous fetch).

const msg = await client.get('my-queue');
if (msg) {
  console.log('Got message:', msg.content.toString());
  client.ack(msg); // Manual ack required
} else {
  console.log('Queue is empty');
}

ack(msg, allUpTo?): void

Acknowledges a message (for use with get() or manualAck mode).

client.ack(msg); // Ack single message
client.ack(msg, true); // Ack all messages up to this one

nack(msg, allUpTo?, requeue?): void

Negative acknowledges a message.

client.nack(msg, false, true); // Requeue the message
client.nack(msg, false, false); // Don't requeue (goes to DLQ if configured)

reject(msg, requeue?): void

Rejects a message (typically sends to DLQ if configured).

client.reject(msg, false); // Send to DLQ
client.reject(msg, true); // Requeue

cancel(consumerTag): Promise<void>

Cancels a consumer.

const consumerTag = await client.consume('my-queue', handler);
// Later...
await client.cancel(consumerTag);

Queue and Exchange Management

assertQueue(queue, options): Promise<AssertQueue>

Creates or verifies a queue exists.

await client.assertQueue('user-events', {
  durable: true,
  deadLetterExchange: 'dlx',
  deadLetterRoutingKey: 'failed',
  messageTtl: 3600000, // 1 hour
  maxLength: 10000,
});

assertExchange(exchange, type, options): Promise<AssertExchange>

Creates or verifies an exchange exists.

await client.assertExchange('user-events', 'topic', {
  durable: true,
  alternateExchange: 'unrouted-messages',
});

bindQueue(queue, exchange, pattern): Promise<void>

Binds a queue to an exchange with a routing pattern.

await client.bindQueue('user-notifications', 'user-events', 'user.*.created');

unbindQueue(queue, exchange, pattern): Promise<void>

Unbinds a queue from an exchange.

await client.unbindQueue('user-notifications', 'user-events', 'user.*.created');

deleteQueue(queue, options?): Promise<{ messageCount: number }>

Deletes a queue.

const result = await client.deleteQueue('temp-queue');
console.log(`Deleted queue with ${result.messageCount} messages`);

// Delete only if empty and unused
await client.deleteQueue('my-queue', { ifEmpty: true, ifUnused: true });

purgeQueue(queue): Promise<{ messageCount: number }>

Removes all messages from a queue.

const result = await client.purgeQueue('my-queue');
console.log(`Purged ${result.messageCount} messages`);

deleteExchange(exchange, options?): Promise<void>

Deletes an exchange.

await client.deleteExchange('temp-exchange');

// Delete only if unused
await client.deleteExchange('my-exchange', { ifUnused: true });

prefetch(count, global?): Promise<void>

Sets the prefetch count (QoS) for the channel.

await client.prefetch(10); // Per consumer
await client.prefetch(100, true); // Global for channel

Health and Monitoring

healthCheck(): Promise<boolean>

Performs a comprehensive health check.

const isHealthy = await client.healthCheck();
if (!isHealthy) {
  console.log('RabbitMQ connection is unhealthy');
}

getMetrics(): Metrics

Returns current performance metrics.

const metrics = client.getMetrics();
console.log(`Messages sent: ${metrics.messagesSent}`);
console.log(`Messages received: ${metrics.messagesReceived}`);
console.log(`Errors: ${metrics.errors}`);
console.log(`Reconnections: ${metrics.reconnections}`);

Examples

Basic Producer

import RabbitMQClient from '@slzsid/rabbitmq-multinode-connector';

class UserEventProducer {
  private client: RabbitMQClient;

  constructor() {
    this.client = new RabbitMQClient({
      urls: ['amqp://localhost:5672'],
      connectionName: 'user-event-producer',
      prefetchCount: 1,
    });
  }

  async initialize() {
    await this.client.connect();
    await this.client.assertExchange('user-events', 'topic', { durable: true });
  }

  async publishUserCreated(userId: number, userData: any) {
    const event = {
      type: 'user.created',
      userId,
      data: userData,
      timestamp: new Date().toISOString(),
    };

    await this.client.publish('user-events', 'user.created', Buffer.from(JSON.stringify(event)), {
      persistent: true,
    });
  }

  async shutdown() {
    await this.client.gracefulShutdown();
  }
}

Basic Consumer

class UserEventConsumer {
  private client: RabbitMQClient;

  constructor() {
    this.client = new RabbitMQClient({
      urls: ['amqp://localhost:5672'],
      connectionName: 'user-event-consumer',
      prefetchCount: 10,
    });
  }

  async initialize() {
    await this.client.connect();

    // Setup infrastructure
    await this.client.assertExchange('user-events', 'topic', { durable: true });
    await this.client.assertQueue('user-processing', {
      durable: true,
      deadLetterExchange: 'dlx',
      deadLetterRoutingKey: 'failed',
    });
    await this.client.bindQueue('user-processing', 'user-events', 'user.*');
  }

  async startConsuming() {
    await this.client.consume('user-processing', async (msg) => {
      if (msg) {
        try {
          const event = JSON.parse(msg.content.toString());
          await this.processUserEvent(event);
        } catch (error) {
          console.error('Error processing message:', error);
          throw error; // Will trigger nack and requeue
        }
      }
    });
  }

  private async processUserEvent(event: any) {
    console.log(`Processing event: ${event.type} for user ${event.userId}`);
    // Your business logic here
  }
}

Cluster Configuration

const clusterClient = new RabbitMQClient({
  urls: [
    'amqp://rabbit1.example.com:5672',
    'amqp://rabbit2.example.com:5672',
    'amqp://rabbit3.example.com:5672',
  ],
  failoverStrategy: 'round-robin',
  clusterOptions: {
    retryConnectTimeout: 5000,
    nodeRecoveryInterval: 30000,
    priorityNodes: ['amqp://rabbit1.example.com:5672'],
  },
  circuitBreaker: {
    failureThreshold: 3,
    resetTimeout: 60000,
  },
});

SSL/TLS Configuration

import fs from 'fs';

const secureClient = new RabbitMQClient({
  urls: ['amqps://secure-rabbit.example.com:5671'],
  ssl: {
    enabled: true,
    validate: true,
    ca: [fs.readFileSync('ca-cert.pem').toString()],
    cert: fs.readFileSync('client-cert.pem').toString(),
    key: fs.readFileSync('client-key.pem').toString(),
    passphrase: 'your-key-passphrase',
  },
});

Message Batching

class BatchProcessor {
  private client: RabbitMQClient;

  constructor() {
    this.client = new RabbitMQClient({
      urls: ['amqp://localhost:5672'],
      batchConfig: {
        size: 50, // Batch size
        timeoutMs: 1000, // Max wait time
      },
    });
  }

  async processBulkUsers(users: any[]) {
    const messages = users.map((user) => ({
      exchange: 'user-events',
      routingKey: 'user.bulk.created',
      content: Buffer.from(JSON.stringify(user)),
      options: { persistent: true },
    }));

    await this.client.publishBatch(messages);
  }
}

Monitoring & Metrics

Event Listeners

client.on('connected', () => {
  console.log('Connected to RabbitMQ');
});

client.on('connectionError', (error) => {
  console.error('Connection error:', error);
});

client.on('reconnecting', () => {
  console.log('Attempting to reconnect...');
});

client.on('reconnected', () => {
  console.log('Successfully reconnected');
});

client.on('metrics', (metrics) => {
  console.log('Current metrics:', metrics);
});

client.on('blocked', (reason) => {
  console.warn('Connection blocked:', reason);
});

client.on('unblocked', () => {
  console.log('Connection unblocked');
});

Metrics Collection

// Get current metrics
const metrics = client.getMetrics();

// Metrics structure
interface Metrics {
  messagesSent: number;
  messagesReceived: number;
  errors: number;
  reconnections: number;
  lastReconnectTime: Date | null;
  avgProcessingTime: number;
}

Error Handling

Automatic Error Recovery

The client automatically handles various error scenarios:

  1. Connection Errors: Automatic reconnection with exponential backoff
  2. Channel Errors: Channel recovery and recreation
  3. Message Processing Errors: Automatic nack and requeue
  4. Cluster Node Failures: Failover to healthy nodes

Custom Error Handling

client.on('error', (error) => {
  // Log error to monitoring system
  logger.error('RabbitMQ error:', error);

  // Send alert if critical
  if (error.message.includes('ECONNREFUSED')) {
    alerting.sendAlert('RabbitMQ connection failed');
  }
});

// Handle specific message processing errors
await client.consume('my-queue', async (msg) => {
  try {
    await processMessage(msg);
  } catch (error) {
    if (error instanceof ValidationError) {
      // Don't requeue invalid messages
      return; // Auto-ack
    }
    throw error; // Requeue for retry
  }
});

Best Practices

1. Connection Management

// Good: Single connection per application
const client = new RabbitMQClient({ url: 'amqp://localhost' });

// Bad: Multiple connections
const client1 = new RabbitMQClient({ url: 'amqp://localhost' });
const client2 = new RabbitMQClient({ url: 'amqp://localhost' });

2. Channel Usage

// Good: Use channel pool
const channel = await client.getChannel();
try {
  // Use channel
} finally {
  client.releaseChannel(channel);
}

// Better: Use built-in methods
await client.publish('exchange', 'key', buffer);

3. Error Handling

// Good: Comprehensive error handling
client.on('error', (error) => {
  logger.error('RabbitMQ error:', error);
  metrics.increment('rabbitmq.errors');
});

client.on('reconnected', () => {
  logger.info('RabbitMQ reconnected');
  metrics.increment('rabbitmq.reconnections');
});

4. Graceful Shutdown

// Good: Graceful shutdown
process.on('SIGTERM', async () => {
  console.log('Shutting down gracefully...');
  await client.gracefulShutdown();
  process.exit(0);
});

5. Message Durability

// Good: Durable queues and persistent messages
await client.assertQueue('important-queue', { durable: true });
await client.publish('exchange', 'key', buffer, { persistent: true });

6. Monitoring

// Good: Regular health checks
setInterval(async () => {
  const isHealthy = await client.healthCheck();
  if (!isHealthy) {
    logger.warn('RabbitMQ health check failed');
  }
}, 30000);

Contributing

We welcome contributions! Please follow these steps:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Add tests for new functionality
  5. Run the linter and tests (npm run lint && npm test)
  6. Commit your changes (git commit -m 'Add amazing feature')
  7. Push to the branch (git push origin feature/amazing-feature)
  8. Open a Pull Request

Development

# Install dependencies
npm install

# Build the library
npm run build

# Run linter
npm run lint

# Fix linting issues
npm run lint:fix

# Format code
npm run format

# Run tests
npm test

# Run tests in watch mode
npm run test:watch

Publishing

# Update version (patch/minor/major)
npm version patch

# Build and publish
npm publish

License

This project is licensed under the MIT License - see the LICENSE file for details.

Author

SleepySid

Support

For issues and questions:

Changelog

See CHANGELOG.md for version history and release notes.


Made for the Node.js community