npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

pp-command-bus

v2.0.2

Published

Distributed Command Bus library with Redis Streams transport, MessagePack serialization, and RPC via LPUSH/BRPOP for DragonflyDB

Readme

PP Command Bus

Rozproszona biblioteka Command Bus oparta na Redis Streams z serializacją MessagePack i RPC przez LPUSH/BRPOP. Zoptymalizowana pod DragonflyDB.

Spis treści

Opis

pp-command-bus to biblioteka do obsługi rozproszonych komend zgodna ze wzorcem CQRS. Zapewnia:

  • Fire-and-forget dispatch — komendy wysyłane przez XADD do Redis Streams
  • Batch dispatch — wiele komend w jednym pipeline Redis
  • RPC (request/response) — synchroniczne wywołania z timeout przez LPUSH/BRPOP
  • Consumer Groups — dokładnie jedno przetworzenie komendy (exactly-once delivery)
  • Dead Letter Queue — wiadomości po przekroczeniu prób trafiają do dlq:{stream}
  • Auto-recovery — automatyczne przejmowanie stalled wiadomości przez XPENDING/XCLAIM
  • MessagePack — binarna serializacja z natywną obsługą Date (extension type)

Architektura

Diagram komponentów

┌─────────────────────────────────────────────────────────────┐
│                        CommandBus                           │
│  dispatch() | dispatchBatch() | call() | handle() | close() │
└────────────────────────────┬────────────────────────────────┘
                             │
                    ┌────────┴────────┐
                    │   ITransport    │
                    └────────┬────────┘
                             │
┌────────────────────────────┴────────────────────────────────┐
│                  RedisStreamsTransport                       │
│                    (fasada / kompozycja)                     │
├─────────────┬──────────────┬──────────────┬─────────────────┤
│             │              │              │                  │
│ ┌───────────┴─┐ ┌─────────┴──────┐ ┌────┴─────┐ ┌─────────┴──┐
│ │   Stream    │ │    Stream      │ │   Rpc    │ │  Pending   │
│ │  Producer   │ │   Consumer     │ │  Handler │ │  Recovery  │
│ │  (XADD)    │ │  (koordynator) │ │ (BRPOP)  │ │ (XCLAIM)   │
│ └─────────────┘ └───────┬────────┘ └──────────┘ └────────────┘
│                         │                                    │
│              ┌──────────┴──────────┐                         │
│              │                     │                         │
│     ┌────────┴─────────┐ ┌────────┴────────┐                │
│     │    Message       │ │   Consumer      │                │
│     │   Processor      │ │     Loop        │                │
│     │ (parse/XACK/RPC) │ │ (XREADGROUP)    │                │
│     └──────────────────┘ └─────────────────┘                │
├─────────────────────────────────────────────────────────────┤
│                    Warstwa połączeń                          │
│  ┌─────────────────────┐  ┌──────────────────────┐          │
│  │ RedisConnectionPool │  │  RpcConnectionPool   │          │
│  │ (round-robin, eager)│  │ (bounded, lazy, BRPOP)│          │
│  └─────────────────────┘  └──────────────────────┘          │
├─────────────────────────────────────────────────────────────┤
│                    Warstwa serializacji                      │
│  ┌─────────────────────┐  ┌──────────────────────┐          │
│  │  MsgpackSerializer  │  │     RedisCodec       │          │
│  │ (Date extension)    │  │   (base64 encode)    │          │
│  └─────────────────────┘  └──────────────────────┘          │
└─────────────────────────────────────────────────────────────┘

Segregacja interfejsów (ISP)

interface IStreamProducer {
  enqueue(streamName: string, data: Buffer): Promise<string>;
  enqueueBatch(entries: Array<{ streamName: string; data: Buffer }>): Promise<string[]>;
}

interface IStreamConsumer {
  consume(streamName: string, groupName: string, handler: ConsumerHandler): Promise<void>;
}

interface IRpcTransport {
  rpcCall(commandName: string, data: Buffer, timeout: number): Promise<Buffer>;
  rpcRespond(responseKey: string, data: Buffer, ttl: number): Promise<void>;
}

