pp-command-bus
v2.0.2
Published
Distributed Command Bus library with Redis Streams transport, MessagePack serialization, and RPC via LPUSH/BRPOP for DragonflyDB
Maintainers
Readme
PP Command Bus
Rozproszona biblioteka Command Bus oparta na Redis Streams z serializacją MessagePack i RPC przez LPUSH/BRPOP. Zoptymalizowana pod DragonflyDB.
Spis treści
- Opis
- Architektura
- Instalacja
- Szybki start
- API
- Konfiguracja
- Flow danych
- Komponenty
- Struktura projektu
- Testowanie
- Changelog
Opis
pp-command-bus to biblioteka do obsługi rozproszonych komend zgodna ze wzorcem CQRS. Zapewnia:
- Fire-and-forget dispatch — komendy wysyłane przez
XADDdo Redis Streams - Batch dispatch — wiele komend w jednym pipeline Redis
- RPC (request/response) — synchroniczne wywołania z timeout przez
LPUSH/BRPOP - Consumer Groups — dokładnie jedno przetworzenie komendy (exactly-once delivery)
- Dead Letter Queue — wiadomości po przekroczeniu prób trafiają do
dlq:{stream} - Auto-recovery — automatyczne przejmowanie stalled wiadomości przez
XPENDING/XCLAIM - MessagePack — binarna serializacja z natywną obsługą
Date(extension type)
Architektura
Diagram komponentów
┌─────────────────────────────────────────────────────────────┐
│ CommandBus │
│ dispatch() | dispatchBatch() | call() | handle() | close() │
└────────────────────────────┬────────────────────────────────┘
│
┌────────┴────────┐
│ ITransport │
└────────┬────────┘
│
┌────────────────────────────┴────────────────────────────────┐
│ RedisStreamsTransport │
│ (fasada / kompozycja) │
├─────────────┬──────────────┬──────────────┬─────────────────┤
│ │ │ │ │
│ ┌───────────┴─┐ ┌─────────┴──────┐ ┌────┴─────┐ ┌─────────┴──┐
│ │ Stream │ │ Stream │ │ Rpc │ │ Pending │
│ │ Producer │ │ Consumer │ │ Handler │ │ Recovery │
│ │ (XADD) │ │ (koordynator) │ │ (BRPOP) │ │ (XCLAIM) │
│ └─────────────┘ └───────┬────────┘ └──────────┘ └────────────┘
│ │ │
│ ┌──────────┴──────────┐ │
│ │ │ │
│ ┌────────┴─────────┐ ┌────────┴────────┐ │
│ │ Message │ │ Consumer │ │
│ │ Processor │ │ Loop │ │
│ │ (parse/XACK/RPC) │ │ (XREADGROUP) │ │
│ └──────────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Warstwa połączeń │
│ ┌─────────────────────┐ ┌──────────────────────┐ │
│ │ RedisConnectionPool │ │ RpcConnectionPool │ │
│ │ (round-robin, eager)│ │ (bounded, lazy, BRPOP)│ │
│ └─────────────────────┘ └──────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Warstwa serializacji │
│ ┌─────────────────────┐ ┌──────────────────────┐ │
│ │ MsgpackSerializer │ │ RedisCodec │ │
│ │ (Date extension) │ │ (base64 encode) │ │
│ └─────────────────────┘ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────┘Segregacja interfejsów (ISP)
interface IStreamProducer {
enqueue(streamName: string, data: Buffer): Promise<string>;
enqueueBatch(entries: Array<{ streamName: string; data: Buffer }>): Promise<string[]>;
}
interface IStreamConsumer {
consume(streamName: string, groupName: string, handler: ConsumerHandler): Promise<void>;
}
interface IRpcTransport {
rpcCall(commandName: string, data: Buffer, timeout: number): Promise<Buffer>;
rpcRespond(responseKey: string, data: Buffer, ttl: number): Promise<void>;
}
interface IClosable {
close(): Promise<void>;
}
// Pełny interfejs transportu — kompozycja segregowanych interfejsów
interface ITransport extends IStreamProducer, IStreamConsumer, IRpcTransport, IClosable {}Instalacja
npm install pp-command-busWymagania
- Node.js >= 18
- Redis >= 6.2 lub DragonflyDB >= 1.0
- TypeScript >= 5.0 (opcjonalnie, pełne typowanie)
Zależności
| Pakiet | Opis |
|--------|------|
| ioredis | Klient Redis z pipelining, Cluster, Sentinel |
| @msgpack/msgpack | Binarna serializacja z extension types |
Szybki start
Definiowanie komendy
import { Command } from 'pp-command-bus';
class CreateUserCommand extends Command<{ name: string; email: string }> {
constructor(payload: { name: string; email: string }) {
super(payload);
}
}Konfiguracja i inicjalizacja
import { CommandBus, CommandBusConfig } from 'pp-command-bus';
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
logger: console,
});
const bus = new CommandBus(config);Rejestracja handlera (worker)
bus.handle(CreateUserCommand, async (command) => {
const { name, email } = command.__payload;
// Logika biznesowa
await userService.create({ name, email });
return { userId: '123' };
});Wysyłanie komendy (fire-and-forget)
const cmd = new CreateUserCommand({ name: 'Jan Kowalski', email: '[email protected]' });
await bus.dispatch(cmd);Wysyłanie batch
const commands = [
new CreateUserCommand({ name: 'Anna Nowak', email: '[email protected]' }),
new CreateUserCommand({ name: 'Piotr Wiśniewski', email: '[email protected]' }),
];
await bus.dispatchBatch(commands);Wywołanie RPC (request/response)
const result = await bus.call<{ userId: string }>(cmd, 5000); // timeout 5s
console.log(result.userId); // '123'Zamykanie
await bus.close(); // Graceful shutdown — czeka na aktywne zadaniaAPI
CommandBus
| Metoda | Opis |
|--------|------|
| dispatch(command) | Wysyła komendę fire-and-forget (XADD) |
| dispatchBatch(commands) | Wysyła wiele komend w jednym pipeline |
| call<T>(command, timeout?) | RPC — wysyła i czeka na wynik (XADD + BRPOP). Timeout per wywołanie w ms (domyślnie 30000) |
| handle(CommandClass, handler) | Rejestruje handler dla komendy |
| close() | Graceful shutdown — zamyka połączenia i czeka na aktywne zadania |
Command<T>
Bazowa klasa abstrakcyjna dla komend:
| Pole | Typ | Opis |
|------|-----|------|
| __name | string | Nazwa komendy (nazwa klasy) |
| __id | string | UUID v4 |
| __time | number | Timestamp utworzenia (ms) |
| __payload | T | Dane biznesowe komendy |
Konfiguracja
Zmienne środowiskowe
| Zmienna | Domyślna | Opis |
|---------|----------|------|
| REDIS_URL | redis://localhost:6379 | URL połączenia Redis/DragonflyDB |
| REDIS_RETRY_DELAY | 5000 | Opóźnienie między próbami reconnect (ms) |
| REDIS_MAX_RETRIES | 0 | Maks. prób reconnect do Redis (0 = nieskończoność) |
| LOG_LEVEL | log | Poziom logowania (debug, log, warn, error) |
| COMMAND_BUS_CONCURRENCY | 60 × CPU | Maks. równoległych wiadomości per konsument (I/O-bound: default, CPU-bound: availableParallelism()) |
| COMMAND_BUS_MAX_ATTEMPTS | 3 | Maks. prób przetworzenia wiadomości |
| COMMAND_BUS_LOG | (puste) | Ścieżka do katalogu logów komend |
| COMMAND_BUS_POOL_SIZE | 2 × CPU | Rozmiar puli połączeń Redis |
| COMMAND_BUS_MAX_CONCURRENT_RPC | 50 | Maks. równoległych wywołań RPC |
| COMMAND_BUS_BATCH_SIZE | 100 | Wiadomości pobieranych w jednym XREADGROUP (mniej roundtripów = wyższy throughput) |
| COMMAND_BUS_CLAIM_TIMEOUT | 30000 | Czas (ms) po którym stalled wiadomość jest przejmowana |
| COMMAND_BUS_MAX_RETAINED | 10000 | Maks. wiadomości w strumieniu (XTRIM ~) |
Konfiguracja programowa
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
logger: console,
logLevel: 'log', // debug | log | warn | error
redisRetryDelay: 5000, // Opóźnienie reconnect (ms)
redisMaxRetries: 0, // 0 = nieskończoność
concurrency: 960, // I/O-bound: 60 × CPU, CPU-bound: availableParallelism()
poolSize: 32, // Rozmiar puli połączeń Redis
maxConcurrentRpc: 100, // Maks. równoległych BRPOP
batchSize: 100, // Wiadomości per XREADGROUP
claimTimeout: 60000, // Stalled message timeout (ms)
maxRetained: 50000, // XTRIM ~ limit
maxAttempts: 5, // Maks. prób przetworzenia
});Flow danych
1. Dispatch (fire-and-forget)
Producer Redis Worker
│ │ │
│ 1. serialize(command) │ │
│ → MessagePack encode │ │
│ │ │
│ 2. RedisCodec.encode(buffer) │ │
│ → base64 string │ │
│ │ │
│ 3. XADD cmd:CreateUser * ──→ │ stream: cmd:CreateUser │
│ data <base64> │ ┌─────────────────────┐ │
│ │ │ msg-1: data=<b64> │ │
│ │ └─────────────────────┘ │
│ │ │
│ │ ←── 4. XREADGROUP GROUP │
│ │ workers consumer-1 │
│ │ COUNT 100 BLOCK 5000 │
│ │ STREAMS cmd:CreateUser > │
│ │ │
│ │ ──→ [msg-1, [data, <b64>]] │
│ │ │
│ │ 5. RedisCodec.decode │
│ │ 6. deserialize(buffer) │
│ │ 7. handler(command) │
│ │ │
│ │ ←── 8. XACK cmd:CreateUser │
│ │ workers msg-1 │
│ │ │
│ │ ←── 9. XTRIM cmd:CreateUser │
│ │ MAXLEN ~ 10000 │
│ │ (co batchSize wiadomości)│2. RPC (request/response)
Caller Redis Worker
│ │ │
│ 1. serialize(command) │ │
│ 2. Wrap w RpcEnvelope: │ │
│ { commandData, rpc: { │ │
│ correlationId, │ │
│ responseKey │ │
│ }} │ │
│ │ │
│ 3. XADD cmd:GetUser * ────→ │ stream: cmd:GetUser │
│ data <envelope> │ ┌─────────────────────┐ │
│ rpc 1 │ │ msg-1: data=<env> │ │
│ │ │ rpc=1 │ │
│ 4. BRPOP rpc:res:{uuid} ──→ │ └─────────────────────┘ │
│ timeout 30s │ │
│ (dedykowane połączenie │ ←── 5. XREADGROUP │
│ z RpcConnectionPool) │ │
│ │ 6. Wykryj marker rpc=1 │
│ │ 7. Rozpakuj envelope │
│ │ 8. handler(commandData) │
│ │ 9. serialize({result}) │
│ │ │
│ │ ←── 10. LPUSH rpc:res:{uuid}│
│ │ <result> │
│ │ EXPIRE rpc:res:{uuid} 60│
│ │ │
│ ←── result z BRPOP │ │
│ 11. deserialize(result) │ │
│ 12. DEL rpc:res:{uuid} │ │3. Recovery + Dead Letter Queue
PendingRecovery Redis
│ │
│ (setInterval co claimTimeout) │
│ │
│ 1. XPENDING cmd:CreateUser ─→│ Pending entries:
│ workers - + 10 │ [msg-5, consumer-1,
│ │ idle: 45000, delivery: 2]
│ │
│ if idle > claimTimeout │
│ AND delivery < maxAttempts: │
│ │
│ 2. XCLAIM cmd:CreateUser ──→ │ Przejęcie wiadomości
│ workers consumer-2 │
│ 30000 msg-5 │
│ │
│ 3. processMessage(msg-5) │ → handler() → XACK
│ │
│ if delivery >= maxAttempts: │
│ │
│ 4. XCLAIM msg-5 ───────────→ │
│ 5. XADD dlq:cmd:CreateUser → │ Dead Letter Queue
│ * ...fields │ ┌──────────────────────┐
│ original_stream │ │ msg: data + metadata │
│ cmd:CreateUser │ │ original_stream │
│ delivery_count 3 │ │ delivery_count: 3 │
│ 6. XACK cmd:CreateUser ────→ │ └──────────────────────┘
│ workers msg-5 │Komponenty
RedisStreamsTransport (fasada)
Kompozycja 4 komponentów implementująca ITransport:
| Komponent | Odpowiedzialność | Komendy Redis |
|-----------|------------------|---------------|
| StreamProducer | Enqueue / enqueueBatch | XADD, pipeline |
| StreamConsumer | Cykl życia konsumenta, deduplicacja | XGROUP CREATE |
| RpcHandler | Request/response RPC | XADD, BRPOP, LPUSH, DEL |
| PendingRecovery | Automatyczny retry + DLQ | XPENDING, XCLAIM, XACK |
StreamConsumer (koordynator)
StreamConsumer komponuje dwa podkomponenty:
| Podkomponent | Odpowiedzialność | LOC | |--------------|------------------|-----| | MessageProcessor | Parsowanie fields, walidacja, RPC detection, handler, XACK/XTRIM | ~147 | | ConsumerLoop | while(running) loop, XREADGROUP BLOCK, concurrency limiter, backoff | ~120 | | StreamConsumer | Koordynacja: compose, deduplicacja (Map z TTL sweep), lifecycle | ~188 |
Pule połączeń
| Pula | Wzorzec | Tworzenie | Przeznaczenie |
|------|---------|-----------|---------------|
| RedisConnectionPool | Round-robin, eager | Przy starcie (size połączeń) | Operacje nieblokujące: XADD, XACK, DEL, pipeline |
| RpcConnectionPool | Bounded, lazy | Przy acquire() (max maxSize) | Operacje blokujące: BRPOP |
RedisConnectionPool dodatkowo udostępnia createDedicated() — tworzy izolowane połączenie poza pulą, używane przez ConsumerLoop (XREADGROUP BLOCK blokuje socket).
Serializacja
| Komponent | Warstwa | Opis | |-----------|---------|------| | MsgpackSerializer | Aplikacja | MessagePack z Date extension type — 1 krok zamiast 7 | | RedisCodec | Transport | Base64 encode/decode — chroni binarne dane przed korupcją UTF-8 przez ioredis |
Concurrency Limiter (ConsumerLoop)
┌─ slot 1: handler(msg-1) ──→ done → slot wolny
XREADGROUP ────→├─ slot 2: handler(msg-2) ──→ done → slot wolny
(COUNT=N) └─ slot 3: handler(msg-3) ──→ (trwa...)
│
czekam (Promise.race) ←────┘
aż zwolni się slotconcurrencykontroluje liczbę slotów (równoległych handlerów)batchSizekontroluje ile wiadomości czytanych na raz- Dostępne sloty =
concurrency - active.size - Nowy batch =
Math.min(batchSize, availableSlots)
Deduplicacja (StreamConsumer)
Map<string, number> — messageId → timestamp:
- Przed przetworzeniem:
sweepStaleProcessing()usuwa wpisy starsze niżstaleThreshold(5 min) - Duplikat:
processing.has(messageId) → return(skip) - Po przetworzeniu:
processing.delete(messageId)
Chroni przed podwójnym przetworzeniem gdy PendingRecovery przejmuje wiadomość która jest jeszcze aktywna w consumer loop.
XTRIM (MessageProcessor)
Throttled per-stream: counter wiadomości na strumień, co batchSize wiadomości pipeline XACK + XTRIM ~ maxRetained.
Strumień A: [1] [2] [3] ... [10] → pipeline XACK + XTRIM → reset counter
Strumień B: [1] [2] [3] → zwykły XACK (nie osiągnął batchSize)Graceful Shutdown
close():
1. consumer.running = false → propaguje do wszystkich ConsumerLoop
2. recovery.stop() → clearInterval
3. conn.disconnect() → przerywa XREADGROUP BLOCK
4. await consumerLoopPromises → czeka na aktywne handlery
5. pool.close() → quit() na connection pool
6. rpcPool.close() → quit() available, disconnect() borrowedStruktura projektu
src/
├── index.ts # Eksporty publiczne
├── command-bus/
│ ├── index.ts # CommandBus (fasada wyższego poziomu)
│ ├── command.ts # Bazowa klasa Command<T>
│ ├── config/
│ │ └── command-bus-config.ts # Konfiguracja (env + params)
│ ├── transport/
│ │ ├── transport.interface.ts # Segregowane interfejsy ISP
│ │ ├── redis-streams-transport.ts # Fasada transportu (kompozycja 4 komponentów)
│ │ ├── stream-producer.ts # XADD enqueue / enqueueBatch
│ │ ├── stream-consumer.ts # Koordynator: lifecycle, dedup, compose
│ │ ├── message-processor.ts # Parse, validate, XACK/XTRIM, RPC detect
│ │ ├── consumer-loop.ts # XREADGROUP loop, concurrency limiter
│ │ ├── rpc-handler.ts # BRPOP/LPUSH request/response
│ │ ├── pending-recovery.ts # XPENDING/XCLAIM + Dead Letter Queue
│ │ └── redis-codec.ts # Base64 encode/decode
│ ├── serialization/
│ │ ├── serializer.interface.ts # ISerializer
│ │ └── msgpack-serializer.ts # MessagePack z Date extension
│ ├── logging/
│ │ └── command-logger.ts # Opcjonalny logger komend do plików
│ └── types/
│ └── index.ts # Typy (CommandPayload, CommandHandler)
├── shared/
│ ├── types.ts # ILogger, TDict, TCallableAsync
│ ├── redis/
│ │ ├── connection-pool.ts # Round-robin eager pool
│ │ ├── rpc-connection-pool.ts # Bounded lazy pool (BRPOP)
│ │ └── redis-error-formatter.ts # Formatowanie błędów Redis
│ ├── logging/
│ │ ├── logger.ts # Logger implementacja
│ │ └── log-level.ts # Poziomy logowania
│ └── utils/
│ └── error-utils.ts # getErrorMessage()
└── examples/
├── rpc.demo.ts # Demo RPC call
└── rpc-throughput.demo.ts # Demo throughputTestowanie
# Wszystkie testy
npm test
# Z coverage
npm run test:coverage
# Lint
npm run lint
# Format
npm run format:checkArchitektura testów
| Typ | Pliki | Opis |
|-----|-------|------|
| Unit | *.spec.ts per komponent | Izolowane testy każdego komponentu |
| Integracja | redis-streams-transport.spec.ts | Testy fasady z mockami |
| Integracja | command-bus.spec.ts | Testy CommandBus end-to-end z mockami |
Pokrycie: 193 testów, zero timing-dependent setTimeout, jest.useFakeTimers() dla backoff.
Changelog
Pełny changelog dostępny w CHANGELOG.md.
