@abinashpatri/rabbitmq
v1.0.1
Published
Production-grade RabbitMQ event utility library
Maintainers
Readme
@abinashpatri/rabbitmq
Production-grade RabbitMQ event utility library for TypeScript and JavaScript services.
Highlights
- RabbitMQ-only API surface.
- Durable exchanges/queues with configurable retry and DLQ topology.
- Confirm-channel publishing for safer delivery semantics.
- Non-blocking retries using delayed redelivery via TTL + dead-letter routing.
- Scoped clients for multi-service, multi-tenant, and test isolation.
- CJS + ESM + bundled
.d.tstypes.
Install
npm install @abinashpatri/rabbitmqCompatibility
- Node.js:
>=18 - Runtime: CommonJS and ESM
- Types: bundled TypeScript declarations
Import Patterns
// Root namespace import
import { rabbitMQ } from "@abinashpatri/rabbitmq";
// Subpath import
import * as rabbitMQApi from "@abinashpatri/rabbitmq/rabbitMQ";Quick Start (TypeScript)
import { rabbitMQ } from "@abinashpatri/rabbitmq";
type InvoiceCreatedEvent = {
eventId: string;
invoiceId: string;
customerId: string;
amount: number;
};
await rabbitMQ.connect({
url: "amqp://localhost",
reconnect: {
enabled: true,
baseDelayMs: 1000,
maxDelayMs: 30000,
jitterMs: 250,
},
});
await rabbitMQ.publish<InvoiceCreatedEvent>({
exchange: "billing.events",
routingKey: "invoice.created",
messageId: "evt_10",
type: "invoice.created",
appId: "billing-service",
message: {
eventId: "evt_10",
invoiceId: "inv_1",
customerId: "cust_1",
amount: 42,
},
});
const consumer = await rabbitMQ.consume<InvoiceCreatedEvent>({
exchange: "billing.events",
queue: "notifications.invoice.created",
routingKey: "invoice.created",
prefetch: 20,
retryLimit: 5,
retryBaseDelayMs: 500,
retryBackoffMultiplier: 2,
retryMaxDelayMs: 45000,
retryJitterMs: 250,
handler: async (event) => {
console.log("processing invoice", event.invoiceId);
},
});
process.on("SIGTERM", async () => {
await consumer.disconnect();
await rabbitMQ.disconnect();
process.exit(0);
});Quick Start (JavaScript)
const { rabbitMQ } = require("@abinashpatri/rabbitmq");
async function run() {
await rabbitMQ.connect("amqp://localhost");
await rabbitMQ.publish({
exchange: "shipping.events",
routingKey: "shipment.created",
messageId: "evt_200",
message: {
eventId: "evt_200",
shipmentId: "ship_1",
orderId: "order_1",
},
});
return rabbitMQ.consume({
exchange: "shipping.events",
queue: "tracking.shipment.created",
routingKey: "shipment.created",
retryLimit: 4,
handler: async (event) => {
console.log("tracking:", event.shipmentId);
},
});
}
run().catch(console.error);API Reference
Root export
rabbitMQnamespace
RabbitMQ namespace
rabbitMQ.connect("amqp://...")rabbitMQ.connect({ url, socketOptions?, clientProperties?, reconnect? })rabbitMQ.disconnect()rabbitMQ.publish({ exchange, routingKey?, message, headers?, messageId?, type?, appId?, persistent?, mandatory?, client? })rabbitMQ.consume({ exchange, queue, routingKey?, handler, prefetch?, retryLimit?, retryBaseDelayMs?, retryBackoffMultiplier?, retryMaxDelayMs?, retryJitterMs?, deadLetterExchange?, deadLetterQueue?, deadLetterRoutingKey?, retryExchange?, retryQueue?, retryRoutingKey?, client? })rabbitMQ.createRabbitMQClient()rabbitMQ.createScopedRabbitMQClient()rabbitMQ.getRetryHeaders(msg)rabbitMQ.buildRetryHeaders(retryCount)rabbitMQ.withRetryCount(options, retryCount)
Consumer controls returned by consume():
stop()disconnect()
Reliability Semantics
- Delivery semantics are at-least-once.
- Publishing uses confirm channels (
waitForConfirms) before returning. - Retries use delayed redelivery (message
expiration) through retry queues. - Messages that exceed retry limit are dead-lettered to DLQ exchange/queue.
- Handlers should be idempotent and safe for reprocessing.
Default Topology
For exchange = app.events and queue = worker.q:
- Main exchange:
app.events(topic) - Main queue:
worker.q(durable) - Retry exchange:
app.events.retry(topic) - Retry queue:
worker.q.retry(durable) - DLQ exchange:
app.events.dlq(topic) - DLQ queue:
worker.q.dlq(durable)
All names can be overridden through consume() options.
Operational Guidance
- Set
prefetchbased on downstream capacity (CPU, DB, external APIs). - Keep payloads JSON and include an immutable event identifier.
- Emit metrics for retry count, DLQ volume, consumer lag, and handler latency.
- Use scoped clients when multiple services/tenants run in one process.
- Tune reconnect and retry settings per workload and failure profile.
License
MIT License.