interface IClosable {
  close(): Promise<void>;
}

// Pełny interfejs transportu — kompozycja segregowanych interfejsów
interface ITransport extends IStreamProducer, IStreamConsumer, IRpcTransport, IClosable {}

Instalacja

npm install pp-command-bus

Wymagania

  • Node.js >= 18
  • Redis >= 6.2 lub DragonflyDB >= 1.0
  • TypeScript >= 5.0 (opcjonalnie, pełne typowanie)

Zależności

| Pakiet | Opis | |--------|------| | ioredis | Klient Redis z pipelining, Cluster, Sentinel | | @msgpack/msgpack | Binarna serializacja z extension types |

Szybki start

Definiowanie komendy

import { Command } from 'pp-command-bus';

class CreateUserCommand extends Command<{ name: string; email: string }> {
  constructor(payload: { name: string; email: string }) {
    super(payload);
  }
}

Konfiguracja i inicjalizacja

import { CommandBus, CommandBusConfig } from 'pp-command-bus';

const config = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
  logger: console,
});

const bus = new CommandBus(config);

Rejestracja handlera (worker)

bus.handle(CreateUserCommand, async (command) => {
  const { name, email } = command.__payload;
  // Logika biznesowa
  await userService.create({ name, email });
  return { userId: '123' };
});

Wysyłanie komendy (fire-and-forget)

const cmd = new CreateUserCommand({ name: 'Jan Kowalski', email: '[email protected]' });
await bus.dispatch(cmd);

Wysyłanie batch

const commands = [
  new CreateUserCommand({ name: 'Anna Nowak', email: '[email protected]' }),
  new CreateUserCommand({ name: 'Piotr Wiśniewski', email: '[email protected]' }),
];
await bus.dispatchBatch(commands);

Wywołanie RPC (request/response)

const result = await bus.call<{ userId: string }>(cmd, 5000); // timeout 5s
console.log(result.userId); // '123'

Zamykanie

await bus.close(); // Graceful shutdown — czeka na aktywne zadania

API

CommandBus

| Metoda | Opis | |--------|------| | dispatch(command) | Wysyła komendę fire-and-forget (XADD) | | dispatchBatch(commands) | Wysyła wiele komend w jednym pipeline | | call<T>(command, timeout?) | RPC — wysyła i czeka na wynik (XADD + BRPOP). Timeout per wywołanie w ms (domyślnie 30000) | | handle(CommandClass, handler) | Rejestruje handler dla komendy | | close() | Graceful shutdown — zamyka połączenia i czeka na aktywne zadania |

Command<T>

Bazowa klasa abstrakcyjna dla komend:

| Pole | Typ | Opis | |------|-----|------| | __name | string | Nazwa komendy (nazwa klasy) | | __id | string | UUID v4 | | __time | number | Timestamp utworzenia (ms) | | __payload | T | Dane biznesowe komendy |

Konfiguracja

Zmienne środowiskowe

| Zmienna | Domyślna | Opis | |---------|----------|------| | REDIS_URL | redis://localhost:6379 | URL połączenia Redis/DragonflyDB | | REDIS_RETRY_DELAY | 5000 | Opóźnienie między próbami reconnect (ms) | | REDIS_MAX_RETRIES | 0 | Maks. prób reconnect do Redis (0 = nieskończoność) | | LOG_LEVEL | log | Poziom logowania (debug, log, warn, error) | | COMMAND_BUS_CONCURRENCY | 60 × CPU | Maks. równoległych wiadomości per konsument (I/O-bound: default, CPU-bound: availableParallelism()) | | COMMAND_BUS_MAX_ATTEMPTS | 3 | Maks. prób przetworzenia wiadomości | | COMMAND_BUS_LOG | (puste) | Ścieżka do katalogu logów komend | | COMMAND_BUS_POOL_SIZE | 2 × CPU | Rozmiar puli połączeń Redis | | COMMAND_BUS_MAX_CONCURRENT_RPC | 50 | Maks. równoległych wywołań RPC | | COMMAND_BUS_BATCH_SIZE | 100 | Wiadomości pobieranych w jednym XREADGROUP (mniej roundtripów = wyższy throughput) | | COMMAND_BUS_CLAIM_TIMEOUT | 30000 | Czas (ms) po którym stalled wiadomość jest przejmowana | | COMMAND_BUS_MAX_RETAINED | 10000 | Maks. wiadomości w strumieniu (XTRIM ~) |

