@bimetal/broker-rabbitmq
v0.17.0
Published
RabbitMQ adapter for @bimetal/broker. Implements the Broker<TMessage> interface with amqplib. Native pattern support (Topic-Exchange).
Downloads
1,362
Maintainers
Readme
@bimetal/broker-rabbitmq
RabbitMQ-Adapter für @bimetal/broker. Implementiert das Broker<TMessage>-Interface mit amqplib und einem Topic-Exchange. Native Pattern-Subscriptions, dynamische subscribe-Operationen, Pub/Sub als Default.
Installation
npm install @bimetal/broker @bimetal/broker-rabbitmq amqplibQuick Start
import { createRabbitMQBroker } from '@bimetal/broker-rabbitmq';
const broker = createRabbitMQBroker<{ id: string; title: string }>({
url: 'amqp://localhost:5672',
exchange: 'bimetal',
});
// Subscribes sind dynamisch — kein start()-Phase nötig (anders als Kafka).
broker.subscribe('bimetal.calendar.CalendarEventCreated', async (event) => {
console.log('exact:', event);
});
// Native RabbitMQ-Wildcards: * = ein Segment, # = mehrere.
broker.subscribePattern!('bimetal.calendar.*', async (event, topic) => {
console.log(`pattern (${topic}):`, event);
});
await broker.publish('bimetal.calendar.CalendarEventCreated', {
id: 'evt-1',
title: 'Sprint Planning',
});
await broker.close();Capability-Manifest
{
patterns: true, // Topic-Exchange routet * und # nativ
replay: false, // RabbitMQ liefert keine History an neue Subscriber
ordering: 'per-topic', // FIFO innerhalb einer Queue
}Im Vergleich zu @bimetal/broker-kafka:
| Aspekt | RabbitMQ | Kafka |
|---|---|---|
| patterns | true (Topic-Exchange) | false |
| replay | false (statisch) | Konstruktor-Option (Default false) |
| Lifecycle | Dynamische Subscribes jederzeit | Explizite start()-Phase, subscribes vor start |
| Pub/Sub-Default | Natürlich (anonyme Queues pro Subscribe) | Über eindeutige groupId pro Instanz |
Pub/Sub vs Load-Balancing (kritisch!)
RabbitMQ macht Pub/Sub natürlich. Jeder subscribe()-Aufruf legt eine eigene anonyme, exclusive, auto-delete Queue an, die mit dem entsprechenden Routing-Key an den Topic-Exchange gebunden wird. Jede solche Queue bekommt eine eigene Kopie jeder matchenden Message.
Das ist das umgekehrte Default-Verhalten zu Kafka. Wer aus dem Kafka-Background kommt und erwartet, dass mehrere Subscriber sich Messages teilen, wird hier überrascht: bei RabbitMQ bekommt jeder Subscriber alles.
Wer Load-Balancing-Semantik braucht (Messages werden auf Consumer verteilt), muss eine geteilte Named Queue konfigurieren — kommt in einer späteren Adapter-Variante oder via eigener AMQP-Logik daneben.
Async-Vertrag: Confirm-Channel default
publish() läuft per Default über einen Confirm-Channel und wartet auf Broker-Ack. Das erfüllt den @bimetal/broker-Async-Vertrag („publish resolved, sobald Zustellung angenommen") strikt: Promise resolved erst nach Broker-Confirmation.
// Default — sicher aber langsamer:
const broker = createRabbitMQBroker({ url, exchange: 'bimetal' });
// Fire-and-forget — schneller aber Verlust-Risiko:
const broker = createRabbitMQBroker({ url, exchange: 'bimetal', confirms: false });confirms: false analog acks: 0 beim Kafka-Adapter — Messages können bei Broker-/Netzwerk-Problemen verloren gehen.
Persistente Messages
const broker = createRabbitMQBroker({
url, exchange: 'bimetal',
persistentMessages: true, // Default
});persistent: true weist RabbitMQ an, Messages auf Disk zu schreiben. Wirkt nur in Kombination mit durable Queues/Exchanges — der Adapter assertiert den Exchange als durable: true, aber die anonymen Subscribe-Queues sind bewusst nicht durable (sie sind exclusive und auto-delete für Pub/Sub-Default).
Ack-Semantik
Der Adapter nutzt manuelle Acks (noAck: false). Eine Message wird erst geack-ed, nachdem der zugehörige Handler erfolgreich durchgelaufen ist (sync oder async). Bei werfendem oder rejectendem Handler:
onErrorwird gerufen (mittopicund ggf.pattern)- Die Message wird trotzdem geack-ed (statt requeue), damit es keine endless-redelivery-Schleifen gibt
Wer Dead-Letter-Routing braucht, baut das via onError-Callback + separater Publish auf einen DLX-Topic.
Wildcards: 1:1 zu RabbitMQ
Die Topic-Convention aus @bimetal/event-sourcing (bimetal.<domain>.<EventName>) trifft direkt auf RabbitMQ-Topic-Routing:
| Pattern | Matcht |
|---|---|
| bimetal.# | Alle Bimetal-Events |
| bimetal.calendar.* | Direkte Calendar-Events (kein tieferes Nesting) |
| bimetal.calendar.CalendarEventCreated | Exakt dieser Typ |
| bimetal.*.CalendarEventCreated | Alle Domänen mit diesem Event-Namen |
Das ist nicht emuliert wie beim In-Process-Default — RabbitMQ routet selbst, der Adapter reicht das Pattern unverändert weiter.
Lifecycle
const broker = createRabbitMQBroker(opts);
broker.subscribe('topic-a', handlerA); // sofort verfügbar
await broker.publish('topic-a', { id: 'x' }); // löst Connection + Channel-Setup aus
broker.subscribe('topic-b', handlerB); // dynamisch — kein restart nötig
broker.subscribePattern!('topic.#', handlerC);
await broker.close(); // cancel-t alle subs, schließt channels + connectionKeine explizite start()-Methode. Dynamisches Subscribe ist RabbitMQ-typisch — auch nach dem ersten Message-Flow können weitere Subscriber dazukommen.
Was dieser Adapter NICHT macht
- Keine Named/Durable Queues für Subscribes (nur anonyme exclusive).
- Kein Load-Balancing-Modus (Pub/Sub-Only).
- Kein Retry, kein Dead-Letter-Topic — extern via
onError. - Kein RPC-Pattern, kein Request/Reply.
- Kein Backpressure-Handling bei normalem Channel (
confirms: false) —publish()kannfalsereturnen bei Buffer-Voll, wird (für jetzt) nicht aktiv behandelt. - Keine Exchange-Bindings über die Adapter-Konfiguration hinaus (z. B. Exchange-zu-Exchange-Routing).
Diese Auslassungen sind bewusst: 11f validiert das Capability-Modell mit nativer Pattern-Unterstützung. Erweiterungen kommen in dedizierten Slices.
