rabbit-axon
v2.0.3
Published
Class-based TypeScript RabbitMQ wrapper with auto-reconnect, DLX, publisher confirms and exponential backoff
Downloads
895
Maintainers
Readme
rabbit-axon
Production-ready class-based TypeScript RabbitMQ wrapper — auto-reconnect, dead-letter exchange, publisher confirms, exponential backoff retries, message buffering, connection pooling, and structured Winston logging.
Installation
pnpm add rabbit-axonProject Structure
src/
├── connection/
│ ├── single.ts — Single connection handler with auto-reconnect
│ └── pool.ts — Connection pool handler (round-robin load balancing)
├── channel/
│ └── manager.ts — Static channel utility (create, close)
├── exchange/
│ └── exchange.ts — Abstract exchange/queue setup
├── producer/
│ └── producer.ts — Publish, confirm publish, message buffering
├── consumer/
│ └── consumer.ts — Consume, DLX routing, retry backoff
├── serializer/
│ └── serializer.ts — ISerializer, JsonSerializer, MsgpackSerializer, IdentitySerializer
├── logger/
│ └── logger.ts — Winston-based structured logger
└── types.ts — All interfaces and type aliasesQuick Start
import {
RabbitSingleConnectionHandler,
RabbitMqQueueExchange,
RabbitProducer,
RabbitConsumer,
} from "rabbit-axon";
// 1. Connect
const handler = new RabbitSingleConnectionHandler("amqp://localhost");
await handler.ConnectToService();
// 2. Setup exchange + queue
class OrderExchange extends RabbitMqQueueExchange {
constructor() {
super(handler, "orders", "topic");
}
async setup() {
await this.createExchange();
await this.createQueue("orders.created", "order.created.#");
}
}
const exchange = new OrderExchange();
await exchange.setup();
// 3. Produce
const producer = new RabbitProducer(handler, "orders", "order.created.us");
await producer.publish({ orderId: "ORD-1", region: "us" });
// 4. Consume
const consumer = new RabbitConsumer(handler);
await consumer.consume("orders.created", async (data) => {
console.log("Order received:", data.orderId);
});
// 5. Shutdown
process.on("SIGTERM", async () => {
await handler.gracefulShutdown();
});Connection
RabbitSingleConnectionHandler
Single AMQP connection with auto-reconnect.
import { RabbitSingleConnectionHandler } from "rabbit-axon";
const handler = new RabbitSingleConnectionHandler("amqp://user:pass@localhost:5672", {
heartbeat: 30,
circuitBreaker: {
threshold: 5, // failures before opening circuit
resetTimeout: 30000, // ms to wait in OPEN before probing
maxResetTimeout: 300000, // cap on doubling — default 5 minutes
},
});
await handler.ConnectToService();
// Register reconnect callback
handler.onReconnect(async () => {
console.log("Reconnected — re-setup if needed");
});
// Graceful shutdown
await handler.gracefulShutdown();RabbitConnectionPoolHandler
Multiple connections for high-throughput scenarios. Each connection in the pool has its own channel, and publishes are distributed via round-robin across all connections automatically — no manual acquire/release needed.
import { RabbitConnectionPoolHandler } from "rabbit-axon";
const pool = new RabbitConnectionPoolHandler("amqp://localhost", 5, {
heartbeat: 30,
});
await pool.ConnectToService();
// use pool directly with producer — round-robin load balancing across 5 connections
const producer = new RabbitProducer(pool, "orders", "order.created");
await producer.publish({ orderId: "ORD-1" }); // uses conn1
await producer.publish({ orderId: "ORD-2" }); // uses conn2
await producer.publish({ orderId: "ORD-3" }); // uses conn3
// ... wraps around automatically
// use pool directly with consumer
const consumer = new RabbitConsumer(pool);
await consumer.consume("orders.created", async (data) => {
console.log(data.orderId);
});
await pool.gracefulShutdown();Internally the pool maintains a Map<connection, channel> — one channel per connection. Round-robin picks the next connection on each getChannel() call, reusing the existing channel for that connection if it's still alive.
Logger
Every class creates a default Winston logger internally. Replace it with your own:
import { RabbitLogger, RabbitSingleConnectionHandler } from "rabbit-axon";
import winston from "winston";
const myWinstonLogger = winston.createLogger({
level: "info",
format: winston.format.combine(winston.format.timestamp(), winston.format.json()),
transports: [
new winston.transports.Console(),
new winston.transports.File({ filename: "rabbit.log" }),
],
});
const logger = new RabbitLogger(myWinstonLogger);
const handler = new RabbitSingleConnectionHandler("amqp://localhost");
handler.addLogger(logger);Default behavior (no custom logger):
NODE_ENV=production→ JSON logs (one line per entry, parseable by Loki/CloudWatch/Datadog/ELK)NODE_ENV=development→ colored human-readable logs
Exchange Types
fanout — Broadcast to all queues
class BroadcastExchange extends RabbitMqQueueExchange {
constructor(handler: RabbitSingleConnectionHandler) {
super(handler, "events.broadcast", "fanout");
}
async setup() {
await this.createExchange();
await this.createQueue("service-a.events", "");
await this.createQueue("service-b.events", "");
}
}direct — Exact routing key match
class LogExchange extends RabbitMqQueueExchange {
constructor(handler: RabbitSingleConnectionHandler) {
super(handler, "logs", "direct");
}
async setup() {
await this.createExchange();
await this.createQueue("logs.errors", "error");
await this.createQueue("logs.warnings", "warning");
}
}
// Only "logs.errors" receives this
const producer = new RabbitProducer(handler, "logs", "error");
await producer.publish({ msg: "DB connection failed" });topic — Pattern routing key
class OrderExchange extends RabbitMqQueueExchange {
constructor(handler: RabbitSingleConnectionHandler) {
super(handler, "orders", "topic");
}
async setup() {
await this.createExchange();
await this.createQueue("orders.all", "order.#");
await this.createQueue("orders.created", "order.created.*");
await this.createQueue("orders.us", "order.*.us");
}
}
// Matches "orders.all", "orders.created", "orders.us"
const producer = new RabbitProducer(handler, "orders", "order.created.us");
await producer.publish({ orderId: "ORD-1" });headers — Route by message headers
class AlertExchange extends RabbitMqQueueExchange {
constructor(handler: RabbitSingleConnectionHandler) {
super(handler, "alerts", "headers");
}
async setup() {
await this.createExchange();
// Both headers must match
await this.createQueue("alerts.critical", "", { durable: true }, undefined, {
"x-match": "all",
type: "error",
severity: "critical",
});
// Any header matches
await this.createQueue("alerts.any", "", { durable: true }, undefined, {
"x-match": "any",
type: "error",
severity: "critical",
});
}
}
const producer = new RabbitProducer(handler, "alerts", "");
await producer.publish({ message: "Disk full" }, {
headers: { type: "error", severity: "critical" },
});Dead Letter Exchange (DLX)
class OrderExchange extends RabbitMqQueueExchange {
constructor(handler: RabbitSingleConnectionHandler) {
super(handler, "orders", "topic");
}
async setup() {
await this.createExchange();
// DLX queue to catch failed messages
await this.createQueue("orders.failed", "order.failed.#");
// Main queue with DLX configured
await this.createDeadLetterQueue(
"orders.processing", // queue name
"orders", // DLX exchange
"order.failed.all", // DLX routing key
30000 // TTL: 30s before expiry also routes to DLX
);
}
}
// Consumer with dlx: true — failed messages nack'd straight to DLX
const consumer = new RabbitConsumer(handler);
await consumer.consume("orders.processing", async (data) => {
throw new Error("processing failed"); // → goes to orders.failed queue
}, { dlx: true });Producer
Basic publish
const producer = new RabbitProducer(handler, "orders", "order.created");
await producer.publish({ orderId: "ORD-1" }, {
persistent: true,
priority: 5,
messageId: "msg-001",
headers: { "x-source": "order-service" },
});TypeScript generics
interface OrderPayload {
orderId: string;
amount: number;
region: "us" | "eu";
}
const producer = new RabbitProducer<OrderPayload>(handler, "orders", "order.created");
await producer.publish({ orderId: "ORD-1", amount: 99.99, region: "us" }); // fully type checkedPublish with broker confirm
const confirmed = await producer.publishWithConfirm({ orderId: "ORD-1" }, {
persistent: true,
expiration: "60000",
});
if (!confirmed) console.error("Broker rejected the message");Message buffering on disconnect
When the connection drops, publish does not throw — it buffers the message and resolves once the connection is restored:
// hangs until reconnected, then resolves automatically
const result = await producer.publish({ orderId: "ORD-2" });
console.log(producer.getBufferSize()); // number of pending buffered messagesConsumer
Basic consume
const consumer = new RabbitConsumer(handler);
await consumer.consume("orders.created", async (data, msg) => {
console.log("Processing:", data.orderId);
}, { prefetchCount: 1 });TypeScript generics
interface OrderPayload {
orderId: string;
amount: number;
}
const consumer = new RabbitConsumer<OrderPayload>(handler);
await consumer.consume("orders.created", async (data) => {
console.log(data.orderId); // string — fully typed
console.log(data.amount); // number — fully typed
});Retry with exponential backoff (no DLX)
When dlx: false (default), failed messages are retried in-process with exponential backoff. After retryLimit, the message is acked and skipped:
await consumer.consume("orders.created", async (data) => {
await processOrder(data);
}, {
retryLimit: 3,
dlx: false,
});| Attempt | Delay | |---------|-------| | 1st retry | 1s | | 2nd retry | 2s | | 3rd retry | 4s | | After limit | ack + skip |
DLX routing on failure
When dlx: true, any failure immediately nacks the message — RabbitMQ routes it to the configured DLX:
await consumer.consume("orders.processing", async (data) => {
await processOrder(data);
}, { dlx: true });Custom Logger per class
const logger = new RabbitLogger(myWinstonLogger);
handler.addLogger(logger);
pool.addLogger(logger); // propagates to all connections in pool
exchange.addLogger(logger);
producer.addLogger(logger);
consumer.addLogger(logger);Graceful Shutdown
process.on("SIGTERM", async () => {
await handler.gracefulShutdown();
process.exit(0);
});Production Considerations
persistent: trueby default — messages survive broker restartsnoAck: false— all messages must be explicitly acknowledged- After
maxReconnectAttempts,process.exitCode = 1— let PM2/K8s/ECS restart the process - Keep
prefetchCountlow (1–5) for fair distribution - Use
dlx: true+ a dead letter queue for guaranteed message capture on failure - Monitor DLQ depth — a growing DLQ means your handler is consistently failing
- Use
publishWithConfirmfor critical messages where loss is unacceptable - Use
RabbitConnectionPoolHandlerfor high-throughput services needing parallel TCP connections - Scale consumers by deploying multiple instances/containers — each gets its own connection and channel
Serialization
By default all messages are serialized as JSON. You can swap to MessagePack for smaller, faster payloads or use the Identity serializer for raw binary data.
import { JsonSerializer, MsgpackSerializer, IdentitySerializer } from "rabbit-axon";JSON (default)
const producer = new RabbitProducer(handler, "orders", "order.created");
await producer.publish({ orderId: "ORD-1" }); // serialized as JSONMessagePack — smaller and faster
const producer = new RabbitProducer(handler, "orders", "order.created", {
serializer: new MsgpackSerializer(),
});
await producer.publish({ orderId: "ORD-1" }); // serialized as msgpack, ~2-3x smallerIdentity — raw Buffer passthrough
For protobuf, avro, encrypted payloads, or any binary data you encode yourself:
const producer = new RabbitProducer<Buffer>(handler, "orders", "order.created", {
serializer: new IdentitySerializer(),
});
// you handle encoding
const bytes = OrderProto.encode({ orderId: "ORD-1" }).finish();
await producer.publish(Buffer.from(bytes));
// consumer receives raw Buffer
const consumer = new RabbitConsumer<Buffer>(handler, {
serializer: new IdentitySerializer(),
});
await consumer.consume("orders.created", async (data) => {
const decoded = OrderProto.decode(data); // data is Buffer — fully typed
});TypeScript generics — T extends Record<string, any> | Buffer
Both RabbitProducer<T> and RabbitConsumer<T> accept either an object type or Buffer. Default is Record<string, any>:
// default — object payload, fully typed
const producer = new RabbitProducer<{ orderId: string }>(handler, "orders", "order.created");
await producer.publish({ orderId: "ORD-1" }); // type checked
// binary — Buffer payload
const producer = new RabbitProducer<Buffer>(handler, "rpc", "call", {
serializer: new IdentitySerializer(),
});
await producer.publish(Buffer.from([0x01, 0x02, 0x03]));Auto-detection on consumer
The producer sets a contentType header on every message (application/json, application/msgpack, application/octet-stream). The consumer reads it and picks the correct deserializer automatically — no manual configuration needed:
// consumer uses JSON by default but auto-detects msgpack from contentType
const consumer = new RabbitConsumer(handler);
await consumer.consume("orders.created", async (data) => {
console.log(data.orderId); // correctly deserialized regardless of producer serializer
});Custom serializer
Implement ISerializer to plug in any format:
import { ISerializer } from "rabbit-axon";
class AvroSerializer implements ISerializer {
readonly contentType = "application/avro";
serialize(data: unknown): Buffer { /* your avro encode */ }
deserialize(buffer: Buffer): unknown { /* your avro decode */ }
}
const producer = new RabbitProducer(handler, "orders", "order.created", {
serializer: new AvroSerializer(),
});API Reference
Connection Options
Passed as the second argument to RabbitSingleConnectionHandler and RabbitConnectionPoolHandler.
new RabbitSingleConnectionHandler(url, options)
new RabbitConnectionPoolHandler(url, poolSize, options)| Option | Type | Default | Description |
|--------|------|---------|-------------|
| heartbeat | number | 60 | Heartbeat interval in seconds. Detects dead connections. |
| frameMax | number | 0 | Max frame size in bytes. 0 = no limit. |
| channelMax | number | 0 | Max channels per connection. 0 = no limit. |
| circuitBreaker.threshold | number | 5 | Consecutive failures before opening the circuit. |
| circuitBreaker.resetTimeout | number | 30000 | Ms to wait in OPEN state before probing. Doubles on each failed probe. |
| circuitBreaker.maxResetTimeout | number | 300000 | Cap on how much resetTimeout can double. |
const handler = new RabbitSingleConnectionHandler("amqp://localhost", {
heartbeat: 30,
circuitBreaker: {
threshold: 5,
resetTimeout: 30000,
maxResetTimeout: 300000,
},
});Exchange Options
Passed as the fourth argument to RabbitMqQueueExchange constructor.
super(handler, "exchange-name", "topic", exchangeOptions)| Option | Type | Default | Description |
|--------|------|---------|-------------|
| durable | boolean | true | Exchange survives broker restart. |
| autoDelete | boolean | false | Exchange deleted when last queue unbinds. |
| internal | boolean | false | Exchange only receives messages from other exchanges, not producers. |
| alternateExchange | string | — | Exchange to route unroutable messages to. |
| arguments | Record<string, any> | — | Additional broker-specific arguments. |
class MyExchange extends RabbitMqQueueExchange {
constructor(handler: RabbitSingleConnectionHandler) {
super(handler, "my.exchange", "topic", {
durable: true,
autoDelete: false,
alternateExchange: "my.unrouted",
});
}
}Queue Arguments
Passed as the fourth argument to createQueue and createDeadLetterQueue.
await this.createQueue(queueName, bindKey, queueOptions, queueArguments)| Argument | Type | Description |
|----------|------|-------------|
| x-dead-letter-exchange | string | Exchange to route rejected/expired messages to. |
| x-dead-letter-routing-key | string | Routing key to use when forwarding to DLX. |
| x-message-ttl | number | Milliseconds before a message expires and routes to DLX. |
| x-max-length | number | Max number of messages in the queue. Oldest dropped when exceeded. |
| x-max-priority | number | Enables priority queue. Value is the max priority level (1–255). |
| x-queue-mode | "default" \| "lazy" | lazy keeps messages on disk, reduces memory usage. |
| x-expires | number | Milliseconds before an unused queue is deleted. |
| x-overflow | "drop-head" \| "reject-publish" | Behaviour when x-max-length is exceeded. |
await this.createQueue("orders.processing", "order.processing", { durable: true }, {
"x-dead-letter-exchange": "orders",
"x-dead-letter-routing-key": "order.failed",
"x-message-ttl": 30000,
"x-max-length": 10000,
"x-overflow": "reject-publish",
});Publish Options
Passed as the second argument to producer.publish() and producer.publishWithConfirm().
await producer.publish(data, publishOptions)
await producer.publishWithConfirm(data, publishOptions)| Option | Type | Default | Description |
|--------|------|---------|-------------|
| persistent | boolean | true | Message survives broker restart. |
| priority | number | — | Message priority (requires x-max-priority on queue). |
| expiration | string | — | Milliseconds as string before message expires. |
| correlationId | string | — | Used to correlate RPC responses with requests. |
| replyTo | string | — | Queue name for RPC reply. |
| messageId | string | — | Application-level message identifier. |
| timestamp | number | Date.now() | Unix timestamp of when the message was created. |
| contentType | string | "application/json" | MIME type of the message body. |
| headers | Record<string, any> | — | Arbitrary message headers. Used for headers exchange routing. |
await producer.publish({ orderId: "ORD-1" }, {
persistent: true,
priority: 5,
expiration: "60000",
messageId: "msg-001",
headers: {
"x-source": "order-service",
"x-region": "us",
},
});Producer Options
Passed as the fourth argument to RabbitProducer constructor.
new RabbitProducer(connInstance, exchangeName, routingKey, options)| Option | Type | Default | Description |
|--------|------|---------|-------------|
| maxBufferSize | number | 10000 | Max messages to buffer when connection is down. |
| serializer | ISerializer | JsonSerializer | Serializer to use for message encoding. |
const producer = new RabbitProducer(handler, "orders", "order.created", {
maxBufferSize: 5000,
serializer: new MsgpackSerializer(),
});Consume Options
Passed as the third argument to consumer.consume().
await consumer.consume(queueName, handler, consumeOptions)| Option | Type | Default | Description |
|--------|------|---------|-------------|
| prefetchCount | number | 1 | Max unacknowledged messages held by this consumer at a time. |
| retryLimit | number | 3 | Max retry attempts before acking and skipping (only when dlx: false). |
| dlx | boolean | false | If true, failed messages are nacked immediately to DLX. If false, retried with exponential backoff. |
| serializer | ISerializer | JsonSerializer | Override default serializer for this consume call. Consumer still auto-detects from contentType first. |
// with DLX
await consumer.consume("orders.processing", async (data) => {
await processOrder(data);
}, {
prefetchCount: 1,
dlx: true,
});
// with retry backoff
await consumer.consume("orders.created", async (data) => {
await processOrder(data);
}, {
prefetchCount: 5,
retryLimit: 5,
dlx: false,
});