Konfiguracja programowa

const config = new CommandBusConfig({
  redisUrl: 'redis://localhost:6379',
  logger: console,
  logLevel: 'log',                // debug | log | warn | error
  redisRetryDelay: 5000,          // Opóźnienie reconnect (ms)
  redisMaxRetries: 0,             // 0 = nieskończoność
  concurrency: 960,               // I/O-bound: 60 × CPU, CPU-bound: availableParallelism()
  poolSize: 32,                   // Rozmiar puli połączeń Redis
  maxConcurrentRpc: 100,          // Maks. równoległych BRPOP
  batchSize: 100,                 // Wiadomości per XREADGROUP
  claimTimeout: 60000,            // Stalled message timeout (ms)
  maxRetained: 50000,             // XTRIM ~ limit
  maxAttempts: 5,                 // Maks. prób przetworzenia
});

Flow danych

1. Dispatch (fire-and-forget)

Producer                          Redis                          Worker
   │                                │                              │
   │  1. serialize(command)         │                              │
   │     → MessagePack encode      │                              │
   │                                │                              │
   │  2. RedisCodec.encode(buffer) │                              │
   │     → base64 string           │                              │
   │                                │                              │
   │  3. XADD cmd:CreateUser * ──→ │ stream: cmd:CreateUser       │
   │     data <base64>             │ ┌─────────────────────┐      │
   │                                │ │ msg-1: data=<b64>  │      │
   │                                │ └─────────────────────┘      │
   │                                │                              │
   │                                │ ←── 4. XREADGROUP GROUP     │
   │                                │     workers consumer-1      │
   │                                │     COUNT 100 BLOCK 5000    │
   │                                │     STREAMS cmd:CreateUser > │
   │                                │                              │
   │                                │ ──→ [msg-1, [data, <b64>]]  │
   │                                │                              │
   │                                │     5. RedisCodec.decode     │
   │                                │     6. deserialize(buffer)   │
   │                                │     7. handler(command)      │
   │                                │                              │
   │                                │ ←── 8. XACK cmd:CreateUser  │
   │                                │     workers msg-1            │
   │                                │                              │
   │                                │ ←── 9. XTRIM cmd:CreateUser │
   │                                │     MAXLEN ~ 10000          │
   │                                │     (co batchSize wiadomości)│

2. RPC (request/response)

Caller                            Redis                          Worker
   │                                │                              │
   │  1. serialize(command)         │                              │
   │  2. Wrap w RpcEnvelope:       │                              │
   │     { commandData, rpc: {     │                              │
   │       correlationId,          │                              │
   │       responseKey             │                              │
   │     }}                        │                              │
   │                                │                              │
   │  3. XADD cmd:GetUser * ────→ │ stream: cmd:GetUser           │
   │     data <envelope>           │ ┌─────────────────────┐      │
   │     rpc 1                     │ │ msg-1: data=<env>   │      │
   │                                │ │        rpc=1        │      │
   │  4. BRPOP rpc:res:{uuid} ──→ │ └─────────────────────┘      │
   │     timeout 30s               │                              │
   │     (dedykowane połączenie    │ ←── 5. XREADGROUP            │
   │      z RpcConnectionPool)     │                              │
   │                                │     6. Wykryj marker rpc=1   │
   │                                │     7. Rozpakuj envelope     │
   │                                │     8. handler(commandData)  │
   │                                │     9. serialize({result})   │
   │                                │                              │
   │                                │ ←── 10. LPUSH rpc:res:{uuid}│
   │                                │         <result>             │
   │                                │     EXPIRE rpc:res:{uuid} 60│
   │                                │                              │
   │ ←── result z BRPOP            │                              │
   │  11. deserialize(result)      │                              │
   │  12. DEL rpc:res:{uuid}      │                              │

