@babelqueue/rabbitmq
v1.0.0
Published
RabbitMQ adapter for BabelQueue — a canonical-envelope publisher and a URN-routed consumer with the §2 AMQP 0-9-1 property projection (type/correlation_id/message_id + x- headers) over amqplib.
Maintainers
Readme
@babelqueue/rabbitmq
RabbitMQ adapter for BabelQueue — a canonical-envelope publisher and a URN-routed consumer over RabbitMQ (AMQP 0-9-1, amqplib), so a RabbitMQ-based Node service speaks the same wire contract as the PHP, Python, Go, Java and .NET SDKs. Implements §2 of the broker-bindings contract.
The envelope JSON is the message body; the contract fields are projected onto native AMQP 0-9-1
properties so a consumer routes without decoding the body: type = URN, correlation_id =
trace_id, message_id = meta.id, app_id = babelqueue, plus the native-typed
x-schema-version / x-source-lang / x-attempts headers (AMQP field-tables carry typed values —
integers stay integers). Consume is basic.get + manual ack (at-least-once).
Install
npm install @babelqueue/rabbitmq amqplibamqplib is an optional peer — you provide the channel; an amqplib Channel satisfies the
adapter structurally.
Produce
import amqp from "amqplib";
import { RabbitMQPublisher } from "@babelqueue/rabbitmq";
const conn = await amqp.connect("amqp://guest:guest@localhost:5672/");
const channel = await conn.createChannel();
await channel.assertQueue("orders", { durable: true });
const id = await RabbitMQPublisher.create(channel, "orders").publish("urn:babel:orders:created", { order_id: 1042 });publish(urn, data, { traceId? }) returns the message meta.id. Messages are persistent
(delivery_mode = 2).
Consume
import { RabbitMQConsumer, type BabelHandlers } from "@babelqueue/rabbitmq";
const handlers: BabelHandlers = {
"urn:babel:orders:created": (envelope, message) => {
// envelope.data, envelope.trace_id, envelope.attempts ...
},
};
const consumer = new RabbitMQConsumer(channel, "orders", handlers, {
maxTries: 3,
onError: (err) => console.error(err),
});
await consumer.run(() => true); // basic.get → process → ack, until you stop itA successful handler acks the message. A throwing handler republishes the envelope with
attempts + 1 (at-least-once) up to maxTries, then dead-letters to <queue>.dlq with a
dead_letter block. The consumer routes on properties.type (falling back to the body URN).
Unknown-URN strategy is one of fail / delete / release / dead_letter. poll() and
handle(message) are exposed for testing.
Contract mapping (§2)
| Envelope | RabbitMQ (AMQP 0-9-1) |
| :--- | :--- |
| body | message body (the canonical envelope JSON) |
| job (URN) | properties.type (consumer routes on this) |
| trace_id | properties.correlation_id |
| meta.id | properties.message_id |
| — | properties.app_id = babelqueue, content_type = application/json, persistent |
| meta.schema_version | header x-schema-version (number) |
| meta.lang | header x-source-lang |
| attempts | header x-attempts (number; the body owns the count) |
| reserve / ack | basic.get → process → basic.ack |
| retry | republish with attempts + 1 |
| dead-letter | <queue>.dlq + dead_letter block |
The envelope is unchanged (schema_version stays 1); the amqplib channel is replaced with a
fake in the unit suite — no RabbitMQ, no network.
License
MIT
