rabbit-axon (v2.0.3)

Production-ready class-based TypeScript RabbitMQ wrapper — auto-reconnect, dead-letter exchange, publisher confirms, exponential backoff retries, message buffering, connection pooling, and structured Winston logging.


Installation

pnpm add rabbit-axon

Project Structure

src/
├── connection/
│   ├── single.ts       — Single connection handler with auto-reconnect
│   └── pool.ts         — Connection pool handler (round-robin load balancing)
├── channel/
│   └── manager.ts      — Static channel utility (create, close)
├── exchange/
│   └── exchange.ts     — Abstract exchange/queue setup
├── producer/
│   └── producer.ts     — Publish, confirm publish, message buffering
├── consumer/
│   └── consumer.ts     — Consume, DLX routing, retry backoff
├── serializer/
│   └── serializer.ts   — ISerializer, JsonSerializer, MsgpackSerializer, IdentitySerializer
├── logger/
│   └── logger.ts       — Winston-based structured logger
└── types.ts            — All interfaces and type aliases

Quick Start

import {
  RabbitSingleConnectionHandler,
  RabbitMqQueueExchange,
  RabbitProducer,
  RabbitConsumer,
} from "rabbit-axon";

// 1. Connect
const handler = new RabbitSingleConnectionHandler("amqp://localhost");
await handler.ConnectToService();

// 2. Setup exchange + queue
class OrderExchange extends RabbitMqQueueExchange {
  constructor() {
    super(handler, "orders", "topic");
  }
  async setup() {
    await this.createExchange();
    await this.createQueue("orders.created", "order.created.#");
  }
}
const exchange = new OrderExchange();
await exchange.setup();

// 3. Produce
const producer = new RabbitProducer(handler, "orders", "order.created.us");
await producer.publish({ orderId: "ORD-1", region: "us" });

// 4. Consume
const consumer = new RabbitConsumer(handler);
await consumer.consume("orders.created", async (data) => {
  console.log("Order received:", data.orderId);
});

// 5. Shutdown
process.on("SIGTERM", async () => {
  await handler.gracefulShutdown();
});

Connection

RabbitSingleConnectionHandler

Single AMQP connection with auto-reconnect.

import { RabbitSingleConnectionHandler } from "rabbit-axon";

const handler = new RabbitSingleConnectionHandler("amqp://user:pass@localhost:5672", {
  heartbeat: 30,
  circuitBreaker: {
    threshold: 5,        // failures before opening circuit
    resetTimeout: 30000, // ms to wait in OPEN before probing
    maxResetTimeout: 300000, // cap on doubling — default 5 minutes
  },
});

await handler.ConnectToService();

// Register reconnect callback
handler.onReconnect(async () => {
  console.log("Reconnected — re-setup if needed");
});

// Graceful shutdown
await handler.gracefulShutdown();

RabbitConnectionPoolHandler

Multiple connections for high-throughput scenarios. Each connection in the pool has its own channel, and publishes are distributed via round-robin across all connections automatically — no manual acquire/release needed.

import { RabbitConnectionPoolHandler, RabbitProducer, RabbitConsumer } from "rabbit-axon";

const pool = new RabbitConnectionPoolHandler("amqp://localhost", 5, {
  heartbeat: 30,
});

await pool.ConnectToService();

// use pool directly with producer — round-robin load balancing across 5 connections
const producer = new RabbitProducer(pool, "orders", "order.created");
await producer.publish({ orderId: "ORD-1" }); // uses conn1
await producer.publish({ orderId: "ORD-2" }); // uses conn2
await producer.publish({ orderId: "ORD-3" }); // uses conn3
// ... wraps around automatically

// use pool directly with consumer
const consumer = new RabbitConsumer(pool);
await consumer.consume("orders.created", async (data) => {
  console.log(data.orderId);
});

await pool.gracefulShutdown();

Internally the pool maintains a Map<connection, channel> — one channel per connection. Round-robin picks the next connection on each getChannel() call, reusing the existing channel for that connection if it's still alive.
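
The selection logic described above can be sketched with stand-in types instead of real AMQP connections. `FakeConnection`, `FakeChannel`, and `RoundRobinPool` are hypothetical names used only for illustration; they are not part of rabbit-axon's API, and the real pool may differ in detail.

```typescript
// Stand-in types: a real pool would hold AMQP connections and channels.
interface FakeChannel { id: string; alive: boolean }
interface FakeConnection { name: string }

class RoundRobinPool {
  private readonly connections: FakeConnection[];
  private readonly channels = new Map<FakeConnection, FakeChannel>();
  private nextIndex = 0;
  private created = 0;

  constructor(connections: FakeConnection[]) {
    if (connections.length === 0) throw new Error("pool needs at least one connection");
    this.connections = connections;
  }

  // Pick the next connection in order and reuse its channel while it is alive.
  getChannel(): FakeChannel {
    const conn = this.connections[this.nextIndex];
    this.nextIndex = (this.nextIndex + 1) % this.connections.length;

    const existing = this.channels.get(conn);
    if (existing !== undefined && existing.alive) return existing;

    // Channel missing or closed: create a replacement for this connection.
    this.created += 1;
    const fresh: FakeChannel = { id: `${conn.name}-ch${this.created}`, alive: true };
    this.channels.set(conn, fresh);
    return fresh;
  }
}

const demoPool = new RoundRobinPool([{ name: "conn1" }, { name: "conn2" }, { name: "conn3" }]);
// Four calls against three connections: the fourth wraps around to conn1.
const pickedIds = [1, 2, 3, 4].map(() => demoPool.getChannel().id);
```

Because the channel is cached per connection, the fourth call returns the same channel object as the first rather than opening a new one.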


Logger

Every class creates a default Winston logger internally. Replace it with your own:

import { RabbitLogger, RabbitSingleConnectionHandler } from "rabbit-axon";
import winston from "winston";

const myWinstonLogger = winston.createLogger({
  level: "info",
  format: winston.format.combine(winston.format.timestamp(), winston.format.json()),
  transports: [
    new winston.transports.Console(),
    new winston.transports.File({ filename: "rabbit.log" }),
  ],
});

const logger = new RabbitLogger(myWinstonLogger);

const handler = new RabbitSingleConnectionHandler("amqp://localhost");
handler.addLogger(logger);

Default behavior (no custom logger):

  • NODE_ENV=production → JSON logs (one line per entry, parseable by Loki/CloudWatch/Datadog/ELK)
  • NODE_ENV=development → colored human-readable logs

Exchange Types

fanout — Broadcast to all queues

class BroadcastExchange extends RabbitMqQueueExchange {
  constructor(handler: RabbitSingleConnectionHandler) {
    super(handler, "events.broadcast", "fanout");
  }
  async setup() {
    await this.createExchange();
    await this.createQueue("service-a.events", "");
    await this.createQueue("service-b.events", "");
  }
}

direct — Exact routing key match

class LogExchange extends RabbitMqQueueExchange {
  constructor(handler: RabbitSingleConnectionHandler) {
    super(handler, "logs", "direct");
  }
  async setup() {
    await this.createExchange();
    await this.createQueue("logs.errors", "error");
    await this.createQueue("logs.warnings", "warning");
  }
}

// Only "logs.errors" receives this
const producer = new RabbitProducer(handler, "logs", "error");
await producer.publish({ msg: "DB connection failed" });

topic — Pattern routing key

class OrderExchange extends RabbitMqQueueExchange {
  constructor(handler: RabbitSingleConnectionHandler) {
    super(handler, "orders", "topic");
  }
  async setup() {
    await this.createExchange();
    await this.createQueue("orders.all", "order.#");
    await this.createQueue("orders.created", "order.created.*");
    await this.createQueue("orders.us", "order.*.us");
  }
}

// Matches "orders.all", "orders.created", "orders.us"
const producer = new RabbitProducer(handler, "orders", "order.created.us");
await producer.publish({ orderId: "ORD-1" });
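
To see why that routing key reaches all three queues, here is a small standalone matcher for AMQP topic patterns, where `*` stands for exactly one dot-separated word and `#` for zero or more words. `topicMatches` is a hypothetical helper for illustration; the broker performs this matching itself.

```typescript
// Returns true when routingKey satisfies the binding pattern.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split(".");
  const k = routingKey.split(".");

  const match = (pi: number, ki: number): boolean => {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (pi === p.length) return ki === k.length;
    if (p[pi] === "#") {
      // "#" may absorb zero or more words: try every possible split point.
      for (let next = ki; next <= k.length; next++) {
        if (match(pi + 1, next)) return true;
      }
      return false;
    }
    // Any other pattern word needs a key word to compare against.
    if (ki === k.length) return false;
    return (p[pi] === "*" || p[pi] === k[ki]) && match(pi + 1, ki + 1);
  };

  return match(0, 0);
}

const bindings = ["order.#", "order.created.*", "order.*.us"];
const matched = bindings.filter((pattern) => topicMatches(pattern, "order.created.us"));
```

With the key `order.created.eu`, only the first two bindings would match, so `orders.us` would not receive the message.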

headers — Route by message headers

class AlertExchange extends RabbitMqQueueExchange {
  constructor(handler: RabbitSingleConnectionHandler) {
    super(handler, "alerts", "headers");
  }
  async setup() {
    await this.createExchange();
    // Both headers must match
    await this.createQueue("alerts.critical", "", { durable: true }, undefined, {
      "x-match": "all",
      type: "error",
      severity: "critical",
    });
    // Any header matches
    await this.createQueue("alerts.any", "", { durable: true }, undefined, {
      "x-match": "any",
      type: "error",
      severity: "critical",
    });
  }
}

const producer = new RabbitProducer(handler, "alerts", "");
await producer.publish({ message: "Disk full" }, {
  headers: { type: "error", severity: "critical" },
});
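
The difference between the two bindings can be sketched as a plain function. This assumes standard RabbitMQ headers-exchange semantics: `x-match` defaults to `all`, and binding keys starting with `x-` are not compared in the `all` and `any` modes. `headersMatch` is a hypothetical helper, not part of rabbit-axon.

```typescript
// Compare binding arguments with message headers the way a headers exchange would.
function headersMatch(
  bindingArgs: Record<string, unknown>,
  messageHeaders: Record<string, unknown>,
): boolean {
  const mode = bindingArgs["x-match"] === "any" ? "any" : "all";
  // Keys starting with "x-" (such as "x-match" itself) are not part of the comparison.
  const required = Object.entries(bindingArgs).filter(([key]) => !key.startsWith("x-"));
  const hits = required.filter(([key, value]) => messageHeaders[key] === value).length;
  return mode === "all" ? hits === required.length : hits > 0;
}

const criticalBinding = { "x-match": "all", type: "error", severity: "critical" };
const anyBinding = { "x-match": "any", type: "error", severity: "critical" };

// A message with only one matching header reaches "alerts.any" but not "alerts.critical".
const partialHeaders = { type: "error", severity: "low" };
const reachesCritical = headersMatch(criticalBinding, partialHeaders);
const reachesAny = headersMatch(anyBinding, partialHeaders);
```

The sketch uses strict equality, so it only covers primitive header values such as strings and numbers.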

Dead Letter Exchange (DLX)

class OrderExchange extends RabbitMqQueueExchange {
  constructor(handler: RabbitSingleConnectionHandler) {
    super(handler, "orders", "topic");
  }
  async setup() {
    await this.createExchange();

    // DLX queue to catch failed messages
    await this.createQueue("orders.failed", "order.failed.#");

    // Main queue with DLX configured
    await this.createDeadLetterQueue(
      "orders.processing",  // queue name
      "orders",             // DLX exchange
      "order.failed.all",   // DLX routing key
      30000                 // TTL in ms: messages still queued after 30s expire and are routed to the DLX
    );
  }
}

// Consumer with dlx: true — failed messages nack'd straight to DLX
const consumer = new RabbitConsumer(handler);
await consumer.consume("orders.processing", async (data) => {
  throw new Error("processing failed"); // → goes to orders.failed queue
}, { dlx: true });
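
The queue arguments behind this setup can also be written out explicitly. The sketch below assumes that createDeadLetterQueue boils down to the standard RabbitMQ arguments listed under Queue Arguments; `deadLetterArguments` is a hypothetical helper and not part of rabbit-axon.

```typescript
// Hypothetical helper: builds standard RabbitMQ queue arguments for a queue
// whose rejected or expired messages should be dead-lettered.
function deadLetterArguments(
  dlxExchange: string,
  dlxRoutingKey: string,
  ttlMs?: number,
): Record<string, string | number> {
  const args: Record<string, string | number> = {
    "x-dead-letter-exchange": dlxExchange,
    "x-dead-letter-routing-key": dlxRoutingKey,
  };
  // Only set a TTL when one is requested; expired messages are dead-lettered too.
  if (ttlMs !== undefined) args["x-message-ttl"] = ttlMs;
  return args;
}

// Same values as the "orders.processing" queue above.
const processingArgs = deadLetterArguments("orders", "order.failed.all", 30000);
```

An object like `processingArgs` has the same shape as the queue arguments accepted by createQueue.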

Producer

Basic publish

const producer = new RabbitProducer(handler, "orders", "order.created");

await producer.publish({ orderId: "ORD-1" }, {
  persistent: true,
  priority: 5,
  messageId: "msg-001",
  headers: { "x-source": "order-service" },
});

TypeScript generics

interface OrderPayload {
  orderId: string;
  amount: number;
  region: "us" | "eu";
}

const producer = new RabbitProducer<OrderPayload>(handler, "orders", "order.created");
await producer.publish({ orderId: "ORD-1", amount: 99.99, region: "us" }); // fully type checked

Publish with broker confirm

const confirmed = await producer.publishWithConfirm({ orderId: "ORD-1" }, {
  persistent: true,
  expiration: "60000",
});

if (!confirmed) console.error("Broker rejected the message");

Message buffering on disconnect

When the connection drops, publish does not throw — it buffers the message and resolves once the connection is restored:

// hangs until reconnected, then resolves automatically
const result = await producer.publish({ orderId: "ORD-2" });

console.log(producer.getBufferSize()); // number of pending buffered messages
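
A rough sketch of how such buffering can work, independent of rabbit-axon's actual implementation. `BufferingPublisher`, `markConnected`, and `markDisconnected` are hypothetical names, and rejecting when the buffer is full is an assumption, since the README does not say what happens at the limit.

```typescript
interface PendingPublish<T> {
  message: T;
  resolve: (sent: boolean) => void;
}

class BufferingPublisher<T> {
  private readonly pending: PendingPublish<T>[] = [];
  private readonly send: (message: T) => boolean;
  private readonly maxBufferSize: number;
  private connected = false;

  constructor(send: (message: T) => boolean, maxBufferSize = 10000) {
    this.send = send;
    this.maxBufferSize = maxBufferSize;
  }

  // While disconnected, park the message and hand back a promise that
  // settles once the message is flushed.
  publish(message: T): Promise<boolean> {
    if (this.connected) return Promise.resolve(this.send(message));
    if (this.pending.length >= this.maxBufferSize) {
      // Assumption: a full buffer rejects instead of dropping older messages.
      return Promise.reject(new Error("publish buffer is full"));
    }
    return new Promise<boolean>((resolve) => {
      this.pending.push({ message, resolve });
    });
  }

  // Call when the connection is restored: flush buffered messages in FIFO order.
  markConnected(): void {
    this.connected = true;
    for (const item of this.pending.splice(0)) {
      item.resolve(this.send(item.message));
    }
  }

  markDisconnected(): void {
    this.connected = false;
  }

  getBufferSize(): number {
    return this.pending.length;
  }
}
```

Callers keep awaiting `publish` as usual; the only visible effect of a disconnect is that the promise takes longer to settle.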

Consumer

Basic consume

const consumer = new RabbitConsumer(handler);

await consumer.consume("orders.created", async (data, msg) => {
  console.log("Processing:", data.orderId);
}, { prefetchCount: 1 });

TypeScript generics

interface OrderPayload {
  orderId: string;
  amount: number;
}

const consumer = new RabbitConsumer<OrderPayload>(handler);
await consumer.consume("orders.created", async (data) => {
  console.log(data.orderId); // string — fully typed
  console.log(data.amount);  // number — fully typed
});

Retry with exponential backoff (no DLX)

When dlx: false (the default), failed messages are retried in-process with exponential backoff. Once retryLimit retries are exhausted, the message is acked and skipped:

await consumer.consume("orders.created", async (data) => {
  await processOrder(data);
}, {
  retryLimit: 3,
  dlx: false,
});

| Attempt | Delay |
|---------|-------|
| 1st retry | 1s |
| 2nd retry | 2s |
| 3rd retry | 4s |
| After limit | ack + skip |
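
The delays in the table are consistent with a base delay of one second that doubles on every retry. A minimal sketch under that assumption (`retryDelayMs` and `backoffSchedule` are hypothetical helpers; the library may compute its delays differently):

```typescript
// Delay before the nth retry (1-based): baseMs * 2^(n - 1).
function retryDelayMs(retryNumber: number, baseMs = 1000): number {
  if (!Number.isInteger(retryNumber) || retryNumber < 1) {
    throw new RangeError("retryNumber must be a positive integer");
  }
  return baseMs * 2 ** (retryNumber - 1);
}

// All delays for a given retryLimit, in order.
function backoffSchedule(retryLimit: number, baseMs = 1000): number[] {
  return Array.from({ length: retryLimit }, (_, i) => retryDelayMs(i + 1, baseMs));
}

const threeRetries = backoffSchedule(3); // [1000, 2000, 4000]
```

With retryLimit: 3, a message that keeps failing therefore spends about seven seconds in retries before it is acked and skipped.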

DLX routing on failure

When dlx: true, any failure immediately nacks the message — RabbitMQ routes it to the configured DLX:

await consumer.consume("orders.processing", async (data) => {
  await processOrder(data);
}, { dlx: true });
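
Taken together, the two failure modes amount to a small decision per failed message. The function below is a sketch of that decision, not rabbit-axon's code; `decideOnFailure` and the action names are hypothetical.

```typescript
type FailureAction =
  | { kind: "nack-to-dlx" }
  | { kind: "retry"; retryNumber: number }
  | { kind: "ack-and-skip" };

interface FailureOptions {
  dlx: boolean;
  retryLimit: number;
}

// retriesSoFar counts the retries already attempted for this message.
function decideOnFailure(options: FailureOptions, retriesSoFar: number): FailureAction {
  // dlx: true -> nack without requeue so the broker dead-letters the message.
  if (options.dlx) return { kind: "nack-to-dlx" };
  // dlx: false -> retry in-process until the limit is reached.
  if (retriesSoFar < options.retryLimit) {
    return { kind: "retry", retryNumber: retriesSoFar + 1 };
  }
  // Retries exhausted: acknowledge so the message is not redelivered.
  return { kind: "ack-and-skip" };
}

const firstFailure = decideOnFailure({ dlx: false, retryLimit: 3 }, 0);
const afterLimit = decideOnFailure({ dlx: false, retryLimit: 3 }, 3);
```

Note that with dlx: true the retryLimit is never consulted, which matches the "only when dlx: false" note in the Consume Options table.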

Custom Logger per class

const logger = new RabbitLogger(myWinstonLogger);

handler.addLogger(logger);
pool.addLogger(logger);       // propagates to all connections in pool
exchange.addLogger(logger);
producer.addLogger(logger);
consumer.addLogger(logger);

Graceful Shutdown

process.on("SIGTERM", async () => {
  await handler.gracefulShutdown();
  process.exit(0);
});

Production Considerations

  • persistent: true by default — messages survive broker restarts
  • noAck: false — all messages must be explicitly acknowledged
  • After maxReconnectAttempts, process.exitCode = 1 — let PM2/K8s/ECS restart the process
  • Keep prefetchCount low (1–5) for fair distribution
  • Use dlx: true + a dead letter queue for guaranteed message capture on failure
  • Monitor DLQ depth — a growing DLQ means your handler is consistently failing
  • Use publishWithConfirm for critical messages where loss is unacceptable
  • Use RabbitConnectionPoolHandler for high-throughput services needing parallel TCP connections
  • Scale consumers by deploying multiple instances/containers — each gets its own connection and channel

Serialization

By default all messages are serialized as JSON. You can swap to MessagePack for smaller, faster payloads or use the Identity serializer for raw binary data.

import { JsonSerializer, MsgpackSerializer, IdentitySerializer } from "rabbit-axon";

JSON (default)

const producer = new RabbitProducer(handler, "orders", "order.created");
await producer.publish({ orderId: "ORD-1" }); // serialized as JSON

MessagePack — smaller and faster

const producer = new RabbitProducer(handler, "orders", "order.created", {
  serializer: new MsgpackSerializer(),
});
await producer.publish({ orderId: "ORD-1" }); // serialized as msgpack, ~2-3x smaller

Identity — raw Buffer passthrough

For protobuf, avro, encrypted payloads, or any binary data you encode yourself:

const producer = new RabbitProducer<Buffer>(handler, "orders", "order.created", {
  serializer: new IdentitySerializer(),
});

// you handle encoding
const bytes = OrderProto.encode({ orderId: "ORD-1" }).finish();
await producer.publish(Buffer.from(bytes));

// consumer receives raw Buffer
const consumer = new RabbitConsumer<Buffer>(handler, {
  serializer: new IdentitySerializer(),
});
await consumer.consume("orders.created", async (data) => {
  const decoded = OrderProto.decode(data); // data is Buffer — fully typed
});

TypeScript generics — T extends Record<string, any> | Buffer

Both RabbitProducer<T> and RabbitConsumer<T> accept either an object type or Buffer. Default is Record<string, any>:

// default — object payload, fully typed
const producer = new RabbitProducer<{ orderId: string }>(handler, "orders", "order.created");
await producer.publish({ orderId: "ORD-1" }); // type checked

// binary — Buffer payload
const binaryProducer = new RabbitProducer<Buffer>(handler, "rpc", "call", {
  serializer: new IdentitySerializer(),
});
await binaryProducer.publish(Buffer.from([0x01, 0x02, 0x03]));

Auto-detection on consumer

The producer sets the contentType message property on every message (application/json, application/msgpack, application/octet-stream). The consumer reads it and picks the matching deserializer automatically, so no manual configuration is needed:

// consumer uses JSON by default but auto-detects msgpack from contentType
const consumer = new RabbitConsumer(handler);
await consumer.consume("orders.created", async (data) => {
  console.log(data.orderId); // correctly deserialized regardless of producer serializer
});
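
The lookup behind this kind of auto-detection can be sketched as a registry keyed by content type with a fallback. Everything here (`BodyCodec`, `codecRegistry`, `decodeBody`) is a hypothetical stand-in, and it uses Uint8Array where the library uses Buffer; falling back to the configured serializer for unknown content types is an assumption.

```typescript
interface BodyCodec {
  readonly contentType: string;
  deserialize(body: Uint8Array): unknown;
}

const jsonBodyCodec: BodyCodec = {
  contentType: "application/json",
  deserialize: (body) => JSON.parse(new TextDecoder().decode(body)),
};

// Identity-style codec: hands the raw bytes back untouched.
const rawBodyCodec: BodyCodec = {
  contentType: "application/octet-stream",
  deserialize: (body) => body,
};

const codecRegistry = new Map<string, BodyCodec>();
for (const codec of [jsonBodyCodec, rawBodyCodec]) {
  codecRegistry.set(codec.contentType, codec);
}

// Prefer the codec named by the message's contentType; otherwise use the fallback.
function decodeBody(
  body: Uint8Array,
  contentType: string | undefined,
  fallback: BodyCodec = jsonBodyCodec,
): unknown {
  const detected = contentType === undefined ? undefined : codecRegistry.get(contentType);
  return (detected ?? fallback).deserialize(body);
}

const sampleBytes = new TextEncoder().encode('{"orderId":"ORD-1"}');
const asJson = decodeBody(sampleBytes, "application/json") as { orderId: string };
const asRaw = decodeBody(sampleBytes, "application/octet-stream");
```

A MessagePack codec would register under `application/msgpack` in the same way.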

Custom serializer

Implement ISerializer to plug in any format:

import { ISerializer } from "rabbit-axon";

class AvroSerializer implements ISerializer {
  readonly contentType = "application/avro";
  serialize(data: unknown): Buffer {
    throw new Error("not implemented"); // replace with your avro encode
  }
  deserialize(buffer: Buffer): unknown {
    throw new Error("not implemented"); // replace with your avro decode
  }
}

const producer = new RabbitProducer(handler, "orders", "order.created", {
  serializer: new AvroSerializer(),
});

API Reference

Connection Options

Passed as the second argument to RabbitSingleConnectionHandler and RabbitConnectionPoolHandler.

new RabbitSingleConnectionHandler(url, options)
new RabbitConnectionPoolHandler(url, poolSize, options)

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| heartbeat | number | 60 | Heartbeat interval in seconds. Detects dead connections. |
| frameMax | number | 0 | Max frame size in bytes. 0 = no limit. |
| channelMax | number | 0 | Max channels per connection. 0 = no limit. |
| circuitBreaker.threshold | number | 5 | Consecutive failures before opening the circuit. |
| circuitBreaker.resetTimeout | number | 30000 | Ms to wait in OPEN state before probing. Doubles on each failed probe. |
| circuitBreaker.maxResetTimeout | number | 300000 | Cap on how much resetTimeout can double. |

const handler = new RabbitSingleConnectionHandler("amqp://localhost", {
  heartbeat: 30,
  circuitBreaker: {
    threshold: 5,
    resetTimeout: 30000,
    maxResetTimeout: 300000,
  },
});
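
To make the doubling and the cap concrete, the sketch below lists how long the breaker would wait before each probe if every probe failed. It assumes the wait starts at resetTimeout and doubles after each failed probe until it reaches maxResetTimeout; `resetTimeoutSchedule` is a hypothetical helper, not a library function.

```typescript
// Wait (in ms) before each of the first `probes` probes, assuming all of them fail.
function resetTimeoutSchedule(
  resetTimeoutMs: number,
  maxResetTimeoutMs: number,
  probes: number,
): number[] {
  const waits: number[] = [];
  let wait = Math.min(resetTimeoutMs, maxResetTimeoutMs);
  for (let i = 0; i < probes; i++) {
    waits.push(wait);
    // Double after a failed probe, but never beyond the cap.
    wait = Math.min(wait * 2, maxResetTimeoutMs);
  }
  return waits;
}

const defaultSchedule = resetTimeoutSchedule(30000, 300000, 6);
// [30000, 60000, 120000, 240000, 300000, 300000]
```

With the default values the wait reaches the five-minute cap on the fifth probe and stays there.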

Exchange Options

Passed as the fourth argument to the RabbitMqQueueExchange constructor.

super(handler, "exchange-name", "topic", exchangeOptions)

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| durable | boolean | true | Exchange survives broker restart. |
| autoDelete | boolean | false | Exchange deleted when last queue unbinds. |
| internal | boolean | false | Exchange only receives messages from other exchanges, not producers. |
| alternateExchange | string | — | Exchange to route unroutable messages to. |
| arguments | Record<string, any> | — | Additional broker-specific arguments. |

class MyExchange extends RabbitMqQueueExchange {
  constructor(handler: RabbitSingleConnectionHandler) {
    super(handler, "my.exchange", "topic", {
      durable: true,
      autoDelete: false,
      alternateExchange: "my.unrouted",
    });
  }
}

Queue Arguments

Passed as the fourth argument to createQueue and createDeadLetterQueue.

await this.createQueue(queueName, bindKey, queueOptions, queueArguments)

| Argument | Type | Description |
|----------|------|-------------|
| x-dead-letter-exchange | string | Exchange to route rejected/expired messages to. |
| x-dead-letter-routing-key | string | Routing key to use when forwarding to DLX. |
| x-message-ttl | number | Milliseconds before a message expires and routes to DLX. |
| x-max-length | number | Max number of messages in the queue. Oldest dropped when exceeded. |
| x-max-priority | number | Enables priority queue. Value is the max priority level (1–255). |
| x-queue-mode | "default" \| "lazy" | lazy keeps messages on disk, reduces memory usage. |
| x-expires | number | Milliseconds before an unused queue is deleted. |
| x-overflow | "drop-head" \| "reject-publish" | Behaviour when x-max-length is exceeded. |

await this.createQueue("orders.processing", "order.processing", { durable: true }, {
  "x-dead-letter-exchange": "orders",
  "x-dead-letter-routing-key": "order.failed",
  "x-message-ttl": 30000,
  "x-max-length": 10000,
  "x-overflow": "reject-publish",
});
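
The two x-overflow policies behave quite differently once x-max-length is reached. The toy queue below imitates them in memory to show the difference; `BoundedQueue` is a hypothetical illustration of the broker's behaviour, not something rabbit-axon exposes.

```typescript
type OverflowPolicy = "drop-head" | "reject-publish";

class BoundedQueue<T> {
  private readonly items: T[] = [];
  private readonly maxLength: number;
  private readonly overflow: OverflowPolicy;

  // "drop-head" is RabbitMQ's default overflow behaviour.
  constructor(maxLength: number, overflow: OverflowPolicy = "drop-head") {
    if (maxLength < 1) throw new RangeError("maxLength must be at least 1");
    this.maxLength = maxLength;
    this.overflow = overflow;
  }

  // Returns true when the message was accepted, false when it was rejected.
  publish(message: T): boolean {
    if (this.items.length >= this.maxLength) {
      if (this.overflow === "reject-publish") return false;
      this.items.shift(); // drop-head: discard the oldest message
    }
    this.items.push(message);
    return true;
  }

  snapshot(): T[] {
    return [...this.items];
  }
}

const dropHead = new BoundedQueue<string>(2, "drop-head");
const rejecting = new BoundedQueue<string>(2, "reject-publish");
for (const id of ["ORD-1", "ORD-2", "ORD-3"]) {
  dropHead.publish(id);
  rejecting.publish(id);
}
// dropHead keeps ORD-2 and ORD-3; rejecting keeps ORD-1 and ORD-2.
```

With reject-publish, a producer only learns about the rejection if it uses publisher confirms, which is one reason to pair it with publishWithConfirm.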

Publish Options

Passed as the second argument to producer.publish() and producer.publishWithConfirm().

await producer.publish(data, publishOptions)
await producer.publishWithConfirm(data, publishOptions)

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| persistent | boolean | true | Message survives broker restart. |
| priority | number | — | Message priority (requires x-max-priority on queue). |
| expiration | string | — | Milliseconds as string before message expires. |
| correlationId | string | — | Used to correlate RPC responses with requests. |
| replyTo | string | — | Queue name for RPC reply. |
| messageId | string | — | Application-level message identifier. |
| timestamp | number | Date.now() | Unix timestamp of when the message was created. |
| contentType | string | "application/json" | MIME type of the message body. |
| headers | Record<string, any> | — | Arbitrary message headers. Used for headers exchange routing. |

await producer.publish({ orderId: "ORD-1" }, {
  persistent: true,
  priority: 5,
  expiration: "60000",
  messageId: "msg-001",
  headers: {
    "x-source": "order-service",
    "x-region": "us",
  },
});

Producer Options

Passed as the fourth argument to the RabbitProducer constructor.

new RabbitProducer(connInstance, exchangeName, routingKey, options)

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| maxBufferSize | number | 10000 | Max messages to buffer when connection is down. |
| serializer | ISerializer | JsonSerializer | Serializer to use for message encoding. |

const producer = new RabbitProducer(handler, "orders", "order.created", {
  maxBufferSize: 5000,
  serializer: new MsgpackSerializer(),
});

Consume Options

Passed as the third argument to consumer.consume().

await consumer.consume(queueName, handler, consumeOptions)

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| prefetchCount | number | 1 | Max unacknowledged messages held by this consumer at a time. |
| retryLimit | number | 3 | Max retry attempts before acking and skipping (only when dlx: false). |
| dlx | boolean | false | If true, failed messages are nacked immediately to DLX. If false, retried with exponential backoff. |
| serializer | ISerializer | JsonSerializer | Override default serializer for this consume call. Consumer still auto-detects from contentType first. |

// with DLX
await consumer.consume("orders.processing", async (data) => {
  await processOrder(data);
}, {
  prefetchCount: 1,
  dlx: true,
});

// with retry backoff
await consumer.consume("orders.created", async (data) => {
  await processOrder(data);
}, {
  prefetchCount: 5,
  retryLimit: 5,
  dlx: false,
});