3. Recovery + Dead Letter Queue

PendingRecovery                   Redis
   │                                │
   │  (setInterval co claimTimeout) │
   │                                │
   │  1. XPENDING cmd:CreateUser ─→│ Pending entries:
   │     workers - + 10            │ [msg-5, consumer-1,
   │                                │  idle: 45000, delivery: 2]
   │                                │
   │  if idle > claimTimeout       │
   │  AND delivery < maxAttempts:  │
   │                                │
   │  2. XCLAIM cmd:CreateUser ──→ │ Przejęcie wiadomości
   │     workers consumer-2        │
   │     30000 msg-5               │
   │                                │
   │  3. processMessage(msg-5)     │ → handler() → XACK
   │                                │
   │  if delivery >= maxAttempts:  │
   │                                │
   │  4. XCLAIM msg-5 ───────────→ │
   │  5. XADD dlq:cmd:CreateUser → │ Dead Letter Queue
   │     * ...fields               │ ┌──────────────────────┐
   │     original_stream           │ │ msg: data + metadata │
   │     cmd:CreateUser            │ │ original_stream      │
   │     delivery_count 3          │ │ delivery_count: 3    │
   │  6. XACK cmd:CreateUser ────→ │ └──────────────────────┘
   │     workers msg-5             │

Komponenty

RedisStreamsTransport (fasada)

Kompozycja 4 komponentów implementująca ITransport:

| Komponent | Odpowiedzialność | Komendy Redis | |-----------|------------------|---------------| | StreamProducer | Enqueue / enqueueBatch | XADD, pipeline | | StreamConsumer | Cykl życia konsumenta, deduplicacja | XGROUP CREATE | | RpcHandler | Request/response RPC | XADD, BRPOP, LPUSH, DEL | | PendingRecovery | Automatyczny retry + DLQ | XPENDING, XCLAIM, XACK |

StreamConsumer (koordynator)

StreamConsumer komponuje dwa podkomponenty:

| Podkomponent | Odpowiedzialność | LOC | |--------------|------------------|-----| | MessageProcessor | Parsowanie fields, walidacja, RPC detection, handler, XACK/XTRIM | ~147 | | ConsumerLoop | while(running) loop, XREADGROUP BLOCK, concurrency limiter, backoff | ~120 | | StreamConsumer | Koordynacja: compose, deduplicacja (Map z TTL sweep), lifecycle | ~188 |

Pule połączeń

| Pula | Wzorzec | Tworzenie | Przeznaczenie | |------|---------|-----------|---------------| | RedisConnectionPool | Round-robin, eager | Przy starcie (size połączeń) | Operacje nieblokujące: XADD, XACK, DEL, pipeline | | RpcConnectionPool | Bounded, lazy | Przy acquire() (max maxSize) | Operacje blokujące: BRPOP |

RedisConnectionPool dodatkowo udostępnia createDedicated() — tworzy izolowane połączenie poza pulą, używane przez ConsumerLoop (XREADGROUP BLOCK blokuje socket).

Serializacja

| Komponent | Warstwa | Opis | |-----------|---------|------| | MsgpackSerializer | Aplikacja | MessagePack z Date extension type — 1 krok zamiast 7 | | RedisCodec | Transport | Base64 encode/decode — chroni binarne dane przed korupcją UTF-8 przez ioredis |

Concurrency Limiter (ConsumerLoop)

                ┌─ slot 1: handler(msg-1) ──→ done → slot wolny
XREADGROUP ────→├─ slot 2: handler(msg-2) ──→ done → slot wolny
(COUNT=N)       └─ slot 3: handler(msg-3) ──→ (trwa...)
                                                │
                    czekam (Promise.race)  ←────┘
                    aż zwolni się slot
  • concurrency kontroluje liczbę slotów (równoległych handlerów)
  • batchSize kontroluje ile wiadomości czytanych na raz
  • Dostępne sloty = concurrency - active.size
  • Nowy batch = Math.min(batchSize, availableSlots)

Deduplicacja (StreamConsumer)

Map<string, number> — messageId → timestamp:

  • Przed przetworzeniem: sweepStaleProcessing() usuwa wpisy starsze niż staleThreshold (5 min)
  • Duplikat: processing.has(messageId) → return (skip)
  • Po przetworzeniu: processing.delete(messageId)

Chroni przed podwójnym przetworzeniem gdy PendingRecovery przejmuje wiadomość która jest jeszcze aktywna w consumer loop.

XTRIM (MessageProcessor)

Throttled per-stream: counter wiadomości na strumień, co batchSize wiadomości pipeline XACK + XTRIM ~ maxRetained.

Strumień A: [1] [2] [3] ... [10] → pipeline XACK + XTRIM → reset counter
Strumień B: [1] [2] [3]          → zwykły XACK (nie osiągnął batchSize)

Graceful Shutdown

close():
  1. consumer.running = false        → propaguje do wszystkich ConsumerLoop
  2. recovery.stop()                 → clearInterval
  3. conn.disconnect()               → przerywa XREADGROUP BLOCK
  4. await consumerLoopPromises      → czeka na aktywne handlery
  5. pool.close()                    → quit() na connection pool
  6. rpcPool.close()                 → quit() available, disconnect() borrowed

Struktura projektu

src/
├── index.ts                              # Eksporty publiczne
├── command-bus/
│   ├── index.ts                          # CommandBus (fasada wyższego poziomu)
│   ├── command.ts                        # Bazowa klasa Command<T>
│   ├── config/
│   │   └── command-bus-config.ts         # Konfiguracja (env + params)
│   ├── transport/
│   │   ├── transport.interface.ts        # Segregowane interfejsy ISP
│   │   ├── redis-streams-transport.ts    # Fasada transportu (kompozycja 4 komponentów)
│   │   ├── stream-producer.ts            # XADD enqueue / enqueueBatch
│   │   ├── stream-consumer.ts            # Koordynator: lifecycle, dedup, compose
│   │   ├── message-processor.ts          # Parse, validate, XACK/XTRIM, RPC detect
│   │   ├── consumer-loop.ts             # XREADGROUP loop, concurrency limiter
│   │   ├── rpc-handler.ts               # BRPOP/LPUSH request/response
│   │   ├── pending-recovery.ts          # XPENDING/XCLAIM + Dead Letter Queue
│   │   └── redis-codec.ts              # Base64 encode/decode
│   ├── serialization/
│   │   ├── serializer.interface.ts       # ISerializer
│   │   └── msgpack-serializer.ts         # MessagePack z Date extension
│   ├── logging/
│   │   └── command-logger.ts             # Opcjonalny logger komend do plików
│   └── types/
│       └── index.ts                      # Typy (CommandPayload, CommandHandler)
├── shared/
│   ├── types.ts                          # ILogger, TDict, TCallableAsync
│   ├── redis/
│   │   ├── connection-pool.ts            # Round-robin eager pool
│   │   ├── rpc-connection-pool.ts        # Bounded lazy pool (BRPOP)
│   │   └── redis-error-formatter.ts      # Formatowanie błędów Redis
│   ├── logging/
│   │   ├── logger.ts                     # Logger implementacja
│   │   └── log-level.ts                 # Poziomy logowania
│   └── utils/
│       └── error-utils.ts               # getErrorMessage()
└── examples/
    ├── rpc.demo.ts                       # Demo RPC call
    └── rpc-throughput.demo.ts            # Demo throughput

Testowanie

# Wszystkie testy
npm test

# Z coverage
npm run test:coverage

# Lint
npm run lint

# Format
npm run format:check

Architektura testów

| Typ | Pliki | Opis | |-----|-------|------| | Unit | *.spec.ts per komponent | Izolowane testy każdego komponentu | | Integracja | redis-streams-transport.spec.ts | Testy fasady z mockami | | Integracja | command-bus.spec.ts | Testy CommandBus end-to-end z mockami |

Pokrycie: 193 testów, zero timing-dependent setTimeout, jest.useFakeTimers() dla backoff.

Changelog

Pełny changelog dostępny w CHANGELOG.md.